本文整理汇总了Python中twitter.common.contextutil.temporary_file函数的典型用法代码示例。如果您正苦于以下问题:Python temporary_file函数的具体用法?Python temporary_file怎么用?Python temporary_file使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了temporary_file函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_load_json
def test_load_json():
  """Round-trip a job config through JSON and verify it loads back unchanged."""
  with temporary_file() as source:
    source.write(MESOS_CONFIG)
    source.flush()
    env = AuroraConfigLoader.load(source.name)
  # Pull the first (and only expected) job out of the loaded environment.
  job = env["jobs"][0]
  with temporary_file() as dumped:
    dumped.write(json.dumps(job.get()))
    dumped.flush()
    new_job = AuroraConfigLoader.load_json(dumped.name)
  assert new_job == job
开发者ID:Empia,项目名称:incubator-aurora,代码行数:11,代码来源:test_loader.py
示例2: test_override_single_variable
def test_override_single_variable():
  """environment_as must expose HORK to child processes and remove it on exit."""
  with temporary_file() as captured:
    # Inside the context, a subprocess should observe the overridden value.
    with environment_as(HORK='BORK'):
      subprocess.Popen([sys.executable, '-c', 'import os; print os.environ["HORK"]'],
                       stdout=captured).wait()
      captured.seek(0)
      assert captured.read() == 'BORK\n'
    # Outside the context, the variable must be gone again.
    with temporary_file() as cleared:
      subprocess.Popen([sys.executable, '-c', 'import os; print os.environ.has_key("HORK")'],
                       stdout=cleared).wait()
      cleared.seek(0)
      assert cleared.read() == 'False\n'
开发者ID:billwei,项目名称:commons,代码行数:15,代码来源:test_environment_as.py
示例3: test_simple_successful_create_job_open_page
def test_simple_successful_create_job_open_page(self):
  """'job create --open-browser' should create the job and open its scheduler page."""
  mock_context = FakeAuroraCommandContext()
  with contextlib.nested(
      # TODO(maxim): Patching threading.Event with all possible namespace/patch/mock
      # combinations did not produce the desired effect. Investigate why (AURORA-510)
      patch('threading._Event.wait'),
      patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
    mock_query = self.create_query()
    # Queue two status results: PENDING then RUNNING, so --wait-until=RUNNING terminates.
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
    api = mock_context.get_api('west')
    api.create_job.return_value = self.get_createjob_response()
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      cmd = AuroraCommandLine()
      result = cmd.execute(['job', 'create', '--wait-until=RUNNING', '--open-browser',
                            'west/bozo/test/hello',
                            fp.name])
      assert result == EXIT_OK
      self.assert_create_job_called(api)
      self.assert_scheduler_called(api, mock_query, 2)
      # --open-browser should have recorded the job's scheduler URL in the fake context.
      assert mock_context.showed_urls == ["http://something_or_other/scheduler/bozo/test/hello"]
开发者ID:davelester,项目名称:incubator-aurora,代码行数:26,代码来源:test_create.py
示例4: test_compute_stats
def test_compute_stats(self):
  """compute_stats should flatten per-goal timings into per-phase rows with totals."""
  # Input: timing samples (seconds) as recorded by a build, keyed phase -> goal -> [times].
  executed_goals = {'resolve-idl': {'idl': [0.00072813034057617188],
                                    'extract': [3.0994415283203125e-06]},
                    "thriftstore-codegen": {'thriftstore-codegen': [0.0001010894775390625]},
                    "gen": {'tweetypie-fetch': [0.028632879257202148], 'thrift': [0.016566991806030273],
                            'protoc': [0.0038318634033203125], 'antlr': [0.0020389556884765625],
                            'thriftstore-dml-gen': [0.0022170543670654297],
                            'tweetypie-clean': [0.0054290294647216797]},
                    "resolve": {'ivy': [0.00097703933715820312]},
                    "compile": {'checkstyle': [0.00057005882263183594]},
                    "test": {'junit': [9.1075897216796875e-05], 'specs': [0.0015749931335449219]}
                    }
  with temporary_file() as temp_fd:
    bs = BuildTimeStats(temp_fd.name)
    # 100 is the overall command wall-clock time, echoed back as the 'cmd_total' row.
    actual_timings = bs.compute_stats(executed_goals, 100)
    # Expected: one row per goal, a 'phase_total' row for phases with multiple goals,
    # and a trailing 'cmd_total' row.
    expected_timings = [{'phase': 'resolve', 'total': 0.00097703933715820312, 'goal': 'ivy'},
                        {'phase': 'resolve-idl', 'total': 0.00072813034057617188, 'goal': 'idl'},
                        {'phase': 'resolve-idl', 'total': 3.0994415283203125e-06, 'goal': 'extract'},
                        {'phase': 'resolve-idl', 'total': 0.00073122978210449219, 'goal': 'phase_total'},
                        {'phase': 'compile', 'total': 0.00057005882263183594, 'goal': 'checkstyle'},
                        {'phase': 'thriftstore-codegen', 'total': 0.0001010894775390625, 'goal': 'thriftstore-codegen'},
                        {'phase': 'test', 'total': 9.1075897216796875e-05, 'goal': 'junit'},
                        {'phase': 'test', 'total': 0.0015749931335449219, 'goal': 'specs'},
                        {'phase': 'test', 'total': 0.0016660690307617188, 'goal': 'phase_total'},
                        {'phase': 'gen', 'total': 0.0038318634033203125, 'goal': 'protoc'},
                        {'phase': 'gen', 'total': 0.0020389556884765625, 'goal': 'antlr'},
                        {'phase': 'gen', 'total': 0.028632879257202148, 'goal': 'tweetypie-fetch'},
                        {'phase': 'gen', 'total': 0.0054290294647216797, 'goal': 'tweetypie-clean'},
                        {'phase': 'gen', 'total': 0.0022170543670654297, 'goal': 'thriftstore-dml-gen'},
                        {'phase': 'gen', 'total': 0.016566991806030273, 'goal': 'thrift'},
                        {'phase': 'gen', 'total': 0.058716773986816406, 'goal': 'phase_total'},
                        {'phase': 'cmd_total', 'total': 100, 'goal': 'cmd_total'}]
    # NOTE(review): the expected row order appears to depend on dict iteration order
    # inside compute_stats — confirm determinism if this is ever run on a newer runtime.
    self.assertEqual(actual_timings, expected_timings)
开发者ID:benhuang-zh,项目名称:commons,代码行数:33,代码来源:buildtimestats_test.py
示例5: test_perform_maintenance_partial_sla_failure
def test_perform_maintenance_partial_sla_failure(self, mock_check_sla, mock_start_maintenance,
                                                 mock_drain_hosts, mock_operate_on_hosts,
                                                 mock_complete_maintenance):
  """When the SLA check rejects one host, only the remaining hosts are drained
  and the failed host is written to the output file."""
  mock_callback = mock.Mock()
  failed_host = 'us-west-001.example.com'
  # The SLA check reports exactly one host as unsafe to take down.
  mock_check_sla.return_value = set([failed_host])
  drained_hosts = set(TEST_HOSTNAMES) - set([failed_host])
  maintenance = HostMaintenance(DEFAULT_CLUSTER, 'quiet')
  with temporary_file() as fp:
    with group_by_rack():
      maintenance.perform_maintenance(
          TEST_HOSTNAMES,
          callback=mock_callback,
          grouping_function='by_rack',
          output_file=fp.name)
      # The SLA-failed host must have been recorded in the output file.
      with open(fp.name, 'r') as fpr:
        content = fpr.read()
        assert failed_host in content
  mock_start_maintenance.assert_called_once_with(TEST_HOSTNAMES)
  assert mock_check_sla.call_count == 1
  # Only the SLA-safe hosts get drained and operated on.
  assert mock_drain_hosts.call_count == 1
  assert mock_drain_hosts.call_args_list == [mock.call(Hosts(drained_hosts))]
  assert mock_operate_on_hosts.call_count == 1
  assert mock_operate_on_hosts.call_args_list == [
      mock.call(Hosts(drained_hosts), mock_callback)]
  # Maintenance is completed twice: once for the failed host, once for the drained set.
  assert mock_complete_maintenance.call_count == 2
  assert mock_complete_maintenance.call_args_list == [
      mock.call(Hosts(set([failed_host]))), mock.call(Hosts(drained_hosts))]
开发者ID:dhardy92,项目名称:incubator-aurora,代码行数:30,代码来源:test_host_maintenance.py
示例6: use_cached_files
def use_cached_files(self, cache_key):
  """Fetch the tarball for cache_key from the remote cache and extract it.

  Returns the extracted TarballArtifact, or None on a cache miss or any error.
  This is a deliberate best-effort read: every exception is logged as a warning
  and swallowed, so a broken cache never fails the build.
  """
  # This implementation fetches the appropriate tarball and extracts it.
  remote_path = self._remote_path_for_key(cache_key)
  try:
    # Send an HTTP request for the tarball.
    response = self._request('GET', remote_path)
    if response is None:
      return None
    done = False
    with temporary_file() as outfile:
      total_bytes = 0
      # Read the data in a loop.
      while not done:
        data = response.read(self.READ_SIZE)
        outfile.write(data)
        # A short read signals the end of the response body.
        if len(data) < self.READ_SIZE:
          done = True
        total_bytes += len(data)
      # Explicit close flushes the bytes before extraction re-opens the path.
      # NOTE(review): presumably temporary_file tolerates the double close on
      # context exit — confirm against twitter.common.contextutil.
      outfile.close()
      self.log.debug('Read %d bytes from artifact cache at %s' %
                     (total_bytes, self._url_string(remote_path)))
      # Extract the tarfile.
      artifact = TarballArtifact(self.artifact_root, outfile.name, self.compress)
      artifact.extract()
      return artifact
  except Exception as e:
    self.log.warn('Error while reading from remote artifact cache: %s' % e)
    return None
开发者ID:govindkabra,项目名称:pants,代码行数:30,代码来源:restful_artifact_cache.py
示例7: use_cached_files
def use_cached_files(self, cache_key):
  """Fetch the artifact tarball for cache_key over HTTP and unpack it.

  Returns True if the artifact was downloaded and extracted into
  self.artifact_root, False on a cache miss (no HTTP response).
  Raises Exception when the response is malformed: missing content-length
  header, or a body whose size does not match it.
  """
  path = self._path_for_key(cache_key)
  response = self._request('GET', path)
  if response is None:
    return False  # Cache miss.
  expected_size = int(response.getheader('content-length', -1))
  if expected_size == -1:
    # Fixed: was the legacy `raise Exception, msg` statement form, which is a
    # syntax error on Python 3; the call form behaves identically on Python 2.
    raise Exception('No content-length header in HTTP response')
  read_size = 4 * 1024 * 1024  # 4 MB
  done = False
  if self.context:
    self.context.log.info('Reading %d bytes' % expected_size)
  with temporary_file() as outfile:
    total_bytes = 0
    # Stream the response body to a temp file in read_size chunks.
    while not done:
      data = response.read(read_size)
      outfile.write(data)
      # A short read marks the end of the body.
      if len(data) < read_size:
        done = True
      total_bytes += len(data)
    if self.context:
      self.context.log.debug('Read %d bytes' % total_bytes)
    # Flush/close before re-opening the same path with open_tar below.
    outfile.close()
    if total_bytes != expected_size:
      raise Exception('Read only %d bytes from %d expected' % (total_bytes, expected_size))
    mode = 'r:bz2' if self.compress else 'r'
    with open_tar(outfile.name, mode) as tarfile:
      tarfile.extractall(self.artifact_root)
  return True
开发者ID:JoeEnnever,项目名称:commons,代码行数:29,代码来源:artifact_cache.py
示例8: test_simple_successful_create_job_with_bindings
def test_simple_successful_create_job_with_bindings(self):
  """Run a test of the "create" command against a mocked-out API:
  Verifies that the creation command sends the right API RPCs, and performs the correct
  tests on the result. This variant supplies all template bindings via --bind flags."""
  mock_context = FakeAuroraCommandContext()
  with contextlib.nested(
      patch('threading._Event.wait'),
      patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
    mock_query = self.create_query()
    # Queue status results: PENDING then RUNNING, so --wait-until=RUNNING terminates.
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
    api = mock_context.get_api('west')
    api.create_job.return_value = self.get_createjob_response()
    # This is the real test: invoke create as if it had been called by the command line.
    with temporary_file() as fp:
      fp.write(self.get_unbound_test_config())
      fp.flush()
      cmd = AuroraCommandLine()
      cmd.execute(['job', 'create', '--wait-until=RUNNING', '--bind', 'cluster_binding=west',
                   '--bind', 'instances_binding=20', '--bind', 'TEST_BATCH=1',
                   'west/bozo/test/hello',
                   fp.name])
      # Now check that the right API calls got made.
      # Check that create_job was called exactly once, with an AuroraConfig parameter.
      self.assert_create_job_called(api)
      self.assert_scheduler_called(api, mock_query, 2)
开发者ID:davelester,项目名称:incubator-aurora,代码行数:31,代码来源:test_create.py
示例9: test_parse_script
def test_parse_script(self, mock_subprocess):
  """parse_script should return a callable that runs the script and waits once."""
  with temporary_file() as script:
    fake_process = mock.Mock()
    fake_process.wait.return_value = 0
    mock_subprocess.Popen.return_value = fake_process
    # Invoke the generated callable for a single host.
    parse_script(script.name)('h1')
    assert fake_process.wait.call_count == 1
开发者ID:apache,项目名称:aurora,代码行数:7,代码来源:test_admin_util.py
示例10: test_kill_job_api_level_with_shards
def test_kill_job_api_level_with_shards(self):
  """Test kill client-side API logic when a subset of shards is specified."""
  mock_options = self.setup_mock_options()
  mock_options.shards = [0, 1, 2, 3]
  mock_config = Mock()
  mock_config.hooks = []
  # Disable command hooks so the kill path runs without hook interference.
  mock_config.raw.return_value.enable_hooks.return_value.get.return_value = False
  (mock_api, mock_scheduler) = self.setup_mock_api()
  mock_api_factory = Mock(return_value=mock_api)
  mock_scheduler.killTasks.return_value = self.get_kill_job_response()
  with contextlib.nested(
      patch('apache.aurora.client.factory.make_client_factory', return_value=mock_api_factory),
      patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler),
      patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
      patch('twitter.common.app.get_options', return_value=mock_options),
      patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
          mock_api_factory_patch,
          mock_scheduler_proxy_class,
          mock_clusters,
          options, mock_get_job_config):
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      kill(['west/mchucarroll/test/hello', fp.name], mock_options)
      # Now check that the right API calls got made.
      self.assert_scheduler_called(mock_api)
      assert mock_scheduler.killTasks.call_count == 1
      # killTasks must be scoped to exactly the shards requested in mock_options.
      query = self.get_expected_task_query([0, 1, 2, 3])
      mock_scheduler.killTasks.assert_called_with(query, None)
开发者ID:betepahos,项目名称:incubator-aurora,代码行数:30,代码来源:test_kill.py
示例11: test_simple_successful_kill_job
def test_simple_successful_kill_job(self):
  """Run a test of the "kill" command against a mocked-out API:
  Verifies that the kill command sends the right API RPCs, and performs the correct
  tests on the result."""
  mock_options = self.setup_mock_options()
  mock_config = Mock()
  mock_api_factory = self.setup_mock_api_factory()
  with contextlib.nested(
      patch('apache.aurora.client.commands.core.make_client_factory',
            return_value=mock_api_factory),
      patch('twitter.common.app.get_options', return_value=mock_options),
      patch('apache.aurora.client.commands.core.get_job_config', return_value=mock_config)) as (
          mock_make_client_factory,
          options, mock_get_job_config):
    mock_api = mock_api_factory.return_value
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      kill(['west/mchucarroll/test/hello', fp.name], mock_options)
      # Now check that the right API calls got made.
      self.assert_kill_job_called(mock_api)
      # kill_job receives the full job key parsed from the command line argument.
      mock_api.kill_job.assert_called_with(
          AuroraJobKey(cluster=self.TEST_CLUSTER, role=self.TEST_ROLE, env=self.TEST_ENV,
                       name=self.TEST_JOB), None, config=mock_config)
      self.assert_scheduler_called(mock_api)
      assert mock_make_client_factory.call_count == 1
开发者ID:betepahos,项目名称:incubator-aurora,代码行数:28,代码来源:test_kill.py
示例12: test_failed_create_job_with_incomplete_bindings
def test_failed_create_job_with_incomplete_bindings(self):
  """Run a test of the "create" command against a mocked-out API:
  Verifies that a create with only some of the required --bind values fails
  cleanly at configuration-load time with a clear error message."""
  mock_context = FakeAuroraCommandContext()
  with contextlib.nested(
      patch('threading._Event.wait'),
      patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
    # This is the real test: invoke create as if it had been called by the command line.
    with temporary_file() as fp:
      fp.write(self.get_unbound_test_config())
      fp.flush()
      cmd = AuroraCommandLine()
      # Only cluster_binding is supplied; TEST_BATCH stays unbound in the template.
      result = cmd.execute(['job', 'create', '--wait-until=RUNNING',
                            '--bind', 'cluster_binding=west',
                            'west/bozo/test/hello',
                            fp.name])
      assert result == EXIT_INVALID_CONFIGURATION
      # Nothing on stdout; stderr carries the type-check failure for the unbound value.
      assert mock_context.get_out() == []
      assert mock_context.get_err() == [
          "Error loading configuration: "
          "TypeCheck(FAILED): MesosJob[update_config] failed: "
          "UpdateConfig[batch_size] failed: u'{{TEST_BATCH}}' not an integer"]
开发者ID:kidaa,项目名称:aurora,代码行数:25,代码来源:test_create.py
示例13: test_plugin_runs_in_create_job
def test_plugin_runs_in_create_job(self):
  """Run a test of the "create" command against a mocked-out API:
  Verifies that the creation command sends the right API RPCs, and that a
  registered command-line plugin is invoked and sees its own option."""
  # We'll patch out create_context, which will give us a fake context
  # object, and everything can be stubbed through that.
  mock_context = FakeAuroraCommandContext()
  with patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context):
    # After making the client, create sets up a job monitor.
    # The monitor uses TaskQuery to get the tasks. It's called at least twice: once before
    # the job is created, and once after. So we need to set up mocks for the query results.
    mock_query = self.create_mock_query()
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.INIT))
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
    api = mock_context.get_api('west')
    api.create_job.return_value = self.get_createjob_response()
    # This is the real test: invoke create as if it had been called by the command line.
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      cmd = AuroraCommandLine()
      cmd.register_plugin(BogusPlugin())
      # The plugin contributes the --bogosity option; its hook should record the value.
      cmd.execute(['job', 'create', '--bogosity=maximum', '--wait_until=RUNNING',
                   'west/bozo/test/hello', fp.name])
      # Now check that the right API calls got made.
      # Check that create_job was called exactly once, with an AuroraConfig parameter.
      self.assert_create_job_called(api)
      self.assert_scheduler_called(api, mock_query, 2)
      # Check that the plugin did its job.
      assert mock_context.bogosity == "maximum"
开发者ID:MustafaOrkunAcar,项目名称:incubator-aurora,代码行数:35,代码来源:test_plugins.py
示例14: test_diff_server_error
def test_diff_server_error(self):
  """Test the diff command when the scheduler's getTasksStatus call fails:
  diff should exit with an invalid-parameter error and make no further calls."""
  mock_options = self.setup_mock_options()
  (mock_api, mock_scheduler_proxy) = self.create_mock_api()
  # The server-side status query fails; diff must stop right after it.
  mock_scheduler_proxy.getTasksStatus.return_value = self.create_failed_status_response()
  self.setup_populate_job_config(mock_scheduler_proxy)
  with contextlib.nested(
      patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
      patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
      patch('twitter.common.app.get_options', return_value=mock_options),
      patch('subprocess.call', return_value=0),
      patch('json.loads', return_value=Mock())) as (
          mock_scheduler_proxy_class,
          mock_clusters,
          options,
          subprocess_patch,
          json_patch):
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      cmd = AuroraCommandLine()
      result = cmd.execute(['job', 'diff', 'west/bozo/test/hello', fp.name])
      assert result == EXIT_INVALID_PARAMETER
      # In this error case, we should have called the server getTasksStatus;
      # but since it fails, we shouldn't call populateJobConfig or subprocess.
      mock_scheduler_proxy.getTasksStatus.assert_called_with(
          TaskQuery(jobName='hello', environment='test', owner=Identity(role='bozo'),
                    statuses=ACTIVE_STATES))
      assert mock_scheduler_proxy.populateJobConfig.call_count == 0
      assert subprocess_patch.call_count == 0
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:30,代码来源:test_diff.py
示例15: test_diff_invalid_config
def test_diff_invalid_config(self):
  """Test the diff command if the user passes a config with an error in it."""
  mock_options = self.setup_mock_options()
  (mock_api, mock_scheduler_proxy) = self.create_mock_api()
  mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
  self.setup_populate_job_config(mock_scheduler_proxy)
  with contextlib.nested(
      patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
      patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
      patch('twitter.common.app.get_options', return_value=mock_options),
      patch('subprocess.call', return_value=0),
      patch('json.loads', return_value=Mock())) as (
          mock_scheduler_proxy_class,
          mock_clusters,
          options,
          subprocess_patch,
          json_patch):
    with temporary_file() as fp:
      # Deliberately malformed config: loading should fail before any RPC is made.
      fp.write(self.get_invalid_config('stupid="me"',))
      fp.flush()
      cmd = AuroraCommandLine()
      result = cmd.execute(['job', 'diff', 'west/bozo/test/hello', fp.name])
      assert result == EXIT_INVALID_CONFIGURATION
      # Config never loaded, so no scheduler calls and no external diff invocation.
      assert mock_scheduler_proxy.getTasksStatus.call_count == 0
      assert mock_scheduler_proxy.populateJobConfig.call_count == 0
      assert subprocess_patch.call_count == 0
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:26,代码来源:test_diff.py
示例16: test_successful_diff
def test_successful_diff(self):
  """Test the happy path of the diff command: status query, populate, then
  an external diff invocation via subprocess."""
  (mock_api, mock_scheduler_proxy) = self.create_mock_api()
  with contextlib.nested(
      patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
      patch('apache.aurora.client.factory.CLUSTERS', new=self.TEST_CLUSTERS),
      patch('subprocess.call', return_value=0),
      patch('json.loads', return_value=Mock())) as (_, _, subprocess_patch, _):
    mock_scheduler_proxy.getTasksStatus.return_value = self.create_status_response()
    self.setup_populate_job_config(mock_scheduler_proxy)
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      cmd = AuroraCommandLine()
      cmd.execute(['job', 'diff', 'west/bozo/test/hello', fp.name])
      # Diff should get the task status, populate a config, and run diff.
      mock_scheduler_proxy.getTasksStatus.assert_called_with(
          TaskQuery(jobName='hello', environment='test', owner=Identity(role='bozo'),
                    statuses=ACTIVE_STATES))
      assert mock_scheduler_proxy.populateJobConfig.call_count == 1
      assert isinstance(mock_scheduler_proxy.populateJobConfig.call_args[0][0], JobConfiguration)
      assert (mock_scheduler_proxy.populateJobConfig.call_args[0][0].key ==
              JobKey(environment=u'test', role=u'bozo', name=u'hello'))
      # Subprocess should have been used to invoke diff with two parameters.
      assert subprocess_patch.call_count == 1
      assert len(subprocess_patch.call_args[0][0]) == 3
      # The diff tool is taken from $DIFF_VIEWER, falling back to 'diff'.
      assert subprocess_patch.call_args[0][0][0] == os.environ.get('DIFF_VIEWER', 'diff')
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:28,代码来源:test_diff.py
示例17: test_simple_successful_create_job_output
def test_simple_successful_create_job_output(self):
  """Run a test of the "create" command against a mocked-out API:
  Verifies that the creation command generates the correct output.
  """
  mock_context = FakeAuroraCommandContext()
  with contextlib.nested(
      patch('threading._Event.wait'),
      patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
    # Status sequence: PENDING then RUNNING twice, satisfying --wait-until=RUNNING.
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
    api = mock_context.get_api('west')
    api.create_job.return_value = self.get_createjob_response()
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      cmd = AuroraCommandLine()
      result = cmd.execute(['job', 'create', '--wait-until=RUNNING', 'west/bozo/test/hello',
                            fp.name])
      assert result == EXIT_OK
      # Success message goes to stdout; stderr must stay empty.
      assert mock_context.get_out() == [
          "Job create succeeded: job url=http://something_or_other/scheduler/bozo/test/hello"]
      assert mock_context.get_err() == []
开发者ID:davelester,项目名称:incubator-aurora,代码行数:27,代码来源:test_create.py
示例18: test_create_job_startup_fails
def test_create_job_startup_fails(self):
  """When the post-create status check reports an error, 'job create' should
  return EXIT_COMMAND_FAILURE and print the error to stderr."""
  mock_context = FakeAuroraCommandContext()
  with contextlib.nested(
      patch('threading._Event.wait'),
      patch('apache.aurora.client.cli.jobs.Job.create_context', return_value=mock_context)):
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.PENDING))
    mock_context.add_expected_status_query_result(
        self.create_mock_status_query_result(ScheduleStatus.RUNNING))
    # We need to override the side_effect behavior of check_status in the context.
    def check_status_side_effect(*args):
      return self.create_error_response()
    mock_context.get_api("west").check_status.side_effect = check_status_side_effect
    api = mock_context.get_api('west')
    api.create_job.return_value = self.get_createjob_response()
    with temporary_file() as fp:
      fp.write(self.get_valid_config())
      fp.flush()
      cmd = AuroraCommandLine()
      result = cmd.execute(['job', 'create', '--wait-until=RUNNING', 'west/bozo/test/hello',
                            fp.name])
      assert result == EXIT_COMMAND_FAILURE
      # No success output; the failure is reported on stderr.
      assert mock_context.get_out() == []
      assert mock_context.get_err() == ["Error occurred while creating job west/bozo/test/hello"]
开发者ID:davelester,项目名称:incubator-aurora,代码行数:28,代码来源:test_create.py
示例19: do_test_artifact_cache
def do_test_artifact_cache(self, artifact_cache):
  """Shared round-trip check for any cache implementation: miss, insert,
  overwrite the source file, recover from cache, then delete."""
  key = CacheKey("muppet_key", "fake_hash", 42, [])
  with temporary_file(artifact_cache.artifact_root) as f:
    # Write the file.
    f.write(TEST_CONTENT1)
    path = f.name
    f.close()
    # Cache it.
    self.assertFalse(artifact_cache.has(key))
    self.assertFalse(bool(artifact_cache.use_cached_files(key)))
    artifact_cache.insert(key, [path])
    self.assertTrue(artifact_cache.has(key))
    # Stomp it.
    with open(path, "w") as outfile:
      outfile.write(TEST_CONTENT2)
    # Recover it from the cache.
    self.assertTrue(bool(artifact_cache.use_cached_files(key)))
    # Check that it was recovered correctly.
    with open(path, "r") as infile:
      content = infile.read()
    self.assertEquals(content, TEST_CONTENT1)
    # Delete it.
    artifact_cache.delete(key)
    self.assertFalse(artifact_cache.has(key))
开发者ID:foursquare,项目名称:twitter-commons,代码行数:29,代码来源:test_artifact_cache.py
示例20: test_perform_maintenance_hosts_failed_default_sla
def test_perform_maintenance_hosts_failed_default_sla(self):
  """Drain should still start maintenance for all hosts even when every host
  fails the default SLA check; failed hosts go to the unsafe-hosts file."""
  with temporary_file() as fp:
    mock_options = self.make_mock_options()
    mock_options.post_drain_script = None
    mock_options.grouping = 'by_host'
    mock_options.unsafe_hosts_filename = fp.name
    mock_api, mock_scheduler_proxy = self.create_mock_api()
    mock_scheduler_proxy.startMaintenance.return_value = self.create_start_maintenance_result()
    mock_scheduler_proxy.drainHosts.return_value = self.create_start_maintenance_result()
    # NOTE(review): 95 is presumably the probed uptime percentage (below the
    # default SLA threshold) — confirm against create_probe_hosts.
    mock_vector = self.create_mock_probe_hosts_vector([
        self.create_probe_hosts(self.HOSTNAMES[0], 95, False, None),
        self.create_probe_hosts(self.HOSTNAMES[1], 95, False, None),
        self.create_probe_hosts(self.HOSTNAMES[2], 95, False, None)
    ])
    with contextlib.nested(
        patch('apache.aurora.client.api.SchedulerProxy', return_value=mock_scheduler_proxy),
        patch('apache.aurora.client.api.sla.Sla.get_domain_uptime_vector',
              return_value=mock_vector),
        patch('apache.aurora.client.commands.maintenance.CLUSTERS', new=self.TEST_CLUSTERS),
        patch('twitter.common.app.get_options', return_value=mock_options)):
      host_drain([self.TEST_CLUSTER])
      mock_scheduler_proxy.startMaintenance.assert_called_with(Hosts(set(self.HOSTNAMES)))
开发者ID:bhuvan,项目名称:incubator-aurora,代码行数:25,代码来源:test_maintenance.py
注:本文中的twitter.common.contextutil.temporary_file函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论