本文整理汇总了Python中mongoengine.connect函数的典型用法代码示例。如果您正苦于以下问题:Python connect函数的具体用法?Python connect怎么用?Python connect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了connect函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get
def get(self, request, *args, **kwargs):
"""Shows logs of imports that happened as a result of a git import"""
course_id = kwargs.get('course_id')
if course_id:
course_id = SlashSeparatedCourseKey.from_deprecated_string(course_id)
# Set mongodb defaults even if it isn't defined in settings
mongo_db = {
'host': 'localhost',
'user': '',
'password': '',
'db': 'xlog',
}
# Allow overrides
if hasattr(settings, 'MONGODB_LOG'):
for config_item in ['host', 'user', 'password', 'db', ]:
mongo_db[config_item] = settings.MONGODB_LOG.get(
config_item, mongo_db[config_item])
mongouri = 'mongodb://{user}:{password}@{host}/{db}'.format(**mongo_db)
error_msg = ''
try:
if mongo_db['user'] and mongo_db['password']:
mdb = mongoengine.connect(mongo_db['db'], host=mongouri)
else:
mdb = mongoengine.connect(mongo_db['db'], host=mongo_db['host'])
except mongoengine.connection.ConnectionError:
log.exception('Unable to connect to mongodb to save log, '
'please check MONGODB_LOG settings.')
if course_id is None:
# Require staff if not going to specific course
if not request.user.is_staff:
raise Http404
cilset = CourseImportLog.objects.all().order_by('-created')
else:
try:
course = get_course_by_id(course_id)
except Exception: # pylint: disable=broad-except
log.info('Cannot find course {0}'.format(course_id))
raise Http404
# Allow only course team, instructors, and staff
if not (request.user.is_staff or
CourseInstructorRole(course.id).has_user(request.user) or
CourseStaffRole(course.id).has_user(request.user)):
raise Http404
log.debug('course_id={0}'.format(course_id))
cilset = CourseImportLog.objects.filter(course_id=course_id).order_by('-created')
log.debug('cilset length={0}'.format(len(cilset)))
mdb.disconnect()
context = {'cilset': cilset,
'course_id': course_id.to_deprecated_string() if course_id else None,
'error_msg': error_msg}
return render_to_response(self.template_name, context)
开发者ID:Bvic,项目名称:edx-platform,代码行数:60,代码来源:sysadmin.py
示例2: test_connect_uri_without_db
def test_connect_uri_without_db(self):
"""Ensure connect() method works properly with uri's without database_name
"""
c = connect(db='mongoenginetest', alias='admin')
c.admin.system.users.remove({})
c.mongoenginetest.system.users.remove({})
c.admin.add_user("admin", "password")
c.admin.authenticate("admin", "password")
c.mongoenginetest.add_user("username", "password")
if not IS_PYMONGO_3:
self.assertRaises(ConnectionError, connect, "testdb_uri_bad", host='mongodb://test:[email protected]')
connect("mongoenginetest", host='mongodb://localhost/')
conn = get_connection()
self.assertTrue(isinstance(conn, pymongo.mongo_client.MongoClient))
db = get_db()
self.assertTrue(isinstance(db, pymongo.database.Database))
self.assertEqual(db.name, 'mongoenginetest')
c.admin.system.users.remove({})
c.mongoenginetest.system.users.remove({})
开发者ID:OJFord,项目名称:mongoengine,代码行数:25,代码来源:test_connection.py
示例3: test_gitlog_pagination_out_of_range_invalid
def test_gitlog_pagination_out_of_range_invalid(self):
"""
Make sure the pagination behaves properly when the requested page is out
of range.
"""
self._setstaff_login()
mongoengine.connect(TEST_MONGODB_LOG['db'])
for _ in xrange(15):
CourseImportLog(
course_id=CourseKey.from_string("test/test/test"),
location="location",
import_log="import_log",
git_log="git_log",
repo_dir="repo_dir",
created=datetime.now()
).save()
for page, expected in [(-1, 1), (1, 1), (2, 2), (30, 2), ('abc', 1)]:
response = self.client.get(
'{}?page={}'.format(
reverse('gitlogs'),
page
)
)
self.assertIn(
'Page {} of 2'.format(expected),
response.content
)
CourseImportLog.objects.delete()
开发者ID:mreyk,项目名称:edx-platform,代码行数:33,代码来源:test_sysadmin.py
示例4: handle
def handle(self, *args, **options):
''' The SuryaWebPortal initialization method.
'''
# Check if we have the right number of arguments
#if len(args) != 1:
# raise CommandError('Error insufficient params: use ./manage.py init -help for info')
# Check if mongod, is running
isMongod = False
processes = os.popen('''ps axo "%c"''')
for process in processes:
if 'mongod' in process:
isMongod = True
if not isMongod:
raise CommandError('Error please run mongod first')
# Import mongoengine and connect to the database
try:
import mongoengine
except:
raise CommandError('Error importing from mongoengine. Please ensure that mongoengine is installed')
# Drop the database SuryaDB (This Implies That we lose all the stored images as well)
mongoengine.connect('SuryaDB')
SuryaUploadData.drop_collection()
SuryaGroundTruth.drop_collection()
SuryaDeploymentData.drop_collection()
SuryaCalibrationData.drop_collection()
SuryaProcessingList.drop_collection()
SuryaIANAFailedResult.drop_collection()
SuryaIANAResult.drop_collection()
SuryaResult.drop_collection()
开发者ID:nalapati,项目名称:SuryaWebPortal,代码行数:35,代码来源:clearmongo.py
示例5: test_patch_unpatch
def test_patch_unpatch(self):
tracer = get_dummy_tracer()
# Test patch idempotence
patch()
patch()
client = mongoengine.connect(port=MONGO_CONFIG['port'])
Pin.get_from(client).clone(tracer=tracer).onto(client)
Artist.drop_collection()
spans = tracer.writer.pop()
assert spans, spans
eq_(len(spans), 1)
# Test unpatch
mongoengine.connection.disconnect()
unpatch()
mongoengine.connect(port=MONGO_CONFIG['port'])
Artist.drop_collection()
spans = tracer.writer.pop()
assert not spans, spans
# Test patch again
patch()
client = mongoengine.connect(port=MONGO_CONFIG['port'])
Pin.get_from(client).clone(tracer=tracer).onto(client)
Artist.drop_collection()
spans = tracer.writer.pop()
assert spans, spans
eq_(len(spans), 1)
开发者ID:tebriel,项目名称:dd-trace-py,代码行数:35,代码来源:test.py
示例6: test_connect_uri
def test_connect_uri(self):
"""Ensure that the connect() method works properly with URIs."""
c = connect(db='mongoenginetest', alias='admin')
c.admin.system.users.delete_many({})
c.mongoenginetest.system.users.delete_many({})
c.admin.command("createUser", "admin", pwd="password", roles=["root"])
c.admin.authenticate("admin", "password")
c.admin.command("createUser", "username", pwd="password", roles=["dbOwner"])
if not IS_PYMONGO_3:
self.assertRaises(
MongoEngineConnectionError, connect, 'testdb_uri_bad',
host='mongodb://test:[email protected]'
)
connect("testdb_uri", host='mongodb://username:[email protected]/mongoenginetest')
conn = get_connection()
self.assertIsInstance(conn, pymongo.mongo_client.MongoClient)
db = get_db()
self.assertIsInstance(db, pymongo.database.Database)
self.assertEqual(db.name, 'mongoenginetest')
c.admin.system.users.delete_many({})
c.mongoenginetest.system.users.delete_many({})
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:27,代码来源:test_connection.py
示例7: management_user_manage
def management_user_manage():
connect('user', port=27777)
users = MongoUser.objects()
if request.method == 'POST':
#
# check for selection
#
if 'selectedusers' in request.form:
data = dict(request.form)
usernames = data['selectedusers']
action = str(data['button'][0])
if action == 'delete':
for username in usernames:
user = MongoUser.objects(username=username)[0]
user.delete()
else:
for username in usernames:
user = MongoUser.objects(username=username)[0]
user.status = action
user.save()
users = MongoUser.objects()
return render_template('management/user_manage.html',
users=users,
with_edit="True",
states=['approved', 'pending', 'denied', 'blocked', 'delete'],
title="User Management")
开发者ID:lee212,项目名称:cloudmesh,代码行数:33,代码来源:management__.py
示例8: _connect_to_test_db
def _connect_to_test_db(cls):
me.connection.disconnect()
me.connect(
fixtures.DB_NAME,
host=c.MONGO_HOST,
port=c.MONGO_PORT
)
开发者ID:andyzg,项目名称:rmc,代码行数:7,代码来源:model_test_case.py
示例9: do_action
def do_action(self):
print "CLEANED", self.cleaned_data
try:
connect ('user', port=27777)
users = Users()
except:
print "ERROR: Internal Server error, please contact the admin"
pass
try:
user = User (
title = self.cleaned_data["title"],
firstname = self.cleaned_data["firstname"],
lastname = self.cleaned_data["lastname"],
email = self.cleaned_data["email"],
username = self.cleaned_data["username"],
password = self.cleaned_data["password"],
phone = self.cleaned_data["phone"],
department = self.cleaned_data["department"],
institution = self.cleaned_data["institution"],
address = self.cleaned_data["address"],
country = self.cleaned_data["country"],
citizenship = self.cleaned_data["citizenship"],
bio = self.cleaned_data["bio"],
)
users.add(user)
pass
except Exception, e:
print e
pass
开发者ID:cloudmesh,项目名称:management_old,代码行数:30,代码来源:forms.py
示例10: form_sources
def form_sources(self):
model_modules = [c for c in listdir(self.model_location) if
isfile(join(self.model_location, c)) if c != '__init__.py']
model_modules = [m for m in model_modules if PY_FILE_REGEX.match(m)]
for model_module in model_modules:
# get the name of the class
model_module = model_module.split('.')[0]
try:
module_path = self.load_path + '.' + model_module
model_class = model_module
module = importlib.import_module(module_path)
self.models.append(ModelIdentifier(db_name=module.db_name, db_alias=module.db_alias,
collection_name=module.collection_name,
fields=module.relation_fields,
payload_fields = module.payload_fields,
function_fields = module.function_fields,
model_class=module.model_class))
except (ImportError, Exception) as e:
raise RuntimeError("Error importing the module ", e)
for model in self.models:
if self.connect_str:
conn = connect(model.db_name, model.db_alias, host=self.connect_str)
else:
conn = connect(model.db_name, model.db_alias, host=MONGO_HOST, port=MONGO_PORT)
model_name = model.db_name + "." + model.db_alias + "." + model.collection_name
data_source = MongoDataSource(model_name, conn, model)
self.data_sources[model_name] = data_source
开发者ID:subhadeepmaji,项目名称:ml_algorithms,代码行数:29,代码来源:source.py
示例11: resetDB
def resetDB(name='tsp',host='localhost',port=27017,username=None,password=None):
''' DANGER: THIS WILL RESET DATABASE '''
import pymongo
# FIXME: Authenticate may break
conn=pymongo.Connection(host=host,port=port)
if name in conn.database_names():
# Backup
conn.copy_database(name,name+'_'+"_".join(map(str,localtime()[:5])),username=username,password=password)
# Drop Database
conn.drop_database(name)
connect(name,reconnect=True)
# (Re)initialize Database
Admin(
username='admin',
password=passwordHash('admin','admin'),
realname='administrator',
).save()
Settings(
phase=0,
ttl=86400,
).save()
开发者ID:Tydus,项目名称:tsp,代码行数:28,代码来源:util.py
示例12: setUp
def setUp(self):
connect("test")
self.user = mongoengine_auth.User(username="superuser")
self.user.set_password("password")
self.user.save()
开发者ID:ericmoritz,项目名称:flask-auth,代码行数:7,代码来源:mongoengine_auth.py
示例13: __init__
def __init__(self, release, logger=None):
self.release = release
mongoengine.connect('configdb')
config_obj = CMSSWConfig
config_obj._meta['collection'] = release
self.index = mongosearch.SearchIndex(config_obj, use_term_index=False)
self.logger = logger
开发者ID:dmwm,项目名称:DAS,代码行数:7,代码来源:base.py
示例14: _init
def _init():
global _initialized
global redis_client
global mongodb_client
global mongodb_client_db
global document_ids
if _initialized:
return
_initialized = True
_redis_pool = redis.ConnectionPool(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=0
)
redis_client = redis.Redis(connection_pool=_redis_pool)
mongodb_client = pymongo.MongoClient(
host=settings.MONGODB_HOST,
port=settings.MONGODB_PORT
)
mongodb_client_db = mongodb_client[settings.MONGODB_DB]
connect(settings.MONGODB_DB,
host=settings.MONGODB_HOST,
port=settings.MONGODB_PORT
)
document_ids = _DocumentIds(mongodb_client_db)
开发者ID:yaosj,项目名称:sanguo-server,代码行数:33,代码来源:drives.py
示例15: test_connect_fails_if_connect_2_times_with_custom_alias
def test_connect_fails_if_connect_2_times_with_custom_alias(self):
connect('mongoenginetest', alias='alias1')
with self.assertRaises(MongoEngineConnectionError) as ctx_err:
connect('mongoenginetest2', alias='alias1')
self.assertEqual("A different connection with alias `alias1` was already registered. Use disconnect() first", str(ctx_err.exception))
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:7,代码来源:test_connection.py
示例16: test_connect_disconnect_works_on_same_document
def test_connect_disconnect_works_on_same_document(self):
"""Ensure that the connect/disconnect works properly with a single Document"""
db1 = 'db1'
db2 = 'db2'
# Ensure freshness of the 2 databases through pymongo
client = MongoClient('localhost', 27017)
client.drop_database(db1)
client.drop_database(db2)
# Save in db1
connect(db1)
class User(Document):
name = StringField(required=True)
user1 = User(name='John is in db1').save()
disconnect()
# Make sure save doesnt work at this stage
with self.assertRaises(MongoEngineConnectionError):
User(name='Wont work').save()
# Save in db2
connect(db2)
user2 = User(name='Bob is in db2').save()
disconnect()
db1_users = list(client[db1].user.find())
self.assertEqual(db1_users, [{'_id': user1.id, 'name': 'John is in db1'}])
db2_users = list(client[db2].user.find())
self.assertEqual(db2_users, [{'_id': user2.id, 'name': 'Bob is in db2'}])
开发者ID:MongoEngine,项目名称:mongoengine,代码行数:32,代码来源:test_connection.py
示例17: process
def process(event, host, port, db, collection):
connect(db, host=host, port=int(port))
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
meta_info = MetaInfo.objects(key='news_tweets_meta').first()
last_tweet_id = None if not meta_info else meta_info.value['toi']
for news_handle in news_handles:
tweets = tweepy.API(auth).user_timeline(screen_name=news_handle,
since_id=last_tweet_id,
count=50)
last_tweet_id = None
for tweet in tweets:
tweet = tweet._json
tweet = process_tweet(tweet)
if not tweet:
continue
if not last_tweet_id:
last_tweet_id = tweet.get('id')
status = save_tweet(tweet, host, port, db, collection)
if status:
print 'Tweet with id %d saved' % tweet.get('id')
else:
print 'Error while saving tweet %d ' % tweet.get('id')
if last_tweet_id:
obj = MetaInfo.objects(key='news_tweets_meta')
obj.update_one(set__value__toi=last_tweet_id, upsert=True)
开发者ID:arpitbbhayani,项目名称:lorvet,代码行数:32,代码来源:news_tweets.py
示例18: management_project_manage
def management_project_manage():
connect('user', port=27777)
projects = MongoProject.objects()
if request.method == 'POST':
#
# check for selection
#
if 'selectedprojects' in request.form:
data = dict(request.form)
project_ids = data['selectedprojects']
action = str(data['button'][0])
for projectid in project_ids:
project = MongoProject.objects(projectid=projectid)[0]
project.status = action
project.save()
connect('user', port=27777)
projects = MongoProject.objects()
return render_template('management/project_manage.html',
projects=projects, with_edit="True",
title="Project Management",
states=ProjectSTATUS)
开发者ID:lee212,项目名称:cloudmesh,代码行数:26,代码来源:management__.py
示例19: get_question
def get_question(tag):
mongoengine.connect('Words', host='mongodb://localhost/Words')
count = len(Question.objects(tags__name__istartswith=tag.encode()))
number = randint(1, count)
question = Question.objects(tags__name__istartswith=tag.encode())[number:number + 1][0]
option = Question.objects(id__ne=question.id)[:3]
wrong = []
tag = []
for index in option:
wrong.append(dict(word=index['answer'], answer=False))
wrong.append(dict(word=question.answer, answer=True))
for index in question.tags:
tag.append(dict(id=index['id'], name=index['name']))
js = {
'word': question.word,
'tags': json.loads(JSONEncoder().encode(tag)),
'option': json.loads(JSONEncoder().encode(wrong))
}
result = json.dumps(js)
resp = Response(result, status=200, mimetype='application/json')
return resp
开发者ID:cemkurtulus,项目名称:words,代码行数:28,代码来源:WordsApi.py
示例20: __init__
def __init__(self, handlers=None, default_host='', transforms=None, wsgi=False, **settings):
connect(options.db, host=options.host, replicaSet = options.replset)
super(Application, self).__init__(handlers=handlers,
default_host=default_host,
transforms=transforms,
wsgi=wsgi,
**settings)
开发者ID:fatelei,项目名称:storage-api,代码行数:7,代码来源:app.py
注:本文中的mongoengine.connect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论