本文整理汇总了Python中mock.mock.MagicMock类的典型用法代码示例。如果您正苦于以下问题:Python MagicMock类的具体用法?Python MagicMock怎么用?Python MagicMock使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了MagicMock类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_report_data
def test_report_data(self):
    """Check the headers and rows of FicheConsommationReport for a mocked request.

    The request is a MagicMock carrying the web user, the GET filters
    (location ``d1``, November 2017) and the matching DateSpan.
    """
    request = MagicMock()
    request.couch_user = self.web_user
    request.GET = {
        'location_id': 'd1',
        'startdate': '2017-11-01',
        'enddate': '2017-11-30'
    }
    request.datespan = DateSpan(datetime(2017, 11, 1), datetime(2017, 11, 30))

    fiche_report = FicheConsommationReport(request=request, domain='test-domain')
    # Headers are read before rows, in the same order as the report is consumed.
    header = fiche_report.headers
    rows = fiche_report.rows

    def cell(value):
        # Every data cell repeats the same number as sort key and as html.
        return {'sort_key': value, 'html': value}

    expected_rows = [
        ['PPS 1'] + [cell(value) for value in (10, 5, 5, 2, 2, 0, 6, 4, 2)],
        ['PPS 2'] + [cell(value) for value in (13, 11, 2, 0, 0, 0, 150, 11, 139)],
    ]

    header_labels = [column_group.html for column_group in header]
    self.assertEqual(header_labels, ['', 'Product 1', 'Product 2', 'Product 3'])
    self.assertEqual(rows, expected_rows)
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:35,代码来源:test_fiche_consommation.py
示例2: test_action_create_xml_config_with_metacharacters
def test_action_create_xml_config_with_metacharacters(self,
                                                      time_asctime_mock,
                                                      os_path_isdir_mock,
                                                      os_path_exists_mock,
                                                      open_mock,
                                                      ensure_mock):
    """
    Tests if 'create' action - creates new non existent xml file and write proper data
    where configurations={"Some conf":"Some metacharacters"}

    The mocked ``open`` returns a MagicMock, so the text written through its
    context manager can be compared with the expected XML document.
    """
    os_path_isdir_mock.side_effect = [False, True]
    os_path_exists_mock.return_value = False
    time_asctime_mock.return_value = 'Wed 2014-02'

    result_file = MagicMock()
    open_mock.return_value = result_file

    with Environment('/') as env:
        XmlConfig('file.xml',
                  conf_dir='/dir/conf',
                  configurations={"": "",
                                  "prop.1": "'.'yyyy-MM-dd-HH",
                                  "prop.3": "%d{ISO8601} %5p %c{1}:%L - %m%n",
                                  "prop.2": "INFO, openjpa",
                                  "prop.4": "${oozie.log.dir}/oozie.log",
                                  "prop.empty": "",
                                  },
                  )

    open_mock.assert_called_with('/dir/conf/file.xml', 'wb')
    # The expected text contains apostrophes, so it must be delimited with double
    # quotes; a single-quoted literal would be terminated at the first "'".
    # NOTE(review): if XmlConfig HTML-escapes values, the apostrophes may need to be
    # spelled as entities here - confirm against the template.
    result_file.__enter__().write.assert_called_with(u"<!--Wed 2014-02-->\n <configuration>\n \n <property>\n <name></name>\n <value></value>\n </property>\n \n <property>\n <name>prop.empty</name>\n <value></value>\n </property>\n \n <property>\n <name>prop.3</name>\n <value>%d{ISO8601} %5p %c{1}:%L - %m%n</value>\n </property>\n \n <property>\n <name>prop.2</name>\n <value>INFO, openjpa</value>\n </property>\n \n <property>\n <name>prop.1</name>\n <value>'.'yyyy-MM-dd-HH</value>\n </property>\n \n <property>\n <name>prop.4</name>\n <value>${oozie.log.dir}/oozie.log</value>\n </property>\n \n </configuration>\n")
开发者ID:Altiscale,项目名称:incubator-slider,代码行数:31,代码来源:TestXmlConfigResource.py
示例3: test_run
def test_run(self, ActionQueue_mock, installMock, buildMock):
    """Check that Controller.run() repeats registration on request and starts the action queue.

    ``registerAndHeartbeat`` is replaced by a mock whose side effect asks for a
    repeated registration on its first invocation only, so the first ``run()``
    is expected to call it twice and the second ``run()`` once more.
    """
    aq = MagicMock()
    ActionQueue_mock.return_value = aq
    buildMock.return_value = "opener"

    # The label must be passed as ``name=``: the first positional argument of
    # MagicMock is ``spec``, which would turn the mock into a str look-alike.
    registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
    calls = []

    def side_effect():
        # Only the very first call requests another registration round.
        if not calls:
            self.controller.repeatRegistration = True
        calls.append(1)

    registerAndHeartbeat.side_effect = side_effect
    self.controller.registerAndHeartbeat = registerAndHeartbeat

    # repeat registration
    self.controller.run()
    self.assertTrue(buildMock.called)
    # ``called_once_with`` is not an assertion (it is an auto-created mock
    # attribute that always succeeds); use the real assertion method.
    installMock.assert_called_once_with("opener")
    self.assertEqual(2, registerAndHeartbeat.call_count)

    # one call, +1
    registerAndHeartbeat.side_effect = None
    self.controller.run()
    self.assertEqual(3, registerAndHeartbeat.call_count)

    # Action queue should be started during calls
    self.assertTrue(ActionQueue_mock.called)
    self.assertTrue(aq.start.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-slider,代码行数:29,代码来源:TestController.py
示例4: test_action_create_properties_with_metacharacters
def test_action_create_properties_with_metacharacters(self,
                                                      time_asctime_mock,
                                                      os_path_isdir_mock,
                                                      os_path_exists_mock,
                                                      open_mock,
                                                      ensure_mock):
    """
    Verify that PropertiesFile writes a new file whose keys and values contain
    metacharacters (quotes, ``%``, ``{}``, ``${...}``) and an empty key.

    1) properties={"":"", "Some property":"Metacharacters: -%{} ${a.a}/"}
    2) dir=None
    """
    # Configure the patched filesystem and clock helpers.
    os_path_isdir_mock.side_effect = [False, True]
    os_path_exists_mock.return_value = False
    time_asctime_mock.return_value = 777

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    properties = {"": "",
                  "prop.1": "'.'yyyy-MM-dd-HH",
                  "prop.3": "%d{ISO8601} %5p %c{1}:%L - %m%n",
                  "prop.2": "INFO, openjpa",
                  "prop.4": "${oozie.log.dir}/oozie.log",
                  "prop.empty": "",
                  }
    expected_content = u"# Generated by Apache Ambari. 777\n \n=\nprop.1='.'yyyy-MM-dd-HH\nprop.2=INFO, openjpa\nprop.3=%d{ISO8601} %5p %c{1}:%L - %m%n\nprop.4=${oozie.log.dir}/oozie.log\nprop.empty=\n \n"

    with Environment('/'):
        PropertiesFile('/dir/new_file', properties=properties)

    open_mock.assert_called_with('/dir/new_file', 'wb')
    # The content is written through the context manager returned by open().
    file_mock.__enter__().write.assert_called_with(expected_content)
    self.assertEqual(open_mock.call_count, 1)
    ensure_mock.assert_called()
开发者ID:duxia,项目名称:ambari,代码行数:33,代码来源:TestPropertiesFileResource.py
示例5: test_action_create_empty_properties_without_dir
def test_action_create_empty_properties_without_dir(self,
                                                    time_asctime_mock,
                                                    os_path_isdir_mock,
                                                    os_path_exists_mock,
                                                    open_mock,
                                                    ensure_mock):
    """
    Verify that PropertiesFile writes only the generated header when it is
    given no properties and no directory.

    1) properties={}
    2) dir=None
    """
    # Configure the patched filesystem and clock helpers.
    os_path_isdir_mock.side_effect = [False, True]
    os_path_exists_mock.return_value = False
    time_asctime_mock.return_value = 'Today is Wednesday'

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    target_path = '/somewhere_in_system/one_file.properties'
    expected_content = u'# Generated by Apache Ambari. Today is Wednesday\n \n \n'

    with Environment('/'):
        PropertiesFile(target_path, dir=None, properties={})

    open_mock.assert_called_with(target_path, 'wb')
    # The content is written through the context manager returned by open().
    file_mock.__enter__().write.assert_called_with(expected_content)
    self.assertEqual(open_mock.call_count, 1)
    ensure_mock.assert_called()
开发者ID:duxia,项目名称:ambari,代码行数:28,代码来源:TestPropertiesFileResource.py
示例6: test_watchdog_2
def test_watchdog_2(self):
    """
    Tries to catch false positive watchdog invocations

    The fake subprocess is allowed to finish well inside the 5 second
    timeout, so it must not be terminated and must keep return code 0.
    """
    subproc_mock = self.Subprocess_mockup()
    executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
    _, tmpoutfile = tempfile.mkstemp()
    _, tmperrfile = tempfile.mkstemp()
    _, tmpstrucout = tempfile.mkstemp()
    PYTHON_TIMEOUT_SECONDS = 5

    def launch_python_subprocess_method(command, tmpout, tmperr):
        # Hand the output files to the fake subprocess instead of spawning one.
        subproc_mock.tmpout = tmpout
        subproc_mock.tmperr = tmperr
        return subproc_mock

    executor.launch_python_subprocess = launch_python_subprocess_method
    runShellKillPgrp_method = MagicMock()
    runShellKillPgrp_method.side_effect = lambda python: python.terminate()
    executor.runShellKillPgrp = runShellKillPgrp_method
    subproc_mock.returncode = 0
    callback_method = MagicMock()

    thread = Thread(target=executor.run_file, args=("fake_puppetFile", ["arg1", "arg2"],
                                                    tmpoutfile, tmperrfile,
                                                    PYTHON_TIMEOUT_SECONDS, tmpstrucout,
                                                    callback_method, "1-1"))
    thread.start()
    time.sleep(0.1)
    # Let the fake subprocess finish and wait until it reports completion.
    subproc_mock.should_finish_event.set()
    subproc_mock.finished_event.wait()

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout")
    self.assertEqual(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
    self.assertTrue(callback_method.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:32,代码来源:TestPythonExecutor.py
示例7: test_extract_value_from_report_table_row_value_input_dict
def test_extract_value_from_report_table_row_value_input_dict(self):
    """Check that the ``html`` values of ``total_row`` are extracted from a report table dict.

    The report is built from a mocked request carrying the user and the GET
    filters (October 2017 to March 2018, no location or program).
    """
    request = MagicMock()
    request.couch_user = self.user
    request.GET = {
        'location_id': '',
        'program': '',
        'month_start': '10',
        'year_start': '2017',
        'month_end': '3',
        'year_end': '2018',
    }
    dashboard1_report = Dashboard1Report(request=request, domain='test-pna')

    # Four total cells named row_0 .. row_3.
    total_row = [{'html': 'row_%d' % index} for index in range(4)]
    report_table = {
        'fix_column': False,
        'comment': 'test_comment',
        'rows': [],
        'datatables': False,
        'title': 'test_title',
        'total_row': total_row,
        'slug': 'disponibilite',
        'default_rows': 10
    }

    extracted = dashboard1_report._extract_value_from_report_table_row_value(report_table)
    self.assertEqual(extracted, ['row_0', 'row_1', 'row_2', 'row_3'])
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:29,代码来源:test_dashboard_1.py
示例8: test_ParseOptions
def test_ParseOptions(self, open_mock, backup_file_mock, modify_action_mock, option_parser_mock):
    """Check that upgradeHelper.main() copies the parsed options into upgradeHelper.Options.

    The option parser is mocked so that ``parse_args`` returns a fixed options
    class together with the single action ``update-configs``.
    """
    class options(object):
        # Stand-in for the object returned by the option parser.
        user = "test_user"
        hostname = "127.0.0.1"
        clustername = "test1"
        password = "test_password"
        upgrade_json = "catalog_file"
        from_stack = "0.0"
        to_stack = "1.3"
        logfile = "test.log"
        report = "report.txt"
        warnings = []
        printonly = False

    args = ["update-configs"]

    def fake_parse_args():
        return (options, args)

    modify_action_mock.return_value = MagicMock()
    backup_file_mock.return_value = MagicMock()
    parser_mock = MagicMock()
    parser_mock.parse_args = fake_parse_args
    option_parser_mock.return_value = parser_mock

    upgradeHelper.main()

    self.assertEqual(backup_file_mock.call_count, 0)
    self.assertEqual(modify_action_mock.call_count, 1)
    expected_tokens = {"user": options.user, "pass": options.password}
    self.assertEqual(expected_tokens, upgradeHelper.Options.API_TOKENS)
    self.assertEqual(options.clustername, upgradeHelper.Options.CLUSTER_NAME)
开发者ID:fanzhidongyzby,项目名称:ambari,代码行数:26,代码来源:TestUpgradeHelper.py
示例9: test_start
def test_start(self):
    """Check that AlertSchedulerHandler.start() starts the inner scheduler and schedules the alert.

    The handler's private scheduler, JSON-to-alert factory and config maps are
    replaced with mocks through their name-mangled attributes.
    """
    # Kept from the original test; this local is not referenced afterwards.
    execution_commands = [
        {
            'clusterName': 'cluster',
            'hostName': 'host',
            'alertDefinition': {
                'name': 'alert1'
            }
        }
    ]

    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)

    alert_mock = MagicMock()
    alert_mock.interval = Mock(return_value=5)
    alert_mock.collect = Mock()
    alert_mock.set_helpers = Mock()

    scheduler.schedule_definition = MagicMock()

    # A stopped inner scheduler whose start() call can be observed.
    inner_scheduler = MagicMock()
    inner_scheduler.running = False
    inner_scheduler.start = Mock()
    scheduler._AlertSchedulerHandler__scheduler = inner_scheduler

    scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
    scheduler._AlertSchedulerHandler__config_maps = {
        'cluster': {}
    }

    scheduler.start()

    self.assertTrue(scheduler._AlertSchedulerHandler__scheduler.start.called)
    scheduler.schedule_definition.assert_called_with(alert_mock)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:29,代码来源:TestAlertSchedulerHandler.py
示例10: test_rss
def test_rss(container_node, other_container_node, content_node, collections, home_root, guest_user, root):
    """Check that struct2rss renders an RSS document for a small node tree.

    The nodes are linked as root -> collections -> container_node ->
    other_container_node -> content_node, and ``collections`` is given a read
    rule for everybody before the RSS is generated from a mocked request.
    """
    everybody_rule = get_or_add_everybody_rule()

    root.children.append(collections)

    collections.access_rule_assocs.append(NodeToAccessRule(ruletype=u"read", rule=everybody_rule))
    collections.container_children.append(container_node)

    container_node.container_children.append(other_container_node)
    other_container_node.content_children.append(content_node)

    struct = {"nodelist": [other_container_node, content_node], "build_response_start": time.time()}
    params = {}
    req = MagicMock()
    req.get_header = lambda x: "localhost:8081"
    req.fullpath = ""
    req.query = ""

    res = struct2rss(req, "", params, None, struct=struct)
    # Function-call form parses on both Python 2 and 3 (the bare print
    # statement is Python 2 only) and prints the same text for one argument.
    print(res)
    # TODO: find some way to check XML content properly
    assert res.startswith(
        '<?xml version="1.0" encoding="utf-8" standalone="yes"?>\n'
        '<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" version="2.0">\n'
        '<channel>')
    assert "document/testschema" in res
    assert "http://localhost:8081/node?id=" in res
开发者ID:mediatum,项目名称:mediatum,代码行数:27,代码来源:test_handlers.py
示例11: test_repeatRegistration
def test_repeatRegistration(self, get_os_version_mock, get_os_type_mock,
                            run_mock, installMock, buildMock, Popen_mock):
    """Check how often Controller.run() invokes registerAndHeartbeat.

    A plain run is expected to call it exactly once; a second run with a side
    effect that leaves ``repeatRegistration`` False brings the total to two.
    The real method is restored on the controller at the end.
    """
    heartbeat_mock = MagicMock(name="registerAndHeartbeat")
    get_os_type_mock.return_value = "suse"
    get_os_version_mock.return_value = "11"

    self.controller.registerAndHeartbeat = heartbeat_mock
    self.controller.run()

    self.assertTrue(installMock.called)
    self.assertTrue(buildMock.called)
    self.controller.registerAndHeartbeat.assert_called_once_with()

    invocations = []

    def switchBool():
        if not invocations:
            self.controller.repeatRegistration = True
            invocations.append(1)
        # The flag always ends up False, so run() does not loop again.
        self.controller.repeatRegistration = False

    heartbeat_mock.side_effect = switchBool
    self.controller.run()
    self.assertEqual(2, heartbeat_mock.call_count)

    # Put the real implementation back for the following tests.
    self.controller.registerAndHeartbeat = Controller.Controller.registerAndHeartbeat
开发者ID:maduhu,项目名称:HDP2.5-ambari,代码行数:26,代码来源:TestController.py
示例12: xml_fixture
def xml_fixture(parent_node, content_node):
    """Build the (struct, request, params) triple used by the XML export tests.

    Both nodes get a ``testvalue`` attribute, the request is a MagicMock that
    is created once and then reused through the module-level ``mock`` global,
    and the ``directory`` and ``testschema`` metadatatypes are created.
    """
    global mock

    parent_node["testvalue"] = "1001"
    content_node["testvalue"] = "1002"

    struct = {"nodelist": [parent_node, content_node],
              "build_response_start": time.time(),
              "status": "ok",
              "dataready": "0.1",
              "retrievaldate": datetime.datetime.now().isoformat(),
              "sortfield": "sortfield",
              "sortdirection": "up",
              "timetable": [],
              "result_shortlist": []}
    params = {}

    # Create the shared request mock lazily; later calls reuse it.
    if not mock:
        mock = MagicMock()
    req = mock

    req.get_header = lambda x: "localhost:8081"
    req.fullpath = ""
    req.query = ""

    for schema_name in (u"directory", u"testschema"):
        MetadatatypeFactory(name=schema_name)

    return struct, req, params
开发者ID:mediatum,项目名称:mediatum,代码行数:27,代码来源:test_handlers.py
示例13: test_main
def test_main(self, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
              parse_args_mock, join_mock, start_mock, Controller_init_mock, try_to_connect_mock,
              update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
              resolve_ambari_config_mock, stop_mock, bind_signal_handlers_mock,
              setup_logging_mock, socket_mock):
    """Check which start-up steps main.main() performs.

    The first call runs with mocked default options; the second one sets
    ``expected_hostname`` and checks that it reaches the pre-start checks.
    """
    # The patched __init__ methods must return None like real constructors.
    for init_mock in (data_clean_init_mock, Controller_init_mock, ping_port_init_mock):
        init_mock.return_value = None
    options = MagicMock()
    parse_args_mock.return_value = (options, MagicMock)

    #testing call without command-line arguments
    main.main()

    for step_mock in (setup_logging_mock,
                      bind_signal_handlers_mock,
                      stop_mock,
                      resolve_ambari_config_mock,
                      perform_prestart_checks_mock,
                      daemonize_mock,
                      update_log_level_mock):
        self.assertTrue(step_mock.called)

    try_to_connect_mock.assert_called_once_with(ANY, -1, ANY)

    for step_mock in (start_mock,
                      data_clean_init_mock,
                      data_clean_start_mock,
                      ping_port_init_mock,
                      ping_port_start_mock):
        self.assertTrue(step_mock.called)

    perform_prestart_checks_mock.reset_mock()

    # Testing call with --expected-hostname parameter
    options.expected_hostname = "test.hst"
    main.main()
    perform_prestart_checks_mock.assert_called_once_with(options.expected_hostname)
开发者ID:duxia,项目名称:ambari,代码行数:34,代码来源:TestMain.py
示例14: test_execution_results
def test_execution_results(self):
    """Check the result dict that PythonExecutor.run_file() returns for a clean run.

    The fake subprocess is already flagged to finish with return code 0, so
    the result is expected to carry exit code 0, empty stdout/stderr and an
    empty structured output, and the callback must have been invoked.
    """
    subproc_mock = self.Subprocess_mockup()
    executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
    _, tmpoutfile = tempfile.mkstemp()
    _, tmperrfile = tempfile.mkstemp()

    # the structured out file should be preserved across calls to the hooks and script.
    tmp_file = tempfile.NamedTemporaryFile()
    tmpstructuredoutfile = tmp_file.name
    tmp_file.close()

    PYTHON_TIMEOUT_SECONDS = 5

    def launch_python_subprocess_method(command, tmpout, tmperr):
        # Hand the output files to the fake subprocess instead of spawning one.
        subproc_mock.tmpout = tmpout
        subproc_mock.tmperr = tmperr
        return subproc_mock

    executor.launch_python_subprocess = launch_python_subprocess_method
    runShellKillPgrp_method = MagicMock()
    runShellKillPgrp_method.side_effect = lambda python: python.terminate()
    executor.runShellKillPgrp = runShellKillPgrp_method
    subproc_mock.returncode = 0
    subproc_mock.should_finish_event.set()
    callback_method = MagicMock()

    result = executor.run_file("file", ["arg1", "arg2"],
                               tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
                               tmpstructuredoutfile, callback_method, "1-1")

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(result, {'exitcode': 0, 'stderr': '', 'stdout': '',
                              'structuredOut': {}})
    self.assertTrue(callback_method.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:29,代码来源:TestPythonExecutor.py
示例15: test_attribute_command_escaping
def test_attribute_command_escaping(self, popen_mock):
    """Check how Execute quotes tuple commands before they reach Popen.

    Each case pairs the argument tuple given to Execute with the command
    string expected at position ``[0][0][3]`` of the matching Popen call.
    """
    cases = [
        (('arg1', 'arg2', 'quoted arg'), "arg1 arg2 'quoted arg'"),
        (('arg1', 'arg2', 'command "arg"'), "arg1 arg2 'command \"arg\"'"),
        (('arg1', 'arg2', "command 'arg'"), 'arg1 arg2 \'command \'"\'"\'arg\'"\'"\'\''),
        (('arg1', 'arg2', "echo `ls /root`"), "arg1 arg2 'echo `ls /root`'"),
        (('arg1', 'arg2', "$ROOT"), "arg1 arg2 '$ROOT'"),
        (('arg1', 'arg2', "`ls /root`"), "arg1 arg2 '`ls /root`'"),
    ]

    subproc_mock = MagicMock()
    subproc_mock.returncode = 0
    popen_mock.return_value = subproc_mock

    # Declare all resources first, in order, inside a single environment.
    with Environment("/"):
        for command, _ in cases:
            Execute(command)

    for index, (_, expected_command) in enumerate(cases):
        actual_command = popen_mock.call_args_list[index][0][0][3]
        self.assertEqual(actual_command, expected_command)
开发者ID:LXiong,项目名称:slider,代码行数:32,代码来源:TestExecuteResource.py
示例16: test_watchdog_1
def test_watchdog_1(self, kill_process_with_children_mock):
    """
    Tests whether watchdog works

    The fake subprocess never finishes on its own (return code None) and the
    timeout is 0.1 seconds, so it is expected to be terminated.
    """
    subproc_mock = self.Subprocess_mockup()
    executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
    _, tmpoutfile = tempfile.mkstemp()
    _, tmperrfile = tempfile.mkstemp()
    _, tmpstrucout = tempfile.mkstemp()
    PYTHON_TIMEOUT_SECONDS = 0.1
    # Killing the process tree is redirected to the fake subprocess.
    kill_process_with_children_mock.side_effect = lambda pid: subproc_mock.terminate()

    def launch_python_subprocess_method(command, tmpout, tmperr):
        # Hand the output files to the fake subprocess instead of spawning one.
        subproc_mock.tmpout = tmpout
        subproc_mock.tmperr = tmperr
        return subproc_mock

    executor.launch_python_subprocess = launch_python_subprocess_method
    runShellKillPgrp_method = MagicMock()
    runShellKillPgrp_method.side_effect = lambda python: python.terminate()
    executor.runShellKillPgrp = runShellKillPgrp_method
    subproc_mock.returncode = None
    callback_method = MagicMock()

    thread = Thread(target=executor.run_file, args=("fake_puppetFile",
                                                    ["arg1", "arg2"], tmpoutfile, tmperrfile,
                                                    PYTHON_TIMEOUT_SECONDS, tmpstrucout, callback_method, '1'))
    thread.start()
    time.sleep(0.1)
    subproc_mock.finished_event.wait()

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout")
    self.assertTrue(callback_method.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:30,代码来源:TestPythonExecutor.py
示例17: test_request
def test_request(self, connect_mock):
    """Check CachedHTTPSConnection.request() for a normal response and for a failure.

    Normal case: the value read from the mocked response is returned and the
    underlying connection receives method, URL, data and headers of the request.
    Failure case: an exception while reading the response must surface as IOError.
    """
    httpsconn_mock = MagicMock(create = True)
    self.cachedHTTPSConnection.httpsconn = httpsconn_mock

    dummy_request = MagicMock(create = True)
    dummy_request.get_method.return_value = "dummy_get_method"
    dummy_request.get_full_url.return_value = "dummy_full_url"
    dummy_request.get_data.return_value = "dummy_get_data"
    dummy_request.headers = "dummy_headers"

    responce_mock = MagicMock(create = True)
    responce_mock.read.return_value = "dummy responce"
    httpsconn_mock.getresponse.return_value = responce_mock

    # Testing normal case
    responce = self.cachedHTTPSConnection.request(dummy_request)

    self.assertEqual(responce, responce_mock.read.return_value)
    httpsconn_mock.request.assert_called_once_with(
        dummy_request.get_method.return_value,
        dummy_request.get_full_url.return_value,
        dummy_request.get_data.return_value,
        dummy_request.headers)

    # Testing case of exception
    # The failure is injected where the normal case shows the data is read:
    # on the response object returned by getresponse(), not on the connection.
    responce_mock.read.side_effect = Exception("Dummy exception")
    # assertRaises replaces try / self.fail() / except Exception, which caught
    # the AssertionError raised by self.fail() and therefore could never fail.
    # NOTE(review): IOError is taken from the original failure message
    # ("Should raise IOError") - confirm against the implementation.
    self.assertRaises(IOError, self.cachedHTTPSConnection.request, dummy_request)
开发者ID:adamosloizou,项目名称:fiware-cosmos-ambari,代码行数:34,代码来源:TestSecurity.py
示例18: test_execute_status_command
def test_execute_status_command(self, CustomServiceOrchestrator_mock,
                                build_mock, execute_command_mock, requestComponentSecurityState_mock,
                                requestComponentStatus_mock, read_stack_version_mock,
                                status_update_callback):
    """Check the component status report produced by ActionQueue.execute_status_command().

    The report entry is expected to be the mocked build result extended with
    the mocked security state.
    """
    CustomServiceOrchestrator_mock.return_value = None
    dummy_controller = MagicMock()
    queue = ActionQueue(AmbariConfig(), dummy_controller)

    build_mock.return_value = {'dummy report': '' }

    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())

    # Reset both request mocks, then give each its canned answer.
    canned_answers = (
        (requestComponentStatus_mock, {'exitcode': 0 }),
        (requestComponentSecurityState_mock, 'UNKNOWN'),
    )
    for request_mock, answer in canned_answers:
        request_mock.reset_mock()
        request_mock.return_value = answer

    queue.execute_status_command(self.status_command)
    report = queue.result()

    expected = {'dummy report': '',
                'securityState' : 'UNKNOWN'}
    component_statuses = report['componentStatus']
    self.assertEqual(len(component_statuses), 1)
    self.assertEqual(component_statuses[0], expected)
    self.assertTrue(requestComponentStatus_mock.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:26,代码来源:TestActionQueue.py
示例19: test_cancel_backgound_command
def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
                                  kill_process_with_children_mock):
    """Check that cancelling a running background command kills it and marks it as failed.

    The completion callback of the action queue is wrapped so that it blocks
    on a condition until the test has issued the cancel and checked the kill.
    """
    FileCache_mock.return_value = None
    FileCache_mock.cache_dir = MagicMock()
    resolve_hook_script_path_mock.return_value = None

    dummy_controller = MagicMock()
    cfg = AmbariConfig().getConfig()
    cfg.set('agent', 'tolerate_download_failures', 'true')
    cfg.set('agent', 'prefix', '.')
    cfg.set('agent', 'cache_dir', 'background_tasks')

    actionQueue = ActionQueue(cfg, dummy_controller)

    dummy_controller.actionQueue = actionQueue
    orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
    orchestrator.file_cache = MagicMock()

    def f (a, b):
        return ""

    orchestrator.file_cache.get_service_base_dir = f
    actionQueue.customServiceOrchestrator = orchestrator

    import TestActionQueue
    import copy

    TestActionQueue.patch_output_file(orchestrator.python_executor)
    orchestrator.python_executor.prepare_process_result = MagicMock()
    orchestrator.dump_command_to_json = MagicMock()

    lock = threading.RLock()
    complete_done = threading.Condition(lock)

    complete_was_called = {}

    def command_complete_w(process_condenced_result, handle):
        # Record the visit, then wait (at most 3 seconds) for the test to notify.
        with lock:
            complete_was_called['visited']= ''
            complete_done.wait(3)

    actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None)
    execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
    actionQueue.put([execute_command])
    actionQueue.processBackgroundQueueSafeEmpty()

    time.sleep(.1)

    orchestrator.cancel_command(19,'')
    self.assertTrue(kill_process_with_children_mock.called)
    kill_process_with_children_mock.assert_called_with(33)

    with lock:
        # notify_all is the current spelling of the legacy notifyAll alias.
        complete_done.notify_all()

    with lock:
        # ``in`` replaces dict.has_key, which no longer exists on Python 3.
        self.assertTrue('visited' in complete_was_called)

    time.sleep(.1)

    runningCommand = actionQueue.commandStatuses.get_command_status(19)
    self.assertTrue(runningCommand is not None)
    self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)
开发者ID:fanzhidongyzby,项目名称:ambari,代码行数:60,代码来源:TestCustomServiceOrchestrator.py
示例20: test_action_create_properties_simple
def test_action_create_properties_simple(self,
                                         time_asctime_mock,
                                         os_path_isdir_mock,
                                         os_path_exists_mock,
                                         open_mock,
                                         ensure_mock):
    """
    Verify that PropertiesFile writes a new file holding one simple property.

    1) properties={"Some property":"Some value"}
    2) dir=None
    """
    # Configure the patched filesystem and clock helpers.
    os_path_isdir_mock.side_effect = [False, True]
    os_path_exists_mock.return_value = False
    time_asctime_mock.return_value = 777

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    target_path = '/dir/new_file'
    expected_content = u'# Generated by Apache Ambari. 777\n \nproperty1=value1\n \n'

    with Environment('/'):
        PropertiesFile(target_path, properties={'property1': 'value1'})

    open_mock.assert_called_with(target_path, 'wb')
    # The content is written through the context manager returned by open().
    file_mock.__enter__().write.assert_called_with(expected_content)
    self.assertEqual(open_mock.call_count, 1)
    ensure_mock.assert_called()
开发者ID:duxia,项目名称:ambari,代码行数:29,代码来源:TestPropertiesFileResource.py
注:本文中的mock.mock.MagicMock类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论