本文整理汇总了 Python 中 mock.call.debug 函数的典型用法代码示例。如果您正苦于以下问题:Python debug 函数的具体用法?Python debug 怎么用?Python debug 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_run
def test_run(self):
    """Drive run() until the fourth read_and_send() call raises.

    The first three read_and_send() calls return None and the fourth raises
    RuntimeError, which must propagate out of run(). The test then checks
    the four read_and_send() calls, three sleep(60.0) calls and the
    matching log messages.
    """

    def fail_on_fourth_call(klass):
        # call_count already includes the call currently being handled.
        if mock_ras.call_count >= 4:
            raise RuntimeError()
        return None

    with patch('%s.read_and_send' % pb, autospec=True) as mock_ras, \
            patch('%s.sleep' % pbm, autospec=True) as mock_sleep, \
            patch('%s.logger' % pbm, autospec=True) as mock_logger:
        mock_ras.side_effect = fail_on_fourth_call
        with pytest.raises(RuntimeError):
            self.cls.run()

    assert mock_ras.mock_calls == [call(self.cls)] * 4
    assert mock_sleep.mock_calls == [call(60.0)] * 3
    expected_log = [call.info('Running sensor daemon loop...')]
    expected_log += [call.debug('Sleeping %ss', 60.0)] * 3
    assert mock_logger.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:29,代码来源:test_sensor_daemon.py
示例2: test_read_and_send_bad_status_code
def test_read_and_send_bad_status_code(self):
    """read_and_send() logs an error when the POST response status is 404."""
    sensor = Mock(spec_set=BaseSensor)
    sensor.read.return_value = {
        'sensor1': {'data': 's1data'},
        'sensor2': {'data': 's2data'},
    }
    self.cls.sensors = [sensor]

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s.requests.post' % pbm, autospec=True) as mock_post:
        mock_post.return_value = Mock(status_code=404, text='foo')
        self.cls.read_and_send()

    url = 'http://foo.bar.baz:1234/v1/sensors/update'
    sensor_payload = {
        'sensor1': {'data': 's1data'},
        'sensor2': {'data': 's2data'}
    }
    data = {'host_id': 'myhostid', 'sensors': sensor_payload}
    assert mock_post.mock_calls == [call(url, json=data)]
    expected_log = [
        call.debug('Reading sensors'),
        call.debug('POSTing sensor data to %s: %s', url, data),
        call.error('Error POSTing sensor data; got status code %s: %s',
                   404, 'foo'),
    ]
    assert mock_logger.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:30,代码来源:test_sensor_daemon.py
示例3: test_find_usage_nat_gateways
def test_find_usage_nat_gateways(self):
    """Verify per-AZ NAT Gateway usage and the log output of the lookup.

    Expects two usage entries ('az2' with value 3, 'az3' with value 1), a
    single describe_nat_gateways() call, one error for a gateway whose
    subnet is unknown, and two debug messages for skipped gateways.
    """
    subnets = result_fixtures.VPC.test_find_usage_nat_gateways_subnets
    response = result_fixtures.VPC.test_find_usage_nat_gateways
    limit_name = 'NAT Gateways per AZ'

    mock_conn = Mock()
    mock_conn.describe_nat_gateways.return_value = response
    with patch('%s.logger' % self.pbm) as mock_logger:
        cls = _VpcService(21, 43)
        cls.conn = mock_conn
        cls._find_usage_nat_gateways(subnets)

    assert len(cls.limits[limit_name].get_current_usage()) == 2
    # (expected value, expected resource id) in usage-list order.
    for index, (value, resource_id) in enumerate([(3, 'az2'), (1, 'az3')]):
        usage = cls.limits[limit_name].get_current_usage()[index]
        assert usage.get_value() == value
        assert usage.resource_id == resource_id

    assert mock_conn.mock_calls == [call.describe_nat_gateways()]
    skip_msg = 'Skipping NAT Gateway %s in state: %s'
    assert mock_logger.mock_calls == [
        call.error(
            'ERROR: NAT Gateway %s in SubnetId %s, but SubnetId not '
            'found in subnet_to_az; Gateway cannot be counted!',
            'nat-124', 'subnet4'
        ),
        call.debug(skip_msg, 'nat-125', 'deleted'),
        call.debug(skip_msg, 'nat-127', 'failed'),
    ]
开发者ID:jantman,项目名称:awslimitchecker,代码行数:35,代码来源:test_vpc.py
示例4: test_show_one_queue_empty
def test_show_one_queue_empty(self, capsys):
    """_show_one_queue() on an empty receive_message() response.

    Checks the "appears empty" line on stdout, that _delete_msg() is never
    called even though delete=True was passed, and the logger calls.
    """
    conn = Mock()
    conn.receive_message.return_value = {}

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s._url_for_queue' % pb, autospec=True) as mock_url, \
            patch('%s._delete_msg' % pb, autospec=True) as mock_del:
        mock_url.return_value = 'myurl'
        self.cls._show_one_queue(conn, 'foo', 1, delete=True)

    out, err = capsys.readouterr()
    assert err == ''
    assert out == "=> Queue 'foo' appears empty.\n"
    assert mock_del.mock_calls == []

    expected_receive = call.receive_message(
        QueueUrl='myurl',
        AttributeNames=['All'],
        MessageAttributeNames=['All'],
        MaxNumberOfMessages=1,
        WaitTimeSeconds=20
    )
    assert conn.mock_calls == [expected_receive]
    assert mock_url.mock_calls == [call(self.cls, conn, 'foo')]
    assert mock_logger.mock_calls == [
        call.debug("Queue '%s' url: %s", 'foo', 'myurl'),
        call.warning("Receiving %d messages from queue'%s'; this may "
                     "take up to 20 seconds.", 1, 'foo'),
        call.debug('received no messages'),
    ]
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:31,代码来源:test_aws.py
示例5: test_get_api_id_aws
def test_get_api_id_aws(self):
    """get_api_id() falls back to AWSInfo when _get_outputs() raises."""
    conf = Mock()
    args = Mock(tf_path='tfpath')

    with patch.multiple(
        pbm,
        autospec=True,
        logger=DEFAULT,
        TerraformRunner=DEFAULT,
        AWSInfo=DEFAULT
    ) as mocks:
        runner = mocks['TerraformRunner']
        aws_info = mocks['AWSInfo']
        # An exception instance as side_effect is raised when the mock
        # is called.
        runner.return_value._get_outputs.side_effect = Exception()
        aws_info.return_value.get_api_id.return_value = 'myaid'
        res = get_api_id(conf, args)

    assert res == 'myaid'
    assert mocks['TerraformRunner'].mock_calls == [
        call(conf, 'tfpath'),
        call()._get_outputs(),
    ]
    assert mocks['AWSInfo'].mock_calls == [
        call(conf),
        call().get_api_id(),
    ]
    assert mocks['logger'].mock_calls == [
        call.debug('Trying to get Terraform rest_api_id output'),
        call.info('Unable to find API rest_api_id from Terraform state; '
                  'querying AWS.', exc_info=1),
        call.debug('AWS API ID: \'%s\'', 'myaid'),
    ]
开发者ID:jantman,项目名称:webhook2lambda2sqs,代码行数:33,代码来源:test_runner.py
示例6: test_get_api_id_tf
def test_get_api_id_tf(self):
    """get_api_id() returns the Terraform output and never touches AWSInfo."""
    conf = Mock()
    args = Mock(tf_path='tfpath')
    tf_outputs = {
        'base_url': 'mytfbase',
        'rest_api_id': 'myid'
    }

    with patch.multiple(
        pbm,
        autospec=True,
        logger=DEFAULT,
        TerraformRunner=DEFAULT,
        AWSInfo=DEFAULT
    ) as mocks:
        runner = mocks['TerraformRunner']
        runner.return_value._get_outputs.return_value = tf_outputs
        res = get_api_id(conf, args)

    assert res == 'myid'
    assert mocks['TerraformRunner'].mock_calls == [
        call(conf, 'tfpath'),
        call()._get_outputs(),
    ]
    assert mocks['AWSInfo'].mock_calls == []
    assert mocks['logger'].mock_calls == [
        call.debug('Trying to get Terraform rest_api_id output'),
        call.debug('Terraform rest_api_id output: \'%s\'', 'myid'),
    ]
