• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python core.get_session函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pybossa.core.get_session函数的典型用法代码示例。如果您正苦于以下问题:Python get_session函数的具体用法?Python get_session怎么用?Python get_session使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: n_available_tasks

def n_available_tasks(app_id, user_id=None, user_ip=None):
    """Returns the number of tasks for a given app a user can contribute to,
    based on the completion of the app tasks, and previous task_runs submitted
    by the user"""
    try:
        if user_id and not user_ip:
            query = text('''SELECT COUNT(id) AS n_tasks FROM task WHERE NOT EXISTS
                           (SELECT task_id FROM task_run WHERE
                           app_id=:app_id AND user_id=:user_id AND task_id=task.id)
                           AND app_id=:app_id AND state !='completed';''')
            session = get_session(db, bind='slave')
            result = session.execute(query, dict(app_id=app_id, user_id=user_id))
        else:
            if not user_ip:
                user_ip = '127.0.0.1'
            query = text('''SELECT COUNT(id) AS n_tasks FROM task WHERE NOT EXISTS
                           (SELECT task_id FROM task_run WHERE
                           app_id=:app_id AND user_ip=:user_ip AND task_id=task.id)
                           AND app_id=:app_id AND state !='completed';''')

            session = get_session(db, bind='slave')
            result = session.execute(query, dict(app_id=app_id, user_ip=user_ip))
        n_tasks = 0
        for row in result:
            n_tasks = row.n_tasks
        return n_tasks
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:31,代码来源:helpers.py


示例2: overall_progress

def overall_progress(app_id):
    """Returns the percentage of submitted Tasks Runs done when a task is
    completed"""
    try:
        sql = text('''SELECT task.id, n_answers,
                   COUNT(task_run.task_id) AS n_task_runs
                   FROM task LEFT OUTER JOIN task_run ON task.id=task_run.task_id
                   WHERE task.app_id=:app_id GROUP BY task.id''')
        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(app_id=app_id))
        n_expected_task_runs = 0
        n_task_runs = 0
        for row in results:
            tmp = row[2]
            if row[2] > row[1]:
                tmp = row[1]
            n_expected_task_runs += row[1]
            n_task_runs += tmp
        pct = float(0)
        if n_expected_task_runs != 0:
            pct = float(n_task_runs) / float(n_expected_task_runs)
        return (pct * 100)
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:27,代码来源:apps.py


示例3: hidden_apps

def hidden_apps(user_id):
    try:
        sql = text('''
                   SELECT app.id, app.name, app.short_name, app.description,
                   app.owner_id,
                   app.info
                   FROM app, task
                   WHERE app.id=task.app_id AND app.owner_id=:user_id AND
                   app.hidden=1 AND app.info LIKE('%task_presenter%')
                   GROUP BY app.id, app.name, app.short_name,
                   app.description,
                   app.info;''')
        apps_published = []
        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(user_id=user_id))
        for row in results:
            app = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=json.loads(row.info))
            apps_published.append(app)
        return apps_published
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:30,代码来源:users.py


示例4: apps_contributed

def apps_contributed(user_id):
    try:
        sql = text('''
                   WITH apps_contributed as 
                        (SELECT DISTINCT(app_id) FROM task_run 
                         WHERE user_id=:user_id)
                   SELECT app.id, app.name, app.short_name, app.owner_id,
                   app.description, app.info FROM app, apps_contributed
                   WHERE app.id=apps_contributed.app_id ORDER BY app.name DESC;
                   ''')
        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(user_id=user_id))
        apps_contributed = []
        for row in results:
            app = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       overall_progress=overall_progress(row.id),
                       n_tasks=n_tasks(row.id),
                       n_volunteers=n_volunteers(row.id),
                       info=json.loads(row.info))
            apps_contributed.append(app)
        return apps_contributed
    except: # pragma: no cover
        session.rollback()
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:27,代码来源:users.py


示例5: get_featured

def get_featured(category, page=1, per_page=5):
    """Return a list of featured apps with a pagination"""
    try:
        count = n_featured()

        sql = text('''SELECT app.id, app.name, app.short_name, app.info, app.created,
                   app.description,
                   "user".fullname AS owner FROM app, featured, "user"
                   WHERE app.id=featured.app_id AND app.hidden=0
                   AND "user".id=app.owner_id GROUP BY app.id, "user".id
                   OFFSET(:offset) LIMIT(:limit);
                   ''')
        offset = (page - 1) * per_page

        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(limit=per_page, offset=offset))
        apps = []
        for row in results:
            app = dict(id=row.id, name=row.name, short_name=row.short_name,
                       created=row.created, description=row.description,
                       overall_progress=overall_progress(row.id),
                       last_activity=pretty_date(last_activity(row.id)),
                       last_activity_raw=last_activity(row.id),
                       owner=row.owner,
                       featured=row.id,
                       info=dict(json.loads(row.info)))
            apps.append(app)
        return apps, count
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:33,代码来源:apps.py


示例6: get_draft

def get_draft(category, page=1, per_page=5):
    """Return list of draft projects"""
    try:
        count = n_draft()

        sql = text('''SELECT app.id, app.name, app.short_name, app.created,
                   app.description, app.info, "user".fullname as owner
                   FROM "user", app LEFT JOIN task ON app.id=task.app_id
                   WHERE task.app_id IS NULL AND app.info NOT LIKE('%task_presenter%')
                   AND app.hidden=0
                   AND app.owner_id="user".id
                   OFFSET :offset
                   LIMIT :limit;''')

        offset = (page - 1) * per_page
        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(limit=per_page, offset=offset))
        apps = []
        for row in results:
            app = dict(id=row.id, name=row.name, short_name=row.short_name,
                       created=row.created,
                       description=row.description,
                       owner=row.owner,
                       last_activity=pretty_date(last_activity(row.id)),
                       last_activity_raw=last_activity(row.id),
                       overall_progress=overall_progress(row.id),
                       info=dict(json.loads(row.info)))
            apps.append(app)
        return apps, count
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:34,代码来源:apps.py


示例7: n_count

def n_count(category):
    """Count the number of apps in a given category"""
    try:
        sql = text('''
                   WITH uniq AS (
                   SELECT COUNT(app.id) FROM task, app
                   LEFT OUTER JOIN category ON app.category_id=category.id
                   WHERE
                   category.short_name=:category
                   AND app.hidden=0
                   AND app.info LIKE('%task_presenter%')
                   AND task.app_id=app.id
                   GROUP BY app.id)
                   SELECT COUNT(*) FROM uniq
                   ''')

        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(category=category))
        count = 0
        for row in results:
            count = row[0]
        return count
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:27,代码来源:apps.py


示例8: create

def create(taskrun=None):
    authorized = False
    try:
        session = get_session(db, bind='slave')
        if taskrun.user_ip:
            sql = text('''SELECT COUNT(task_run.id) AS n_task_runs FROM task_run
                          WHERE task_run.app_id=:app_id AND
                          task_run.task_id=:task_id AND
                          task_run.user_ip=:user_ip;''')
            results = session.execute(sql, dict(app_id=taskrun.app_id,
                                                task_id=taskrun.task_id,
                                                user_ip=taskrun.user_ip))
        elif taskrun.user_id:
            sql = text('''SELECT COUNT(task_run.id) AS n_task_runs FROM task_run
                          WHERE task_run.app_id=:app_id AND
                          task_run.task_id=:task_id AND
                          task_run.user_id=:user_id;''')
            results = session.execute(sql, dict(app_id=taskrun.app_id,
                                                task_id=taskrun.task_id,
                                                user_id=taskrun.user_id))
        else:
            return False
        n_task_runs = 0
        for row in results:
            n_task_runs = row.n_task_runs
        authorized = (n_task_runs <= 0)
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
    if not authorized:
        raise abort(403)
    return authorized
开发者ID:geoapi,项目名称:pybossa,代码行数:34,代码来源:taskrun.py


示例9: get_incremental_task

def get_incremental_task(app_id, user_id=None, user_ip=None, n_answers=30, offset=0):
    """
    Get a new task for a given project with its last given answer.
    It is an important strategy when dealing with large tasks, as
    transcriptions.
    """
    try:
        session = get_session(db, bind='slave')
        candidate_tasks = get_candidate_tasks(app_id, user_id, user_ip,
                                              n_answers, offset=0)
        total_remaining = len(candidate_tasks)
        if total_remaining == 0:
            return None
        rand = random.randrange(0, total_remaining)
        task = candidate_tasks[rand]
        #Find last answer for the task
        q = session.query(TaskRun)\
              .filter(TaskRun.task_id == task.id)\
              .order_by(TaskRun.finish_time.desc())
        last_task_run = q.first()
        if last_task_run:
            task.info['last_answer'] = last_task_run.info
            #TODO: As discussed in GitHub #53
            # it is necessary to create a lock in the task!
        return task
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:30,代码来源:sched.py


示例10: get_candidate_tasks

def get_candidate_tasks(app_id, user_id=None, user_ip=None, n_answers=30, offset=0):
    """Gets all available tasks for a given project and user"""
    try:
        session = get_session(db, bind='slave')
        rows = None
        if user_id and not user_ip:
            query = text('''
                         SELECT id FROM task WHERE NOT EXISTS
                         (SELECT task_id FROM task_run WHERE
                         app_id=:app_id AND user_id=:user_id AND task_id=task.id)
                         AND app_id=:app_id AND state !='completed'
                         ORDER BY priority_0 DESC, id ASC LIMIT 10''')
            rows = session.execute(query, dict(app_id=app_id, user_id=user_id))
        else:
            if not user_ip:
                user_ip = '127.0.0.1'
            query = text('''
                         SELECT id FROM task WHERE NOT EXISTS
                         (SELECT task_id FROM task_run WHERE
                         app_id=:app_id AND user_ip=:user_ip AND task_id=task.id)
                         AND app_id=:app_id AND state !='completed'
                         ORDER BY priority_0 DESC, id ASC LIMIT 10''')
            rows = session.execute(query, dict(app_id=app_id, user_ip=user_ip))

        tasks = []
        for t in rows:
            tasks.append(session.query(Task).get(t.id))
        return tasks
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:33,代码来源:sched.py


示例11: new_task

def new_task(app_id, user_id=None, user_ip=None, offset=0):
    '''Get a new task by calling the appropriate scheduler function.
    '''
    try:
        session = get_session(db, bind='slave')
        app = session.query(App).get(app_id)
        if not app.allow_anonymous_contributors and user_id is None:
            info = dict(
                error="This project does not allow anonymous contributors")
            error = Task(info=info)
            return error
        else:
            sched_map = {
                'default': get_depth_first_task,
                'breadth_first': get_breadth_first_task,
                'depth_first': get_depth_first_task,
                'random': get_random_task,
                'incremental': get_incremental_task,
			'filter_by_users': get_filtered_by_user_task}
            sched = sched_map.get(app.info.get('sched'), sched_map['default'])
            return sched(app_id, user_id, user_ip, offset=offset)
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:26,代码来源:sched.py


示例12: get_locs

def get_locs(): # pragma: no cover
    try:
        session = get_session(db, bind='slave')
        # All IP addresses from anonymous users to create a map
        locs = []
        if current_app.config['GEO']:
            sql = '''SELECT DISTINCT(user_ip) from task_run WHERE user_ip IS NOT NULL;'''
            results = session.execute(sql)

            geolite = current_app.root_path + '/../dat/GeoLiteCity.dat'
            gic = pygeoip.GeoIP(geolite)
            for row in results:
                loc = gic.record_by_addr(row.user_ip)
                if loc is None:
                    loc = {}
                if (len(loc.keys()) == 0):
                    loc['latitude'] = 0
                    loc['longitude'] = 0
                locs.append(dict(loc=loc))
        return locs
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:25,代码来源:stats.py


示例13: get_top5_users_24_hours

def get_top5_users_24_hours():
    try:
        session = get_session(db, bind='slave')
        # Top 5 Most active users in last 24 hours
        sql = text('''SELECT "user".id, "user".fullname, "user".name,
                   COUNT(task_run.app_id) AS n_answers FROM "user", task_run
                   WHERE "user".id=task_run.user_id
                   AND DATE(task_run.finish_time) > NOW() - INTERVAL '24 hour'
                   AND DATE(task_run.finish_time) <= NOW()
                   GROUP BY "user".id
                   ORDER BY n_answers DESC LIMIT 5;''')

        results = session.execute(sql, dict(limit=5))
        top5_users_24_hours = []
        for row in results:
            user = dict(id=row.id, fullname=row.fullname,
                        name=row.name,
                        n_answers=row.n_answers)
            top5_users_24_hours.append(user)
        return top5_users_24_hours
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:25,代码来源:stats.py


示例14: get_top5_apps_24_hours

def get_top5_apps_24_hours():
    try:
        session = get_session(db, bind='slave')
        # Top 5 Most active apps in last 24 hours
        sql = text('''SELECT app.id, app.name, app.short_name, app.info,
                   COUNT(task_run.app_id) AS n_answers FROM app, task_run
                   WHERE app.id=task_run.app_id
                   AND app.hidden=0
                   AND DATE(task_run.finish_time) > NOW() - INTERVAL '24 hour'
                   AND DATE(task_run.finish_time) <= NOW()
                   GROUP BY app.id
                   ORDER BY n_answers DESC LIMIT 5;''')

        results = session.execute(sql, dict(limit=5))
        top5_apps_24_hours = []
        for row in results:
            tmp = dict(id=row.id, name=row.name, short_name=row.short_name,
                       info=dict(json.loads(row.info)), n_answers=row.n_answers)
            top5_apps_24_hours.append(tmp)
        return top5_apps_24_hours
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:25,代码来源:stats.py


示例15: draft_apps

def draft_apps(user_id):
    try:
        sql = text('''
                   SELECT app.id, app.name, app.short_name, app.description,
                   owner_id,
                   app.info
                   FROM app
                   WHERE app.owner_id=:user_id
                   AND app.info NOT LIKE('%task_presenter%')
                   GROUP BY app.id, app.name, app.short_name,
                   app.description,
                   app.info;''')
        apps_draft = []
        session = get_session(db, bind='slave')
        results = session.execute(sql, dict(user_id=user_id))
        for row in results:
            app = dict(id=row.id, name=row.name, short_name=row.short_name,
                       owner_id=row.owner_id,
                       description=row.description,
                       info=json.loads(row.info))
            apps_draft.append(app)
        return apps_draft
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:27,代码来源:users.py


示例16: applications

def applications(name):
    """
    List user's project list.

    Returns a Jinja2 template with the list of projects of the user.

    """
    try:
        session = get_session(db, bind='slave')
        user = session.query(User).filter_by(name=name).first()
        if not user:
            return abort(404)
        if current_user.name != name:
            return abort(403)

        user = db.session.query(model.user.User).get(current_user.id)
        apps_published, apps_draft = _get_user_apps(user.id)
        apps_published.extend(cached_users.hidden_apps(user.id))

        return render_template('account/applications.html',
                               title=gettext("Projects"),
                               apps_published=apps_published,
                               apps_draft=apps_draft)
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:28,代码来源:account.py


示例17: stats_users

def stats_users(app_id):
    """Return users's stats for a given app_id"""
    try:
        session = get_session(db, bind='slave')

        users = {}
        auth_users = []
        anon_users = []

        # Get Authenticated Users
        sql = text('''SELECT task_run.user_id AS user_id,
                   COUNT(task_run.id) as n_tasks FROM task_run
                   WHERE task_run.user_id IS NOT NULL AND
                   task_run.user_ip IS NULL AND
                   task_run.app_id=:app_id
                   GROUP BY task_run.user_id ORDER BY n_tasks DESC
                   LIMIT 5;''')
        results = session.execute(sql, dict(app_id=app_id))

        for row in results:
            auth_users.append([row.user_id, row.n_tasks])

        sql = text('''SELECT count(distinct(task_run.user_id)) AS user_id FROM task_run
                   WHERE task_run.user_id IS NOT NULL AND
                   task_run.user_ip IS NULL AND
                   task_run.app_id=:app_id;''')

        results = session.execute(sql, dict(app_id=app_id))
        for row in results:
            users['n_auth'] = row[0]

        # Get all Anonymous Users
        sql = text('''SELECT task_run.user_ip AS user_ip,
                   COUNT(task_run.id) as n_tasks FROM task_run
                   WHERE task_run.user_ip IS NOT NULL AND
                   task_run.user_id IS NULL AND
                   task_run.app_id=:app_id
                   GROUP BY task_run.user_ip ORDER BY n_tasks DESC;''').execution_options(stream=True)
        results = session.execute(sql, dict(app_id=app_id))

        for row in results:
            anon_users.append([row.user_ip, row.n_tasks])

        sql = text('''SELECT COUNT(DISTINCT(task_run.user_ip)) AS user_ip FROM task_run
                   WHERE task_run.user_ip IS NOT NULL AND
                   task_run.user_id IS NULL AND
                   task_run.app_id=:app_id;''')

        results = session.execute(sql, dict(app_id=app_id))

        for row in results:
            users['n_anon'] = row[0]

        return users, anon_users, auth_users
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:geoapi,项目名称:pybossa,代码行数:59,代码来源:stats.py


示例18: stats_dates

def stats_dates(app_id):
    try:
        dates = {}
        dates_anon = {}
        dates_auth = {}
        dates_n_tasks = {}

        session = get_session(db, bind='slave')

        avg, total_n_tasks = get_avg_n_tasks(app_id)

        # Get all answers per date
        sql = text('''
                    WITH myquery AS (
                        SELECT TO_DATE(finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') as d,
                                       COUNT(id)
                        FROM task_run WHERE app_id=:app_id GROUP BY d)
                   SELECT to_char(d, 'YYYY-MM-DD') as d, count from myquery;
                   ''').execution_options(stream=True)

        results = session.execute(sql, dict(app_id=app_id))
        for row in results:
            dates[row.d] = row.count
            dates_n_tasks[row.d] = total_n_tasks * avg


        # Get all answers per date for auth
        sql = text('''
                    WITH myquery AS (
                        SELECT TO_DATE(finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') as d,
                                       COUNT(id)
                        FROM task_run WHERE app_id=:app_id AND user_ip IS NULL GROUP BY d)
                   SELECT to_char(d, 'YYYY-MM-DD') as d, count from myquery;
                   ''').execution_options(stream=True)

        results = session.execute(sql, dict(app_id=app_id))
        for row in results:
            dates_auth[row.d] = row.count

        # Get all answers per date for anon
        sql = text('''
                    WITH myquery AS (
                        SELECT TO_DATE(finish_time, 'YYYY-MM-DD\THH24:MI:SS.US') as d,
                                       COUNT(id)
                        FROM task_run WHERE app_id=:app_id AND user_id IS NULL GROUP BY d)
                   SELECT to_char(d, 'YYYY-MM-DD') as d, count  from myquery;
                   ''').execution_options(stream=True)

        results = session.execute(sql, dict(app_id=app_id))
        for row in results:
            dates_anon[row.d] = row.count

        return dates, dates_n_tasks, dates_anon, dates_auth
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:58,代码来源:stats.py


示例19: get_breadth_first_task

def get_breadth_first_task(app_id, user_id=None, user_ip=None, n_answers=30, offset=0):
    """Gets a new task which have the least number of task runs (excluding the
    current user).

    Note that it **ignores** the number of answers limit for efficiency reasons
    (this is not a big issue as all it means is that you may end up with some
    tasks run more than is strictly needed!)
    """
    try:
        # Uncomment the next three lines to profile the sched function
        #import timeit
        #T = timeit.Timer(lambda: get_candidate_tasks(app_id, user_id,
        #                  user_ip, n_answers))
        #print "First algorithm: %s" % T.timeit(number=1)
        session = get_session(db, bind='slave')
        if user_id and not user_ip:
            sql = text('''
                       SELECT task.id, COUNT(task_run.task_id) AS taskcount FROM task
                       LEFT JOIN task_run ON (task.id = task_run.task_id) WHERE NOT EXISTS
                       (SELECT 1 FROM task_run WHERE app_id=:app_id AND
                       user_id=:user_id AND task_id=task.id)
                       AND task.app_id=:app_id AND task.state !='completed'
                       group by task.id ORDER BY taskcount, id ASC LIMIT 10;
                       ''')
            tasks = session.execute(sql, dict(app_id=app_id, user_id=user_id))
        else:
            if not user_ip: # pragma: no cover
                user_ip = '127.0.0.1'
            sql = text('''
                       SELECT task.id, COUNT(task_run.task_id) AS taskcount FROM task
                       LEFT JOIN task_run ON (task.id = task_run.task_id) WHERE NOT EXISTS
                       (SELECT 1 FROM task_run WHERE app_id=:app_id AND
                       user_ip=:user_ip AND task_id=task.id)
                       AND task.app_id=:app_id AND task.state !='completed'
                       group by task.id ORDER BY taskcount, id ASC LIMIT 10;
                       ''')

            # results will be list of (taskid, count)
            tasks = session.execute(sql, dict(app_id=app_id, user_ip=user_ip))
        # ignore n_answers for the present - we will just keep going once we've
        # done as many as we need
        tasks = [x[0] for x in tasks]
        if tasks:
            if (offset == 0):
                return session.query(Task).get(tasks[0])
            else:
                if (offset < len(tasks)):
                    return session.query(Task).get(tasks[offset])
                else:
                    return None
        else: # pragma: no cover
            return None
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:57,代码来源:sched.py


示例20: get_app

def get_app(short_name):
    try:
        session = get_session(db, bind='slave')
        app = session.query(App).filter_by(short_name=short_name).first()
        return app
    except: # pragma: no cover
        session.rollback()
        raise
    finally:
        session.close()
开发者ID:bcfuchs,项目名称:pybossa,代码行数:10,代码来源:apps.py



注:本文中的pybossa.core.get_session函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python project_repo.get函数代码示例发布时间:2022-05-25
下一篇:
Python users.get_user_summary函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap