本文整理汇总了Python中tests.py2.patch函数的典型用法代码示例。如果您正苦于以下问题:Python patch函数的具体用法?Python patch怎么用?Python patch使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了patch函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp(self):
super(InterpretEMRStepStderrTestCase, self).setUp()
# instead of mocking out contents of files, just mock out
# what _parse_step_syslog() should return, and have
# _cat_log() just pass through the path
self.mock_paths = []
self.path_to_mock_result = {}
self.mock_paths_catted = []
def mock_cat_log(fs, path):
if path in self.mock_paths:
self.mock_paths_catted.append(path)
return path
def mock_parse_task_stderr(path_from_mock_cat_log):
return self.path_to_mock_result.get(path_from_mock_cat_log)
# need to mock ls so that _ls_task_syslogs() can work
def mock_exists(path):
return path in self.mock_paths
def mock_ls(log_dir):
return self.mock_paths
self.mock_fs = Mock()
self.mock_fs.ls = Mock(side_effect=mock_ls)
self.mock_cat_log = self.start(patch("mrjob.logs.step._cat_log", side_effect=mock_cat_log))
self.start(patch("mrjob.logs.step._parse_task_stderr", side_effect=mock_parse_task_stderr))
开发者ID:davidmarin,项目名称:mrjob,代码行数:32,代码来源:test_step.py
示例2: setUp
def setUp(self):
# if save_sys_std() *doesn't* work, don't mess up other tests
super(SaveSysStdTestCase, self).setUp()
self.stdin = self.start(patch('sys.stdin'))
self.stdout = self.start(patch('sys.stdout'))
self.stderr = self.start(patch('sys.stderr'))
开发者ID:Yelp,项目名称:mrjob,代码行数:7,代码来源:test_util.py
示例3: test_logging_stderr_in_cleanup
def test_logging_stderr_in_cleanup(self):
def mock_Popen(*args, **kwargs):
mock_proc = MagicMock()
mock_proc.stdout = MagicMock()
mock_proc.stdout.__iter__.return_value = [
b'line1\n', b'line2\n']
mock_proc.stderr = MagicMock()
mock_proc.stderr.__iter__.return_value = [
b'Emergency, everybody to get from street\n']
mock_proc.wait.return_value = 0
return mock_proc
self.start(patch('mrjob.fs.hadoop.Popen', mock_Popen))
mock_log = self.start(patch('mrjob.fs.hadoop.log'))
fs = HadoopFilesystem()
data = b''.join(fs._cat_file('/some/path'))
self.assertEqual(data, b'line1\nline2\n')
mock_log.error.assert_called_once_with(
'STDERR: Emergency, everybody to get from street')
开发者ID:Yelp,项目名称:mrjob,代码行数:28,代码来源:test_hadoop.py
示例4: setUp
def setUp(self):
super(AuditUsageTestCase, self).setUp()
self.repeat_sleep = self.start(patch('time.sleep'))
# this is called once per cluster (no pagination), so we can
# test quantity as well as whether it was called
self.describe_cluster_sleep = self.start(
patch('mrjob.tools.emr.audit_usage.sleep'))
开发者ID:okomestudio,项目名称:mrjob,代码行数:8,代码来源:test_audit_usage.py
示例5: setUp
def setUp(self):
super(DeprecatedSwitchesTestCase, self).setUp()
self._maybe_terminate_clusters = self.start(patch(
'mrjob.tools.emr.terminate_idle_clusters.'
'_maybe_terminate_clusters'))
self.log = self.start(
patch('mrjob.tools.emr.terminate_idle_clusters.log'))
开发者ID:Affirm,项目名称:mrjob,代码行数:9,代码来源:test_terminate_idle_clusters.py
示例6: setUp
def setUp(self):
super(InterpretTaskLogsTestCase, self).setUp()
self.runner._ls_task_logs = Mock()
self._interpret_task_logs = (
self.start(patch('mrjob.logs.mixin._interpret_task_logs')))
self._interpret_spark_task_logs = (
self.start(patch('mrjob.logs.mixin._interpret_spark_task_logs')))
self.runner.get_hadoop_version = Mock(return_value='2.7.1')
开发者ID:Affirm,项目名称:mrjob,代码行数:9,代码来源:test_mixin.py
示例7: setUp
def setUp(self):
super(StepPickingTestCase, self).setUp()
self.pick_error = self.start(
patch('mrjob.emr.EMRJobRunner._pick_error',
side_effect=StopIteration))
self.log = self.start(
patch('mrjob.tools.diagnose.log'))
开发者ID:okomestudio,项目名称:mrjob,代码行数:9,代码来源:test_diagnose.py
示例8: setUp
def setUp(self):
super(JoinTestCase, self).setUp()
# os.path.join() and posixpath.join() do the same thing in
# UNIX and OS X, so track which one we called
self.start(patch('os.path.join', wraps=os.path.join))
self.start(patch('posixpath.join', wraps=posixpath.join))
self.fs = Filesystem()
开发者ID:kartheek6,项目名称:mrjob,代码行数:9,代码来源:test_base.py
示例9: setUp
def setUp(self):
super(InterpretEMRBootstrapStderrTestCase, self).setUp()
self.mock_fs = Mock()
self.mock_parse_task_stderr = self.start(
patch('mrjob.logs.bootstrap._parse_task_stderr',
return_value=dict(message='BOOM!\n')))
self.mock_cat_log = self.start(patch('mrjob.logs.bootstrap._cat_log'))
开发者ID:kaiyik,项目名称:mrjob,代码行数:10,代码来源:test_bootstrap.py
示例10: setUp
def setUp(self):
super(SortBinTestCase, self).setUp()
# these patches are only okay if they don't raise an exception;
# otherwise that hands an un-pickleable stacktrace to multiprocessing
self.check_call = self.start(patch(
'mrjob.local.check_call', wraps=check_call))
self._sort_lines_in_memory = self.start(patch(
'mrjob.local._sort_lines_in_memory',
wraps=_sort_lines_in_memory))
开发者ID:Affirm,项目名称:mrjob,代码行数:11,代码来源:test_local.py
示例11: setUp
def setUp(self):
super(StepPickingTestCase, self).setUp()
self.pick_error = self.start(
patch('mrjob.emr.EMRJobRunner._pick_error',
side_effect=StopIteration))
self.log = self.start(
patch('mrjob.tools.diagnose.log'))
# don't print logging messages when we start the diagnose tool
self.log_to_stream = self.start(
patch('mrjob.launch.log_to_stream'))
开发者ID:Affirm,项目名称:mrjob,代码行数:13,代码来源:test_diagnose.py
示例12: setUp
def setUp(self):
super(WrapAWSClientTestCase, self).setUp()
# don't actually wait between retries
self.sleep = self.start(patch('time.sleep'))
self.log = self.start(patch('mrjob.retry.log'))
self.list_buckets = self.start(patch(
'tests.mock_boto3.s3.MockS3Client.list_buckets',
side_effect=[dict(Buckets=[])]))
self.client = self.client('s3')
self.wrapped_client = _wrap_aws_client(self.client)
开发者ID:Yelp,项目名称:mrjob,代码行数:14,代码来源:test_s3.py
示例13: setUp
def setUp(self):
super(SparkPyFilesTestCase, self).setUp()
# don't bother actually running spark
self.start(patch(
'mrjob.spark.runner.SparkMRJobRunner._run_spark_submit',
return_value=0))
开发者ID:Affirm,项目名称:mrjob,代码行数:7,代码来源:test_runner.py
示例14: test_infer_from_hadoop_bin_realpath
def test_infer_from_hadoop_bin_realpath(self):
with patch('posixpath.realpath', return_value='/ha/do/op/bin'):
self.runner = HadoopJobRunner(hadoop_bin=['/usr/bin/hadoop'])
self.mock_paths.append('/ha/do/op/hadoop-streaming.jar')
self.assertEqual(self.runner._find_hadoop_streaming_jar(),
'/ha/do/op/hadoop-streaming.jar')
开发者ID:kartheek6,项目名称:mrjob,代码行数:7,代码来源:test_hadoop.py
示例15: setUp
def setUp(self):
self._dataproc_client = MockDataprocClient(self)
self._gcs_client = MockGCSClient(self)
self._gcs_fs = self._gcs_client._fs
self.start(patch.object(
DataprocJobRunner, 'api_client', self._dataproc_client))
self.gcs_patch_api_client = patch.object(
GCSFilesystem, 'api_client', self._gcs_client)
self.gcs_patch_download_io = patch.object(
GCSFilesystem, '_download_io', self._gcs_client.download_io)
self.gcs_patch_upload_io = patch.object(
GCSFilesystem, '_upload_io', self._gcs_client.upload_io)
self.start(self.gcs_patch_api_client)
self.start(self.gcs_patch_download_io)
self.start(self.gcs_patch_upload_io)
self.start(patch('mrjob.dataproc._read_gcloud_config',
lambda: _GCLOUD_CONFIG))
super(MockGoogleAPITestCase, self).setUp()
# patch slow things
def fake_create_mrjob_tar_gz(mocked_self, *args, **kwargs):
mocked_self._mrjob_tar_gz_path = self.fake_mrjob_tgz_path
return self.fake_mrjob_tgz_path
self.start(patch.object(
DataprocJobRunner, '_create_mrjob_tar_gz',
fake_create_mrjob_tar_gz))
self.start(patch.object(time, 'sleep'))
开发者ID:anirudhreddy92,项目名称:mrjob,代码行数:33,代码来源:mockgoogleapiclient.py
示例16: setUp
def setUp(self):
super(LsLogsTestCase, self).setUp()
self.mock_fs = Mock()
self.mock_paths = []
def mock_fs_ls(log_dir):
prefix = log_dir.rstrip('/') + '/'
exists = False
for p in self.mock_paths:
if isinstance(p, Exception):
raise p
elif p.startswith(prefix):
yield p
exists = True
if not exists:
raise IOError
def mock_fs_exists(log_dir):
return any(mock_fs_ls(log_dir))
self.mock_fs.ls = Mock(side_effect=mock_fs_ls)
self.mock_fs.exists = Mock(side_effect=mock_fs_exists)
# a matcher that cheerfully passes through kwargs
def mock_matcher(path, **kwargs):
return dict(**kwargs)
self.mock_matcher = Mock(side_effect=mock_matcher)
self.log = self.start(patch('mrjob.logs.wrap.log'))
开发者ID:Affirm,项目名称:mrjob,代码行数:34,代码来源:test_wrap.py
示例17: test_get_location_is_forbidden
def test_get_location_is_forbidden(self):
self.add_mock_s3_data({'walrus': {}}, location='us-west-2')
fs = S3Filesystem()
access_denied_error = ClientError(
dict(
Error=dict(
Code='AccessDenied',
Message='Access Denied',
),
ResponseMetadata=dict(
HTTPStatusCode=403
),
),
'GetBucketLocation')
with patch(
'tests.mock_boto3.s3.MockS3Client.get_bucket_location',
side_effect=access_denied_error):
bucket = fs.get_bucket('walrus')
self.assertEqual(bucket.meta.client.meta.endpoint_url,
'https://s3.amazonaws.com')
self.assertEqual(bucket.meta.client.meta.region_name, 'us-east-1')
开发者ID:okomestudio,项目名称:mrjob,代码行数:26,代码来源:test_s3.py
示例18: setUp
def setUp(self):
super(InterpretTaskLogsTestCase, self).setUp()
# instead of mocking out contents of files, just mock out
# what _parse_task_{syslog,stderr}() should return, and have
# _cat_log_lines() just pass through the path
self.mock_paths = []
self.path_to_mock_result = {}
self.mock_log_callback = Mock()
self.mock_paths_catted = []
def mock_cat_log_lines(fs, path):
if path in self.mock_paths:
self.mock_paths_catted.append(path)
return path
# (the actual log-parsing functions take lines from the log)
def mock_parse_task_syslog(path_from_mock_cat_log_lines):
# default is {}
return self.path_to_mock_result.get(
path_from_mock_cat_log_lines, {})
def mock_parse_task_stderr(path_from_mock_cat_log_lines):
# default is None
return self.path_to_mock_result.get(path_from_mock_cat_log_lines)
def mock_exists(path):
return path in self.mock_paths or path == 'MOCK_LOG_DIR'
# need to mock ls so that _ls_task_logs() can work
def mock_ls(log_dir):
return self.mock_paths
self.mock_fs = Mock()
self.mock_fs.exists = Mock(side_effect=mock_exists)
self.mock_fs.ls = Mock(side_effect=mock_ls)
self.mock_cat_log_lines = self.start(
patch('mrjob.logs.task._cat_log_lines',
side_effect=mock_cat_log_lines))
self.start(patch('mrjob.logs.task._parse_task_syslog',
side_effect=mock_parse_task_syslog))
self.start(patch('mrjob.logs.task._parse_task_stderr',
side_effect=mock_parse_task_stderr))
开发者ID:Affirm,项目名称:mrjob,代码行数:47,代码来源:test_task.py
示例19: setUp
def setUp(self):
super(MRBossTestCase, self).setUp()
self.ssh_worker_hosts = self.start(patch(
'mrjob.emr.EMRJobRunner._ssh_worker_hosts',
return_value=[]))
self.make_runner()
开发者ID:Affirm,项目名称:mrjob,代码行数:8,代码来源:test_mrboss.py
示例20: setUp
def setUp(self):
super(PickErrorsTestCase, self).setUp()
self.runner._interpret_history_log = Mock()
self.runner._interpret_step_logs = Mock()
self.runner._interpret_task_logs = Mock()
self._pick_error = self.start(
patch('mrjob.logs.mixin._pick_error'))
开发者ID:anirudhreddy92,项目名称:mrjob,代码行数:9,代码来源:test_mixin.py
注:本文中的tests.py2.patch函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论