本文整理汇总了Python中test_connection.get_connection函数的典型用法代码示例。如果您正苦于以下问题:Python get_connection函数的具体用法?Python get_connection怎么用?Python get_connection使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_connection函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_auto_auth_login
def test_auto_auth_login(self):
conn = get_connection()
self.assertRaises(OperationFailure, conn.auth_test.test.find_one)
# Admin auth
conn = get_connection()
conn.admin.authenticate("admin-user", "password")
threads = []
for _ in xrange(10):
t = AutoAuthenticateThreads(conn.auth_test.test, 100)
t.start()
threads.append(t)
for t in threads:
t.join()
self.assertTrue(t.success)
# Database-specific auth
conn = get_connection()
conn.auth_test.authenticate("test-user", "password")
threads = []
for _ in xrange(10):
t = AutoAuthenticateThreads(conn.auth_test.test, 100)
t.start()
threads.append(t)
for t in threads:
t.join()
self.assertTrue(t.success)
开发者ID:Ivideon,项目名称:mongo-python-driver,代码行数:29,代码来源:test_threads.py
示例2: test_multiple_connections
def test_multiple_connections(self):
a = get_connection(auto_start_request=False)
b = get_connection(auto_start_request=False)
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(1, len(b._Connection__pool.sockets))
a.start_request()
a.test.test.find_one()
self.assertEqual(0, len(a._Connection__pool.sockets))
a.end_request()
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(1, len(b._Connection__pool.sockets))
a_sock = one(a._Connection__pool.sockets)
b.end_request()
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(1, len(b._Connection__pool.sockets))
b.start_request()
b.test.test.find_one()
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(0, len(b._Connection__pool.sockets))
b.end_request()
b_sock = one(b._Connection__pool.sockets)
b.test.test.find_one()
a.test.test.find_one()
self.assertEqual(b_sock,
b._Connection__pool.get_socket((b.host, b.port)))
self.assertEqual(a_sock,
a._Connection__pool.get_socket((a.host, a.port)))
开发者ID:dampier,项目名称:mongo-python-driver,代码行数:32,代码来源:test_pooling.py
示例3: test_multiple_connections
def test_multiple_connections(self):
a = get_connection()
b = get_connection()
self.assertEqual(0, len(a._Connection__pool.sockets))
self.assertEqual(0, len(b._Connection__pool.sockets))
a.test.test.find_one()
a.end_request()
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(0, len(b._Connection__pool.sockets))
a_sock = a._Connection__pool.sockets[0]
b.end_request()
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(0, len(b._Connection__pool.sockets))
b.test.test.find_one()
self.assertEqual(1, len(a._Connection__pool.sockets))
self.assertEqual(0, len(b._Connection__pool.sockets))
b.end_request()
b_sock = b._Connection__pool.sockets[0]
b.test.test.find_one()
a.test.test.find_one()
self.assertEqual(b_sock, b._Connection__pool.socket())
self.assertEqual(a_sock, a._Connection__pool.socket())
开发者ID:drewp,项目名称:mongo-python-driver,代码行数:26,代码来源:test_pooling.py
示例4: setUp
def setUp(self):
self.db = get_connection().pymongo_test
self.db.drop_collection("fs.files")
self.db.drop_collection("fs.chunks")
self.db.drop_collection("pymongo_test.files")
self.db.drop_collection("pymongo_test.chunks")
self.fs = gridfs.GridFS(self.db)
开发者ID:JMassapina,项目名称:mongo-python-driver,代码行数:7,代码来源:test_gridfs.py
示例5: test_pool_with_greenlets
def test_pool_with_greenlets(self):
try:
from greenlet import greenlet
except ImportError:
raise SkipTest()
c = get_connection()
c.test.test.find_one()
c.end_request()
self.assertEqual(1, len(c._Connection__pool.sockets))
a_sock = c._Connection__pool.sockets[0]
def loop(name, pipe):
c.test.test.find_one()
self.assertEqual(0, len(c._Connection__pool.sockets))
greenlet.getcurrent().parent.switch()
c.end_request()
pipe.append(c._Connection__pool.sockets[-1])
ga1 = []
ga2 = []
g1 = greenlet(loop)
g2 = greenlet(loop)
g1.switch('g1', ga1)
g2.switch('g2', ga2)
g1.switch()
g2.switch()
b_sock = ga1[0]
c_sock = ga2[0]
self.assert_(a_sock is b_sock)
self.assert_(a_sock is not c_sock)
self.assert_(b_sock is not c_sock)
开发者ID:flanked,项目名称:mongo-python-driver,代码行数:35,代码来源:test_pooling.py
示例6: setUp
def setUp(self):
self.c = get_connection(auto_start_request=False)
# reset the db
self.c.drop_database(DB)
self.c[DB].unique.insert({"_id": "mike"})
self.c[DB].test.insert([{} for i in range(1000)])
开发者ID:shvechikov,项目名称:mongo-python-driver,代码行数:7,代码来源:test_pooling.py
示例7: test_uuid_queries
def test_uuid_queries(self):
if not should_test_uuid:
raise SkipTest()
c = get_connection()
coll = c.pymongo_test.test
coll.drop()
uu = uuid.uuid4()
coll.insert({'uuid': Binary(uu.bytes, 3)})
self.assertEqual(1, coll.count())
# Test UUIDLegacy queries.
coll.uuid_subtype = 4
self.assertEqual(0, coll.find({'uuid': uu}).count())
cur = coll.find({'uuid': UUIDLegacy(uu)})
self.assertEqual(1, cur.count())
retrieved = cur.next()['uuid']
self.assertEqual(uu, retrieved)
# Test regular UUID queries (using subtype 4).
coll.insert({'uuid': uu})
self.assertEqual(2, coll.count())
cur = coll.find({'uuid': uu})
self.assertEqual(1, cur.count())
self.assertEqual(uu, cur.next()['uuid'])
# Test both.
cur = coll.find({'uuid': {'$in': [uu, UUIDLegacy(uu)]}})
self.assertEqual(2, cur.count())
coll.drop()
开发者ID:Schuk,项目名称:mongo-python-driver,代码行数:31,代码来源:test_binary.py
示例8: loop
def loop(pipe):
c = get_connection()
self.assertEqual(0, len(c._Connection__pool.sockets))
c.test.test.find_one()
c.end_request()
self.assertEqual(1, len(c._Connection__pool.sockets))
pipe.send(c._Connection__pool.sockets[0].getsockname())
开发者ID:drewp,项目名称:mongo-python-driver,代码行数:7,代码来源:test_pooling.py
示例9: test_auto_auth_login
def test_auto_auth_login(self):
conn = get_connection()
try:
conn.auth_test.test.find_one()
assert False # Find should have failed
except OperationFailure, e:
pass
开发者ID:jmurty,项目名称:mongo-python-driver,代码行数:7,代码来源:test_threads.py
示例10: test_uuid_queries
def test_uuid_queries(self):
if not should_test_uuid:
raise SkipTest()
c = get_connection()
coll = c.pymongo_test.test
coll.drop()
uu = uuid.uuid4()
coll.insert({"uuid": uuid.uuid4()})
coll.insert({"uuid": uuid.uuid4()})
coll.insert({"uuid": Binary(uu.bytes, 3)})
# Test UUIDLegacy queries.
self.assertEquals(0, coll.find({"uuid": uu}).count())
cur = coll.find({"uuid": UUIDLegacy(uu)})
self.assertEquals(1, cur.count())
retrieved = cur.next()["uuid"]
self.assertEquals(uu, retrieved)
# Test regular UUID queries.
coll.insert({"uuid": uu})
cur = coll.find({"uuid": uu})
self.assertEquals(1, cur.count())
self.assertEquals(uu, cur.next()["uuid"])
# Test both.
cur = coll.find({"uuid": {"$in": [uu, UUIDLegacy(uu)]}})
self.assertEquals(2, cur.count())
coll.drop()
开发者ID:hghazal,项目名称:mongo-python-driver,代码行数:30,代码来源:test_binary.py
示例11: setUp
def setUp(self):
self.c = get_connection()
# reset the db
self.c.drop_database(DB)
self.c[DB].unique.insert({"_id": "mike"})
self.c[DB].unique.find_one()
开发者ID:drewp,项目名称:mongo-python-driver,代码行数:7,代码来源:test_pooling.py
示例12: teardown
def teardown():
c = get_connection()
c.drop_database("pymongo-pooling-tests")
c.drop_database("pymongo_test")
c.drop_database("pymongo_test1")
c.drop_database("pymongo_test2")
c.drop_database("pymongo_test_mike")
开发者ID:YoSmudge,项目名称:APyMongo,代码行数:8,代码来源:__init__.py
示例13: _get_connection
def _get_connection(self):
"""
Intended for overriding in TestThreadsAuthReplicaSet. This method
returns a Connection here, and a ReplicaSetConnection in
test_threads_replica_set_connection.py.
"""
# Regular test connection
return get_connection()
开发者ID:Schuk,项目名称:mongo-python-driver,代码行数:8,代码来源:test_threads.py
示例14: setUp
def setUp(self):
self.c = get_connection(auto_start_request=False)
# reset the db
db = self.c[DB]
db.unique.drop()
db.test.drop()
db.unique.insert({"_id": "mike"})
db.test.insert([{} for i in range(1000)])
开发者ID:dampier,项目名称:mongo-python-driver,代码行数:9,代码来源:test_pooling.py
示例15: test_pool_with_fork
def test_pool_with_fork(self):
# Test that separate Connections have separate Pools, and that the
# driver can create a new Connection after forking
if sys.platform == "win32":
raise SkipTest("Can't test forking on Windows")
try:
from multiprocessing import Process, Pipe
except ImportError:
raise SkipTest("No multiprocessing module")
a = get_connection(auto_start_request=False)
a.test.test.remove(safe=True)
a.test.test.insert({'_id':1}, safe=True)
a.test.test.find_one()
self.assertEqual(1, len(a._Connection__pool.sockets))
a_sock = one(a._Connection__pool.sockets)
def loop(pipe):
c = get_connection(auto_start_request=False)
self.assertEqual(1,len(c._Connection__pool.sockets))
c.test.test.find_one()
self.assertEqual(1,len(c._Connection__pool.sockets))
pipe.send(one(c._Connection__pool.sockets).sock.getsockname())
cp1, cc1 = Pipe()
cp2, cc2 = Pipe()
p1 = Process(target=loop, args=(cc1,))
p2 = Process(target=loop, args=(cc2,))
p1.start()
p2.start()
p1.join(1)
p2.join(1)
p1.terminate()
p2.terminate()
p1.join()
p2.join()
cc1.close()
cc2.close()
b_sock = cp1.recv()
c_sock = cp2.recv()
self.assertTrue(a_sock.sock.getsockname() != b_sock)
self.assertTrue(a_sock.sock.getsockname() != c_sock)
self.assertTrue(b_sock != c_sock)
self.assertEqual(a_sock,
a._Connection__pool.get_socket((a.host, a.port)))
开发者ID:dampier,项目名称:mongo-python-driver,代码行数:53,代码来源:test_pooling.py
示例16: test_pool_with_fork
def test_pool_with_fork(self):
if sys.platform == "win32":
raise SkipTest()
try:
from multiprocessing import Process, Pipe
except ImportError:
raise SkipTest()
a = get_connection()
a.test.test.find_one()
a.end_request()
self.assertEqual(1, len(a._Connection__pool.sockets))
a_sock = a._Connection__pool.sockets[0]
def loop(pipe):
c = get_connection()
self.assertEqual(1, len(c._Connection__pool.sockets))
c.test.test.find_one()
self.assertEqual(0, len(c._Connection__pool.sockets))
c.end_request()
self.assertEqual(1, len(c._Connection__pool.sockets))
pipe.send(c._Connection__pool.sockets[0].getsockname())
cp1, cc1 = Pipe()
cp2, cc2 = Pipe()
p1 = Process(target=loop, args=(cc1,))
p2 = Process(target=loop, args=(cc2,))
p1.start()
p2.start()
p1.join(1)
p2.join(1)
p1.terminate()
p2.terminate()
p1.join()
p2.join()
cc1.close()
cc2.close()
b_sock = cp1.recv()
c_sock = cp2.recv()
self.assert_(a_sock.getsockname() != b_sock)
self.assert_(a_sock.getsockname() != c_sock)
self.assert_(b_sock != c_sock)
self.assertEqual(a_sock,
a._Connection__pool.get_socket(a.host, a.port)[0])
开发者ID:doncatnip,项目名称:mongo-python-driver,代码行数:52,代码来源:test_pooling.py
示例17: test_max_pool_size
def test_max_pool_size(self):
c = get_connection()
threads = []
for i in range(40):
t = CreateAndReleaseSocket(c)
t.start()
threads.append(t)
for t in threads:
t.join()
# There's a race condition, so be lenient
self.assert_(abs(10 - len(c._Connection__pool.sockets)) < 10)
开发者ID:jmurty,项目名称:mongo-python-driver,代码行数:14,代码来源:test_pooling.py
示例18: test_dependent_pools
def test_dependent_pools(self):
c = get_connection()
self.assertEqual(0, len(c._Connection__pool.sockets))
c.test.test.find_one()
self.assertEqual(0, len(c._Connection__pool.sockets))
c.end_request()
self.assertEqual(1, len(c._Connection__pool.sockets))
t = OneOp(c)
t.start()
t.join()
self.assertEqual(1, len(c._Connection__pool.sockets))
c.test.test.find_one()
self.assertEqual(0, len(c._Connection__pool.sockets))
开发者ID:drewp,项目名称:mongo-python-driver,代码行数:15,代码来源:test_pooling.py
示例19: setUp
def setUp(self):
conn = get_connection()
self.conn = conn
# Setup auth users
conn.admin.system.users.remove({})
conn.admin.add_user("admin-user", "password")
try:
conn.admin.system.users.find_one()
# If we reach here auth must be disabled in server
self.tearDown()
raise SkipTest()
except OperationFailure:
pass
conn.admin.authenticate("admin-user", "password")
conn.auth_test.system.users.remove({})
conn.auth_test.add_user("test-user", "password")
开发者ID:jmurty,项目名称:mongo-python-driver,代码行数:17,代码来源:test_threads.py
示例20: setUp
def setUp(self):
self.conn = get_connection()
# Setup auth users
self.conn.admin.system.users.remove({})
self.conn.admin.add_user('admin-user', 'password')
try:
self.conn.admin.system.users.find_one()
# If we reach here mongod was likely started
# without --auth. Skip this test since it's
# pointless without auth enabled.
self.tearDown()
raise SkipTest()
except OperationFailure:
pass
self.conn.admin.authenticate("admin-user", "password")
self.conn.auth_test.system.users.remove({})
self.conn.auth_test.add_user("test-user", "password")
开发者ID:Ivideon,项目名称:mongo-python-driver,代码行数:18,代码来源:test_threads.py
注:本文中的test_connection.get_connection函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论