本文整理汇总了Python中mock.call函数的典型用法代码示例。如果您正苦于以下问题:Python call函数的具体用法?Python call怎么用?Python call使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了call函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: checkDiffTacFile
def checkDiffTacFile(self, quiet):
    """
    Exercise _makeBuildbotTac() on a base directory that already holds a
    buildbot.tac whose contents differ from the ones being written.

    The existing file must be left alone and the new contents written to
    buildbot.tac.new instead.

    @param quiet: the value of 'quiet' argument for _makeBuildbotTac()
    """
    # make the base directory look like it already contains a buildbot.tac
    self.patch(os.path, "exists", mock.Mock(return_value=True))
    self.setUpOpen("old-tac-contents")

    # run the code under test
    create_slave._makeBuildbotTac("bdir", "new-tac-contents", quiet)

    # the existing file is read, the replacement goes to a ".new" sibling
    existing_tac = os.path.join("bdir", "buildbot.tac")
    replacement_tac = existing_tac + ".new"
    expected_opens = [
        mock.call(existing_tac, "rt"),
        mock.call(replacement_tac, "wt"),
    ]
    self.open.assert_has_calls(expected_opens)
    self.fileobj.write.assert_called_once_with("new-tac-contents")
    self.chmod.assert_called_once_with(replacement_tac, 0o600)

    # finally verify what was (or was not) reported to the log
    if not quiet:
        self.assertLogged("not touching existing buildbot.tac",
                          "creating buildbot.tac.new instead")
    else:
        self.assertWasQuiet()
开发者ID:BenjamenMeyer,项目名称:buildbot,代码行数:27,代码来源:test_scripts_create_slave.py
示例2: check_estimated_size_bytes
def check_estimated_size_bytes(self, entity_bytes, timestamp, namespace=None):
    """A helper method to test get_estimated_size_bytes.

    Builds the two requests the code under test is expected to issue (latest
    statistics timestamp, then kind statistics), wires canned responses for
    them into the mocked datastore, and checks both the returned size and the
    exact sequence of run_query calls.

    Args:
      entity_bytes: size placed in the fake kind-stats response; also the
        value get_estimated_size_bytes is expected to return.
      timestamp: timestamp placed in the fake latest-timestamp response.
      namespace: namespace passed through to the request builders and to
        get_estimated_size_bytes.
    """
    timestamp_req = helper.make_request(
        self._PROJECT, namespace, helper.make_latest_timestamp_query(namespace))
    timestamp_resp = self.make_stats_response(
        {'timestamp': datastore_helper.from_timestamp(timestamp)})
    kind_stat_req = helper.make_request(
        self._PROJECT, namespace, helper.make_kind_stats_query(
            namespace, self._query.kind[0].name,
            datastore_helper.micros_from_timestamp(timestamp)))
    kind_stat_resp = self.make_stats_response(
        {'entity_bytes': entity_bytes})

    def fake_run_query(req):
        """Answer the two expected requests; reject anything else."""
        if req == timestamp_req:
            return timestamp_resp
        if req == kind_stat_req:
            return kind_stat_resp
        # Wrap in a 1-tuple so formatting stays correct whatever type req is.
        raise ValueError("Unknown req: %s" % (req,))

    self._mock_datastore.run_query.side_effect = fake_run_query
    self.assertEqual(entity_bytes, ReadFromDatastore.get_estimated_size_bytes(
        self._PROJECT, namespace, self._query, self._mock_datastore))
    # Both requests must have been issued, in this order and nothing else.
    self.assertEqual(self._mock_datastore.run_query.call_args_list,
                     [call(timestamp_req), call(kind_stat_req)])
开发者ID:amitsela,项目名称:incubator-beam,代码行数:28,代码来源:datastoreio_test.py
示例3: test_reload_allocations
def test_reload_allocations(self):
    """reload_allocations() rewrites the host/opts files and HUPs dnsmasq.

    With os.path.isdir patched to report True and the Dnsmasq pid patched to
    5, the expected host and option file contents must be passed to the
    `self.safe` mock, and a single `kill -HUP 5` must be executed inside the
    'qdhcp-ns' namespace.
    """
    exp_host_name = '/dhcp/cccccccc-cccc-cccc-cccc-cccccccccccc/host'
    exp_host_data = """
00:00:80:aa:bb:cc,192-168-0-2.openstacklocal,192.168.0.2
00:00:f3:aa:bb:cc,fdca-3ba5-a17a-4ba3--2.openstacklocal,fdca:3ba5:a17a:4ba3::2
00:00:0f:aa:bb:cc,192-168-0-3.openstacklocal,192.168.0.3
00:00:0f:aa:bb:cc,fdca-3ba5-a17a-4ba3--3.openstacklocal,fdca:3ba5:a17a:4ba3::3
""".lstrip()
    exp_opt_name = '/dhcp/cccccccc-cccc-cccc-cccc-cccccccccccc/opts'
    fake_v6 = 'gdca:3ba5:a17a:4ba3::1'
    fake_v6_cidr = 'gdca:3ba5:a17a:4ba3::/64'
    # .lstrip() binds before %, so the template is stripped and then filled.
    exp_opt_data = """
tag:tag0,option:dns-server,8.8.8.8
tag:tag0,option:classless-static-route,20.0.0.1/24,20.0.0.1
tag:tag0,option:router,192.168.0.1
tag:tag1,option:dns-server,%s
tag:tag1,option:classless-static-route,%s,%s""".lstrip() % (fake_v6,
                                                            fake_v6_cidr,
                                                            fake_v6)

    exp_args = ['ip', 'netns', 'exec', 'qdhcp-ns', 'kill', '-HUP', 5]

    with mock.patch('os.path.isdir') as isdir:
        isdir.return_value = True
        with mock.patch.object(dhcp.Dnsmasq, 'pid') as pid:
            # Make the patched attribute act as a descriptor yielding pid 5.
            pid.__get__ = mock.Mock(return_value=5)
            dm = dhcp.Dnsmasq(self.conf, FakeDualNetwork(),
                              namespace='qdhcp-ns')
            dm.reload_allocations()

    self.safe.assert_has_calls([mock.call(exp_host_name, exp_host_data),
                                mock.call(exp_opt_name, exp_opt_data)])
    self.execute.assert_called_once_with(exp_args, root_helper='sudo',
                                         check_exit_code=True)
开发者ID:a3linux,项目名称:quantum,代码行数:35,代码来源:test_linux_dhcp.py
示例4: test_get_blkdev_error
def test_get_blkdev_error(self, mock_get_blkdev):
    """setup_blkio_cgroup() returns None and never runs the executor.

    The block-device lookup mock must have been asked about 'src' and then
    'dst'. NOTE(review): the failure behaviour of mock_get_blkdev is
    configured outside this function (not visible here).
    """
    executor = mock.Mock()

    result = volume_utils.setup_blkio_cgroup(
        'src', 'dst', 1, execute=executor)

    self.assertIsNone(result)
    expected_lookups = [mock.call(path) for path in ('src', 'dst')]
    mock_get_blkdev.assert_has_calls(expected_lookups)
    self.assertFalse(executor.called)
开发者ID:daming000,项目名称:cinder,代码行数:7,代码来源:test_volume_utils.py
示例5: test_run_queue
def test_run_queue(self, _execute_task):
    """When run() is called, ensure tasks are run, and
    the queue is flushed to remove run tasks. Also, ensure the number of
    processed tasks is returned.
    """
    from furious.test_stubs.appengine.queues import run_queue

    queue_service = Mock()
    queue_service.GetTasks.return_value = ['task1', 'task2', 'task3']

    num_processed = run_queue(queue_service, 'default')

    # Expect _execute_task() to be called for each task
    expected_call_args_list = [call('task1', None, None),
                               call('task2', None, None),
                               call('task3', None, None)]

    # assertEqual rather than the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(_execute_task.call_args_list,
                     expected_call_args_list)

    # Make sure FlushQueue was called once to clear the queue after
    # tasks were processed
    self.assertEqual(1, queue_service.FlushQueue.call_count)

    # We should have processed tasks, so verify the num processed.
    self.assertEqual(3, num_processed)
开发者ID:aaronkavlie-wf,项目名称:furious,代码行数:27,代码来源:test_queues.py
示例6: test_adding_return_value_mock
def test_adding_return_value_mock(self):
    """Calls on an assigned return_value mock are recorded on the parent.

    Checked for both Mock and MagicMock parents: calling the parent and then
    its return value must yield mock_calls of [call(), call()()].
    """
    for mock_cls in (Mock, MagicMock):
        parent = mock_cls()
        parent.return_value = MagicMock()

        # call the parent, then call whatever it returned
        parent()()

        expected_calls = [call(), call()()]
        self.assertEqual(parent.mock_calls, expected_calls)
开发者ID:5m0k3r,项目名称:desarrollo_web_udp,代码行数:7,代码来源:testmock.py
示例7: _test_plug
def _test_plug(self, namespace=None, mtu=None):
    """Shared body for the BridgeInterfaceDriver.plug() tests.

    Plugs a port with a fixed MAC address and verifies the veth pair is
    created via `self.ip`, the namespace-side device gets the MAC address,
    both ends are brought up and, when `mtu` is truthy, both ends get that
    MTU.

    :param namespace: namespace forwarded to plug() and expected in the
        add_veth() call.
    :param mtu: MTU expected on both veth ends; the MTU checks are skipped
        when it is falsy.
    """
    def device_exists(device, namespace=None):
        # only names starting with 'brq' are reported as existing
        return device.startswith('brq')

    root_veth = mock.Mock()
    ns_veth = mock.Mock()
    veth_pair = (root_veth, ns_veth)

    self.ip().add_veth = mock.Mock(return_value=veth_pair)
    self.device_exists.side_effect = device_exists

    driver = interface.BridgeInterfaceDriver(self.conf)
    mac_address = 'aa:bb:cc:dd:ee:ff'
    driver.plug('01234567-1234-1234-99', 'port-1234', 'ns-0', mac_address,
                namespace=namespace)

    ip_calls = [
        mock.call(),
        mock.call().add_veth('tap0', 'ns-0', namespace2=namespace),
    ]

    ns_veth.assert_has_calls([mock.call.link.set_address(mac_address)])
    if mtu:
        set_mtu = [mock.call.link.set_mtu(mtu)]
        ns_veth.assert_has_calls(set_mtu)
        root_veth.assert_has_calls(set_mtu)

    self.ip.assert_has_calls(ip_calls)

    set_up = [mock.call.link.set_up()]
    root_veth.assert_has_calls(set_up)
    ns_veth.assert_has_calls(set_up)
开发者ID:21atlas,项目名称:neutron,代码行数:29,代码来源:test_interface.py
示例8: test_get_undredacted_processed_with_trouble
def test_get_undredacted_processed_with_trouble(self):
    """get_unredacted_processed() succeeds after two get_bucket failures.

    get_bucket is scripted to raise ABadDeal twice and then return the
    mocked bucket; the store is expected to have connected three times and
    fetched the key contents once.
    """
    # setup some internal behaviors and fake outs
    boto_s3_store = self.setup_mocked_s3_storage(
        TransactionExecutorWithLimitedBackoff
    )
    endpoint = boto_s3_store._connect_to_endpoint.return_value
    mocked_bucket = endpoint.get_bucket.return_value
    mocked_key = mocked_bucket.new_key.return_value
    mocked_key.get_contents_as_string.side_effect = [
        self._fake_unredacted_processed_crash_as_string()
    ]

    # consumed from the END of the list: two failures, then the bucket
    scripted_outcomes = [
        mocked_bucket,
        ABadDeal('second-hit'),
        ABadDeal('first hit'),
    ]

    def temp_failure_fn(key):
        self.assertEqual(key, '120408')
        outcome = scripted_outcomes.pop()
        if not isinstance(outcome, Exception):
            return outcome
        raise outcome

    endpoint.get_bucket.side_effect = temp_failure_fn

    # the tested call
    result = boto_s3_store.get_unredacted_processed(
        "936ce666-ff3b-4c7a-9674-367fe2120408"
    )

    # what should have happened internally
    self.assertEqual(boto_s3_store._calling_format.call_count, 3)
    boto_s3_store._calling_format.assert_called_with()

    self.assertEqual(boto_s3_store._connect_to_endpoint.call_count, 3)
    self.assert_s3_connection_parameters(boto_s3_store)

    get_bucket = boto_s3_store._mocked_connection.get_bucket
    self.assertEqual(get_bucket.call_count, 3)
    get_bucket.assert_has_calls([mock.call('120408')] * 3)

    fetch_contents = mocked_key.get_contents_as_string
    self.assertEqual(fetch_contents.call_count, 1)
    fetch_contents.assert_has_calls([mock.call()])

    self.assertEqual(result, self._fake_unredacted_processed_crash())
开发者ID:SKY51717,项目名称:socorro,代码行数:60,代码来源:test_crashstorage.py
示例9: test_remoteGetWorkerInfo_slave_2_16
def test_remoteGetWorkerInfo_slave_2_16(self):
    """In buildslave 2.16 all worker information comes from one
    getSlaveInfo() call.

    The fake remote rejects getWorkerInfo with NoSuchMethod and answers
    getSlaveInfo; the result must expose the commands under
    'worker_commands', and the remote calls must go getWorkerInfo ->
    print (deprecation notice) -> getSlaveInfo.
    """
    def side_effect(*args, **kwargs):
        if 'getWorkerInfo' in args:
            no_such_method = twisted_pb.RemoteError(
                'twisted.spread.flavors.NoSuchMethod', None, None)
            return defer.fail(no_such_method)
        elif 'getSlaveInfo' in args:
            slave_info = {
                'info': 'test',
                'slave_commands': {'x': 1, 'y': 2},
                'version': 'TheVersion',
            }
            return defer.succeed(slave_info)
        elif 'getCommands' in args:
            return defer.succeed({'x': 1, 'y': 2})
        elif 'getVersion' in args:
            return defer.succeed('TheVersion')
        # any other remote call (e.g. 'print') yields None
        return None

    self.mind.callRemote.side_effect = side_effect
    conn = pb.Connection(self.master, self.worker, self.mind)
    info = yield conn.remoteGetWorkerInfo()

    expected_info = {
        'info': 'test',
        'worker_commands': {'y': 2, 'x': 1},
        'version': 'TheVersion',
    }
    self.assertEqual(info, expected_info)

    deprecation_notice = (
        'buildbot-slave detected, failing back to deprecated buildslave API. '
        '(Ignoring missing getWorkerInfo method.)')
    expected_remote_calls = [
        mock.call('getWorkerInfo'),
        mock.call('print', message=deprecation_notice),
        mock.call('getSlaveInfo'),
    ]
    self.mind.callRemote.assert_has_calls(expected_remote_calls)
开发者ID:stuarta,项目名称:buildbot,代码行数:33,代码来源:test_worker_protocols_pb.py
示例10: test_makeRelayCrypto
def test_makeRelayCrypto(self, mock_rc, mock_maes, mock_sha1, mock_hexp,
                         mock_hext):
    """_makeRelayCrypto() extracts, expands and slices the key material.

    Checks the arguments given to the extract/expand mocks, that the first
    72 entries of the key material are split 20/20/16/16 between the two
    digest and two cipher constructors, and that the relay-crypto mock's
    result is returned.
    NOTE(review): the mocks' return values ('prk', key material, 'cipher',
    'sha1', 'ret') are configured outside this function.
    """
    secret_input = 'secret input'
    key_material = list(map(chr, range(96)))

    ret = ntor._makeRelayCrypto(secret_input)

    mock_hext.assert_called_once_with(
        salt='ntor-curve25519-sha256-1:key_extract',
        input_key_material='secret input',
        hash=hashlib.sha256)
    mock_hexp.assert_called_once_with(
        pseudo_random_key='prk',
        info='ntor-curve25519-sha256-1:key_expand',
        length=72,
        hash=hashlib.sha256)

    # two digests from bytes 0-39 ...
    expected_digests = [mock.call(key_material[:20]),
                        mock.call(key_material[20:40])]
    self.assertEqual(mock_sha1.call_count, 2)
    self.assertEqual(mock_sha1.call_args_list, expected_digests)

    # ... and two ciphers from bytes 40-71
    expected_ciphers = [mock.call(key_material[40:56]),
                        mock.call(key_material[56:72])]
    self.assertEqual(mock_maes.call_count, 2)
    self.assertEqual(mock_maes.call_args_list, expected_ciphers)

    mock_rc.assert_called_once_with(
        forward_cipher='cipher', forward_digest='sha1',
        backward_cipher='cipher', backward_digest='sha1')
    self.assertEqual(ret, 'ret')
开发者ID:nskinkel,项目名称:oppy,代码行数:28,代码来源:test_ntor.py
示例11: test_mysql_mode_locks_unlocks_tables
def test_mysql_mode_locks_unlocks_tables(self, mock_create_dir, mock_get_lvm_info, mock_get_vol_fs_type, mock_popen):
    """lvm_snap() in 'mysql' mode locks the tables and then unlocks them.

    The LVM helpers and the spawned process are mocked to succeed; the first
    two statements executed on the MySQL cursor must be the read lock
    followed by the unlock.
    """
    mock_get_vol_fs_type.return_value = 'xfs'
    mock_get_lvm_info.return_value = {
        'volgroup': 'lvm_volgroup',
        'srcvol': 'lvm_device',
        'snap_path': 'snap_path'}
    # every spawned process succeeds with empty stdout/stderr
    mock_process = Mock()
    mock_process.communicate.return_value = '', ''
    mock_process.returncode = 0
    mock_popen.return_value = mock_process

    backup_opt = Mock()
    backup_opt.snapshot = True
    backup_opt.lvm_auto_snap = ''
    backup_opt.path_to_backup = '/just/a/path'
    backup_opt.lvm_dirmount = '/var/mountpoint'
    backup_opt.lvm_snapperm = 'ro'
    backup_opt.mode = 'mysql'
    backup_opt.mysql_db_inst = Mock()
    mock_cursor = Mock()
    backup_opt.mysql_db_inst.cursor.return_value = mock_cursor

    self.assertTrue(lvm.lvm_snap(backup_opt))

    first_call = call('FLUSH TABLES WITH READ LOCK')
    second_call = call('UNLOCK TABLES')
    # assertEqual rather than the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(first_call, mock_cursor.execute.call_args_list[0])
    self.assertEqual(second_call, mock_cursor.execute.call_args_list[1])
开发者ID:sckevmit,项目名称:freezer,代码行数:28,代码来源:test_lvm.py
示例12: test_transfer_accept_invalid_volume
def test_transfer_accept_invalid_volume(self, mock_notify):
    """accept() raises InvalidVolume when the volume status is wrong.

    Creating the transfer emits the create start/end notifications; the
    failed accept adds only an accept.start notification, for three in
    total.
    """
    service = self.start_service('volume', host='test_host')
    self.addCleanup(service.stop)
    tx_api = transfer_api.API()

    volume = utils.create_volume(self.ctxt, updated_at=self.updated_at)
    transfer = tx_api.create(self.ctxt, volume.id, 'Description')
    volume = storage.Volume.get_by_id(self.ctxt, volume.id)
    self.assertEqual('awaiting-transfer', volume['status'],
                     'Unexpected state')

    create_events = ("transfer.create.start", "transfer.create.end")
    expected_notifications = [
        mock.call(self.ctxt, mock.ANY, event) for event in create_events
    ]
    mock_notify.assert_has_calls(expected_notifications)
    self.assertEqual(2, mock_notify.call_count)

    # put the volume into a state that accept() must reject
    volume.status = 'wrong'
    volume.save()
    self.assertRaises(exception.InvalidVolume,
                      tx_api.accept,
                      self.ctxt, transfer['id'], transfer['auth_key'])

    # restore the status afterwards
    volume.status = 'awaiting-transfer'
    volume.save()

    # accept() raised InvalidVolume, so only transfer.accept.start was
    # emitted and transfer.accept.end is missing.
    expected_notifications = [
        mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
    ]
    mock_notify.assert_has_calls(expected_notifications)
    self.assertEqual(3, mock_notify.call_count)
开发者ID:HybridF5,项目名称:jacket,代码行数:28,代码来源:test_volume_transfer.py
示例13: test_create_hm_with_vip
def test_create_hm_with_vip(self):
    """Associating a health monitor with a pool that has a VIP.

    Expects at least two BaseCreate workflow POSTs for the pool on the REST
    mock (in any order) and the pool/health-monitor association to end up
    ACTIVE.
    """
    with self.subnet() as subnet, \
            self.health_monitor() as hm, \
            self.pool(provider='radware',
                      subnet_id=subnet['subnet']['id']) as pool, \
            self.vip(pool=pool, subnet=subnet):
        pool_id = pool['pool']['id']
        admin_ctx = context.get_admin_context()

        self.plugin_instance.create_pool_health_monitor(
            admin_ctx, hm, pool_id)

        # Test REST calls: the same BaseCreate POST is expected twice
        base_create = mock.call(
            'POST', '/api/workflow/' + pool_id + '/action/BaseCreate',
            mock.ANY, driver.TEMPLATE_HEADER)
        self.driver_rest_call_mock.assert_has_calls(
            [base_create, base_create], any_order=True)

        phm = self.plugin_instance.get_pool_health_monitor(
            context.get_admin_context(),
            hm['health_monitor']['id'], pool['pool']['id'])
        self.assertEqual(phm['status'], constants.ACTIVE)
开发者ID:basilbaby,项目名称:neutron,代码行数:33,代码来源:test_plugin_driver.py
示例14: _test_fdb_add
def _test_fdb_add(self, proxy_enabled=False):
    """Shared body for the fdb_add() tests.

    Sends one vxlan network with a flooding entry and one port entry for
    'agent_ip', then checks the `bridge fdb` commands that were executed.
    A neighbour entry must have been added only when `proxy_enabled` is
    true.

    :param proxy_enabled: whether add_neigh_entry is expected to be called.
    """
    port_entries = [constants.FLOODING_ENTRY, ['port_mac', 'port_ip']]
    fdb_entries = {
        'net_id': {
            'ports': {'agent_ip': port_entries},
            'network_type': 'vxlan',
            'segment_id': 1,
        },
    }

    patch_execute = mock.patch.object(utils, 'execute', return_value='')
    patch_neigh = mock.patch.object(ip_lib, 'add_neigh_entry',
                                    return_value='')
    with patch_execute as execute_fn, patch_neigh as add_fn:
        self.lb_rpc.fdb_add(None, fdb_entries)

        show_cmd = ['bridge', 'fdb', 'show', 'dev', 'vxlan-1']
        flood_cmd = ['bridge', 'fdb', 'add', constants.FLOODING_ENTRY[0],
                     'dev', 'vxlan-1', 'dst', 'agent_ip']
        port_cmd = ['bridge', 'fdb', 'replace', 'port_mac', 'dev',
                    'vxlan-1', 'dst', 'agent_ip']
        expected = [
            mock.call(show_cmd, run_as_root=True),
            mock.call(flood_cmd, run_as_root=True, check_exit_code=False),
            mock.call(port_cmd, run_as_root=True, check_exit_code=False),
        ]
        execute_fn.assert_has_calls(expected)

        if not proxy_enabled:
            add_fn.assert_not_called()
        else:
            add_fn.assert_called_with('port_ip', 'port_mac', 'vxlan-1')
开发者ID:eayunstack,项目名称:neutron,代码行数:32,代码来源:test_linuxbridge_neutron_agent.py
示例15: test_interval_adjustment
def test_interval_adjustment(self, sleep_mock):
    """With periodic_interval_max=5 and two runs, sleeps are 5 then 1.

    NOTE(review): the raw intervals come from self._wait_for_zero, which is
    not visible here.
    """
    self.num_runs = 2

    looping_call = loopingcall.DynamicLoopingCall(self._wait_for_zero)
    running = looping_call.start(periodic_interval_max=5)
    running.wait()

    expected_sleeps = [mock.call(5), mock.call(1)]
    sleep_mock.assert_has_calls(expected_sleeps)
开发者ID:Ericyuanhui,项目名称:oslo.service,代码行数:7,代码来源:test_loopingcall.py
示例16: test_remoteGetWorkerInfo_no_info
def test_remoteGetWorkerInfo_no_info(self):
    """remoteGetWorkerInfo() yields {} when no remote info method exists.

    Every remote call except 'print' fails with NoSuchMethod, so each
    fallback is tried in turn: getWorkerInfo, the deprecation notice,
    getSlaveInfo, getCommands, getVersion.
    """
    # All remote commands tried in remoteGetWorkerInfo are unavailable.
    # This should be real old worker...
    def side_effect(*args, **kwargs):
        if args[0] != 'print':
            no_such_method = twisted_pb.RemoteError(
                'twisted.spread.flavors.NoSuchMethod', None, None)
            return defer.fail(no_such_method)
        # the 'print' notification is accepted and returns nothing
        return None

    self.mind.callRemote.side_effect = side_effect
    conn = pb.Connection(self.master, self.worker, self.mind)
    info = yield conn.remoteGetWorkerInfo()

    self.assertEqual(info, {})

    deprecation_notice = (
        'buildbot-slave detected, failing back to deprecated buildslave API. '
        '(Ignoring missing getWorkerInfo method.)')
    expected_remote_calls = [
        mock.call('getWorkerInfo'),
        mock.call('print', message=deprecation_notice),
        mock.call('getSlaveInfo'),
        mock.call('getCommands'),
        mock.call('getVersion'),
    ]
    self.mind.callRemote.assert_has_calls(expected_remote_calls)
开发者ID:stuarta,项目名称:buildbot,代码行数:25,代码来源:test_worker_protocols_pb.py
示例17: test_initial_delay
def test_initial_delay(self, sleep_mock):
    """With initial_delay=3 and one run, sleeps are 3 then 1.

    NOTE(review): the interval after the initial delay comes from
    self._wait_for_zero, which is not visible here.
    """
    self.num_runs = 1

    looping_call = loopingcall.DynamicLoopingCall(self._wait_for_zero)
    running = looping_call.start(initial_delay=3)
    running.wait()

    expected_sleeps = [mock.call(3), mock.call(1)]
    sleep_mock.assert_has_calls(expected_sleeps)
开发者ID:Ericyuanhui,项目名称:oslo.service,代码行数:7,代码来源:test_loopingcall.py
示例18: test_check_events_acquired
def test_check_events_acquired(self, mock_datetime):
    """
    `check_events` checks events in each bucket when they are partitioned.

    With the partition acquired and yielding buckets 2 and 3,
    `check_events_in_bucket` must be called once per bucket and the
    per-bucket results collected in order.
    """
    self.kz_partition.acquired = True
    self.scheduler_service.startService()
    self.kz_partition.__iter__.return_value = [2, 3]
    self.scheduler_service.log = mock.Mock()
    mock_datetime.utcnow.return_value = 'utcnow'

    responses = [4, 5]

    def next_bucket_result(*_):
        # hand out the canned results in order, one per bucket check
        return defer.succeed(responses.pop(0))

    self.check_events_in_bucket.side_effect = next_bucket_result

    d = self.scheduler_service.check_events(100)

    self.assertEqual(self.successResultOf(d), [4, 5])
    self.assertEqual(self.kz_partition.__iter__.call_count, 1)

    bind = self.scheduler_service.log.bind
    bind.assert_called_once_with(
        scheduler_run_id='transaction-id', utcnow='utcnow')
    log = bind.return_value
    log.msg.assert_called_once_with(
        'Got buckets {buckets}', buckets=[2, 3], path='/part_path')

    expected_bucket_checks = [
        mock.call(log, self.mock_store, bucket, 'utcnow', 100)
        for bucket in (2, 3)
    ]
    self.assertEqual(self.check_events_in_bucket.mock_calls,
                     expected_bucket_checks)
开发者ID:MariaAbrahms,项目名称:otter,代码行数:25,代码来源:test_scheduler.py
示例19: test_comit_no_parents
def test_comit_no_parents(self):
    """Repository.commit() without explicit parents commits onto HEAD.

    The underlying repo is fully mocked. The commit must build signatures
    for the author and the committer, write the index tree, resolve "HEAD"
    to find the parent and create the commit with that parent's id.
    """
    mocked_repo = MagicMock()

    mocked_parent = MagicMock()
    mocked_parent.id = 1

    mocked_repo.status.return_value = True
    mocked_repo.index.write_tree.return_value = "tree"
    mocked_repo.revparse_single.return_value = mocked_parent
    mocked_repo.create_commit.return_value = "commit"

    author = ("author_1", "author_2")
    commiter = ("commiter_1", "commiter_2")

    with patch('gitfs.repository.Signature') as mocked_signature:
        mocked_signature.return_value = "signature"

        repo = Repository(mocked_repo)
        commit = repo.commit("message", author, commiter)

        assert commit == "commit"
        assert mocked_repo.status.call_count == 1
        assert mocked_repo.index.write_tree.call_count == 1
        assert mocked_repo.index.write.call_count == 1

        # Must be assert_has_calls: a bare `has_calls` on a mock is just an
        # auto-created attribute and silently verifies nothing.
        mocked_signature.assert_has_calls([call(*author), call(*commiter)])
        mocked_repo.revparse_single.assert_called_once_with("HEAD")
        mocked_repo.create_commit.assert_called_once_with("HEAD",
                                                          "signature",
                                                          "signature",
                                                          "message",
                                                          "tree", [1])
开发者ID:krodyrobi,项目名称:gitfs,代码行数:32,代码来源:test_repository.py
示例20: test_low_e_open_string
def test_low_e_open_string(self, mock_compare_notes):
    """_lowest_string_with(('E', 2)) returns that same note.

    The comparison mock is scripted to return 0 and then 1, and must have
    been called with ('E', 2) and then ('E', 4) against the requested note.
    """
    mock_compare_notes.side_effect = [0, 1]
    low_e = ('E', 2)

    self.assertEqual(notemappings._lowest_string_with(low_e), tuple(low_e))

    expected_comparisons = [
        call(('E', 2), low_e),
        call(('E', 4), low_e),
    ]
    mock_compare_notes.assert_has_calls(expected_comparisons)
开发者ID:v-for-vincent,项目名称:pyngwie,代码行数:7,代码来源:notemappings_tests.py
注:本文中的mock.call函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论