本文整理汇总了Python中mongokit.Connection类的典型用法代码示例。如果您正苦于以下问题:Python Connection类的具体用法?Python Connection怎么用?Python Connection使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Connection类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: init
def init():
    """Create the module-level MongoKit connection and register ``User`` on it.

    Rebinds the global ``connection`` and reports progress through the
    module's ``logger``.
    """
    global connection
    logger.info('connecting to database')
    connection = Connection()
    logger.info('database connected ')
    documents = [User]
    connection.register(documents)
开发者ID:iceout,项目名称:group_talk_bot,代码行数:7,代码来源:structure.py
示例2: main
def main(*args):
    """Recompute the stored vote totals of every gist and of every user.

    For each user, each of the user's gists gets its ``GistPoints`` document
    (created when absent) set to the sum of the ``points`` of the gist's
    votes; the user's ``UserPoints`` document (created when absent) is then
    set to the sum over those gists.

    Returns:
        0 once every user has been processed.
    """
    mongo = Connection()
    mongo.register([Vote, GistPoints, UserPoints,
                    User, Gist, Comment])
    database = mongo[settings.DEFAULT_DATABASE_NAME]

    for owner in database.User.find():
        user_points = database.UserPoints.one({'user.$id': owner._id})
        if not user_points:
            user_points = database.UserPoints()
            user_points.points = 0
            user_points.user = owner

        running_total = 0
        for gist in database.Gist.find({'user.$id': owner._id}):
            gist_points = database.GistPoints.one({'gist.$id': gist._id})
            if not gist_points:
                gist_points = database.GistPoints()
                gist_points.gist = gist
                gist_points.points = 0
            # Votes are read through the raw collection and summed per gist.
            votes = database.Vote.collection.find({'gist.$id': gist._id})
            gist_points.points = sum(vote['points'] for vote in votes)
            gist_points.save()
            running_total += gist_points.points

        user_points.points = running_total
        user_points.save()

    return 0
开发者ID:alexmerser,项目名称:tornado_gists,代码行数:29,代码来源:recalculate_voting_points.py
示例3: test_create_test_database_by_specific_good_name
def test_create_test_database_by_specific_good_name(self):
    """Check that an explicit ``TEST_NAME`` is used for the test database.

    Sets ``TEST_NAME`` to ``"test_mustard"``, creates the test database
    through the Django connection, verifies that it only shows up in MongoDB
    after a first query, then destroys it and checks that it is gone.
    """
    from django.conf import settings
    try:
        assert 'mongodb' in settings.DATABASES
    except AttributeError:
        # Django <1.2 (presumably no ``settings.DATABASES`` there): skip.
        return

    settings.DATABASES['mongodb']['TEST_NAME'] = "test_mustard"
    original_name = settings.DATABASES['mongodb']['NAME']

    from django.db import connections
    connection = connections['mongodb']
    connection.creation.create_test_db()
    test_database_name = settings.DATABASES['mongodb']['NAME']
    self.assertTrue('test_' in test_database_name)

    from mongokit import Connection
    mongo = Connection()
    # The test database is not created until it is first needed.
    self.assertTrue(test_database_name not in mongo.database_names())

    # Querying one of its collections forces the database into existence.
    scratch = mongo[settings.DATABASES['mongodb']['NAME']].test_collection_name
    list(scratch.find())
    test_database_name = settings.DATABASES['mongodb']['NAME']
    self.assertTrue(test_database_name in mongo.database_names())

    connection.creation.destroy_test_db(original_name)
    self.assertTrue('test_mustard' not in settings.DATABASES['mongodb']['NAME'])
    self.assertTrue(test_database_name not in mongo.database_names())
开发者ID:dominno,项目名称:django-mongokit,代码行数:32,代码来源:tests.py
示例4: run
def run(max_updates=5):
    """Refresh stored details for the users that were not updated today.

    Users whose ``modify_date`` is before today's midnight are visited oldest
    first, at most ``max_updates`` of them. For each one, ``pull_details`` is
    asked for fresh data; the ``name``, ``company``, ``email`` and
    ``gravatar_id`` values that differ are copied onto the user, whose
    ``modify_date`` is then set to now and saved. Users for which
    ``pull_details`` returns None are reported and left untouched.

    Every message is printed with a single pre-formatted argument, so the
    output is the same on Python 2 and the code also parses on Python 3.

    Args:
        max_updates: Maximum number of users to process in this run.
    """
    from apps.main.models import User
    from mongokit import Connection
    con = Connection()
    con.register([User])
    import settings
    db = con[settings.DEFAULT_DATABASE_NAME]

    print("%s users in total" % db.User.find().count())
    # Midnight of the current day, as a datetime.
    midnight = datetime.datetime(*datetime.date.today().timetuple()[:6])
    search = {'modify_date': {'$lt': midnight}}
    print("%s left to update today" % db.User.find(search).count())

    stale_users = db.User.find(search).sort('modify_date', 1).limit(max_updates)
    for user in stale_users:
        print("%s %s" % (repr(user.login), user.modify_date))
        details = pull_details(user.login)
        if details is None:
            print("FAIL!")
            print("??? http://github.com/api/v2/json/user/show/%s" % user.login)
            continue
        for key in 'name company email gravatar_id'.split():
            if key in details and getattr(user, key, '') != details[key]:
                # No space after the tab: the Python 2 print statement does
                # not insert one after an item ending in a tab.
                print("\t%s %s --> %s" % (key,
                                          repr(getattr(user, key, '*blank*')),
                                          repr(details[key])))
                setattr(user, key, details[key])
        user.modify_date = datetime.datetime.now()
        user.save()
开发者ID:alexmerser,项目名称:tornado_gists,代码行数:27,代码来源:update-user-details.py
示例5: get_connection
def get_connection():
    """Return a new connection to ``settings.MONGO_HOST`` with ``Item`` registered.

    Before connecting, ``Item.__database__`` is set to ``settings.MONGO_DB``.
    """
    Item.__database__ = settings.MONGO_DB
    mongo = Connection(host=settings.MONGO_HOST)
    mongo.register([Item])
    return mongo
开发者ID:rockybars,项目名称:grimlock,代码行数:7,代码来源:connect.py
示例6: get_connection
def get_connection(models_to_register):
    """Return the connection cached on the current application context.

    When the context has no ``synced_database`` attribute (or it is None), a
    connection is opened from the ``MONGOHQ_CONN`` environment variable,
    ``models_to_register`` are registered on it, and it is stored on the
    context for later calls.

    Args:
        models_to_register: Documents to register on a newly created
            connection; not used when a cached connection is returned.
    """
    ctx = _app_ctx_stack.top
    cached = getattr(ctx, 'synced_database', None)
    if cached is not None:
        return cached

    fresh = Connection(os.environ['MONGOHQ_CONN'])
    fresh.register(models_to_register)
    ctx.synced_database = fresh
    return fresh
开发者ID:dkislyuk,项目名称:synced.fm,代码行数:8,代码来源:db.py
示例7: get_tracks
def get_tracks(self):
    """Yield ``(label, data)`` for every track id stored in ``self.data``.

    Connects with ``conn_str`` (a name read from the enclosing scope) when it
    is not None, otherwise with the default connection settings.
    """
    if conn_str is None:
        connection = Connection()
    else:
        connection = Connection(conn_str)
    connection.register([TrackData])

    for track_id in self.data:
        lookup = {"_id": ObjectId(track_id)}
        track = connection.TrackData.one(lookup)
        yield (track.label, track.data)
开发者ID:dkislyuk,项目名称:synced.fm,代码行数:8,代码来源:models.py
示例8: setUp
def setUp(self):
    """Connect to the ``test`` database and empty its collections.

    Nothing is done when ``self._once`` is already truthy; otherwise the flag
    is set before the connection is created.
    """
    if self._once:
        return
    self._once = True

    from mongokit import Connection
    mongo = Connection()
    mongo.register([User, Event, UserSettings, Share,
                    FeatureRequest, FeatureRequestComment])
    self.db = mongo.test
    self._emptyCollections()
