本文整理汇总了Python中mediagoblin.db.base.Session类的典型用法代码示例。如果您正苦于以下问题:Python Session类的具体用法?Python Session怎么用?Python Session使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Session类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_connection_and_db_from_config
def setup_connection_and_db_from_config(app_config, migrations=False, app=None):
    """Build the SQLAlchemy engine named in ``app_config`` and wrap it.

    Args:
        app_config: mapping that provides the ``'sql_engine'`` URL.
        migrations: for ``sqlite://`` URLs, selects which connect listener
            is registered: ``_sqlite_disable_fk_pragma_on_connect`` when
            true, ``_sqlite_fk_pragma_on_connect`` otherwise.
        app: object stored on the engine as ``engine.app``.

    Returns:
        ``DatabaseManager(engine)`` when ``DISABLE_GLOBALS`` is truthy.
        Otherwise the global ``Session`` is bound to the engine and
        ``DatabaseMaster(engine)`` is returned.
    """
    engine_url = app_config['sql_engine']
    engine = create_engine(engine_url)

    # @@: Maybe make a weak-ref so an engine can get garbage
    #   collected?  Not that we expect to make a lot of MediaGoblinApp
    #   instances in a single process...
    engine.app = app

    # SQLite needs a per-connection hook for foreign key handling.
    if engine_url.startswith('sqlite://'):
        on_connect = (_sqlite_disable_fk_pragma_on_connect if migrations
                      else _sqlite_fk_pragma_on_connect)
        event.listen(engine, 'connect', on_connect)

    if not DISABLE_GLOBALS:
        Session.configure(bind=engine)
        return DatabaseMaster(engine)
    return DatabaseManager(engine)
开发者ID:ausbin,项目名称:mediagoblin,代码行数:25,代码来源:open.py
示例2: import_file
def import_file(self, media):
    """Create a ``MediaEntry`` for ``media`` and hand it to processing.

    Args:
        media: object exposing a ``filename`` attribute; the name is split
            on ``"/"`` to form the queued file path.

    Returns:
        The id of the saved entry, or ``None`` when the file type is not
        supported (a message is printed in that case).

    Raises:
        Any exception raised while saving or processing the entry; the
        session is rolled back before it is re-raised.
    """
    try:
        media_type, media_manager = type_match_handler(media, media.filename)
    except FileTypeNotSupported:
        print(u"File type not supported: {0}".format(media.filename))
        return

    # Title is the file's base name without its extension.
    stem = os.path.splitext(media.filename)[0]

    entry = self.db.MediaEntry()
    entry.media_type = unicode(media_type)
    entry.title = unicode(os.path.basename(stem))
    entry.uploader = 1  # NOTE(review): hard-coded uploader id - confirm intent

    # No user-supplied folksonomy tags: convert the empty string.
    entry.tags = convert_to_tag_list_of_dicts("")

    # The slug is derived from the title set above.
    entry.generate_slug()

    queued_task_id = unicode(uuid.uuid4())
    entry.queued_media_file = media.filename.split("/")
    entry.queued_task_id = queued_task_id

    try:
        entry.save()
        saved_id = entry.id
        run_process_media(entry)
        Session.commit()
    except Exception:
        Session.rollback()
        raise
    return saved_id
开发者ID:Xarthisius,项目名称:gmg_localfiles,代码行数:32,代码来源:import_files.py
示例3: fixture_add_user
def fixture_add_user(username=u'chris', password=u'toast',
                     active_user=True, wants_comment_notification=True):
    """Create (or update) a test ``User`` and return it detached.

    Args:
        username: name to look up or create; the e-mail is set to
            ``username + u'@example.com'``.
        password: hashed into ``pw_hash`` unless it is ``None``.
        active_user: when truthy, the e-mail is marked verified and the
            status is set to ``u'active'``.
        wants_comment_notification: stored on the user as-is.

    Returns:
        The user re-queried by username and expunged from ``Session``.
    """
    def _by_name():
        return User.query.filter_by(username=username).first()

    # Reuse an existing row when there is one.
    account = _by_name()
    if account is None:
        account = User()

    account.username = username
    account.email = username + u'@example.com'
    if password is not None:
        account.pw_hash = gen_password_hash(password)
    if active_user:
        account.email_verified = True
        account.status = u'active'
    account.wants_comment_notification = wants_comment_notification
    account.save()

    # Reload, then detach from the session before handing it out.
    account = _by_name()
    Session.expunge(account)
    return account
开发者ID:RichoHan,项目名称:MediaGoblin,代码行数:25,代码来源:tools.py
示例4: fixture_add_comment_report
def fixture_add_comment_report(comment=None, reported_user=None,
                               reporter=None, created=None, report_content=None):
    """Save a ``CommentReport`` for tests and return it detached.

    Every argument left as ``None`` is filled with a default: a fresh
    fixture comment, fixture users for ``reported_user`` and ``reporter``,
    the current time for ``created``, and a canned ``report_content``.

    Returns:
        The saved report, expunged from ``Session``.
    """
    # Defaults are resolved in argument order, as the fixtures touch the DB.
    comment = fixture_add_comment() if comment is None else comment
    reported_user = (fixture_add_user() if reported_user is None
                     else reported_user)
    reporter = fixture_add_user() if reporter is None else reporter
    created = datetime.now() if created is None else created
    if report_content is None:
        report_content = 'Auto-generated test report'

    report = CommentReport(
        comment=comment,
        reported_user=reported_user,
        reporter=reporter,
        created=created,
        report_content=report_content,
    )
    report.save()
    Session.expunge(report)
    return report
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:29,代码来源:tools.py
示例5: fixture_add_comment
def fixture_add_comment(author=None, media_entry=None, comment=None):
    """Create a ``TextComment`` linked to a media entry for tests.

    Args:
        author: value stored as the comment's ``actor``; defaults to the
            id of a user from ``fixture_add_user()``.
        media_entry: target of the comment link; defaults to
            ``fixture_media_entry()``.
        comment: comment text; defaults to an auto-generated sentence that
            names the author and the media entry's id.

    Returns:
        The saved ``TextComment``. Note that it is the ``Comment`` link
        object, not the returned text comment, that is expunged.
    """
    if author is None:
        author = fixture_add_user().id

    if media_entry is None:
        media_entry = fixture_media_entry()

    if comment is None:
        # The second placeholder used to be ``{0}`` as well, which repeated
        # the author and silently dropped the media argument.
        comment = \
            'Auto-generated test comment by user #{0} on media #{1}'.format(
                author, media_entry.id)

    text_comment = TextComment(
        actor=author,
        content=comment
    )
    text_comment.save()

    # Attach the text comment to its target through a Comment link row.
    comment_link = Comment()
    comment_link.target = media_entry
    comment_link.comment = text_comment
    comment_link.save()

    Session.expunge(comment_link)

    return text_comment
开发者ID:ausbin,项目名称:mediagoblin,代码行数:26,代码来源:tools.py
示例6: fixture_add_collection
def fixture_add_collection(name=u"My first Collection", user=None,
                           collection_type=Collection.USER_DEFINED_TYPE):
    """Return a test ``Collection``, creating it when none matches.

    Args:
        name: collection title.
        user: owner; defaults to ``fixture_add_user()``.
        collection_type: stored as the collection's ``type``.

    Returns:
        An existing collection matching actor, title and type (returned
        as queried, without being expunged), or a newly saved one that is
        refreshed and then expunged from ``Session``.
    """
    if user is None:
        user = fixture_add_user()

    existing = Collection.query.filter_by(
        actor=user.id, title=name, type=collection_type).first()
    if existing is not None:
        return existing

    created = Collection()
    created.actor = user.id
    created.title = name
    created.type = collection_type
    created.generate_slug()
    created.save()

    # Reload from the database, then detach from the session.
    Session.refresh(created)
    Session.expunge(created)
    return created
开发者ID:ausbin,项目名称:mediagoblin,代码行数:25,代码来源:tools.py
示例7: fixture_add_user
def fixture_add_user(username=u'chris', password=u'toast',
                     privileges=None, wants_comment_notification=True):
    """Create (or update) a test ``LocalUser`` and return it detached.

    Args:
        username: name to look up or create; the e-mail is set to
            ``username + u'@example.com'``.
        password: hashed into ``pw_hash`` unless it is ``None``.
        privileges: iterable of privilege names to grant; names with no
            matching ``Privilege`` row are skipped. ``None`` (the default)
            means no privileges.
        wants_comment_notification: stored on the user as-is.

    Returns:
        The user re-queried by username and expunged from ``Session``.
    """
    # A None sentinel replaces the former mutable ``[]`` default.
    if privileges is None:
        privileges = ()

    # Reuse existing user or create a new one
    test_user = LocalUser.query.filter(LocalUser.username == username).first()
    if test_user is None:
        test_user = LocalUser()
    test_user.username = username
    test_user.email = username + u'@example.com'
    if password is not None:
        test_user.pw_hash = gen_password_hash(password)
    test_user.wants_comment_notification = wants_comment_notification

    for privilege in privileges:
        query = Privilege.query.filter(Privilege.privilege_name == privilege)
        if query.count():
            test_user.all_privileges.append(query.one())

    test_user.save()

    # Reload - The `with_polymorphic` needs to be there to eagerly load
    # the attributes on the LocalUser as this can't be done post detachment.
    user_query = LocalUser.query.with_polymorphic(LocalUser)
    test_user = user_query.filter(LocalUser.username == username).first()

    # ... and detach from session:
    Session.expunge(test_user)

    return test_user
开发者ID:ausbin,项目名称:mediagoblin,代码行数:27,代码来源:tools.py
示例8: _test_authentication
def _test_authentication():
    """Walk the LDAP plugin through login, registration and re-login.

    Steps exercised against ``ldap_plugin_app``: a first login that lands
    on the registration form, a registration, a duplicate registration
    that must report errors, a logout, and a second login whose session
    must carry the registered user's id.
    """
    # The e-mail literals below had been replaced by a "[email protected]"
    # obfuscation placeholder; they are restored to the address that
    # matches the ``username + u'@example.com'`` pattern of the fixtures.
    # NOTE(review): confirm against the address the LDAP test double returns.
    template.clear_test_template_context()
    res = ldap_plugin_app.post(
        '/auth/ldap/login/',
        {'username': u'chris',
         'password': u'toast'})

    # First login of an unknown user shows a pre-filled register form.
    context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
    register_form = context['register_form']

    assert register_form.username.data == u'chris'
    assert register_form.email.data == u'chris@example.com'

    template.clear_test_template_context()
    res = ldap_plugin_app.post(
        '/auth/ldap/register/',
        {'username': u'chris',
         'email': u'chris@example.com'})
    res.follow()

    assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
    assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT

    # Try to register with same email and username
    template.clear_test_template_context()
    res = ldap_plugin_app.post(
        '/auth/ldap/register/',
        {'username': u'chris',
         'email': u'chris@example.com'})

    context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
    register_form = context['register_form']

    assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
    assert register_form.username.errors == [u'Sorry, a user with that name already exists.']

    # Log out
    ldap_plugin_app.get('/auth/logout/')

    # Get user and detach from session
    test_user = mg_globals.database.User.query.filter_by(
        username=u'chris').first()
    Session.expunge(test_user)

    # Log back in
    template.clear_test_template_context()
    res = ldap_plugin_app.post(
        '/auth/ldap/login/',
        {'username': u'chris',
         'password': u'toast'})
    res.follow()

    assert urlparse.urlsplit(res.location)[2] == '/'
    assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT

    # Make sure user is in the session
    context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
    session = context['request'].session
    assert session['user_id'] == unicode(test_user.id)
开发者ID:commonsmachinery,项目名称:mediagoblin,代码行数:59,代码来源:test_ldap.py
示例9: user_upload_limits
def user_upload_limits(self, uploaded=None, upload_limit=None):
    """Set upload counters on the user returned by ``self.our_user()``.

    Only truthy values are written, so ``None`` and ``0`` leave the
    corresponding attribute untouched. The user is saved and then
    expunged from ``Session``.
    """
    account = self.our_user()

    requested = (('uploaded', uploaded), ('upload_limit', upload_limit))
    for attribute, value in requested:
        if value:
            setattr(account, attribute, value)

    account.save()
    Session.expunge(account)
开发者ID:piratas,项目名称:biblioteca,代码行数:10,代码来源:test_submission.py
示例10: clean_orphan_tags
def clean_orphan_tags(commit=True):
    """Delete every ``Tag`` that has no ``MediaTag`` row joined to it.

    Args:
        commit: when truthy, ``Session.commit()`` is called after the
            deletions; otherwise they are left pending in the session.
    """
    # Outer join + NULL test keeps only tags with no MediaTag partner.
    # ``== None`` is deliberate: it is a SQLAlchemy column comparison.
    orphans = (
        Session.query(Tag)
        .outerjoin(MediaTag)
        .filter(MediaTag.id == None)  # noqa: E711
    )
    for orphan in orphans:
        Session.delete(orphan)

    # A bulk alternative would select the orphan ids in a subquery and
    # issue one query-level delete(synchronize_session=False) instead.

    if commit:
        Session.commit()
开发者ID:praveen97uma,项目名称:goblin,代码行数:11,代码来源:util.py
示例11: fixture_add_comment_notification
def fixture_add_comment_notification(entry_id, subject_id, user_id,
                                     seen=False):
    """Save a ``CommentNotification`` and return a detached copy.

    Args:
        entry_id: accepted but not used by this fixture.
        subject_id: stored on the notification.
        user_id: stored on the notification.
        seen: stored on the notification.

    Returns:
        The notification re-queried by id and expunged from ``Session``.
    """
    notification = CommentNotification(
        user_id=user_id, seen=seen, subject_id=subject_id)
    notification.save()

    # Reload by primary key, then detach.
    reloaded = CommentNotification.query.filter_by(
        id=notification.id).first()
    Session.expunge(reloaded)
    return reloaded
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:12,代码来源:tools.py
示例12: get_app
def get_app(request, paste_config=None, mgoblin_config=None):
    """Create a MediaGoblin app for testing.

    Args:
     - request: Not an http request, but a pytest fixture request.  We
       use this to make temporary directories that pytest
       automatically cleans up as needed.
     - paste_config: particular paste config used by this application.
     - mgoblin_config: particular mediagoblin config used by this
       application.

    Returns:
        A ``TestApp`` wrapping the application loaded from the copied
        paste config.
    """
    if not paste_config:
        paste_config = TEST_SERVER_CONFIG
    if not mgoblin_config:
        mgoblin_config = TEST_APP_CONFIG

    # Per-test scratch directory that receives copies of both configs.
    run_dir = request.config._tmpdirhandler.mktemp(
        'mgoblin_app', numbered=True)
    user_dev_dir = run_dir.mkdir('user_dev').strpath

    new_paste_config = run_dir.join('paste.ini').strpath
    new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
    for source, target in ((paste_config, new_paste_config),
                           (mgoblin_config, new_mgoblin_config)):
        shutil.copyfile(source, target)

    # Drop whatever session state is left over before setting up.
    Session.rollback()
    Session.remove()

    # install user_dev directories
    for directory in USER_DEV_DIRECTORIES_TO_SETUP:
        os.makedirs(os.path.join(user_dev_dir, directory))

    # Get app config (the validation result is not inspected here)
    global_config, _validation = read_mediagoblin_config(new_mgoblin_config)

    # Run database setup/migrations
    # @@: The *only* test that doesn't pass if we remove this is in
    #   test_persona.py... why?
    run_dbupdate(global_config['mediagoblin'], global_config)

    # setup app and return
    wsgi_app = loadapp('config:' + new_paste_config)

    # Insert the TestingMeddleware, which can do some
    # sanity checks on every request/response.
    # Doing it this way is probably not the cleanest way.
    # We'll fix it, when we have plugins!
    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))

    return TestApp(wsgi_app)
开发者ID:ausbin,项目名称:mediagoblin,代码行数:53,代码来源:tools.py
示例13: test_user_deletes_other_comments
def test_user_deletes_other_comments(test_app):
    """Deleting a user must also remove their media and related comments.

    Two users each own one media entry and each comment on both entries
    (four comments). Deleting the first user must remove one user, one
    media entry and three comments; deleting the second removes the rest.
    """
    def _row_counts():
        # Same query order as the assertions below: users, media, comments.
        return (User.query.count(),
                MediaEntry.query.count(),
                MediaComment.query.count())

    user_a = fixture_add_user(u"chris_a")
    user_b = fixture_add_user(u"chris_b")

    media_a = fixture_media_entry(uploader=user_a.id, save=False,
                                  expunge=False, fake_upload=False)
    media_b = fixture_media_entry(uploader=user_b.id, save=False,
                                  expunge=False, fake_upload=False)
    Session.add(media_a)
    Session.add(media_b)
    Session.flush()

    # Create all 4 possible comments:
    for author_id in (user_a.id, user_b.id):
        for media_id in (media_a.id, media_b.id):
            remark = MediaComment()
            remark.media_entry = media_id
            remark.author = author_id
            remark.content = u"Some Comment"
            Session.add(remark)

    Session.flush()

    users_before, media_before, comments_before = _row_counts()

    User.query.get(user_a.id).delete(commit=False)

    users_now, media_now, comments_now = _row_counts()

    # One user deleted
    assert users_now == users_before - 1
    # One media gone
    assert media_now == media_before - 1
    # Three of four comments gone.
    assert comments_now == comments_before - 3

    User.query.get(user_b.id).delete()

    users_now, media_now, comments_now = _row_counts()

    # All users gone
    assert users_now == users_before - 2
    # All media gone
    assert media_now == media_before - 2
    # All comments gone
    assert comments_now == comments_before - 4
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:52,代码来源:test_misc.py
示例14: test_media_data_init
def test_media_data_init(test_app):
    """``media_data_init()`` must populate ``media_data`` without adding
    anything to the session."""
    # Start from an empty session.
    Session.rollback()
    Session.remove()

    entry = MediaEntry()
    entry.media_type = u"mediagoblin.media_types.image"
    assert entry.media_data is None

    entry.media_data_init()
    assert entry.media_data is not None

    # Count (and show) whatever objects ended up in the session.
    seen = 0
    for seen, leftover in enumerate(Session(), start=1):
        print(repr(leftover))
    assert seen == 0
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:13,代码来源:test_modelmethods.py
示例15: test_media_deletes_broken_attachment
def test_media_deletes_broken_attachment(test_app):
    """A media entry whose attachment path does not exist can still be
    deleted, followed by its owner."""
    owner = fixture_add_user(u"chris_a")

    entry = fixture_media_entry(uploader=owner.id, save=False)
    # Attachment record pointing at a file path that is not there.
    broken_attachment = {
        'name': u"some name",
        'filepath': [u"does", u"not", u"exist"],
    }
    entry.attachment_files.append(broken_attachment)

    Session.add(entry)
    Session.flush()

    MediaEntry.query.get(entry.id).delete()
    User.query.get(owner.id).delete()
开发者ID:praveen97uma,项目名称:goblin,代码行数:13,代码来源:test_misc.py
示例16: _test_new_user
def _test_new_user():
    """Register a new user through the OpenID plugin and log back in.

    Steps exercised against ``openid_plugin_app``: an OpenID login that
    lands on the registration form, a registration, a logout, and a
    finished login whose session must carry the registered user's id.
    """
    openid_plugin_app.post(
        '/auth/openid/login/', {
            'openid': u'http://real.myopenid.com'})

    # Right place?
    assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
    context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
    register_form = context['register_form']

    # Register User
    # The e-mail literal had been replaced by a "[email protected]"
    # obfuscation placeholder; it is restored to a well-formed address.
    # NOTE(review): confirm this is the address the upstream test uses.
    res = openid_plugin_app.post(
        '/auth/openid/register/', {
            'openid': register_form.openid.data,
            'username': u'chris',
            'email': u'chris@example.com'})
    res.follow()

    # Correct place?
    assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
    assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT

    # No need to test if user is in logged in and verification email
    # awaits, since openid uses the register_user function which is
    # tested in test_auth

    # Logout User
    openid_plugin_app.get('/auth/logout')

    # Get user and detach from session
    test_user = mg_globals.database.LocalUser.query.filter(
        LocalUser.username == u'chris'
    ).first()
    Session.expunge(test_user)

    # Log back in
    # Could not get it to work by 'POST'ing to /auth/openid/login/
    template.clear_test_template_context()
    res = openid_plugin_app.post(
        '/auth/openid/login/finish/', {
            'openid': u'http://real.myopenid.com'})
    res.follow()

    assert urlparse.urlsplit(res.location)[2] == '/'
    assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT

    # Make sure user is in the session
    context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
    session = context['request'].session
    assert session['user_id'] == six.text_type(test_user.id)
开发者ID:ausbin,项目名称:mediagoblin,代码行数:50,代码来源:test_openid.py
示例17: user_upload_limits
def user_upload_limits(self, uploaded=None, upload_limit=None):
    """Set upload counters on ``self.test_user`` and reload it detached.

    Only truthy values are written, so ``None`` and ``0`` leave the
    corresponding attribute untouched. After saving, ``self.test_user``
    is replaced by a freshly queried copy that is expunged from
    ``Session``.
    """
    requested = (('uploaded', uploaded), ('upload_limit', upload_limit))
    for attribute, value in requested:
        if value:
            setattr(self.test_user, attribute, value)

    self.test_user.save()

    # Reload by username ...
    self.test_user = User.query.filter_by(
        username=self.test_user.username).first()

    # ... and detach from session:
    Session.expunge(self.test_user)
开发者ID:commonsmachinery,项目名称:mediagoblin,代码行数:15,代码来源:test_submission.py
示例18: check_db_up_to_date
def check_db_up_to_date():
    """Check if the database is up to date and quit if not.

    For each database data set gathered for the configured plugins, a
    migration manager is built; the process exits via ``sys.exit`` when
    the manager reports no current migration or has migrations to run.
    The session is rolled back and removed after every data set.
    """
    plugin_names = mgg.global_config.get('plugins', {}).keys()

    for dbdata in gather_database_data(plugin_names):
        session = Session()
        try:
            manager = dbdata.make_migration_manager(session)
            current = manager.database_current_migration
            # Short-circuit: pending migrations are only computed when a
            # current migration is recorded.
            if current is None or manager.migrations_to_run():
                sys.exit("Your database is not up to date. Please run "
                         "'gmg dbupdate' before starting MediaGoblin.")
        finally:
            Session.rollback()
            Session.remove()
开发者ID:ausbin,项目名称:mediagoblin,代码行数:15,代码来源:util.py
示例19: take_punitive_actions
def take_punitive_actions(request, form, report, user):
    """Apply the moderator's chosen actions and archive the report.

    Args:
        request: current request; ``request.user`` is the moderator.
        form: submitted form; ``action_to_resolve.data`` lists the chosen
            actions and ``resolution_content.data`` accumulates a log line
            per action taken.
        report: the report being resolved.
        user: the user the actions are taken against.

    Returns:
        A redirect to the ``mediagoblin.moderation.users_detail`` page of
        ``user``.
    """
    actions = form.action_to_resolve.data
    message_body = ""

    def note(text):
        # Append one line to the resolution log kept on the form.
        form.resolution_content.data += text

    # Remove each selected privilege, logging every removal.
    if u"takeaway" in actions:
        for privilege_name in form.take_away_privileges.data:
            take_away_privileges(user.username, privilege_name)
            note(_(u"\n{mod} took away {user}'s {privilege} privileges.").format(
                mod=request.user.username,
                user=user.username,
                privilege=privilege_name,
            ))

    # Banning adds a new ban object (from ``ban_user``) to the session.
    if u"userban" in actions:
        user_ban = ban_user(
            form.targeted_user.data,
            expiration_date=form.user_banned_until.data,
            reason=form.why_user_was_banned.data,
        )
        Session.add(user_ban)
        note(_(u"\n{mod} banned user {user} {expiration_date}.").format(
            mod=request.user.username,
            user=user.username,
            expiration_date=(
                _("until {date}").format(date=form.user_banned_until.data)
                if form.user_banned_until.data
                else _("indefinitely")
            ),
        ))

    # A warning message is only remembered here; the e-mail itself goes
    # out after the commit below.
    if u"sendmessage" in actions:
        message_body = form.message_to_user.data
        note(_(u"\n{mod} sent a warning email to the {user}.").format(
            mod=request.user.username, user=user.username))

    # Deletion targets whatever the report points at: comment or media.
    if u"delete" in actions:
        if report.is_comment_report():
            Session.delete(report.obj())
            note(_(u"\n{mod} deleted the comment.").format(
                mod=request.user.username))
        elif report.is_media_entry_report():
            report.obj().delete()
            note(_(u"\n{mod} deleted the media entry.").format(
                mod=request.user.username))

    report.archive(
        resolver_id=request.user.id,
        resolved=datetime.now(),
        result=form.resolution_content.data,
    )
    Session.add(report)
    Session.commit()

    if message_body:
        send_email(
            mg_globals.app_config["email_sender_address"],
            [user.email],
            _("Warning from") + "- {moderator} ".format(moderator=request.user.username),
            message_body,
        )

    return redirect(request, "mediagoblin.moderation.users_detail", user=user.username)
开发者ID:tofay,项目名称:mediagoblin,代码行数:58,代码来源:tools.py
示例20: check_media_slug_used
def check_media_slug_used(dummy_db, uploader_id, slug, ignore_m_id):
    """Tell whether ``uploader_id`` already has a media entry with ``slug``.

    Args:
        dummy_db: accepted but not used by this function.
        uploader_id: uploader whose entries are searched.
        slug: slug to look for.
        ignore_m_id: id of an entry to exclude from the search, or
            ``None`` to consider every entry.

    Returns:
        ``True`` when a matching entry exists, ``False`` otherwise.
    """
    criteria = (MediaEntry.uploader == uploader_id) & (MediaEntry.slug == slug)
    if ignore_m_id is not None:
        criteria = criteria & (MediaEntry.id != ignore_m_id)

    match = Session.query(MediaEntry.id).filter(criteria).first()
    return match is not None
开发者ID:orblivion,项目名称:mediagoblin-quickstart-openshift,代码行数:7,代码来源:util.py
注:本文中的mediagoblin.db.base.Session类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论