本文整理汇总了Python中mock.patch.object函数的典型用法代码示例。如果您正苦于以下问题:Python patch.object函数的具体用法?Python patch.object怎么用?Python patch.object使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了patch.object函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_readManifestFile__synapseStore_values_are_set
def test_readManifestFile__synapseStore_values_are_set():
    """Verify the ``synapseStore`` column produced by ``readManifestFile``.

    The manifest mixes local file paths and URLs whose ``synapseStore`` cell
    is ``True``, ``False`` or an empty quoted string.  The expectation encoded
    below is that only local paths marked ``True`` or left blank come back as
    ``True``; every URL and every path marked ``False`` comes back ``False``.
    """
    project_id = "syn123"
    # (path, raw synapseStore cell written to the manifest, expected value)
    cases = [
        (os.path.abspath(os.path.expanduser('~/file1.txt')), 'True', True),
        ('http://www.synapse.org', 'True', False),
        (os.path.abspath(os.path.expanduser('~/file3.txt')), 'False', False),
        ('http://www.github.com', 'False', False),
        (os.path.abspath(os.path.expanduser('~/file5.txt')), '""', True),
        ('http://www.checkoutmymixtapefam.com/fire.mp3', '""', False),
    ]
    header = 'path\tparent\tsynapseStore\n'
    rows = ''.join('%s\t%s\t%s\n' % (path, project_id, cell)
                   for path, cell, _ in cases)
    expected_synapseStore = {
        str(path): expected for path, _, expected in cases
    }
    manifest = StringIO(header + rows)

    # syn.get is stubbed to hand back a Project, and os.path.isfile is forced
    # to True so file1.txt, file3.txt and file5.txt do not need to exist.
    with patch.object(syn, "get", return_value=Project()), \
            patch.object(os.path, "isfile", return_value=True):
        manifest_dataframe = synapseutils.sync.readManifestFile(syn, manifest)
        by_path = manifest_dataframe.set_index('path')
        actual_synapseStore = by_path['synapseStore'].to_dict()
        assert_dict_equal(expected_synapseStore, actual_synapseStore)
开发者ID:Sage-Bionetworks,项目名称:synapsePythonClient,代码行数:34,代码来源:unit_test_synapseutils_sync.py
示例2: test_graceful_stop_on_one_container_error
def test_graceful_stop_on_one_container_error(runner_factory, rabbit_config):
    """An error in one container must surface from ``runner.wait()`` and
    cause the other container's ``stop`` to be called exactly once.
    """
    runner = runner_factory(rabbit_config, ExampleService, SecondService)
    runner.start()

    first = get_container(runner, ExampleService)
    second = get_container(runner, SecondService)
    rpc_consumer = get_extension(first, RpcConsumer)
    failure = Exception("error")

    # ``stop`` is wrapped (not replaced) so the real stop still runs while
    # the call is recorded; ``handle_result`` is replaced and made to raise.
    with patch.object(second, 'stop', autospec=True,
                      wraps=second.stop) as stop, \
            patch.object(rpc_consumer, 'handle_result',
                         autospec=True) as handle_result:
        handle_result.side_effect = failure

        # A standalone rpc proxy calls exampleservice.task().  The call is
        # asynchronous because the remote container errors out and so a
        # reply never arrives; a blocking call would hang forever.
        with ServiceRpcProxy("exampleservice", rabbit_config) as proxy:
            proxy.task.call_async()

        # The error has to bubble up to runner.wait().
        with pytest.raises(Exception) as exc_info:
            runner.wait()
        assert exc_info.value == failure

        # The second service was stopped because the first one was killed.
        stop.assert_called_once_with()
开发者ID:evilino,项目名称:nameko,代码行数:30,代码来源:test_errors.py
示例3: test_SplitQueryFn_without_num_splits
def test_SplitQueryFn_without_num_splits(self):
    """SplitQueryFn given ``num_splits=0`` must derive the split count.

    The estimated size is stubbed to 23 default bundles, so 23 split queries
    are expected, with no call to the mocked datastore's ``run_query``.
    """
    # Zero forces SplitQueryFn to compute the number of query splits itself.
    num_splits = 0
    expected_num_splits = 23
    entity_bytes = (expected_num_splits *
                    ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES)

    def fake_get_splits(datastore, query, num_splits, partition=None):
        # Delegate to the test case's own splitter; the datastore and
        # partition arguments are accepted but unused.
        return self.split_query(query, num_splits)

    with patch.object(helper, 'get_datastore',
                      return_value=self._mock_datastore), \
            patch.object(ReadFromDatastore, 'get_estimated_size_bytes',
                         return_value=entity_bytes), \
            patch.object(query_splitter, 'get_splits',
                         side_effect=fake_get_splits):
        split_query_fn = ReadFromDatastore.SplitQueryFn(
            self._PROJECT, self._query, None, num_splits)
        split_query_fn.start_bundle()
        returned_split_queries = list(split_query_fn.process(self._query))

        self.assertEqual(len(returned_split_queries), expected_num_splits)
        self.assertEqual(
            0, len(self._mock_datastore.run_query.call_args_list))
        self.verify_unique_keys(returned_split_queries)
开发者ID:amitsela,项目名称:incubator-beam,代码行数:27,代码来源:datastoreio_test.py
示例4: test_get_listed_users
def test_get_listed_users(self):
    """``list_users`` must return all three users yielded by the query.

    ``conn.execute`` is stubbed to yield three rows and ``admin._get_acl`` to
    return a fixed ACL mapping; the serialized form of each user is expected
    in the first element of the returned value.
    """
    def _row(user, is_super):
        # One fake result row exposing ``name`` and ``super`` attributes.
        row = NonCallableMagicMock()
        row.configure_mock(name=user.name, super=is_super)
        return row

    usr1, usr2, usr3 = [
        models.CassandraUser(self._get_random_name(1025)) for _ in range(3)
    ]
    db1 = models.CassandraSchema('db1')
    db2 = models.CassandraSchema('db2')
    usr1.databases.append(db1.serialize())
    usr3.databases.append(db2.serialize())

    rows = iter([_row(usr1, False), _row(usr2, False), _row(usr3, True)])
    acl = {
        usr1.name: {db1.name: {'SELECT'}, db2.name: {}},
        usr3.name: {db2.name: {'SELECT'}},
    }
    with patch.object(self.conn, 'execute', return_value=rows), \
            patch.object(self.admin, '_get_acl', return_value=acl):
        usrs = self.manager.list_users(self.context)
        self.conn.execute.assert_has_calls(
            [call(self.__LIST_USR_FORMAT)], any_order=True)
        for user in (usr1, usr2, usr3):
            self.assertIn(user.serialize(), usrs[0])
开发者ID:gongwayne,项目名称:Openstack,代码行数:30,代码来源:test_cassandra_manager.py
示例5: test_perfscale_longrun_perf
def test_perfscale_longrun_perf(self):
    """One loop iteration must call create, busy and cleanup in turn.

    ``until_timeout`` is stubbed to yield a single item, and every action
    helper on ``pl`` is replaced by an autospecced mock.
    """
    client = Mock()
    pprof_collector = Mock()
    created_client = Mock()
    busy_models = [Mock(), Mock()]
    args = argparse.Namespace(run_length=1)

    with patch.object(pl, 'until_timeout',
                      autospec=True, return_value=[1]), \
            patch.object(pl, 'action_rest', autospec=True), \
            patch.object(pl, 'action_create', autospec=True,
                         return_value=created_client) as m_ac, \
            patch.object(pl, 'action_busy', autospec=True,
                         return_value=busy_models) as m_ab, \
            patch.object(pl, 'action_cleanup', autospec=True) as m_acu:
        pl.perfscale_longrun_perf(client, pprof_collector, args)

    # Each stage receives the result of the stage before it.
    m_ac.assert_called_once_with(client)
    m_ab.assert_called_once_with(created_client, ['dummy-sink'])
    m_acu.assert_called_once_with(created_client, busy_models)
开发者ID:mjs,项目名称:juju,代码行数:26,代码来源:test_perfscale_longrunning.py
示例6: test_returns_ordered_dictionary_of_details
def test_returns_ordered_dictionary_of_details(self):
    """The returned details must be an OrderedDict sorted by event range."""
    # The fake data is inserted in reverse chronological order so that the
    # test can observe the returned order having been changed.
    first = '2016-10-16 20:28:06 - 2016-10-16 20:29:44'
    second = '2016-10-16 20:30:13 - 2016-10-16 20:32:21'
    fake_data = OrderedDict()
    fake_data[second] = {'20:30:13 - 20:30:33': 'Log message second'}
    fake_data[first] = {'20:28:06 - 20:28:26': 'Log message'}

    name_lookup = {first: 'First', second: 'Second'}

    with patch.object(
            gpr, '_get_chunked_log',
            return_value=fake_data, autospec=True):
        with patch.object(
                gpr, '_get_log_name_lookup_table',
                return_value=name_lookup, autospec=True):
            details = gpr.breakdown_log_by_events_timeframe(
                '/tmp', 'boostrap', 'cleanup', [])

    self.assertIsInstance(details, OrderedDict)
    # On Python 3 ``items()`` is a view and cannot be indexed, so it is
    # materialised into a list first (a harmless copy on Python 2).
    items = list(details.items())
    self.assertEqual(items[0][0], first)
    self.assertEqual(items[1][0], second)
开发者ID:mjs,项目名称:juju,代码行数:25,代码来源:test_generate_perfscale_results.py
示例7: test_loop_body
def test_loop_body(self):
    """Exercise ``_loop_body`` in both the started and not-started states."""
    sensor = self.rf_sensor

    # Started: the schedule is shifted and updated, and a receive happens.
    sensor._started = True
    with patch.object(TDMA_Scheduler, "shift") as shift_mock, \
            patch.object(TDMA_Scheduler, "update") as update_mock, \
            patch.object(RF_Sensor_Physical_Texas_Instruments,
                         "_receive") as receive_mock:
        try:
            sensor._loop_body()
        except DisabledException:
            # Tolerated here; only the recorded calls below are checked.
            pass

        receive_mock.assert_called_once_with()
        self.assertEqual(shift_mock.call_count, 1)
        self.assertEqual(update_mock.call_count, 1)
        self.assertNotEqual(sensor._polling_time, 0.0)

    # Not started: a regular update still has to call the receive method.
    sensor._started = False
    with patch.object(RF_Sensor_Physical_Texas_Instruments,
                      "_receive") as receive_mock:
        sensor._loop_body()
        receive_mock.assert_called_once_with()
开发者ID:timvandermeij,项目名称:mobile-radio-tomography,代码行数:25,代码来源:zigbee_rf_sensor_physical_texas_instruments.py
示例8: max_header_length_test
def max_header_length_test():
    """Check ``encode_unstructured`` against several MAX_HEADER_LENGTH values.

    With a limit one above the subject length a ``Header`` object is
    expected; with a limit of 1 the UTF-8 encoded subject is expected.
    """
    message = create.from_string(LONG_HEADER)

    # This used to fail because the maximum recursion depth was exceeded.
    raw_subject = message.headers.getraw('subject').encode("utf-8")
    ok_(raw_subject in message.to_string())

    unicode_subject = (u"Это сообщение с длинным сабжектом "
                       u"специально чтобы проверить кодировки")
    ascii_subject = "This is simple ascii subject"

    # Limit just above the subject length: a Header in the given charset.
    for subject, charset in ((ascii_subject, "ascii"),
                             (unicode_subject, "utf-8")):
        with patch.object(
                headers.encoding, 'MAX_HEADER_LENGTH', len(subject) + 1):
            eq_(Header(subject.encode(charset), charset,
                       header_name="Subject"),
                encode_unstructured("Subject", subject))

    # Limit of 1: both subjects come back as plain UTF-8 encoded values.
    with patch.object(headers.encoding, 'MAX_HEADER_LENGTH', 1):
        for subject in (ascii_subject, unicode_subject):
            eq_(subject.encode("utf-8"),
                encode_unstructured("Subject", subject))
开发者ID:nylas,项目名称:flanker,代码行数:29,代码来源:encoding_test.py
示例9: test_bookmarklet
def test_bookmarklet(self):
    """
    Does api/bookmarklet fetch, save, and return a response for the recipe?

    NOTE(review): this body uses ``yield``, so it is presumably wrapped by a
    coroutine decorator (e.g. inlineCallbacks) outside this view -- confirm.
    """
    fromTest = fromdir(__file__)
    loc = fromTest('recipe_page_source.html')
    # Read the fixture through a context manager so the file handle is
    # closed deterministically instead of being left to the collector.
    with open(loc) as fixture:
        pageSource = fixture.read()

    recipe_uri = 'http://www.foodandwine.com/recipes/poutine-style-twice-baked-potatoes'

    # treq.get / treq.content are stubbed so no network request is made;
    # content resolves to the fixture page source.
    pGet = patch.object(treq, 'get', return_value=defer.succeed(None), autospec=True)
    pTreqContent = patch.object(treq, 'content', return_value=defer.succeed(pageSource), autospec=True)

    with pGet, pTreqContent:
        # normal bookmarketing
        u = self._users()[0]
        req = self.requestJSON([], session_user=u)
        req.args['uri'] = [recipe_uri]
        ret = yield self.handler('bookmarklet', req)
        self.assertEqual(len(recipe.Recipe.objects()), 1)
        expectedResults = '{"status": "ok", "recipes": [{"name": "Delicious Meatless Meatballs", "urlKey": "weirdo-gmail-com-delicious-meatless-meatballs-"}], "message": ""}'
        assert ret == expectedResults

        # not signed in to noms; bookmarketing should not be allowed
        req = self.requestJSON([])
        req.args['uri'] = [recipe_uri]
        ret = yield self.handler('bookmarklet', req)
        expectedResults = '{"status": "error", "recipes": [], "message": "User was not logged in."}'
        assert ret == expectedResults
开发者ID:corydodt,项目名称:Noms,代码行数:27,代码来源:test_server.py
示例10: test_corrupted_read_writes_new
def test_corrupted_read_writes_new(library):
    """Writes inside a transaction must still bump the version when the
    transaction's read of the existing data fails.
    """
    # Two ordinary transactions create versions 1 and 2.
    for data, log, version in ((ts1, 'l1', 1), (ts2, 'l2', 2)):
        with ArcticTransaction(library, symbol, 'u1', log) as mt:
            mt.write(symbol, data)
        assert library.read(symbol).version == version

    # Versions 3 and 4 are written while ``library.read`` raises.  The first
    # round stores new data; the second re-writes the same ts3 and metadata
    # and must still advance to the next version.
    for version in (3, 4):
        with patch.object(library, 'read',
                          side_effect=OperationFailure('some failure')):
            with ArcticTransaction(library, symbol, 'u1', 'l2') as mt:
                mt.write(symbol, ts3, metadata={'a': 1, 'b': 2})

        # The patch is gone here, so this is a real read of what was stored.
        res = library.read(symbol)
        assert res.version == version
        assert_frame_equal(ts3, library.read(symbol, version).data)
        assert res.metadata == {'a': 1, 'b': 2}
开发者ID:JianfengYu,项目名称:arctic,代码行数:34,代码来源:test_version_store_audit.py
示例11: test_predict_1
def test_predict_1(self):
    """``_predict`` must fill the output array with the stubbed ``IV``.

    ``predict_proba`` on the model is stubbed to return ``IV`` and ``_clf``
    is replaced with ``None`` for the duration of the call.
    """
    with patch.object(self.xgb._model, "predict_proba", return_value=IV), \
            patch.object(self.xgb, "_clf", None):
        out = np.zeros(shape=(1, 5))
        self.xgb._predict({}, out, 0)
        assert np.allclose(IV, out)
开发者ID:WladimirSidorenko,项目名称:DiscourseSenser,代码行数:7,代码来源:test_xgb_base.py
示例12: test_watchers_are_finished
def test_watchers_are_finished(self, *args):
    """
    Test for asserting that watchers are closed in LibevConnection

    This test simulates a process termination without calling
    cluster.shutdown(), which would trigger
    LibevConnection._libevloop._cleanup. It will check the watchers have
    been closed. Finally it will restore the LibevConnection reactor so it
    doesn't affect the rest of the tests

    @since 3.10
    @jira_ticket PYTHON-747
    @expected_result the watchers are closed

    @test_category connection
    """
    try:
        with patch.object(LibevConnection._libevloop, "_thread"), \
                patch.object(LibevConnection._libevloop, "notify"):

            self.make_connection()

            # We have to make a copy because the connections shouldn't
            # be alive when we verify them
            live_connections = set(LibevConnection._libevloop._live_conns)

            # This simulates the process ending without cluster.shutdown()
            # being called, then with atexit _cleanup for libevreactor would
            # be called
            libev__cleanup(weakref.ref(LibevConnection._libevloop))
            for conn in live_connections:
                for watcher in (conn._write_watcher, conn._read_watcher):
                    self.assertTrue(watcher.stop.mock_calls)
    finally:
        # Reset the shared reactor even when an assertion above fails, so a
        # failure here cannot leak the shut-down state into later tests.
        LibevConnection._libevloop._shutdown = False
开发者ID:stonefly,项目名称:python-driver,代码行数:33,代码来源:test_libevreactor.py
示例13: test_connect_subscribes_if_subscription_is_set
def test_connect_subscribes_if_subscription_is_set(self):
    """``_connect`` must call ``_subscribe`` once when a subscription is set."""
    queue = self.external_queue
    # ``close`` is stubbed as well so that only ``_subscribe`` is observed.
    with patch.object(queue, 'close'):
        with patch.object(queue, '_subscribe') as mock_subscribe:
            queue.subscription = 'routing_key'
            queue._connect()
            self.assertEqual(mock_subscribe.call_count, 1)
开发者ID:renanlage,项目名称:hived,代码行数:7,代码来源:test_queue.py
示例14: test_validate_instance_flavors
def test_validate_instance_flavors(self, create_nove_cli_mock):
    """Check flavor validation across regions and ephemeral settings.

    ``create_nove_cli_mock`` is the mock standing in for the nova client
    factory (presumably injected by a patch decorator outside this view).
    """
    # The original created a ``patch.object(..., 'flavors', ...)`` patcher
    # here but never started or entered it, so it had no effect.  It is
    # dropped: ``flavors`` on the mocked client is already an auto-created
    # mock attribute, which is all the lines below rely on.
    mock_flv = create_nove_cli_mock.return_value.flavors.get.return_value
    mock_flv.ephemeral = 0

    test_instances = [{'flavor_id': 1, 'volume_size': 10},
                      {'flavor_id': 1, 'volume_size': 1.5,
                       'region_name': 'home'},
                      {'flavor_id': 2, 'volume_size': 3,
                       'region_name': 'work'}]
    models.validate_instance_flavors(Mock(), test_instances,
                                     True, True)
    # One client is requested per instance: no region, 'home', then 'work'.
    create_nove_cli_mock.assert_has_calls([call(ANY, None),
                                           call(ANY, 'home'),
                                           call(ANY, 'work')])

    # With ephemeral == 0 and the third argument False, validation raises.
    self.assertRaises(exception.LocalStorageNotSpecified,
                      models.validate_instance_flavors,
                      Mock(), test_instances, False, True)

    # With ephemeral set to 1 the same call must pass without raising.
    mock_flv.ephemeral = 1
    models.validate_instance_flavors(Mock(), test_instances,
                                     False, True)
示例15: test_setup_for_ha_enabled
def test_setup_for_ha_enabled(self):
    """With HA enabled, monitoring is set up on machines '0', '1' and '2'."""
    client = fake_juju_client()
    client.bootstrap()
    client.enable_ha()
    admin_client = client.get_controller_client()

    machine_ids = ['0', '1', '2']
    # Both helpers are expected to be called once per machine, in order.
    expected_calls = [call(admin_client, machine) for machine in machine_ids]

    with patch.object(
            gpr, '_setup_system_monitoring', autospec=True) as m_ssm, \
            patch.object(
                gpr, '_enable_monitoring', autospec=True) as m_em:
        self.assertListEqual(
            machine_ids, gpr.setup_system_monitoring(admin_client))

    self.assertListEqual(m_ssm.call_args_list, expected_calls)
    self.assertListEqual(m_em.call_args_list, expected_calls)
开发者ID:mjs,项目名称:juju,代码行数:25,代码来源:test_generate_perfscale_results.py
示例16: test_to_swift_req_subrequest_proxy_access_log
def test_to_swift_req_subrequest_proxy_access_log(self):
    """``force_request_log`` decides the subrequest's access-log flag.

    The incoming environ always carries ``swift.proxy_access_log_made`` set
    to True.  With ``force_request_log=True`` the swift request must have it
    falsy; with ``False`` it must stay truthy.
    """
    container = 'bucket'
    obj = 'obj'
    method = 'GET'

    scenarios = (
        (True, self.assertFalse),   # forced: flag is cleared
        (False, self.assertTrue),   # not forced: flag is kept
    )
    for force_log, check in scenarios:
        req = Request.blank(
            '/%s/%s' % (container, obj),
            environ={'REQUEST_METHOD': method,
                     'swift.proxy_access_log_made': True},
            headers={'Authorization': 'AWS test:tester:hmac',
                     'Date': self.get_date_header()})
        with patch.object(Request, 'get_response') as m_swift_resp, \
                patch.object(Request, 'remote_user', 'authorized'):
            m_swift_resp.return_value = FakeSwiftResponse()
            s3_req = S3AclRequest(
                req.environ, MagicMock(), force_request_log=force_log)
            sw_req = s3_req.to_swift_req(method, container, obj)
            check(sw_req.environ['swift.proxy_access_log_made'])
开发者ID:mahak,项目名称:swift,代码行数:32,代码来源:test_s3request.py
示例17: test_create_report_graph_returns_base_file_path
def test_create_report_graph_returns_base_file_path(self):
    """Only the basename of the output file should be returned."""
    generator = Mock()
    start, end = 0, 9999
    rrd_dir = '/foo'
    output_file = '/bar/test.png'
    graph_period = '0'

    # listdir reports a single rrd file; the duration lookup is stubbed to
    # return the (start, end) pair that the generator should receive.
    with patch.object(gpr.os, 'listdir', autospec=True,
                      return_value=['example.rrd']) as m_list, \
            patch.object(gpr, 'get_duration_points', autospec=True,
                         return_value=(start, end)) as m_gdp:
        returned_path = gpr.create_report_graph(
            rrd_dir, output_file, generator, graph_period)
        self.assertEqual('test.png', returned_path)

    m_gdp.assert_called_once_with('/foo/example.rrd', graph_period)
    m_list.assert_called_once_with(rrd_dir)
    generator.assert_called_once_with(start, end, rrd_dir, output_file)
开发者ID:mjs,项目名称:juju,代码行数:26,代码来源:test_generate_perfscale_results.py
示例18: test_make_connection_no_signal
def test_make_connection_no_signal(self, source_getter, sink_getter):
    """Building with the supplied getters must not register a signal for
    the connection in ``model.connections_signals``.
    """
    model = Model()

    class A(object):
        pass

    # A single connection between two distinct A instances.
    connection = mock.Mock(pre_obj=A(), post_obj=A())

    # A mock network holding only that connection.
    network = mock.Mock(
        seed=None,
        connections=[connection],
        ensembles=[],
        nodes=[],
        networks=[],
        probes=[],
    )

    # Swap in the getters under test plus a null parameter builder for A.
    with patch.object(model, "source_getters", {A: source_getter}):
        with patch.object(model, "sink_getters", {A: sink_getter}):
            with patch.object(model, "connection_parameter_builders",
                              {A: mock.Mock()}):
                model.build(network)

    # No signal may have been recorded for the connection.
    assert connection not in model.connections_signals
开发者ID:pabogdan,项目名称:nengo_spinnaker,代码行数:31,代码来源:test_builder.py
示例19: test_calls_provided_test
def test_calls_provided_test(self):
    """``run_perfscale_test`` must invoke the supplied test callable once
    with the client, the pprof collector and the arguments.
    """
    client = fake_juju_client()
    with temp_dir() as juju_home:
        client.env.juju_home = juju_home
        bs_manager = make_bootstrap_manager(client)
        bs_manager.log_dir = os.path.join(juju_home, 'log-dir')
        os.mkdir(bs_manager.log_dir)

        timing = gpr.TimingData(datetime.utcnow(), datetime.utcnow())
        noop_test = Mock(
            return_value=gpr.DeployDetails('test', {}, timing))
        pprof_collector = Mock()

        # Log dumping and report generation are stubbed out, and the
        # collector class is replaced so it yields the mock defined above.
        with patch.object(gpr, 'dump_performance_metrics_logs',
                          autospec=True), \
                patch.object(gpr, 'generate_reports', autospec=True), \
                patch.object(gpr, 'PPROFCollector', autospec=True,
                             return_value=pprof_collector):
            gpr.run_perfscale_test(
                noop_test, bs_manager, get_default_args())

        noop_test.assert_called_once_with(
            client, pprof_collector, get_default_args())
开发者ID:mjs,项目名称:juju,代码行数:27,代码来源:test_generate_perfscale_results.py
示例20: test_probe_building_disabled
def test_probe_building_disabled(self, recwarn):
    """With ``build_probes=False`` no probe builder may run and a warning
    mentioning "Probes" and "disabled" must be emitted.
    """
    # Network with one ensemble and one probe attached to it.
    with nengo.Network() as network:
        ensemble = nengo.Ensemble(100, 2)
        probe = nengo.Probe(ensemble, label="Output")

    model = Model()

    # Stand-in builders for ensembles and for ensemble probes.
    ens_build = mock.Mock(name="ensemble builder")
    build_ens_probe = mock.Mock()

    with patch.object(model, "builders", new={nengo.Ensemble: ens_build}):
        with patch.object(model, "probe_builders",
                          new={nengo.Ensemble: build_ens_probe}):
            model.build(network, build_probes=False)

    # The probe was NOT built.
    assert probe not in model.seeds
    assert build_ens_probe.call_count == 0

    # A warning was recorded, and its text names the disabled probes.
    warning_text = str(recwarn.pop().message)
    assert "Probes" in warning_text
    assert "disabled" in warning_text
开发者ID:pabogdan,项目名称:nengo_spinnaker,代码行数:32,代码来源:test_builder.py
注:本文中的mock.patch.object函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论