本文整理汇总了Python中mock.mock.patch函数的典型用法代码示例。如果您正苦于以下问题:Python patch函数的具体用法?Python patch怎么用?Python patch使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了patch函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _send_presence_for_aggregated_tests
def _send_presence_for_aggregated_tests(self, email, status, validate_time):
# type: (str, str, datetime.datetime) -> Dict[str, Dict[str, Any]]
self.login(email)
timezone_util = 'zerver.views.presence.timezone_now'
with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=5)):
self.client_post("/json/users/me/presence", {'status': status})
with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=2)):
self.client_post("/api/v1/users/me/presence", {'status': status},
HTTP_USER_AGENT="ZulipAndroid/1.0",
**self.api_auth(email))
with mock.patch(timezone_util, return_value=validate_time - datetime.timedelta(seconds=7)):
latest_result = self.client_post("/api/v1/users/me/presence", {'status': status},
HTTP_USER_AGENT="ZulipIOS/1.0",
**self.api_auth(email))
latest_result_dict = latest_result.json()
self.assertDictEqual(
latest_result_dict['presences'][email]['aggregated'],
{
'status': status,
'timestamp': datetime_to_timestamp(validate_time - datetime.timedelta(seconds=2)),
'client': 'ZulipAndroid'
}
)
result = self.client_get("/json/users/%s/presence" % (email,))
return result.json()
开发者ID:brockwhittaker,项目名称:zulip,代码行数:25,代码来源:test_presence.py
示例2: setUp
def setUp(self):
self.repo = pywikibot.Site('test', 'wikidata')
# patch bold
def bold_side_effect(val):
return 'bold_{}'.format(val)
bold_patcher = mock.patch(
'wikidatastuff.preview_item.PreviewItem.make_text_bold')
self.mock_bold = bold_patcher.start()
self.mock_bold.side_effect = bold_side_effect
# patch italics
def italics_side_effect(val):
return 'italics_{}'.format(val)
italics_patcher = mock.patch(
'wikidatastuff.preview_item.PreviewItem.make_text_italics')
self.mock_italics = italics_patcher.start()
self.mock_italics.side_effect = italics_side_effect
self.addCleanup(bold_patcher.stop)
self.addCleanup(italics_patcher.stop)
# patch wikidata_template
wd_template_patcher = mock.patch(
'wikidatastuff.preview_item.PreviewItem.make_wikidata_template')
self.mock_wd_template = wd_template_patcher.start()
self.mock_wd_template.side_effect = ['wd_template_{}'.format(i)
for i in range(1, 5)]
self.addCleanup(wd_template_patcher.stop)
开发者ID:lokal-profil,项目名称:wikidata-stuff,代码行数:31,代码来源:test_preview_item.py
示例3: test_check_supported_version
def test_check_supported_version(self):
# version ok
current_version = '0.4.0'
supported_version = ['0.4', '0.3', '0.2']
self.assertTrue(ResourcesManager._check_supported_version(current_version=current_version,
supported_versions=supported_version))
# version ok
current_version = '11.23.0'
supported_version = ['11.23', '12.3', '2.23']
self.assertTrue(ResourcesManager._check_supported_version(current_version=current_version,
supported_versions=supported_version))
# version non ok, user does not config
# Testing with integer values instead of string
current_version = '0.4.0'
supported_version = [0.3, 0.2]
with mock.patch('kalliope.Utils.query_yes_no', return_value=True):
self.assertTrue(ResourcesManager._check_supported_version(current_version=current_version,
supported_versions=supported_version))
with mock.patch('kalliope.Utils.query_yes_no', return_value=False):
self.assertFalse(ResourcesManager._check_supported_version(current_version=current_version,
supported_versions=supported_version))
开发者ID:igorstarki,项目名称:kalliope,代码行数:27,代码来源:test_resources_manager.py
示例4: testCallback
def testCallback(self):
"""
Testing the callback provided when audio has been provided by the User as an answer.
"""
parameters = {
"default": self.default,
"from_answer_link": self.from_answer_link
}
with mock.patch("kalliope.core.NeuronModule.get_audio_from_stt") as mock_get_audio_from_stt:
with mock.patch("kalliope.core.NeuronModule.run_synapse_by_name") as mock_run_synapse_by_name:
# testing running the default when no order matching
nt = Neurotransmitter(**parameters)
mock_get_audio_from_stt.assert_called_once()
mock_get_audio_from_stt.reset_mock()
# testing running the default when audio None
audio_text = None
nt.callback(audio=audio_text)
mock_run_synapse_by_name.assert_called_once_with(self.default, high_priority=True, is_api_call=False)
mock_run_synapse_by_name.reset_mock()
# testing running the default when no order matching
audio_text = "try test audio "
nt.callback(audio=audio_text)
mock_run_synapse_by_name.assert_called_once_with(self.default, high_priority=True, is_api_call=False)
mock_run_synapse_by_name.reset_mock()
# Testing calling the right synapse
audio_text = "answer one"
nt.callback(audio=audio_text)
mock_run_synapse_by_name.assert_called_once_with(synapse_name="synapse2",
user_order=audio_text,
synapse_order="answer one",
high_priority=True,
is_api_call=False)
开发者ID:igorstarki,项目名称:kalliope,代码行数:35,代码来源:test_neurotransmitter.py
示例5: test_pre_save
def test_pre_save(self):
p = Poll()
p.image = SimpleUploadedFile(TEST_IMAGE, b'')
c = CloudinaryField('image', width_field="image_width", height_field="image_height")
c.set_attributes_from_name('image')
mocked_resource = cloudinary.CloudinaryResource(metadata={"width": TEST_IMAGE_W, "height": TEST_IMAGE_H},
type="upload", public_id=TEST_IMAGE, resource_type="image")
with mock.patch('cloudinary.uploader.upload_resource', return_value=mocked_resource) as upload_mock:
prep_value = c.pre_save(p, None)
self.assertTrue(upload_mock.called)
self.assertEqual(".png", os.path.splitext(prep_value)[1])
self.assertEqual(TEST_IMAGE_W, p.image_width)
self.assertEqual(TEST_IMAGE_H, p.image_height)
# check empty values handling
p.image = SimpleUploadedFile(TEST_IMAGE, b'')
mocked_resource_empty = cloudinary.CloudinaryResource(metadata={})
with mock.patch('cloudinary.uploader.upload_resource', return_value=mocked_resource_empty) as upload_mock:
c.pre_save(p, None)
self.assertTrue(upload_mock.called)
self.assertIsNone(p.image_width)
self.assertIsNone(p.image_height)
开发者ID:cloudinary,项目名称:pycloudinary,代码行数:26,代码来源:test_cloudinaryField.py
示例6: test_remove_other_unseen_boundaries
def test_remove_other_unseen_boundaries(self):
# other unseen boundaries are boundaries which have not been updated in any way for a country
# insert features in the database
geojson_data = [self.data_geojson_level_0, self.data_geojson_level_1]
with patch("builtins.open") as mock_file:
mock_file.return_value.__enter__ = lambda filename: filename
mock_file.return_value.__exit__ = mock.Mock()
mock_file.return_value.read.side_effect = lambda: geojson_data.pop(0)
with CaptureSTDOUT():
call_command("import_geojson", "admin_level_0_simplified.json", "admin_level_1_simplified.json")
self.assertEqual(AdminBoundary.objects.count(), 2)
# update data, and add a new boundary
geojson_data = [self.data_geojson_level_0]
with patch("builtins.open") as mock_file:
mock_file.return_value.__enter__ = lambda filename: filename
mock_file.return_value.__exit__ = mock.Mock()
mock_file.return_value.read.side_effect = lambda: geojson_data.pop(0)
with CaptureSTDOUT() as captured_output:
call_command("import_geojson", "admin_level_0_simplified.json")
self.assertEqual(
captured_output.getvalue(),
"=== parsing admin_level_0_simplified.json\n ** updating Granica (R1000)\n ** removing unseen boundaries (R1000)\nOther unseen boundaries removed: 1\n ** updating paths for all of Granica\n",
)
self.assertEqual(AdminBoundary.objects.count(), 1)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:33,代码来源:tests.py
示例7: test_update_features_with_parent
def test_update_features_with_parent(self):
# insert features in the database
geojson_data = [self.data_geojson_level_0, self.data_geojson_level_1]
with patch("builtins.open") as mock_file:
mock_file.return_value.__enter__ = lambda filename: filename
mock_file.return_value.__exit__ = mock.Mock()
mock_file.return_value.read.side_effect = lambda: geojson_data.pop(0)
with CaptureSTDOUT():
call_command("import_geojson", "admin_level_0_simplified.json", "admin_level_1_simplified.json")
self.assertEqual(AdminBoundary.objects.count(), 2)
# update features
geojson_data = [self.data_geojson_level_0, self.data_geojson_level_1]
with patch("builtins.open") as mock_file:
mock_file.return_value.__enter__ = lambda filename: filename
mock_file.return_value.__exit__ = mock.Mock()
mock_file.return_value.read.side_effect = lambda: geojson_data.pop(0)
with CaptureSTDOUT() as captured_output:
call_command("import_geojson", "admin_level_0_simplified.json", "admin_level_1_simplified.json")
self.assertEqual(
captured_output.getvalue(),
"=== parsing admin_level_0_simplified.json\n ** updating Granica (R1000)\n ** removing unseen boundaries (R1000)\n=== parsing admin_level_1_simplified.json\n ** updating Međa 2 (R2000)\n ** removing unseen boundaries (R2000)\nOther unseen boundaries removed: 0\n ** updating paths for all of Granica\n",
)
self.assertEqual(AdminBoundary.objects.count(), 2)
开发者ID:mxabierto,项目名称:rapidpro,代码行数:31,代码来源:tests.py
示例8: test_resource_in_discovery_container_after_get_patch_delete
def test_resource_in_discovery_container_after_get_patch_delete(self):
with mock.patch('requests.get') as requests_get_mock:
resource = {"@odata.id": "odata.id", "something": "irrelevant"}
get_response = Mock()
get_response.status_code = ReturnCodes.OK
get_response.headers = {}
get_response.text = json.dumps(resource)
requests_get_mock.return_value = get_response
discovery_container = DiscoveryContainer()
self.api_caller.get_resource("/resource", discovery_container)
self.assertEqual(discovery_container["http://{API_ENDPOINT}/resource".format(
API_ENDPOINT=API_ENDPOINT)].body,
resource)
patched_resource = {"@odata.id": "odata.id", "something": "relevant"}
get_response.text = json.dumps(patched_resource)
with mock.patch('requests.patch') as requests_patch_mock:
patch_response = Mock()
patch_response.status_code = ReturnCodes.OK
patch_response.headers = {}
patch_response.text = "{}"
requests_patch_mock.return_value = patch_response
_, _, _, _ = self.api_caller.patch_resource("/resource", discovery_container)
self.assertEqual(discovery_container["http://{API_ENDPOINT}/resource".format(
API_ENDPOINT=API_ENDPOINT)].body, patched_resource)
with mock.patch('requests.delete') as requests_delete_mock:
delete_response = Mock()
delete_response.status_code = ReturnCodes.NO_CONTENT
delete_response.headers = {}
delete_response.text = ""
requests_delete_mock.return_value = delete_response
_, _, _, _ = self.api_caller.delete_resource("/resource", discovery_container)
self.assertNotIn("/resource", discovery_container)
开发者ID:01org,项目名称:intelRSD,代码行数:35,代码来源:test_api_caller.py
示例9: test_field_rules_and_callback
def test_field_rules_and_callback():
form = Form()
form.add_text("description", "Description")\
.add_rule(Validator.INTEGER, "Please provide integer")
form.add_text("first_name", "First Name:")\
.set_required()
form.add_button("reset", "Reset")
form.add_submit("save", "Save")
response = {
"description":1,
"first_name": "Test",
"save": True
}
with mock.patch("tests.form_test.on_success_dummy") as on_success_called:
form.on_success.append(on_success_called)
assert form.validate(response) == True
on_success_called.assert_called_with()
response = {
"description": "test",
"save": True
}
with mock.patch("tests.form_test.on_error_dummy") as on_error_called:
form.on_error.append(on_error_called)
assert form.validate(response) == False
assert form["first_name"].is_valid() == False
assert form["description"].is_valid() == False
on_error_called.assert_called_with()
开发者ID:Twista,项目名称:tforms,代码行数:33,代码来源:form_test.py
示例10: _run_composer
def _run_composer(self, args):
mock_stdout = six.StringIO()
mock_stderr = six.StringIO()
with mock.patch("sys.stdout", mock_stdout):
with mock.patch("sys.stderr", mock_stderr):
with self.assertRaises(SystemExit) as cm:
ringcomposer.main(args)
return (cm.exception.code,
mock_stdout.getvalue(),
mock_stderr.getvalue())
开发者ID:jgmerritt,项目名称:swift,代码行数:10,代码来源:test_ringcomposer.py
示例11: test_check_fail
def test_check_fail(self):
with mock.patch('sys.exit'):
sys.exit = MagicMock()
with mock.patch('qrl.core.misc.DependencyChecker.DependencyChecker') as mockDepChecker:
test_path = os.path.dirname(os.path.abspath(__file__))
dummy_path = os.path.join(test_path, "..", "data", 'misc', 'dummy_requirements.txt')
mockDepChecker._get_requirements_path = MagicMock(return_value=dummy_path)
DependencyChecker.check()
sys.exit.assert_called()
开发者ID:fanff,项目名称:QRL,代码行数:10,代码来源:test_DependencyChecker.py
示例12: test_retrieve_request_from_database
def test_retrieve_request_from_database(self):
with mock.patch('cts_core.commons.replay_controller.ReplayController._read_request_ids_from_database') as read_ids:
controller_class = ReplayController().__class__()
read_ids.return_value = [22,33]
controller_class.initialize()
with mock.patch('cts_framework.db.dao.http_request_dao.HttpRequestDAO.retrieve') as retrieve:
retrieve.return_value = ('_method', 'url', '{"anything":"something"}', '_response', '_status_code')
self.assertEqual('_response', controller_class.request("http_method", "url", anything='something'))
retrieve.assert_called_with(22)
开发者ID:01org,项目名称:intelRSD,代码行数:10,代码来源:test_replay_controller.py
示例13: executeScript
def executeScript(self, path, classname=None, command=None, config_file=None,
# common mocks for all the scripts
config_overrides = None,
shell_mock_value = (0, "OK."),
os_type=('Suse','11','Final'),
kinit_path_local="/usr/bin/kinit"
):
norm_path = os.path.normpath(path)
src_dir = RMFTestCase._getSrcFolder()
stack_version = norm_path.split(os.sep)[0]
stacks_path = os.path.join(src_dir, PATH_TO_STACKS)
configs_path = os.path.join(src_dir, PATH_TO_STACK_TESTS, stack_version, "configs")
script_path = os.path.join(stacks_path, norm_path)
config_file_path = os.path.join(configs_path, config_file)
try:
with open(config_file_path, "r") as f:
self.config_dict = json.load(f)
except IOError:
raise RuntimeError("Can not read config file: "+ config_file_path)
if config_overrides:
for key, value in config_overrides.iteritems():
self.config_dict[key] = value
self.config_dict = ConfigDictionary(self.config_dict)
# append basedir to PYTHONPATH
scriptsdir = os.path.dirname(script_path)
basedir = os.path.dirname(scriptsdir)
sys.path.append(scriptsdir)
# get method to execute
try:
with patch.object(platform, 'linux_distribution', return_value=os_type):
script_module = imp.load_source(classname, script_path)
except IOError:
raise RuntimeError("Cannot load class %s from %s",classname, norm_path)
script_class_inst = RMFTestCase._get_attr(script_module, classname)()
method = RMFTestCase._get_attr(script_class_inst, command)
# Reload params import, otherwise it won't change properties during next import
if 'params' in sys.modules:
del(sys.modules["params"])
# run
with Environment(basedir, test_mode=True) as RMFTestCase.env:
with patch('resource_management.core.shell.checked_call', return_value=shell_mock_value): # we must always mock any shell calls
with patch.object(Script, 'get_config', return_value=self.config_dict): # mocking configurations
with patch.object(Script, 'install_packages'):
with patch('resource_management.libraries.functions.get_kinit_path', return_value=kinit_path_local):
with patch.object(platform, 'linux_distribution', return_value=os_type):
method(RMFTestCase.env)
sys.path.remove(scriptsdir)
开发者ID:wbear2,项目名称:ambari,代码行数:55,代码来源:RMFTestCase.py
示例14: testJwtDecode
def testJwtDecode(self):
headers = {'Cookie': 'access_token=' + JwtServiceTest.ACCESS_TOKEN}
with patch("jwt.decode", self.mocks.jwt_error), self.app.test_request_context(headers=headers):
self.mocks.jwt_error.side_effect = DecodeError()
self.assertFalse(services.get(JwtService).has_jwt())
self.assertTrue(self.mocks.jwt_error.called)
with patch("jwt.decode", self.mocks.jwt_decode), self.app.test_request_context(headers=headers):
self.mocks.jwt_decode.return_value = {}
self.assertTrue(services.get(JwtService).has_jwt())
开发者ID:nuxeo,项目名称:nuxeo-tools-hooks,代码行数:11,代码来源:test_jwt_service.py
示例15: test_patch
def test_patch(self):
with mock.patch('cts_core.commons.api_caller.ApiCaller.__init__') as api_caller_init_mock:
api_caller_init_mock.return_value = None
with mock.patch('cts_core.validation.patch.metadata_patch_validator.MetadataPatchValidator._validate_patchable_property') as validate_property:
validate_property.return_value = ValidationStatus.PASSED
validator = MetadataPatchValidator(self.metadata_container, None, PatchingStrategy2_1())
with StdoutCapture() as output:
self.assertEqual(ValidationStatus.PASSED, validator.validate(self.discovery_container))
开发者ID:01org,项目名称:intelRSD,代码行数:11,代码来源:test_patch_rmm_manager.py
示例16: setUp
def setUp(self):
super(GithubNotifyMailHandlerTest, self).setUp()
patchers = [
patch("nxtools.hooks.services.mail.EmailService.sendemail", Mock()),
patch("gevent.spawn", mocked_spawn),
patch("nxtools.hooks.endpoints.webhook.github_handlers.push_notify_mail.Process", MockedProcess),
]
[patcher.start() for patcher in patchers]
[self.addCleanup(patcher.stop) for patcher in patchers]
开发者ID:nuxeo,项目名称:nuxeo-tools-hooks,代码行数:11,代码来源:test_push_notify_mail.py
示例17: setUp
def setUp(self):
super(HTTPTest, self).setUp()
patchers = [
patch("gevent.socket.create_connection", self.mocks.socket_connect),
patch("httplib.HTTPConnection.request", self.mocks.http_request),
patch("httplib.HTTPConnection.getresponse", self.mocks.http_getresponse),
patch("geventhttpclient.httplib.HTTPResponse.read", self.mocks.httpresponse_read),
patch("geventhttpclient.response.HTTPSocketResponse._read_headers"),
]
[patcher.start() for patcher in patchers]
[self.addCleanup(patcher.stop) for patcher in patchers]
开发者ID:nuxeo,项目名称:nuxeo-tools-hooks,代码行数:13,代码来源:test_http.py
示例18: test_generate_shard_prune_playbook
def test_generate_shard_prune_playbook(plan_name):
migration = _get_migration(plan_name)
mock_shard_allocation = _get_expected_yml(plan_name, 'mock_shard_allocation_post_migration.yml')
mock_func = get_shard_allocation_func(mock_shard_allocation)
with patch('commcare_cloud.commands.migrations.couchdb.get_shard_allocation', mock_func),\
patch('commcare_cloud.commands.migrations.couchdb.get_db_list', return_value=['commcarehq', 'commcarehq__apps']):
nodes = generate_shard_prune_playbook(migration)
if nodes:
actual = _get_yml(migration.prune_playbook_path)
expected = _get_expected_yml(plan_name, 'expected_{}'.format(PRUNE_PLAYBOOK_NAME))
assert expected == actual, "file lists mismatch:\n\nExpected\n{}\nActual\n{}".format(expected, actual)
else:
assert not os.path.exists(migration.prune_playbook_path), migration.prune_playbook_path
开发者ID:dimagi,项目名称:commcarehq-ansible,代码行数:14,代码来源:test_couch_migration.py
示例19: test_with_correct_script_path
def test_with_correct_script_path(self):
with mock.patch('cts_framework.db.dao.script_dao.ScriptDAO.get_script_execution_details') as get_script_execution_details:
script_execution = Mock(configuration="", script_path="valid_script_path")
get_script_execution_details.return_value = script_execution
with mock.patch('cts_framework.tests_managing.tests_manager.TestsManager.get_packages') as get_packages:
get_packages.return_value = TestsPackagesContainerMock()
replay_action = ReplayActionUnderTest()
with mock.patch('cts_framework.actions.execute.execute_test_scripts_action.ExecuteTestScriptsAction.execute_configuration_group') \
as execute_configuration_group:
configuration = Mock(replay_id=['62'])
with StdoutCapture() as output:
replay_action.process_action(configuration)
self.assertIn('Executing', output.raw)
开发者ID:01org,项目名称:intelRSD,代码行数:14,代码来源:test_replay_test_run.py
示例20: test_unpack_archive
def test_unpack_archive(self):
tmpdir = tempfile.mkdtemp()
dummy_archive_name = os.path.join("ambari_agent", "dummy_files",
"dummy_archive.zip")
archive_file = open(dummy_archive_name, "rb")
fileCache = FileCache(self.config)
fileCache.unpack_archive(archive_file, tmpdir)
# Count summary size of unpacked files:
total_size = 0
total_files = 0
total_dirs = 0
for dirpath, dirnames, filenames in os.walk(tmpdir):
total_dirs += 1
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
total_files += 1
self.assertEquals(total_size, 51258L)
self.assertEquals(total_files, 28)
self.assertEquals(total_dirs, 8)
shutil.rmtree(tmpdir)
# Test exception handling
with patch("os.path.isdir") as isdir_mock:
isdir_mock.side_effect = self.exc_side_effect
try:
fileCache.unpack_archive(archive_file, tmpdir)
self.fail('CachingException not thrown')
except CachingException:
pass # Expected
except Exception, e:
self.fail('Unexpected exception thrown:' + str(e))
开发者ID:OpenPOWER-BigData,项目名称:HDP-ambari,代码行数:32,代码来源:TestFileCache.py
注:本文中的mock.mock.patch函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论