• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python call.debug函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mock.call.debug函数的典型用法代码示例。如果您正苦于以下问题:Python debug函数的具体用法?Python debug怎么用?Python debug使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了debug函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_run

    def test_run(self):
        """
        run() should keep looping until read_and_send raises.

        The patched read_and_send returns None for its first three calls and
        raises RuntimeError on the fourth, so three sleeps (and three
        'Sleeping' debug messages) are expected before the error propagates.
        """
        def fail_on_fourth_call(klass):
            # The fourth invocation is the one that raises (see the expected
            # call lists asserted below).
            if ras.call_count >= 4:
                raise RuntimeError()
            return None

        with patch('%s.read_and_send' % pb, autospec=True) as ras, \
                patch('%s.sleep' % pbm, autospec=True) as sleeper, \
                patch('%s.logger' % pbm, autospec=True) as log:
            ras.side_effect = fail_on_fourth_call
            with pytest.raises(RuntimeError):
                self.cls.run()

        assert ras.mock_calls == [call(self.cls)] * 4
        assert sleeper.mock_calls == [call(60.0)] * 3

        expected_log = [call.info('Running sensor daemon loop...')]
        expected_log += [call.debug('Sleeping %ss', 60.0)] * 3
        assert log.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:29,代码来源:test_sensor_daemon.py


示例2: test_read_and_send_bad_status_code

    def test_read_and_send_bad_status_code(self):
        """
        read_and_send() should log an error when the POST returns a 404.

        One mocked sensor supplies two readings; requests.post is patched to
        return a response object with status_code 404 and text 'foo'.
        """
        sensor = Mock(spec_set=BaseSensor)
        sensor.read.return_value = {
            'sensor1': {'data': 's1data'},
            'sensor2': {'data': 's2data'},
        }
        self.cls.sensors = [sensor]

        with patch('%s.logger' % pbm, autospec=True) as log, \
                patch('%s.requests.post' % pbm, autospec=True) as post:
            post.return_value = Mock(status_code=404, text='foo')
            self.cls.read_and_send()

        endpoint = 'http://foo.bar.baz:1234/v1/sensors/update'
        payload = {
            'host_id': 'myhostid',
            'sensors': {
                'sensor1': {'data': 's1data'},
                'sensor2': {'data': 's2data'}
            }
        }
        assert post.mock_calls == [call(endpoint, json=payload)]

        expected_log = [
            call.debug('Reading sensors'),
            call.debug('POSTing sensor data to %s: %s', endpoint, payload),
            call.error('Error POSTing sensor data; got status code %s: %s',
                       404, 'foo')
        ]
        assert log.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:30,代码来源:test_sensor_daemon.py


示例3: test_find_usage_nat_gateways

    def test_find_usage_nat_gateways(self):
        """
        _find_usage_nat_gateways() should record per-AZ usage and log skips.

        The mocked connection returns the NAT gateway fixture from
        result_fixtures.VPC. Two usage entries are expected ('az2' with 3 and
        'az3' with 1), plus one error for a gateway whose subnet is unknown
        and two debug messages for gateways skipped because of their state.
        """
        fixtures = result_fixtures.VPC
        subnet_map = fixtures.test_find_usage_nat_gateways_subnets

        conn = Mock()
        conn.describe_nat_gateways.return_value = \
            fixtures.test_find_usage_nat_gateways

        with patch('%s.logger' % self.pbm) as log:
            cls = _VpcService(21, 43)
            cls.conn = conn
            cls._find_usage_nat_gateways(subnet_map)

        limit_name = 'NAT Gateways per AZ'
        assert len(cls.limits[limit_name].get_current_usage()) == 2

        first = cls.limits[limit_name].get_current_usage()[0]
        assert first.get_value() == 3
        assert first.resource_id == 'az2'

        second = cls.limits[limit_name].get_current_usage()[1]
        assert second.get_value() == 1
        assert second.resource_id == 'az3'

        assert conn.mock_calls == [call.describe_nat_gateways()]

        skip_msg = 'Skipping NAT Gateway %s in state: %s'
        expected_log = [
            call.error(
                'ERROR: NAT Gateway %s in SubnetId %s, but SubnetId not '
                'found in subnet_to_az; Gateway cannot be counted!',
                'nat-124', 'subnet4'
            ),
            call.debug(skip_msg, 'nat-125', 'deleted'),
            call.debug(skip_msg, 'nat-127', 'failed'),
        ]
        assert log.mock_calls == expected_log
开发者ID:jantman,项目名称:awslimitchecker,代码行数:35,代码来源:test_vpc.py


示例4: test_show_one_queue_empty

 def test_show_one_queue_empty(self, capsys):
     """
     _show_one_queue() should report an empty queue and delete nothing.

     receive_message on the mocked connection returns an empty dict, so the
     method is expected to print the "appears empty" line to stdout and
     never call _delete_msg, even though delete=True is passed.
     """
     sqs = Mock()
     sqs.receive_message.return_value = {}

     with patch('%s.logger' % pbm, autospec=True) as log, \
             patch('%s._url_for_queue' % pb, autospec=True) as url_for, \
             patch('%s._delete_msg' % pb, autospec=True) as deleter:
         url_for.return_value = 'myurl'
         self.cls._show_one_queue(sqs, 'foo', 1, delete=True)

     stdout, stderr = capsys.readouterr()
     assert stderr == ''
     assert stdout == "=> Queue 'foo' appears empty.\n"
     assert deleter.mock_calls == []

     expected_receive = call.receive_message(
         QueueUrl='myurl',
         AttributeNames=['All'],
         MessageAttributeNames=['All'],
         MaxNumberOfMessages=1,
         WaitTimeSeconds=20
     )
     assert sqs.mock_calls == [expected_receive]
     assert url_for.mock_calls == [call(self.cls, sqs, 'foo')]

     expected_log = [
         call.debug("Queue '%s' url: %s", 'foo', 'myurl'),
         call.warning("Receiving %d messages from queue'%s'; this may "
                      "take up to 20 seconds.", 1, 'foo'),
         call.debug('received no messages')
     ]
     assert log.mock_calls == expected_log
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:31,代码来源:test_aws.py


示例5: test_get_api_id_aws

    def test_get_api_id_aws(self):
        """
        get_api_id() should fall back to AWSInfo when Terraform output fails.

        TerraformRunner()._get_outputs is made to raise, and
        AWSInfo().get_api_id returns 'myaid', which must be the result.
        """
        def always_raise(*args, **kwargs):
            raise Exception()

        conf = Mock()
        args = Mock(tf_path='tfpath')
        with patch.multiple(
            pbm,
            autospec=True,
            logger=DEFAULT,
            TerraformRunner=DEFAULT,
            AWSInfo=DEFAULT
        ) as mocks:
            runner_cls = mocks['TerraformRunner']
            aws_cls = mocks['AWSInfo']
            runner_cls.return_value._get_outputs.side_effect = always_raise
            aws_cls.return_value.get_api_id.return_value = 'myaid'
            res = get_api_id(conf, args)

        assert res == 'myaid'

        expected_runner = [call(conf, 'tfpath'), call()._get_outputs()]
        assert mocks['TerraformRunner'].mock_calls == expected_runner

        expected_aws = [call(conf), call().get_api_id()]
        assert mocks['AWSInfo'].mock_calls == expected_aws

        expected_log = [
            call.debug('Trying to get Terraform rest_api_id output'),
            call.info('Unable to find API rest_api_id from Terraform state; '
                      'querying AWS.', exc_info=1),
            call.debug('AWS API ID: \'%s\'', 'myaid')
        ]
        assert mocks['logger'].mock_calls == expected_log
开发者ID:jantman,项目名称:webhook2lambda2sqs,代码行数:33,代码来源:test_runner.py


示例6: test_get_api_id_tf

 def test_get_api_id_tf(self):
     """
     get_api_id() should return the Terraform rest_api_id output if present.

     TerraformRunner()._get_outputs returns both outputs, so AWSInfo is
     expected not to be touched at all.
     """
     conf = Mock()
     args = Mock(tf_path='tfpath')
     with patch.multiple(
         pbm,
         autospec=True,
         logger=DEFAULT,
         TerraformRunner=DEFAULT,
         AWSInfo=DEFAULT
     ) as mocks:
         runner_cls = mocks['TerraformRunner']
         runner_cls.return_value._get_outputs.return_value = {
             'base_url': 'mytfbase',
             'rest_api_id': 'myid'
         }
         res = get_api_id(conf, args)

     assert res == 'myid'

     expected_runner = [call(conf, 'tfpath'), call()._get_outputs()]
     assert mocks['TerraformRunner'].mock_calls == expected_runner
     assert mocks['AWSInfo'].mock_calls == []

     expected_log = [
         call.debug('Trying to get Terraform rest_api_id output'),
         call.debug('Terraform rest_api_id output: \'%s\'', 'myid')
     ]
     assert mocks['logger'].mock_calls == expected_log
开发者ID:jantman,项目名称:webhook2lambda2sqs,代码行数:25,代码来源:test_runner.py


示例7: test_update

 def test_update(self):
     """
     update() should store the sensor reading and answer 201 with its id.

     _parse_json_request is patched to return the request payload and
     update_sensor is patched to return 'myid'; the response body must
     list that id with status "ok".
     """
     req_data = {
         'host_id': 'myhostid',
         'sensors': {
             'sensor1': {
                 'type': 's1type',
                 'value': 12.345,
                 'alias': 's1alias',
                 'extra': 'extraS1'
             }
         }
     }
     req_json = json.dumps(req_data)

     request = MagicMock(spec_set=Request)
     type(request).responseHeaders = Mock()
     request.content.getvalue.return_value = req_json
     type(request).client = Mock(host='myhost')

     with patch('%s.logger' % pbm, autospec=True) as log, \
             patch('%s._parse_json_request' % pb) as parse, \
             patch('%s.update_sensor' % pbm, autospec=True) as update_sensor:
         update_sensor.return_value = 'myid'
         parse.return_value = req_data
         res = self.cls.update(None, request)

     assert res.result == '{"ids": ["myid"], "status": "ok"}'
     assert parse.mock_calls == [call(request)]
     assert request.mock_calls == [call.setResponseCode(201)]

     expected_log = [
         call.debug(
             'Received sensor update request from %s with content: %s',
             'myhost',
             req_data
         ),
         call.debug('update_sensor() return value: %s', 'myid')
     ]
     assert log.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat,代码行数:34,代码来源:test_sensors.py


示例8: test_register_callbacks

 def test_register_callbacks(self):
     """
     register_callbacks() should register ON and OFF handlers for pins 0-3.

     For each pin an ON registration is expected before the OFF one, each
     preceded by a debug message; afterwards the initial pin states are
     logged at info level and stored in current_values.
     """
     listener = Mock(spec_set=InputEventListener)
     self.cls.listener = listener
     with patch('%s.logger' % pbm) as log:
         self.cls.register_callbacks()

     expected_log = [call.debug("registering callbacks")]
     expected_registrations = []
     for pin in range(4):
         expected_log.append(
             call.debug('registering callback for %s ON', pin))
         expected_log.append(
             call.debug('registering callback for %s OFF', pin))
         expected_registrations.append(
             call.register(pin, IODIR_ON, self.cls.handle_input_on))
         expected_registrations.append(
             call.register(pin, IODIR_OFF, self.cls.handle_input_off))
     expected_log.append(call.debug('done registering callbacks'))
     expected_log.append(
         call.info('Initial pin states: %s', [10, 11, 12, 13]))

     assert log.mock_calls == expected_log
     assert listener.mock_calls == expected_registrations
     assert self.cls.current_values == [10, 11, 12, 13]
开发者ID:jantman,项目名称:piface-webhooks,代码行数:29,代码来源:test_listener.py


示例9: test_discover_owfs

    def test_discover_owfs(self):
        """
        _discover_owfs() should return the first path that has OWFS mounted.

        os.path.exists is faked so that '/foo' is missing, '/bar' exists
        without the temperature_scale settings file, and everything under
        '/baz' exists; '/baz' is therefore the expected result.
        """
        self.cls.owfs_paths = ['/foo', '/bar', '/baz']

        def fake_exists(path):
            return path.startswith('/baz') or path == '/bar'

        with patch('%s.logger' % pbm, autospec=True) as log, \
                patch('%s.os.path.exists' % pbm, autospec=True) as exists:
            exists.side_effect = fake_exists
            res = self.cls._discover_owfs()

        assert res == '/baz'

        expected_checks = [
            call('/foo'),
            call('/bar'),
            call('/bar/settings/units/temperature_scale'),
            call('/baz'),
            call('/baz/settings/units/temperature_scale')
        ]
        assert exists.mock_calls == expected_checks

        expected_log = [
            call.debug('Attempting to find OWFS path/mountpoint from list '
                       'of common options: %s', ['/foo', '/bar', '/baz']),
            call.debug('Path %s does not exist; skipping', '/foo'),
            call.debug('Path %s exists but does not appear to have OWFS '
                       'mounted', '/bar'),
            call.info('Found OWFS mounted at: %s', '/baz')
        ]
        assert log.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:30,代码来源:test_owfs.py


示例10: test_write_zip

 def test_write_zip(self):
     """
     _write_zip() should write the function source into a zip via ZipInfo.

     ZipFile is patched; the test expects it to be opened for writing at
     'mypath.zip', used as a context manager, and given exactly one
     writestr() call whose first argument is a real ZipInfo instance and
     whose second argument is the source string.
     """
     with patch('%s.zipfile.ZipFile' % pbm, autospec=True) as mock_zf:
         with patch('%s.logger' % pbm, autospec=True) as mock_logger:
             self.cls._write_zip('myfsrc', 'mypath.zip')
     # The only way I can find to capture attributes being set on the ZipInfo
     # is to not mock it, but use a real ZipInfo object. Unfortunately, that
     # makes asserting on calls a bit more difficult...
     assert len(mock_zf.mock_calls) == 4
     assert mock_zf.mock_calls[0] == call('mypath.zip', 'w')
     assert mock_zf.mock_calls[1] == call().__enter__()
     assert mock_zf.mock_calls[3] == call().__exit__(None, None, None)
     # OK, now handle the third recorded call (index 2), which should have
     # the ZipInfo as its first argument...
     # Test that it's the right chained method call (element 0 of a recorded
     # call is its name).
     assert mock_zf.mock_calls[2][0] == '().__enter__().writestr'
     # Test its positional arguments (element 1 of a recorded call).
     arg_tup = mock_zf.mock_calls[2][1]
     assert isinstance(arg_tup[0], ZipInfo)
     assert arg_tup[0].filename == 'webhook2lambda2sqs_func.py'
     assert arg_tup[0].date_time == (2016, 7, 1, 2, 3, 4)
     # NOTE(review): 0x0755 is a hex literal (1877), not octal 0o755;
     # presumably this mirrors the value used by the code under test -
     # confirm before changing either side.
     assert arg_tup[0].external_attr == 0x0755 << 16
     assert arg_tup[1] == 'myfsrc'
     assert mock_logger.mock_calls == [
         call.debug('setting zipinfo date to: %s', (2016, 7, 1, 2, 3, 4)),
         call.debug('setting zipinfo file mode to: %s', (0x0755 << 16)),
         call.debug('writing zip file at: %s', 'mypath.zip')
     ]
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:27,代码来源:test_tf_generator.py


示例11: test_generate

 def test_generate(self):
     """
     generate() should write the function source, its zip and the TF JSON.

     _get_config is patched to return 'myjson' and open() is replaced by
     mock_open(); the source and the JSON must each be written through a
     context-managed file handle, and _write_zip must be called once.
     """
     def expected_write(path, content):
         # Calls recorded by mock_open for one "with open(path, 'w')" write.
         return [
             call(path, 'w'),
             call().__enter__(),
             call().write(content),
             call().__exit__(None, None, None),
         ]

     with patch('%s.logger' % pbm, autospec=True) as log, \
             patch('%s._get_config' % pb, autospec=True) as get_config, \
             patch('%s.open' % pbm, mock_open(), create=True) as opener, \
             patch('%s._write_zip' % pb, autospec=True) as write_zip:
         get_config.return_value = 'myjson'
         self.cls.generate('myfunc')

     assert get_config.mock_calls == [call(self.cls, 'myfunc')]

     expected_opens = (
         expected_write('./webhook2lambda2sqs_func.py', 'myfunc') +
         expected_write('./webhook2lambda2sqs.tf.json', 'myjson')
     )
     assert opener.mock_calls == expected_opens

     expected_zip = [
         call(self.cls, 'myfunc', './webhook2lambda2sqs_func.zip')
     ]
     assert write_zip.mock_calls == expected_zip

     expected_log = [
         call.warning('Writing lambda function source to: '
                      './webhook2lambda2sqs_func.py'),
         call.debug('lambda function written'),
         call.warning('Writing lambda function source zip file to: '
                      './webhook2lambda2sqs_func.zip'),
         call.debug('lambda zip written'),
         call.warning('Writing terraform configuration JSON to: '
                      './webhook2lambda2sqs.tf.json'),
         call.debug('terraform configuration written'),
         call.warning('Completed writing lambda function and TF config.')
     ]
     assert log.mock_calls == expected_log
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:33,代码来源:test_tf_generator.py


示例12: test_read_and_send_exception

    def test_read_and_send_exception(self):
        """
        read_and_send() should log, not propagate, an exception from POST.

        requests.post is patched to raise; the method is expected to return
        normally after calling logger.exception().
        """
        def always_raise(*args, **kwargs):
            raise Exception()

        sensor = Mock(spec_set=BaseSensor)
        sensor.read.return_value = {
            'sensor1': {'data': 's1data'},
            'sensor2': {'data': 's2data'},
        }
        self.cls.sensors = [sensor]

        with patch('%s.logger' % pbm, autospec=True) as log, \
                patch('%s.requests.post' % pbm, autospec=True) as post:
            post.side_effect = always_raise
            self.cls.read_and_send()

        endpoint = 'http://foo.bar.baz:1234/v1/sensors/update'
        payload = {
            'host_id': 'myhostid',
            'sensors': {
                'sensor1': {'data': 's1data'},
                'sensor2': {'data': 's2data'}
            }
        }
        assert post.mock_calls == [call(endpoint, json=payload)]

        expected_log = [
            call.debug('Reading sensors'),
            call.debug('POSTing sensor data to %s: %s', endpoint, payload),
            call.exception('Exception caught when trying to POST data to '
                           'Engine; will try again at next interval.')
        ]
        assert log.mock_calls == expected_log
开发者ID:jantman,项目名称:RPyMostat-sensor,代码行数:34,代码来源:test_sensor_daemon.py


示例13: test_show_cloudwatch_logs_none

 def test_show_cloudwatch_logs_none(self, capsys):
     """
     show_cloudwatch_logs() should print nothing when no streams exist.

     describe_log_streams returns an empty 'logStreams' list, so
     _show_log_stream must never be called and stdout/stderr stay empty.
     """
     resp = {'logStreams': []}

     with patch('%s.logger' % pbm, autospec=True) as log, \
             patch('%s.client' % pbm, autospec=True) as client, \
             patch('%s._show_log_stream' % pb, autospec=True) as show_stream:
         client.return_value.describe_log_streams.return_value = resp
         show_stream.side_effect = [1, 10]
         self.cls.show_cloudwatch_logs(5)

     stdout, stderr = capsys.readouterr()
     assert stderr == ''
     assert stdout == ''

     expected_client = [
         call('logs'),
         call().describe_log_streams(descending=True, limit=5,
                                     logGroupName='/aws/lambda/myfname',
                                     orderBy='LastEventTime')
     ]
     assert client.mock_calls == expected_client
     assert show_stream.mock_calls == []

     expected_log = [
         call.debug('Log Group Name: %s', '/aws/lambda/myfname'),
         call.debug('Connecting to AWS Logs API'),
         call.debug('Getting log streams'),
         call.debug('Found %d log streams', 0)
     ]
     assert log.mock_calls == expected_log
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:27,代码来源:test_aws.py


示例14: test_handle_files

    def test_handle_files(self):
        """
        handle_files() should process event files and unlink handled ones.

        Of the seven names returned by os.listdir, three are expected to be
        passed to handle_one_file. The handler raises for one of them; that
        file must be logged via logger.exception() and must not be unlinked.
        """
        ok_pin2 = 'pinevent_1420863326.123456_pin2_state1'
        ok_pin3 = 'pinevent_1420863326.123456_pin3_state0'
        failing = 'pinevent_1420863332.123456_pin2_state1'
        listing = [
            'foobar',
            failing,
            'pinevent_csds_pin3_state1',
            ok_pin3,
            ok_pin2,
            'xsfjef_fhejfec_dfhe',
            'pinevent_1420863326.456789_pin3_state2',
        ]

        ex = Exception('foo')

        def raise_for_failing(fname, evt_datetime, pin, state):
            if fname == failing:
                raise ex

        type(self.config).QUEUE_PATH = '/foo/bar'

        with patch('%s.logger' % pbm) as log, \
                patch('%s.os.listdir' % pbm) as listdir, \
                patch('%s.handle_one_file' % pb) as handle, \
                patch('%s.os.unlink' % pbm) as unlink:
            listdir.return_value = listing
            handle.side_effect = raise_for_failing
            self.cls.handle_files()

        expected_log = [
            call.info("Found %d new events", 3),
            call.debug('File handled; removing: %s', ok_pin2),
            call.debug('File handled; removing: %s', ok_pin3),
            call.exception('Execption while handling event file %s', failing),
        ]
        assert log.mock_calls == expected_log
        assert listdir.mock_calls == [call('/foo/bar')]

        earlier = datetime(2015, 1, 9, 23, 15, 26, 123456)
        later = datetime(2015, 1, 9, 23, 15, 32, 123456)
        expected_handled = [
            call(ok_pin2, earlier, 2, 1),
            call(ok_pin3, earlier, 3, 0),
            call(failing, later, 2, 1),
        ]
        assert handle.mock_calls == expected_handled

        expected_unlinked = [
            call('/foo/bar/' + ok_pin2),
            call('/foo/bar/' + ok_pin3)
        ]
        assert unlink.mock_calls == expected_unlinked
开发者ID:jantman,项目名称:piface-webhooks,代码行数:51,代码来源:test_worker.py


示例15: test_find_usage_apis_stages_now_paginated

    def test_find_usage_apis_stages_now_paginated(self):
        """
        _find_usage_apis() should warn when get_stages returns extra keys.

        Every get_stages response is given an additional 'position' key; the
        test expects a single warning that lists the returned keys
        ['item', 'position'].
        """
        mock_conn = Mock()
        res = result_fixtures.ApiGateway.get_rest_apis
        mock_paginator = Mock()
        mock_paginator.paginate.return_value = res

        def se_res_paginate(restApiId=None):
            # Per-API resources fixture, looked up by REST API id.
            return result_fixtures.ApiGateway.get_resources[restApiId]

        mock_res_paginator = Mock()
        mock_res_paginator.paginate.side_effect = se_res_paginate

        def se_get_paginator(api_name):
            # Hand out the matching paginator mock; any other name falls
            # through and returns None.
            if api_name == 'get_rest_apis':
                return mock_paginator
            elif api_name == 'get_resources':
                return mock_res_paginator

        def se_paginate_dict(*args, **kwargs):
            # The first positional argument is compared against the
            # connection methods to pick the fixture to return.
            if args[0] == mock_conn.get_documentation_parts:
                return result_fixtures.ApiGateway.doc_parts[kwargs['restApiId']]
            if args[0] == mock_conn.get_authorizers:
                return result_fixtures.ApiGateway.authorizers[
                    kwargs['restApiId']
                ]

        def se_get_stages(restApiId=None):
            # Copy the fixture so the added 'position' key does not leak
            # into the shared fixture data.
            r = deepcopy(result_fixtures.ApiGateway.stages[restApiId])
            r['position'] = 'foo'
            return r

        mock_conn.get_paginator.side_effect = se_get_paginator
        mock_conn.get_stages.side_effect = se_get_stages
        cls = _ApigatewayService(21, 43)
        cls.conn = mock_conn
        with patch('%s.paginate_dict' % pbm, autospec=True) as mock_pd:
            with patch('%s.logger' % pbm) as mock_logger:
                mock_pd.side_effect = se_paginate_dict
                cls._find_usage_apis()
        assert mock_logger.mock_calls == [
            call.debug('Finding usage for APIs'),
            call.debug('Found %d APIs', 5),
            call.debug('Finding usage for per-API limits'),
            call.warning(
                'APIGateway get_stages returned more keys than present in '
                'boto3 docs: %s', ['item', 'position']
            )
        ]
开发者ID:jantman,项目名称:awslimitchecker,代码行数:48,代码来源:test_apigateway.py


示例16: test_can_add_mandate

    def test_can_add_mandate(self, logging_mock):
        """
        Mandate.add_mandate() should persist the mandate and log it.

        The mandate runs from today (UTC) to ten days from now; both dates
        are passed as timestamps and expected back as date objects.
        NOTE(review): logging_mock is presumably injected by a patch
        decorator that is outside this view - confirm.
        """
        political_office = PoliticalOfficeFactory.create()
        legislator = LegislatorFactory.create()

        now = datetime.utcnow()
        start_day = now.date()
        end_day = (now + timedelta(days=10)).date()

        payload = {
            'legislator_id': legislator.id,
            'political_office_id': political_office.id,
            'date_start': date_to_timestamp(start_day),
            'date_end': date_to_timestamp(end_day),
        }
        mandate = Mandate.add_mandate(self.db, payload)

        expect(mandate.legislator).to_equal(legislator)
        expect(mandate.political_office).to_equal(political_office)
        expect(mandate.date_start).to_equal(start_day)
        expect(mandate.date_end).to_equal(end_day)

        expected_log_call = call.debug('Added mandate: "%s"', str(mandate))
        expect(logging_mock.mock_calls).to_include(expected_log_call)
开发者ID:marcelometal,项目名称:politicos-tornado,代码行数:25,代码来源:test_mandate.py


示例17: test_boto3_connection_kwargs_sts_again_other_account

    def test_boto3_connection_kwargs_sts_again_other_account(self):
        """
        Cached STS credentials for another account should be re-requested.

        Connectable.credentials holds credentials for account '456' while
        the instance is configured for account '123'; _get_sts_token must be
        called once and its credentials used in the returned kwargs.
        """
        cls = ConnectableTester(account_id='123', account_role='myrole',
                                region='myregion')
        creds = Mock()
        creds_type = type(creds)
        creds_type.access_key = 'sts_ak'
        creds_type.secret_key = 'sts_sk'
        creds_type.session_token = 'sts_token'
        creds_type.account_id = '456'

        with patch('%s._get_sts_token' % pb) as get_sts, \
                patch('%s.logger' % pbm) as log, \
                patch('%s.boto3.Session' % pbm) as session_cls:
            get_sts.return_value = creds
            # NOTE(review): this sets a class-level attribute that this test
            # does not restore afterwards.
            Connectable.credentials = creds
            res = cls._boto3_connection_kwargs

        assert get_sts.mock_calls == [call()]

        expected_log = [
            call.debug("Previous STS credentials are for account %s; "
                       "getting new credentials for current account "
                       "(%s)", '456', '123')
        ]
        assert log.mock_calls == expected_log
        assert session_cls.mock_calls == []

        expected_kwargs = {
            'region_name': 'myregion',
            'aws_access_key_id': 'sts_ak',
            'aws_secret_access_key': 'sts_sk',
            'aws_session_token': 'sts_token'
        }
        assert res == expected_kwargs
开发者ID:bflad,项目名称:awslimitchecker,代码行数:28,代码来源:test_connectable.py


示例18: test_set_account_info_env

 def test_set_account_info_env(self):
     """
     _set_account_info() should connect using AWS_REGION from the environment.

     os.environ is replaced by a dict holding only AWS_REGION='ar'. The
     account id is expected to come from the IAM user ARN and the region
     from the mocked client's _client_config.
     """
     self.cls.aws_account_id = None
     self.cls.aws_region = None

     with patch('%s.logger' % pbm, autospec=True) as log, \
             patch('%s.client' % pbm, autospec=True) as client, \
             patch.dict('%s.os.environ' % pbm, {'AWS_REGION': 'ar'},
                        clear=True):
         client.return_value.get_user.return_value = {
             'User': {'Arn': 'arn:aws:iam::123456789:user/foo'}
         }
         type(client.return_value)._client_config = Mock(
             region_name='myregion')
         self.cls._set_account_info()

     assert self.cls.aws_account_id == '123456789'
     assert self.cls.aws_region == 'myregion'

     expected_client = [
         call('iam', region_name='ar'),
         call().get_user(),
         call('lambda', region_name='ar')
     ]
     assert client.mock_calls == expected_client

     expected_log = [
         call.debug('Connecting to IAM with region_name=%s', 'ar'),
         call.info('Found AWS account ID as %s; region: %s',
                   '123456789', 'myregion')
     ]
     assert log.mock_calls == expected_log
开发者ID:waffle-iron,项目名称:webhook2lambda2sqs,代码行数:27,代码来源:test_tf_generator.py


示例19: test_boto3_connection_kwargs_profile

    def test_boto3_connection_kwargs_profile(self):
        """
        A configured profile_name should be resolved through boto3.Session.

        The patched Session returns an object whose _session yields fake
        credentials; those must appear in the returned kwargs and
        _get_sts_token must not be called.
        """
        cls = ConnectableTester(profile_name='myprof')

        creds = Mock()
        creds_type = type(creds)
        creds_type.access_key = 'ak'
        creds_type.secret_key = 'sk'
        creds_type.token = 'tkn'

        session = Mock()
        inner_session = Mock()
        inner_session.get_credentials.return_value = creds
        type(session)._session = inner_session

        with patch('%s._get_sts_token' % pb) as get_sts, \
                patch('%s.logger' % pbm) as log, \
                patch('%s.boto3.Session' % pbm) as session_cls:
            # NOTE(review): this sets a class-level attribute that this test
            # does not restore afterwards.
            Connectable.credentials = None
            session_cls.return_value = session
            res = cls._boto3_connection_kwargs

        assert get_sts.mock_calls == []

        expected_log = [
            call.debug('Using credentials profile: %s', 'myprof')
        ]
        assert log.mock_calls == expected_log
        assert session_cls.mock_calls == [call(profile_name='myprof')]

        expected_kwargs = {
            'region_name': None,
            'aws_access_key_id': 'ak',
            'aws_secret_access_key': 'sk',
            'aws_session_token': 'tkn'
        }
        assert res == expected_kwargs
开发者ID:bflad,项目名称:awslimitchecker,代码行数:28,代码来源:test_connectable.py


示例20: test_parser_timeout

    def test_parser_timeout(self):
        """
        Test to verify fix for https://github.com/andresriancho/w3af/issues/6723
        "w3af running long time more than 24h"

        The parser timeout is patched down to 1 and the only available
        parser is DelayedParser, so get_document_parser_for() is expected to
        raise BaseFrameworkException and report the timeout via om.out.debug.
        """
        modc = 'w3af.core.data.parsers.parser_cache.%s'
        modp = 'w3af.core.data.parsers.document_parser.%s'

        with patch(modc % 'om.out') as om_mock,\
             patch(modc % 'ParserCache.PARSER_TIMEOUT', new_callable=PropertyMock) as timeout_mock,\
             patch(modp % 'DocumentParser.PARSERS', new_callable=PropertyMock) as parsers_mock:

            timeout_mock.return_value = 1
            parsers_mock.return_value = [DelayedParser]

            html = '<html>foo!</html>'
            http_resp = _build_http_response(html, u'text/html')

            try:
                self.dpc.get_document_parser_for(http_resp)
            except BaseFrameworkException:
                msg = '[timeout] The parser took more than %s seconds'\
                      ' to complete parsing of "%s", killed it!'

                error = msg % (ParserCache.PARSER_TIMEOUT,
                               http_resp.get_url())

                self.assertIn(call.debug(error), om_mock.mock_calls)
            else:
                # Fail with an explicit message rather than the opaque
                # "False is not true" produced by assertTrue(False).
                self.fail('Expected BaseFrameworkException because the'
                          ' parser should have timed out')
开发者ID:RON313,项目名称:w3af,代码行数:30,代码来源:test_parser_cache.py



注:本文中的mock.call.debug函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python call.info函数代码示例发布时间:2022-05-27
下一篇:
Python call.add_rule函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap