本文整理汇总了 Python 中 tests.modularinput.modularinput_testlib.data_open 函数的典型用法代码示例。如果您正苦于以下问题：Python data_open 函数的具体用法？Python data_open 怎么用？Python data_open 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 data_open 函数的 17 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_failed_validation
def test_failed_validation(self):
    """Verify that a validation failure produces the expected error XML.

    A script whose ``validate_input`` always raises is run with
    ``--validate-arguments``. The test then checks that the error stream is
    empty, that the output matches ``data/validation_error.xml`` and that
    the return value is nonzero.
    """

    # Concrete Script subclass; only validate_input matters here.
    class NewScript(Script):
        def get_scheme(self):
            return None

        def validate_input(self, definition):
            raise ValueError("Big fat validation error!")

        def stream_events(self, inputs, ew):
            # unused
            return

    script = NewScript()
    stdout_buffer = BytesIO()
    stderr_buffer = BytesIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    exit_code = script.run_script(
        [TEST_SCRIPT_PATH, "--validate-arguments"],
        writer,
        data_open("data/validation.xml"),
    )

    expected_root = ET.parse(data_open("data/validation_error.xml")).getroot()
    # The output buffer holds bytes, so decode before parsing.
    produced_text = stdout_buffer.getvalue().decode('utf-8')
    produced_root = ET.fromstring(produced_text)

    self.assertEqual(b"", stderr_buffer.getvalue())
    self.assertTrue(xml_compare(expected_root, produced_root))
    self.assertNotEqual(0, exit_code)
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:31,代码来源:test_script.py
示例2: test_writing_events_on_event_writer
def test_writing_events_on_event_writer(self):
    """Write the same event twice through an EventWriter and check the output.

    After the first write, the output (with a closing ``</stream>`` tag
    appended by this test) must match ``data/stream_with_one_event.xml`` and
    the error stream must be empty. After the second write and ``close()``,
    the output must match ``data/stream_with_two_events.xml``.
    """
    stdout_buffer = StringIO()
    stderr_buffer = StringIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    event_fields = {
        "data": "This is a test of the emergency broadcast system.",
        "stanza": "fubar",
        "time": "%.3f" % 1372275124.466,
        "host": "localhost",
        "index": "main",
        "source": "hilda",
        "sourcetype": "misc",
        "done": True,
        "unbroken": True,
    }
    event = Event(**event_fields)

    # First event: the test appends the closing tag itself before parsing.
    writer.write_event(event)
    partial_document = "%s</stream>" % stdout_buffer.getvalue()
    found_root = ET.fromstring(partial_document)
    expected_root = ET.parse(data_open("data/stream_with_one_event.xml")).getroot()
    self.assertTrue(xml_compare(expected_root, found_root))
    self.assertEqual(stderr_buffer.getvalue(), "")

    # Second event, then close the writer and parse the output as-is.
    writer.write_event(event)
    writer.close()
    found_root = ET.fromstring(stdout_buffer.getvalue())
    expected_root = ET.parse(data_open("data/stream_with_two_events.xml")).getroot()
    self.assertTrue(xml_compare(expected_root, found_root))
开发者ID:bumyongchoi,项目名称:splunk-sdk-python,代码行数:34,代码来源:test_event.py
示例3: test_parse_inputdef_with_two_inputs
def test_parse_inputdef_with_two_inputs(self):
    """Parse ``data/conf_with_2_inputs.xml`` and compare with a hand-built definition."""
    parsed = InputDefinition.parse(data_open("data/conf_with_2_inputs.xml"))

    metadata = {
        "server_host": "tiny",
        "server_uri": "https://127.0.0.1:8089",
        "checkpoint_dir": "/some/dir",
        "session_key": "123102983109283019283"
    }
    first_input = {
        "param1": "value1",
        "param2": "value2",
        "disabled": "0",
        "index": "default"
    }
    second_input = {
        "param1": "value11",
        "param2": "value22",
        "disabled": "0",
        "index": "default",
        "multiValue": ["value1", "value2"],
        "multiValue2": ["value3", "value4"]
    }

    expected_definition = InputDefinition()
    expected_definition.metadata = metadata
    expected_definition.inputs["foobar://aaa"] = first_input
    expected_definition.inputs["foobar://bbb"] = second_input

    self.assertEqual(expected_definition, parsed)
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:28,代码来源:test_input_definition.py
示例4: test_successful_validation
def test_successful_validation(self):
    """Verify that a passing validation writes nothing and returns 0.

    A script whose ``validate_input`` simply returns is run with
    ``--validate-arguments``; both streams must stay empty and the return
    value must be 0.
    """

    # Concrete Script subclass whose validation never raises.
    class NewScript(Script):
        def get_scheme(self):
            return None

        def validate_input(self, definition):
            # always succeed...
            return

        def stream_events(self, inputs, ew):
            # unused
            return

    script = NewScript()
    stdout_buffer = StringIO()
    stderr_buffer = StringIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    exit_code = script.run_script(
        [TEST_SCRIPT_PATH, "--validate-arguments"],
        writer,
        data_open("data/validation.xml"),
    )

    self.assertEqual("", stderr_buffer.getvalue())
    self.assertEqual("", stdout_buffer.getvalue())
    self.assertEqual(0, exit_code)
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:29,代码来源:test_script.py
示例5: test_generate_xml_from_scheme
def test_generate_xml_from_scheme(self):
    """Compare the XML of an explicitly configured Scheme with the reference file.

    The scheme gets a description, streaming mode, external-validation and
    single-instance settings plus two arguments; ``to_xml()`` must match
    ``data/scheme_without_defaults.xml``.
    """
    scheme = Scheme("abcd")
    scheme.description = u"쎼 and 쎶 and <&> für"
    scheme.streaming_mode = Scheme.streaming_mode_simple
    scheme.use_external_validation = "false"
    scheme.use_single_instance = "true"

    # First argument: name only.
    scheme.add_argument(Argument(name="arg1"))

    # Second argument: every keyword supplied explicitly.
    scheme.add_argument(
        Argument(
            name="arg2",
            description=u"쎼 and 쎶 and <&> für",
            validation="is_pos_int('some_name')",
            data_type=Argument.data_type_number,
            required_on_edit=True,
            required_on_create=True
        )
    )

    generated_root = scheme.to_xml()
    reference_root = ET.parse(data_open("data/scheme_without_defaults.xml")).getroot()
    self.assertTrue(xml_compare(reference_root, generated_root))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:27,代码来源:test_scheme.py
示例6: test_generate_xml_from_scheme_with_default_values
def test_generate_xml_from_scheme_with_default_values(self):
    """Compare the XML of an untouched Scheme with ``data/scheme_with_defaults.xml``.

    Only the title is passed to the constructor; no other field is set.
    """
    generated_root = Scheme("abcd").to_xml()
    reference_root = ET.parse(data_open("data/scheme_with_defaults.xml")).getroot()
    self.assertTrue(xml_compare(reference_root, generated_root))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:10,代码来源:test_scheme.py
示例7: test_generate_xml_from_argument_with_default_values
def test_generate_xml_from_argument_with_default_values(self):
    """Compare the XML of an untouched Argument with the reference file.

    The Argument is created with a name only and added to an empty-tagged
    parent element; the result of ``add_to_document`` must match
    ``data/argument_with_defaults.xml``.
    """
    argument = Argument("some_name")
    parent = ET.Element("")
    generated = argument.add_to_document(parent)

    reference = ET.parse(data_open("data/argument_with_defaults.xml")).getroot()
    self.assertTrue(xml_compare(reference, generated))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:12,代码来源:test_scheme.py
示例8: test_write_events
def test_write_events(self):
    """Run a script that writes one event twice and check the resulting stream.

    The script is fed ``data/conf_with_2_inputs.xml``; it must return 0,
    leave the error stream empty and produce output matching
    ``data/stream_with_two_events.xml``.
    """

    # Concrete Script subclass whose stream_events emits the same event twice.
    class NewScript(Script):
        def get_scheme(self):
            return None

        def stream_events(self, inputs, ew):
            event = Event(
                data="This is a test of the emergency broadcast system.",
                stanza="fubar",
                time="%.3f" % 1372275124.466,
                host="localhost",
                index="main",
                source="hilda",
                sourcetype="misc",
                done=True,
                unbroken=True
            )
            for _ in range(2):
                ew.write_event(event)

    script = NewScript()
    input_stream = data_open("data/conf_with_2_inputs.xml")

    stdout_buffer = BytesIO()
    stderr_buffer = BytesIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    exit_code = script.run_script([TEST_SCRIPT_PATH], writer, input_stream)

    self.assertEqual(0, exit_code)
    self.assertEqual(b"", stderr_buffer.getvalue())

    reference_root = ET.parse(data_open("data/stream_with_two_events.xml")).getroot()
    produced_root = ET.fromstring(stdout_buffer.getvalue())
    self.assertTrue(xml_compare(reference_root, produced_root))
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:40,代码来源:test_script.py
示例9: test_write_xml_is_sane
def test_write_xml_is_sane(self):
    """Round-trip an XML document through ``EventWriter.write_xml_document``.

    The root of ``data/event_maximal.xml`` is written to the output stream,
    parsed back, and compared with the element that was written.
    """
    stdout_buffer = StringIO()
    stderr_buffer = StringIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    source_root = ET.parse(data_open("data/event_maximal.xml")).getroot()
    writer.write_xml_document(source_root)

    reparsed_root = ET.fromstring(stdout_buffer.getvalue())
    self.assertTrue(xml_compare(source_root, reparsed_root))
开发者ID:bumyongchoi,项目名称:splunk-sdk-python,代码行数:14,代码来源:test_event.py
示例10: test_parse_inputdef_with_zero_inputs
def test_parse_inputdef_with_zero_inputs(self):
    """Parse ``data/conf_with_0_inputs.xml`` and compare with a metadata-only definition."""
    parsed = InputDefinition.parse(data_open("data/conf_with_0_inputs.xml"))

    metadata = {
        "server_host": "tiny",
        "server_uri": "https://127.0.0.1:8089",
        "checkpoint_dir": "/some/dir",
        "session_key": "123102983109283019283"
    }
    expected_definition = InputDefinition()
    expected_definition.metadata = metadata

    self.assertEqual(parsed, expected_definition)
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:14,代码来源:test_input_definition.py
示例11: test_xml_of_event_with_minimal_configuration
def test_xml_of_event_with_minimal_configuration(self):
    """Serialise an Event with only data, stanza and time set.

    The XML written by ``write_to`` must match ``data/event_minimal.xml``.
    """
    buffer = StringIO()

    event = Event(
        data="This is a test of the emergency broadcast system.",
        stanza="fubar",
        time="%.3f" % 1372187084.000
    )
    event.write_to(buffer)

    generated_root = ET.fromstring(buffer.getvalue())
    reference_root = ET.parse(data_open("data/event_minimal.xml")).getroot()
    self.assertTrue(xml_compare(reference_root, generated_root))
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:15,代码来源:test_event.py
示例12: test_scheme_properly_generated_by_script
def test_scheme_properly_generated_by_script(self):
    """Run a script with ``--scheme`` and compare its output with the reference XML.

    The script must leave the error stream empty, return 0 and produce
    output matching ``data/scheme_without_defaults.xml``.
    """

    # Concrete Script subclass that builds a scheme with two arguments.
    class NewScript(Script):
        def get_scheme(self):
            scheme = Scheme("abcd")
            scheme.description = u"\uC3BC and \uC3B6 and <&> f\u00FCr"
            scheme.streaming_mode = scheme.streaming_mode_simple
            scheme.use_external_validation = False
            scheme.use_single_instance = True

            # First argument: name only.
            scheme.add_argument(Argument("arg1"))

            # Second argument: configured attribute by attribute.
            second = Argument("arg2")
            second.description = u"\uC3BC and \uC3B6 and <&> f\u00FCr"
            second.data_type = Argument.data_type_number
            second.required_on_create = True
            second.required_on_edit = True
            second.validation = "is_pos_int('some_name')"
            scheme.add_argument(second)

            return scheme

        def stream_events(self, inputs, ew):
            # not used
            return

    script = NewScript()
    stdout_buffer = BytesIO()
    stderr_buffer = BytesIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    # NOTE(review): the error buffer is also handed in as the input-stream
    # argument here -- confirm this is intentional.
    exit_code = script.run_script(
        [TEST_SCRIPT_PATH, "--scheme"], writer, stderr_buffer
    )

    self.assertEqual(b"", stderr_buffer.getvalue())
    self.assertEqual(0, exit_code)

    produced_root = ET.fromstring(stdout_buffer.getvalue())
    reference_root = ET.parse(data_open("data/scheme_without_defaults.xml")).getroot()
    self.assertTrue(xml_compare(reference_root, produced_root))
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:45,代码来源:test_script.py
示例13: test_generate_xml_from_argument
def test_generate_xml_from_argument(self):
    """Compare the XML of an explicitly configured Argument with the reference file.

    Name, description, validation, data type and both ``required_on_*``
    values are passed to the constructor; the result of ``add_to_document``
    must match ``data/argument_without_defaults.xml``.
    """
    argument_options = dict(
        name="some_name",
        description=u"쎼 and 쎶 and <&> für",
        validation="is_pos_int('some_name')",
        data_type=Argument.data_type_boolean,
        required_on_edit="true",
        required_on_create="true"
    )
    argument = Argument(**argument_options)

    parent = ET.Element("")
    generated = argument.add_to_document(parent)

    reference = ET.parse(data_open("data/argument_without_defaults.xml")).getroot()
    self.assertTrue(xml_compare(reference, generated))
开发者ID:AMAKO11,项目名称:splunk-sdk-python,代码行数:18,代码来源:test_scheme.py
示例14: test_validation_definition_parse
def test_validation_definition_parse(self):
    """Parse ``data/validation.xml`` and compare with a hand-built definition."""
    parsed = ValidationDefinition.parse(data_open("data/validation.xml"))

    metadata = {
        "server_host": "tiny",
        "server_uri": "https://127.0.0.1:8089",
        "checkpoint_dir": "/opt/splunk/var/lib/splunk/modinputs",
        "session_key": "123102983109283019283",
        "name": "aaa"
    }
    parameters = {
        "param1": "value1",
        "param2": "value2",
        "disabled": "0",
        "index": "default",
        "multiValue": ["value1", "value2"],
        "multiValue2": ["value3", "value4"]
    }

    reference = ValidationDefinition()
    reference.metadata = metadata
    reference.parameters = parameters

    self.assertEqual(reference, parsed)
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:22,代码来源:test_validation_definition.py
示例15: test_xml_of_event_with_more_configuration
def test_xml_of_event_with_more_configuration(self):
    """Serialise an Event with every keyword supplied.

    The XML written by ``write_to`` must match ``data/event_maximal.xml``.
    """
    buffer = StringIO()

    event_fields = {
        "data": "This is a test of the emergency broadcast system.",
        "stanza": "fubar",
        "time": "%.3f" % 1372274622.493,
        "host": "localhost",
        "index": "main",
        "source": "hilda",
        "sourcetype": "misc",
        "done": True,
        "unbroken": True,
    }
    event = Event(**event_fields)
    event.write_to(buffer)

    generated_root = ET.fromstring(buffer.getvalue())
    reference_root = ET.parse(data_open("data/event_maximal.xml")).getroot()
    self.assertTrue(xml_compare(reference_root, generated_root))
开发者ID:bumyongchoi,项目名称:splunk-sdk-python,代码行数:22,代码来源:test_event.py
示例16: test_service_property
def test_service_property(self):
    """Check ``Script.service`` before and during ``stream_events``.

    Before ``run_script`` is called the property must equal ``None``; inside
    ``stream_events`` it must be a ``Service`` whose authority matches the
    ``server_uri`` metadata of the inputs.
    """

    # Concrete Script subclass that keeps a handle on the running test case
    # so it can assert from inside stream_events.
    class NewScript(Script):
        def __init__(self, test):
            super(NewScript, self).__init__()
            self.test = test

        def get_scheme(self):
            return None

        def stream_events(self, inputs, ew):
            current_service = self.service
            self.test.assertIsInstance(current_service, Service)
            self.test.assertEqual(
                str(current_service.authority),
                inputs.metadata['server_uri'],
            )

    script = NewScript(self)
    input_stream = data_open("data/conf_with_2_inputs.xml")

    stdout_buffer = BytesIO()
    stderr_buffer = BytesIO()
    writer = EventWriter(stdout_buffer, stderr_buffer)

    # Checked before the script has been run.
    self.assertEqual(script.service, None)

    exit_code = script.run_script([TEST_SCRIPT_PATH], writer, input_stream)

    self.assertEqual(0, exit_code)
    self.assertEqual(b"", stderr_buffer.getvalue())
开发者ID:larrys,项目名称:splunk-sdk-python,代码行数:36,代码来源:test_script.py
示例17: test_attempt_to_parse_malformed_input_definition_will_throw_exception
def test_attempt_to_parse_malformed_input_definition_will_throw_exception(self):
    """Parsing ``data/conf_with_invalid_inputs.xml`` must raise ``ValueError``."""
    with self.assertRaises(ValueError):
        # The call is expected to raise, so its result is deliberately not
        # bound to a name.
        InputDefinition.parse(data_open("data/conf_with_invalid_inputs.xml"))
开发者ID:Jaykul,项目名称:splunk-sdk-python,代码行数:5,代码来源:test_input_definition.py
注：本文中的 tests.modularinput.modularinput_testlib.data_open 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论