• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testutil.FakeSource类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testutil.FakeSource的典型用法代码示例。如果您正苦于以下问题：Python FakeSource类的具体用法？Python FakeSource怎么用？Python FakeSource使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了FakeSource类的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_rate_limiting_errors

  def test_rate_limiting_errors(self):
    """Rate limiting failures should set status 'error' and queue a new poll."""
    try:
      # Built inside the try so stubs are always unset on the way out.
      rate_limit_errors = (
        InstagramAPIError('503', 'Rate limited', '...'),
        apiclient.errors.HttpError(httplib2.Response({'status': 429}), ''),
        urllib2.HTTPError('url', 403, 'msg', {}, None),
      )
      for error in rate_limit_errors:
        # Start each iteration from a clean slate, then record the failure.
        self.mox.UnsetStubs()
        self.mox.StubOutWithMock(FakeSource, 'get_activities_response')
        recorded_call = FakeSource.get_activities_response(
          count=mox.IgnoreArg(), fetch_replies=True, fetch_likes=True,
          fetch_shares=True, etag=None, min_id=None, cache=mox.IgnoreArg())
        recorded_call.AndRaise(error)
        self.mox.ReplayAll()

        self.post_task()
        polled_source = self.sources[0].key.get()
        self.assertEqual('error', polled_source.status)
        self.mox.VerifyAll()

        # exactly one replacement poll task should be waiting in the queue
        queued = self.taskqueue_stub.GetTasks('poll')
        self.assertEqual(1, len(queued))
        self.assertEqual('/_ah/queue/poll', queued[0]['url'])
        # empty the queue so the next error starts with no pending tasks
        self.taskqueue_stub.FlushQueue('poll')

    finally:
      self.mox.UnsetStubs()
开发者ID:notenoughneon,项目名称:bridgy,代码行数:27,代码来源:tasks_test.py


示例2: test_create_new_domain

  def test_create_new_domain(self):
    """create_new() should derive domains and domain_urls from the user JSON.

    Covers inputs that must yield no domain, single usable URLs in the 'url'
    field, and a fallback to the 'urls' list when 'url' itself is unusable.
    """
    # none of these should produce a domain
    for user_json in None, {}, {'url': 'not<a>url'}, {'url': 'http://t.co/foo'}:
      auth_entity = None
      if user_json is not None:
        auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps(user_json))
        auth_entity.put()
      source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
      self.assertEqual([], source.domains)
      self.assertEqual([], source.domain_urls)

    # good URLs
    for url in ('http://foo.com/bar', 'https://www.foo.com/bar',
                'http://foo.com/\nhttp://baz.com/',
                'http://FoO.cOm',  # domain should be normalized to lowercase
                ):
      auth_entity = testutil.FakeAuthEntity(
        id='x', user_json=json.dumps({'url': url}))
      auth_entity.put()
      source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
      # only the first line of a multi-line value is expected to be kept
      self.assertEqual([url.split('\n')[0]], source.domain_urls)
      self.assertEqual(['foo.com'], source.domains)

    # also look in urls field
    auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps(
        {'url': 'not<a>url',
         'urls': [{'value': 'also<not>'}, {'value': 'http://foo.com/'}],
         }))
    auth_entity.put()
    source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
    self.assertEqual(['http://foo.com/'], source.domain_urls)
    self.assertEqual(['foo.com'], source.domains)
开发者ID:notenoughneon,项目名称:bridgy,代码行数:32,代码来源:models_test.py


示例3: test_create_new_already_exists

  def test_create_new_already_exists(self):
    """Re-creating an existing source should merge features and keep its props."""
    long_ago = datetime.datetime(year=1901, month=2, day=3)
    days = datetime.timedelta
    props = {
      'created': long_ago,
      'last_webmention_sent': long_ago + days(days=1),
      'last_polled': long_ago + days(days=2),
      'last_hfeed_fetch': long_ago + days(days=3),
      'last_syndication_url': long_ago + days(days=4),
      'superfeedr_secret': 'asdfqwert',
      }
    existing = FakeSource.new(None, features=['listen'], **props)
    existing.put()
    self.assert_equals(['listen'], FakeSource.query().get().features)

    # roll the id counter back one step before creating again
    FakeSource.string_id_counter -= 1
    user_json = json.dumps({'url': 'http://foo.com/'})
    auth_entity = testutil.FakeAuthEntity(id='x', user_json=user_json)
    auth_entity.put()
    self._test_create_new(auth_entity=auth_entity, features=['publish'])

    stored = FakeSource.query().get()
    self.assert_equals(['listen', 'publish'], stored.features)
    # every property stored originally should be unchanged
    for name in props:
      self.assert_equals(props[name], getattr(stored, name), name)

    expected_messages = {
      "Updated fake (FakeSource). Try previewing a post from your web site!"}
    self.assert_equals(expected_messages, self.handler.messages)

    first_poll = self.taskqueue_stub.GetTasks('poll')[0]
    task_params = testutil.get_task_params(first_poll)
    self.assertEqual('1901-02-05-00-00-00', task_params['last_polled'])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:30,代码来源:models_test.py


示例4: test_create_new_webmention

  def test_create_new_webmention(self):
    """Creating a webmention source should call superfeedr.subscribe()."""
    primary_url = 'http://primary/'
    self.expect_requests_get(primary_url, 'no webmention endpoint', verify=False)

    # record the expected subscribe call: any FakeSource plus our handler
    self.mox.StubOutWithMock(superfeedr, 'subscribe')
    superfeedr.subscribe(mox.IsA(FakeSource), self.handler)
    self.mox.ReplayAll()

    FakeSource.create_new(
      self.handler,
      features=['webmention'],
      domains=['primary/'],
      domain_urls=[primary_url])
开发者ID:uniteddiversity,项目名称:bridgy,代码行数:10,代码来源:test_models.py


示例5: test_create_new_domain

  def test_create_new_domain(self):
    """create_new() should derive domains and domain_urls from the user JSON.

    Covers unusable URLs, single good URLs, a mix of several URLs including
    one that is expected to be dropped, and a URL that redirects.
    """
    # bad URLs
    for user_json in (None, {}, {'url': 'not<a>url'},
                      # t.co is in the webmention blacklist
                      {'url': 'http://t.co/foo'}):
      auth_entity = None
      if user_json is not None:
        auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps(user_json))
        auth_entity.put()
      source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
      self.assertEqual([], source.domains)
      self.assertEqual([], source.domain_urls)

    self.expect_requests_get('http://foo.com')
    self.expect_requests_get('https://www.foo.com')
    self.expect_requests_get('https://baj')
    self.mox.ReplayAll()
    # good URLs
    for url in ('http://foo.com/bar', 'https://www.foo.com/bar',
                'http://FoO.cOm/',  # should be normalized to lowercase
                ):
      auth_entity = testutil.FakeAuthEntity(
        id='x', user_json=json.dumps({'url': url}))
      auth_entity.put()
      source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
      self.assertEqual([url.lower()], source.domain_urls)
      self.assertEqual(['foo.com'], source.domains)

    # multiple good URLs and one that's in the webmention blacklist
    # NOTE: the tuple is parenthesized because a bare `for u in a, b, c`
    # inside a list comprehension is only valid in Python 2.
    auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps({
          'url': 'http://foo.org',
          'urls': [{'value': u} for u in (
                   'http://bar.com', 'http://t.co/x', 'http://baz',
                   # utm_* query params should be stripped
                   'https://baj/biff?utm_campaign=x&utm_source=y')],
          }))
    auth_entity.put()
    source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
    self.assertEqual(['http://foo.org/', 'http://bar.com/', 'http://baz/',
                      'https://baj/biff'],
                     source.domain_urls)
    self.assertEqual(['foo.org', 'bar.com', 'baz', 'baj'], source.domains)

    # a URL that redirects
    auth_entity = testutil.FakeAuthEntity(
      id='x', user_json=json.dumps({'url': 'http://orig'}))
    auth_entity.put()

    self.expect_requests_head('http://orig', redirected_url='http://final')
    self.mox.ReplayAll()

    # the final, post-redirect URL is what should be stored
    source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
    self.assertEqual(['http://final/'], source.domain_urls)
    self.assertEqual(['final'], source.domains)
开发者ID:mblaney,项目名称:bridgy,代码行数:55,代码来源:test_models.py


示例6: test_poll_error

  def test_poll_error(self):
    """An exception while fetching activities should leave status 'error'."""
    self.mox.StubOutWithMock(FakeSource, 'get_activities_response')
    recorded_call = FakeSource.get_activities_response(
      count=mox.IgnoreArg(), fetch_replies=True, fetch_likes=True,
      fetch_shares=True, etag=None, min_id=None, cache=mox.IgnoreArg())
    recorded_call.AndRaise(Exception('foo'))
    self.mox.ReplayAll()

    # the exception propagates out of the task...
    self.assertRaises(Exception, self.post_task)
    # ...and the stored source records the failure
    reloaded = self.sources[0].key.get()
    self.assertEqual('error', reloaded.status)
开发者ID:notenoughneon,项目名称:bridgy,代码行数:12,代码来源:tasks_test.py


示例7: _test_create_new

  def _test_create_new(self, **kwargs):
    """Create a FakeSource and check that one poll task was enqueued for it.

    Args:
      kwargs: extra keyword arguments passed through to FakeSource.create_new()
    """
    FakeSource.create_new(
      self.handler,
      domains=['foo'],
      domain_urls=['http://foo.com'],
      webmention_endpoint='http://x/y',
      **kwargs)
    self.assertEqual(1, FakeSource.query().count())

    poll_tasks = self.taskqueue_stub.GetTasks('poll')
    self.assertEqual(1, len(poll_tasks))

    created = FakeSource.query().get()
    only_task = poll_tasks[0]
    self.assertEqual('/_ah/queue/poll', only_task['url'])

    # the task must point back at the stored source
    expected_key = created.key.urlsafe()
    task_params = testutil.get_task_params(only_task)
    self.assertEqual(expected_key, task_params['source_key'])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:13,代码来源:models_test.py


示例8: test_create_new_rereads_domains

  def test_create_new_rereads_domains(self):
    """Re-creating a source should replace its stored domains with new ones."""
    FakeSource.new(None, features=['listen'],
                   domain_urls=['http://foo'], domains=['foo']).put()

    # roll the id counter back one step before creating again
    FakeSource.string_id_counter -= 1
    auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps(
        {'urls': [{'value': 'http://bar'}, {'value': 'http://baz'}]}))
    self.expect_requests_get('http://bar', 'no webmention endpoint',
                             verify=False)

    self.mox.ReplayAll()
    source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
    # the old 'foo' values should be gone, replaced by the auth entity's URLs
    self.assertEqual(['http://bar', 'http://baz'], source.domain_urls)
    self.assertEqual(['bar', 'baz'], source.domains)
开发者ID:sanduhrs,项目名称:bridgy,代码行数:14,代码来源:models_test.py


示例9: test_create_new_webmention

  def test_create_new_webmention(self):
    """Creating a webmention source should subscribe a saved FakeSource."""
    primary_url = 'http://primary/'
    self.expect_webmention_requests_get(
      primary_url, 'no webmention endpoint', verify=False)
    self.mox.StubOutWithMock(superfeedr, 'subscribe')

    def is_saved_fake_source(candidate):
      # Argument matcher: fails unless subscribe() gets a saved FakeSource.
      assert isinstance(candidate, FakeSource)
      assert candidate.is_saved
      return True

    superfeedr.subscribe(mox.Func(is_saved_fake_source), self.handler)
    self.mox.ReplayAll()

    FakeSource.create_new(
      self.handler,
      features=['webmention'],
      domains=['primary/'],
      domain_urls=[primary_url])
开发者ID:mblaney,项目名称:bridgy,代码行数:15,代码来源:test_models.py


示例10: test_disable_source_on_deauthorized

  def test_disable_source_on_deauthorized(self):
    """A DisableSource raised while polling should set status to 'disabled'."""
    target = self.sources[0]

    self.mox.StubOutWithMock(FakeSource, 'get_activities_response')
    recorded_call = FakeSource.get_activities_response(
      count=mox.IgnoreArg(), fetch_replies=True, fetch_likes=True,
      fetch_shares=True, etag=None, min_id=None, cache=mox.IgnoreArg())
    recorded_call.AndRaise(models.DisableSource)
    self.mox.ReplayAll()

    # start from an explicitly enabled source so the change is observable
    target.status = 'enabled'
    target.put()
    self.post_task()

    reloaded = target.key.get()
    self.assertEqual('disabled', reloaded.status)
开发者ID:notenoughneon,项目名称:bridgy,代码行数:16,代码来源:tasks_test.py


示例11: setUp

  def setUp(self):
    """Store one FakeSource and a fixed set of SyndicatedPost children."""
    super(SyndicatedPostTest, self).setUp()

    self.source = FakeSource.new(None)
    self.source.put()

    # (original, syndication) pairs, in the order they are stored
    url_pairs = (
      ('http://original/post/url', 'http://silo/post/url'),
      # two syndications for the same original
      ('http://original/post/url', 'http://silo/another/url'),
      # two originals for the same syndication
      ('http://original/another/post', 'http://silo/post/url'),
      # syndication with no original
      (None, 'http://silo/no-original'),
      # original with no syndication
      ('http://original/no-syndication', None),
    )
    self.relationships = [
      SyndicatedPost(parent=self.source.key, original=original_url,
                     syndication=syndication_url)
      for original_url, syndication_url in url_pairs]

    for relationship in self.relationships:
      relationship.put()
开发者ID:sanduhrs,项目名称:bridgy,代码行数:32,代码来源:models_test.py


示例12: test_create_new

  def test_create_new(self):
    """Creating a first source should add a message and a poll task."""
    self.assertEqual(0, FakeSource.query().count())
    self._test_create_new(features=['listen'])

    expected_messages = {
      "Added fake (FakeSource). Refresh to see what we've found!"}
    self.assert_equals(expected_messages, self.handler.messages)

    first_poll = self.taskqueue_stub.GetTasks('poll')[0]
    task_params = testutil.get_task_params(first_poll)
    self.assertEqual('1970-01-01-00-00-00', task_params['last_polled'])
开发者ID:sanduhrs,项目名称:bridgy,代码行数:8,代码来源:models_test.py


示例13: test_replace_poll_tasks

  def test_replace_poll_tasks(self):
    """The replace_poll_tasks cron should enqueue a poll for only one source."""
    self.assertEqual([], self.taskqueue_stub.GetTasks('poll'))

    now = datetime.datetime.now()
    delta = datetime.timedelta
    five_min_ago = now - delta(minutes=5)
    day_and_half_ago = now - delta(hours=36)
    month_ago = now - delta(days=30)
    listen_defaults = {
      'features': ['listen'],
      'last_webmention_sent': day_and_half_ago,
      }

    # Sources are stored in this exact order; only index 4 should get a task.
    source_keys = []
    store = source_keys.append
    # polled recently or disabled: no new poll task expected
    store(FakeSource.new(None, last_poll_attempt=now, **listen_defaults).put())
    store(FakeSource.new(None, last_poll_attempt=five_min_ago,
                         **listen_defaults).put())
    store(FakeSource.new(None, status='disabled', **listen_defaults).put())
    store(FakeSource.new(None, status='disabled', **listen_defaults).put())
    # the one source that needs a new poll task
    store(FakeSource.new(None, status='enabled', **listen_defaults).put())
    # not signed up for listen
    store(FakeSource.new(None, last_webmention_sent=day_and_half_ago).put())
    # never sent a webmention, past grace period. last polled is older than 2x
    # fast poll, but within 2x slow poll.
    store(FakeSource.new(None, features=['listen'], created=month_ago,
                         last_poll_attempt=day_and_half_ago).put())

    response = cron.application.get_response('/cron/replace_poll_tasks')
    self.assertEqual(200, response.status_int)

    poll_tasks = self.taskqueue_stub.GetTasks('poll')
    self.assertEqual(1, len(poll_tasks))
    self.assert_equals(source_keys[4].urlsafe(),
                       testutil.get_task_params(poll_tasks[0])['source_key'])
开发者ID:paulscallanjr,项目名称:bridgy,代码行数:34,代码来源:test_cron.py


示例14: test_is_beta_user

  def test_is_beta_user(self):
    """is_beta_user() should be true only when the path is in BETA_USER_PATHS."""
    src = FakeSource.new(self.handler)
    # with whatever util.BETA_USER_PATHS currently holds
    self.assertFalse(src.is_beta_user())

    # explicitly empty set of beta paths
    self.mox.stubs.Set(util, 'BETA_USER_PATHS', set())
    self.assertFalse(src.is_beta_user())

    # set containing exactly this source's path
    self.mox.stubs.Set(util, 'BETA_USER_PATHS', {src.bridgy_path()})
    self.assertTrue(src.is_beta_user())
开发者ID:chrisaldrich,项目名称:bridgy,代码行数:9,代码来源:test_models.py


示例15: test_last_activity_id

  def test_last_activity_id(self):
    """The last activity id seen in one poll should be the next poll's min_id."""
    newest_first = list(reversed(self.activities))
    self.sources[0].set_activities(newest_first)
    self.post_task()

    polled = self.sources[0].key.get()
    self.assertEqual('c', polled.last_activity_id)
    # reset last_polled to the epoch before polling a second time
    polled.last_polled = util.EPOCH
    polled.put()

    # the second poll must pass the stored id as min_id
    self.mox.StubOutWithMock(FakeSource, 'get_activities_response')
    recorded_call = FakeSource.get_activities_response(
      count=mox.IgnoreArg(), fetch_replies=True, fetch_likes=True,
      fetch_shares=True, etag=None, min_id='c', cache=mox.IgnoreArg())
    recorded_call.AndReturn({'items': []})

    self.mox.ReplayAll()
    self.post_task()
开发者ID:notenoughneon,项目名称:bridgy,代码行数:18,代码来源:tasks_test.py


示例16: test_create_new_domain_url_path_fails

  def test_create_new_domain_url_path_fails(self):
    """A 500 from the expected GET should leave the URL's path in place."""
    auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps(
        {'urls': [{'value': 'http://flaky/foo'}]}))
    # the expected GET of the bare domain fails with a server error
    self.expect_requests_get('http://flaky', status_code=500)
    self.mox.ReplayAll()

    source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
    self.assertEqual(['http://flaky/foo'], source.domain_urls)
    self.assertEqual(['flaky'], source.domains)
开发者ID:snarfed,项目名称:bridgy,代码行数:9,代码来源:test_models.py


示例17: test_verify_without_webmention_endpoint

  def test_verify_without_webmention_endpoint(self):
    """verify() should leave webmention_endpoint unset when none is found."""
    primary_url = 'http://primary/'
    self.expect_requests_get(
      primary_url, 'no webmention endpoint here!', verify=False)
    self.mox.ReplayAll()

    src = FakeSource.new(
      self.handler,
      features=['webmention'],
      domain_urls=[primary_url],
      domains=['primary'])
    src.verify()
    self.assertIsNone(src.webmention_endpoint)
开发者ID:sanduhrs,项目名称:bridgy,代码行数:9,代码来源:models_test.py


示例18: test_get_comment

  def test_get_comment(self):
    """get_comment() should return what the underlying as_source returns."""
    expected = {'objectType': 'comment', 'content': 'qwert'}
    src = FakeSource.new(None)
    src.as_source = self.mox.CreateMock(as_source.Source)

    # record the delegated call and its canned return value
    recorded_call = src.as_source.get_comment(
      '123', activity_id=None, activity_author_id=None)
    recorded_call.AndReturn(expected)
    self.mox.ReplayAll()

    self.assert_equals(expected, src.get_comment('123'))
开发者ID:sanduhrs,项目名称:bridgy,代码行数:9,代码来源:models_test.py


示例19: test_has_bridgy_webmention_endpoint

 def test_has_bridgy_webmention_endpoint(self):
   """has_bridgy_webmention_endpoint() should match only brid.gy endpoints."""
   source = FakeSource.new(None)
   # (webmention_endpoint value, expected result) pairs
   for endpoint, has in ((None, False),
                         ('http://foo', False),
                         ('https://brid.gy/webmention/fake', True),
                         ('https://www.brid.gy/webmention/fake', True),
                         ):
     source.webmention_endpoint = endpoint
     # the endpoint is passed as the failure message to identify the case
     self.assertEqual(has, source.has_bridgy_webmention_endpoint(), endpoint)
开发者ID:lcorbasson,项目名称:bridgy,代码行数:9,代码来源:test_models.py


示例20: test_create_new_domain_url_path_connection_fails

  def test_create_new_domain_url_path_connection_fails(self):
    """A connection error on the expected GET should leave the URL's path."""
    auth_entity = testutil.FakeAuthEntity(id='x', user_json=json.dumps(
        {'urls': [{'value': 'http://flaky/foo'}]}))
    # the expected GET of the bare domain raises instead of returning
    self.expect_requests_get('http://flaky').AndRaise(
      requests.ConnectionError('DNS lookup failed for URL: http://bad/'))
    self.mox.ReplayAll()

    source = FakeSource.create_new(self.handler, auth_entity=auth_entity)
    self.assertEqual(['http://flaky/foo'], source.domain_urls)
    self.assertEqual(['flaky'], source.domains)
开发者ID:snarfed,项目名称:bridgy,代码行数:10,代码来源:test_models.py



注:本文中的testutil.FakeSource类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testutils.assertNotRaises函数代码示例发布时间:2022-05-27
下一篇:
Python testutil.get_task_params函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap