• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python project_repo.get函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pybossa.core.project_repo.get函数的典型用法代码示例。如果您正苦于以下问题：Python get函数的具体用法？Python get怎么用？Python get使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_warn_project_excludes_completed_projects

    def test_warn_project_excludes_completed_projects(self, clean_mock):
        """Test JOB email excludes completed projects.

        Two projects are created for the same owner with an old ``updated``
        date: the first one gets a single 'completed' task, the second one a
        single 'ongoing' task.  After ``warn_old_project_owners`` runs,
        exactly one message must have been recorded and it must be about
        the second project.
        """
        from pybossa.core import mail
        with mail.record_messages() as outbox:
            date = '2010-10-22T11:02:00.000000'

            owner = UserFactory.create(consent=True, subscribed=True)
            # 'ongoing' must stay last: the assertions below inspect whatever
            # ``project`` / ``project_id`` hold once the loop has finished.
            for state in ('completed', 'ongoing'):
                project = ProjectFactory.create(updated=date, contacted=False,
                                                owner=owner)
                TaskFactory.create(created=date, project=project, state=state)
                project_id = project.id
                # Reload through the repository and write the old date again
                project = project_repo.get(project_id)
                project.updated = date
                project_repo.update(project)

            warn_old_project_owners()
            assert len(outbox) == 1, outbox
            subject = 'Your PYBOSSA project: %s has been inactive' % project.name
            assert outbox[0].subject == subject
            err_msg = "project.contacted field should be True"
            assert project.contacted, err_msg
            err_msg = "project.published field should be False"
            assert project.published is False, err_msg
            # assert_called_with raises its own AssertionError; the former
            # ``..., err_msg`` suffix only built a tuple that was thrown away.
            clean_mock.assert_called_with(project_id)
            err_msg = "The update date should be different"
            assert project.updated != date, err_msg
开发者ID:PyBossa,项目名称:pybossa,代码行数:35,代码来源:test_old_projects.py


示例2: _retrieve_new_task

def _retrieve_new_task(project_id):
    """Ask the scheduler for a new task of the given project.

    The scheduler receives the project's ``sched`` setting, the current
    user's id (None for anonymous users), an IP address and the ``offset``
    query argument (0 when absent or empty).

    The IP address is chosen as follows:
      * if the ``remote_mobile_addr`` header is present, its value for
        anonymous users and None for authenticated ones;
      * otherwise the first ``X-Forwarded-For`` header value when there is
        one (for any user);
      * otherwise ``request.remote_addr`` for anonymous users, else None.
    When the chosen value contains commas only the last item is kept.

    Returns the task produced by ``sched.new_task``, or a ``Task`` carrying
    an error message in ``info`` when the project does not allow anonymous
    contributors and the user is anonymous.

    Raises:
        NotFound: when ``project_repo.get`` returns None.
        ValueError: when ``offset`` is not a valid integer.
    """
    project = project_repo.get(project_id)
    if project is None:
        raise NotFound

    anonymous = current_user.is_anonymous()
    if not project.allow_anonymous_contributors and anonymous:
        info = dict(
            error="This project does not allow anonymous contributors")
        return model.task.Task(info=info)

    offset = int(request.args.get('offset') or 0)
    user_id = None if anonymous else current_user.id

    mobile_addr = request.headers.get('remote_mobile_addr')
    forwarded_for = request.headers.getlist("X-Forwarded-For")
    if mobile_addr is not None:
        user_ip = mobile_addr if anonymous else None
    elif forwarded_for:
        user_ip = forwarded_for[0]
    else:
        user_ip = request.remote_addr if anonymous else None

    # user_ip is None for authenticated users without a forwarded address;
    # testing ``',' in None`` used to raise TypeError here.
    if user_ip is not None and ',' in user_ip:
        # NOTE(review): keeps the LAST comma-separated item, unstripped,
        # exactly as before - confirm this is the intended address.
        user_ip = user_ip.split(",")[-1]

    # Debug output, written so that it is valid on Python 2 and Python 3.
    print("_retrieve_new_task %s." % user_id)
    print("_retrieve_new_task %s." % user_ip)
    print("project.info.get('sched') %s." % project.info.get('sched'))

    return sched.new_task(project_id, project.info.get('sched'),
                          user_id, user_ip, offset)
开发者ID:amanagrawal9,项目名称:Pybossa,代码行数:34,代码来源:__init__.py


示例3: test_warn_project_owner

    def test_warn_project_owner(self, mail, clean_mock):
        """Test JOB leaves the project untouched when the email is refused.

        ``mail.connect()`` is mocked to hand out a sender whose ``send``
        raises ``SMTPRecipientsRefused``; ``warn_old_project_owners`` must
        then return False and the project must keep its ``contacted``,
        ``published`` and ``updated`` values.
        """
        from smtplib import SMTPRecipientsRefused
        from nose.tools import assert_raises
        # Sender whose send() always fails
        failing_sender = MagicMock()
        failing_sender.send.side_effect = SMTPRecipientsRefused('wrong')
        # Context manager returned by mail.connect() that yields the sender
        fake_connection = MagicMock()
        fake_connection.__enter__.return_value = failing_sender
        mail.connect.return_value = fake_connection

        old_date = '2010-10-22T11:02:00.000000'
        bad_owner = UserFactory.create(consent=True, subscribed=True,
                                       email_addr="wrong")
        project_id = ProjectFactory.create(updated=old_date,
                                           owner=bad_owner).id
        assert warn_old_project_owners() is False
        assert_raises(SMTPRecipientsRefused, failing_sender.send, 'msg')

        reloaded = project_repo.get(project_id)
        assert mail.connect.called, "mail.connect() should be called"
        assert failing_sender.send.called, "conn.send() should be called"
        assert reloaded.contacted is False, \
            "project.contacted field should be False"
        assert reloaded.published, "project.published field should be True"
        assert reloaded.updated == old_date, \
            "The update date should be the same"
开发者ID:PyBossa,项目名称:pybossa,代码行数:31,代码来源:test_old_projects.py


示例4: test_task_priority_external_uid

    def test_task_priority_external_uid(self):
        """Test SCHED respects priority_0 field for external uid.

        First requests a task with the default ordering and expects the
        task with the lowest id; then raises ``priority_0`` on a randomly
        chosen task and expects that task when ordering by ``priority_0``
        descending.
        """
        project = ProjectFactory.create(info=dict(sched='depth_first_all'),
                                        owner=UserFactory.create(id=500))
        # Use the id the project really got instead of assuming it is 1
        project_id = project.id

        TaskFactory.create_batch(10, project=project)

        # By default, tasks without priority should be ordered by task.id (FIFO)
        tasks = (db.session.query(Task)
                 .filter_by(project_id=project_id)
                 .order_by('id')
                 .all())
        project = project_repo.get(project_id)
        headers = self.get_headers_jwt(project)
        url = 'api/project/%s/newtask?external_uid=342' % project.id
        res = self.app.get(url, headers=headers)
        task1 = json.loads(res.data)
        # Check that we received a Task
        err_msg = "Task.id should be the same"
        assert task1.get('id') == tasks[0].id, err_msg

        # Now let's change the priority to a random task
        import random
        t = random.choice(tasks)
        # Increase priority to maximum
        t.priority_0 = 1
        db.session.add(t)
        db.session.commit()
        # Request again a new task
        res = self.app.get(url + '&orderby=priority_0&desc=true', headers=headers)
        task1 = json.loads(res.data)
        # Check that we received a Task
        err_msg = "Task.id should be the same"
        assert task1.get('id') == t.id, (err_msg, task1, t)
        err_msg = "Task.priority_0 should be the 1"
        assert task1.get('priority_0') == 1, err_msg
开发者ID:PyBossa,项目名称:pybossa,代码行数:33,代码来源:test_sched_depth_first_all.py


示例5: user_progress

def user_progress(project_id=None, short_name=None):
    """API endpoint for user progress.

    The project is looked up by ``short_name`` when it is given, otherwise
    by ``project_id``.  The response is a JSON object with two fields about
    the tasks of the current user:
        { 'done': 10,
          'total': 100
        }
    meaning that the user has done 10% of the tasks available to them.
    Task runs are counted by user id for authenticated users and by IP
    address (falling back to '127.0.0.1') for anonymous ones.

    Aborts with 404 when neither argument is given or no project is found.
    """
    if not (project_id or short_name):  # pragma: no cover
        return abort(404)

    if short_name:
        project = project_repo.get_by_shortname(short_name)
    else:
        project = project_repo.get(project_id)
    if not project:
        return abort(404)

    # For now, keep this version, but wait until redis cache is used here
    # for task_runs too
    if current_user.is_anonymous():
        filters = dict(project_id=project.id,
                       user_ip=request.remote_addr or '127.0.0.1')
    else:
        filters = dict(project_id=project.id, user_id=current_user.id)
    done = task_repo.count_task_runs_with(**filters)
    payload = json.dumps(dict(done=done, total=n_tasks(project.id)))
    return Response(payload, mimetype="application/json")
开发者ID:amanagrawal9,项目名称:Pybossa,代码行数:31,代码来源:__init__.py


示例6: test_task_priority_external_uid

    def test_task_priority_external_uid(self):
        """Test SCHED honours priority_0 for requests with an external uid.

        Works on project 1 of the fixtures: the first request must return
        the task with the lowest id; after ``priority_0`` is set to 1 on a
        randomly chosen task, the next request must return that task.
        """
        import random

        # Load the fixtures and delete previous TaskRuns
        self.create()
        self.del_task_runs()

        # By default, tasks without priority should be ordered by task.id (FIFO)
        task_query = db.session.query(Task).filter_by(project_id=1)
        tasks = task_query.order_by('id').all()
        headers = self.get_headers_jwt(project_repo.get(1))
        url = 'api/project/1/newtask?external_uid=342'

        first = json.loads(self.app.get(url, headers=headers).data)
        # The scheduler must hand out the first task
        assert first.get('id') == tasks[0].id, "Task.id should be the same"

        # Raise the priority of a random task to the maximum
        boosted = random.choice(tasks)
        boosted.priority_0 = 1
        db.session.add(boosted)
        db.session.commit()

        # The next request must return the boosted task
        second = json.loads(self.app.get(url, headers=headers).data)
        assert second.get('id') == boosted.id, "Task.id should be the same"
        assert second.get('priority_0') == 1, \
            "Task.priority_0 should be the 1"
开发者ID:whxhxw,项目名称:pybossa,代码行数:32,代码来源:test_sched.py


示例7: _retrieve_new_task

def _retrieve_new_task(project_id):
    """Ask the scheduler for a new task of the given project.

    Raises NotFound when ``project_repo.get`` returns None.  Returns a
    ``Task`` carrying an error message in ``info`` when the user is
    anonymous and the project does not allow anonymous contributors.  When
    an ``external_uid`` query argument is present, the request is checked
    with ``jwt_authorize_project`` and any result other than True is
    returned as is.  Otherwise returns the result of ``sched.new_task``.
    """
    project = project_repo.get(project_id)
    if project is None:
        raise NotFound

    if not project.allow_anonymous_contributors and current_user.is_anonymous():
        message = dict(
            error="This project does not allow anonymous contributors")
        return model.task.Task(info=message)

    external_uid = request.args.get('external_uid')
    if external_uid:
        resp = jwt_authorize_project(project,
                                     request.headers.get('Authorization'))
        if resp != True:
            return resp

    # A missing or empty offset means "start at the first candidate"
    offset = int(request.args.get('offset') or 0)

    if current_user.is_anonymous():
        user_id, user_ip = None, request.remote_addr
    else:
        user_id, user_ip = current_user.id, None

    return sched.new_task(project_id, project.info.get('sched'),
                          user_id,
                          user_ip,
                          external_uid,
                          offset)
开发者ID:whxhxw,项目名称:pybossa,代码行数:32,代码来源:__init__.py


示例8: project_export

def project_export(id):
    """Call ``pregenerate_zip_files`` on the JSON and CSV exporters.

    Does nothing when ``project_repo.get`` returns None for ``id``.
    """
    from pybossa.core import project_repo, json_exporter, csv_exporter
    project = project_repo.get(id)
    if project is None:
        return
    print("Export project id %d" % id)
    for exporter in (json_exporter, csv_exporter):
        exporter.pregenerate_zip_files(project)
开发者ID:bingoko,项目名称:pybossa,代码行数:7,代码来源:jobs.py


示例9: update

def update(task):
    """Tell whether the current user may update ``task``.

    Anonymous users are never allowed.  An authenticated user is allowed
    when they own the project referenced by ``task.app_id`` or when
    ``current_user.admin`` is True.

    Returns:
        bool: True when the update is allowed, False otherwise.
    """
    if current_user.is_anonymous():
        return False
    app = project_repo.get(task.app_id)
    # The repository returns None for an unknown id; treat that as "not the
    # owner" instead of failing with AttributeError on ``app.owner_id``.
    is_owner = app is not None and app.owner_id == current_user.id
    return bool(is_owner) or current_user.admin is True
开发者ID:alisheikh,项目名称:pybossa,代码行数:9,代码来源:task.py


示例10: post

 def post(self):
     """Answer {"status": "OK"} for unpublished projects, else defer to super.

     The project id is read from the JSON request body.  Any exception
     raised while parsing the body or loading the project is swallowed and
     the request is handled by the parent ``post``.
     """
     try:
         payload = json.loads(request.data)
         project = project_repo.get(payload.get("project_id"))
         unpublished = project is not None and not project.published
     except Exception:
         # Best effort: on any failure fall through to the parent class
         unpublished = False
     if unpublished:
         return json.dumps({"status": "OK"})
     return super(TaskRunAPI, self).post()
开发者ID:pkdevbox,项目名称:pybossa,代码行数:9,代码来源:task_run.py


示例11: _retrieve_new_task

def _retrieve_new_task(project_id):
    """Ask the scheduler for new task(s) of the given project.

    Query arguments read from the request:
      * ``external_uid`` - when present the request is checked with
        ``jwt_authorize_project``; any result other than True is returned.
      * ``limit`` - defaults to 1 and is capped at 100.
      * ``offset`` - defaults to 0.
      * ``orderby`` - defaults to 'id'.
      * ``desc`` - parsed with ``fuzzyboolean``; defaults to False.

    Raises NotFound when ``project_repo.get`` returns None.  Returns a
    one-item list holding a ``Task`` with an error message in ``info`` when
    the user is anonymous and the project does not allow anonymous
    contributors; otherwise returns the result of ``sched.new_task``.
    """
    project = project_repo.get(project_id)
    if project is None:
        raise NotFound

    if not project.allow_anonymous_contributors and current_user.is_anonymous():
        message = dict(
            error="This project does not allow anonymous contributors")
        return [model.task.Task(info=message)]

    args = request.args
    external_uid = args.get('external_uid')
    if external_uid:
        resp = jwt_authorize_project(project,
                                     request.headers.get('Authorization'))
        if resp != True:
            return resp

    # Missing or empty arguments fall back to their defaults
    limit = min(int(args.get('limit') or 1), 100)
    offset = int(args.get('offset') or 0)
    orderby = args.get('orderby') or 'id'
    raw_desc = args.get('desc')
    desc = fuzzyboolean(raw_desc) if raw_desc else False

    if current_user.is_anonymous():
        user_id = None
        user_ip = anonymizer.ip(request.remote_addr or '127.0.0.1')
    else:
        user_id = current_user.id
        user_ip = None

    return sched.new_task(project_id, project.info.get('sched'),
                          user_id,
                          user_ip,
                          external_uid,
                          offset,
                          limit,
                          orderby=orderby,
                          desc=desc)
开发者ID:PyBossa,项目名称:pybossa,代码行数:55,代码来源:__init__.py


示例12: create

def create(taskrun=None):
    """Tell whether the current user may create ``taskrun``.

    Returns False for an anonymous user when the project of the task has
    ``allow_anonymous_contributors`` set to False.  Otherwise counts the
    task runs matching the task run's app, task, user id and user ip:
    returns True when there are none and aborts with 403 when there are.
    """
    task = task_repo.get_task(taskrun.task_id)
    project = project_repo.get(task.app_id)
    if current_user.is_anonymous():
        if project.allow_anonymous_contributors is False:
            return False

    previous_runs = task_repo.count_task_runs_with(app_id=taskrun.app_id,
                                                   task_id=taskrun.task_id,
                                                   user_id=taskrun.user_id,
                                                   user_ip=taskrun.user_ip)
    first_contribution = previous_runs <= 0
    if first_contribution:
        return first_contribution
    raise abort(403)
开发者ID:alisheikh,项目名称:pybossa,代码行数:12,代码来源:taskrun.py


示例13: _retrieve_new_task

def _retrieve_new_task(app_id):
    """Ask the scheduler for a new task of the given app.

    Raises NotFound when ``project_repo.get`` returns None.  The scheduler
    receives the user id for authenticated users or ``request.remote_addr``
    for anonymous ones, plus the ``offset`` query argument (0 when absent
    or empty).
    """
    if project_repo.get(app_id) is None:
        raise NotFound

    offset = int(request.args.get('offset') or 0)
    if current_user.is_anonymous():
        user_id, user_ip = None, request.remote_addr
    else:
        user_id, user_ip = current_user.id, None
    return sched.new_task(app_id, user_id, user_ip, offset)
开发者ID:chamaa,项目名称:pybossa,代码行数:12,代码来源:__init__.py


示例14: test_task_preloading_external_uid

    def test_task_preloading_external_uid(self):
        """Test TASK Pre-loading for external user IDs works.

        Requests a task and a pre-loaded one (``offset=1``), submits a task
        run for both, then requests two tasks again and checks that all four
        are different.  Finally checks that an offset beyond the ten
        created tasks yields an empty JSON object.
        """
        project = ProjectFactory.create(info=dict(sched='depth_first_all'),
                                        owner=UserFactory.create(id=500))
        # Use the id the project really got instead of assuming it is 1
        project_id = project.id

        TaskFactory.create_batch(10, project=project)

        assigned_tasks = []
        project = project_repo.get(project_id)
        headers = self.get_headers_jwt(project)
        url = 'api/project/%s/newtask?external_uid=2xb' % project.id
        res = self.app.get(url, headers=headers)
        task1 = json.loads(res.data)
        # Check that we received a Task
        assert task1.get('id'),  task1
        # Pre-load the next task for the user
        res = self.app.get(url + '&offset=1', headers=headers)
        task2 = json.loads(res.data)
        # Check that we received a Task
        assert task2.get('id'),  task2
        # Check that both tasks are different
        assert task1.get('id') != task2.get('id'), "Tasks should be different"
        # Save the assigned tasks
        assigned_tasks.append(task1)
        assigned_tasks.append(task2)

        # Submit an Answer for the assigned and pre-loaded task
        for t in assigned_tasks:
            tr = dict(project_id=t['project_id'],
                      task_id=t['id'], info={'answer': 'No'},
                      external_uid='2xb')
            tr = json.dumps(tr)

            self.app.post('/api/taskrun?external_uid=2xb',
                          data=tr, headers=headers)
        # Get two tasks again
        res = self.app.get(url, headers=headers)
        task3 = json.loads(res.data)
        # Check that we received a Task (report the task being checked)
        assert task3.get('id'),  task3
        # Pre-load the next task for the user
        res = self.app.get(url + '&offset=1', headers=headers)
        task4 = json.loads(res.data)
        # Check that we received a Task (report the task being checked)
        assert task4.get('id'),  task4
        # Check that both tasks are different
        assert task3.get('id') != task4.get('id'), "Tasks should be different"
        assert task1.get('id') != task3.get('id'), "Tasks should be different"
        assert task2.get('id') != task4.get('id'), "Tasks should be different"
        # Check that a big offset returns an empty JSON object
        res = self.app.get(url + '&offset=11', headers=headers)
        assert json.loads(res.data) == {}, res.data
开发者ID:PyBossa,项目名称:pybossa,代码行数:53,代码来源:test_sched_depth_first_all.py


示例15: import_tasks

def import_tasks(project_id, **form_data):
    """Import tasks for a project and mail the outcome to its owner.

    ``form_data`` is forwarded to ``importer.create_tasks``.  Returns the
    importer message with the project name appended, which is also used in
    the body of the email.
    """
    from pybossa.core import project_repo

    project = project_repo.get(project_id)
    result = importer.create_tasks(task_repo, project_id, **form_data)
    msg = result + " to your project %s!" % project.name

    brand = current_app.config.get("BRAND")
    signature = "\n\nAll the best,\nThe %s team." % brand
    send_mail(dict(
        recipients=[project.owner.email_addr],
        subject="Tasks Import to your project %s" % project.name,
        body="Hello,\n\n" + msg + signature))
    return msg
开发者ID:ruanyuemin,项目名称:pybossa,代码行数:12,代码来源:jobs.py


示例16: push_notification

def push_notification(project_id, **kwargs):
    """Send a push notification for a project through OneSignal.

    Nothing is sent (and None is returned) unless the project's ``info``
    has a truthy 'onesignal' entry.  ``kwargs`` must provide 'contents',
    'headings', 'launch_url' and 'web_buttons'.  The message is filtered on
    the existence of a tag whose key is ``project_id``.

    Returns whatever ``client.push_msg`` returns.
    """
    from pybossa.core import project_repo
    project = project_repo.get(project_id)
    if not project.info.get('onesignal'):
        return None

    config = current_app.config
    client = PybossaOneSignal(app_id=config.get('ONESIGNAL_APP_ID'),
                              api_key=config.get('ONESIGNAL_API_KEY'))
    tag_filter = {"field": "tag", "key": project_id, "relation": "exists"}
    return client.push_msg(contents=kwargs['contents'],
                           headings=kwargs['headings'],
                           launch_url=kwargs['launch_url'],
                           web_buttons=kwargs['web_buttons'],
                           filters=[tag_filter])
开发者ID:fiorda,项目名称:pybossa,代码行数:14,代码来源:jobs.py


示例17: import_tasks

def import_tasks(tasks_info, app_id):
    """Import tasks into a project and mail the outcome to its owner.

    Returns the message of ``importers.create_tasks`` with the project name
    appended, which is also used in the body of the email.
    """
    from pybossa.core import task_repo, project_repo
    from flask import current_app
    import pybossa.importers as importers

    project = project_repo.get(app_id)
    result = importers.create_tasks(task_repo, tasks_info, app_id)
    msg = result + ' to your project %s!' % project.name

    brand = current_app.config.get('BRAND')
    signature = '\n\nAll the best,\nThe %s team.' % brand
    send_mail(dict(
        recipients=[project.owner.email_addr],
        subject='Tasks Import to your project %s' % project.name,
        body='Hello,\n\n' + msg + signature))
    return msg
开发者ID:alisheikh,项目名称:pybossa,代码行数:14,代码来源:jobs.py


示例18: send_weekly_stats_project

def send_weekly_stats_project(project_id):
    """Queue the weekly statistics email for the owner of a project.

    When ``project.owner.subscribed`` is False nothing is done and the
    string "Owner does not want updates by email" is returned.  Otherwise
    the stats are updated, the markdown and HTML bodies are rendered from
    the same context, and a ``send_mail`` job is enqueued on the 'high'
    queue with the configured 'TIMEOUT' (the function then returns None).
    """
    from pybossa.cache.project_stats import update_stats, get_stats
    from pybossa.core import project_repo
    from datetime import datetime
    project = project_repo.get(project_id)
    if project.owner.subscribed is False:
        return "Owner does not want updates by email"
    update_stats(project_id)
    dates_stats, hours_stats, users_stats = get_stats(project_id,
                                                      geo=True,
                                                      period='1 week')
    subject = "Weekly Update: %s" % project.name

    timeout = current_app.config.get('TIMEOUT')

    # zip() returns a lazy iterator on Python 3 and cannot be indexed, so
    # the transposed columns are materialised before being subscripted.
    # Max number of completed tasks
    # (presumably dates_stats[3] is the completed-tasks series - confirm)
    columns = list(zip(*dates_stats[3]['values']))
    n_completed_tasks = max(columns[1])
    # Most active day: first column entry paired with the highest count
    columns = list(zip(*dates_stats[0]['values']))
    highest = max(columns[1])
    active_day = [columns[0][columns[1].index(highest)], highest]
    # NOTE(review): the division suggests millisecond timestamps - confirm
    active_day[0] = datetime.fromtimestamp(active_day[0]/1000).strftime('%A')

    # Both templates are rendered with exactly the same context
    context = dict(project=project,
                   dates_stats=dates_stats,
                   hours_stats=hours_stats,
                   users_stats=users_stats,
                   n_completed_tasks=n_completed_tasks,
                   active_day=active_day,
                   config=current_app.config)
    body = render_template('/account/email/weeklystats.md', **context)
    html = render_template('/account/email/weeklystats.html', **context)
    mail_dict = dict(recipients=[project.owner.email_addr],
                     subject=subject,
                     body=body,
                     html=html)

    job = dict(name=send_mail,
               args=[mail_dict],
               kwargs={},
               timeout=timeout,
               queue='high')
    enqueue_job(job)
开发者ID:fiorda,项目名称:pybossa,代码行数:50,代码来源:jobs.py


示例19: get_autoimport_jobs

def get_autoimport_jobs(queue='low'):
    """Yield an ``import_tasks`` job for every project with an autoimporter.

    Candidates come from ``cached_projects.get_from_pro_user()``; each one
    is loaded through ``project_repo`` and skipped unless
    ``has_autoimporter()`` is true.  Every job is a dict with a ten-minute
    timeout that targets ``queue``.
    """
    from pybossa.core import project_repo
    import pybossa.cache.projects as cached_projects
    for entry in cached_projects.get_from_pro_user():
        project = project_repo.get(entry['id'])
        if not project.has_autoimporter():
            continue
        yield dict(name=import_tasks,
                   args=[project.id],
                   kwargs=project.get_autoimporter(),
                   timeout=(10 * MINUTE),
                   queue=queue)
开发者ID:codeaudit,项目名称:pybossa,代码行数:14,代码来源:jobs.py


示例20: import_tasks

def import_tasks(project_id, from_auto=False, **form_data):
    """Import tasks for a project and mail the outcome to its owner.

    ``form_data`` is forwarded to ``importer.create_tasks``.  When
    ``from_auto`` is true, the report metadata is stored in ``form_data``
    under "last_import_meta", the result is set as the project's
    autoimporter and the project is saved.

    Returns the report message with the project name appended, which is
    also used in the body of the email.
    """
    from pybossa.core import project_repo

    project = project_repo.get(project_id)
    report = importer.create_tasks(task_repo, project_id, **form_data)
    if from_auto:
        form_data["last_import_meta"] = report.metadata
        project.set_autoimporter(form_data)
        project_repo.save(project)

    msg = report.message + " to your project %s!" % project.name
    brand = current_app.config.get("BRAND")
    signature = "\n\nAll the best,\nThe %s team." % brand
    send_mail(dict(
        recipients=[project.owner.email_addr],
        subject="Tasks Import to your project %s" % project.name,
        body="Hello,\n\n" + msg + signature))
    return msg
开发者ID:chandra1b,项目名称:pybossa,代码行数:16,代码来源:jobs.py



注:本文中的pybossa.core.project_repo.get函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python user_repo.get函数代码示例发布时间:2022-05-25
下一篇:
Python core.get_session函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap