本文整理汇总了Python中mongoengine.register_connection函数的典型用法代码示例。如果您正苦于以下问题:Python register_connection函数的具体用法?Python register_connection怎么用?Python register_connection使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了register_connection函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUp
def setUp( self ):
    """Reset the test database and build the in-memory fixture documents."""
    test_db = 'mongoengine_relational_test'

    # Bind the default alias to the test database and start from an empty one.
    mongoengine.register_connection( mongoengine.DEFAULT_CONNECTION_NAME, test_db )
    mongoengine.connection.get_connection().drop_database( test_db )

    # Application/request configuration.
    self.request = Request.blank( '/api/v1/' )

    # Instantiate a DocumentCache; it will attach itself to `request.cache`.
    DocumentCache( self.request )

    self.config = testing.setUp( request=self.request )

    # Fixture data, exposed both as `self.data` and through a short local name.
    fixtures = Struct()
    self.data = fixtures

    fixtures.blijdorp = Zoo( id=ObjectId(), name='Blijdorp' )
    fixtures.office = Office( tenant=fixtures.blijdorp )
    fixtures.bear = Animal( name='Baloo', species='bear', zoo=fixtures.blijdorp )

    fixtures.mammoth = Animal( id=ObjectId(), name='Manny', species='mammoth' )
    fixtures.artis = Zoo( id=ObjectId(), name='Artis', animals=[ fixtures.mammoth ] )
    fixtures.tiger = Animal( id=ObjectId(), name='Shere Khan', species='tiger', zoo=fixtures.artis )

    fixtures.node = Node()
开发者ID:ProgressivePlanning,项目名称:mongoengine-relational,代码行数:25,代码来源:test_relationalmixin.py
示例2: tenant_handler
def tenant_handler():
    """Resolve the tenant for the current request and bind its settings to ``g``.

    The tenant id is read from etcd using the request's ``Host`` header, the
    tenant's configuration keys are loaded, the OpenID settings are copied
    into ``g.openid``, a MongoDB connection is registered under the tenant id,
    and every class in ``schemas`` is exposed on ``g`` and pointed at that
    connection alias.

    Aborts with 404 when the request has no ``Host`` header or when etcd has
    no tenant entry for that host. A tenant configuration that lacks one of
    the expected ``openid_*`` keys raises ``KeyError``.
    """
    host = request.headers.get('Host')
    if host is None:
        # Without a host there is no etcd key to look up; previously this
        # surfaced as a TypeError from the string concatenation below.
        return abort(404)
    try:
        tenant = etcd_client.read('/atrium/hosts/' + host + '/tenant_id').value
    except etcd.EtcdKeyNotFound:
        return abort(404)

    tenant_prefix = '/atrium/tenants/' + tenant
    tenant_keys = etcd_client.read(tenant_prefix)
    tenant_config = {}
    for result in tenant_keys.children:
        # Keep only the part of the key that follows the tenant prefix.
        tenant_config[result.key.split(tenant_prefix + '/')[1]] = result.value

    g.tenant = tenant

    # (name exposed in g.openid, key in the tenant configuration)
    openid_items = [
        ('authorization_endpoint', 'openid_authorization'),
        ('token_endpoint', 'openid_token'),
        ('userinfo_endpoint', 'openid_userinfo'),
        ('client_id', 'openid_client'),
        ('client_secret', 'openid_secret'),
        ('redirect_uri', 'openid_redirect'),
        ('issuer_key', 'openid_issuer_key'),
        ('issuer_claim', 'openid_issuer_claim')
    ]
    g.openid = {}
    for openid_name, config_key in openid_items:
        g.openid[openid_name] = str(tenant_config[config_key])

    mongoengine.register_connection(tenant, name=tenant, host=app.config['MONGODB_HOST'], port=app.config['MONGODB_PORT'], connect=False, tz_aware=True)

    for schema in schemas:
        setattr(g, schema.__name__, schema)
        # The class object itself is modified, so this affects every user of
        # the schema, not just this request.
        schema._meta['db_alias'] = tenant
        # NOTE(review): presumably clears a cached collection so it is
        # re-resolved against the tenant alias - confirm against mongoengine.
        schema._collection = None
开发者ID:elerion,项目名称:atrium,代码行数:33,代码来源:__init__.py
示例3: main
def main(global_config, **settings):
    """Build and return the Pyramid WSGI application.

    ``settings`` must contain ``sci_platform``, ``google_tracking_code``,
    ``db_name`` and ``raw_db_name``.
    """
    # No authentication/authorization policy is passed to the Configurator.
    config = Configurator(settings=settings)
    config.include("pyramid_jinja2")
    config.commit()

    jinja_filters = config.get_jinja2_environment().filters
    jinja_filters['epoch2readable'] = epoch2readable
    jinja_filters['duration'] = duration

    # Settings that are both injected before each render and re-added to the
    # configurator's settings.
    shared_keys = ("sci_platform", "google_tracking_code")

    def addSettings(event):
        for key in shared_keys:
            event[key] = settings[key]

    config.add_subscriber(addSettings, BeforeRender)

    # Configuring static assets
    config.add_static_view(name="static", path="static")
    config.add_settings({key: settings[key] for key in shared_keys})

    # (route name, URL pattern), registered in this order.
    routes = (
        ("home", "/"),
        ("dsm", "/dsm"),
        ("app_used_with", "/application/{name}/used_with"),
        ("app_usage", "/application/{name}/usage"),
        ("app_users", "/application/{name}/users"),
        ("app_dashboard", "/application/{name}/dashboard"),
        ("application", "/application/{name}"),
        ("app_pubs", "/application/{name}/publications"),
        ("app_gitprojects", "/application/{name}/gitprojects"),
        ("compare", "/compare"),
        ("about", "/about"),
        ("data_source", "/data_source"),
        ("browse", "/browse"),
        ("overview", "/overview"),
        ("login", "/login"),
        ("notebook", "/notebook"),
        ("status", "/status"),
        ("sys_usage", "/status/usage"),
        ("accept_login", "/accept_login"),
        ("api_home", "/api"),
        ("api_home.category", "/api/{category}"),
        ("api_home.category.id", "/api/{category}/{id}"),
    )
    for route_name, pattern in routes:
        config.add_route(route_name, pattern)

    # Default connection plus a second alias for the raw records database.
    connect(settings['db_name'])
    register_connection('raw-records', settings['raw_db_name'])

    config.scan()
    return config.make_wsgi_app()
开发者ID:Medida-CMU,项目名称:scisoft-net-map,代码行数:60,代码来源:__init__.py
示例4: init_mongoengine
def init_mongoengine():
    """Register the default mongoengine connection for all configured hosts.

    Builds a comma-separated ``host:port`` list from ``MONGO_DATABASES`` and
    registers it under the ``'default'`` alias for the database ``DBNAME``.
    """
    hosts = ['%s:%s' % (item['host'], item['port']) for item in MONGO_DATABASES]
    # A single pre-formatted argument prints the same text on Python 2 and
    # Python 3; the former print statement is a SyntaxError on Python 3.
    print('%s %s' % ('-' * 40, 'init mongoengine connect!'))
    mongoengine.register_connection('default', DBNAME, host=','.join(hosts))
开发者ID:xzregg,项目名称:z7z8,代码行数:7,代码来源:baozou_dfw.py
示例5: test_register_connection_defaults
def test_register_connection_defaults(self):
    """Registering with ``host=None`` and ``port=None`` still yields a MongoClient."""
    register_connection('testdb', 'mongoenginetest', host=None, port=None)

    self.assertIsInstance(
        get_connection('testdb'),
        pymongo.mongo_client.MongoClient,
    )
开发者ID:OJFord,项目名称:mongoengine,代码行数:7,代码来源:test_connection.py
示例6: setUp
def setUp(self):
    """Register the test database connection and create the test client."""
    # None lifts unittest's limit on the length of failure diffs.
    self.maxDiff = None

    register_connection(
        TEST_DB_ALIAS,
        name=TEST_DB_NAME,
        host='mongodb://mongo',
    )
    self.client = Client()
开发者ID:hdzierz,项目名称:Kaka,代码行数:8,代码来源:tests.py
示例7: main
def main():
    """Configure the application, register both databases and start the server."""
    # NOTE(review): the literal below contains "[email protected]", which looks
    # like an e-mail-obfuscation artifact rather than the intended value -
    # confirm, and consider loading the secret from configuration instead.
    app.secret_key = 'as90kSDKO#[email protected]|4245losadim'
    build_app(app)

    # Each alias is registered against a database of the same name.
    for db_name in ('irianas_web', 'irianas_server'):
        register_connection(db_name, db_name)

    run_options = dict(
        debug=debug,
        ssl_context=context,
        host='0.0.0.0',
        port=9001,
        threaded=True,
    )
    app.run(**run_options)
开发者ID:Irigonzalez,项目名称:irianas-server,代码行数:8,代码来源:main.py
示例8: setup_db
def setup_db( drop=True ):
    """Register the default connection for the test database and return it.

    When ``drop`` is truthy the test database is dropped first, so the caller
    starts from an empty database.
    """
    db_name = 'tastymongo_test'
    mongoengine.register_connection( mongoengine.DEFAULT_CONNECTION_NAME, db_name )
    connection = mongoengine.connection.get_connection()

    if not drop:
        return connection

    connection.drop_database( db_name )
    return connection
开发者ID:ProgressivePlanning,项目名称:TastyMongo,代码行数:8,代码来源:run_tests.py
示例9: register_mongo_connection
def register_mongo_connection(db):
    """Register the ``"default"`` connection from a settings mapping.

    ``db`` must provide ``MONGO_NAME``, ``MONGO_HOST``, ``MONGO_PORT`` and
    ``MONGO_MAX_POOL_SIZE``; the last one is forwarded as ``maxPoolSize``.
    """
    connection_settings = {
        "name": db["MONGO_NAME"],
        "host": db["MONGO_HOST"],
        "port": db["MONGO_PORT"],
        "maxPoolSize": db["MONGO_MAX_POOL_SIZE"],
    }
    register_connection("default", **connection_settings)
开发者ID:yuri1992,项目名称:LikeStatsDjango,代码行数:8,代码来源:base.py
示例10: connect_db
def connect_db():
    """Register every configured MongoDB database with mongoengine.

    Starts from ``settings.MONGODB_DATABASES`` when present (that mapping is
    updated in place), then sets the ``default`` alias plus the ``fs``,
    ``tmp`` and ``log`` aliases from the ``MONGODB_DATABASE``/``HOST``/
    ``PORT``/``USERNAME``/``PASSWORD`` settings. The three extra aliases use
    the database name suffixed with the alias. Each resulting entry is passed
    to ``register_connection``, and finally ``preload_models()`` is called.
    """
    from mongoengine import register_connection
    from django.conf import settings

    databases = getattr(settings, "MONGODB_DATABASES", {})

    db_name = getattr(settings, "MONGODB_DATABASE", "supportcenter")

    # Connection options shared by every alias built here; unset (falsy)
    # options are left out entirely.
    candidates = {
        'HOST': getattr(settings, "MONGODB_HOST", None),
        'PORT': getattr(settings, "MONGODB_PORT", None),
        'USERNAME': getattr(settings, "MONGODB_USERNAME", None),
        'PASSWORD': getattr(settings, "MONGODB_PASSWORD", None),
    }
    shared_options = dict(
        (key, value) for key, value in candidates.items() if value)

    databases['default'] = dict(shared_options, NAME=db_name)
    for alias in ('fs', 'tmp', 'log'):
        databases[alias] = dict(shared_options, NAME=db_name + alias)

    # (key in the database entry, register_connection keyword)
    option_keywords = (
        ('HOST', 'host'),
        ('PORT', 'port'),
        ('USERNAME', 'username'),
        ('PASSWORD', 'password'),
    )

    # items() works on both Python 2 and 3; iteritems() exists only on 2.
    for alias, db in databases.items():
        kwargs = {'name': db.get('NAME')}
        for entry_key, keyword in option_keywords:
            if db.get(entry_key):
                kwargs[keyword] = db[entry_key]

        register_connection(alias, **kwargs)

    preload_models()
开发者ID:LethusTI,项目名称:supportcenter,代码行数:57,代码来源:__init__.py
示例11: mongo_connect
def mongo_connect():
    """Register the ``"default"`` connection from the application config.

    Falls back to database ``grano`` on ``localhost:27017``; username and
    password are ``None`` when not configured.
    """
    setting = app.config.get
    register_connection(
        "default",
        setting("MONGODB_DATABASE", "grano"),
        host=setting("MONGODB_HOST", "localhost"),
        port=setting("MONGODB_PORT", 27017),
        username=setting("MONGODB_USERNAME"),
        password=setting("MONGODB_PASSWORD"),
    )
开发者ID:pombredanne,项目名称:grano,代码行数:9,代码来源:core.py
示例12: test_register_connection
def test_register_connection(self):
    """Connections can be registered under a non-default alias."""
    register_connection('testdb', 'mongoenginetest2')

    # Only 'testdb' was registered, so the default alias must not resolve.
    with self.assertRaises(ConnectionError):
        get_connection()

    self.assertIsInstance(
        get_connection('testdb'), pymongo.mongo_client.MongoClient)

    database = get_db('testdb')
    self.assertIsInstance(database, pymongo.database.Database)
    self.assertEqual(database.name, 'mongoenginetest2')
开发者ID:OJFord,项目名称:mongoengine,代码行数:12,代码来源:test_connection.py
示例13: setup
def setup(self):
    """Empty the test database and create the ``cover``/``audio`` fixture files.

    Registers the default connection for ``TEST_DB_NAME``, removes all
    documents from every non-system collection, and stores two rewound
    temporary files on ``self.cover`` and ``self.audio``.
    """
    mongoengine.register_connection(mongoengine.DEFAULT_CONNECTION_NAME, TEST_DB_NAME, DB_HOST, DB_PORT)

    # TODO
    # this code is using pymongo
    # maybe mongoengine have the same code
    db = _get_db()
    for name in db.collection_names(False):
        db[name].remove()

    # TemporaryFile() opens in binary mode by default, so the payload must be
    # bytes; a str payload raises TypeError on Python 3. On Python 2 the b''
    # literal is an ordinary str, so behaviour there is unchanged.
    for attr, payload in (('cover', b'cover'), ('audio', b'audio')):
        handle = tempfile.TemporaryFile()
        handle.write(payload)
        handle.seek(0)
        setattr(self, attr, handle)
开发者ID:DouFM,项目名称:wang_fm,代码行数:14,代码来源:utils.py
示例14: init_app
def init_app(self, app):
    """Register the default mongoengine connection from ``app.config``.

    Reads ``MONGODB_DB``, ``MONGODB_USERNAME``, ``MONGODB_PASSWORD``,
    ``MONGODB_HOST`` and ``MONGODB_PORT``; settings whose value is falsy are
    not forwarded at all.
    """
    setting = app.config.get
    candidates = {
        'db': setting('MONGODB_DB', None),
        'username': setting('MONGODB_USERNAME', None),
        'password': setting('MONGODB_PASSWORD', None),
        'host': setting('MONGODB_HOST', None),
        # A missing/empty/zero port becomes None and is filtered out below.
        'port': int(setting('MONGODB_PORT') or 0) or None,
    }
    conn_settings = {key: value for key, value in candidates.items() if value}

    # The database name is positional; everything else goes as keywords.
    db_name = conn_settings.pop('db', None)

    # lazy connection
    mongoengine.register_connection(
        mongoengine.DEFAULT_CONNECTION_NAME,
        db_name,
        **conn_settings
    )
开发者ID:apophys,项目名称:zitkino.cz,代码行数:16,代码来源:mongo.py
示例15: config
def config():
    """Connect to MongoDB and seed the admin user, demo music and init status.

    Connects the default database ``miao_fm`` and registers the additional
    ``miao_fm_cdn`` alias. If mongoengine raises ``ConnectionError`` the
    process is terminated immediately via ``os._exit(-1)``.
    """
    import mongoengine

    # Every print() below takes one pre-formatted string, which emits the
    # same text on Python 2 and Python 3; the former print statements are a
    # SyntaxError on Python 3.
    print('[config] start %s' % datetime.datetime.now())
    try:
        mongoengine.connect('miao_fm', host=MONGODB_URL, port=MONGODB_PORT)
        mongoengine.register_connection('miao_fm_cdn', 'miao_fm_cdn', host=MONGODB_URL, port=MONGODB_PORT)
    except mongoengine.connection.ConnectionError:
        print('[Error] Can\'t connect to MongoDB!')
        os._exit(-1)

    print('[user] add admin...')
    add_user_admin()

    print('[music] add demo music...')
    add_demo_music()

    print('[status] gen init status...')
    update_init_status()

    print('[config] finish %s' % datetime.datetime.now())
开发者ID:carlware,项目名称:miao_fm,代码行数:16,代码来源:config.py
示例16: main
def main():
    """Copy every shred from the source database into the target database.

    Each source document is converted by ``transform_shred`` into a shred and
    a cluster; the shred is created first, then a cluster whose only member
    references that shred.
    """
    register_connection(alias="default", name=TARGET_DB_NAME)

    with switch_db(Shred, "default") as TargetShred:
        with switch_db(Cluster, "default") as TargetCluster:
            source_shreds = pymongo.MongoClient()[SOURCE_DB_NAME].shreds

            for src_shred in source_shreds.find({}):
                new_shred, cluster = transform_shred(src_shred)
                stored_shred = TargetShred.objects.create(**new_shred)

                cluster['members'] = [
                    ClusterMember(shred=stored_shred, position=[0, 0], angle=0),
                ]
                TargetCluster.objects.create(**cluster)
开发者ID:kostyll,项目名称:unshred-tag,代码行数:17,代码来源:convert_db.py
示例17: global_init
def global_init(user=None, password=None, port=27017, server='localhost', use_ssl=True):
    """Register the ``core`` connection for the ``demo_dealership`` database.

    With neither ``user`` nor ``password`` a plain development connection is
    registered. Otherwise the credentials, host, port and SSL options are
    forwarded, and the settings are printed with the password masked.
    """
    if not (user or password):
        print(" --> Registering dev connection")
        mongoengine.register_connection(alias='core', name='demo_dealership')
        return

    data = {
        'username': user,
        'password': password,
        'host': server,
        'port': port,
        'authentication_source': 'admin',
        'authentication_mechanism': 'SCRAM-SHA-1',
        'ssl': use_ssl,
        # NOTE(review): CERT_NONE means the server certificate is not
        # validated - confirm this is intended outside development.
        'ssl_cert_reqs': ssl.CERT_NONE,
    }
    mongoengine.register_connection(alias='core', name='demo_dealership', **data)

    # Mask the password before the settings are echoed.
    data['password'] = '*************'
    print(" --> Registering prod connection: {}".format(data))
开发者ID:sethips,项目名称:mongodb-for-python-developers,代码行数:17,代码来源:mongo_setup.py
示例18: __init__
def __init__(self, app, user_repr=None):
    """Attach request tracking to ``app`` and read the tracking settings.

    Installs the before/after request hooks, wraps the WSGI app in
    ``WSGICopyBody``, registers the ``tracking_db`` connection and copies the
    ``TRACKING_*`` options from ``app.config`` onto the instance. A truthy
    ``user_repr`` is stored on ``documents.Tracking``.
    """
    app.before_request(self.track_before)
    app.after_request(self.track_after)
    app.wsgi_app = WSGICopyBody(app.wsgi_app)

    setting = app.config.get
    register_connection(
        'tracking_db',
        name=setting('TRACKING_DB_NAME', 'tracking_db'),
        host=setting('DB_URL', 'localhost'),
        port=setting('DB_PORT', 27017),
    )

    self.hostname = socket.gethostname()

    self.max_body_length = setting('TRACKING_MAX_BODY_LENGTH', 64*1024)
    self.exclude_paths = setting('TRACKING_EXCLUDE', [])
    self.exclude_body_paths = setting('TRACKING_EXCLUDE_BODY', [])
    self.exclude_status_codes = setting('TRACKING_EXCLUDE_STATUS_CODES', [])
    self.min_execution_time = setting('TRACKING_MIN_EXECUTION_TIME', 0)
    self.table_size = setting('TRACKING_TABLE_SIZE', 100*1024*1024)

    # The configured table size is written into the Tracking document's meta.
    documents.Tracking._meta['max_size'] = self.table_size

    if user_repr:
        documents.Tracking.user_repr = user_repr
开发者ID:aop,项目名称:flask-tracking,代码行数:19,代码来源:__init__.py
示例19: register_database
def register_database(alias, host, dbname, replica_set=None,
                      username=None, password=None, auth_db=None,
                      logger=None):
    """Register ``alias`` once and cache a ``MongoProxy``-wrapped connection.

    Does nothing when ``alias`` is already in ``_connection_settings``. With
    ``replica_set`` the ``host`` value is forwarded untouched; otherwise it is
    split at the first ``:`` into a hostname and an optional integer port.
    ``auth_db`` is accepted but not used yet. When ``logger`` is ``None`` the
    current application's logger is used.
    """
    if alias in _connection_settings:
        # already registered
        return

    if replica_set:
        connection_kwargs = {
            # the very presence of a replicaSet argument to
            # 'register_connection' triggers the behavior
            'replicaSet': replica_set,
            # MongoEngine converts 'host' to 'hosts_or_uri' for replica sets
            'host': host,
        }
    else:
        hostname, _, port = host.partition(':')
        connection_kwargs = {'host': hostname}
        if port:
            connection_kwargs['port'] = int(port)

    register_connection(
        alias=alias,
        name=dbname,
        username=username,
        password=password,
        **connection_kwargs)
    # TODO: handle auth_db

    # 'get_connection' has the side effect of creating and caching the
    # connection on the first time that it's called
    raw_connection = get_connection(alias)

    proxy_logger = current_app.logger if logger is None else logger

    # automatically retry any queries that get disconnected, using exponential
    # backoff for up to 60 seconds; subsequent calls to 'get_connection'
    # return from the cache, so the cached entry is replaced by the proxy
    _connections[alias] = MongoProxy(raw_connection, logger=proxy_logger, wait_time=60)
开发者ID:SlideAtlas,项目名称:SlideAtlas-Server,代码行数:39,代码来源:connection.py
示例20: setup_database
def setup_database(self):
    """Switch to the test databases and load the JSON dumps with mongoimport.

    Disconnects the existing databases, connects and registers the
    ``rhic_serve``, ``checkin_service`` and ``report`` databases (with
    ``rhic_serve`` as ``default``), then runs ``mongoimport`` once per
    collection, reading ``<collection>.json`` from ``settings.DUMP_DIR``.
    The exit status of ``mongoimport`` is not checked.
    """
    # Disconnect from the default mongo db, and use a test db instead.
    self.disconnect_dbs()
    connection.connect(rhic_serve,
                       alias='default', tz_aware=True)

    register_connection(rhic_serve, rhic_serve)
    register_connection(checkin_service, checkin_service)
    register_connection(report, report)
    register_connection('default', rhic_serve)

    # (target database, collections to import), processed in this order.
    import_plan = (
        (rhic_serve, ['rhic', 'account', 'user', 'fs.chunks']),
        (checkin_service, ['splice_server', 'product']),
    )

    # The output is discarded. It is sent to os.devnull rather than PIPE
    # because call() never reads from a pipe, so a chatty child would block
    # once the OS pipe buffer filled up.
    with open(os.devnull, 'w') as devnull:
        for db_name, collections in import_plan:
            for collection in collections:
                call(['mongoimport', '--db', db_name,
                      '-c', collection, '--file',
                      '%s.json' % os.path.join(settings.DUMP_DIR, collection)],
                     stdout=devnull, stderr=devnull)
开发者ID:malditha,项目名称:report_server,代码行数:23,代码来源:general.py
注:本文中的mongoengine.register_connection函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论