• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python base.Session类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中mediagoblin.db.base.Session的典型用法代码示例。如果您正苦于以下问题:Python Session类的具体用法?Python Session怎么用?Python Session使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了Session类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setup_connection_and_db_from_config

def setup_connection_and_db_from_config(app_config, migrations=False, app=None):
    engine = create_engine(app_config['sql_engine'])

    # @@: Maybe make a weak-ref so an engine can get garbage
    # collected?  Not that we expect to make a lot of MediaGoblinApp
    # instances in a single process...
    engine.app = app

    # Enable foreign key checking for sqlite
    if app_config['sql_engine'].startswith('sqlite://'):
        if migrations:
            event.listen(engine, 'connect',
                         _sqlite_disable_fk_pragma_on_connect)
        else:
            event.listen(engine, 'connect', _sqlite_fk_pragma_on_connect)

    # logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

    if DISABLE_GLOBALS:
        return DatabaseManager(engine)

    else:
        Session.configure(bind=engine)

        return DatabaseMaster(engine)
开发者ID:ausbin,项目名称:mediagoblin,代码行数:25,代码来源:open.py


示例2: import_file

    def import_file(self, media):
        try:
            media_type, media_manager = (
                #get_media_type_and_manager(media.filename))
                type_match_handler(media,media.filename))
        except FileTypeNotSupported:
            print u"File type not supported: {0}".format(media.filename)
            return
        entry = self.db.MediaEntry()
        entry.media_type = unicode(media_type)
        entry.title = unicode(
            os.path.basename(os.path.splitext(media.filename)[0]))

        entry.uploader = 1
        # Process the user's folksonomy "tags"
        entry.tags = convert_to_tag_list_of_dicts("")
        # Generate a slug from the title
        entry.generate_slug()

        task_id = unicode(uuid.uuid4())
        entry.queued_media_file = media.filename.split("/")
        entry.queued_task_id = task_id

        try:
            entry.save()
            entry_id = entry.id
            run_process_media(entry)
            Session.commit()
            return entry_id
        except Exception:
            Session.rollback()
            raise
开发者ID:Xarthisius,项目名称:gmg_localfiles,代码行数:32,代码来源:import_files.py


示例3: fixture_add_user

def fixture_add_user(username=u'chris', password=u'toast',
                     active_user=True, wants_comment_notification=True):
    # Reuse existing user or create a new one
    test_user = User.query.filter_by(username=username).first()
    if test_user is None:
        test_user = User()
    test_user.username = username
    test_user.email = username + u'@example.com'
    if password is not None:
        test_user.pw_hash = gen_password_hash(password)
    if active_user:
        test_user.email_verified = True
        test_user.status = u'active'

    test_user.wants_comment_notification = wants_comment_notification

    test_user.save()

    # Reload
    test_user = User.query.filter_by(username=username).first()

    # ... and detach from session:
    Session.expunge(test_user)

    return test_user
开发者ID:RichoHan,项目名称:MediaGoblin,代码行数:25,代码来源:tools.py


示例4: fixture_add_comment_report

def fixture_add_comment_report(comment=None, reported_user=None,
        reporter=None, created=None, report_content=None):
    if comment is None:
        comment = fixture_add_comment()

    if reported_user is None:
        reported_user = fixture_add_user()

    if reporter is None:
        reporter = fixture_add_user()

    if created is None:
        created=datetime.now()

    if report_content is None:
        report_content = \
            'Auto-generated test report'

    comment_report = CommentReport(comment=comment,
        reported_user = reported_user,
        reporter = reporter,
        created = created,
        report_content=report_content)

    comment_report.save()

    Session.expunge(comment_report)

    return comment_report
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:29,代码来源:tools.py


示例5: fixture_add_comment

def fixture_add_comment(author=None, media_entry=None, comment=None):
    if author is None:
        author = fixture_add_user().id

    if media_entry is None:
        media_entry = fixture_media_entry()

    if comment is None:
        comment = \
            'Auto-generated test comment by user #{0} on media #{0}'.format(
                author, media_entry)

    text_comment = TextComment(
        actor=author,
        content=comment
    )
    text_comment.save()

    comment_link = Comment()
    comment_link.target = media_entry
    comment_link.comment = text_comment
    comment_link.save()

    Session.expunge(comment_link)

    return text_comment
开发者ID:ausbin,项目名称:mediagoblin,代码行数:26,代码来源:tools.py


示例6: fixture_add_collection

def fixture_add_collection(name=u"My first Collection", user=None,
                           collection_type=Collection.USER_DEFINED_TYPE):
    if user is None:
        user = fixture_add_user()
    coll = Collection.query.filter_by(
        actor=user.id,
        title=name,
        type=collection_type
    ).first()
    if coll is not None:
        return coll
    coll = Collection()
    coll.actor = user.id
    coll.title = name
    coll.type = collection_type
    coll.generate_slug()
    coll.save()

    # Reload
    Session.refresh(coll)

    # ... and detach from session:
    Session.expunge(coll)

    return coll
开发者ID:ausbin,项目名称:mediagoblin,代码行数:25,代码来源:tools.py


示例7: fixture_add_user

def fixture_add_user(username=u'chris', password=u'toast',
                     privileges=[], wants_comment_notification=True):
    # Reuse existing user or create a new one
    test_user = LocalUser.query.filter(LocalUser.username==username).first()
    if test_user is None:
        test_user = LocalUser()
    test_user.username = username
    test_user.email = username + u'@example.com'
    if password is not None:
        test_user.pw_hash = gen_password_hash(password)
    test_user.wants_comment_notification = wants_comment_notification
    for privilege in privileges:
        query = Privilege.query.filter(Privilege.privilege_name==privilege)
        if query.count():
            test_user.all_privileges.append(query.one())

    test_user.save()

    # Reload - The `with_polymorphic` needs to be there to eagerly load
    # the attributes on the LocalUser as this can't be done post detachment.
    user_query = LocalUser.query.with_polymorphic(LocalUser)
    test_user = user_query.filter(LocalUser.username==username).first()

    # ... and detach from session:
    Session.expunge(test_user)

    return test_user
开发者ID:ausbin,项目名称:mediagoblin,代码行数:27,代码来源:tools.py


示例8: _test_authentication

    def _test_authentication():
        template.clear_test_template_context()
        res = ldap_plugin_app.post(
            '/auth/ldap/login/',
            {'username': u'chris',
             'password': u'toast'})

        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
        register_form = context['register_form']

        assert register_form.username.data == u'chris'
        assert register_form.email.data == u'[email protected]'

        template.clear_test_template_context()
        res = ldap_plugin_app.post(
            '/auth/ldap/register/',
            {'username': u'chris',
             'email': u'[email protected]'})
        res.follow()

        assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
        assert 'mediagoblin/user_pages/user.html' in template.TEMPLATE_TEST_CONTEXT

        # Try to register with same email and username
        template.clear_test_template_context()
        res = ldap_plugin_app.post(
            '/auth/ldap/register/',
            {'username': u'chris',
             'email': u'[email protected]'})

        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
        register_form = context['register_form']

        assert register_form.email.errors == [u'Sorry, a user with that email address already exists.']
        assert register_form.username.errors == [u'Sorry, a user with that name already exists.']

        # Log out
        ldap_plugin_app.get('/auth/logout/')

        # Get user and detach from session
        test_user = mg_globals.database.User.query.filter_by(
            username=u'chris').first()
        Session.expunge(test_user)

        # Log back in
        template.clear_test_template_context()
        res = ldap_plugin_app.post(
            '/auth/ldap/login/',
            {'username': u'chris',
             'password': u'toast'})
        res.follow()

        assert urlparse.urlsplit(res.location)[2] == '/'
        assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT

        # Make sure user is in the session
        context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
        session = context['request'].session
        assert session['user_id'] == unicode(test_user.id)
开发者ID:commonsmachinery,项目名称:mediagoblin,代码行数:59,代码来源:test_ldap.py


示例9: user_upload_limits

    def user_upload_limits(self, uploaded=None, upload_limit=None):
        our_user = self.our_user()

        if uploaded:
            our_user.uploaded = uploaded
        if upload_limit:
            our_user.upload_limit = upload_limit

        our_user.save()
        Session.expunge(our_user)
开发者ID:piratas,项目名称:biblioteca,代码行数:10,代码来源:test_submission.py


示例10: clean_orphan_tags

def clean_orphan_tags(commit=True):
    """Search for unused MediaTags and delete them"""
    q1 = Session.query(Tag).outerjoin(MediaTag).filter(MediaTag.id==None)
    for t in q1:
        Session.delete(t)
    # The "let the db do all the work" version:
    # q1 = Session.query(Tag.id).outerjoin(MediaTag).filter(MediaTag.id==None)
    # q2 = Session.query(Tag).filter(Tag.id.in_(q1))
    # q2.delete(synchronize_session = False)
    if commit:
        Session.commit()
开发者ID:praveen97uma,项目名称:goblin,代码行数:11,代码来源:util.py


示例11: fixture_add_comment_notification

def fixture_add_comment_notification(entry_id, subject_id, user_id,
                                     seen=False):
    cn = CommentNotification(user_id=user_id,
                             seen=seen,
                             subject_id=subject_id)
    cn.save()

    cn = CommentNotification.query.filter_by(id=cn.id).first()

    Session.expunge(cn)

    return cn
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:12,代码来源:tools.py


示例12: get_app

def get_app(request, paste_config=None, mgoblin_config=None):
    """Create a MediaGoblin app for testing.

    Args:
     - request: Not an http request, but a pytest fixture request.  We
       use this to make temporary directories that pytest
       automatically cleans up as needed.
     - paste_config: particular paste config used by this application.
     - mgoblin_config: particular mediagoblin config used by this
       application.
    """
    paste_config = paste_config or TEST_SERVER_CONFIG
    mgoblin_config = mgoblin_config or TEST_APP_CONFIG

    # This is the directory we're copying the paste/mgoblin config stuff into
    run_dir = request.config._tmpdirhandler.mktemp(
        'mgoblin_app', numbered=True)
    user_dev_dir = run_dir.mkdir('user_dev').strpath

    new_paste_config = run_dir.join('paste.ini').strpath
    new_mgoblin_config = run_dir.join('mediagoblin.ini').strpath
    shutil.copyfile(paste_config, new_paste_config)
    shutil.copyfile(mgoblin_config, new_mgoblin_config)

    Session.rollback()
    Session.remove()

    # install user_dev directories
    for directory in USER_DEV_DIRECTORIES_TO_SETUP:
        full_dir = os.path.join(user_dev_dir, directory)
        os.makedirs(full_dir)

    # Get app config
    global_config, validation_result = read_mediagoblin_config(new_mgoblin_config)
    app_config = global_config['mediagoblin']

    # Run database setup/migrations
    # @@: The *only* test that doesn't pass if we remove this is in
    #   test_persona.py... why?
    run_dbupdate(app_config, global_config)

    # setup app and return
    test_app = loadapp(
        'config:' + new_paste_config)

    # Insert the TestingMeddleware, which can do some
    # sanity checks on every request/response.
    # Doing it this way is probably not the cleanest way.
    # We'll fix it, when we have plugins!
    mg_globals.app.meddleware.insert(0, TestingMeddleware(mg_globals.app))

    app = TestApp(test_app)
    return app
开发者ID:ausbin,项目名称:mediagoblin,代码行数:53,代码来源:tools.py


示例13: test_user_deletes_other_comments

def test_user_deletes_other_comments(test_app):
    user_a = fixture_add_user(u"chris_a")
    user_b = fixture_add_user(u"chris_b")

    media_a = fixture_media_entry(uploader=user_a.id, save=False,
                                  expunge=False, fake_upload=False)
    media_b = fixture_media_entry(uploader=user_b.id, save=False,
                                  expunge=False, fake_upload=False)
    Session.add(media_a)
    Session.add(media_b)
    Session.flush()

    # Create all 4 possible comments:
    for u_id in (user_a.id, user_b.id):
        for m_id in (media_a.id, media_b.id):
            cmt = MediaComment()
            cmt.media_entry = m_id
            cmt.author = u_id
            cmt.content = u"Some Comment"
            Session.add(cmt)

    Session.flush()

    usr_cnt1 = User.query.count()
    med_cnt1 = MediaEntry.query.count()
    cmt_cnt1 = MediaComment.query.count()

    User.query.get(user_a.id).delete(commit=False)

    usr_cnt2 = User.query.count()
    med_cnt2 = MediaEntry.query.count()
    cmt_cnt2 = MediaComment.query.count()

    # One user deleted
    assert usr_cnt2 == usr_cnt1 - 1
    # One media gone
    assert med_cnt2 == med_cnt1 - 1
    # Three of four comments gone.
    assert cmt_cnt2 == cmt_cnt1 - 3

    User.query.get(user_b.id).delete()

    usr_cnt2 = User.query.count()
    med_cnt2 = MediaEntry.query.count()
    cmt_cnt2 = MediaComment.query.count()

    # All users gone
    assert usr_cnt2 == usr_cnt1 - 2
    # All media gone
    assert med_cnt2 == med_cnt1 - 2
    # All comments gone
    assert cmt_cnt2 == cmt_cnt1 - 4
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:52,代码来源:test_misc.py


示例14: test_media_data_init

def test_media_data_init(test_app):
    Session.rollback()
    Session.remove()
    media = MediaEntry()
    media.media_type = u"mediagoblin.media_types.image"
    assert media.media_data is None
    media.media_data_init()
    assert media.media_data is not None
    obj_in_session = 0
    for obj in Session():
        obj_in_session += 1
        print(repr(obj))
    assert obj_in_session == 0
开发者ID:goblinrefuge,项目名称:goblinrefuge-mediagoblin,代码行数:13,代码来源:test_modelmethods.py


示例15: test_media_deletes_broken_attachment

def test_media_deletes_broken_attachment(test_app):
    user_a = fixture_add_user(u"chris_a")

    media = fixture_media_entry(uploader=user_a.id, save=False)
    media.attachment_files.append(dict(
            name=u"some name",
            filepath=[u"does", u"not", u"exist"],
            ))
    Session.add(media)
    Session.flush()

    MediaEntry.query.get(media.id).delete()
    User.query.get(user_a.id).delete()
开发者ID:praveen97uma,项目名称:goblin,代码行数:13,代码来源:test_misc.py


示例16: _test_new_user

        def _test_new_user():
            openid_plugin_app.post(
                '/auth/openid/login/', {
                    'openid': u'http://real.myopenid.com'})

            # Right place?
            assert 'mediagoblin/auth/register.html' in template.TEMPLATE_TEST_CONTEXT
            context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/auth/register.html']
            register_form = context['register_form']

            # Register User
            res = openid_plugin_app.post(
                '/auth/openid/register/', {
                    'openid': register_form.openid.data,
                    'username': u'chris',
                    'email': u'[email protected]'})
            res.follow()

            # Correct place?
            assert urlparse.urlsplit(res.location)[2] == '/u/chris/'
            assert 'mediagoblin/user_pages/user_nonactive.html' in template.TEMPLATE_TEST_CONTEXT

            # No need to test if user is in logged in and verification email
            # awaits, since openid uses the register_user function which is
            # tested in test_auth

            # Logout User
            openid_plugin_app.get('/auth/logout')

            # Get user and detach from session
            test_user = mg_globals.database.LocalUser.query.filter(
                LocalUser.username==u'chris'
            ).first()
            Session.expunge(test_user)

            # Log back in
            # Could not get it to work by 'POST'ing to /auth/openid/login/
            template.clear_test_template_context()
            res = openid_plugin_app.post(
                '/auth/openid/login/finish/', {
                    'openid': u'http://real.myopenid.com'})
            res.follow()

            assert urlparse.urlsplit(res.location)[2] == '/'
            assert 'mediagoblin/root.html' in template.TEMPLATE_TEST_CONTEXT

            # Make sure user is in the session
            context = template.TEMPLATE_TEST_CONTEXT['mediagoblin/root.html']
            session = context['request'].session
            assert session['user_id'] == six.text_type(test_user.id)
开发者ID:ausbin,项目名称:mediagoblin,代码行数:50,代码来源:test_openid.py


示例17: user_upload_limits

    def user_upload_limits(self, uploaded=None, upload_limit=None):
        if uploaded:
            self.test_user.uploaded = uploaded
        if upload_limit:
            self.test_user.upload_limit = upload_limit

        self.test_user.save()

        # Reload
        self.test_user = User.query.filter_by(
            username=self.test_user.username
        ).first()

        # ... and detach from session:
        Session.expunge(self.test_user)
开发者ID:commonsmachinery,项目名称:mediagoblin,代码行数:15,代码来源:test_submission.py


示例18: check_db_up_to_date

def check_db_up_to_date():
    """Check if the database is up to date and quit if not"""
    dbdatas = gather_database_data(mgg.global_config.get('plugins', {}).keys())

    for dbdata in dbdatas:
        session = Session()
        try:
            migration_manager = dbdata.make_migration_manager(session)
            if migration_manager.database_current_migration is None or \
                    migration_manager.migrations_to_run():
                sys.exit("Your database is not up to date. Please run "
                         "'gmg dbupdate' before starting MediaGoblin.")
        finally:
            Session.rollback()
            Session.remove()
开发者ID:ausbin,项目名称:mediagoblin,代码行数:15,代码来源:util.py


示例19: take_punitive_actions

def take_punitive_actions(request, form, report, user):
    message_body = ""

    # The bulk of this action is running through all of the different
    # punitive actions that a moderator could take.
    if u"takeaway" in form.action_to_resolve.data:
        for privilege_name in form.take_away_privileges.data:
            take_away_privileges(user.username, privilege_name)
            form.resolution_content.data += _(u"\n{mod} took away {user}'s {privilege} privileges.").format(
                mod=request.user.username, user=user.username, privilege=privilege_name
            )

    # If the moderator elects to ban the user, a new instance of user_ban
    # will be created.
    if u"userban" in form.action_to_resolve.data:
        user_ban = ban_user(
            form.targeted_user.data, expiration_date=form.user_banned_until.data, reason=form.why_user_was_banned.data
        )
        Session.add(user_ban)
        form.resolution_content.data += _(u"\n{mod} banned user {user} {expiration_date}.").format(
            mod=request.user.username,
            user=user.username,
            expiration_date=(
                _("until {date}").format(date=form.user_banned_until.data)
                if form.user_banned_until.data
                else _("indefinitely")
            ),
        )

    # If the moderator elects to send a warning message. An email will be
    # sent to the email address given at sign up
    if u"sendmessage" in form.action_to_resolve.data:
        message_body = form.message_to_user.data
        form.resolution_content.data += _(u"\n{mod} sent a warning email to the {user}.").format(
            mod=request.user.username, user=user.username
        )

    if u"delete" in form.action_to_resolve.data and report.is_comment_report():
        deleted_comment = report.obj()
        Session.delete(deleted_comment)
        form.resolution_content.data += _(u"\n{mod} deleted the comment.").format(mod=request.user.username)
    elif u"delete" in form.action_to_resolve.data and report.is_media_entry_report():
        deleted_media = report.obj()
        deleted_media.delete()
        form.resolution_content.data += _(u"\n{mod} deleted the media entry.").format(mod=request.user.username)
    report.archive(resolver_id=request.user.id, resolved=datetime.now(), result=form.resolution_content.data)

    Session.add(report)
    Session.commit()
    if message_body:
        send_email(
            mg_globals.app_config["email_sender_address"],
            [user.email],
            _("Warning from") + "- {moderator} ".format(moderator=request.user.username),
            message_body,
        )

    return redirect(request, "mediagoblin.moderation.users_detail", user=user.username)
开发者ID:tofay,项目名称:mediagoblin,代码行数:58,代码来源:tools.py


示例20: check_media_slug_used

def check_media_slug_used(dummy_db, uploader_id, slug, ignore_m_id):
    filt = (MediaEntry.uploader == uploader_id) \
        & (MediaEntry.slug == slug)
    if ignore_m_id is not None:
        filt = filt & (MediaEntry.id != ignore_m_id)
    does_exist = Session.query(MediaEntry.id).filter(filt).first() is not None
    return does_exist
开发者ID:orblivion,项目名称:mediagoblin-quickstart-openshift,代码行数:7,代码来源:util.py



注:本文中的mediagoblin.db.base.Session类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python migration_tools.inspect_table函数代码示例发布时间:2022-05-27
下一篇:
Python utils.find_file函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap