• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testutil.get_task_params函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testutil.get_task_params函数的典型用法代码示例。如果您正苦于以下问题:Python get_task_params函数的具体用法?Python get_task_params怎么用?Python get_task_params使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_task_params函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_poll

  def test_poll(self):
    """A normal poll task."""
    self.assertEqual(0, models.Response.query().count())
    self.assertEqual([], self.taskqueue_stub.GetTasks('poll'))

    self.post_task()
    self.assertEqual(9, models.Response.query().count())
    self.assert_responses()

    source = self.sources[0].key.get()
    self.assertEqual(NOW, source.last_polled)

    tasks = self.taskqueue_stub.GetTasks('propagate')
    for task in tasks:
      self.assertEqual('/_ah/queue/propagate', task['url'])
    keys = set(ndb.Key(urlsafe=testutil.get_task_params(t)['response_key'])
               for t in tasks)
    self.assert_equals(keys, set(r.key for r in self.responses))

    tasks = self.taskqueue_stub.GetTasks('poll')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/_ah/queue/poll', tasks[0]['url'])
    self.assert_task_eta(FakeSource.FAST_POLL)
    params = testutil.get_task_params(tasks[0])
    self.assert_equals(source.key.urlsafe(), params['source_key'])
开发者ID:notenoughneon,项目名称:bridgy,代码行数:25,代码来源:tasks_test.py


示例2: assert_blogposts

  def assert_blogposts(self, expected):
    got = list(BlogPost.query())
    self.assert_entities_equal(expected, got, ignore=('created', 'updated'))

    tasks = self.taskqueue_stub.GetTasks('propagate-blogpost')
    self.assert_equals([{'key': post.key.urlsafe()} for post in expected],
                       [testutil.get_task_params(t) for t in tasks])
开发者ID:LennonFlores,项目名称:bridgy,代码行数:7,代码来源:test_superfeedr.py


示例3: assert_propagate_task

 def assert_propagate_task(self):
   tasks = self.taskqueue_stub.GetTasks('propagate')
   self.assertEqual(1, len(tasks))
   self.assertEqual(self.responses[0].key.urlsafe(),
                    testutil.get_task_params(tasks[0])['response_key'])
   self.assertEqual('/_ah/queue/propagate', tasks[0]['url'])
   self.taskqueue_stub.FlushQueue('propagate')
开发者ID:uniteddiversity,项目名称:bridgy,代码行数:7,代码来源:test_models.py


示例4: assert_propagate_task

 def assert_propagate_task(self, queue='propagate'):
   tasks = self.taskqueue_stub.GetTasks('propagate-blogpost')
   self.assertEqual(1, len(tasks))
   key = testutil.get_task_params(tasks[0])['key']
   self.assertEqual(self.blogposts[0].key, ndb.Key(urlsafe=key))
   self.assertEqual('/_ah/queue/propagate-blogpost', tasks[0]['url'])
   self.taskqueue_stub.FlushQueue('propagate-blogpost')
开发者ID:mblaney,项目名称:bridgy,代码行数:7,代码来源:test_models.py


示例5: test_create_new_already_exists

  def test_create_new_already_exists(self):
    long_ago = datetime.datetime(year=1901, month=2, day=3)
    props = {
      'created': long_ago,
      'last_webmention_sent': long_ago + datetime.timedelta(days=1),
      'last_polled': long_ago + datetime.timedelta(days=2),
      'last_hfeed_fetch': long_ago + datetime.timedelta(days=3),
      'last_syndication_url': long_ago + datetime.timedelta(days=4),
      'superfeedr_secret': 'asdfqwert',
      }
    FakeSource.new(None, features=['listen'], **props).put()
    self.assert_equals(['listen'], FakeSource.query().get().features)

    FakeSource.string_id_counter -= 1
    auth_entity = testutil.FakeAuthEntity(
      id='x', user_json=json.dumps({'url': 'http://foo.com/'}))
    auth_entity.put()
    self._test_create_new(auth_entity=auth_entity, features=['publish'])

    source = FakeSource.query().get()
    self.assert_equals(['listen', 'publish'], source.features)
    for prop, value in props.items():
      self.assert_equals(value, getattr(source, prop), prop)

    self.assert_equals(
      {"Updated fake (FakeSource). Try previewing a post from your web site!"},
      self.handler.messages)

    task_params = testutil.get_task_params(self.taskqueue_stub.GetTasks('poll')[0])
    self.assertEqual('1901-02-05-00-00-00', task_params['last_polled'])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:30,代码来源:models_test.py


示例6: test_replace_poll_tasks

  def test_replace_poll_tasks(self):
    self.assertEqual([], self.taskqueue_stub.GetTasks('poll'))
    now = datetime.datetime.now()

    # a bunch of sources, one needs a new poll task
    five_min_ago = now - datetime.timedelta(minutes=5)
    day_and_half_ago = now - datetime.timedelta(hours=36)
    month_ago = now - datetime.timedelta(days=30)
    defaults = {
      'features': ['listen'],
      'last_webmention_sent': day_and_half_ago,
      }
    sources = [
      # doesn't need a new poll task
      FakeSource.new(None, last_poll_attempt=now, **defaults).put(),
      FakeSource.new(None, last_poll_attempt=five_min_ago, **defaults).put(),
      FakeSource.new(None, status='disabled', **defaults).put(),
      FakeSource.new(None, status='disabled', **defaults).put(),
      # need a new poll task
      FakeSource.new(None, status='enabled', **defaults).put(),
      # not signed up for listen
      FakeSource.new(None, last_webmention_sent=day_and_half_ago).put(),
      # never sent a webmention, past grace period. last polled is older than 2x
      # fast poll, but within 2x slow poll.
      FakeSource.new(None, features=['listen'], created=month_ago,
                     last_poll_attempt=day_and_half_ago).put(),
      ]
    resp = cron.application.get_response('/cron/replace_poll_tasks')
    self.assertEqual(200, resp.status_int)

    tasks = self.taskqueue_stub.GetTasks('poll')
    self.assertEqual(1, len(tasks))
    self.assert_equals(sources[4].urlsafe(),
                       testutil.get_task_params(tasks[0])['source_key'])
开发者ID:paulscallanjr,项目名称:bridgy,代码行数:34,代码来源:test_cron.py


示例7: test_discover_url_site_post_syndication_links

  def test_discover_url_site_post_syndication_links(self):
    self.expect_requests_get('http://si.te/123', """
<div class="h-entry">
  foo
  <a class="u-syndication" href="http://fa.ke/222"></a>
  <a class="u-syndication" href="http://other/silo"></a>
  <a class="u-syndication" href="http://fa.ke/post/444"></a>
</div>""")
    self.mox.ReplayAll()

    self.assertEqual(0, SyndicatedPost.query().count())
    self.check_discover('http://si.te/123',
        'Discovering now. Refresh in a minute to see the results!')

    self.assertItemsEqual([
      {'https://fa.ke/222': 'http://si.te/123'},
      {'https://fa.ke/post/444': 'http://si.te/123'},
      ], [{sp.syndication: sp.original} for sp in models.SyndicatedPost.query()])

    tasks = self.taskqueue_stub.GetTasks('discover')
    key = self.source.key.urlsafe()
    self.assertEqual([
      {'source_key': key, 'post_id': '222'},
      {'source_key': key, 'post_id': '444'},
    ], [testutil.get_task_params(task) for task in tasks])

    now = util.now_fn()
    source = self.source.key.get()
    self.assertEqual(now, source.last_syndication_url)
开发者ID:snarfed,项目名称:bridgy,代码行数:29,代码来源:test_app.py


示例8: test_subscribe

  def test_subscribe(self):
    expected = {
      'hub.mode': 'subscribe',
      'hub.topic': 'fake feed url',
      'hub.callback': 'http://localhost/fake/notify/foo.com',
      'format': 'json',
      'retrieve': 'true',
      }
    item_a = {'permalinkUrl': 'A', 'content': 'a http://a.com a'}
    item_b = {'permalinkUrl': 'B', 'summary': 'b http://b.com b'}
    feed = json.dumps({'items': [item_a, {}, item_b]})
    self.expect_requests_post(superfeedr.PUSH_API_URL, feed,
                              data=expected, auth=mox.IgnoreArg())
    self.mox.ReplayAll()

    superfeedr.subscribe(self.source, self.handler)

    posts = list(BlogPost.query())
    self.assert_entities_equal(
      [BlogPost(id='A', source=self.source.key, feed_item=item_a,
                unsent=['http://a.com']),
       BlogPost(id='B', source=self.source.key, feed_item=item_b,
                unsent=['http://b.com']),
       ], posts,
      ignore=('created', 'updated'))

    tasks = self.taskqueue_stub.GetTasks('propagate-blogpost')
    self.assert_equals([{'key': posts[0].key.urlsafe()},
                        {'key': posts[1].key.urlsafe()}],
                       [testutil.get_task_params(t) for t in tasks])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:30,代码来源:superfeedr_test.py


示例9: test_subscribe

    def test_subscribe(self):
        expected = {
            "hub.mode": "subscribe",
            "hub.topic": "fake feed url",
            "hub.callback": "http://localhost/fake/notify/foo.com",
            "format": "json",
            "retrieve": "true",
        }
        item_a = {"permalinkUrl": "A", "content": "a http://a.com a"}
        item_b = {"permalinkUrl": "B", "summary": "b http://b.com b"}
        feed = json.dumps({"items": [item_a, {}, item_b]})
        self.expect_requests_post(superfeedr.PUSH_API_URL, feed, data=expected, auth=mox.IgnoreArg())
        self.mox.ReplayAll()

        superfeedr.subscribe(self.source, self.handler)

        posts = list(BlogPost.query())
        self.assert_entities_equal(
            [
                BlogPost(id="A", source=self.source.key, feed_item=item_a, unsent=["http://a.com"]),
                BlogPost(id="B", source=self.source.key, feed_item=item_b, unsent=["http://b.com"]),
            ],
            posts,
            ignore=("created", "updated"),
        )

        tasks = self.taskqueue_stub.GetTasks("propagate-blogpost")
        self.assert_equals(
            [{"key": posts[0].key.urlsafe()}, {"key": posts[1].key.urlsafe()}],
            [testutil.get_task_params(t) for t in tasks],
        )
开发者ID:tantek,项目名称:bridgy,代码行数:31,代码来源:test_superfeedr.py


示例10: test_create_new

  def test_create_new(self):
    self.assertEqual(0, FakeSource.query().count())
    self._test_create_new(features=['listen'])
    msg = "Added fake (FakeSource). Refresh to see what we've found!"
    self.assert_equals({msg}, self.handler.messages)

    task_params = testutil.get_task_params(self.taskqueue_stub.GetTasks('poll')[0])
    self.assertEqual('1970-01-01-00-00-00', task_params['last_polled'])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:8,代码来源:models_test.py


示例11: test_poll_now

    def test_poll_now(self):
        self.assertEqual([], self.taskqueue_stub.GetTasks("poll"))

        key = self.sources[0].key.urlsafe()
        resp = app.application.get_response("/poll-now", method="POST", body="key=" + key)
        self.assertEquals(302, resp.status_int)
        self.assertEquals(self.sources[0].bridgy_url(self.handler), resp.headers["Location"].split("#")[0])
        params = testutil.get_task_params(self.taskqueue_stub.GetTasks("poll-now")[0])
        self.assertEqual(key, params["source_key"])
开发者ID:tantek,项目名称:bridgy,代码行数:9,代码来源:test_app.py


示例12: test_do_refetch_hfeed

  def test_do_refetch_hfeed(self):
    """Emulate a situation where we've done posse-post-discovery earlier and
    found no rel=syndication relationships for a particular silo URL. Every
    two hours or so, we should refetch the author's page and check to see if
    any new syndication links have been added or updated.
    """
    self.sources[0].domain_urls = ['http://author']
    FakeAsSource.DOMAIN = 'source'
    self.sources[0].last_syndication_url = NOW - datetime.timedelta(minutes=10)
    self.sources[0].last_hfeed_fetch = NOW - datetime.timedelta(hours=2,
                                                                minutes=10)
    self.sources[0].put()

    # pretend we've already done posse-post-discovery for the source
    # and checked this permalink and found no back-links
    models.SyndicatedPost(parent=self.sources[0].key, original=None,
                          syndication='https://source/post/url').put()
    models.SyndicatedPost(parent=self.sources[0].key,
                          original='http://author/permalink',
                          syndication=None).put()

    # and all the status have already been sent
    for r in self.responses:
      r.status = 'complete'
      r.put()

    self.expect_requests_get('http://author', """
    <html class="h-feed">
      <a class="h-entry" href="/permalink"></a>
    </html>""")

    self.expect_requests_get('http://author/permalink', """
    <html class="h-entry">
      <a class="u-url" href="http://author/permalink"></a>
      <a class="u-syndication" href="http://source/post/url"></a>
    </html>""")

    self.mox.ReplayAll()
    self.post_task()

    # should have a new SyndicatedPost
    relationship = models.SyndicatedPost.query_by_original(
      self.sources[0], 'http://author/permalink')
    self.assertIsNotNone(relationship)
    self.assertEquals('https://source/post/url', relationship.syndication)

    # should repropagate all 9 responses
    tasks = self.taskqueue_stub.GetTasks('propagate')
    self.assertEquals(9, len(tasks))

    # and they should be in reverse creation order
    response_keys = [resp.key.urlsafe() for resp in self.responses]
    response_keys.reverse()
    task_keys = [testutil.get_task_params(task)['response_key']
                 for task in tasks]
    self.assertEquals(response_keys, task_keys)
开发者ID:notenoughneon,项目名称:bridgy,代码行数:56,代码来源:tasks_test.py


示例13: test_discover_url_silo_post

  def test_discover_url_silo_post(self):
    self.check_discover('http://fa.ke/123',
        'Discovering now. Refresh in a minute to see the results!')

    tasks = self.taskqueue_stub.GetTasks('discover')
    self.assertEqual(1, len(tasks))
    self.assertEqual({
      'source_key': self.source.key.urlsafe(),
      'post_id': '123',
    }, testutil.get_task_params(tasks[0]))
开发者ID:snarfed,项目名称:bridgy,代码行数:10,代码来源:test_app.py


示例14: test_poll_now

  def test_poll_now(self):
    self.assertEqual([], self.taskqueue_stub.GetTasks('poll'))

    key = self.sources[0].key.urlsafe()
    resp = app.application.get_response('/poll-now', method='POST', body='key=' + key)
    self.assertEquals(302, resp.status_int)
    self.assertEquals(self.sources[0].bridgy_url(self.handler),
                      resp.headers['Location'].split('#')[0])
    params = testutil.get_task_params(self.taskqueue_stub.GetTasks('poll')[0])
    self.assertEqual(key, params['source_key'])
开发者ID:notenoughneon,项目名称:bridgy,代码行数:10,代码来源:app_test.py


示例15: test_retry

  def test_retry(self):
    self.assertEqual([], self.taskqueue_stub.GetTasks('propagate'))

    source = self.sources[0]
    source.domain_urls = ['http://orig']
    source.last_hfeed_fetch = last_hfeed_fetch = \
        testutil.NOW - datetime.timedelta(minutes=1)
    source.put()

    resp = self.responses[0]
    resp.status = 'complete'
    resp.unsent = ['http://unsent']
    resp.sent = ['http://sent']
    resp.error = ['http://error']
    resp.failed = ['http://failed']
    resp.skipped = ['https://skipped']

    # SyndicatedPost with new target URLs
    resp.activities_json = [
      json.dumps({'object': {'url': 'https://silo/1'}}),
      json.dumps({'url': 'https://silo/2', 'object': {'unused': 'ok'}}),
      json.dumps({'url': 'https://silo/3'}),
    ]
    resp.put()
    models.SyndicatedPost.insert(source, 'https://silo/1', 'https://orig/1')
    models.SyndicatedPost.insert(source, 'https://silo/2', 'http://orig/2')
    models.SyndicatedPost.insert(source, 'https://silo/3', 'http://orig/3')

    # cached webmention endpoint
    memcache.set('W https skipped', 'asdf')

    key = resp.key.urlsafe()
    response = app.application.get_response(
      '/retry', method='POST', body='key=' + key)
    self.assertEquals(302, response.status_int)
    self.assertEquals(source.bridgy_url(self.handler),
                      response.headers['Location'].split('#')[0])
    params = testutil.get_task_params(self.taskqueue_stub.GetTasks('propagate')[0])
    self.assertEqual(key, params['response_key'])

    # status and URLs should be refreshed
    got = resp.key.get()
    self.assertEqual('new', got.status)
    self.assertItemsEqual(
      ['http://unsent', 'http://sent', 'https://skipped', 'http://error',
       'http://failed', 'https://orig/1', 'http://orig/2', 'http://orig/3'],
      got.unsent)
    for field in got.sent, got.skipped, got.error, got.failed:
      self.assertEqual([], field)

    # webmention endpoints for URL domains should be refreshed
    self.assertIsNone(memcache.get('W https skipped'))

    # shouldn't have refetched h-feed
    self.assertEqual(last_hfeed_fetch, source.key.get().last_hfeed_fetch)
开发者ID:kylewm,项目名称:bridgy,代码行数:55,代码来源:test_app.py


示例16: test_non_public_posts

  def test_non_public_posts(self):
    """Only posts with to: @public should be propagated."""
    del self.activities[0]['object']['to']
    self.activities[1]['object']['to'] = [{'objectType':'group', 'alias':'@private'}]
    self.activities[2]['object']['to'] = [{'objectType':'group', 'alias':'@public'}]

    self.post_task()
    ids = set()
    for task in self.taskqueue_stub.GetTasks('propagate'):
      resp_key = ndb.Key(urlsafe=testutil.get_task_params(task)['response_key'])
      ids.update(json.loads(a)['id'] for a in resp_key.get().activities_json)
    self.assert_equals(ids, set([self.activities[0]['id'], self.activities[2]['id']]))
开发者ID:notenoughneon,项目名称:bridgy,代码行数:12,代码来源:tasks_test.py


示例17: _test_create_new

  def _test_create_new(self, **kwargs):
    FakeSource.create_new(self.handler, domains=['foo'],
                          domain_urls=['http://foo.com'],
                          webmention_endpoint='http://x/y',
                          **kwargs)
    self.assertEqual(1, FakeSource.query().count())

    tasks = self.taskqueue_stub.GetTasks('poll')
    self.assertEqual(1, len(tasks))
    source = FakeSource.query().get()
    self.assertEqual('/_ah/queue/poll', tasks[0]['url'])
    self.assertEqual(source.key.urlsafe(),
                     testutil.get_task_params(tasks[0])['source_key'])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:13,代码来源:models_test.py


示例18: test_crawl_now

    def test_crawl_now(self):
        source = self.sources[0]
        source.domain_urls = ["http://orig"]
        source.last_hfeed_refetch = testutil.NOW
        source.put()

        key = source.key.urlsafe()
        response = app.application.get_response("/crawl-now", method="POST", body="key=%s" % key)
        self.assertEquals(source.bridgy_url(self.handler), response.headers["Location"].split("#")[0])
        self.assertEquals(302, response.status_int)

        params = testutil.get_task_params(self.taskqueue_stub.GetTasks("poll-now")[0])
        self.assertEqual(key, params["source_key"])
        self.assertEqual(models.REFETCH_HFEED_TRIGGER, source.key.get().last_hfeed_refetch)
开发者ID:tantek,项目名称:bridgy,代码行数:14,代码来源:test_app.py


示例19: test_crawl_now

  def test_crawl_now(self):
    source = self.sources[0]
    source.domain_urls = ['http://orig']
    source.last_hfeed_fetch = testutil.NOW
    source.put()

    key = source.key.urlsafe()
    response = app.application.get_response(
      '/crawl-now', method='POST', body='key=%s' % key)
    self.assertEquals(source.bridgy_url(self.handler),
                      response.headers['Location'].split('#')[0])
    self.assertEquals(302, response.status_int)

    params = testutil.get_task_params(self.taskqueue_stub.GetTasks('poll-now')[0])
    self.assertEqual(key, params['source_key'])
    self.assertEqual(models.REFETCH_HFEED_TRIGGER, source.key.get().last_hfeed_fetch)
开发者ID:kylewm,项目名称:bridgy,代码行数:16,代码来源:test_app.py


示例20: test_handle_feed

    def test_handle_feed(self):
        item_a = {"permalinkUrl": "A", "content": "a http://a.com http://foo.com/self/link b"}
        superfeedr.handle_feed(json.dumps({"items": [item_a]}), self.source)

        posts = list(BlogPost.query())
        self.assert_entities_equal(
            [
                BlogPost(id="A", source=self.source.key, feed_item=item_a, unsent=["http://a.com"])
            ],  # self link should be discarded
            posts,
            ignore=("created", "updated"),
        )

        tasks = self.taskqueue_stub.GetTasks("propagate-blogpost")
        self.assertEqual(1, len(tasks))
        self.assert_equals(posts[0].key.urlsafe(), testutil.get_task_params(tasks[0])["key"])
开发者ID:tantek,项目名称:bridgy,代码行数:16,代码来源:test_superfeedr.py



注:本文中的testutil.get_task_params函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testutil.FakeSource类代码示例发布时间:2022-05-27
下一篇:
Python testutil.get_data_file_name函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap