• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python modularinput_testlib.data_open函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tests.modularinput.modularinput_testlib.data_open函数的典型用法代码示例。如果您正苦于以下问题:Python data_open函数的具体用法?Python data_open怎么用?Python data_open使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了data_open函数的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_failed_validation

    def test_failed_validation(self):
        """Check that failed validation writes sensible XML to stdout."""

        # Override abstract methods
        class NewScript(Script):
            def get_scheme(self):
                return None

            def validate_input(self, definition):
                raise ValueError("Big fat validation error!")

            def stream_events(self, inputs, ew):
                # unused
                return

        script = NewScript()

        out = BytesIO()
        err = BytesIO()
        ew = EventWriter(out, err)

        args = [TEST_SCRIPT_PATH, "--validate-arguments"]

        return_value = script.run_script(args, ew, data_open("data/validation.xml"))

        expected = ET.parse(data_open("data/validation_error.xml")).getroot()
        found = ET.fromstring(out.getvalue().decode('utf-8'))

        self.assertEqual(b"", err.getvalue())
        self.assertTrue(xml_compare(expected, found))
        self.assertNotEqual(0, return_value)
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:31,代码来源:test_script.py


示例2: test_writing_events_on_event_writer

    def test_writing_events_on_event_writer(self):
        """Write a pair of events with an EventWriter, and ensure that they
        are being encoded immediately and correctly onto the output stream"""
        out = StringIO()
        err = StringIO()

        ew = EventWriter(out, err)

        e = Event(
            data="This is a test of the emergency broadcast system.",
            stanza="fubar",
            time="%.3f" % 1372275124.466,
            host="localhost",
            index="main",
            source="hilda",
            sourcetype="misc",
            done=True,
            unbroken=True
        )
        ew.write_event(e)

        found = ET.fromstring("%s</stream>" % out.getvalue())
        expected = ET.parse(data_open("data/stream_with_one_event.xml")).getroot()

        self.assertTrue(xml_compare(expected, found))
        self.assertEqual(err.getvalue(), "")

        ew.write_event(e)
        ew.close()

        found = ET.fromstring(out.getvalue())
        expected = ET.parse(data_open("data/stream_with_two_events.xml")).getroot()

        self.assertTrue(xml_compare(expected, found))
开发者ID:bumyongchoi,项目名称:splunk-sdk-python,代码行数:34,代码来源:test_event.py


示例3: test_parse_inputdef_with_two_inputs

    def test_parse_inputdef_with_two_inputs(self):
        """Check parsing of XML that contains 2 inputs"""

        found = InputDefinition.parse(data_open("data/conf_with_2_inputs.xml"))

        expectedDefinition = InputDefinition()
        expectedDefinition.metadata = {
            "server_host": "tiny",
            "server_uri": "https://127.0.0.1:8089",
            "checkpoint_dir": "/some/dir",
            "session_key": "123102983109283019283"
        }
        expectedDefinition.inputs["foobar://aaa"] = {
            "param1": "value1",
            "param2": "value2",
            "disabled": "0",
            "index": "default"
        }
        expectedDefinition.inputs["foobar://bbb"] = {
            "param1": "value11",
            "param2": "value22",
            "disabled": "0",
            "index": "default",
            "multiValue": ["value1", "value2"],
            "multiValue2": ["value3", "value4"]
        }

        self.assertEqual(expectedDefinition, found)
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:28,代码来源:test_input_definition.py


示例4: test_successful_validation

    def test_successful_validation(self):
        """Check that successful validation yield no text and a 0 exit value."""

        # Override abstract methods
        class NewScript(Script):
            def get_scheme(self):
                return None

            def validate_input(self, definition):
                # always succeed...
                return

            def stream_events(self, inputs, ew):
                # unused
                return

        script = NewScript()

        out = StringIO()
        err = StringIO()
        ew = EventWriter(out, err)

        args = [TEST_SCRIPT_PATH, "--validate-arguments"]

        return_value = script.run_script(args, ew, data_open("data/validation.xml"))

        self.assertEqual("", err.getvalue())
        self.assertEqual("", out.getvalue())
        self.assertEqual(0, return_value)
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:29,代码来源:test_script.py


示例5: test_generate_xml_from_scheme

    def test_generate_xml_from_scheme(self):
        """Checks that the XML generated by a Scheme object with all its fields set and
        some arguments added matches what we expect."""

        scheme = Scheme("abcd")
        scheme.description = u"쎼 and 쎶 and <&> für"
        scheme.streaming_mode = Scheme.streaming_mode_simple
        scheme.use_external_validation = "false"
        scheme.use_single_instance = "true"

        arg1 = Argument(name="arg1")
        scheme.add_argument(arg1)

        arg2 = Argument(
            name="arg2",
            description=u"쎼 and 쎶 and <&> für",
            validation="is_pos_int('some_name')",
            data_type=Argument.data_type_number,
            required_on_edit=True,
            required_on_create=True
        )
        scheme.add_argument(arg2)

        constructed = scheme.to_xml()
        expected = ET.parse(data_open("data/scheme_without_defaults.xml")).getroot()

        self.assertTrue(xml_compare(expected, constructed))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:27,代码来源:test_scheme.py


示例6: test_generate_xml_from_scheme_with_default_values

    def test_generate_xml_from_scheme_with_default_values(self):
        """Checks the Scheme generated by creating a Scheme object and setting no fields on it.
        This test checks for sane defaults in the class."""

        scheme = Scheme("abcd")

        constructed = scheme.to_xml()
        expected = ET.parse(data_open("data/scheme_with_defaults.xml")).getroot()

        self.assertTrue(xml_compare(expected, constructed))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:10,代码来源:test_scheme.py


示例7: test_generate_xml_from_argument_with_default_values

    def test_generate_xml_from_argument_with_default_values(self):
        """Checks that the XML produced from an Argument class that is initialized but has no additional manipulations
        made to it is what we expect. This is mostly a check of the default values."""

        argument = Argument("some_name")

        root = ET.Element("")
        constructed = argument.add_to_document(root)

        expected = ET.parse(data_open("data/argument_with_defaults.xml")).getroot()

        self.assertTrue(xml_compare(expected, constructed))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:12,代码来源:test_scheme.py


示例8: test_write_events

    def test_write_events(self):
        """Check that passing an input definition and writing a couple events goes smoothly."""

        # Override abstract methods
        class NewScript(Script):
            def get_scheme(self):
                return None

            def stream_events(self, inputs, ew):
                event = Event(
                    data="This is a test of the emergency broadcast system.",
                    stanza="fubar",
                    time="%.3f" % 1372275124.466,
                    host="localhost",
                    index="main",
                    source="hilda",
                    sourcetype="misc",
                    done=True,
                    unbroken=True
                )

                ew.write_event(event)
                ew.write_event(event)

        script = NewScript()
        input_configuration = data_open("data/conf_with_2_inputs.xml")

        out = BytesIO()
        err = BytesIO()
        ew = EventWriter(out, err)

        return_value = script.run_script([TEST_SCRIPT_PATH], ew, input_configuration)

        self.assertEqual(0, return_value)
        self.assertEqual(b"", err.getvalue())

        expected = ET.parse(data_open("data/stream_with_two_events.xml")).getroot()
        found = ET.fromstring(out.getvalue())

        self.assertTrue(xml_compare(expected, found))
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:40,代码来源:test_script.py


示例9: test_write_xml_is_sane

    def test_write_xml_is_sane(self):
        """Check that EventWriter.write_xml_document writes sensible
        XML to the output stream."""
        out = StringIO()
        err = StringIO()

        ew = EventWriter(out, err)

        expected_xml = ET.parse(data_open("data/event_maximal.xml")).getroot()

        ew.write_xml_document(expected_xml)
        found_xml = ET.fromstring(out.getvalue())

        self.assertTrue(xml_compare(expected_xml, found_xml))
开发者ID:bumyongchoi,项目名称:splunk-sdk-python,代码行数:14,代码来源:test_event.py


示例10: test_parse_inputdef_with_zero_inputs

    def test_parse_inputdef_with_zero_inputs(self):
        """Check parsing of XML that contains only metadata"""

        found = InputDefinition.parse(data_open("data/conf_with_0_inputs.xml"))

        expectedDefinition = InputDefinition()
        expectedDefinition.metadata = {
            "server_host": "tiny",
            "server_uri": "https://127.0.0.1:8089",
            "checkpoint_dir": "/some/dir",
            "session_key": "123102983109283019283"
        }

        self.assertEqual(found, expectedDefinition)
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:14,代码来源:test_input_definition.py


示例11: test_xml_of_event_with_minimal_configuration

    def test_xml_of_event_with_minimal_configuration(self):
        """Generate XML from an event object with a small number of fields,
        and see if it matches what we expect."""
        stream = StringIO()

        event = Event(
            data="This is a test of the emergency broadcast system.", stanza="fubar", time="%.3f" % 1372187084.000
        )

        event.write_to(stream)

        constructed = ET.fromstring(stream.getvalue())
        expected = ET.parse(data_open("data/event_minimal.xml")).getroot()

        self.assertTrue(xml_compare(expected, constructed))
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:15,代码来源:test_event.py


示例12: test_scheme_properly_generated_by_script

    def test_scheme_properly_generated_by_script(self):
        """Check that a scheme generated by a script is what we expect."""

        # Override abstract methods
        class NewScript(Script):
            def get_scheme(self):
                scheme = Scheme("abcd")
                scheme.description = u"\uC3BC and \uC3B6 and <&> f\u00FCr"
                scheme.streaming_mode = scheme.streaming_mode_simple
                scheme.use_external_validation = False
                scheme.use_single_instance = True

                arg1 = Argument("arg1")
                scheme.add_argument(arg1)

                arg2 = Argument("arg2")
                arg2.description = u"\uC3BC and \uC3B6 and <&> f\u00FCr"
                arg2.data_type = Argument.data_type_number
                arg2.required_on_create = True
                arg2.required_on_edit = True
                arg2.validation = "is_pos_int('some_name')"
                scheme.add_argument(arg2)

                return  scheme

            def stream_events(self, inputs, ew):
                # not used
                return

        script = NewScript()

        out = BytesIO()
        err = BytesIO()
        ew = EventWriter(out, err)

        args = [TEST_SCRIPT_PATH, "--scheme"]
        return_value = script.run_script(args, ew, err)

        self.assertEqual(b"", err.getvalue())
        self.assertEqual(0, return_value)

        found = ET.fromstring(out.getvalue())
        expected = ET.parse(data_open("data/scheme_without_defaults.xml")).getroot()

        self.assertTrue(xml_compare(expected, found))
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:45,代码来源:test_script.py


示例13: test_generate_xml_from_argument

    def test_generate_xml_from_argument(self):
        """Checks that the XML generated by an Argument class with all its possible values set is what we expect."""

        argument = Argument(
            name="some_name",
            description=u"쎼 and 쎶 and <&> für",
            validation="is_pos_int('some_name')",
            data_type=Argument.data_type_boolean,
            required_on_edit="true",
            required_on_create="true"
        )

        root = ET.Element("")
        constructed = argument.add_to_document(root)

        expected = ET.parse(data_open("data/argument_without_defaults.xml")).getroot()

        self.assertTrue(xml_compare(expected, constructed))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:18,代码来源:test_scheme.py


示例14: test_validation_definition_parse

    def test_validation_definition_parse(self):
        """Check that parsing produces expected result"""
        found = ValidationDefinition.parse(data_open("data/validation.xml"))

        expected = ValidationDefinition()
        expected.metadata = {
            "server_host": "tiny",
            "server_uri": "https://127.0.0.1:8089",
            "checkpoint_dir": "/opt/splunk/var/lib/splunk/modinputs",
            "session_key": "123102983109283019283",
            "name": "aaa"
        }
        expected.parameters = {
            "param1": "value1",
            "param2": "value2",
            "disabled": "0",
            "index": "default",
            "multiValue": ["value1", "value2"],
            "multiValue2": ["value3", "value4"]
        }

        self.assertEqual(expected, found)
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:22,代码来源:test_validation_definition.py


示例15: test_xml_of_event_with_more_configuration

    def test_xml_of_event_with_more_configuration(self):
        """Generate XML from an even object with all fields set, see if
        it matches what we expect"""
        stream = StringIO()

        event = Event(
            data="This is a test of the emergency broadcast system.",
            stanza="fubar",
            time="%.3f" % 1372274622.493,
            host="localhost",
            index="main",
            source="hilda",
            sourcetype="misc",
            done=True,
            unbroken=True
        )
        event.write_to(stream)

        constructed = ET.fromstring(stream.getvalue())
        expected = ET.parse(data_open("data/event_maximal.xml")).getroot()

        self.assertTrue(xml_compare(expected, constructed))
开发者ID:bumyongchoi,项目名称:splunk-sdk-python,代码行数:22,代码来源:test_event.py


示例16: test_service_property

    def test_service_property(self):
        """ Check that Script.service returns a valid Service instance as soon
        as the stream_events method is called, but not before.

        """

        # Override abstract methods
        class NewScript(Script):
            def __init__(self, test):
                super(NewScript, self).__init__()
                self.test = test

            def get_scheme(self):
                return None

            def stream_events(self, inputs, ew):
                service = self.service
                self.test.assertIsInstance(service, Service)
                self.test.assertEqual(str(service.authority), inputs.metadata['server_uri'])

        script = NewScript(self)
        input_configuration = data_open("data/conf_with_2_inputs.xml")

        out = BytesIO()
        err = BytesIO()
        ew = EventWriter(out, err)

        self.assertEqual(script.service, None)

        return_value = script.run_script(
            [TEST_SCRIPT_PATH], ew, input_configuration)

        self.assertEqual(0, return_value)
        self.assertEqual(b"", err.getvalue())

        return
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:36,代码来源:test_script.py


示例17: test_attempt_to_parse_malformed_input_definition_will_throw_exception

    def test_attempt_to_parse_malformed_input_definition_will_throw_exception(self):
        """Does malformed XML cause the expected exception."""

        with self.assertRaises(ValueError):
            found = InputDefinition.parse(data_open("data/conf_with_invalid_inputs.xml"))
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:5,代码来源:test_input_definition.py



注:本文中的tests.modularinput.modularinput_testlib.data_open函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python mr_cmd_job.CmdJob类代码示例发布时间:2022-05-27
下一篇:
Python models.UserProfile类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap