• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python mock.MagicMock类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mock.mock.MagicMock的典型用法代码示例。如果您正苦于以下问题:Python MagicMock类的具体用法?Python MagicMock怎么用?Python MagicMock使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了MagicMock类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_report_data

    def test_report_data(self):
        """Verify the headers and rows FicheConsommationReport yields for location 'd1' in November 2017."""
        def cell(value):
            # Every expected cell uses the same number as sort key and as displayed html.
            return {'sort_key': value, 'html': value}

        request = MagicMock()
        request.couch_user = self.web_user
        request.GET = dict(
            location_id='d1',
            startdate='2017-11-01',
            enddate='2017-11-30',
        )
        request.datespan = DateSpan(datetime(2017, 11, 1), datetime(2017, 11, 30))

        report = FicheConsommationReport(request=request, domain='test-domain')
        headers = report.headers
        rows = report.rows

        self.assertEqual(
            [group.html for group in headers],
            ['', 'Product 1', 'Product 2', 'Product 3']
        )

        # One row per PPS: the label followed by nine numeric cells.
        expected_rows = [
            ['PPS 1'] + [cell(n) for n in (10, 5, 5, 2, 2, 0, 6, 4, 2)],
            ['PPS 2'] + [cell(n) for n in (13, 11, 2, 0, 0, 0, 150, 11, 139)],
        ]
        self.assertEqual(rows, expected_rows)
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:35,代码来源:test_fiche_consommation.py


示例2: test_action_create_xml_config_with_metacharacters

  def test_action_create_xml_config_with_metacharacters(self,
                                                        time_asctime_mock,
                                                        os_path_isdir_mock,
                                                        os_path_exists_mock,
                                                        open_mock,
                                                        ensure_mock):
    """
    Checks that XmlConfig creates a not-yet-existing xml file and writes the
    expected document when the configuration values contain metacharacters
    (quotes, %, {}, $).
    """
    time_asctime_mock.return_value = 'Wed 2014-02'
    os_path_exists_mock.return_value = False
    os_path_isdir_mock.side_effect = [False, True]

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    configurations = {"": "",
                      "prop.1": "'.'yyyy-MM-dd-HH",
                      "prop.3": "%d{ISO8601} %5p %c{1}:%L - %m%n",
                      "prop.2": "INFO, openjpa",
                      "prop.4": "${oozie.log.dir}/oozie.log",
                      "prop.empty": "",
    }
    with Environment('/'):
      XmlConfig('file.xml', conf_dir='/dir/conf', configurations=configurations)

    # Exact document the resource is expected to hand to write()
    expected_xml = u'<!--Wed 2014-02-->\n    <configuration>\n    \n    <property>\n      <name></name>\n      <value></value>\n    </property>\n    \n    <property>\n      <name>prop.empty</name>\n      <value></value>\n    </property>\n    \n    <property>\n      <name>prop.3</name>\n      <value>%d{ISO8601} %5p %c{1}:%L - %m%n</value>\n    </property>\n    \n    <property>\n      <name>prop.2</name>\n      <value>INFO, openjpa</value>\n    </property>\n    \n    <property>\n      <name>prop.1</name>\n      <value>&#39;.&#39;yyyy-MM-dd-HH</value>\n    </property>\n    \n    <property>\n      <name>prop.4</name>\n      <value>${oozie.log.dir}/oozie.log</value>\n    </property>\n    \n  </configuration>\n'
    open_mock.assert_called_with('/dir/conf/file.xml', 'wb')
    file_mock.__enter__().write.assert_called_with(expected_xml)
开发者ID:Altiscale,项目名称:incubator-slider,代码行数:31,代码来源:TestXmlConfigResource.py


示例3: test_run

  def test_run(self, ActionQueue_mock, installMock, buildMock):
    """
    Checks Controller.run(): the built opener is installed, registerAndHeartbeat
    is invoked again when repeatRegistration gets set during a call, and the
    action queue is started.
    """
    aq = MagicMock()
    ActionQueue_mock.return_value = aq

    buildMock.return_value = "opener"
    # 'name' must be passed by keyword: the first positional argument of
    # MagicMock is 'spec', so a bare string would be used as a spec object.
    registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
    calls = []
    def side_effect():
      # Request a repeated registration on the very first call only
      if not calls:
        self.controller.repeatRegistration = True
      calls.append(1)
    registerAndHeartbeat.side_effect = side_effect
    self.controller.registerAndHeartbeat = registerAndHeartbeat

    # repeat registration
    self.controller.run()

    self.assertTrue(buildMock.called)
    # 'called_once_with' is not an assertion (it only creates a child mock and
    # can never fail); use the real assert method.
    installMock.assert_called_once_with("opener")
    self.assertEqual(2, registerAndHeartbeat.call_count)

    # one call, +1
    registerAndHeartbeat.side_effect = None
    self.controller.run()
    self.assertEqual(3, registerAndHeartbeat.call_count)

    # Action queue should be started during calls
    self.assertTrue(ActionQueue_mock.called)
    self.assertTrue(aq.start.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-slider,代码行数:29,代码来源:TestController.py


示例4: test_action_create_properties_with_metacharacters

  def test_action_create_properties_with_metacharacters(self,
                                                        time_asctime_mock,
                                                        os_path_isdir_mock,
                                                        os_path_exists_mock,
                                                        open_mock,
                                                        ensure_mock):
    """
    Checks that PropertiesFile creates a not-yet-existing file and writes the
    expected content when
    1) the property values contain metacharacters (quotes, %, {}, $, /)
    2) no dir is given
    """
    time_asctime_mock.return_value = 777
    os_path_exists_mock.return_value = False
    os_path_isdir_mock.side_effect = [False, True]

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    properties = {"": "",
                  "prop.1": "'.'yyyy-MM-dd-HH",
                  "prop.3": "%d{ISO8601} %5p %c{1}:%L - %m%n",
                  "prop.2": "INFO, openjpa",
                  "prop.4": "${oozie.log.dir}/oozie.log",
                  "prop.empty": "",
    }
    with Environment('/'):
      PropertiesFile('/dir/new_file', properties=properties)

    # Exact text the resource is expected to hand to write()
    expected_content = u"# Generated by Apache Ambari. 777\n    \n=\nprop.1='.'yyyy-MM-dd-HH\nprop.2=INFO, openjpa\nprop.3=%d{ISO8601} %5p %c{1}:%L - %m%n\nprop.4=${oozie.log.dir}/oozie.log\nprop.empty=\n    \n"
    open_mock.assert_called_with('/dir/new_file','wb')
    file_mock.__enter__().write.assert_called_with(expected_content)
    self.assertEqual(open_mock.call_count, 1)
    ensure_mock.assert_called()
开发者ID:duxia,项目名称:ambari,代码行数:33,代码来源:TestPropertiesFileResource.py


示例5: test_action_create_empty_properties_without_dir

  def test_action_create_empty_properties_without_dir(self,
                                                      time_asctime_mock,
                                                      os_path_isdir_mock,
                                                      os_path_exists_mock,
                                                      open_mock,
                                                      ensure_mock):
    """
    Checks that PropertiesFile creates a not-yet-existing file and writes only
    the generated header when
    1) properties is an empty dict
    2) dir is None
    """
    time_asctime_mock.return_value = 'Today is Wednesday'
    os_path_exists_mock.return_value = False
    os_path_isdir_mock.side_effect = [False, True]

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    target_path = '/somewhere_in_system/one_file.properties'
    with Environment('/'):
      PropertiesFile(target_path, dir=None, properties={})

    expected_content = u'# Generated by Apache Ambari. Today is Wednesday\n    \n    \n'
    open_mock.assert_called_with(target_path, 'wb')
    file_mock.__enter__().write.assert_called_with(expected_content)
    self.assertEqual(open_mock.call_count, 1)
    ensure_mock.assert_called()
开发者ID:duxia,项目名称:ambari,代码行数:28,代码来源:TestPropertiesFileResource.py


示例6: test_watchdog_2

  def test_watchdog_2(self):
    """
    Tries to catch false positive watchdog invocations: the mocked subprocess
    is allowed to finish well before the 5 second timeout, so it must not be
    terminated and the callback must still be invoked.
    """
    import os  # local import: only needed to close the mkstemp() descriptors

    subproc_mock = self.Subprocess_mockup()
    executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
    # mkstemp() returns an already-open OS-level descriptor together with the
    # path; only the path is used here, so close the descriptor right away
    # instead of leaking it.
    out_fd, tmpoutfile = tempfile.mkstemp()
    os.close(out_fd)
    err_fd, tmperrfile = tempfile.mkstemp()
    os.close(err_fd)
    struct_fd, tmpstrucout = tempfile.mkstemp()
    os.close(struct_fd)
    PYTHON_TIMEOUT_SECONDS =  5

    def launch_python_subprocess_method(command, tmpout, tmperr):
      # Hand back the prepared subprocess mock instead of spawning a process
      subproc_mock.tmpout = tmpout
      subproc_mock.tmperr = tmperr
      return subproc_mock
    executor.launch_python_subprocess = launch_python_subprocess_method
    runShellKillPgrp_method = MagicMock()
    runShellKillPgrp_method.side_effect = lambda python : python.terminate()
    executor.runShellKillPgrp = runShellKillPgrp_method
    subproc_mock.returncode = 0
    callback_method = MagicMock()
    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
                                                      tmpoutfile, tmperrfile,
                                                      PYTHON_TIMEOUT_SECONDS, tmpstrucout,
                                                      callback_method, "1-1"))
    thread.start()
    time.sleep(0.1)
    # Let the mocked subprocess finish, then wait until it reports completion
    subproc_mock.should_finish_event.set()
    subproc_mock.finished_event.wait()
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout")
    self.assertEqual(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
    self.assertTrue(callback_method.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:32,代码来源:TestPythonExecutor.py


示例7: test_extract_value_from_report_table_row_value_input_dict

 def test_extract_value_from_report_table_row_value_input_dict(self):
     """Check that the 'html' values of a report table's total_row dicts are extracted in order."""
     request = MagicMock()
     request.couch_user = self.user
     request.GET = dict(
         location_id='',
         program='',
         month_start='10',
         year_start='2017',
         month_end='3',
         year_end='2018',
     )
     report = Dashboard1Report(request=request, domain='test-pna')

     # Four total-row cells: {'html': 'row_0'} ... {'html': 'row_3'}
     total_row = [{'html': 'row_%d' % index} for index in range(4)]
     report_table = {
         'fix_column': False,
         'comment': 'test_comment',
         'rows': [],
         'datatables': False,
         'title': 'test_title',
         'total_row': total_row,
         'slug': 'disponibilite',
         'default_rows': 10
     }

     extracted = report._extract_value_from_report_table_row_value(report_table)
     self.assertEqual(extracted, ['row_0', 'row_1', 'row_2', 'row_3'])
开发者ID:kkrampa,项目名称:commcare-hq,代码行数:29,代码来源:test_dashboard_1.py


示例8: test_ParseOptions

  def test_ParseOptions(self, open_mock, backup_file_mock, modify_action_mock, option_parser_mock):
    """
    Runs upgradeHelper.main() for the 'update-configs' action with a stubbed
    option parser, then checks the backup/modify call counts and the values
    stored on upgradeHelper.Options.
    """
    class options(object):
      user = "test_user"
      hostname = "127.0.0.1"
      clustername = "test1"
      password = "test_password"
      upgrade_json = "catalog_file"
      from_stack = "0.0"
      to_stack = "1.3"
      logfile = "test.log"
      report = "report.txt"
      warnings = []
      printonly = False

    cli_args = ["update-configs"]

    # Stub parser whose parse_args() returns the canned options and action
    parser_stub = MagicMock()
    parser_stub.parse_args = lambda: (options, cli_args)
    option_parser_mock.return_value = parser_stub
    backup_file_mock.return_value = MagicMock()
    modify_action_mock.return_value = MagicMock()

    upgradeHelper.main()

    expected_tokens = {"user": options.user, "pass": options.password}
    self.assertEqual(backup_file_mock.call_count, 0)
    self.assertEqual(modify_action_mock.call_count, 1)
    self.assertEqual(expected_tokens, upgradeHelper.Options.API_TOKENS)
    self.assertEqual(options.clustername, upgradeHelper.Options.CLUSTER_NAME)
开发者ID:fanzhidongyzby,项目名称:ambari,代码行数:26,代码来源:TestUpgradeHelper.py


示例9: test_start

  def test_start(self):
    """
    Checks that AlertSchedulerHandler.start() starts the (mocked) underlying
    scheduler and schedules the alert returned by the mocked json-to-callable
    conversion.
    """
    scheduler = AlertSchedulerHandler(TEST_PATH, TEST_PATH, TEST_PATH, TEST_PATH, 5, None, None, None)
    alert_mock = MagicMock()
    alert_mock.interval = Mock(return_value=5)
    alert_mock.collect = Mock()
    alert_mock.set_helpers = Mock()
    scheduler.schedule_definition = MagicMock()
    # Private (name-mangled) members are replaced through their mangled names
    scheduler._AlertSchedulerHandler__scheduler = MagicMock()
    scheduler._AlertSchedulerHandler__scheduler.running = False
    scheduler._AlertSchedulerHandler__scheduler.start = Mock()
    scheduler._AlertSchedulerHandler__json_to_callable = Mock(return_value=alert_mock)
    scheduler._AlertSchedulerHandler__config_maps = {
      'cluster': {}
    }

    scheduler.start()

    self.assertTrue(scheduler._AlertSchedulerHandler__scheduler.start.called)
    scheduler.schedule_definition.assert_called_with(alert_mock)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:29,代码来源:TestAlertSchedulerHandler.py


示例10: test_rss

def test_rss(container_node, other_container_node, content_node, collections, home_root, guest_user, root):
    """Build a small readable node tree and check the start and some contents of the RSS from struct2rss."""

    everybody_rule = get_or_add_everybody_rule()

    root.children.append(collections)

    # Make the collections node readable via the "everybody" rule
    collections.access_rule_assocs.append(NodeToAccessRule(ruletype=u"read", rule=everybody_rule))
    collections.container_children.append(container_node)

    container_node.container_children.append(other_container_node)
    other_container_node.content_children.append(content_node)

    struct = {"nodelist": [other_container_node, content_node], "build_response_start": time.time()}
    params = {}
    # Minimal request stand-in: every header lookup returns the host
    req = MagicMock()
    req.get_header = lambda x: "localhost:8081"
    req.fullpath = ""
    req.query = ""

    res = struct2rss(req, "", params, None, struct=struct)
    # Function-call form prints the same text on Python 2 and is valid on Python 3
    print(res)
    # TODO: find some way to check XML content properly
    assert res.startswith("""<?xml version="1.0" encoding="utf-8" standalone="yes"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" version="2.0">
<channel>""")
    assert "document/testschema" in res
    assert "http://localhost:8081/node?id=" in res
开发者ID:mediatum,项目名称:mediatum,代码行数:27,代码来源:test_handlers.py


示例11: test_repeatRegistration

  def test_repeatRegistration(self, get_os_version_mock, get_os_type_mock,
                              run_mock, installMock, buildMock, Popen_mock):
    """
    Runs the controller twice with a mocked registerAndHeartbeat and checks
    how often the mock has been called after each run.
    """

    registerAndHeartbeat = MagicMock(name="registerAndHeartbeat")
    get_os_type_mock.return_value = "suse"
    get_os_version_mock.return_value = "11"

    # First run: plain mock without side effect, expected to be called exactly once
    self.controller.registerAndHeartbeat = registerAndHeartbeat
    self.controller.run()
    self.assertTrue(installMock.called)
    self.assertTrue(buildMock.called)
    self.controller.registerAndHeartbeat.assert_called_once_with()

    calls = []
    def switchBool():
      # NOTE(review): the flag set to True on the first call is overwritten by
      # the unconditional assignment below, so repeatRegistration is always
      # False when this returns - looks like a missing 'else'; confirm intent
      # before changing, since the expected call count depends on it.
      if len(calls) == 0:
        self.controller.repeatRegistration = True
        calls.append(1)
      self.controller.repeatRegistration = False

    # Second run: call_count is cumulative over both runs (1 + 1)
    registerAndHeartbeat.side_effect = switchBool
    self.controller.run()
    self.assertEqual(2, registerAndHeartbeat.call_count)

    # Put the attribute taken from the Controller class back on the instance
    self.controller.registerAndHeartbeat = \
      Controller.Controller.registerAndHeartbeat
开发者ID:maduhu,项目名称:HDP2.5-ambari,代码行数:26,代码来源:TestController.py


示例12: xml_fixture

def xml_fixture(parent_node, content_node):
    """Prepare the (struct, req, params) triple used by the XML tests.

    Both nodes get a "testvalue" attribute, the request mock is created once
    and kept in the module-level ``mock`` for reuse, and the "directory" and
    "testschema" metadata types are created through MetadatatypeFactory.
    """
    global mock
    parent_node["testvalue"] = "1001"
    content_node["testvalue"] = "1002"

    struct = dict(
        nodelist=[parent_node, content_node],
        build_response_start=time.time(),
        status="ok",
        dataready="0.1",
        retrievaldate=datetime.datetime.now().isoformat(),
        sortfield="sortfield",
        sortdirection="up",
        timetable=[],
        result_shortlist=[],
    )
    params = {}

    # Create the shared request mock on first use, reuse it afterwards
    if not mock:
        mock = MagicMock()
    req = mock
    req.get_header = lambda x: "localhost:8081"
    req.fullpath = ""
    req.query = ""

    for type_name in (u"directory", u"testschema"):
        MetadatatypeFactory(name=type_name)

    return struct, req, params
开发者ID:mediatum,项目名称:mediatum,代码行数:27,代码来源:test_handlers.py


示例13: test_main

  def test_main(self, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock,
                parse_args_mock, join_mock, start_mock, Controller_init_mock, try_to_connect_mock,
                update_log_level_mock, daemonize_mock, perform_prestart_checks_mock,
                resolve_ambari_config_mock, stop_mock, bind_signal_handlers_mock,
                setup_logging_mock, socket_mock):
    """
    Runs main.main() with all collaborators mocked and checks that each start-up
    step was invoked, then re-runs it with an expected hostname set on the
    options and checks it is passed to the prestart checks.
    """
    data_clean_init_mock.return_value = None
    Controller_init_mock.return_value = None
    ping_port_init_mock.return_value = None
    options = MagicMock()
    # Return a mock *instance* for the positional args; the original passed the
    # MagicMock class itself, which does not behave like a mock object.
    parse_args_mock.return_value = (options, MagicMock())

    #testing call without command-line arguments
    main.main()

    self.assertTrue(setup_logging_mock.called)
    self.assertTrue(bind_signal_handlers_mock.called)
    self.assertTrue(stop_mock.called)
    self.assertTrue(resolve_ambari_config_mock.called)
    self.assertTrue(perform_prestart_checks_mock.called)
    self.assertTrue(daemonize_mock.called)
    self.assertTrue(update_log_level_mock.called)
    try_to_connect_mock.assert_called_once_with(ANY, -1, ANY)
    self.assertTrue(start_mock.called)
    self.assertTrue(data_clean_init_mock.called)
    self.assertTrue(data_clean_start_mock.called)
    self.assertTrue(ping_port_init_mock.called)
    self.assertTrue(ping_port_start_mock.called)

    # Forget the first invocation so assert_called_once_with below sees only the second
    perform_prestart_checks_mock.reset_mock()

    # Testing call with --expected-hostname parameter
    options.expected_hostname = "test.hst"
    main.main()
    perform_prestart_checks_mock.assert_called_once_with(options.expected_hostname)
开发者ID:duxia,项目名称:ambari,代码行数:34,代码来源:TestMain.py


示例14: test_execution_results

  def test_execution_results(self):
    """
    Runs PythonExecutor.run_file() against a mocked subprocess that finishes
    immediately with return code 0 and checks the returned result dict and
    that the callback was invoked.
    """
    import os  # local import: only needed to close the mkstemp() descriptors

    subproc_mock = self.Subprocess_mockup()
    executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
    # mkstemp() returns an already-open OS-level descriptor together with the
    # path; only the path is used here, so close the descriptor right away
    # instead of leaking it.
    out_fd, tmpoutfile = tempfile.mkstemp()
    os.close(out_fd)
    err_fd, tmperrfile = tempfile.mkstemp()
    os.close(err_fd)

    tmp_file = tempfile.NamedTemporaryFile()    # the structured out file should be preserved across calls to the hooks and script.
    tmpstructuredoutfile = tmp_file.name
    tmp_file.close()

    PYTHON_TIMEOUT_SECONDS =  5

    def launch_python_subprocess_method(command, tmpout, tmperr):
      # Hand back the prepared subprocess mock instead of spawning a process
      subproc_mock.tmpout = tmpout
      subproc_mock.tmperr = tmperr
      return subproc_mock
    executor.launch_python_subprocess = launch_python_subprocess_method
    runShellKillPgrp_method = MagicMock()
    runShellKillPgrp_method.side_effect = lambda python : python.terminate()
    executor.runShellKillPgrp = runShellKillPgrp_method
    subproc_mock.returncode = 0
    subproc_mock.should_finish_event.set()
    callback_method = MagicMock()
    result = executor.run_file("file", ["arg1", "arg2"],
                               tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
                               tmpstructuredoutfile, callback_method, "1-1")
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual(result, {'exitcode': 0, 'stderr': '', 'stdout': '',
                              'structuredOut': {}})
    self.assertTrue(callback_method.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:29,代码来源:TestPythonExecutor.py


示例15: test_attribute_command_escaping

  def test_attribute_command_escaping(self, popen_mock):
    """
    Checks how Execute quotes the last element of a tuple command: for each
    case the command string handed to Popen is compared with the expected
    shell-escaped form.
    """
    # (last tuple element given to Execute, expected command string)
    cases = [
      ('quoted arg', "arg1 arg2 'quoted arg'"),
      ('command "arg"', "arg1 arg2 'command \"arg\"'"),
      ("command 'arg'", 'arg1 arg2 \'command \'"\'"\'arg\'"\'"\'\''),
      ("echo `ls /root`", "arg1 arg2 'echo `ls /root`'"),
      ("$ROOT", "arg1 arg2 '$ROOT'"),
      ("`ls /root`", "arg1 arg2 '`ls /root`'"),
    ]

    process_mock = MagicMock()
    process_mock.returncode = 0
    popen_mock.return_value = process_mock

    with Environment("/"):
      for last_arg, _ in cases:
        Execute(('arg1', 'arg2', last_arg),
        )

    # The command string is element 3 of the first positional Popen argument
    for index, (_, expected_command) in enumerate(cases):
      self.assertEqual(popen_mock.call_args_list[index][0][0][3], expected_command)
开发者ID:LXiong,项目名称:slider,代码行数:32,代码来源:TestExecuteResource.py


示例16: test_watchdog_1

  def test_watchdog_1(self, kill_process_with_children_mock):
    """
    Tests whether watchdog works: with a 0.1 second timeout and a mocked
    subprocess that never finishes on its own, the subprocess is expected to
    be terminated and the callback invoked.
    """
    import os  # local import: only needed to close the mkstemp() descriptors

    subproc_mock = self.Subprocess_mockup()
    executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
    # mkstemp() returns an already-open OS-level descriptor together with the
    # path; only the path is used here, so close the descriptor right away
    # instead of leaking it.
    out_fd, tmpoutfile = tempfile.mkstemp()
    os.close(out_fd)
    err_fd, tmperrfile = tempfile.mkstemp()
    os.close(err_fd)
    struct_fd, tmpstrucout = tempfile.mkstemp()
    os.close(struct_fd)
    PYTHON_TIMEOUT_SECONDS = 0.1
    kill_process_with_children_mock.side_effect = lambda pid : subproc_mock.terminate()

    def launch_python_subprocess_method(command, tmpout, tmperr):
      # Hand back the prepared subprocess mock instead of spawning a process
      subproc_mock.tmpout = tmpout
      subproc_mock.tmperr = tmperr
      return subproc_mock
    executor.launch_python_subprocess = launch_python_subprocess_method
    runShellKillPgrp_method = MagicMock()
    runShellKillPgrp_method.side_effect = lambda python : python.terminate()
    executor.runShellKillPgrp = runShellKillPgrp_method
    subproc_mock.returncode = None
    callback_method = MagicMock()
    thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
      ["arg1", "arg2"], tmpoutfile, tmperrfile,
      PYTHON_TIMEOUT_SECONDS, tmpstrucout, callback_method, '1'))
    thread.start()
    time.sleep(0.1)
    subproc_mock.finished_event.wait()
    # assertEqual replaces the deprecated assertEquals alias
    self.assertEqual(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout")
    self.assertTrue(callback_method.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:30,代码来源:TestPythonExecutor.py


示例17: test_request

  def test_request(self, connect_mock):
    """
    Checks cachedHTTPSConnection.request(): in the normal case it forwards the
    request's method, url, data and headers to the underlying connection and
    returns what the response's read() returns; when reading fails it is
    expected to raise IOError.
    """
    # NOTE(review): 'create' is an argument of patch(), not of MagicMock; here
    # it merely sets an attribute called 'create' on each mock.
    httpsconn_mock = MagicMock(create = True)
    self.cachedHTTPSConnection.httpsconn = httpsconn_mock

    dummy_request = MagicMock(create = True)
    dummy_request.get_method.return_value = "dummy_get_method"
    dummy_request.get_full_url.return_value = "dummy_full_url"
    dummy_request.get_data.return_value = "dummy_get_data"
    dummy_request.headers = "dummy_headers"

    responce_mock = MagicMock(create = True)
    responce_mock.read.return_value = "dummy responce"
    httpsconn_mock.getresponse.return_value = responce_mock

    # Testing normal case
    responce = self.cachedHTTPSConnection.request(dummy_request)

    self.assertEqual(responce, responce_mock.read.return_value)
    httpsconn_mock.request.assert_called_once_with(
      dummy_request.get_method.return_value,
      dummy_request.get_full_url.return_value,
      dummy_request.get_data.return_value,
      dummy_request.headers)

    # Testing case of exception
    # The failure is injected into the response's read(), whose result the
    # normal-case assertion above shows request() returns. assertRaises is used
    # because a self.fail() inside 'try/except Exception' would be swallowed
    # (AssertionError is an Exception), so the old check could never fail.
    responce_mock.read.side_effect = Exception("Dummy exception")
    self.assertRaises(IOError, self.cachedHTTPSConnection.request, dummy_request)
开发者ID:adamosloizou,项目名称:fiware-cosmos-ambari,代码行数:34,代码来源:TestSecurity.py


示例18: test_execute_status_command

  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
                                  requestComponentStatus_mock, read_stack_version_mock,
                                  status_update_callback):
    """
    Executes a status command on an ActionQueue with mocked collaborators and
    checks that the resulting report holds one component status combining the
    built report with the security state.
    """
    CustomServiceOrchestrator_mock.return_value = None
    controller_mock = MagicMock()
    queue = ActionQueue(AmbariConfig(), controller_mock)

    build_mock.return_value = {'dummy report': '' }

    controller_mock.recovery_manager = RecoveryManager(tempfile.mktemp())

    # Fresh canned answers for the component status and security state requests
    requestComponentStatus_mock.reset_mock()
    requestComponentStatus_mock.return_value = {'exitcode': 0 }

    requestComponentSecurityState_mock.reset_mock()
    requestComponentSecurityState_mock.return_value = 'UNKNOWN'

    queue.execute_status_command(self.status_command)
    report = queue.result()

    component_statuses = report['componentStatus']
    self.assertEqual(len(component_statuses), 1)
    self.assertEqual(component_statuses[0], {'dummy report': '',
                                             'securityState' : 'UNKNOWN'})
    self.assertTrue(requestComponentStatus_mock.called)
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:26,代码来源:TestActionQueue.py


示例19: test_cancel_backgound_command

  def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
                                      kill_process_with_children_mock):
    """
    Queues a background command, cancels it through the orchestrator while its
    completion callback is blocked, and checks that the process tree was
    killed and the command ends up with FAILED status.
    """
    FileCache_mock.return_value = None
    FileCache_mock.cache_dir = MagicMock()
    resolve_hook_script_path_mock.return_value = None
    dummy_controller = MagicMock()
    cfg = AmbariConfig().getConfig()
    cfg.set('agent', 'tolerate_download_failures', 'true')
    cfg.set('agent', 'prefix', '.')
    cfg.set('agent', 'cache_dir', 'background_tasks')

    actionQueue = ActionQueue(cfg, dummy_controller)

    dummy_controller.actionQueue = actionQueue
    orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
    orchestrator.file_cache = MagicMock()
    def f (a, b):
      return ""
    orchestrator.file_cache.get_service_base_dir = f
    actionQueue.customServiceOrchestrator = orchestrator

    import TestActionQueue
    import copy

    TestActionQueue.patch_output_file(orchestrator.python_executor)
    orchestrator.python_executor.prepare_process_result = MagicMock()
    orchestrator.dump_command_to_json = MagicMock()

    lock = threading.RLock()
    complete_done = threading.Condition(lock)

    complete_was_called = {}
    def command_complete_w(process_condenced_result, handle):
      # Record the visit, then block (up to 3 seconds) until the test notifies
      with lock:
        complete_was_called['visited']= ''
        complete_done.wait(3)

    actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(actionQueue.on_background_command_complete_callback, command_complete_w, None)
    execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
    actionQueue.put([execute_command])
    actionQueue.processBackgroundQueueSafeEmpty()

    time.sleep(.1)

    orchestrator.cancel_command(19,'')
    self.assertTrue(kill_process_with_children_mock.called)
    kill_process_with_children_mock.assert_called_with(33)

    # Release the blocked completion callback
    with lock:
      complete_done.notifyAll()

    with lock:
      # 'in' replaces dict.has_key(), which no longer exists on Python 3
      self.assertTrue('visited' in complete_was_called)

    time.sleep(.1)

    runningCommand = actionQueue.commandStatuses.get_command_status(19)
    self.assertTrue(runningCommand is not None)
    self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)
开发者ID:fanzhidongyzby,项目名称:ambari,代码行数:60,代码来源:TestCustomServiceOrchestrator.py


示例20: test_action_create_properties_simple

  def test_action_create_properties_simple(self,
                                           time_asctime_mock,
                                           os_path_isdir_mock,
                                           os_path_exists_mock,
                                           open_mock,
                                           ensure_mock):
    """
    Checks that PropertiesFile creates a not-yet-existing file and writes the
    expected content when
    1) properties holds a single key/value pair
    2) no dir is given
    """
    time_asctime_mock.return_value = 777
    os_path_exists_mock.return_value = False
    os_path_isdir_mock.side_effect = [False, True]

    file_mock = MagicMock()
    open_mock.return_value = file_mock

    target_path = '/dir/new_file'
    with Environment('/'):
      PropertiesFile(target_path, properties={'property1': 'value1'})

    expected_content = u'# Generated by Apache Ambari. 777\n    \nproperty1=value1\n    \n'
    open_mock.assert_called_with(target_path, 'wb')
    file_mock.__enter__().write.assert_called_with(expected_content)
    self.assertEqual(open_mock.call_count, 1)
    ensure_mock.assert_called()
开发者ID:duxia,项目名称:ambari,代码行数:29,代码来源:TestPropertiesFileResource.py



注:本文中的mock.mock.MagicMock类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python patch.object函数代码示例发布时间:2022-05-27
下一篇:
Python mock.patch函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap