本文整理汇总了 Python 中 test.test_client.get_client 函数的典型用法代码示例。如果您正苦于以下问题：Python get_client 函数的具体用法？Python get_client 怎么用？Python get_client 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 get_client 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_auth_network_error
def test_auth_network_error(self):
    """Check that a network error during socket auth leaks no semaphore.

    A client authenticates a new socket with cached credentials; when that
    hits a network error the pool must still be able to open a new socket.
    """
    admin_client = get_client()
    auth_context.client.admin.add_user('admin', 'password')
    admin_client.admin.authenticate('admin', 'password')
    try:
        # A one-socket pool makes a leaked socket show up immediately.
        client = get_client(max_pool_size=1, waitQueueTimeoutMS=1)

        # Cache credentials without connecting, as if authenticate() had
        # been called on a different socket.
        creds = auth._build_credentials_tuple(
            'DEFAULT', 'admin', unicode('admin'), unicode('password'), {})
        client._cache_credentials('test', creds, connect=False)

        # Break the pool's only socket to provoke a network error.
        one(get_pool(client).sockets).sock.close()

        # Authenticating the socket with the new credential hits a
        # socket.error, which should surface as AutoReconnect.
        self.assertRaises(AutoReconnect, client.test.collection.find_one)

        # The pool can still create a socket, so nothing was leaked.
        client.test.collection.find_one()
    finally:
        admin_client.admin.remove_user('admin')
开发者ID:arunbaabte,项目名称:DjangoAppService,代码行数:33,代码来源:test_auth.py
示例2: setUp
def setUp(self):
    """Prepare the client, test collection and topology facts for the tests.

    Records whether the server is a replica set member, how many hosts it
    reports, a client for one non-primary host (if any), and whether the
    server supports the failpoint-based replication pause.
    """
    super(TestBulkWriteConcern, self).setUp()
    client = get_client()
    reply = client.test.command('ismaster')
    self.is_repl = bool(reply.get('setName'))
    self.w = len(reply.get("hosts", []))

    # Connect to the first listed host that is not the primary.
    self.secondary = None
    if self.w > 1:
        for node in reply['hosts']:
            if node == reply['primary']:
                continue
            host, port = _partition_node(node)
            self.secondary = MongoClient(host, port)
            break

    self.client = client
    self.coll = client.pymongo_test.test
    self.coll.remove()

    # wtimeout errors used to be triggered by a write concern larger than
    # the member count, but MongoDB 2.7.8+ answers that with a different
    # error ("Not enough data-bearing nodes"). On such servers a failpoint
    # is used to pause replication on a secondary instead.
    self.need_replication_stopped = version.at_least(
        self.client, (2, 7, 8))
    argv = get_command_line(self.client)["argv"]
    self.test_commands_enabled = ("enableTestCommands=1" in argv)
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:28,代码来源:test_bulk.py
示例3: test_authenticate_and_request
def test_authenticate_and_request(self):
    """Verify Database.authenticate() preserves the client's request state.

    authenticate() has to run inside a request. This checks it for a client
    that is not in a request and for one created with
    auto_start_request=True, and that each is left in the state it started
    in.
    """
    mongos_too_old = (
        is_mongos(self.client)
        and not version.at_least(self.client, (2, 0, 0)))
    if mongos_too_old:
        raise SkipTest("Auth with sharding requires MongoDB >= 2.0.0")

    self.assertFalse(self.client.auto_start_request)
    test_db = self.client.pymongo_test
    test_db.system.users.remove({})
    test_db.remove_user("mike")
    test_db.add_user("mike", "password")

    # Client outside a request: still outside after authenticating.
    self.assertFalse(self.client.in_request())
    self.assertTrue(test_db.authenticate("mike", "password"))
    self.assertFalse(self.client.in_request())

    # Client that auto-starts requests: still inside after authenticating.
    auto_client = get_client(auto_start_request=True)
    auto_db = auto_client.pymongo_test
    self.assertTrue(auto_client.in_request())
    self.assertTrue(auto_db.authenticate("mike", "password"))
    self.assertTrue(auto_client.in_request())

    # Logging out and querying afterwards must simply not raise.
    test_db.logout()
    test_db.collection.find_one()
    auto_db.logout()
    auto_db.collection.find_one()
开发者ID:Khefren,项目名称:mongo-python-driver,代码行数:28,代码来源:test_database.py
示例4: test_authenticate_and_request
def test_authenticate_and_request(self):
    """Verify Database.authenticate() preserves the client's request state.

    authenticate() has to run inside a request. This checks it for a client
    that is not in a request and for one created with
    auto_start_request=True, and that each is left in the state it started
    in. The test user is removed again in the cleanup step.

    Raises:
        SkipTest: on a mongos older than 2.0.0, or when the server was not
            started with authentication enabled.
    """
    if (is_mongos(self.client) and not
            version.at_least(self.client, (2, 0, 0))):
        raise SkipTest("Auth with sharding requires MongoDB >= 2.0.0")
    if not server_started_with_auth(self.client):
        raise SkipTest('Authentication is not enabled on server')

    self.assertFalse(self.client.auto_start_request)
    db = self.client.pymongo_test
    db.add_user("mike", "password",
                roles=["userAdmin", "dbAdmin", "readWrite"])

    # Bound before the try block so the cleanup below never trips over an
    # unassigned name when a failure happens before the second client is
    # created (that would hide the original failure).
    request_db = None
    try:
        self.assertFalse(self.client.in_request())
        self.assertTrue(db.authenticate("mike", "password"))
        self.assertFalse(self.client.in_request())

        request_cx = get_client(auto_start_request=True)
        request_db = request_cx.pymongo_test
        self.assertTrue(request_cx.in_request())
        self.assertTrue(request_db.authenticate("mike", "password"))
        self.assertTrue(request_cx.in_request())
    finally:
        # Re-authenticate so the user can be removed even if the failure
        # happened before the first authenticate() call succeeded.
        db.authenticate("mike", "password")
        db.remove_user("mike")
        db.logout()
        if request_db is not None:
            request_db.logout()
开发者ID:ArturFis,项目名称:mongo-python-driver,代码行数:29,代码来源:test_database.py
示例5: setUp
def setUp(self):
    """Record basic topology facts and start from an empty test collection."""
    client = get_client()
    reply = client.test.command('ismaster')

    # A 'setName' in the ismaster reply marks a replica set member.
    self.is_repl = bool(reply.get('setName'))
    # Number of hosts the server reports (0 when the key is absent).
    self.w = len(reply.get("hosts", []))

    collection = client.pymongo_test.test
    self.coll = collection
    collection.remove()
开发者ID:yangcol,项目名称:mongo-python-driver,代码行数:7,代码来源:test_bulk.py
示例6: test_only_secondary_ok_commands_have_read_prefs
def test_only_secondary_ok_commands_have_read_prefs(self):
    """Check which commands get $readPreference attached on mongos.

    Commands in secondary_ok_commands and inline map-reduce must be wrapped
    in $query with a secondary read preference; map-reduce to a collection
    and all other commands must be sent unchanged.
    """
    client = get_client(read_preference=ReadPreference.SECONDARY)
    if not utils.is_mongos(client):
        raise SkipTest("Only mongos have read_prefs added to the spec")

    def sent_spec(command):
        # Query spec a $cmd cursor builds for a copy of `command`.
        cursor = client.pymongo_test["$cmd"].find(command.copy())
        return cursor._Cursor__query_spec()

    def with_secondary_pref(command):
        # White-listed commands are wrapped in $query and tagged with the
        # read preference.
        wrapped = SON([('$query', command)])
        wrapped['$readPreference'] = {'mode': 'secondary'}
        return wrapped

    # Every secondary-ok command carries the read preference.
    for name in secondary_ok_commands:
        if name == 'mapreduce':  # map reduce is a special case
            continue
        command = SON([(name, 1)])
        actual = sent_spec(command)
        self.assertEqual(with_secondary_pref(command), actual)

    # Inline map_reduce carries the read preference too.
    command = SON([('mapreduce', 'test'), ('out', {'inline': 1})])
    actual = sent_spec(command)
    self.assertEqual(with_secondary_pref(command), actual)

    # map_reduce that writes to a collection is left untouched.
    command = SON([('mapreduce', 'test'), ('out', {'mrtest': 1})])
    self.assertEqual(command, sent_spec(command))

    # Any other command is left untouched as well.
    for name in ('drop', 'create', 'any-future-cmd'):
        command = SON([(name, 1)])
        self.assertEqual(command, sent_spec(command))
开发者ID:easonlin,项目名称:mongo-python-driver,代码行数:35,代码来源:test_read_preferences.py
示例7: test_uuid_queries
def test_uuid_queries(self):
    """Query documents stored with legacy (subtype 3) and subtype 4 UUIDs."""
    if not should_test_uuid:
        raise SkipTest("No uuid module")

    collection = get_client().pymongo_test.test
    collection.drop()

    value = uuid.uuid4()
    # value.bytes is wrapped in binary_type to work around
    # http://bugs.python.org/issue7380.
    legacy_binary = Binary(binary_type(value.bytes), 3)
    collection.insert({'uuid': legacy_binary})
    self.assertEqual(1, collection.count())

    # UUIDLegacy queries: with subtype 4 selected, a plain UUID must not
    # match the legacy document, while UUIDLegacy must.
    collection.uuid_subtype = 4
    self.assertEqual(0, collection.find({'uuid': value}).count())
    legacy_cursor = collection.find({'uuid': UUIDLegacy(value)})
    self.assertEqual(1, legacy_cursor.count())
    doc = legacy_cursor.next()
    self.assertEqual(value, doc['uuid'])

    # Regular UUID queries (using subtype 4).
    collection.insert({'uuid': value})
    self.assertEqual(2, collection.count())
    plain_cursor = collection.find({'uuid': value})
    self.assertEqual(1, plain_cursor.count())
    doc = plain_cursor.next()
    self.assertEqual(value, doc['uuid'])

    # Both representations in one query.
    both_cursor = collection.find(
        {'uuid': {'$in': [value, UUIDLegacy(value)]}})
    self.assertEqual(2, both_cursor.count())
    collection.drop()
开发者ID:Khefren,项目名称:mongo-python-driver,代码行数:34,代码来源:test_binary.py
示例8: test_server_disconnect
def test_server_disconnect(self):
    """Regression test for PYTHON-345: threads' request sockets must be
    closed by disconnect().

    Steps:
      1. Create a client with auto_start_request=True.
      2. Start N threads; each does a find() to get a request socket.
      3. Pause all threads.
      4. In the main thread, close every socket, including the threads'
         request sockets.
      5. In the main thread, do a find(), which raises AutoReconnect and
         resets the pool.
      6. Resume the threads and let each do another find().

    With PYTHON-345 fixed, only one AutoReconnect is raised and all the
    threads get new request sockets.
    """
    client = get_client(auto_start_request=True)
    collection = client.db.pymongo_test

    # Give the main thread its own request socket.
    collection.find_one()
    pool = get_pool(collection.database.connection)
    main_socket_info = pool._get_request_state()
    assert isinstance(main_socket_info, SocketInfo)
    main_sock = main_socket_info.sock

    shared = FindPauseFind.create_shared_state(nthreads=40)
    workers = [FindPauseFind(collection, shared)
               for _ in range(shared.nthreads)]

    # Each worker does a find(), thus acquiring a request socket.
    for worker in workers:
        worker.start()

    # Block until every worker has reached the rendezvous.
    FindPauseFind.wait_for_rendezvous(shared)

    try:
        # Simulate an event that closes all sockets, e.g. primary stepdown.
        for worker in workers:
            worker.request_sock.close()

        # Make sure the main thread's socket's last_checkout is updated...
        collection.find_one()

        # ... and then close it as well.
        main_sock.close()

        # The next operation on the client raises AutoReconnect and resets
        # the pool behind the scenes.
        self.assertRaises(AutoReconnect, collection.find_one)
    finally:
        # Let the workers do their second find().
        FindPauseFind.resume_after_rendezvous(shared)

    joinall(workers)

    for worker in workers:
        self.assertTrue(worker.passed, "%s threw exception" % worker)
开发者ID:helenseo,项目名称:mongo-python-driver,代码行数:60,代码来源:test_threads.py
示例9: _get_client
def _get_client(self):
    """Return the client these tests run against.

    This is an override hook: here it returns the regular test client
    (a MongoClient), while TestThreadsAuthReplicaSet in
    test_threads_replica_set_connection.py returns a
    MongoReplicaSetClient instead.
    """
    regular_client = get_client()
    return regular_client
开发者ID:Parastou,项目名称:mongo-python-driver,代码行数:8,代码来源:test_threads.py
示例10: setUp
def setUp(self):
    """Drop the GridFS collections and create default and "alt" GridFS."""
    self.db = get_client().pymongo_test

    # Clear the backing collections of both the default and "alt" buckets.
    for name in ("fs.files", "fs.chunks", "alt.files", "alt.chunks"):
        self.db.drop_collection(name)

    self.fs = gridfs.GridFS(self.db)
    self.alt = gridfs.GridFS(self.db, "alt")
开发者ID:ArturFis,项目名称:mongo-python-driver,代码行数:8,代码来源:test_gridfs.py
示例11: setUp
def setUp(self):
    """Run the base setUp, record topology facts, empty the test collection."""
    super(TestBulkWriteConcern, self).setUp()
    client = get_client()
    reply = client.test.command('ismaster')

    # A 'setName' in the ismaster reply marks a replica set member.
    self.is_repl = bool(reply.get('setName'))
    # Number of hosts the server reports (0 when the key is absent).
    self.w = len(reply.get("hosts", []))

    collection = client.pymongo_test.test
    self.coll = collection
    collection.remove()
开发者ID:1060460048,项目名称:mongo-python-driver,代码行数:8,代码来源:test_bulk.py
示例12: test_authenticate_multiple
def test_authenticate_multiple(self):
    """Authenticate as several users on one client and check their rights.

    Covers a regular per-database user and a read-only admin user, and
    verifies the credentials survive a forced close of all sockets.
    """
    client = get_client()
    if is_mongos(client) and not version.at_least(self.client, (2, 2, 0)):
        raise SkipTest("Need mongos >= 2.2.0")
    if not server_started_with_auth(client):
        raise SkipTest("Authentication is not enabled on server")

    # Setup
    users_db = client.pymongo_test
    admin_db = client.admin
    other_db = client.pymongo_test1
    for collection in (users_db.test, other_db.test):
        collection.remove()

    admin_roles = [
        "userAdminAnyDatabase", "dbAdmin", "clusterAdmin", "readWrite"]
    admin_db.add_user("admin", "pass", roles=admin_roles)
    try:
        self.assertTrue(admin_db.authenticate("admin", "pass"))

        # Servers from 2.5.3 on take roles; older ones take read_only.
        if version.at_least(self.client, (2, 5, 3, -1)):
            admin_db.add_user(
                "ro-admin", "pass",
                roles=["userAdmin", "readAnyDatabase"])
        else:
            admin_db.add_user("ro-admin", "pass", read_only=True)

        users_db.add_user("user", "pass", roles=["userAdmin", "readWrite"])

        admin_db.logout()
        self.assertRaises(OperationFailure, users_db.test.find_one)

        # A regular user can query its own db, but no other.
        users_db.authenticate("user", "pass")
        self.assertEqual(0, users_db.test.count())
        self.assertRaises(OperationFailure, other_db.test.find_one)

        # The read-only admin user can query any db, but cannot write.
        admin_db.authenticate("ro-admin", "pass")
        self.assertEqual(0, other_db.test.count())
        self.assertRaises(OperationFailure, other_db.test.insert, {})

        # Force close all sockets
        client.disconnect()

        # Writing to the regular user's db still works...
        self.assertTrue(users_db.test.remove())
        # ... as does reading from other dbs...
        self.assertEqual(0, other_db.test.count())
        # ... but writing to other dbs is still refused.
        self.assertRaises(OperationFailure, other_db.test.insert, {})
    finally:
        # Cleanup
        admin_db.logout()
        users_db.logout()
        admin_db.authenticate("admin", "pass")
        remove_all_users(users_db)
        remove_all_users(admin_db)
开发者ID:Tokutek,项目名称:mongo-python-driver,代码行数:57,代码来源:test_database.py
示例13: test_authenticate_multiple
def test_authenticate_multiple(self):
    """Authenticate as several users on one client and check their rights.

    Covers a regular per-database user and a read-only admin user, and
    verifies the credentials survive a forced close of all sockets.
    """
    client = get_client()
    mongos_too_old = (
        is_mongos(client)
        and not version.at_least(self.client, (2, 0, 0)))
    if mongos_too_old:
        raise SkipTest("Auth with sharding requires MongoDB >= 2.0.0")
    if not server_started_with_auth(client):
        raise SkipTest("Authentication is not enabled on server")

    # Setup
    users_db = client.pymongo_test
    admin_db = client.admin
    other_db = client.pymongo_test1
    stale_collections = (
        users_db.system.users,
        admin_db.system.users,
        users_db.test,
        other_db.test,
    )
    for collection in stale_collections:
        collection.remove()

    admin_db.add_user('admin', 'pass')
    self.assertTrue(admin_db.authenticate('admin', 'pass'))

    admin_db.add_user('ro-admin', 'pass', read_only=True)
    users_db.add_user('user', 'pass')

    admin_db.logout()
    self.assertRaises(OperationFailure, users_db.test.find_one)

    # A regular user can query its own db, but no other.
    users_db.authenticate('user', 'pass')
    self.assertEqual(0, users_db.test.count())
    self.assertRaises(OperationFailure, other_db.test.find_one)

    # The read-only admin user can query any db, but cannot write.
    admin_db.authenticate('ro-admin', 'pass')
    self.assertEqual(0, other_db.test.count())
    self.assertRaises(OperationFailure, other_db.test.insert, {})

    # Force close all sockets
    client.disconnect()

    # Writing to the regular user's db still works...
    self.assertTrue(users_db.test.remove())
    # ... as does reading from other dbs...
    self.assertEqual(0, other_db.test.count())
    # ... but writing to other dbs is still refused.
    self.assertRaises(OperationFailure, other_db.test.insert, {})

    # Cleanup
    admin_db.logout()
    users_db.logout()
    self.assertTrue(admin_db.authenticate('admin', 'pass'))
    self.assertTrue(admin_db.system.users.remove())
    self.assertEqual(0, admin_db.system.users.count())
    self.assertTrue(users_db.system.users.remove())
开发者ID:Khefren,项目名称:mongo-python-driver,代码行数:57,代码来源:test_database.py
示例14: teardown
def teardown():
    """Drop every database the test suite may have created."""
    database_names = (
        "pymongo-pooling-tests",
        "pymongo_test",
        "pymongo_test1",
        "pymongo_test2",
        "pymongo_test_mike",
        "pymongo_test_bernie",
    )
    client = get_client()
    for name in database_names:
        client.drop_database(name)
开发者ID:DaBbleR23,项目名称:mongo-python-driver,代码行数:9,代码来源:__init__.py
示例15: test_file_exists
def test_file_exists(self):
    """Reusing an existing GridFS _id must raise FileExists."""
    database = get_client(w=1).pymongo_test
    fs = gridfs.GridFS(database)

    # put() with the _id of an already stored file is rejected.
    existing_id = fs.put(b("hello"))
    self.assertRaises(FileExists, fs.put, b("world"), _id=existing_id)

    # Store a file under _id 123 ...
    first = fs.new_file(_id=123)
    first.write(b("some content"))
    first.close()

    # ... then writing a second file with that _id is rejected too.
    second = fs.new_file(_id=123)
    self.assertRaises(FileExists, second.write, b('x' * 262146))
开发者ID:ArturFis,项目名称:mongo-python-driver,代码行数:13,代码来源:test_gridfs.py
示例16: test_only_secondary_ok_commands_have_read_prefs
def test_only_secondary_ok_commands_have_read_prefs(self):
    """Check which commands get $readPreference attached on mongos.

    Commands in secondary_ok_commands and inline map-reduce must be wrapped
    in $query with a secondary read preference; map-reduce to a collection
    and all other commands must be sent unchanged.
    """
    client = get_client(read_preference=ReadPreference.SECONDARY)

    # UserWarning is ignored while probing for mongos.
    warning_ctx = catch_warnings()
    try:
        warnings.simplefilter("ignore", UserWarning)
        on_mongos = utils.is_mongos(client)
    finally:
        warning_ctx.exit()
    if not on_mongos:
        raise SkipTest("Only mongos have read_prefs added to the spec")

    def sent_spec(command):
        # Query spec a $cmd cursor builds for a copy of `command`.
        cursor = client.pymongo_test["$cmd"].find(command.copy())
        return cursor._Cursor__query_spec()

    def with_secondary_pref(command):
        # White-listed commands are wrapped in $query and tagged with the
        # read preference.
        wrapped = SON([("$query", command)])
        wrapped["$readPreference"] = {"mode": "secondary"}
        return wrapped

    # Every secondary-ok command carries the read preference.
    for name in secondary_ok_commands:
        if name == "mapreduce":  # map reduce is a special case
            continue
        command = SON([(name, 1)])
        actual = sent_spec(command)
        self.assertEqual(with_secondary_pref(command), actual)

    # Inline map_reduce carries the read preference too.
    command = SON([("mapreduce", "test"), ("out", {"inline": 1})])
    actual = sent_spec(command)
    self.assertEqual(with_secondary_pref(command), actual)

    # map_reduce that writes to a collection is left untouched.
    command = SON([("mapreduce", "test"), ("out", {"mrtest": 1})])
    self.assertEqual(command, sent_spec(command))

    # Any other command is left untouched as well.
    for name in ("drop", "create", "any-future-cmd"):
        command = SON([(name, 1)])
        self.assertEqual(command, sent_spec(command))
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:40,代码来源:test_read_preferences.py
示例17: test_authenticate_and_request
def test_authenticate_and_request(self):
    """Verify Database.authenticate() preserves the client's request state.

    authenticate() has to run inside a request. This checks it for a client
    that is not in a request and for one created with
    auto_start_request=True, and that each is left in the state it started
    in. The test user is removed again in the cleanup step.
    """
    self.assertFalse(self.client.auto_start_request)
    db = self.client.pymongo_test
    auth_context.client.pymongo_test.add_user(
        "mike", "password",
        roles=["userAdmin", "dbAdmin", "readWrite"])

    # Bound before the try block so the cleanup below never trips over an
    # unassigned name when a failure happens before the second client is
    # created (that would hide the original failure).
    request_db = None
    try:
        self.assertFalse(self.client.in_request())
        self.assertTrue(db.authenticate("mike", "password"))
        self.assertFalse(self.client.in_request())

        request_cx = get_client(auto_start_request=True)
        request_db = request_cx.pymongo_test
        self.assertTrue(request_db.authenticate("mike", "password"))
        self.assertTrue(request_cx.in_request())
    finally:
        # Re-authenticate so the user can be removed even if the failure
        # happened before the first authenticate() call succeeded.
        db.authenticate("mike", "password")
        db.remove_user("mike")
        db.logout()
        if request_db is not None:
            request_db.logout()
开发者ID:arunbaabte,项目名称:DjangoAppService,代码行数:23,代码来源:test_auth.py
示例18: get_client
def get_client(self, *args, **kwargs):
    """Create a test client that uses this test case's use_greenlets setting.

    Positional and keyword arguments are forwarded to the module-level
    get_client(); any caller-supplied use_greenlets is overridden. The
    caller's kwargs dict is not modified.
    """
    options = dict(kwargs, use_greenlets=self.use_greenlets)
    return get_client(*args, **options)
开发者ID:xowenx,项目名称:mongo-python-driver,代码行数:4,代码来源:test_pooling_base.py
示例19: _test_pool
def _test_pool(self, use_request):
    """Check that the pool never lets two threads/greenlets share a socket.

    A slow find is started first and a fast find shortly after; the fast
    one must start and finish while the slow one is still running:

        worker 0: start a slow find()
        worker 1: start a fast find()
        worker 1: get results
        worker 0: get results

    When use_request is true, each worker wraps its work in
    start_request() / end_request().
    """
    client = get_client(
        use_greenlets=self.use_greenlets,
        auto_start_request=False
    )
    db = client.pymongo_test
    db.test.remove()
    db.test.insert({'_id': 1})

    # Ordered log of the events recorded by both workers.
    history = []

    def find_fast():
        if use_request:
            client.start_request()

        history.append('find_fast start')

        # With greenlets and the old connection._Pool, this would throw
        # AssertionError: "This event is already used by another
        # greenlet"
        self.assertEqual({'_id': 1}, db.test.find_one())
        history.append('find_fast done')

        if use_request:
            client.end_request()

    def find_slow():
        if use_request:
            client.start_request()

        history.append('find_slow start')

        # Javascript function that pauses N seconds per document
        slow_js = delay(10)
        eval_unavailable = (
            is_mongos(db.connection)
            or not version.at_least(db.connection, (1, 7, 2)))
        if eval_unavailable:
            # mongos doesn't support eval so we have to use $where
            # which is less reliable in this context.
            self.assertEqual(1, db.test.find({"$where": slow_js}).count())
        else:
            # 'nolock' allows find_fast to start and finish while we're
            # waiting for this to complete.
            self.assertEqual({'ok': 1.0, 'retval': True},
                             db.command('eval', slow_js, nolock=True))

        history.append('find_slow done')

        if use_request:
            client.end_request()

    if self.use_greenlets:
        slow_worker, fast_worker = Greenlet(find_slow), Greenlet(find_fast)
        slow_worker.start()
        fast_worker.start_later(.1)
    else:
        slow_worker = threading.Thread(target=find_slow)
        slow_worker.setDaemon(True)
        fast_worker = threading.Thread(target=find_fast)
        fast_worker.setDaemon(True)

        # Give the slow find a head start before launching the fast one.
        slow_worker.start()
        time.sleep(.1)
        fast_worker.start()

    slow_worker.join()
    fast_worker.join()

    expected_history = [
        'find_slow start',
        'find_fast start',
        'find_fast done',
        'find_slow done',
    ]
    self.assertEqual(expected_history, history)
开发者ID:xowenx,项目名称:mongo-python-driver,代码行数:84,代码来源:test_pooling_base.py
示例20: test_mongos_connection
def test_mongos_connection(self):
    """Check the $readPreference and slaveOkay bit sent for each mode.

    Against mongos the query spec and options must reflect the configured
    read preference / slave_okay and tag sets; against anything else the
    spec must never contain $readPreference.
    """
    def sent_read_pref(cur):
        # $readPreference in the spec the cursor would send, or None.
        return cur._Cursor__query_spec().get('$readPreference')

    def assert_no_read_pref(cur):
        self.assertFalse('$readPreference' in cur._Cursor__query_spec())

    client = get_client()
    on_mongos = utils.is_mongos(client)

    # Default mode, PRIMARY
    cursor = client.pymongo_test.test.find()
    if on_mongos:
        # $readPreference is only set for something other than PRIMARY,
        # to avoid problems with mongos versions that don't support read
        # preferences.
        self.assertEqual(None, sent_read_pref(cursor))
    else:
        assert_no_read_pref(cursor)

    slave_okay_bit = _QUERY_OPTIONS['slave_okay']

    # Non-PRIMARY modes which can be combined with tags:
    # (client keyword, its value, mode name expected on mongos)
    cases = (
        ('read_preference', ReadPreference.PRIMARY_PREFERRED,
         'primaryPreferred'),
        ('read_preference', ReadPreference.SECONDARY, 'secondary'),
        ('read_preference', ReadPreference.SECONDARY_PREFERRED,
         'secondaryPreferred'),
        ('read_preference', ReadPreference.NEAREST, 'nearest'),
        ('slave_okay', True, 'secondaryPreferred'),
        ('slave_okay', False, 'primary'),
    )
    for kwarg, value, mongos_mode in cases:
        # No tags, or a single empty tag set.
        for tag_sets in (None, [{}]):
            # Create a client e.g. with read_preference=NEAREST or
            # slave_okay=True
            client = get_client(tag_sets=tag_sets, **{kwarg: value})
            self.assertEqual(on_mongos, client.is_mongos)
            cursor = client.pymongo_test.test.find()
            if not on_mongos:
                assert_no_read_pref(cursor)
                continue

            if mongos_mode == 'secondaryPreferred':
                # $readPreference isn't set for SECONDARY_PREFERRED
                # unless tags are in use. slaveOkay has the same effect.
                expected_pref, expect_slave_ok = None, True
            elif mongos_mode == 'primary':
                # $readPreference isn't sent for PRIMARY either.
                expected_pref, expect_slave_ok = None, False
            else:
                expected_pref = {'mode': mongos_mode}
                expect_slave_ok = True

            self.assertEqual(expected_pref, sent_read_pref(cursor))
            slave_ok = cursor._Cursor__query_options() & slave_okay_bit
            if expect_slave_ok:
                self.assertTrue(slave_ok)
            else:
                self.assertFalse(slave_ok)

        # Real tag sets.
        for tag_sets in (
            [{'dc': 'la'}],
            [{'dc': 'la'}, {'dc': 'sf'}],
            [{'dc': 'la'}, {'dc': 'sf'}, {}],
        ):
            if kwarg == 'slave_okay':
                # Can't use tags with slave_okay True or False, need a
                # real read preference
                self.assertRaises(
                    ConfigurationError,
                    get_client, tag_sets=tag_sets, **{kwarg: value})
                continue

            client = get_client(tag_sets=tag_sets, **{kwarg: value})
            self.assertEqual(on_mongos, client.is_mongos)
            cursor = client.pymongo_test.test.find()
            if on_mongos:
                self.assertEqual(
                    {'mode': mongos_mode, 'tags': tag_sets},
                    sent_read_pref(cursor))
            else:
                assert_no_read_pref(cursor)
开发者ID:easonlin,项目名称:mongo-python-driver,代码行数:98,代码来源:test_read_preferences.py
注：本文中的 test.test_client.get_client 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论