本文整理汇总了Python中mongoengine.connection.get_db函数的典型用法代码示例。如果您正苦于以下问题:Python get_db函数的具体用法?Python get_db怎么用?Python get_db使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_db函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_file_multidb
def test_file_multidb(self):
    """Store, read back and replace a file through a FileField bound to
    the ``test_files`` connection alias.
    """
    alias = 'test_files'
    original_bytes = six.b('Hello, World!')
    replacement_bytes = six.b('HELLO, WORLD!')

    register_connection(alias, alias)

    class TestFile(Document):
        name = StringField()
        the_file = FileField(db_alias="test_files",
                             collection_name="macumba")

    TestFile.drop_collection()

    # Wipe the GridFS collections left behind by an earlier run.
    get_db(alias).macumba.files.drop()
    get_db(alias).macumba.chunks.drop()

    # First instance: upload the payload under an explicit file name.
    doc = TestFile()
    doc.name = "Hello, World!"
    doc.the_file.put(original_bytes, name="hello.txt")
    doc.save()

    # The file metadata must be in the aliased database.
    stored_meta = get_db(alias).macumba.files.find_one()
    self.assertEqual(stored_meta.get('name'), 'hello.txt')

    doc = TestFile.objects.first()
    self.assertEqual(doc.the_file.read(), original_bytes)

    # Assigning raw bytes to the field replaces the stored content.
    doc = TestFile.objects.first()
    doc.the_file = replacement_bytes
    doc.save()

    doc = TestFile.objects.first()
    self.assertEqual(doc.the_file.read(), replacement_bytes)
开发者ID:mikeckennedy,项目名称:mongoengine,代码行数:34,代码来源:file_tests.py
示例2: init_db
def init_db():
    """Evaluate every script file in the configured init-data folder.

    The folder name is read from the ``INIT_DATA_FOLDER_NAME`` config key
    and resolved through ``ResourceLoader``. Nothing happens when the
    resolved path is empty or is not a directory.
    """
    with current_app.app_context():
        folder_name = app.config.get('INIT_DATA_FOLDER_NAME')
        folder_path = ResourceLoader.get().get_resoure(folder_name).path
        if not (folder_path and os.path.isdir(folder_path)):
            return
        # os.listdir() order is arbitrary; sort so the scripts always run in
        # the same order.
        for data_file in sorted(os.listdir(folder_path)):
            script_path = os.path.join(folder_path, data_file)
            # Sub-directories cannot be opened as scripts; skip them.
            if not os.path.isfile(script_path):
                continue
            with open(script_path, 'r') as mqls:
                # NOTE(review): Database.eval() is deprecated in PyMongo 3
                # and gone in PyMongo 4 - confirm the pinned driver version.
                get_db().eval(mqls.read())
开发者ID:IamFive,项目名称:gc,代码行数:8,代码来源:__init__.py
示例3: _dump_collections
def _dump_collections(collection_names=None):
    """Run ``mongoexport`` once per collection, writing ``output_<name>.json``.

    :param collection_names: collections to export; when ``None``, every
        collection of the default database except ``system.indexes``.
    """
    if collection_names is None:
        collection_names = []
        for candidate in get_db().collection_names():
            if candidate != 'system.indexes':
                collection_names.append(candidate)

    for coll in collection_names:
        command = ['mongoexport']
        command += ['-d', '%s' % get_db().name]
        command += ['-c', '%s' % coll]
        command += ['-o', 'output_%s.json' % coll]
        subprocess.call(command)
开发者ID:snowcloud,项目名称:engine-groups,代码行数:9,代码来源:tests.py
示例4: _load_collections
def _load_collections(collection_names=None, drop='--drop'):
    """Run ``mongoimport`` once per collection, reading ``output_<name>.json``.

    :param collection_names: collections to import; when ``None``, every
        collection of the default database except ``system.indexes``.
    :param drop: extra flag appended to the command line (``'--drop'`` by
        default). Pass an empty string or ``None`` to append nothing.
    """
    if collection_names is None:
        collection_names = [
            coll for coll in get_db().collection_names()
            if coll != 'system.indexes'
        ]
    for coll in collection_names:
        command = [
            'mongoimport',
            '-d', '%s' % get_db().name,
            '-c', '%s' % coll,
            '--file', 'output_%s.json' % coll,
        ]
        # An empty or None flag would otherwise reach mongoimport as a stray
        # positional argument ('' or 'None').
        if drop:
            command.append('%s' % drop)
        subprocess.call(command)
开发者ID:snowcloud,项目名称:engine-groups,代码行数:10,代码来源:tests.py
示例5: setUp
def setUp(self):
    """Connect to the local ``test`` database and keep the connection and
    database handles on the test case.
    """
    # Connect to the database.
    server = {'host': '127.0.0.1', 'port': 27017}
    connect('test', **server)
    self.conn = get_connection()
    self.db = get_db()
开发者ID:JFK,项目名称:python-tornado-site-template,代码行数:7,代码来源:test_user.py
示例6: create_text_indexes
def create_text_indexes(request, service=None):
    """Ensure the wildcard text index exists for one service, or for all.

    :param request: the incoming request (not inspected).
    :param service: service key such as ``'facebook'`` or ``'twitter'``;
        ``None`` ensures the index on every known collection. An unknown
        key creates nothing.
    :returns: a redirect to ``/create_indexes/``.
    """
    # (accepted service keys, collection, index name, label in the log line)
    # The historical misspelling 'googlelattitde' stays accepted so existing
    # callers keep working; the correct spelling is accepted as well.
    index_specs = (
        (('facebook',), 'facebook_data', 'FacebookTextIndex', 'facebook'),
        (('twitter',), 'twitter_data', 'TwitterTextIndex', 'twitter'),
        (('foursquare',), 'foursquare_data', 'FoursquareTextIndex',
         'foursquare'),
        (('dropbox',), 'dropbox_data', 'DropboxTextIndex', 'dropbox'),
        (('linkedin',), 'linked_in_data', 'LinkedInTextIndex', 'linkedin'),
        (('googledrive',), 'gdrive_data', 'GDriveTextIndex', 'googledrive'),
        (('gcal',), 'gcal_data', 'GCalendarTextIndex', 'gcal'),
        (('googlelattitde', 'googlelatitude'), 'g_latitude_data',
         'GoogleLatitudeTextIndex', 'latitude'),
    )

    url = '/create_indexes/'
    db = get_db()
    # Single-argument print(...) gives identical output on Python 2 and 3.
    print('INDEX')
    print(service)
    for service_keys, collection_name, index_name, label in index_specs:
        if service is None or service in service_keys:
            print('ensure %s index' % label)
            # NOTE(review): ensure_index() is deprecated in PyMongo 3 in
            # favour of create_index() - confirm the pinned driver version.
            db[collection_name].ensure_index([("$**", "text")],
                                             name=index_name)
    return HttpResponseRedirect(url)
开发者ID:ameliemarian,项目名称:DigitalSelf,代码行数:32,代码来源:index.py
示例7: test_query_counter
def test_query_counter(self):
    """Check that ``query_counter`` tallies insert, find and count queries
    issued against a collection.
    """
    connect('mongoenginetest')
    coll = get_db().query_counter
    coll.drop()

    def run_count():
        coll.find({}).count()

    def run_insert():
        coll.insert_one({'test': 'garbage'})

    def run_find():
        coll.find_one()

    # (number of repetitions, query to issue), in the order they must run.
    workload = (
        (10, run_insert),
        (4, run_find),
        (3, run_count),
    )

    expected = 0
    with query_counter() as q:
        self.assertEqual(q, expected)
        # Ensures previous count query did not get counted
        self.assertEqual(q, expected)

        for repetitions, run_query in workload:
            for _ in range(repetitions):
                run_query()
                expected += 1
            self.assertEqual(q, expected)
开发者ID:srinivasreddy,项目名称:mongoengine,代码行数:35,代码来源:test_context_managers.py
示例8: test_connect_uri_with_authsource
def test_connect_uri_with_authsource(self):
    """Ensure that the connect() method works well with
    the option `authSource` in URI.
    """
    # Create users
    c = connect('mongoenginetest')
    c.admin.system.users.remove({})
    c.admin.add_user('username', 'password')

    # Authentication fails without "authSource"
    self.assertRaises(
        ConnectionError, connect, 'mongoenginetest', alias='test1',
        host='mongodb://username:password@localhost/mongoenginetest'
    )
    self.assertRaises(ConnectionError, get_db, 'test1')

    # Authentication succeeds with "authSource"
    connect(
        'mongoenginetest', alias='test2',
        host=('mongodb://username:password@localhost/'
              'mongoenginetest?authSource=admin')
    )
    db = get_db('test2')
    self.assertTrue(isinstance(db, pymongo.database.Database))
    self.assertEqual(db.name, 'mongoenginetest')

    # Clear all users
    c.admin.system.users.remove({})
开发者ID:MasterMind2k,项目名称:mongoengine,代码行数:28,代码来源:test_connection.py
示例9: _delete_expired_gridfs_files
def _delete_expired_gridfs_files(cls, expired_date, collection_name):
    """Delete GridFS files uploaded before ``expired_date``, with their chunks.

    :param expired_date: files whose ``uploadDate`` is strictly earlier
        than this value are removed.
    :param collection_name: GridFS root collection name; its ``files`` and
        ``chunks`` sub-collections are the ones modified.
    """
    bucket = get_db()[collection_name]
    expired_ids = [
        doc['_id']
        for doc in bucket.files.find({"uploadDate": {'$lt': expired_date}},
                                     {'_id': 1})
    ]

    # MongoDB does not cascade a delete on `files` to `chunks`, so removing
    # only the metadata would leave orphaned chunk documents behind.
    # Delete in bounded batches to keep each `$in` list small.
    batch_size = 1000
    for start in range(0, len(expired_ids), batch_size):
        batch = expired_ids[start:start + batch_size]
        # Metadata first: a file is never visible with missing chunks.
        bucket.files.delete_many({'_id': {'$in': batch}})
        bucket.chunks.delete_many({'files_id': {'$in': batch}})
开发者ID:restran,项目名称:api-gateway-dashboard,代码行数:7,代码来源:models.py
示例10: test_connect_uri
def test_connect_uri(self):
    """Ensure that the connect() method works properly with URIs."""
    c = connect(alias='admin')
    register_db('mongoenginetest', 'admin', 'admin')

    # Remove existing users, then create the ones the URIs refer to.
    c.admin.system.users.remove({})
    c.mongoenginetest.system.users.remove({})

    c.admin.add_user("admin", "password")
    c.admin.authenticate("admin", "password")
    c.mongoenginetest.add_user("username", "password")

    # The user "test" was never created, so connecting must fail.
    self.assertRaises(
        ConnectionError, connect, "testdb_uri_bad",
        host='mongodb://test:password@localhost')

    # Whilst database names can be specified in the URI, they are ignored
    # in mongoengine since the DB/connection split
    connect(host='mongodb://username:password@localhost/mongoenginetest')
    register_db('testdb_uri')

    conn = get_connection()
    self.assertTrue(isinstance(conn, pymongo.connection.Connection))

    db = get_db()
    self.assertTrue(isinstance(db, pymongo.database.Database))
    self.assertEqual(db.name, 'testdb_uri')
开发者ID:aszwemin,项目名称:mongoengine,代码行数:27,代码来源:connection.py
示例11: _sync
def _sync(cls, key, src):
    """Copy the fields of ``src`` onto the document whose ``awsid`` equals
    ``src[key]``, creating that document when it does not exist.

    A changed, pre-existing document also gets a copy (without ``_id``)
    inserted into the ``versions`` collection.

    :returns: ``CREATED``, ``MODIFIED`` or ``NOCHANGE``.
    """
    tgt, created = cls.objects.get_or_create(awsid=src[key])

    # Apply every differing field and remember which ones changed.
    modified = set()
    for field, value in src.items():
        if getattr(tgt, field, None) != value:
            setattr(tgt, field, value)
            modified.add(field)

    if created:
        tgt.ctime = datetime.datetime.utcnow()
        tgt.awsid = src[key]
    elif modified:
        print("Modified %s %s" % (src[key], modified))
        snapshot = tgt.to_mongo()
        snapshot['res_class'] = cls.__name__.lower()
        del snapshot['_id']
        connection.get_db().versions.insert(snapshot, w=1)
    else:
        # Nothing was created or changed, so there is nothing to save.
        return NOCHANGE

    tgt.ltime = datetime.datetime.utcnow()
    tgt.save()
    return CREATED if created else MODIFIED
开发者ID:kapilt,项目名称:zephyr,代码行数:30,代码来源:models.py
示例12: test_connect_uri
def test_connect_uri(self):
    """Ensure that the connect() method works properly with URIs."""
    c = connect(db='mongoenginetest', alias='admin')

    # Remove existing users, then create the ones the URIs refer to.
    c.admin.system.users.remove({})
    c.mongoenginetest.system.users.remove({})

    c.admin.add_user("admin", "password")
    c.admin.authenticate("admin", "password")
    c.mongoenginetest.add_user("username", "password")

    # The failing-connect assertion is only made for PyMongo versions
    # before 3.
    if not IS_PYMONGO_3:
        self.assertRaises(
            MongoEngineConnectionError, connect, 'testdb_uri_bad',
            host='mongodb://test:password@localhost'
        )

    connect("testdb_uri",
            host='mongodb://username:password@localhost/mongoenginetest')

    conn = get_connection()
    self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))

    db = get_db()
    self.assertTrue(isinstance(db, pymongo.database.Database))
    self.assertEqual(db.name, 'mongoenginetest')

    # Clear all users
    c.admin.system.users.remove({})
    c.mongoenginetest.system.users.remove({})
开发者ID:mikeckennedy,项目名称:mongoengine,代码行数:27,代码来源:test_connection.py
示例13: default
def default(self, obj):
    """Return a JSON-serialisable replacement for ``obj``.

    Handles query sets, DB references, documents, object ids and
    date/time values; anything else is passed to ``JSONEncoder.default``.
    """
    if isinstance(obj, QuerySet):
        return list(obj)

    if isinstance(obj, (DBRef, Document, EmbeddedDocument)):
        if isinstance(obj, DBRef):
            doc = get_db().dereference(obj)
        else:
            doc = obj.to_mongo()
        # A dangling reference dereferences to None; return it unchanged.
        if doc is not None:
            # Drop each bookkeeping key independently: with a shared
            # try/except, a missing '_cls' used to leave '_types' in place.
            for internal_key in ('_cls', '_types'):
                doc.pop(internal_key, None)
        return doc

    if isinstance(obj, ObjectId):
        return str(obj)
    # datetime is tested before date so it keeps its own format.
    if isinstance(obj, datetime):
        return obj.isoformat().replace('T', ' ')
    if isinstance(obj, (date, time)):
        return obj.isoformat()
    return JSONEncoder.default(self, obj)
开发者ID:cloud9ers,项目名称:j25framework,代码行数:26,代码来源:JSONCustomEncoder.py
示例14: __init__
def __init__(self, ext):
    """
    Constructor.

    :param ext: instance of :class:`EveMongoengine`.
    """
    config = ext.app.config

    # Credentials are optional, but a lone username or password is an error.
    username = config.get('MONGO_USERNAME')
    password = config.get('MONGO_PASSWORD')
    credentials = (username, password)
    wants_auth = any(credentials)
    if wants_auth and not all(credentials):
        raise ConfigException('Must set both USERNAME and PASSWORD '
                              'or neither')

    # Open the connection to the configured database.
    self.conn = connect(
        config['MONGO_DBNAME'],
        host=config['MONGO_HOST'],
        port=config['MONGO_PORT'],
    )
    self.models = ext.models
    self.app = ext.app

    # A bare stand-in object replaces PyMongo here, because instantiating
    # PyMongo after the config was initialized causes errors.
    self.driver = type('Driver', (), {})()
    self.driver.db = get_db()

    if wants_auth:
        self.driver.db.authenticate(username, password)
开发者ID:aequitas,项目名称:eve-mongoengine,代码行数:26,代码来源:datalayer.py
示例15: test_connect_uri_without_db
def test_connect_uri_without_db(self):
    """Ensure that the connect() method works properly with URIs
    that do not contain a database name.
    """
    c = connect(db='mongoenginetest', alias='admin')

    # Remove existing users, then create the ones the test needs.
    c.admin.system.users.remove({})
    c.mongoenginetest.system.users.remove({})

    c.admin.add_user("admin", "password")
    c.admin.authenticate("admin", "password")
    c.mongoenginetest.add_user("username", "password")

    # The user "test" was never created, so connecting must fail.
    self.assertRaises(ConnectionError, connect, "testdb_uri_bad",
                      host='mongodb://test:password@localhost')

    # The URI names no database; the name comes from the first argument.
    connect("mongoenginetest", host='mongodb://localhost/')

    conn = get_connection()
    self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))

    db = get_db()
    self.assertTrue(isinstance(db, pymongo.database.Database))
    self.assertEqual(db.name, 'mongoenginetest')

    # Clear all users
    c.admin.system.users.remove({})
    c.mongoenginetest.system.users.remove({})
开发者ID:ThisGuyCodes,项目名称:mongoengine,代码行数:25,代码来源:test_connection.py
示例16: test_connect_uri_without_username_password
def test_connect_uri_without_username_password(self):
    """Ensure that the connect() method works properly with a URI
    when the username/password is given outside the URI.
    """
    alias = 'test_uri_no_username'

    admin_client = connect(db='mongoenginetest', alias='admin')
    admin_client.admin.system.users.remove({})
    admin_client.mongoenginetest.system.users.remove({})

    admin_client.admin.add_user("admin", "password")
    admin_client.admin.authenticate("admin", "password")
    admin_client.mongoenginetest.add_user("username", "password")

    conn = connect(alias=alias,
                   host='mongodb://localhost/mongoenginetest',
                   username="username",
                   password="password")
    self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))

    # Since the mongodb instance used for testing doesn't require
    # authentication (and turning that on breaks some 85 tests), and there
    # doesn't appear to be any way to check to see if a connection has
    # authenticated, this test instead inspects some internals of
    # mongoengine to make sure the correct settings have been saved.
    # Without this, instead of the test failing everything would appear to
    # work fine, but there would be no username/password on the
    # connection.
    saved_settings = me_connection._connection_settings[alias]
    self.assertEqual(saved_settings['username'], 'username')
    saved_settings = me_connection._connection_settings[alias]
    self.assertEqual(saved_settings['password'], 'password')

    db = get_db(alias=alias)
    self.assertTrue(isinstance(db, pymongo.database.Database))
    self.assertEqual(db.name, 'mongoenginetest')

    # Clear all users
    admin_client.admin.system.users.remove({})
    admin_client.mongoenginetest.system.users.remove({})
开发者ID:ThisGuyCodes,项目名称:mongoengine,代码行数:32,代码来源:test_connection.py
示例17: handle_noargs
def handle_noargs(self, **options):
    """Drop every collection (except ``system.indexes``) of one database.

    Options read: ``database`` (connection alias, required),
    ``interactive`` (ask for confirmation first) and ``verbosity``
    (a success message is written when greater than 1).

    :raises base.CommandError: when no database alias is given or when
        dropping a collection fails.
    """
    database = options.get('database')
    # A missing verbosity falls back to 1 instead of crashing on int(None).
    verbosity = options.get('verbosity')
    verbosity = 1 if verbosity is None else int(verbosity)
    interactive = options.get('interactive')

    if not database:
        raise base.CommandError("No MongoDB database specified.")

    db = connection.get_db(database)

    if interactive:
        confirm = raw_input("""You have requested a flush of the database.
This will IRREVERSIBLY DESTROY all data currently in the '%s' database (alias '%s').
Are you sure you want to do this?
Type 'yes' to continue, or 'no' to cancel: """ % (db.name, database))
    else:
        confirm = 'yes'

    # Any answer other than exactly 'yes' leaves the database untouched.
    if confirm == 'yes':
        try:
            for collection in db.collection_names():
                if collection == 'system.indexes':
                    continue
                db.drop_collection(collection)
        # "as" form is valid on Python 2.6+ and Python 3.
        except Exception as e:
            raise base.CommandError("""Database '%s' couldn't be flushed.
The full error: %s""" % (database, e))

        if verbosity > 1:
            self.stdout.write("Database '%s' flushed.\n" % database)
开发者ID:Domen91,项目名称:PiplMesh,代码行数:31,代码来源:flush.py
示例18: __init__
def __init__(self, ext):
    """
    Constructor.

    :param ext: instance of :class:`EveMongoengine`.
    """
    settings = ext.app.config

    # Both credential keys are read unconditionally; a lone username or
    # password is a configuration error.
    username = settings['MONGO_USERNAME']
    password = settings['MONGO_PASSWORD']
    credentials = (username, password)
    needs_auth = any(credentials)
    if needs_auth and not all(credentials):
        raise ConfigException('Must set both USERNAME and PASSWORD '
                              'or neither')

    # Open the connection to the configured database.
    self.conn = connect(
        settings['MONGO_DBNAME'],
        host=settings['MONGO_HOST'],
        port=settings['MONGO_PORT'],
    )
    self.models = ext.models
    self.app = ext.app

    # A bare stand-in object replaces PyMongo here, because instantiating
    # PyMongo after the config was initialized causes errors.
    self.driver = type('Driver', (), {})()
    self.driver.db = get_db()

    if needs_auth:
        self.driver.db.authenticate(username, password)

    # Helper object for managing PATCHes, which are a bit dirty.
    self.updater = MongoengineUpdater(self)
    # Maps each resource to its Mongoengine class.
    self.cls_map = ResourceClassMap(self)
开发者ID:satyanani40,项目名称:eve-mongoengine,代码行数:30,代码来源:datalayer.py
示例19: test_connect_uri_with_authsource
def test_connect_uri_with_authsource(self):
    """Ensure that the connect() method works well with
    the option `authSource` in URI.
    This feature was introduced in MongoDB 2.4 and removed in 2.6
    """
    # Create users
    c = connect('mongoenginetest')
    c.admin.system.users.remove({})
    c.admin.add_user('username2', 'password')

    # Authentication fails without "authSource"
    if IS_PYMONGO_3:
        # Here connect() itself does not raise; the failure is asserted on
        # the first server round-trip instead.
        test_conn = connect(
            'mongoenginetest', alias='test1',
            host='mongodb://username2:password@localhost/mongoenginetest'
        )
        self.assertRaises(OperationFailure, test_conn.server_info)
    else:
        self.assertRaises(
            ConnectionError, connect, 'mongoenginetest', alias='test1',
            host='mongodb://username2:password@localhost/mongoenginetest'
        )
        self.assertRaises(ConnectionError, get_db, 'test1')

    # Authentication succeeds with "authSource"
    connect(
        'mongoenginetest', alias='test2',
        host=('mongodb://username2:password@localhost/'
              'mongoenginetest?authSource=admin')
    )
    # This will fail starting from MongoDB 2.6+
    db = get_db('test2')
    self.assertTrue(isinstance(db, pymongo.database.Database))
    self.assertEqual(db.name, 'mongoenginetest')

    # Clear all users
    c.admin.system.users.remove({})
开发者ID:OJFord,项目名称:mongoengine,代码行数:35,代码来源:test_connection.py
示例20: test_connect_uri
def test_connect_uri(self):
    """Ensure that the connect() method works properly with URIs."""
    c = connect(db='mongoenginetest', alias='admin')

    # Remove existing users, then create the ones the URIs refer to.
    c.admin.system.users.delete_many({})
    c.mongoenginetest.system.users.delete_many({})

    c.admin.command("createUser", "admin", pwd="password", roles=["root"])
    c.admin.authenticate("admin", "password")
    c.admin.command("createUser", "username", pwd="password",
                    roles=["dbOwner"])

    # The failing-connect assertion is only made for PyMongo versions
    # before 3.
    if not IS_PYMONGO_3:
        self.assertRaises(
            MongoEngineConnectionError, connect, 'testdb_uri_bad',
            host='mongodb://test:password@localhost'
        )

    connect("testdb_uri",
            host='mongodb://username:password@localhost/mongoenginetest')

    conn = get_connection()
    self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)

    db = get_db()
    self.assertIsInstance(db, pymongo.database.Database)
    self.assertEqual(db.name, 'mongoenginetest')

    # Clear all users
    c.admin.system.users.delete_many({})
    c.mongoenginetest.system.users.delete_many({})
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:27,代码来源:test_connection.py
注:本文中的mongoengine.connection.get_db函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论