本文整理汇总了Python中pybossa.core.project_repo.get函数的典型用法代码示例。如果您正苦于以下问题:Python get函数的具体用法?Python get怎么用?Python get使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_warn_project_excludes_completed_projects
def test_warn_project_excludes_completed_projects(self, clean_mock):
"""Test JOB email excludes completed projects."""
from pybossa.core import mail
with mail.record_messages() as outbox:
date = '2010-10-22T11:02:00.000000'
owner = UserFactory.create(consent=True, subscribed=True)
project = ProjectFactory.create(updated=date, contacted=False,
owner=owner)
TaskFactory.create(created=date, project=project, state='completed')
project_id = project.id
project = project_repo.get(project_id)
project.updated = date
project_repo.update(project)
project = ProjectFactory.create(updated=date, contacted=False,
owner=owner)
TaskFactory.create(created=date, project=project, state='ongoing')
project_id = project.id
project = project_repo.get(project_id)
project.updated = date
project_repo.update(project)
warn_old_project_owners()
assert len(outbox) == 1, outbox
subject = 'Your PYBOSSA project: %s has been inactive' % project.name
assert outbox[0].subject == subject
err_msg = "project.contacted field should be True"
assert project.contacted, err_msg
err_msg = "project.published field should be False"
assert project.published is False, err_msg
err_msg = "cache of project should be cleaned"
clean_mock.assert_called_with(project_id), err_msg
err_msg = "The update date should be different"
assert project.updated != date, err_msg
开发者ID:PyBossa,项目名称:pybossa,代码行数:35,代码来源:test_old_projects.py
示例2: _retrieve_new_task
def _retrieve_new_task(project_id):
project = project_repo.get(project_id)
if project is None:
raise NotFound
if not project.allow_anonymous_contributors and current_user.is_anonymous():
info = dict(
error="This project does not allow anonymous contributors")
error = model.task.Task(info=info)
return error
if request.args.get('offset'):
offset = int(request.args.get('offset'))
else:
offset = 0
user_id = None if current_user.is_anonymous() else current_user.id
user_ip = request.remote_addr if current_user.is_anonymous() else None
if request.headers.get('remote_mobile_addr') is not None :
user_ip = request.headers.get('remote_mobile_addr') if current_user.is_anonymous() else None
else:
if request.headers.getlist("X-Forwarded-For"):
user_ip = request.headers.getlist("X-Forwarded-For")[0]
if ',' in user_ip:
ips = user_ip.split(",")
for ip in ips:
user_ip = ip
print "_retrieve_new_task %s." % user_id
print "_retrieve_new_task %s." % user_ip
print "project.info.get('sched') %s." % project.info.get('sched')
task = sched.new_task(project_id, project.info.get('sched'), user_id, user_ip, offset)
return task
开发者ID:amanagrawal9,项目名称:Pybossa,代码行数:34,代码来源:__init__.py
示例3: test_warn_project_owner
def test_warn_project_owner(self, mail, clean_mock):
"""Test JOB email is sent to warn project owner."""
from smtplib import SMTPRecipientsRefused
from nose.tools import assert_raises
# Mock for the send method
send_mock = MagicMock()
send_mock.send.side_effect = SMTPRecipientsRefused('wrong')
# Mock for the connection method
connection = MagicMock()
connection.__enter__.return_value = send_mock
# Join them
mail.connect.return_value = connection
date = '2010-10-22T11:02:00.000000'
owner = UserFactory.create(consent=True, subscribed=True,
email_addr="wrong")
project = ProjectFactory.create(updated=date, owner=owner)
project_id = project.id
assert warn_old_project_owners() is False
assert_raises(SMTPRecipientsRefused, send_mock.send, 'msg')
project = project_repo.get(project_id)
err_msg = "mail.connect() should be called"
assert mail.connect.called, err_msg
err_msg = "conn.send() should be called"
assert send_mock.send.called, err_msg
err_msg = "project.contacted field should be False"
assert project.contacted is False, err_msg
err_msg = "project.published field should be True"
assert project.published, err_msg
err_msg = "The update date should be the same"
assert project.updated == date, err_msg
开发者ID:PyBossa,项目名称:pybossa,代码行数:31,代码来源:test_old_projects.py
示例4: test_task_priority_external_uid
def test_task_priority_external_uid(self):
"""Test SCHED respects priority_0 field for externa uid"""
project = ProjectFactory.create(info=dict(sched='depth_first_all'),
owner=UserFactory.create(id=500))
TaskFactory.create_batch(10, project=project)
# By default, tasks without priority should be ordered by task.id (FIFO)
tasks = db.session.query(Task).filter_by(project_id=1).order_by('id').all()
project = project_repo.get(1)
headers = self.get_headers_jwt(project)
url = 'api/project/%s/newtask?external_uid=342' % project.id
res = self.app.get(url, headers=headers)
task1 = json.loads(res.data)
# Check that we received a Task
err_msg = "Task.id should be the same"
assert task1.get('id') == tasks[0].id, err_msg
# Now let's change the priority to a random task
import random
t = random.choice(tasks)
# Increase priority to maximum
t.priority_0 = 1
db.session.add(t)
db.session.commit()
# Request again a new task
res = self.app.get(url + '&orderby=priority_0&desc=true', headers=headers)
task1 = json.loads(res.data)
# Check that we received a Task
err_msg = "Task.id should be the same"
assert task1.get('id') == t.id, (err_msg, task1, t)
err_msg = "Task.priority_0 should be the 1"
assert task1.get('priority_0') == 1, err_msg
开发者ID:PyBossa,项目名称:pybossa,代码行数:33,代码来源:test_sched_depth_first_all.py
示例5: user_progress
def user_progress(project_id=None, short_name=None):
"""API endpoint for user progress.
Return a JSON object with two fields regarding the tasks for the user:
{ 'done': 10,
'total: 100
}
This will mean that the user has done a 10% of the available tasks for
him
"""
if project_id or short_name:
if short_name:
project = project_repo.get_by_shortname(short_name)
elif project_id:
project = project_repo.get(project_id)
if project:
# For now, keep this version, but wait until redis cache is used here for task_runs too
query_attrs = dict(project_id=project.id)
if current_user.is_anonymous():
query_attrs['user_ip'] = request.remote_addr or '127.0.0.1'
else:
query_attrs['user_id'] = current_user.id
taskrun_count = task_repo.count_task_runs_with(**query_attrs)
tmp = dict(done=taskrun_count, total=n_tasks(project.id))
return Response(json.dumps(tmp), mimetype="application/json")
else:
return abort(404)
else: # pragma: no cover
return abort(404)
开发者ID:amanagrawal9,项目名称:Pybossa,代码行数:31,代码来源:__init__.py
示例6: test_task_priority_external_uid
def test_task_priority_external_uid(self):
"""Test SCHED respects priority_0 field for externa uid"""
# Del previous TaskRuns
self.create()
self.del_task_runs()
# By default, tasks without priority should be ordered by task.id (FIFO)
tasks = db.session.query(Task).filter_by(project_id=1).order_by('id').all()
project = project_repo.get(1)
headers = self.get_headers_jwt(project)
url = 'api/project/1/newtask?external_uid=342'
res = self.app.get(url, headers=headers)
task1 = json.loads(res.data)
# Check that we received a Task
err_msg = "Task.id should be the same"
assert task1.get('id') == tasks[0].id, err_msg
# Now let's change the priority to a random task
import random
t = random.choice(tasks)
# Increase priority to maximum
t.priority_0 = 1
db.session.add(t)
db.session.commit()
# Request again a new task
res = self.app.get(url, headers=headers)
task1 = json.loads(res.data)
# Check that we received a Task
err_msg = "Task.id should be the same"
assert task1.get('id') == t.id, err_msg
err_msg = "Task.priority_0 should be the 1"
assert task1.get('priority_0') == 1, err_msg
开发者ID:whxhxw,项目名称:pybossa,代码行数:32,代码来源:test_sched.py
示例7: _retrieve_new_task
def _retrieve_new_task(project_id):
project = project_repo.get(project_id)
if project is None:
raise NotFound
if not project.allow_anonymous_contributors and current_user.is_anonymous():
info = dict(
error="This project does not allow anonymous contributors")
error = model.task.Task(info=info)
return error
if request.args.get('external_uid'):
resp = jwt_authorize_project(project,
request.headers.get('Authorization'))
if resp != True:
return resp
if request.args.get('offset'):
offset = int(request.args.get('offset'))
else:
offset = 0
user_id = None if current_user.is_anonymous() else current_user.id
user_ip = request.remote_addr if current_user.is_anonymous() else None
external_uid = request.args.get('external_uid')
task = sched.new_task(project_id, project.info.get('sched'),
user_id,
user_ip,
external_uid,
offset)
return task
开发者ID:whxhxw,项目名称:pybossa,代码行数:32,代码来源:__init__.py
示例8: project_export
def project_export(id):
from pybossa.core import project_repo, json_exporter, csv_exporter
app = project_repo.get(id)
if app is not None:
print "Export project id %d" % id
json_exporter.pregenerate_zip_files(app)
csv_exporter.pregenerate_zip_files(app)
开发者ID:bingoko,项目名称:pybossa,代码行数:7,代码来源:jobs.py
示例9: update
def update(task):
if not current_user.is_anonymous():
app = project_repo.get(task.app_id)
if app.owner_id == current_user.id or current_user.admin is True:
return True
else:
return False
else:
return False
开发者ID:alisheikh,项目名称:pybossa,代码行数:9,代码来源:task.py
示例10: post
def post(self):
try:
project_id = json.loads(request.data).get("project_id")
project = project_repo.get(project_id)
if project is not None and not project.published:
return json.dumps({"status": "OK"})
except Exception as e:
return super(TaskRunAPI, self).post()
return super(TaskRunAPI, self).post()
开发者ID:pkdevbox,项目名称:pybossa,代码行数:9,代码来源:task_run.py
示例11: _retrieve_new_task
def _retrieve_new_task(project_id):
project = project_repo.get(project_id)
if project is None:
raise NotFound
if not project.allow_anonymous_contributors and current_user.is_anonymous():
info = dict(
error="This project does not allow anonymous contributors")
error = [model.task.Task(info=info)]
return error
if request.args.get('external_uid'):
resp = jwt_authorize_project(project,
request.headers.get('Authorization'))
if resp != True:
return resp
if request.args.get('limit'):
limit = int(request.args.get('limit'))
else:
limit = 1
if limit > 100:
limit = 100
if request.args.get('offset'):
offset = int(request.args.get('offset'))
else:
offset = 0
if request.args.get('orderby'):
orderby = request.args.get('orderby')
else:
orderby = 'id'
if request.args.get('desc'):
desc = fuzzyboolean(request.args.get('desc'))
else:
desc = False
user_id = None if current_user.is_anonymous() else current_user.id
user_ip = (anonymizer.ip(request.remote_addr or '127.0.0.1')
if current_user.is_anonymous() else None)
external_uid = request.args.get('external_uid')
task = sched.new_task(project_id, project.info.get('sched'),
user_id,
user_ip,
external_uid,
offset,
limit,
orderby=orderby,
desc=desc)
return task
开发者ID:PyBossa,项目名称:pybossa,代码行数:55,代码来源:__init__.py
示例12: create
def create(taskrun=None):
project = project_repo.get(task_repo.get_task(taskrun.task_id).app_id)
if (current_user.is_anonymous() and
project.allow_anonymous_contributors is False):
return False
authorized = task_repo.count_task_runs_with(app_id=taskrun.app_id,
task_id=taskrun.task_id,
user_id=taskrun.user_id,
user_ip=taskrun.user_ip) <= 0
if not authorized:
raise abort(403)
return authorized
开发者ID:alisheikh,项目名称:pybossa,代码行数:12,代码来源:taskrun.py
示例13: _retrieve_new_task
def _retrieve_new_task(app_id):
app = project_repo.get(app_id)
if app is None:
raise NotFound
if request.args.get('offset'):
offset = int(request.args.get('offset'))
else:
offset = 0
user_id = None if current_user.is_anonymous() else current_user.id
user_ip = request.remote_addr if current_user.is_anonymous() else None
task = sched.new_task(app_id, user_id, user_ip, offset)
return task
开发者ID:chamaa,项目名称:pybossa,代码行数:12,代码来源:__init__.py
示例14: test_task_preloading_external_uid
def test_task_preloading_external_uid(self):
"""Test TASK Pre-loading for external user IDs works"""
project = ProjectFactory.create(info=dict(sched='depth_first_all'),
owner=UserFactory.create(id=500))
TaskFactory.create_batch(10, project=project)
assigned_tasks = []
# Get Task until scheduler returns None
project = project_repo.get(1)
headers = self.get_headers_jwt(project)
url = 'api/project/%s/newtask?external_uid=2xb' % project.id
res = self.app.get(url, headers=headers)
task1 = json.loads(res.data)
# Check that we received a Task
assert task1.get('id'), task1
# Pre-load the next task for the user
res = self.app.get(url + '&offset=1', headers=headers)
task2 = json.loads(res.data)
# Check that we received a Task
assert task2.get('id'), task2
# Check that both tasks are different
assert task1.get('id') != task2.get('id'), "Tasks should be different"
## Save the assigned task
assigned_tasks.append(task1)
assigned_tasks.append(task2)
# Submit an Answer for the assigned and pre-loaded task
for t in assigned_tasks:
tr = dict(project_id=t['project_id'],
task_id=t['id'], info={'answer': 'No'},
external_uid='2xb')
tr = json.dumps(tr)
res = self.app.post('/api/taskrun?external_uid=2xb',
data=tr, headers=headers)
# Get two tasks again
res = self.app.get(url, headers=headers)
task3 = json.loads(res.data)
# Check that we received a Task
assert task3.get('id'), task1
# Pre-load the next task for the user
res = self.app.get(url + '&offset=1', headers=headers)
task4 = json.loads(res.data)
# Check that we received a Task
assert task4.get('id'), task2
# Check that both tasks are different
assert task3.get('id') != task4.get('id'), "Tasks should be different"
assert task1.get('id') != task3.get('id'), "Tasks should be different"
assert task2.get('id') != task4.get('id'), "Tasks should be different"
# Check that a big offset returns None
res = self.app.get(url + '&offset=11', headers=headers)
assert json.loads(res.data) == {}, res.data
开发者ID:PyBossa,项目名称:pybossa,代码行数:53,代码来源:test_sched_depth_first_all.py
示例15: import_tasks
def import_tasks(project_id, **form_data):
"""Import tasks for a project."""
from pybossa.core import project_repo
app = project_repo.get(project_id)
msg = importer.create_tasks(task_repo, project_id, **form_data)
msg = msg + " to your project %s!" % app.name
subject = "Tasks Import to your project %s" % app.name
body = "Hello,\n\n" + msg + "\n\nAll the best,\nThe %s team." % current_app.config.get("BRAND")
mail_dict = dict(recipients=[app.owner.email_addr], subject=subject, body=body)
send_mail(mail_dict)
return msg
开发者ID:ruanyuemin,项目名称:pybossa,代码行数:12,代码来源:jobs.py
示例16: push_notification
def push_notification(project_id, **kwargs):
"""Send push notification."""
from pybossa.core import project_repo
project = project_repo.get(project_id)
if project.info.get('onesignal'):
app_id = current_app.config.get('ONESIGNAL_APP_ID')
api_key = current_app.config.get('ONESIGNAL_API_KEY')
client = PybossaOneSignal(app_id=app_id, api_key=api_key)
filters = [{"field": "tag", "key": project_id, "relation": "exists"}]
return client.push_msg(contents=kwargs['contents'],
headings=kwargs['headings'],
launch_url=kwargs['launch_url'],
web_buttons=kwargs['web_buttons'],
filters=filters)
开发者ID:fiorda,项目名称:pybossa,代码行数:14,代码来源:jobs.py
示例17: import_tasks
def import_tasks(tasks_info, app_id):
from pybossa.core import task_repo, project_repo
from flask import current_app
import pybossa.importers as importers
app = project_repo.get(app_id)
msg = importers.create_tasks(task_repo, tasks_info, app_id)
msg = msg + ' to your project %s!' % app.name
subject = 'Tasks Import to your project %s' % app.name
body = 'Hello,\n\n' + msg + '\n\nAll the best,\nThe %s team.' % current_app.config.get('BRAND')
mail_dict = dict(recipients=[app.owner.email_addr],
subject=subject, body=body)
send_mail(mail_dict)
return msg
开发者ID:alisheikh,项目名称:pybossa,代码行数:14,代码来源:jobs.py
示例18: send_weekly_stats_project
def send_weekly_stats_project(project_id):
from pybossa.cache.project_stats import update_stats, get_stats
from pybossa.core import project_repo
from datetime import datetime
project = project_repo.get(project_id)
if project.owner.subscribed is False:
return "Owner does not want updates by email"
update_stats(project_id)
dates_stats, hours_stats, users_stats = get_stats(project_id,
geo=True,
period='1 week')
subject = "Weekly Update: %s" % project.name
timeout = current_app.config.get('TIMEOUT')
# Max number of completed tasks
n_completed_tasks = 0
xy = zip(*dates_stats[3]['values'])
n_completed_tasks = max(xy[1])
# Most active day
xy = zip(*dates_stats[0]['values'])
active_day = [xy[0][xy[1].index(max(xy[1]))], max(xy[1])]
active_day[0] = datetime.fromtimestamp(active_day[0]/1000).strftime('%A')
body = render_template('/account/email/weeklystats.md',
project=project,
dates_stats=dates_stats,
hours_stats=hours_stats,
users_stats=users_stats,
n_completed_tasks=n_completed_tasks,
active_day=active_day,
config=current_app.config)
html = render_template('/account/email/weeklystats.html',
project=project,
dates_stats=dates_stats,
hours_stats=hours_stats,
users_stats=users_stats,
active_day=active_day,
n_completed_tasks=n_completed_tasks,
config=current_app.config)
mail_dict = dict(recipients=[project.owner.email_addr],
subject=subject,
body=body,
html=html)
job = dict(name=send_mail,
args=[mail_dict],
kwargs={},
timeout=timeout,
queue='high')
enqueue_job(job)
开发者ID:fiorda,项目名称:pybossa,代码行数:50,代码来源:jobs.py
示例19: get_autoimport_jobs
def get_autoimport_jobs(queue='low'):
"""Get autoimport jobs."""
from pybossa.core import project_repo
import pybossa.cache.projects as cached_projects
pro_user_projects = cached_projects.get_from_pro_user()
for project_dict in pro_user_projects:
project = project_repo.get(project_dict['id'])
if project.has_autoimporter():
job = dict(name=import_tasks,
args=[project.id],
kwargs=project.get_autoimporter(),
timeout=(10 * MINUTE),
queue=queue)
yield job
开发者ID:codeaudit,项目名称:pybossa,代码行数:14,代码来源:jobs.py
示例20: import_tasks
def import_tasks(project_id, from_auto=False, **form_data):
"""Import tasks for a project."""
from pybossa.core import project_repo
project = project_repo.get(project_id)
report = importer.create_tasks(task_repo, project_id, **form_data)
if from_auto:
form_data["last_import_meta"] = report.metadata
project.set_autoimporter(form_data)
project_repo.save(project)
msg = report.message + " to your project %s!" % project.name
subject = "Tasks Import to your project %s" % project.name
body = "Hello,\n\n" + msg + "\n\nAll the best,\nThe %s team." % current_app.config.get("BRAND")
mail_dict = dict(recipients=[project.owner.email_addr], subject=subject, body=body)
send_mail(mail_dict)
return msg
开发者ID:chandra1b,项目名称:pybossa,代码行数:16,代码来源:jobs.py
注:本文中的pybossa.core.project_repo.get函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论