开发者ID:amitdash,项目名称:worklog,代码行数:9,代码来源:base.py
示例9: FlaskMongoTestCase
class FlaskMongoTestCase(unittest.TestCase):
    """Base test case giving tests a Flask test client and a MongoDB handle."""

    def setUp(self):
        """Enable testing mode and open the configured database."""
        app.config['TESTING'] = True
        self.raw_app = app
        self.app = app.test_client()
        mongo = Connection(app.config['MONGODB_HOST'],
                           app.config['MONGODB_PORT'])
        self.db = mongo[app.config['MONGODB_DATABASE']]

    def tearDown(self):
        """Drop the ``runs`` collection after each test."""
        self.db.drop_collection("runs")
开发者ID:kwilcox,项目名称:ioos-service-monitor,代码行数:9,代码来源:flask_mongo.py
示例10: Application
class Application(tornado.web.Application):
    """Tornado application holding a MongoKit connection for its handlers."""

    def __init__(self, handlers, database_name=None, **kwargs):
        """Initialise the application and register its documents.

        Args:
            handlers: Handlers passed to ``tornado.web.Application``.
            database_name: Database to use; when falsy,
                ``options.database_name`` is used instead.
            **kwargs: Passed through to ``tornado.web.Application``.
        """
        tornado.web.Application.__init__(self, handlers, **kwargs)
        self.database_name = database_name or options.database_name
        self.con = Connection()
        self.con.register([User, ChatMessage])

    @property
    def db(self):
        """The database named ``self.database_name`` on the connection."""
        return self.con[self.database_name]
开发者ID:peterbe,项目名称:tornado_gkc,代码行数:10,代码来源:chatserver.py
示例11: delete_data
def delete_data(self):
    """Delete every ``TrackData`` document listed in ``self.data``, then clear it.

    Connects with ``conn_str`` (a name read from the enclosing scope) when it
    is not None, otherwise with the default connection settings. After the
    referenced tracks are deleted, ``self.data`` is reset to an empty list
    and the object is saved.

    Ids that no longer match a document are skipped, so a track that was
    already removed does not prevent the list from being cleared.
    """
    if conn_str is None:
        connection = Connection()
    else:
        connection = Connection(conn_str)
    connection.register([TrackData])

    for track_id in self.data:
        track = connection.TrackData.one({"_id": ObjectId(track_id)})
        # ``one()`` gives None when nothing matches; calling ``delete`` on
        # it would raise AttributeError and abort the whole clean-up.
        if track is not None:
            track.delete()

    self.data = []
    self.save()
开发者ID:dkislyuk,项目名称:synced.fm,代码行数:10,代码来源:models.py
示例12: NotablyTestCase
class NotablyTestCase(unittest.TestCase):
    '''Test case for notably -- MongoDB must be running on localhost:27017.'''

    def setUp(self):
        '''Configure the app for testing and start from empty collections.'''
        app.config.update({
            'DATABASE': 'test',
            'MONGODB_HOST': 'localhost',
            'MONGODB_PORT': 27017,
            'TESTING': True,
            'SECRET_KEY': 'testing key',
        })
        self.generic_entry = {'content': 'test', 'rows': '1', 'date': datetime.datetime.now()}
        self.generic_user = {'name': 'test_user', 'pw': 'default'}
        self.app = app.test_client()
        self.conn = Connection(app.config['MONGODB_HOST'], app.config['MONGODB_PORT'])
        # Only Entry is registered; User is not.
        self.conn.register([Entry])
        # Reset the collections.
        database = self.conn[app.config['DATABASE']]
        database.entries.drop()
        database.users.drop()

    def tearDown(self):
        '''Nothing to tear down yet; kept in case that changes.'''
        pass

    def test_entries_view(self):
        '''The index page is rendered and returned.

        The test database has no entries, so the page is expected to hold
        nothing but an empty textarea.'''
        response = self.app.get('/')
        assert '<!DOCTYPE html>' in response.data

    def test_add_entry(self):
        '''Posting a new entry is not rejected as a bad request.'''
        response = self.app.post('/update/', data=self.generic_entry, follow_redirects=True)
        assert 'Bad Request' not in response.data

    def test_update_entry(self):
        '''Posting an update for an existing entry adds a second content item.'''
        # Create an entry holding one content/rows/date item.
        entries = self.conn[app.config['DATABASE']].entries
        seeded = entries.Entry()
        seeded.content.append(u'test')
        seeded.rows.append(1)
        seeded.date.append(datetime.datetime.now())
        seeded.save()

        # Post an update that carries the id of the entry just created.
        payload = self.generic_entry
        payload.update({'id': seeded._id})
        response = self.app.post('/update/', data=payload, follow_redirects=True)

        import pymongo
        reloaded = entries.Entry.one({'_id': pymongo.objectid.ObjectId(seeded._id)})
        assert len(reloaded.content) == 2
        assert 'Bad Request' not in response.data
开发者ID:BenjaminMalley,项目名称:Notably,代码行数:54,代码来源:notably_tests.py
示例13: init_connection
def init_connection(app):
    """
    Open the DB connection and register the models (documents).

    The keyword arguments for ``Connection`` are taken from the
    ``'connection'`` entry of ``get_config(app)``; ``Product`` and
    ``Category`` are registered before the connection is returned.
    """
    connection_kwargs = get_config(app)['connection']
    conn = Connection(**connection_kwargs)
    conn.register([Product, Category])
    return conn
开发者ID:twil,项目名称:catalog-backend,代码行数:11,代码来源:database.py
示例14: get_mongo_objs
def get_mongo_objs(host, port, dbname,
                   username=None, passwd=None):
    """
    Connect to MongoDB and hand back the objects needed to work with results.

    Authentication on the database is attempted only when ``username`` is
    given.

    :return: Database connection, database instance and collection instance.
    """
    connection = Connection(host=host, port=port,
                            read_preference=ReadPreference.NEAREST)
    connection.register([BenchmarkResult])

    named_db = getattr(connection, dbname)
    benchresult = named_db.benchresults.BenchmarkResult

    db = Database(connection, dbname)
    if username is not None:
        db.authenticate(username, password=passwd)

    return connection, db, benchresult
开发者ID:breynisson,项目名称:AutobenchmarkUI,代码行数:13,代码来源:dbmodel.py
示例15: MongoFest
class MongoFest(BaseDB):
    """Host store backed by the ``sysfest`` MongoDB database."""

    def __init__(self, app):
        """Connect using the app's ``MONGODB_HOST``/``MONGODB_PORT`` settings."""
        self.conn = Connection(app.config['MONGODB_HOST'], app.config['MONGODB_PORT'])
        self.conn.register(Host)
        self.logger = app.logger

    def _sorted_hosts(self, *spec):
        """Run ``Host.find(*spec)`` sorted by hostname; return cleaned hosts."""
        cursor = self.conn.sysfest.Host.find(*spec).sort("hostname", pymongo.ASCENDING)
        return [self._clean_oid(host) for host in cursor]

    def find(self, hostname='', tags=''):
        """Return hosts sorted by hostname.

        With the default empty ``hostname`` every host is returned; otherwise
        ``hostname`` is compiled as a regular expression and matched against
        ``hostname`` or ``homes.hostnames.val``. ``tags`` is not used.
        """
        if hostname == '':
            return self._sorted_hosts()
        pattern = re.compile(hostname)
        return self._sorted_hosts({"$or": [{'hostname': pattern},
                                           {'homes.hostnames.val': pattern}]})

    def search(self, query):
        """Return hosts matching a free-text query, sorted by hostname.

        ``tag:<word>`` and ``host:<name>`` tokens are extracted from the
        query; what remains is split on single spaces and every piece becomes
        a case-insensitive regular expression on ``description``. All terms
        are combined with ``$and``.
        """
        tag_pattern = re.compile(r'tag:(\w+)')
        host_pattern = re.compile(r'host:([\w\.-]+)')
        tags = tag_pattern.findall(query)
        hosts = host_pattern.findall(query)
        remainder = host_pattern.sub('', tag_pattern.sub('', query))

        terms = [{"description": re.compile(word, flags=re.I)}
                 for word in remainder.split(' ')]
        if hosts:
            for name in hosts:
                terms.append({"$or": [{'hostname': re.compile(name)},
                                      {'homes.hostnames.val': re.compile(name)}]})
        if tags:  # only add the tag term when at least one tag was given
            terms.append({"tags": {"$all": tags}})
        return self._sorted_hosts({"$and": terms})

    def find_one(self, host_id):
        """Return the host with this id (string ids are converted), or None."""
        if isinstance(host_id, basestring):
            host_id = bson.objectid.ObjectId(host_id)
        found = self.conn.sysfest.Host.find_one({'_id': host_id})
        return self._clean_oid(found)

    def update(self, host_id, values):
        """``$set`` ``values`` on the host and return the refreshed host."""
        target = {"_id": bson.objectid.ObjectId(host_id)}
        self.conn.sysfest.hosts.update(target, {"$set": values})
        return self.find_one(host_id)

    def create(self, values):
        """Insert ``values`` as a new host and return the stored host."""
        new_id = self.conn.sysfest.hosts.insert(values)
        return self.find_one(host_id=new_id)

    def delete(self, host_id):
        """Remove the host with this id and return the driver's result."""
        return self.conn.sysfest.hosts.remove({'_id': bson.objectid.ObjectId(host_id)})

    def close(self):
        """Disconnect from MongoDB."""
        self.conn.disconnect()

    def _clean_oid(self, host):
        """Replace an ObjectId ``_id`` with its string form; None passes through."""
        if host is not None and isinstance(host["_id"], bson.objectid.ObjectId):
            host["_id"] = str(host["_id"])
        return host
开发者ID:ASCIIDuck,项目名称:SysFest,代码行数:51,代码来源:mongo.py
示例16: DBConnection
class DBConnection(object):
    """Holds a MongoKit connection with ``Route`` and ``Stop`` registered.

    Attributes set by ``__init__``:
        con: the connection.
        routes: ``con.Route``.
        stops: ``con.Stop``.
    """

    def __init__(self):
        """Connect with the default settings and register the documents."""
        self.con = Connection()
        self.con.register([Route])
        self.con.register([Stop])
        self.routes = self.con.Route
        self.stops = self.con.Stop

    def __del__(self):
        """Close the connection when the object is collected.

        ``con`` is missing when ``__init__`` failed before assigning it (or
        was bypassed); in that case there is nothing to close.
        """
        con = getattr(self, 'con', None)
        if con is None:
            return
        con.close()
        con.disconnect()

    @staticmethod
    def connect():
        """Open a connection and look up the ``amdoraft`` collections.

        Declared static because the function takes no ``self``; this makes it
        callable on the class and on instances alike.
        """
        # NOTE(review): the connection and both collections are discarded;
        # nothing is stored or returned.
        con = Connection()
        routes = con.amdoraft.routes
        stops = con.amdoraft.stops
        return

    @staticmethod
    def disconnect():
        """Do nothing; returns None."""
        return
开发者ID:codeforpakistan,项目名称:Aamad-O-Raft,代码行数:32,代码来源:DBConnection.py
示例17: init
def init(host, port, db='test'):
    """Connect to MongoDB and set up the module-level state.

    Rebinds the globals ``connection``, ``initialized``, ``cur_db``,
    ``dbcon`` and ``root``.

    Args:
        host: Host passed to ``Connection``.
        port: Port passed to ``Connection``.
        db: Name of the database selected as ``dbcon``.
    """
    global connection, initialized, cur_db, root, dbcon

    connection = Connection(host, port)
    initialized = True
    cur_db = db
    dbcon = connection[cur_db]

    # Register all four documents with the current connection.
    documents = [models.Item, models.Relation, models.User, models.Comment]
    connection.register(documents)
    root = getRootItem()
开发者ID:yedi,项目名称:Avalon,代码行数:15,代码来源:db_operations.py
示例18: FlaskMongoTestCase
class FlaskMongoTestCase(unittest.TestCase):
    """Base test case with a Flask test client, a logged-in session and MongoDB."""

    def setUp(self):
        """Enable testing mode, open the database and set the session user."""
        app.config['TESTING'] = True
        self.app = app.test_client()
        mongo = Connection(app.config['MONGODB_HOST'],
                           app.config['MONGODB_PORT'])
        self.db = mongo[app.config['MONGODB_DATABASE']]
        with self.app.session_transaction() as sess:
            sess['user_email'] = u'[email protected]'

    def tearDown(self):
        """Drop every collection the tests may have written to."""
        for name in ("capability", "diel", "taxis",
                     "lifestage", "libraries", "users"):
            self.db.drop_collection(name)
开发者ID:asascience-open,项目名称:larva-library,代码行数:15,代码来源:flask_mongo.py
示例19: TypesTestCase
class TypesTestCase(unittest.TestCase):
    """Tests for the types accepted in a ``SchemaDocument`` structure."""

    def setUp(self):
        """Open a connection and select the ``test.mongokit`` collection."""
        self.connection = Connection()
        self.col = self.connection['test']['mongokit']

    def tearDown(self):
        """Drop the ``test`` database."""
        self.connection.drop_database('test')

    def test_authorized_type(self):
        """Every authorized type yields the expected empty default value."""
        for auth_type in SchemaDocument.authorized_types:
            if auth_type is dict:
                # The dict type is declared through an empty dict instance.
                auth_type = {}

            class MyDoc(SchemaDocument):
                structure = {"foo": auth_type}

            if type(auth_type) is dict:
                assert MyDoc() == {"foo": {}}, MyDoc()
            elif auth_type is list:
                assert MyDoc() == {"foo": []}
            else:
                assert MyDoc() == {"foo": None}, auth_type

    def test_not_authorized_type(self):
        """Declaring an unauthorized type raises ``StructureError``."""
        for unauth_type in [set]:
            # Unauthorized type inside a list.
            rejected = False
            try:
                class MyDoc(SchemaDocument):
                    structure = {"foo": [unauth_type]}
            except StructureError as e:
                self.assertEqual(str(e), "MyDoc: %s is not an authorized type" % unauth_type)
                rejected = True
            self.assertEqual(rejected, True)

            # Unauthorized type on its own; the parentheses do not make a tuple.
            rejected = False
            try:
                class MyDoc(SchemaDocument):
                    structure = {"foo": (unauth_type)}
            except StructureError as e:
                self.assertEqual(str(e), "MyDoc: %s is not an authorized type" % unauth_type)
                rejected = True
            self.assertEqual(rejected, True)

            # Unauthorized type as a dict value nested inside a list.
            # NOTE(review): unlike the two cases above, the flag is not
            # asserted after this one - confirm whether a check is missing.
            rejected = False
            try:
                class MyDoc2(SchemaDocument):
                    structure = {'foo': [{int: unauth_type}]}
            except StructureError as e:
                self.assertEqual(str(e), "MyDoc2: %s is not an authorized type" % unauth_type)
                rejected = True
开发者ID:ThoughtLeadr,项目名称:mongokit,代码行数:48,代码来源:test_types.py
示例20: database
def database(self):
    """Open the connection and register the game's documents on it.

    The connection is stored as ``self.connection``; ``World``, ``Island``,
    ``Event`` and ``Player`` are registered one at a time, in that order.
    """
    print('Loading database')
    self.connection = Connection()
    for document in (World, Island, Event, Player):
        self.connection.register([document])
开发者ID:darrenmoore,项目名称:Settlers,代码行数:7,代码来源:game.py
注:本文中的mongokit.Connection类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论