• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python contextutil.temporary_file函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twitter.common.contextutil.temporary_file函数的典型用法代码示例。如果您正苦于以下问题:Python temporary_file函数的具体用法?Python temporary_file怎么用?Python temporary_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了temporary_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_load_json

def test_load_json():
    with temporary_file() as fp:
        fp.write(MESOS_CONFIG)
        fp.flush()
        env = AuroraConfigLoader.load(fp.name)
        job = env["jobs"][0]
    with temporary_file() as fp:
        fp.write(json.dumps(job.get()))
        fp.flush()
        new_job = AuroraConfigLoader.load_json(fp.name)
        assert new_job == job
开发者ID:Empia,项目名称:incubator-aurora,代码行数:11,代码来源:test_loader.py


示例2: test_override_single_variable

def test_override_single_variable():
  with temporary_file() as output:
    # test that the override takes place
    with environment_as(HORK = 'BORK'):
      subprocess.Popen([sys.executable, '-c', 'import os; print os.environ["HORK"]'],
        stdout=output).wait()
      output.seek(0)
      assert output.read() == 'BORK\n'

    # test that the variable is cleared
    with temporary_file() as new_output:
      subprocess.Popen([sys.executable, '-c', 'import os; print os.environ.has_key("HORK")'],
        stdout=new_output).wait()
      new_output.seek(0)
      assert new_output.read() == 'False\n'
开发者ID:billwei,项目名称:commons,代码行数:15,代码来源:test_environment_as.py


示例3: test_simple_successful_create_job_open_page

  def test_simple_successful_create_job_open_page(self):
    mock_context = FakeAuroraCommandContext()
    with contextlib.nested(
        # TODO(maxim): Patching threading.Event with all possible namespace/patch/mock
        #              combinations did not produce the desired effect. Investigate why (AURORA-510)
        patch('threading._Event.wait'),
        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
      mock_query = self.create_query()
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
      api = mock_context.get_api('west')
      api.create_job.return_value = self.get_createjob_response()
      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        cmd = AuroraCommandLine()
        result = cmd.execute(['job', 'create', '--wait-until=RUNNING', '--open-browser',
                     'west/bozo/test/hello',
            fp.name])
        assert result == EXIT_OK

      self.assert_create_job_called(api)
      self.assert_scheduler_called(api, mock_query, 2)
      assert mock_context.showed_urls == ["http://something_or_other/scheduler/bozo/test/hello"]
开发者ID:davelester,项目名称:incubator-aurora,代码行数:26,代码来源:test_create.py


示例4: test_compute_stats

 def test_compute_stats(self):
   executed_goals = {'resolve-idl':{ 'idl': [0.00072813034057617188],
                                     'extract': [3.0994415283203125e-06] },
                     "thriftstore-codegen": {'thriftstore-codegen': [0.0001010894775390625] },
                     "gen": {'tweetypie-fetch': [0.028632879257202148], 'thrift': [0.016566991806030273],
                             'protoc': [0.0038318634033203125], 'antlr': [0.0020389556884765625],
                             'thriftstore-dml-gen': [0.0022170543670654297],
                             'tweetypie-clean': [0.0054290294647216797] },
                     "resolve": {'ivy': [0.00097703933715820312] },
                     "compile": {'checkstyle': [0.00057005882263183594]},
                     "test": {'junit': [9.1075897216796875e-05], 'specs': [0.0015749931335449219]}
                   }
   with temporary_file() as temp_fd:
     bs = BuildTimeStats(temp_fd.name)
     actual_timings = bs.compute_stats(executed_goals, 100)
     expected_timings =[{'phase': 'resolve', 'total': 0.00097703933715820312, 'goal': 'ivy'},
                      {'phase': 'resolve-idl', 'total': 0.00072813034057617188, 'goal': 'idl'},
                      {'phase': 'resolve-idl', 'total': 3.0994415283203125e-06, 'goal': 'extract'},
                      {'phase': 'resolve-idl', 'total': 0.00073122978210449219, 'goal': 'phase_total'},
                      {'phase': 'compile', 'total': 0.00057005882263183594, 'goal': 'checkstyle'},
                      {'phase': 'thriftstore-codegen', 'total': 0.0001010894775390625, 'goal': 'thriftstore-codegen'},
                      {'phase': 'test', 'total': 9.1075897216796875e-05, 'goal': 'junit'},
                      {'phase': 'test', 'total': 0.0015749931335449219, 'goal': 'specs'},
                      {'phase': 'test', 'total': 0.0016660690307617188, 'goal': 'phase_total'},
                      {'phase': 'gen', 'total': 0.0038318634033203125, 'goal': 'protoc'},
                      {'phase': 'gen', 'total': 0.0020389556884765625, 'goal': 'antlr'},
                      {'phase': 'gen', 'total': 0.028632879257202148, 'goal': 'tweetypie-fetch'},
                      {'phase': 'gen', 'total': 0.0054290294647216797, 'goal': 'tweetypie-clean'},
                      {'phase': 'gen', 'total': 0.0022170543670654297, 'goal': 'thriftstore-dml-gen'},
                      {'phase': 'gen', 'total': 0.016566991806030273, 'goal': 'thrift'},
                      {'phase': 'gen', 'total': 0.058716773986816406, 'goal': 'phase_total'},
                      {'phase': 'cmd_total', 'total': 100, 'goal': 'cmd_total'}]
     self.assertEqual(actual_timings, expected_timings )
开发者ID:benhuang-zh,项目名称:commons,代码行数:33,代码来源:buildtimestats_test.py


示例5: test_perform_maintenance_partial_sla_failure

  def test_perform_maintenance_partial_sla_failure(self, mock_check_sla, mock_start_maintenance,
                               mock_drain_hosts, mock_operate_on_hosts, mock_complete_maintenance):
    mock_callback = mock.Mock()
    failed_host = 'us-west-001.example.com'
    mock_check_sla.return_value = set([failed_host])
    drained_hosts = set(TEST_HOSTNAMES) - set([failed_host])
    maintenance = HostMaintenance(DEFAULT_CLUSTER, 'quiet')

    with temporary_file() as fp:
      with group_by_rack():
        maintenance.perform_maintenance(
            TEST_HOSTNAMES,
            callback=mock_callback,
            grouping_function='by_rack',
            output_file=fp.name)

        with open(fp.name, 'r') as fpr:
          content = fpr.read()
          assert failed_host in content

        mock_start_maintenance.assert_called_once_with(TEST_HOSTNAMES)
        assert mock_check_sla.call_count == 1
        assert mock_drain_hosts.call_count == 1
        assert mock_drain_hosts.call_args_list == [mock.call(Hosts(drained_hosts))]
        assert mock_operate_on_hosts.call_count == 1
        assert mock_operate_on_hosts.call_args_list == [
            mock.call(Hosts(drained_hosts), mock_callback)]
        assert mock_complete_maintenance.call_count == 2
        assert mock_complete_maintenance.call_args_list == [
            mock.call(Hosts(set([failed_host]))), mock.call(Hosts(drained_hosts))]
开发者ID:dhardy92,项目名称:incubator-aurora,代码行数:30,代码来源:test_host_maintenance.py


示例6: use_cached_files

  def use_cached_files(self, cache_key):
    # This implementation fetches the appropriate tarball and extracts it.
    remote_path = self._remote_path_for_key(cache_key)
    try:
      # Send an HTTP request for the tarball.
      response = self._request('GET', remote_path)
      if response is None:
        return None

      done = False
      with temporary_file() as outfile:
        total_bytes = 0
        # Read the data in a loop.
        while not done:
          data = response.read(self.READ_SIZE)
          outfile.write(data)
          if len(data) < self.READ_SIZE:
            done = True
          total_bytes += len(data)
        outfile.close()
        self.log.debug('Read %d bytes from artifact cache at %s' %
                       (total_bytes,self._url_string(remote_path)))

        # Extract the tarfile.
        artifact = TarballArtifact(self.artifact_root, outfile.name, self.compress)
        artifact.extract()
        return artifact
    except Exception as e:
      self.log.warn('Error while reading from remote artifact cache: %s' % e)
      return None
开发者ID:govindkabra,项目名称:pants,代码行数:30,代码来源:restful_artifact_cache.py


示例7: use_cached_files

 def use_cached_files(self, cache_key):
   path = self._path_for_key(cache_key)
   response = self._request('GET', path)
   if response is None:
     return False
   expected_size = int(response.getheader('content-length', -1))
   if expected_size == -1:
     raise Exception, 'No content-length header in HTTP response'
   read_size = 4 * 1024 * 1024 # 4 MB
   done = False
   if self.context:
     self.context.log.info('Reading %d bytes' % expected_size)
   with temporary_file() as outfile:
     total_bytes = 0
     while not done:
       data = response.read(read_size)
       outfile.write(data)
       if len(data) < read_size:
         done = True
       total_bytes += len(data)
       if self.context:
         self.context.log.debug('Read %d bytes' % total_bytes)
     outfile.close()
     if total_bytes != expected_size:
       raise Exception, 'Read only %d bytes from %d expected' % (total_bytes, expected_size)
     mode = 'r:bz2' if self.compress else 'r'
     with open_tar(outfile.name, mode) as tarfile:
       tarfile.extractall(self.artifact_root)
   return True
开发者ID:JoeEnnever,项目名称:commons,代码行数:29,代码来源:artifact_cache.py


示例8: test_simple_successful_create_job_with_bindings

  def test_simple_successful_create_job_with_bindings(self):
    """Run a test of the "create" command against a mocked-out API:
    Verifies that the creation command sends the right API RPCs, and performs the correct
    tests on the result."""

    mock_context = FakeAuroraCommandContext()
    with contextlib.nested(
        patch('threading._Event.wait'),
        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
      mock_query = self.create_query()
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
      api = mock_context.get_api('west')
      api.create_job.return_value = self.get_createjob_response()

      # This is the real test: invoke create as if it had been called by the command line.
      with temporary_file() as fp:
        fp.write(self.get_unbound_test_config())
        fp.flush()
        cmd = AuroraCommandLine()
        cmd.execute(['job', 'create', '--wait-until=RUNNING', '--bind', 'cluster_binding=west',
            '--bind', 'instances_binding=20', '--bind', 'TEST_BATCH=1',
            'west/bozo/test/hello',
            fp.name])

      # Now check that the right API calls got made.
      # Check that create_job was called exactly once, with an AuroraConfig parameter.
      self.assert_create_job_called(api)
      self.assert_scheduler_called(api, mock_query, 2)
开发者ID:davelester,项目名称:incubator-aurora,代码行数:31,代码来源:test_create.py


示例9: test_parse_script

 def test_parse_script(self, mock_subprocess):
   with temporary_file() as fp:
     mock_popen = mock.Mock()
     mock_popen.wait.return_value = 0
     mock_subprocess.Popen.return_value = mock_popen
     parse_script(fp.name)('h1')
     assert mock_popen.wait.call_count == 1
开发者ID:apache,项目名称:aurora,代码行数:7,代码来源:test_admin_util.py


示例10: test_kill_job_api_level_with_shards

  def test_kill_job_api_level_with_shards(self):
    """Test kill client-side API logic."""
    mock_options = self.setup_mock_options()
    mock_options.shards = [0, 1, 2, 3]
    mock_config = Mock()
    mock_config.hooks = []
    mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
    (mock_api, mock_scheduler) = self.setup_mock_api()
    mock_api_factory = Mock(return_value=mock_api)
    mock_scheduler.killTasks.return_value = self.get_kill_job_response()
    with contextlib.nested(
        patch('apache.aurora.client.factory.make_client_factory', return_value=mock_api_factory),
        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler),
        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
        patch('twitter.common.app.get_options', return_value=mock_options),
        patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
            mock_api_factory_patch,
            mock_scheduler_proxy_class,
            mock_clusters,
            options, mock_get_job_config):
      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        kill(['west/mchucarroll/test/hello', fp.name], mock_options)

      # Now check that the right API calls got made.
      self.assert_scheduler_called(mock_api)
      assert mock_scheduler.killTasks.call_count == 1
      query = self.get_expected_task_query([0, 1, 2, 3])
      mock_scheduler.killTasks.assert_called_with(query, None)
开发者ID:betepahos,项目名称:incubator-aurora,代码行数:30,代码来源:test_kill.py


示例11: test_simple_successful_kill_job

  def test_simple_successful_kill_job(self):
    """Run a test of the "kill" command against a mocked-out API:
    Verifies that the kill command sends the right API RPCs, and performs the correct
    tests on the result."""
    mock_options = self.setup_mock_options()
    mock_config = Mock()
    mock_api_factory = self.setup_mock_api_factory()
    with contextlib.nested(
        patch('apache.aurora.client.commands.core.make_client_factory',
            return_value=mock_api_factory),
        patch('twitter.common.app.get_options', return_value=mock_options),
        patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
            mock_make_client_factory,
            options, mock_get_job_config):
      mock_api = mock_api_factory.return_value

      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        kill(['west/mchucarroll/test/hello', fp.name], mock_options)

      # Now check that the right API calls got made.
      self.assert_kill_job_called(mock_api)
      mock_api.kill_job.assert_called_with(
        AuroraJobKey(cluster=self.TEST_CLUSTER, role=self.TEST_ROLE, env=self.TEST_ENV,
            name=self.TEST_JOB), None, config=mock_config)
      self.assert_scheduler_called(mock_api)
      assert mock_make_client_factory.call_count == 1
开发者ID:betepahos,项目名称:incubator-aurora,代码行数:28,代码来源:test_kill.py


示例12: test_failed_create_job_with_incomplete_bindings

  def test_failed_create_job_with_incomplete_bindings(self):
    """Run a test of the "create" command against a mocked-out API:
    Verifies that the creation command sends the right API RPCs, and performs the correct
    tests on the result."""

    mock_context = FakeAuroraCommandContext()
    with contextlib.nested(
        patch('threading._Event.wait'),
        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):

      # This is the real test: invoke create as if it had been called by the command line.
      with temporary_file() as fp:
        fp.write(self.get_unbound_test_config())
        fp.flush()
        cmd = AuroraCommandLine()
        result = cmd.execute(['job', 'create', '--wait-until=RUNNING',
            '--bind', 'cluster_binding=west',
           'west/bozo/test/hello',
            fp.name])
        assert result == EXIT_INVALID_CONFIGURATION
      assert mock_context.get_out() == []
      assert mock_context.get_err() == [
            "Error loading configuration: "
            "TypeCheck(FAILED): MesosJob[update_config] failed: "
            "UpdateConfig[batch_size] failed: u'{{TEST_BATCH}}' not an integer"]
开发者ID:kidaa,项目名称:aurora,代码行数:25,代码来源:test_create.py


示例13: test_plugin_runs_in_create_job

  def test_plugin_runs_in_create_job(self):
    """Run a test of the "create" command against a mocked-out API:
    Verifies that the creation command sends the right API RPCs, and performs the correct
    tests on the result."""

    # We'll patch out create_context, which will give us a fake context
    # object, and everything can be stubbed through that.
    mock_context = FakeAuroraCommandContext()
    with patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context):
      # After making the client, create sets up a job monitor.
      # The monitor uses TaskQuery to get the tasks. It's called at least twice:once before
      # the job is created, and once after. So we need to set up mocks for the query results.
      mock_query = self.create_mock_query()
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.INIT))
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
      api = mock_context.get_api('west')
      api.create_job.return_value = self.get_createjob_response()

      # This is the real test: invoke create as if it had been called by the command line.
      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        cmd = AuroraCommandLine()
        cmd.register_plugin(BogusPlugin())
        cmd.execute(['job', 'create', '--bogosity=maximum', '--wait_until=RUNNING',
            'west/bozo/test/hello', fp.name])

      # Now check that the right API calls got made.
      # Check that create_job was called exactly once, with an AuroraConfig parameter.
      self.assert_create_job_called(api)
      self.assert_scheduler_called(api, mock_query, 2)
      # Check that the plugin did its job.
      assert mock_context.bogosity == "maximum"
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:35,代码来源:test_plugins.py


示例14: test_diff_server_error

 def test_diff_server_error(self):
   """Test the diff command if the user passes a config with an error in it."""
   mock_options = self.setup_mock_options()
   (mock_api, mock_scheduler_proxy) = self.create_mock_api()
   mock_scheduler_proxy.getTasksStatus.return_value = self.create_failed_status_response()
   self.setup_populate_job_config(mock_scheduler_proxy)
   with contextlib.nested(
       patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
       patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
       patch('twitter.common.app.get_options', return_value=mock_options),
       patch('subprocess.call', return_value=0),
       patch('json.loads', return_value=Mock())) as (
           mock_scheduler_proxy_class,
           mock_clusters,
           options,
           subprocess_patch,
           json_patch):
     with temporary_file() as fp:
       fp.write(self.get_valid_config())
       fp.flush()
       cmd = AuroraCommandLine()
       result = cmd.execute(['job', 'diff', 'west/bozo/test/hello', fp.name])
       assert result == EXIT_INVALID_PARAMETER
       # In this error case, we should have called the server getTasksStatus;
       # but since it fails, we shouldn't call populateJobConfig or subprocess.
       mock_scheduler_proxy.getTasksStatus.assert_called_with(
           TaskQuery(jobName='hello', environment='test', owner=Identity(role='bozo'),
               statuses=ACTIVE_STATES))
       assert mock_scheduler_proxy.populateJobConfig.call_count == 0
       assert subprocess_patch.call_count == 0
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:30,代码来源:test_diff.py


示例15: test_diff_invalid_config

 def test_diff_invalid_config(self):
   """Test the diff command if the user passes a config with an error in it."""
   mock_options = self.setup_mock_options()
   (mock_api, mock_scheduler_proxy) = self.create_mock_api()
   mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
   self.setup_populate_job_config(mock_scheduler_proxy)
   with contextlib.nested(
       patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
       patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
       patch('twitter.common.app.get_options', return_value=mock_options),
       patch('subprocess.call', return_value=0),
       patch('json.loads', return_value=Mock())) as (
           mock_scheduler_proxy_class,
           mock_clusters,
           options,
           subprocess_patch,
           json_patch):
     with temporary_file() as fp:
       fp.write(self.get_invalid_config('stupid="me"',))
       fp.flush()
       cmd = AuroraCommandLine()
       result = cmd.execute(['job', 'diff', 'west/bozo/test/hello', fp.name])
       assert result == EXIT_INVALID_CONFIGURATION
       assert mock_scheduler_proxy.getTasksStatus.call_count == 0
       assert mock_scheduler_proxy.populateJobConfig.call_count == 0
       assert subprocess_patch.call_count == 0
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:26,代码来源:test_diff.py


示例16: test_successful_diff

  def test_successful_diff(self):
    """Test the diff command."""
    (mock_api, mock_scheduler_proxy) = self.create_mock_api()
    with contextlib.nested(
        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
        patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
        patch('subprocess.call', return_value=0),
        patch('json.loads', return_value=Mock())) as (_, _, subprocess_patch, _):
      mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
      self.setup_populate_job_config(mock_scheduler_proxy)
      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        cmd = AuroraCommandLine()
        cmd.execute(['job', 'diff', 'west/bozo/test/hello', fp.name])

        # Diff should get the task status, populate a config, and run diff.
        mock_scheduler_proxy.getTasksStatus.assert_called_with(
            TaskQuery(jobName='hello', environment='test', owner=Identity(role='bozo'),
                statuses=ACTIVE_STATES))
        assert mock_scheduler_proxy.populateJobConfig.call_count == 1
        assert isinstance(mock_scheduler_proxy.populateJobConfig.call_args[0][0], JobConfiguration)
        assert (mock_scheduler_proxy.populateJobConfig.call_args[0][0].key ==
            JobKey(environment=u'test', role=u'bozo', name=u'hello'))
        # Subprocess should have been used to invoke diff with two parameters.
        assert subprocess_patch.call_count == 1
        assert len(subprocess_patch.call_args[0][0]) == 3
        assert subprocess_patch.call_args[0][0][0] == os.environ.get('DIFF_VIEWER', 'diff')
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:28,代码来源:test_diff.py


示例17: test_simple_successful_create_job_output

  def test_simple_successful_create_job_output(self):
    """Run a test of the "create" command against a mocked-out API:
    Verifies that the creation command generates the correct output.
    """
    mock_context = FakeAuroraCommandContext()
    with contextlib.nested(
        patch('threading._Event.wait'),
        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
      api = mock_context.get_api('west')
      api.create_job.return_value = self.get_createjob_response()

      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        cmd = AuroraCommandLine()
        result = cmd.execute(['job', 'create', '--wait-until=RUNNING', 'west/bozo/test/hello',
            fp.name])
        assert result == EXIT_OK
      assert mock_context.get_out() == [
          "Job create succeeded: job url=http://something_or_other/scheduler/bozo/test/hello"]
      assert mock_context.get_err() == []
开发者ID:davelester,项目名称:incubator-aurora,代码行数:27,代码来源:test_create.py


示例18: test_create_job_startup_fails

  def test_create_job_startup_fails(self):
    mock_context = FakeAuroraCommandContext()
    with contextlib.nested(
        patch('threading._Event.wait'),
        patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
      mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))

      # We need to override the side_effect behavior of check_status in the context.
      def check_status_side_effect(*args):
        return self.create_error_response()

      mock_context.get_api("west").check_status.side_effect = check_status_side_effect

      api = mock_context.get_api('west')
      api.create_job.return_value = self.get_createjob_response()

      with temporary_file() as fp:
        fp.write(self.get_valid_config())
        fp.flush()
        cmd = AuroraCommandLine()
        result = cmd.execute(['job', 'create', '--wait-until=RUNNING', 'west/bozo/test/hello',
            fp.name])
        assert result == EXIT_COMMAND_FAILURE
      assert mock_context.get_out() == []
      assert mock_context.get_err() == ["Error occurred while creating job west/bozo/test/hello"]
开发者ID:davelester,项目名称:incubator-aurora,代码行数:28,代码来源:test_create.py


示例19: do_test_artifact_cache

    def do_test_artifact_cache(self, artifact_cache):
        key = CacheKey("muppet_key", "fake_hash", 42, [])
        with temporary_file(artifact_cache.artifact_root) as f:
            # Write the file.
            f.write(TEST_CONTENT1)
            path = f.name
            f.close()

            # Cache it.
            self.assertFalse(artifact_cache.has(key))
            self.assertFalse(bool(artifact_cache.use_cached_files(key)))
            artifact_cache.insert(key, [path])
            self.assertTrue(artifact_cache.has(key))

            # Stomp it.
            with open(path, "w") as outfile:
                outfile.write(TEST_CONTENT2)

            # Recover it from the cache.
            self.assertTrue(bool(artifact_cache.use_cached_files(key)))

            # Check that it was recovered correctly.
            with open(path, "r") as infile:
                content = infile.read()
            self.assertEquals(content, TEST_CONTENT1)

            # Delete it.
            artifact_cache.delete(key)
            self.assertFalse(artifact_cache.has(key))
开发者ID:foursquare,项目名称:twitter-commons,代码行数:29,代码来源:test_artifact_cache.py


示例20: test_perform_maintenance_hosts_failed_default_sla

  def test_perform_maintenance_hosts_failed_default_sla(self):
    with temporary_file() as fp:
      mock_options = self.make_mock_options()
      mock_options.post_drain_script = None
      mock_options.grouping = 'by_host'
      mock_options.unsafe_hosts_filename = fp.name

      mock_api, mock_scheduler_proxy = self.create_mock_api()
      mock_scheduler_proxy.startMaintenance.return_value = self.create_start_maintenance_result()
      mock_scheduler_proxy.drainHosts.return_value = self.create_start_maintenance_result()
      mock_vector = self.create_mock_probe_hosts_vector([
          self.create_probe_hosts(self.HOSTNAMES[0], 95, False, None),
          self.create_probe_hosts(self.HOSTNAMES[1], 95, False, None),
          self.create_probe_hosts(self.HOSTNAMES[2], 95, False, None)
      ])

      with contextlib.nested(
          patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
          patch('apache.aurora.client.api.sla.Sla.get_domain_uptime_vector',
                return_value=mock_vector),
          patch('apache.aurora.client.commands.maintenance.CLUSTERS', new=self.TEST_CLUSTERS),
          patch('twitter.common.app.get_options', return_value=mock_options)):
        host_drain([self.TEST_CLUSTER])

        mock_scheduler_proxy.startMaintenance.assert_called_with(Hosts(set(self.HOSTNAMES)))
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:25,代码来源:test_maintenance.py



注:本文中的twitter.common.contextutil.temporary_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dirutil.safe_delete函数代码示例发布时间:2022-05-27
下一篇:
Python contextutil.temporary_dir函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap