本文整理汇总了Python中unit_tests._testing._Monkey函数的典型用法代码示例。如果您正苦于以下问题：Python _Monkey函数的具体用法？Python _Monkey怎么用？Python _Monkey使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了_Monkey函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_ctor_w_implicit_inputs
def test_ctor_w_implicit_inputs(self):
    # Build the client with no arguments while both the default-project
    # lookup and the credentials lookup are patched, then check that the
    # patched values end up on the client and its connection.
    from unit_tests._testing import _Monkey
    from google.cloud.datastore import client as _MUT
    from google.cloud import client as _base_client

    OTHER = 'other'
    creds = object()
    default_called = []

    def fallback_mock(project):
        # Record the argument so the call can be asserted afterwards.
        default_called.append(project)
        return project or OTHER

    klass = self._getTargetClass()
    with _Monkey(_MUT,
                 _determine_default_project=fallback_mock):
        with _Monkey(_base_client,
                     get_credentials=lambda: creds):
            client = klass()

    self.assertEqual(client.project, OTHER)
    # Identity-specific assertions give clearer failure messages than
    # wrapping ``is`` / ``isinstance`` in assertTrue.
    self.assertIsNone(client.namespace)
    self.assertIsInstance(client.connection, _MockConnection)
    self.assertIs(client.connection.credentials, creds)
    self.assertIsNone(client.connection.http)
    self.assertIsNone(client.current_batch)
    self.assertIsNone(client.current_transaction)
    # The project fallback was consulted exactly once, with no project.
    self.assertEqual(default_called, [None])
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:27,代码来源:test_client.py
示例2: test_gae
def test_gae(self):
    # With ``os.name`` patched to a non-'nt' value and ``_USER_ROOT``
    # patched to None, the function under test must return None.
    from google.cloud import _helpers as MUT
    from unit_tests._testing import _Monkey

    with _Monkey(os, name='not-nt'), _Monkey(MUT, _USER_ROOT=None):
        outcome = self._callFUT()

    self.assertIsNone(outcome)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:9,代码来源:test__helpers.py
示例3: test_it
def test_it(self):
    # Patch the environment lookup and the config file name, then check
    # that the result is the APPDATA directory joined with that name.
    from google.cloud import _helpers as MUT
    from unit_tests._testing import _Monkey

    appdata = 'a'
    cfg_name = 'b'
    fake_env = {'APPDATA': appdata}

    with _Monkey(os, getenv=fake_env.get), \
            _Monkey(MUT, _GCLOUD_CONFIG_FILE=cfg_name):
        actual = self._callFUT()

    self.assertEqual(actual, os.path.join(appdata, cfg_name))
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:13,代码来源:test__helpers.py
示例4: test_without_emulator
def test_without_emulator(self):
    # Replace ``make_secure_stub`` with a recorder; the function under
    # test must return the recorder's result and call it once with the
    # client's credentials and user agent plus the stub class and host
    # taken from the module under test.
    from unit_tests._testing import _Monkey
    from google.cloud.bigtable import client as MUT

    client = _Client(_Credentials(), 'you-sir-age-int')
    stub_sentinel = object()
    recorded_calls = []

    def record_make_secure_stub(*args):
        recorded_calls.append(args)
        return stub_sentinel

    with _Monkey(MUT, make_secure_stub=record_make_secure_stub):
        outcome = self._callFUT(client)

    self.assertIs(outcome, stub_sentinel)
    expected_call = (
        client.credentials,
        client.user_agent,
        MUT.operations_grpc_pb2.OperationsStub,
        MUT.OPERATIONS_API_HOST,
    )
    self.assertEqual(recorded_calls, [expected_call])
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:27,代码来源:test_client.py
示例5: test_w_exceptions_lt_max_retries
def test_w_exceptions_lt_max_retries(self):
    # The fake request raises RetryAfterError four times and then returns
    # the response; with ``retries=5`` the call must succeed on the fifth
    # attempt.
    from google.cloud.streaming.exceptions import RetryAfterError
    from google.cloud.streaming import http_wrapper as MUT
    from unit_tests._testing import _Monkey

    HTTP, RESPONSE = object(), object()
    REQUEST = _Request()
    _created, _checked = [], []
    _counter = [None] * 4  # one entry per failure still to be injected

    def _wo_exception(*args, **kw):
        _created.append((args, kw))
        if _counter:
            # Consume one pending failure, then signal "retry later".
            _counter.pop()
            raise RetryAfterError(RESPONSE, '', REQUEST.url, 0.1)
        return RESPONSE

    with _Monkey(MUT, _make_api_request_no_retry=_wo_exception,
                 _check_response=_checked.append):
        response = self._callFUT(HTTP, REQUEST, retries=5)

    # assertIs reports both operands on failure, unlike assertTrue(a is b).
    self.assertIs(response, RESPONSE)
    self.assertEqual(len(_created), 5)
    expected_kw = {'redirections': MUT._REDIRECTIONS}
    for attempt in _created:
        self.assertEqual(attempt, ((HTTP, REQUEST), expected_kw))
    self.assertEqual(_checked, [])  # not called by '_wo_exception'
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:27,代码来源:test_http_wrapper.py
示例6: test_w_loggable_body_w_http
def test_w_loggable_body_w_http(self):
    # Inside the context manager under test, the debug level is raised on
    # the patched ``httplib2`` and on the 'update:me' connection only;
    # once it exits, every level is expected to be 0 again.
    from unit_tests._testing import _Monkey
    from google.cloud.streaming import http_wrapper as MUT

    class _Connection(object):
        debuglevel = 0

        def set_debuglevel(self, value):
            self.debuglevel = value

    LEVEL = 1
    request = _Request(loggable_body=object())
    fake_httplib2 = _Dummy(debuglevel=0)
    updated, skipped = _Connection(), _Connection()
    fake_http = _Dummy(
        connections={'update:me': updated, 'skip_me': skipped})

    with _Monkey(MUT, httplib2=fake_httplib2):
        with self._makeOne(request, LEVEL, fake_http):
            for raised in (fake_httplib2, updated):
                self.assertEqual(raised.debuglevel, LEVEL)
            self.assertEqual(skipped.debuglevel, 0)

    for restored in (fake_httplib2, updated, skipped):
        self.assertEqual(restored.debuglevel, 0)
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:25,代码来源:test_http_wrapper.py
示例7: test_from_pb_w_metadata_and_kwargs
def test_from_pb_w_metadata_and_kwargs(self):
    # Build an Operation protobuf whose metadata is a packed Struct,
    # register Struct in the (patched) type-URL map, and check that
    # ``from_pb`` exposes both the decoded metadata and the extra kwargs.
    from google.longrunning import operations_pb2
    from google.protobuf.any_pb2 import Any
    from google.protobuf.struct_pb2 import Struct, Value
    from google.cloud import operation as MUT
    from unit_tests._testing import _Monkey

    TYPE_URI = 'type.googleapis.com/%s' % (Struct.DESCRIPTOR.full_name,)
    type_url_map = {TYPE_URI: Struct}

    client = _Client()
    meta = Struct(fields={'foo': Value(string_value=u'Bar')})
    metadata_pb = Any(type_url=TYPE_URI, value=meta.SerializeToString())
    operation_pb = operations_pb2.Operation(
        name=self.OPERATION_NAME, metadata=metadata_pb)
    klass = self._getTargetClass()

    with _Monkey(MUT, _TYPE_URL_MAP=type_url_map):
        operation = klass.from_pb(operation_pb, client, baz='qux')

    self.assertEqual(operation.name, self.OPERATION_NAME)
    # Identity/type assertions give better diagnostics than assertTrue.
    self.assertIs(operation.client, client)
    pb_metadata = operation.pb_metadata
    self.assertIsInstance(pb_metadata, Struct)
    self.assertEqual(list(pb_metadata.fields), ['foo'])
    self.assertEqual(pb_metadata.fields['foo'].string_value, 'Bar')
    self.assertEqual(operation.metadata, {'baz': 'qux'})
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:26,代码来源:test_operation.py
示例8: test_ctor_defaults
def test_ctor_defaults(self):
    # Construct the object with no arguments while the project and
    # credentials lookups are patched with recorders, then verify the
    # resulting attributes and the order in which the lookups ran.
    from unit_tests._testing import _Monkey
    from google.cloud import client

    PROJECT = 'PROJECT'
    CREDENTIALS = object()
    FUNC_CALLS = []

    def mock_determine_proj(project):
        FUNC_CALLS.append((project, '_determine_default_project'))
        return PROJECT

    def mock_get_credentials():
        FUNC_CALLS.append('get_credentials')
        return CREDENTIALS

    with _Monkey(client, get_credentials=mock_get_credentials,
                 _determine_default_project=mock_determine_proj):
        client_obj = self._makeOne()

    self.assertEqual(client_obj.project, PROJECT)
    # Dedicated assertions report the offending values on failure.
    self.assertIsInstance(client_obj.connection, _MockConnection)
    self.assertIs(client_obj.connection.credentials, CREDENTIALS)
    # Project lookup (called with None) is expected before credentials.
    self.assertEqual(
        FUNC_CALLS,
        [(None, '_determine_default_project'), 'get_credentials'])
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:26,代码来源:test_client.py
示例9: test_logging_api_w_gax
def test_logging_api_w_gax(self):
    # With ``_USE_GAX`` forced on and both API factories patched, the
    # ``logging_api`` property must wrap the generated API object and
    # return the same wrapper on a later access.
    from google.cloud.logging import client as MUT
    from unit_tests._testing import _Monkey

    wrapped = object()
    # NOTE(review): calls are recorded here but never asserted in this
    # test; confirm whether an argument check was intended.
    _called_with = []

    def _generated_api(*args, **kw):
        _called_with.append((args, kw))
        return wrapped

    class _GaxLoggingAPI(object):

        def __init__(self, _wrapped):
            self._wrapped = _wrapped

    creds = _Credentials()
    client = self._makeOne(project=self.PROJECT, credentials=creds)

    with _Monkey(MUT,
                 _USE_GAX=True,
                 GeneratedLoggingAPI=_generated_api,
                 GAXLoggingAPI=_GaxLoggingAPI):
        api = client.logging_api

    self.assertIsInstance(api, _GaxLoggingAPI)
    self.assertIs(api._wrapped, wrapped)
    # API instance is cached
    again = client.logging_api
    self.assertIs(again, api)
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:30,代码来源:test_client.py
示例10: test_subscriber_api_w_gax
def test_subscriber_api_w_gax(self):
    # With ``_USE_GAX`` forced on and both factories patched, the
    # ``subscriber_api`` property must wrap the generated API object,
    # cache the wrapper, and call the factory once with the connection.
    from google.cloud.pubsub import client as MUT
    from unit_tests._testing import _Monkey

    wrapped = object()
    _called_with = []

    def _generated_api(*args, **kw):
        _called_with.append((args, kw))
        return wrapped

    class _GaxSubscriberAPI(object):

        def __init__(self, _wrapped):
            self._wrapped = _wrapped

    creds = _Credentials()
    client = self._makeOne(project=self.PROJECT, credentials=creds)

    with _Monkey(MUT,
                 _USE_GAX=True,
                 make_gax_subscriber_api=_generated_api,
                 GAXSubscriberAPI=_GaxSubscriberAPI):
        api = client.subscriber_api

    self.assertIsInstance(api, _GaxSubscriberAPI)
    self.assertIs(api._wrapped, wrapped)
    # API instance is cached
    again = client.subscriber_api
    self.assertIs(again, api)
    # The factory received only the client's connection, positionally.
    args = (client.connection,)
    self.assertEqual(_called_with, [(args, {})])
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:32,代码来源:test_client.py
示例11: test_it_with_stubs
def test_it_with_stubs(self):
    # Drive the function under test with a stub buffer and stub
    # ``base64`` / ``md5`` replacements, then check how each stub was used.
    from unit_tests._testing import _Monkey
    from google.cloud.storage import _helpers as MUT

    class _Buffer(object):

        def __init__(self, return_vals):
            self.return_vals = return_vals
            self._block_sizes = []

        def read(self, block_size):
            self._block_sizes.append(block_size)
            # Values are handed out from the end of the list.
            return self.return_vals.pop()

    BASE64 = _Base64()
    DIGEST_VAL = object()
    BYTES_TO_SIGN = b'BYTES_TO_SIGN'
    # First read yields the payload, second read yields b''.
    BUFFER = _Buffer([b'', BYTES_TO_SIGN])
    MD5 = _MD5(DIGEST_VAL)

    with _Monkey(MUT, base64=BASE64, md5=MD5):
        SIGNED_CONTENT = self._callFUT(BUFFER)

    self.assertEqual(BUFFER._block_sizes, [8192, 8192])
    # assertIs shows both operands on failure, unlike assertTrue(a is b).
    self.assertIs(SIGNED_CONTENT, DIGEST_VAL)
    self.assertEqual(BASE64._called_b64encode, [DIGEST_VAL])
    self.assertEqual(MD5._called, [None])
    self.assertEqual(MD5.hash_obj.num_digest_calls, 1)
    self.assertEqual(MD5.hash_obj._blocks, [BYTES_TO_SIGN])
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:29,代码来源:test__helpers.py
示例12: test_no_value
def test_no_value(self):
    # With ``os.getenv`` backed by an empty mapping, the function under
    # test must report no project.
    from unit_tests._testing import _Monkey

    environ = {}
    with _Monkey(os, getenv=environ.get):
        project = self._callFUT()

    # assertIsNone checks identity with None and gives a clearer failure
    # message than assertEqual(project, None).
    self.assertIsNone(project)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:7,代码来源:test__helpers.py
示例13: test_emulator
def test_emulator(self):
    # For a connection flagged as running in the emulator, the function
    # under test must open an insecure channel to the connection's host
    # and hand that channel to the (patched) SubscriberApi factory.
    from unit_tests._testing import _Monkey
    from google.cloud.pubsub import _gax as MUT

    emulator_host = 'CURR_HOST:1234'
    api_sentinel = object()
    channel_sentinel = object()
    seen_channels = []
    seen_hosts = []

    def fake_subscriber_api(channel):
        seen_channels.append(channel)
        return api_sentinel

    def fake_insecure_channel(host):
        seen_hosts.append(host)
        return channel_sentinel

    connection = _Connection(in_emulator=True, host=emulator_host)
    with _Monkey(MUT, SubscriberApi=fake_subscriber_api,
                 insecure_channel=fake_insecure_channel):
        outcome = self._callFUT(connection)

    self.assertIs(outcome, api_sentinel)
    self.assertEqual(seen_channels, [channel_sentinel])
    self.assertEqual(seen_hosts, [emulator_host])
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:26,代码来源:test__gax.py
示例14: test_get_multi_max_loops
def test_get_multi_max_loops(self):
    # With ``_MAX_LOOPS`` patched to -1, ``get_multi`` is expected to
    # return nothing even though a lookup result is available.
    from unit_tests._testing import _Monkey
    from google.cloud.datastore import client as _MUT
    from google.cloud.datastore.key import Key

    KIND = 'Kind'
    ID = 1234

    # Entity protobuf that the mock backend would hand back on lookup.
    found_pb = _make_entity_pb(self.PROJECT, KIND, ID, 'foo', 'Foo')

    # Client whose connection is primed with that lookup result.
    client = self._makeOne(credentials=object())
    client.connection._add_lookup_result([found_pb])

    lookup_key = Key(KIND, ID, project=self.PROJECT)
    missing_out, deferred_out = [], []
    with _Monkey(_MUT, _MAX_LOOPS=-1):
        found = client.get_multi(
            [lookup_key], missing=missing_out, deferred=deferred_out)

    # Nothing comes back, even though the connection has been set up as
    # in `test_hit` to return a single result.
    for collected in (found, missing_out, deferred_out):
        self.assertEqual(collected, [])
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:28,代码来源:test_client.py
示例15: _helper
def _helper(self, target, host, port=None):
    # Shared driver: call the function under test with a fake ``grpc``
    # module and a recording stub class, then check that the stub was
    # built from the fake channel and that the channel was opened for
    # ``target``.
    #
    # target: address string the insecure channel is expected to receive.
    # host / port: values forwarded to the function under test.
    from unit_tests._testing import _Monkey
    from google.cloud import _helpers as MUT

    mock_result = object()
    stub_inputs = []
    CHANNEL = object()

    class _GRPCModule(object):

        def insecure_channel(self, *args):
            # Keep the arguments so the target can be asserted below.
            self.insecure_channel_args = args
            return CHANNEL

    grpc_mod = _GRPCModule()

    def mock_stub_class(channel):
        stub_inputs.append(channel)
        return mock_result

    with _Monkey(MUT, grpc=grpc_mod):
        result = self._callFUT(mock_stub_class, host, port=port)

    # assertIs shows both operands on failure, unlike assertTrue(a is b).
    self.assertIs(result, mock_result)
    self.assertEqual(stub_inputs, [CHANNEL])
    self.assertEqual(grpc_mod.insecure_channel_args, (target,))
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:26,代码来源:test__helpers.py
示例16: test_no_environment_variable_set
def test_no_environment_variable_set(self):
    # An empty mapping stands in for the process environment, so the
    # function under test must return None.
    from unit_tests._testing import _Monkey

    with _Monkey(os, getenv={}.get):
        outcome = self._callFUT()

    self.assertIsNone(outcome)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:8,代码来源:test__helpers.py
示例17: test_ctor_w_project_no_environ
def test_ctor_w_project_no_environ(self):
    # Constructing with ``None`` as the project must raise
    # EnvironmentError when no default project can be determined.
    from unit_tests._testing import _Monkey
    from google.cloud.datastore import client as _MUT

    def no_default_project(project):
        return None

    # Some environments (e.g. AppVeyor CI) run in GCE, so without this
    # patch the test would fail artificially.
    with _Monkey(_MUT, _base_default_project=no_default_project):
        with self.assertRaises(EnvironmentError):
            self._makeOne(None)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:8,代码来源:test_client.py
示例18: test_value_set
def test_value_set(self):
    # When the fake environment maps the PROJECT key to a sentinel, the
    # function under test must return that sentinel.
    from unit_tests._testing import _Monkey
    from google.cloud._helpers import PROJECT

    sentinel_project = object()
    fake_env = {PROJECT: sentinel_project}

    with _Monkey(os, getenv=fake_env.get):
        found = self._callFUT()

    self.assertEqual(found, sentinel_project)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:9,代码来源:test__helpers.py
示例19: test_it
def test_it(self):
    # Replace the module's ``client`` with a stub and check that the
    # function under test returns the stub's signed credentials object
    # after the stub's app-default lookup was invoked.
    from unit_tests._testing import _Monkey
    from google.cloud import credentials as MUT

    client = _Client()
    with _Monkey(MUT, client=client):
        found = self._callFUT()

    # Dedicated assertions report the offending values on failure.
    self.assertIsInstance(found, _Credentials)
    self.assertIs(found, client._signed)
    self.assertTrue(client._get_app_default_called)
开发者ID:kwoodson,项目名称:gcloud-python,代码行数:10,代码来源:test_credentials.py
示例20: test_value_set
def test_value_set(self):
    # When the fake environment maps the GCD_DATASET key to a sentinel,
    # the function under test must return that sentinel.
    import os
    from unit_tests._testing import _Monkey
    from google.cloud.datastore.client import GCD_DATASET

    sentinel_project = object()
    fake_env = {GCD_DATASET: sentinel_project}

    with _Monkey(os, getenv=fake_env.get):
        found = self._callFUT()

    self.assertEqual(found, sentinel_project)
开发者ID:tmatsuo,项目名称:gcloud-python,代码行数:10,代码来源:test_client.py
注：本文中的unit_tests._testing._Monkey函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论