本文整理汇总了Python中mediadrop.lib.auth.query_result_proxy.QueryResultProxy类的典型用法代码示例。如果您正苦于以下问题:Python QueryResultProxy类的具体用法?Python QueryResultProxy怎么用?Python QueryResultProxy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了QueryResultProxy类的14个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_can_iterate_over_results
def test_can_iterate_over_results(self):
filter_ = lambda item: item.activity >= 2
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_true(hasattr(self.proxy, '__iter__'))
results = list(self.proxy)
assert_equals(['baz', 'quux', 'quuux'], self._names(results))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:7,代码来源:query_result_proxy_test.py
示例2: test_can_tell_length_if_no_more_items_available
def test_can_tell_length_if_no_more_items_available(self):
filter_ = lambda item: item.activity >= 2
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_true(hasattr(self.proxy, '__len__'))
assert_length(3, self.proxy.fetch(10))
assert_false(self.proxy.more_available())
assert_length(3, self.proxy)
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:8,代码来源:query_result_proxy_test.py
示例3: test_can_fetch_single_item
def test_can_fetch_single_item(self):
filter_ = lambda item: item.activity >= 4
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_equals('quuux', self._name(self.proxy.first()))
assert_equals(None, self.proxy.first())
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:5,代码来源:query_result_proxy_test.py
示例4: setUp
def setUp(self):
self.engine = create_engine('sqlite:///:memory:')
self.session = self._create_session()
self._populate_database()
self.query = self.session.query(User).order_by(asc(User.id))
self.proxy = QueryResultProxy(self.query)
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:6,代码来源:query_result_proxy_test.py
示例5: QueryResultProxyTest
class QueryResultProxyTest(PythonicTestCase):
def setUp(self):
self.engine = create_engine('sqlite:///:memory:')
self.session = self._create_session()
self._populate_database()
self.query = self.session.query(User).order_by(asc(User.id))
self.proxy = QueryResultProxy(self.query)
def _tearDown(self):
Base.metadata.drop_all(self.engine)
def _create_session(self):
Base.metadata.create_all(self.engine)
Session = sessionmaker(bind=self.engine)
return Session()
def _populate_database(self):
for i, name in enumerate(('foo', 'bar', 'baz', 'quux', 'quuux')):
self.session.add(User(name, i))
self.session.commit()
def _next_name(self):
items = self.proxy.fetch(n=1)
return items[0].name
def _next_names(self, n=1):
return self._names(self.proxy.fetch(n=n))
def _names(self, results):
return [item.name for item in results]
def _name(self, result):
return result.name
def test_can_fetch_next_item(self):
assert_equals('foo', self._next_name())
assert_equals('bar', self._next_name())
assert_equals('baz', self._next_name())
def test_can_fetch_single_item(self):
filter_ = lambda item: item.activity >= 4
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_equals('quuux', self._name(self.proxy.first()))
assert_equals(None, self.proxy.first())
def test_can_fetch_multiple_items_at_once(self):
assert_equals(['foo', 'bar'], self._next_names(n=2))
assert_equals(['baz', 'quux'], self._next_names(n=2))
assert_equals(['quuux'], self._next_names(n=2))
assert_equals([], self._next_names(n=2))
def test_regression_do_not_skipped_items_because_of_prefetching(self):
assert_equals('foo', self._next_name())
assert_equals(['bar', 'baz'], self._next_names(n=2))
assert_equals(['quux', 'quuux'], self._next_names(n=2))
def test_can_tell_if_more_items_are_available_even_before_explicit_fetching(self):
assert_true(self.proxy.more_available())
assert_equals('foo', self._next_name())
def test_can_tell_if_no_more_items_are_available(self):
assert_equals(['foo', 'bar', 'baz', 'quux'], self._next_names(n=4))
assert_true(self.proxy.more_available())
assert_equals('quuux', self._next_name())
assert_false(self.proxy.more_available())
def test_does_not_omit_prefetched_items_after_asking_if_more_are_available(self):
assert_true(self.proxy.more_available())
assert_equals(['foo', 'bar'], self._next_names(n=2))
def test_does_not_omit_prefetched_items_if_many_prefetched_items_were_available(self):
assert_true(self.proxy.more_available())
# more_available() should have fetched more than one item so we have
# actually 2+ items prefetched.
assert_not_equals(0, len(self.proxy._prefetched_items))
# .next() consumes only one item so there should be one left
# (._prefetched_items were overwritten in this step)
assert_equals(['foo'], self._names([self.proxy.next()]))
assert_not_equals(0, len(self.proxy._prefetched_items))
assert_equals(['bar', 'baz'], self._next_names(n=2))
def test_can_initialize_proxy_with_offset(self):
self.proxy = QueryResultProxy(self.query, start=2)
assert_equals(['baz', 'quux'], self._next_names(n=2))
def test_can_specify_filter_callable(self):
filter_ = lambda item: item.activity % 2 == 1
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_equals(['bar', 'quux'], self._next_names(n=5))
assert_false(self.proxy.more_available())
def test_proxy_returns_always_specified_number_of_items_if_possible(self):
filter_ = lambda item: item.activity >= 2
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_equals(['baz', 'quux', 'quuux'], self._next_names(n=3))
assert_false(self.proxy.more_available())
# --- limit ----------------------------------------------------------------
#.........这里部分代码省略.........
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:101,代码来源:query_result_proxy_test.py
示例6: test_can_specify_how_many_items_should_be_fetched_by_default
def test_can_specify_how_many_items_should_be_fetched_by_default(self):
self.proxy = QueryResultProxy(self.query, default_fetch=3)
self.proxy.more_available()
assert_equals(3, len(self.proxy._prefetched_items))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:4,代码来源:query_result_proxy_test.py
示例7: test_can_tell_length
def test_can_tell_length(self):
filter_ = lambda item: item.activity >= 2
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_length(3, self.proxy)
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:4,代码来源:query_result_proxy_test.py
示例8: test_can_offset_for_iteration
def test_can_offset_for_iteration(self):
self.proxy = QueryResultProxy(self.query).offset(3)
assert_equals(['quux', 'quuux'], self._names(self.proxy))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:3,代码来源:query_result_proxy_test.py
示例9: test_accepts_strings_for_offset
def test_accepts_strings_for_offset(self):
# that's what SQLAlchemy does (sic!)
self.proxy = QueryResultProxy(self.query).offset('1')
assert_equals(['bar', 'baz', 'quux', 'quuux'], self._names(self.proxy))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:4,代码来源:query_result_proxy_test.py
示例10: test_accepts_strings_for_limit
def test_accepts_strings_for_limit(self):
# that's what SQLAlchemy does (sic!)
self.proxy = QueryResultProxy(self.query).limit('1')
assert_equals(['foo'], self._names(self.proxy))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:4,代码来源:query_result_proxy_test.py
示例11: test_can_limit_items_returned_by_iteration
def test_can_limit_items_returned_by_iteration(self):
self.proxy = QueryResultProxy(self.query).limit(2)
assert_equals(['foo', 'bar'], self._names(self.proxy))
self.proxy = QueryResultProxy(self.query).limit(20)
assert_equals(['foo', 'bar', 'baz', 'quux', 'quuux'], self._names(self.proxy))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:6,代码来源:query_result_proxy_test.py
示例12: test_proxy_returns_always_specified_number_of_items_if_possible
def test_proxy_returns_always_specified_number_of_items_if_possible(self):
filter_ = lambda item: item.activity >= 2
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_equals(['baz', 'quux', 'quuux'], self._next_names(n=3))
assert_false(self.proxy.more_available())
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:5,代码来源:query_result_proxy_test.py
示例13: test_can_specify_filter_callable
def test_can_specify_filter_callable(self):
filter_ = lambda item: item.activity % 2 == 1
self.proxy = QueryResultProxy(self.query, filter_=filter_)
assert_equals(['bar', 'quux'], self._next_names(n=5))
assert_false(self.proxy.more_available())
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:5,代码来源:query_result_proxy_test.py
示例14: test_can_initialize_proxy_with_offset
def test_can_initialize_proxy_with_offset(self):
self.proxy = QueryResultProxy(self.query, start=2)
assert_equals(['baz', 'quux'], self._next_names(n=2))
开发者ID:GitReadysoft,项目名称:mediadrop,代码行数:3,代码来源:query_result_proxy_test.py
注:本文中的mediadrop.lib.auth.query_result_proxy.QueryResultProxy类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论