本文整理汇总了Python中models.Response类的典型用法代码示例。如果您正苦于以下问题:Python Response类的具体用法?Python Response怎么用?Python Response使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Response类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: take_survey
def take_survey(request, survey_id):
    """Show a survey's answer formset and store a submitted response.

    The ``Survey`` is looked up by primary key (404 when missing) and an
    answer formset is built for its questions. When the formset validates,
    a new ``Response`` is saved, every answer is attached to it and saved,
    and the user is redirected to the survey list. Otherwise each form is
    labelled with its question and the ``take_survey.html`` template is
    rendered.
    """
    survey = get_object_or_404(Survey, pk=survey_id)
    questions = survey.question_set.all()
    formset_class = get_answer_formset(questions)
    formset = formset_class(request.POST or None,
                            queryset=Answer.objects.none())

    if formset.is_valid():
        # Answers are built unsaved so they can be tied to the response first.
        pending_answers = formset.save(commit=False)
        response = Response(survey=survey)
        response.save()
        for answer in pending_answers:
            answer.response = response
            answer.save()
        return HttpResponseRedirect(reverse('simple_survey.views.list_surveys'))

    # Forms and questions are paired by position.
    for position, question in enumerate(questions):
        fields = formset.forms[position].fields
        fields['answer'].label = question.question
        fields['question'].initial = question

    context = {"survey": survey, "formset": formset}
    return render_to_response('simple_survey/take_survey.html', context,
                              context_instance=RequestContext(request))
开发者ID:MegaMark16,项目名称:django-simple-survey,代码行数:26,代码来源:views.py
示例2: post
def post(self):
    """Store a submitted ADSync questionnaire and e-mail a summary of it.

    Reads ``name``, ``email`` and ``company`` from the request and saves a
    ``Responder`` plus a ``ResponseSet`` for the ``'ADSync'`` product. For
    every ADSync ``Question`` the request field ``response<question id>`` is
    saved as a ``Response``. A notification mail (plain text link plus an
    HTML summary) is then sent and the client is redirected to
    ``/adsuccess``.

    Values taken from the request are escaped before they are placed in the
    HTML mail body, so submitted markup is shown as text rather than
    interpreted by the mail client.
    """
    # Function-scope import: only this handler needs it.
    from xml.sax.saxutils import escape

    new_user = Responder(name=self.request.get('name'),
                         email=self.request.get('email'),
                         company=self.request.get('company'))
    new_user.put()
    # Named response_set (not ``set``) to avoid shadowing the builtin.
    response_set = ResponseSet(product='ADSync', responder=new_user)
    response_set.put()
    ad_questions = Question.gql('WHERE product = :1', 'ADSync')

    html_parts = [
        '<h2>Response to ADSync Questionnaire</h2><p><i>Submitted by '
        + escape(new_user.name) + ', ' + escape(new_user.email) + '</i></p>'
    ]
    for ad_question in ad_questions:
        response_text = self.request.get('response' + str(ad_question.key().id()))
        response = Response(text=response_text, question=ad_question,
                            responseSet=response_set)
        response.put()
        # NOTE(review): question text is left unescaped on the assumption
        # that it is authored by the site owner -- confirm.
        html_parts.append('<h3>' + ad_question.text + '</h3>'
                          + '<p>' + escape(response.text) + '</p>')
    html_body = ''.join(html_parts)

    # send email notification
    sender = '[email protected]'
    recipients = ['[email protected]', '[email protected]', '[email protected]']
    sub = new_user.name + ' from ' + new_user.company + ' responded to the ADSync Questionnaire'
    plain_body = 'Get response here: http://yammerie.appspot.com/responsesets?id=' + str(response_set.key().id())
    mail.send_mail(sender, recipients, sub, plain_body, html=html_body)
    self.redirect('/adsuccess')
开发者ID:nmccarthy,项目名称:Yammer-Integration-Questionnaire,代码行数:26,代码来源:questionnaire.py
示例3: _on_rmq_message
def _on_rmq_message(self, *args, **kwargs):
    """Save a ``Response`` for a message received via the ``rmq_message`` kwarg.

    Nothing happens when ``rmq_message`` is absent or ``None``, or when any
    of the five values produced by ``_process_rmq_message`` is ``None``.
    Otherwise an unsent ``Response`` is saved; it is marked approved only
    when ``channel.is_secure`` is exactly ``False``.
    """
    if 'rmq_message' not in kwargs:
        return
    rmq_message = kwargs['rmq_message']
    if rmq_message is None:
        return

    channel, user, message, plugin_name, plugin_response = \
        self._process_rmq_message(rmq_message)
    parsed = (channel, user, message, plugin_name, plugin_response)
    if any(part is None for part in parsed):
        return

    response = Response(
        text=plugin_response,
        from_plugin=plugin_name,
        in_response_to=message,
        to_channel=channel,
        to_user=user,
        is_approved=channel.is_secure is False,
        is_sent=False,
    )
    response.save()
开发者ID:gabeos,项目名称:slackotron,代码行数:25,代码来源:slackotron.py
示例4: answer
def answer(request, poll_id, user_uuid):
    """Render a poll's answer form and store a participant's choices.

    A participant who has already completed the poll is redirected to
    ``polls:thanks``. On a valid POST the participant's existing
    ``Response`` rows are deleted, one ``Response`` per submitted choice is
    saved, and the client is redirected to ``polls:sign``. In every other
    case ``polls/detail.html`` is rendered with the (possibly bound) form.
    """
    poll = get_object_or_404(Poll, pk=poll_id)
    participant = get_object_or_404(Participant, unique_id=user_uuid)
    url_args = (poll.id, user_uuid,)

    if participant.completed:
        return redirect(reverse('polls:thanks', args=url_args))

    questions = poll.question_set.all()
    if request.method != 'POST':
        form = DetailForm(questions=questions)
    else:
        form = DetailForm(request.POST, questions=questions)
        if form.is_valid():
            # Replace, rather than add to, any previously stored answers.
            Response.objects.filter(participant=participant).delete()
            for choice_id in form.answers():
                Response(participant=participant, choice_id=choice_id).save()
            return HttpResponseRedirect(reverse('polls:sign', args=url_args))

    context = {
        'poll': poll,
        'form': form,
        'user_uuid': user_uuid,
    }
    return render(request, 'polls/detail.html', context)
开发者ID:grnet,项目名称:nosmoking,代码行数:28,代码来源:views.py
示例5: volumedown
def volumedown():
    """Decreases the Sonos volume by 10.

    Returns a ``Response`` carrying a status message and the resulting
    volume, serialised with ``jsonpickle.encode(..., unpicklable=False)``.
    """
    sonos.volume -= 10
    message = "Sonos volume now set to " + str(sonos.volume)
    resp = Response(message)
    resp.volume = sonos.volume
    return jsonpickle.encode(resp, unpicklable=False)
开发者ID:nansen,项目名称:sonos-api-server,代码行数:8,代码来源:index.py
示例6: volume
def volume():
    """Returns (GET) or sets (POST) the current Sonos volume (0 - 100).

    On POST the ``volume`` form field is assigned to ``sonos.volume`` first.
    For every method the current volume is then reported in a ``Response``
    serialised with ``jsonpickle.encode(..., unpicklable=False)``.
    """
    is_update = request.method == 'POST'
    if is_update:
        sonos.volume = request.form.get('volume')
    message = "Sonos volume now set to " + str(sonos.volume)
    resp = Response(message)
    resp.volume = sonos.volume
    return jsonpickle.encode(resp, unpicklable=False)
开发者ID:nansen,项目名称:sonos-api-server,代码行数:9,代码来源:index.py
示例7: test_hooks
def test_hooks(self):
    """Check the put hooks for a ``Response`` created with ``activity_json``.

    With the class's ``_pre_put_hook`` in place, ``put()`` must raise
    ``AssertionError``. With the hook temporarily removed, ``put()``
    succeeds and the stored entity has the value in ``activities_json``
    and ``None`` in ``activity_json``.
    """
    resp = Response(id='x', activity_json='{"foo": "bar"}')
    self.assertRaises(AssertionError, resp.put)

    pre_put = Response._pre_put_hook
    del Response._pre_put_hook
    try:
        resp.put()
    finally:
        # Restore even when put() fails, so the class-level patch cannot
        # leak into tests that run afterwards.
        Response._pre_put_hook = pre_put

    got = resp.key.get()
    self.assertEqual(['{"foo": "bar"}'], got.activities_json)
    self.assertIsNone(got.activity_json)
开发者ID:sanduhrs,项目名称:bridgy,代码行数:11,代码来源:models_test.py
示例8: respond
def respond(request):
"""
Request handler for a posted response.

For a POST request the body is parsed as JSON and must provide the keys
``thread_id``, ``content`` and ``device_id``. When both the thread and the
device exist, a ``Response`` is saved and an update is queued through
``ResponseUpdates.add_update`` for the other party of the thread (asker or
answerer). The handler replies with an empty JSON object.

Original outline of the intended steps:
1. Add response content to the database
2. Send push notification to client device
3. Update the credit of the responder

NOTE(review): steps 2 and 3 are not performed by any call visible in this
function -- confirm where they happen.
NOTE(review): indentation was lost in this copy, so the nesting level of
the final ``return`` cannot be determined from here -- confirm against the
original file before restructuring.
"""
if request.method == 'POST':
# json.loads is outside the try below, so a malformed body is not handled here
json_data = json.loads(request.body)
try:
thread_id = json_data['thread_id']
response_content = json_data['content']
device_id = json_data['device_id']
except KeyError:
# a missing key is only reported on stdout
print "Error: A posted response did not have a JSON object with the required properties"
else:
# check that the thread id and the device ids are valid
thread = Thread.objects.filter(id=thread_id)
device = Device.objects.filter(device_id=device_id)
# debug output
print "Passed parameter validation"
print thread.count()
print device.count()
if thread.exists() and device.exists():
# add response to database
response = Response(thread=thread[0], responder_device=device[0], response_content=response_content)
response.save()
# add update to the other device
asker_device = thread[0].asker_device
answerer_device = thread[0].answerer_device
# debug output
print "Thread and device actually exist"
print device_id
print asker_device.device_id
print answerer_device.device_id
# the update goes to whichever party did NOT post this response;
# a device that is neither asker nor answerer gets no update queued
if asker_device.device_id == device_id:
ResponseUpdates.add_update(answerer_device, response)
print "Adding an update to the answerers queue"
elif answerer_device.device_id == device_id:
ResponseUpdates.add_update(asker_device, response)
print "Adding an update to the askers queue"
# always an empty JSON object; no error detail is sent back to the client
return HttpResponse(json.dumps({}), content_type="application/json")
开发者ID:spencerwhyte,项目名称:RSpeak,代码行数:48,代码来源:views.py
示例9: api_view_request
def api_view_request(request_rid):
    """Return the stored request/response pair identified by ``request_rid``.

    ``check_admin()`` runs first. Both records are looked up by ``rid``; if
    either lookup yields ``None`` the ``notfound()`` error is raised.
    Otherwise a dict is returned with the request passed through
    ``content_escape`` and the response passed through ``html_encode``.
    """
    check_admin()
    stored_request = Request.find_by('where rid = ?', request_rid)
    stored_response = Response.find_by('where rid = ?', request_rid)
    if stored_request is None or stored_response is None:
        raise notfound()
    return {
        'request': content_escape(stored_request),
        'response': html_encode(stored_response),
    }
开发者ID:akz747,项目名称:NagaScan,代码行数:7,代码来源:urls.py
示例10: get
def get(self):
    """Render ``templates/responses.html`` for the question given by ``id``.

    The ``id`` request parameter is converted with ``int`` and used to load
    the ``Question``; the ``Response`` query is filtered to that question
    and both are handed to the template.
    """
    question_id = int(self.request.get('id'))
    question = Question.get_by_id(question_id)
    responses = Response.all()
    responses.filter("question =", question)
    page = template.render(
        'templates/responses.html',
        {'responses': responses, 'question': question},
    )
    self.response.out.write(page)
开发者ID:nmccarthy,项目名称:Yammer-Integration-Questionnaire,代码行数:8,代码来源:questionnaire.py
示例11: refetch_hfeed
def refetch_hfeed(self, source):
    """Refetch the author's h-feed and repropagate affected responses.

    ``original_post_discovery.refetch`` is asked for new or updated
    syndication relationships. If there are any, the source's responses
    are walked newest-created first; any response whose activity URL
    matches a relationship with an original not already in
    ``response.sent`` is reset to ``'new'``, gets those originals appended
    to ``unsent``, is saved, and has a task added.
    """
    logging.debug('refetching h-feed for source %s', source.label())
    relationships = original_post_discovery.refetch(source)
    if not relationships:
        return

    logging.debug('refetch h-feed found %d new rel=syndication relationships',
                  len(relationships))

    # Each Response is checked for an activity URL matching one of the
    # newly discovered relationships.
    # TODO maybe add a (canonicalized) url field to Response so we can
    # query by it instead of iterating over all of them
    query = Response.query(Response.source == source.key).order(-Response.created)
    for response in query:
        if response.activity_json:  # handle old entities
            response.activities_json.append(response.activity_json)
            response.activity_json = None

        new_orig_urls = set()
        for raw_activity in response.activities_json:
            activity = json.loads(raw_activity)
            activity_url = (activity.get('url')
                            or activity.get('object', {}).get('url'))
            if not activity_url:
                logging.warning('activity has no url %s', raw_activity)
                continue

            activity_url = source.canonicalize_syndication_url(activity_url)
            # look for activity url in the newly discovered relationships
            for rel in relationships.get(activity_url, []):
                # an original that was already sent is not re-propagated
                if rel.original in response.sent:
                    logging.info(
                        '%s found a new rel=syndication link %s -> %s, but the '
                        'relationship had already been discovered by another method',
                        response.label(), rel.original, rel.syndication)
                    continue
                logging.info(
                    '%s found a new rel=syndication link %s -> %s, and '
                    'will be repropagated with a new target!',
                    response.label(), rel.original, rel.syndication)
                new_orig_urls.add(rel.original)

        if not new_orig_urls:
            continue
        # re-open a previously 'complete' propagate task
        response.status = 'new'
        response.unsent.extend(new_orig_urls)
        response.put()
        response.add_task()
开发者ID:dev511,项目名称:bridgy,代码行数:58,代码来源:tasks.py
示例12: test_get_type
def test_get_type(self):
    """``Response.get_type`` maps each sample object to the expected type."""
    cases = (
        ('repost', {'objectType': 'activity', 'verb': 'share'}),
        ('rsvp', {'verb': 'rsvp-no'}),
        ('rsvp', {'verb': 'invite'}),
        ('comment', {'objectType': 'comment'}),
        ('post', {'verb': 'post'}),
        ('post', {'objectType': 'event'}),
        ('post', {'objectType': 'image'}),
        ('comment', {
            'objectType': 'note',
            'context': {'inReplyTo': {'foo': 'bar'}},
        }),
        ('comment', {
            'objectType': 'comment',
            'verb': 'post',
        }),
    )
    for expected, obj in cases:
        self.assertEqual(expected, Response.get_type(obj))
开发者ID:lcorbasson,项目名称:bridgy,代码行数:17,代码来源:test_models.py
示例13: add_response
def add_response():
    """Save a ``Response`` built from the submitted form and redirect to it.

    ``username`` falls back to ``'anonymous'`` when empty or missing; empty
    ``item`` values are dropped and the rest converted with ``int``. A flash
    message reports whether ``save()`` returned a value equal to ``True``.
    """
    form = request.form
    username = form.get('username') or 'anonymous'
    board_id = form.get('board_id')
    items = [int(raw) for raw in form.getlist('item') if raw]

    response = Response(username=username, board_id=int(board_id),
                        items=items)
    # Kept as an equality comparison against True, as before.
    if response.save() == True:
        message = 'Response saved. Thanks {}.'.format(username)
    else:
        message = 'Could not save response'
    flash(message)
    return redirect(url_for('response', response_id=response.id))
开发者ID:t20,项目名称:skore,代码行数:21,代码来源:app.py
示例14: repropagate_old_responses
def repropagate_old_responses(self, source, relationships):
    """Find old Responses that match a new SyndicatedPost and repropagate them.

    Responses of ``source`` are walked most-recently-updated first, for as
    long as the datastore query keeps yielding results.

    Args:
      source: :class:`models.Source`
      relationships: refetch result; looked up by canonicalized activity URL
    """
    query = Response.query(Response.source == source.key).order(-Response.updated)
    for response in query:
        new_orig_urls = set()
        for raw_activity in response.activities_json:
            activity = json.loads(raw_activity)
            activity_url = (activity.get("url")
                            or activity.get("object", {}).get("url"))
            if not activity_url:
                logging.warning("activity has no url %s", raw_activity)
                continue

            activity_url = source.canonicalize_url(activity_url, activity=activity)
            if not activity_url:
                continue

            # look for activity url in the newly discovered relationships
            for rel in relationships.get(activity_url, []):
                # an original already sent, or already among the response's
                # original posts, is not re-propagated
                already_known = (rel.original in response.sent
                                 or rel.original in response.original_posts)
                if already_known:
                    logging.info(
                        "%s found a new rel=syndication link %s -> %s, but the "
                        "relationship had already been discovered by another method",
                        response.label(),
                        rel.original,
                        rel.syndication,
                    )
                    continue
                logging.info(
                    "%s found a new rel=syndication link %s -> %s, and "
                    "will be repropagated with a new target!",
                    response.label(),
                    rel.original,
                    rel.syndication,
                )
                new_orig_urls.add(rel.original)

        if not new_orig_urls:
            continue
        # re-open a previously 'complete' propagate task
        response.status = "new"
        response.unsent.extend(new_orig_urls)
        response.put()
        response.add_task()
开发者ID:snarfed,项目名称:bridgy,代码行数:50,代码来源:tasks.py
示例15: post
def post(self, request):
    """Save a posted response to a question, then send the user back.

    When the ``response`` POST field is non-empty, a ``Response`` is created
    for the ``Question`` whose id is in the ``question`` POST field,
    authored by ``request.user``. The client is then redirected to the
    referring page, or to ``'/'`` when no ``Referer`` header was sent
    (previously a missing header raised ``KeyError``).

    Raises whatever ``Question.objects.get`` raises when the question id
    does not match exactly one question.
    """
    message = request.POST.get('response', False)
    if message:
        response = Response()
        question = Question.objects.get(id=request.POST['question'])
        response.message = message
        response.question = question
        response.user = request.user
        response.save()
    # The Referer header is optional, so fall back instead of failing.
    return HttpResponseRedirect(request.META.get('HTTP_REFERER', '/'))
开发者ID:portyaninoleh,项目名称:steelkiwi,代码行数:9,代码来源:views.py
示例16: test_get_or_save
def test_get_or_save(self):
    """``get_or_save`` stores a new response once and queues one task.

    The first call saves the response and adds a propagate task; calling it
    again on the saved entity returns an equal entity and adds no task.
    """
    source = self.sources[0]
    fresh = self.responses[0]
    self.assertEqual(0, Response.query().count())
    self.assert_no_propagate_task()

    # new. should add a propagate task.
    saved = fresh.get_or_save(source)
    self.assertEqual(fresh.key, saved.key)
    self.assertEqual(fresh.source, saved.source)
    self.assertEqual('comment', saved.type)
    self.assertEqual([], saved.old_response_jsons)
    self.assert_propagate_task()

    # existing. no new task.
    again = saved.get_or_save(source)
    self.assert_entities_equal(saved, again)
    self.assert_no_propagate_task()
开发者ID:uniteddiversity,项目名称:bridgy,代码行数:17,代码来源:test_models.py
示例17: repropagate_old_responses
def repropagate_old_responses(self, source, relationships):
    """Find old Responses that match a new SyndicatedPost and repropagate them.

    Responses of ``source`` are walked most-recently-updated first, for as
    long as the datastore query keeps yielding results. ``relationships``
    is looked up by canonicalized activity URL.
    """
    query = Response.query(Response.source == source.key).order(-Response.updated)
    for response in query:
        if response.activity_json:  # handle old entities
            response.activities_json.append(response.activity_json)
            response.activity_json = None

        new_orig_urls = set()
        for raw_activity in response.activities_json:
            activity = json.loads(raw_activity)
            activity_url = (activity.get('url')
                            or activity.get('object', {}).get('url'))
            if not activity_url:
                logging.warning('activity has no url %s', raw_activity)
                continue

            activity_url = source.canonicalize_syndication_url(
                activity_url, activity=activity)
            # look for activity url in the newly discovered relationships
            for rel in relationships.get(activity_url, []):
                # an original already sent, or already among the response's
                # original posts, is not re-propagated
                already_known = (rel.original in response.sent or
                                 rel.original in response.original_posts)
                if already_known:
                    logging.info(
                        '%s found a new rel=syndication link %s -> %s, but the '
                        'relationship had already been discovered by another method',
                        response.label(), rel.original, rel.syndication)
                    continue
                logging.info(
                    '%s found a new rel=syndication link %s -> %s, and '
                    'will be repropagated with a new target!',
                    response.label(), rel.original, rel.syndication)
                new_orig_urls.add(rel.original)

        if not new_orig_urls:
            continue
        # re-open a previously 'complete' propagate task
        response.status = 'new'
        response.unsent.extend(new_orig_urls)
        response.put()
        response.add_task()
开发者ID:tantek,项目名称:bridgy,代码行数:46,代码来源:tasks.py
示例18: template_vars
def template_vars(self):
    """Collect recently updated responses that still have error URLs.

    Responses are read most-recently-updated first. Reading stops once
    ``self.NUM_RESPONSES`` have been collected or a response older than one
    hour is reached. Responses without errors, or with status
    ``'complete'``, are skipped. Each kept response gets ``links``,
    ``response`` and ``activities`` attributes for the template.

    Returns:
      dict with key ``'responses'``: the collected responses, sorted by
      ``(source, activities, response)``.
    """
    # Computed once up front instead of on every loop iteration.
    cutoff = datetime.datetime.now() - datetime.timedelta(hours=1)

    responses = []
    # Find the most recently propagated responses with error URLs
    for r in Response.query().order(-Response.updated):
        if len(responses) >= self.NUM_RESPONSES or r.updated < cutoff:
            break
        if not r.error or r.status == 'complete':
            continue

        r.links = [util.pretty_link(u, new_tab=True) for u in r.error + r.failed]
        r.response = json.loads(r.response_json)
        r.activities = [json.loads(a) for a in r.activities_json]
        responses.append(r)

    responses.sort(key=lambda r: (r.source, r.activities, r.response))
    return {'responses': responses}
开发者ID:notenoughneon,项目名称:bridgy,代码行数:20,代码来源:admin.py
示例19: test_get_or_save
def test_get_or_save(self):
    """``get_or_save`` stores a new response once and queues one task.

    The first call must add exactly one task to the ``propagate`` queue,
    carrying the response's urlsafe key. A second call on the saved entity
    must leave the queue at one task.
    """
    self.sources[0].put()

    response = self.responses[0]
    self.assertEqual(0, Response.query().count())
    self.assertEqual(0, len(self.taskqueue_stub.GetTasks('propagate')))

    # new. should add a propagate task.
    saved = response.get_or_save()
    self.assertEqual(response.key, saved.key)
    self.assertEqual(response.source, saved.source)
    self.assertEqual('comment', saved.type)

    tasks = self.taskqueue_stub.GetTasks('propagate')
    self.assertEqual(1, len(tasks))
    self.assertEqual(response.key.urlsafe(),
                     testutil.get_task_params(tasks[0])['response_key'])
    self.assertEqual('/_ah/queue/propagate', tasks[0]['url'])

    # existing. no new task.
    same = saved.get_or_save()
    self.assertEqual(saved.source, same.source)
    # Re-read the queue: `tasks` above was fetched before the second call,
    # so asserting on it could not detect a newly added task.
    self.assertEqual(1, len(self.taskqueue_stub.GetTasks('propagate')))
开发者ID:sanduhrs,项目名称:bridgy,代码行数:23,代码来源:models_test.py
示例20: template_vars
def template_vars(self):
if not self.source:
return {}
vars = super(UserHandler, self).template_vars()
vars.update({
'source': self.source,
'epoch': util.EPOCH,
})
# Blog webmention promos
if 'webmention' not in self.source.features:
if self.source.SHORT_NAME in ('blogger', 'tumblr', 'wordpress'):
vars[self.source.SHORT_NAME + '_promo'] = True
else:
for domain in self.source.domains:
if ('.blogspot.' in domain and # Blogger uses country TLDs
not Blogger.query(Blogger.domains == domain).get()):
vars['blogger_promo'] = True
elif (domain.endswith('tumblr.com') and
not Tumblr.query(Tumblr.domains == domain).get()):
vars['tumblr_promo'] = True
elif (domain.endswith('wordpress.com') and
not WordPress.query(WordPress.domains == domain).get()):
vars['wordpress_promo'] = True
# Responses
if 'listen' in self.source.features:
vars['responses'] = []
for i, r in enumerate(Response.query()
.filter(Response.source == self.source.key)\
.order(-Response.updated)):
r.response = json.loads(r.response_json)
if r.activity_json: # handle old entities
r.activities_json.append(r.activity_json)
r.activities = [json.loads(a) for a in r.activities_json]
if (not gr_source.Source.is_public(r.response) or
not all(gr_source.Source.is_public(a) for a in r.activities)):
continue
r.actor = r.response.get('author') or r.response.get('actor', {})
if not r.response.get('content'):
phrases = {
'like': 'liked this',
'repost': 'reposted this',
'rsvp-yes': 'is attending',
'rsvp-no': 'is not attending',
'rsvp-maybe': 'might attend',
'invite': 'is invited',
}
r.response['content'] = '%s %s.' % (
r.actor.get('displayName') or '',
phrases.get(r.type) or phrases.get(r.response.get('verb')))
# convert image URL to https if we're serving over SSL
image_url = r.actor.setdefault('image', {}).get('url')
if image_url:
r.actor['image']['url'] = util.update_scheme(image_url, self)
# generate original post links
r.links = self.process_webmention_links(r)
vars['responses'].append(r)
if len(vars['responses']) >= 10 or i > 200:
break
# Publishes
if 'publish' in self.source.features:
publishes = Publish.query().filter(Publish.source == self.source.key)\
.order(-Publish.updated)\
.fetch(10)
for p in publishes:
p.pretty_page = util.pretty_link(
p.key.parent().id(), a_class='original-post', new_tab=True)
vars['publishes'] = publishes
if 'webmention' in self.source.features:
# Blog posts
blogposts = BlogPost.query().filter(BlogPost.source == self.source.key)\
.order(-BlogPost.created)\
.fetch(10)
for b in blogposts:
b.links = self.process_webmention_links(b)
try:
text = b.feed_item.get('title')
except ValueError:
text = None
b.pretty_url = util.pretty_link(b.key.id(), text=text,
a_class='original-post', max_length=40,
new_tab=True)
# Blog webmentions
webmentions = BlogWebmention.query()\
.filter(BlogWebmention.source == self.source.key)\
.order(-BlogWebmention.updated)\
.fetch(10)
for w in webmentions:
w.pretty_source = util.pretty_link(w.source_url(), a_class='original-post',
#.........这里部分代码省略.........
开发者ID:uniteddiversity,项目名称:bridgy,代码行数:101,代码来源:app.py
注:本文中的models.Response类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论