开发者ID:jantman,项目名称:webhook2lambda2sqs,代码行数:25,代码来源:test_runner.py
示例7: test_update
def test_update(self):
    """update() for one sensor: JSON result, 201 response code, debug logs."""
    sensor_info = {
        'type': 's1type',
        'value': 12.345,
        'alias': 's1alias',
        'extra': 'extraS1'
    }
    req_data = {
        'host_id': 'myhostid',
        'sensors': {'sensor1': sensor_info}
    }
    req_json = json.dumps(req_data)

    mock_req = MagicMock(spec_set=Request)
    type(mock_req).responseHeaders = Mock()
    mock_req.content.getvalue.return_value = req_json
    type(mock_req).client = Mock(host='myhost')

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s._parse_json_request' % pb) as mock_parse, \
            patch('%s.update_sensor' % pbm, autospec=True) as mock_upd:
        mock_upd.return_value = 'myid'
        mock_parse.return_value = req_data
        res = self.cls.update(None, mock_req)

    assert res.result == '{"ids": ["myid"], "status": "ok"}'
    assert mock_parse.mock_calls == [call(mock_req)]
    assert mock_req.mock_calls == [call.setResponseCode(201)]
    expected_log = [
        call.debug(
            'Received sensor update request from %s with content: %s',
            'myhost',
            req_data
        ),
        call.debug('update_sensor() return value: %s', 'myid'),
    ]
    assert mock_logger.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat,代码行数:34,代码来源:test_sensors.py
示例8: test_register_callbacks
def test_register_callbacks(self):
    """register_callbacks() registers ON and OFF handlers for pins 0-3.

    Also checks the debug/info log sequence and that current_values ends
    up as [10, 11, 12, 13].
    """
    mock_listener = Mock(spec_set=InputEventListener)
    self.cls.listener = mock_listener
    with patch('%s.logger' % pbm) as mock_logger:
        self.cls.register_callbacks()

    # Expected calls: per pin, ON first and then OFF.
    expected_log = [call.debug("registering callbacks")]
    expected_register = []
    for pin in range(4):
        expected_log.append(
            call.debug('registering callback for %s ON', pin))
        expected_log.append(
            call.debug('registering callback for %s OFF', pin))
        expected_register.append(
            call.register(pin, IODIR_ON, self.cls.handle_input_on))
        expected_register.append(
            call.register(pin, IODIR_OFF, self.cls.handle_input_off))
    expected_log.append(call.debug('done registering callbacks'))
    expected_log.append(
        call.info('Initial pin states: %s', [10, 11, 12, 13]))

    assert mock_logger.mock_calls == expected_log
    assert mock_listener.mock_calls == expected_register
    assert self.cls.current_values == [10, 11, 12, 13]
开发者ID:jantman,项目名称:piface-webhooks,代码行数:29,代码来源:test_listener.py
示例9: test_discover_owfs
def test_discover_owfs(self):
    """_discover_owfs() returns the first candidate that looks like OWFS.

    '/foo' does not exist, '/bar' exists but lacks the temperature_scale
    settings file, and '/baz' has both, so '/baz' is the expected result.
    """
    self.cls.owfs_paths = ['/foo', '/bar', '/baz']

    def fake_exists(path):
        # '/baz' and everything below it exists; '/bar' exists on its own.
        return path.startswith('/baz') or path == '/bar'

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s.os.path.exists' % pbm, autospec=True) as mock_ex:
        mock_ex.side_effect = fake_exists
        res = self.cls._discover_owfs()

    assert res == '/baz'
    checked_paths = [
        '/foo',
        '/bar',
        '/bar/settings/units/temperature_scale',
        '/baz',
        '/baz/settings/units/temperature_scale',
    ]
    assert mock_ex.mock_calls == [call(p) for p in checked_paths]
    assert mock_logger.mock_calls == [
        call.debug('Attempting to find OWFS path/mountpoint from list '
                   'of common options: %s', ['/foo', '/bar', '/baz']),
        call.debug('Path %s does not exist; skipping', '/foo'),
        call.debug('Path %s exists but does not appear to have OWFS '
                   'mounted', '/bar'),
        call.info('Found OWFS mounted at: %s', '/baz'),
    ]
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:30,代码来源:test_owfs.py
示例10: test_write_zip
def test_write_zip(self):
    """_write_zip() writes one ZipInfo entry with fixed date and mode."""
    with patch('%s.zipfile.ZipFile' % pbm, autospec=True) as mock_zf, \
            patch('%s.logger' % pbm, autospec=True) as mock_logger:
        self.cls._write_zip('myfsrc', 'mypath.zip')

    # ZipInfo is deliberately left un-mocked so the attributes set on it
    # can be inspected; the price is that the writestr() call has to be
    # checked piece by piece instead of with a single equality.
    zf_calls = mock_zf.mock_calls
    assert len(zf_calls) == 4
    assert zf_calls[0] == call('mypath.zip', 'w')
    assert zf_calls[1] == call().__enter__()
    assert zf_calls[3] == call().__exit__(None, None, None)

    # The third call is the chained writestr(); element 0 of a call entry
    # is its name and element 1 its positional arguments.
    write_call = zf_calls[2]
    assert write_call[0] == '().__enter__().writestr'
    arg_tup = write_call[1]
    zip_info = arg_tup[0]
    assert isinstance(zip_info, ZipInfo)
    assert zip_info.filename == 'webhook2lambda2sqs_func.py'
    assert zip_info.date_time == (2016, 7, 1, 2, 3, 4)
    # NOTE(review): 0x0755 is a hexadecimal literal, not octal 0o755;
    # presumably it mirrors the value used by the code under test - confirm.
    assert zip_info.external_attr == 0x0755 << 16
    assert arg_tup[1] == 'myfsrc'

    assert mock_logger.mock_calls == [
        call.debug('setting zipinfo date to: %s', (2016, 7, 1, 2, 3, 4)),
        call.debug('setting zipinfo file mode to: %s', (0x0755 << 16)),
        call.debug('writing zip file at: %s', 'mypath.zip'),
    ]
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:27,代码来源:test_tf_generator.py
示例11: test_generate
def test_generate(self):
    """generate() writes the function source, its zip and the TF JSON.

    open() is replaced by mock_open(), and _get_config() and _write_zip()
    are patched, so only the sequence of calls and log messages is checked.
    """
    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s._get_config' % pb, autospec=True) as mock_get, \
            patch('%s.open' % pbm, mock_open(), create=True) as m_open, \
            patch('%s._write_zip' % pb, autospec=True) as mock_zip:
        mock_get.return_value = 'myjson'
        self.cls.generate('myfunc')

    assert mock_get.mock_calls == [call(self.cls, 'myfunc')]

    # Each file is opened, written once and closed via the context manager.
    expected_open = []
    for path, content in (
        ('./webhook2lambda2sqs_func.py', 'myfunc'),
        ('./webhook2lambda2sqs.tf.json', 'myjson'),
    ):
        expected_open.extend([
            call(path, 'w'),
            call().__enter__(),
            call().write(content),
            call().__exit__(None, None, None),
        ])
    assert m_open.mock_calls == expected_open

    assert mock_zip.mock_calls == [
        call(self.cls, 'myfunc', './webhook2lambda2sqs_func.zip')
    ]
    assert mock_logger.mock_calls == [
        call.warning('Writing lambda function source to: '
                     './webhook2lambda2sqs_func.py'),
        call.debug('lambda function written'),
        call.warning('Writing lambda function source zip file to: '
                     './webhook2lambda2sqs_func.zip'),
        call.debug('lambda zip written'),
        call.warning('Writing terraform configuration JSON to: '
                     './webhook2lambda2sqs.tf.json'),
        call.debug('terraform configuration written'),
        call.warning('Completed writing lambda function and TF config.'),
    ]
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:33,代码来源:test_tf_generator.py
示例12: test_read_and_send_exception
def test_read_and_send_exception(self):
    """read_and_send() logs, rather than propagates, a failing POST."""
    sensor = Mock(spec_set=BaseSensor)
    sensor.read.return_value = {
        'sensor1': {'data': 's1data'},
        'sensor2': {'data': 's2data'},
    }
    self.cls.sensors = [sensor]

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s.requests.post' % pbm, autospec=True) as mock_post:
        # An exception instance as side_effect is raised when the mock
        # is called.
        mock_post.side_effect = Exception()
        self.cls.read_and_send()

    url = 'http://foo.bar.baz:1234/v1/sensors/update'
    sensor_payload = {
        'sensor1': {'data': 's1data'},
        'sensor2': {'data': 's2data'}
    }
    data = {'host_id': 'myhostid', 'sensors': sensor_payload}
    assert mock_post.mock_calls == [call(url, json=data)]
    expected_log = [
        call.debug('Reading sensors'),
        call.debug('POSTing sensor data to %s: %s', url, data),
        call.exception('Exception caught when trying to POST data to '
                       'Engine; will try again at next interval.'),
    ]
    assert mock_logger.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:34,代码来源:test_sensor_daemon.py
示例13: test_show_cloudwatch_logs_none
def test_show_cloudwatch_logs_none(self, capsys):
    """show_cloudwatch_logs() with no log streams prints nothing.

    describe_log_streams() returns an empty 'logStreams' list, so
    _show_log_stream() must never be called.
    """
    resp = {'logStreams': []}

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s.client' % pbm, autospec=True) as mock_conn, \
            patch('%s._show_log_stream' % pb, autospec=True) as sls:
        logs_client = mock_conn.return_value
        logs_client.describe_log_streams.return_value = resp
        sls.side_effect = [1, 10]
        self.cls.show_cloudwatch_logs(5)

    out, err = capsys.readouterr()
    assert err == ''
    assert out == ''
    assert mock_conn.mock_calls == [
        call('logs'),
        call().describe_log_streams(descending=True, limit=5,
                                    logGroupName='/aws/lambda/myfname',
                                    orderBy='LastEventTime'),
    ]
    assert sls.mock_calls == []
    assert mock_logger.mock_calls == [
        call.debug('Log Group Name: %s', '/aws/lambda/myfname'),
        call.debug('Connecting to AWS Logs API'),
        call.debug('Getting log streams'),
        call.debug('Found %d log streams', 0),
    ]
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:27,代码来源:test_aws.py
示例14: test_handle_files
def test_handle_files(self):
    """handle_files() over a directory of seven entries.

    Only three entries are expected to reach handle_one_file(), in the
    asserted order. The one whose handler raises is logged through
    logger.exception() and is not unlinked; the other two are removed.
    """
    failing_name = 'pinevent_1420863332.123456_pin2_state1'
    first_ok = 'pinevent_1420863326.123456_pin2_state1'
    second_ok = 'pinevent_1420863326.123456_pin3_state0'
    flist = [
        'foobar',
        failing_name,
        'pinevent_csds_pin3_state1',
        second_ok,
        first_ok,
        'xsfjef_fhejfec_dfhe',
        'pinevent_1420863326.456789_pin3_state2',
    ]
    ex = Exception('foo')

    def se_handle(fname, evt_datetime, pin, state):
        # Fail for exactly one file; every other call returns None.
        if fname == failing_name:
            raise ex

    type(self.config).QUEUE_PATH = '/foo/bar'
    with patch('%s.logger' % pbm) as mock_logger, \
            patch('%s.os.listdir' % pbm) as mock_listdir, \
            patch('%s.handle_one_file' % pb) as mock_handle, \
            patch('%s.os.unlink' % pbm) as mock_unlink:
        mock_listdir.return_value = flist
        mock_handle.side_effect = se_handle
        self.cls.handle_files()

    assert mock_logger.mock_calls == [
        call.info("Found %d new events", 3),
        call.debug('File handled; removing: %s', first_ok),
        call.debug('File handled; removing: %s', second_ok),
        call.exception('Execption while handling event file %s',
                       failing_name),
    ]
    assert mock_listdir.mock_calls == [call('/foo/bar')]
    assert mock_handle.mock_calls == [
        call(first_ok, datetime(2015, 1, 9, 23, 15, 26, 123456), 2, 1),
        call(second_ok, datetime(2015, 1, 9, 23, 15, 26, 123456), 3, 0),
        call(failing_name, datetime(2015, 1, 9, 23, 15, 32, 123456), 2, 1),
    ]
    assert mock_unlink.mock_calls == [
        call('/foo/bar/pinevent_1420863326.123456_pin2_state1'),
        call('/foo/bar/pinevent_1420863326.123456_pin3_state0'),
    ]
开发者ID:jantman,项目名称:piface-webhooks,代码行数:51,代码来源:test_worker.py
示例15: test_find_usage_apis_stages_now_paginated
def test_find_usage_apis_stages_now_paginated(self):
    """_find_usage_apis() warns when get_stages() returns unexpected keys.

    Every faked get_stages() response gets an extra 'position' key; the
    test expects a single warning listing the keys ['item', 'position'].
    """
    fixtures = result_fixtures.ApiGateway
    mock_conn = Mock()

    mock_paginator = Mock()
    mock_paginator.paginate.return_value = fixtures.get_rest_apis

    def se_res_paginate(restApiId=None):
        return result_fixtures.ApiGateway.get_resources[restApiId]

    mock_res_paginator = Mock()
    mock_res_paginator.paginate.side_effect = se_res_paginate

    paginators = {
        'get_rest_apis': mock_paginator,
        'get_resources': mock_res_paginator,
    }

    def se_get_paginator(api_name):
        # Unknown names yield None.
        return paginators.get(api_name)

    def se_paginate_dict(*args, **kwargs):
        # Dispatch on the bound API method handed to paginate_dict().
        target = args[0]
        if target == mock_conn.get_documentation_parts:
            return result_fixtures.ApiGateway.doc_parts[kwargs['restApiId']]
        if target == mock_conn.get_authorizers:
            return result_fixtures.ApiGateway.authorizers[
                kwargs['restApiId']
            ]

    def se_get_stages(restApiId=None):
        # Copy first so the shared fixture is not mutated.
        stages = deepcopy(result_fixtures.ApiGateway.stages[restApiId])
        stages['position'] = 'foo'
        return stages

    mock_conn.get_paginator.side_effect = se_get_paginator
    mock_conn.get_stages.side_effect = se_get_stages
    cls = _ApigatewayService(21, 43)
    cls.conn = mock_conn

    with patch('%s.paginate_dict' % pbm, autospec=True) as mock_pd, \
            patch('%s.logger' % pbm) as mock_logger:
        mock_pd.side_effect = se_paginate_dict
        cls._find_usage_apis()

    assert mock_logger.mock_calls == [
        call.debug('Finding usage for APIs'),
        call.debug('Found %d APIs', 5),
        call.debug('Finding usage for per-API limits'),
        call.warning(
            'APIGateway get_stages returned more keys than present in '
            'boto3 docs: %s', ['item', 'position']
        ),
    ]
开发者ID:jantman,项目名称:awslimitchecker,代码行数:48,代码来源:test_apigateway.py
示例16: test_can_add_mandate
def test_can_add_mandate(self, logging_mock):
    """Mandate.add_mandate() stores the given fields and logs at debug level.

    The mandate runs from today (UTC) to ten days later.
    """
    political_office = PoliticalOfficeFactory.create()
    legislator = LegislatorFactory.create()

    now = datetime.utcnow()
    start_day = now.date()
    end_day = (now + timedelta(days=10)).date()
    date_start = date_to_timestamp(start_day)
    date_end = date_to_timestamp(end_day)

    data = {
        'legislator_id': legislator.id,
        'political_office_id': political_office.id,
        'date_start': date_start,
        'date_end': date_end,
    }
    mandate = Mandate.add_mandate(self.db, data)

    expect(mandate.legislator).to_equal(legislator)
    expect(mandate.political_office).to_equal(political_office)
    expect(mandate.date_start).to_equal(start_day)
    expect(mandate.date_end).to_equal(end_day)
    expected_log = call.debug('Added mandate: "%s"', str(mandate))
    expect(logging_mock.mock_calls).to_include(expected_log)
开发者ID:marcelometal,项目名称:politicos-tornado,代码行数:25,代码来源:test_mandate.py
示例17: test_boto3_connection_kwargs_sts_again_other_account
def test_boto3_connection_kwargs_sts_again_other_account(self):
    """Cached STS credentials for another account trigger a new token.

    Connectable.credentials is preset to credentials for account '456'
    while the tester targets account '123'; _get_sts_token() must be
    called once and no boto3 Session may be created.
    """
    cls = ConnectableTester(account_id='123', account_role='myrole',
                            region='myregion')
    mock_creds = Mock()
    creds_type = type(mock_creds)
    creds_type.access_key = 'sts_ak'
    creds_type.secret_key = 'sts_sk'
    creds_type.session_token = 'sts_token'
    creds_type.account_id = '456'

    with patch('%s._get_sts_token' % pb) as mock_get_sts, \
            patch('%s.logger' % pbm) as mock_logger, \
            patch('%s.boto3.Session' % pbm) as mock_sess:
        mock_get_sts.return_value = mock_creds
        Connectable.credentials = mock_creds
        res = cls._boto3_connection_kwargs

    assert mock_get_sts.mock_calls == [call()]
    assert mock_logger.mock_calls == [
        call.debug("Previous STS credentials are for account %s; "
                   "getting new credentials for current account "
                   "(%s)", '456', '123')
    ]
    assert mock_sess.mock_calls == []
    expected_kwargs = {
        'region_name': 'myregion',
        'aws_access_key_id': 'sts_ak',
        'aws_secret_access_key': 'sts_sk',
        'aws_session_token': 'sts_token'
    }
    assert res == expected_kwargs
开发者ID:bflad,项目名称:awslimitchecker,代码行数:28,代码来源:test_connectable.py
示例18: test_set_account_info_env
def test_set_account_info_env(self):
    """_set_account_info() with AWS_REGION as the only environment variable.

    The account ID is expected to come from the get_user() ARN and the
    region from the client's _client_config.
    """
    self.cls.aws_account_id = None
    self.cls.aws_region = None
    user_response = {
        'User': {'Arn': 'arn:aws:iam::123456789:user/foo'}
    }

    with patch('%s.logger' % pbm, autospec=True) as mock_logger, \
            patch('%s.client' % pbm, autospec=True) as mock_client:
        mock_client.return_value.get_user.return_value = user_response
        type(mock_client.return_value)._client_config = Mock(
            region_name='myregion')
        # clear=True makes AWS_REGION the only visible variable.
        env = {'AWS_REGION': 'ar'}
        with patch.dict('%s.os.environ' % pbm, env, clear=True):
            self.cls._set_account_info()

    assert self.cls.aws_account_id == '123456789'
    assert self.cls.aws_region == 'myregion'
    assert mock_client.mock_calls == [
        call('iam', region_name='ar'),
        call().get_user(),
        call('lambda', region_name='ar'),
    ]
    assert mock_logger.mock_calls == [
        call.debug('Connecting to IAM with region_name=%s', 'ar'),
        call.info('Found AWS account ID as %s; region: %s',
                  '123456789', 'myregion'),
    ]
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:27,代码来源:test_tf_generator.py
示例19: test_boto3_connection_kwargs_profile
def test_boto3_connection_kwargs_profile(self):
    """With profile_name set, credentials come from a boto3 Session.

    _get_sts_token() must not be called, and Session must be created
    exactly once with profile_name='myprof'.
    """
    cls = ConnectableTester(profile_name='myprof')

    m_creds = Mock()
    creds_type = type(m_creds)
    creds_type.access_key = 'ak'
    creds_type.secret_key = 'sk'
    creds_type.token = 'tkn'

    mock_session = Mock()
    m_sess = Mock()
    m_sess.get_credentials.return_value = m_creds
    type(mock_session)._session = m_sess

    with patch('%s._get_sts_token' % pb) as mock_get_sts, \
            patch('%s.logger' % pbm) as mock_logger, \
            patch('%s.boto3.Session' % pbm) as mock_sess:
        Connectable.credentials = None
        mock_sess.return_value = mock_session
        res = cls._boto3_connection_kwargs

    assert mock_get_sts.mock_calls == []
    assert mock_logger.mock_calls == [
        call.debug('Using credentials profile: %s', 'myprof')
    ]
    assert mock_sess.mock_calls == [call(profile_name='myprof')]
    expected_kwargs = {
        'region_name': None,
        'aws_access_key_id': 'ak',
        'aws_secret_access_key': 'sk',
        'aws_session_token': 'tkn'
    }
    assert res == expected_kwargs
开发者ID:bflad,项目名称:awslimitchecker,代码行数:28,代码来源:test_connectable.py
示例20: test_parser_timeout
def test_parser_timeout(self):
    """
    Test to verify fix for https://github.com/andresriancho/w3af/issues/6723
    "w3af running long time more than 24h"

    PARSER_TIMEOUT is patched to 1 and DelayedParser is the only parser
    available. get_document_parser_for() is expected to raise
    BaseFrameworkException and to have sent the "[timeout]" message
    through om.out.debug().
    """
    modc = 'w3af.core.data.parsers.parser_cache.%s'
    modp = 'w3af.core.data.parsers.document_parser.%s'

    with patch(modc % 'om.out') as om_mock,\
         patch(modc % 'ParserCache.PARSER_TIMEOUT',
               new_callable=PropertyMock) as timeout_mock,\
         patch(modp % 'DocumentParser.PARSERS',
               new_callable=PropertyMock) as parsers_mock:

        timeout_mock.return_value = 1
        parsers_mock.return_value = [DelayedParser]

        html = '<html>foo!</html>'
        http_resp = _build_http_response(html, u'text/html')

        try:
            self.dpc.get_document_parser_for(http_resp)
        except BaseFrameworkException:
            # The message must be built while the patches are still
            # active so that PARSER_TIMEOUT resolves to the patched value.
            msg = '[timeout] The parser took more than %s seconds'\
                  ' to complete parsing of "%s", killed it!'
            error = msg % (ParserCache.PARSER_TIMEOUT,
                           http_resp.get_url())
            self.assertIn(call.debug(error), om_mock.mock_calls)
        else:
            # fail() reports why the test failed; assertTrue(False) would
            # only say "False is not true".
            self.fail('Expected BaseFrameworkException after the parser'
                      ' timeout, but no exception was raised')
开发者ID:RON313,项目名称:w3af,代码行数:30,代码来源:test_parser_cache.py
注:本文中的 mock.call.debug 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论