• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python content.ContentCatalog类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.db.model.content.ContentCatalog的典型用法代码示例。如果您正苦于以下问题:Python ContentCatalog类的具体用法?Python ContentCatalog怎么用?Python ContentCatalog使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ContentCatalog类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUp

 def setUp(self):
     super(ContainerTest, self).setUp()
     ContentCatalog.get_collection().remove()
     self.tmp_dir = mkdtemp()
     self.downloaded = os.path.join(self.tmp_dir, 'downloaded')
     os.makedirs(self.downloaded)
     self.add_sources()
     plugins._create_manager()
     plugins._MANAGER.catalogers.add_plugin('yum', FakeCataloger, {})
开发者ID:skarmark,项目名称:pulp,代码行数:9,代码来源:test_content_sources.py


示例2: setUp

 def setUp(self):
     PulpAsyncServerTests.setUp(self)
     ContentCatalog.get_collection().remove()
     self.tmp_dir = mkdtemp()
     self.downloaded = os.path.join(self.tmp_dir, 'downloaded')
     os.makedirs(self.downloaded)
     self.add_sources()
     MockListener.download_started.reset_mock()
     MockListener.download_succeeded.reset_mock()
     MockListener.download_failed.reset_mock()
     plugins._create_manager()
     plugins._MANAGER.catalogers.add_plugin('yum', MockCataloger, {})
开发者ID:ashcrow,项目名称:pulp,代码行数:12,代码来源:test_content_sources.py


示例3: test_add

 def test_add(self):
     units = self.units(0, 10)
     manager = ContentCatalogManager()
     for unit_key, url in units:
         manager.add_entry(SOURCE_ID, EXPIRATION, TYPE_ID, unit_key, url)
     collection = ContentCatalog.get_collection()
     self.assertEqual(len(units), collection.find().count())
     for unit_key, url in units:
         locator = ContentCatalog.get_locator(TYPE_ID, unit_key)
         entry = collection.find_one({'locator': locator})
         self.assertEqual(entry['type_id'], TYPE_ID)
         self.assertEqual(entry['unit_key'], unit_key)
         self.assertEqual(entry['url'], url)
开发者ID:ashcrow,项目名称:pulp,代码行数:13,代码来源:test_content_catalog_manager.py


示例4: test_locator

 def test_locator(self):
     key_1 = {'a': 1, 'b': 2, 'c': 3}
     key_2 = {'c': 3, 'b': 2, 'a': 1}
     key_3 = {'c': 1, 'b': 2, 'a': 3}
     locator_1 = ContentCatalog.get_locator(TYPE_ID, key_1)  # eq
     locator_2 = ContentCatalog.get_locator(TYPE_ID, key_2)  # eq
     locator_3 = ContentCatalog.get_locator(TYPE_ID, key_3)  # neq
     locator_4 = ContentCatalog.get_locator(TYPE_ID[1:], key_1)  # neq
     self.assertTrue(isinstance(locator_1, str))
     self.assertTrue(isinstance(locator_2, str))
     self.assertEqual(locator_1, locator_2)
     self.assertNotEqual(locator_1, locator_3)
     self.assertNotEqual(locator_1, locator_4)
开发者ID:ashcrow,项目名称:pulp,代码行数:13,代码来源:test_content_catalog_manager.py


示例5: purge_expired

 def purge_expired(self, grace_period=GRACE_PERIOD):
     """
     Purge (delete) expired entries from the content catalog belonging
     to the specified content source by ID.
     :param grace_period: The grace period in seconds.
         The grace_period defines how long an entry is permitted to remain
         in the catalog after it has expired.  The default is 1 hour.
     :type grace_period: int
     """
     collection = ContentCatalog.get_collection()
     now = ContentCatalog.get_expiration(0)
     timestamp = now - grace_period
     query = {'expiration': {'$lt': timestamp}}
     collection.remove(query, safe=True)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:14,代码来源:catalog.py


示例6: delete_entry

 def delete_entry(self, source_id, type_id, unit_key):
     """
     Delete an entry from the content catalog.
     :param source_id: A content source ID.
     :type source_id: str
     :param type_id: The unit type ID.
     :type type_id: str
     :param unit_key: The unit key.
     :type unit_key: dict
     """
     collection = ContentCatalog.get_collection()
     locator = ContentCatalog.get_locator(type_id, unit_key)
     query = {'source_id': source_id, 'locator': locator}
     collection.remove(query, safe=True)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:14,代码来源:catalog.py


示例7: has_entries

 def has_entries(self, source_id):
     """
     Get whether the specified content source has entries in the catalog.
     :param source_id: A content source ID.
     :type source_id: str
     :return: True if has entries.
     :rtype: bool
     """
     collection = ContentCatalog.get_collection()
     query = {
         'source_id': source_id,
         'expiration': {'$gte': ContentCatalog.get_expiration(0)}
     }
     cursor = collection.find(query)
     return cursor.count() > 0
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:15,代码来源:catalog.py


示例8: test_add

 def test_add(self):
     units = self.units(0, 10)
     conduit = CatalogerConduit(SOURCE_ID, EXPIRES)
     for unit_key, url in units:
         conduit.add_entry(TYPE_ID, unit_key, url)
     collection = ContentCatalog.get_collection()
     self.assertEqual(conduit.source_id, SOURCE_ID)
     self.assertEqual(conduit.expires, EXPIRES)
     self.assertEqual(len(units), collection.find().count())
     self.assertEqual(conduit.added_count, len(units))
     self.assertEqual(conduit.deleted_count, 0)
     for unit_key, url in units:
         locator = ContentCatalog.get_locator(TYPE_ID, unit_key)
         entry = collection.find_one({"locator": locator})
         self.assertEqual(entry["type_id"], TYPE_ID)
         self.assertEqual(entry["unit_key"], unit_key)
         self.assertEqual(entry["url"], url)
开发者ID:lzap,项目名称:pulp,代码行数:17,代码来源:test_cataloger_conduit.py


示例9: purge

 def purge(self, source_id):
     """
     Purge (delete) entries from the content catalog belonging
     to the specified content source by ID.
     :param source_id: A content source ID.
     :type source_id: str
     """
     collection = ContentCatalog.get_collection()
     query = {'source_id': source_id}
     collection.remove(query, safe=True)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:10,代码来源:catalog.py


示例10: test_delete

 def test_delete(self):
     units = self.units(0, 10)
     conduit = CatalogerConduit(SOURCE_ID, EXPIRES)
     for unit_key, url in units:
         conduit.add_entry(TYPE_ID, unit_key, url)
     collection = ContentCatalog.get_collection()
     self.assertEqual(len(units), collection.find().count())
     unit_key, url = units[5]
     locator = ContentCatalog.get_locator(TYPE_ID, unit_key)
     entry = collection.find_one({"locator": locator})
     self.assertEqual(entry["type_id"], TYPE_ID)
     self.assertEqual(entry["unit_key"], unit_key)
     self.assertEqual(entry["url"], url)
     conduit.delete_entry(TYPE_ID, unit_key)
     self.assertEqual(len(units) - 1, collection.find().count())
     self.assertEqual(conduit.added_count, len(units))
     self.assertEqual(conduit.deleted_count, 1)
     entry = collection.find_one({"locator": locator})
     self.assertTrue(entry is None)
开发者ID:lzap,项目名称:pulp,代码行数:19,代码来源:test_cataloger_conduit.py


示例11: purge_orphans

 def purge_orphans(self, valid_ids):
     """
     Purge orphan entries from the content catalog.
     Entries are orphaned when the content source to which they belong
     is no longer loaded.
     :param valid_ids: The list of valid (loaded) content source IDs.
     :type valid_ids: list
     """
     collection = ContentCatalog.get_collection()
     for source_id in collection.distinct('source_id'):
         if source_id not in valid_ids:
             self.purge(source_id)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:12,代码来源:catalog.py


示例12: test_purge_orphans

 def test_purge_orphans(self):
     _dir, cataloged = self.populate_catalog(ORPHANED, 0, 10)
     _dir, cataloged = self.populate_catalog(UNDERGROUND, 0, 10)
     _dir, cataloged = self.populate_catalog(UNIT_WORLD, 0, 10)
     collection = ContentCatalog.get_collection()
     self.assertEqual(collection.find().count(), 30)
     container = ContentContainer(path=self.tmp_dir)
     container.purge_orphans()
     self.assertEqual(collection.find().count(), 20)
     self.assertEqual(collection.find({'source_id': ORPHANED}).count(), 0)
     self.assertEqual(collection.find({'source_id': UNDERGROUND}).count(), 10)
     self.assertEqual(collection.find({'source_id': UNIT_WORLD}).count(), 10)
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:test_content_sources.py


示例13: populate_catalog

 def populate_catalog(self, source_id, n_start, n_units, checksum="0xAA"):
     _dir = self.populate_content(source_id, n_start, n_units)
     collection = ContentCatalog.get_collection()
     entry_list = []
     for n in range(n_start, n_start + n_units):
         unit_key = {"name": "unit_%d" % n, "version": "1.0.%d" % n, "release": "1", "checksum": checksum}
         url = "file://%s/unit_%d" % (_dir, n)
         entry = ContentCatalog(source_id, EXPIRES, TYPE_ID, unit_key, url)
         entry_list.append(entry)
     for entry in entry_list:
         collection.insert(entry)
     return _dir, entry_list
开发者ID:nthien,项目名称:pulp,代码行数:12,代码来源:test_content_sources.py


示例14: purge

 def purge(self, source_id):
     """
     Purge (delete) entries from the content catalog belonging
     to the specified content source by ID.
     :param source_id: A content source ID.
     :type source_id: str
     :return: The number of entries purged.
     :rtype: int
     """
     collection = ContentCatalog.get_collection()
     query = {'source_id': source_id}
     result = collection.remove(query)
     return result['n']
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:13,代码来源:catalog.py


示例15: test_refresh_exception

 def test_refresh_exception(self, mock_refresh):
     container = ContentContainer(path=self.tmp_dir)
     event = Event()
     report = container.refresh(event, force=True)
     self.assertEqual(len(report), 2)
     for r in report:
         self.assertFalse(r.succeeded)
         self.assertEqual(r.added_count, 0)
         self.assertEqual(r.deleted_count, 0)
         self.assertEqual(len(r.errors), 1)
     collection = ContentCatalog.get_collection()
     self.assertEqual(mock_refresh.call_count, 2)
     self.assertEqual(collection.find().count(), 0)
开发者ID:aweiteka,项目名称:pulp,代码行数:13,代码来源:test_content_sources.py


示例16: test_refresh_failure

 def test_refresh_failure(self, mock_plugin):
     container = ContentContainer(path=self.tmp_dir)
     event = Event()
     report = container.refresh(event, force=True)
     self.assertEqual(len(report), 5)
     for r in report:
         self.assertFalse(r.succeeded)
         self.assertEqual(r.added_count, 0)
         self.assertEqual(r.deleted_count, 0)
         self.assertEqual(len(r.errors), 1)
     plugin = mock_plugin.return_value[0]
     collection = ContentCatalog.get_collection()
     self.assertEqual(plugin.refresh.call_count, 5)
     self.assertEqual(collection.find().count(), 0)
开发者ID:aweiteka,项目名称:pulp,代码行数:14,代码来源:test_content_sources.py


示例17: find

 def find(self, type_id, unit_key):
     """
     Find entries in the content catalog using the specified unit type_id
     and unit_key.  The catalog may contain more than one entry matching the
     locator for a given content source.  In this case, only the newest entry
     for each source is included in the result set.
     :param type_id: The unit type ID.
     :type type_id: str
     :param unit_key: The unit key.
     :type unit_key: dict
     :return: A list of matching entries.
     :rtype: list
     """
     collection = ContentCatalog.get_collection()
     locator = ContentCatalog.get_locator(type_id, unit_key)
     query = {
         'locator': locator,
         'expiration': {'$gte': ContentCatalog.get_expiration(0)}
     }
     newest_by_source = {}
     for entry in collection.find(query, sort=[('_id', ASCENDING)]):
         newest_by_source[entry['source_id']] = entry
     return newest_by_source.values()
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:23,代码来源:catalog.py


示例18: test_purge

 def test_purge(self):
     source_a = 'A'
     source_b = 'B'
     manager = ContentCatalogManager()
     for unit_key, url in self.units(0, 10):
         manager.add_entry(source_a, EXPIRATION, TYPE_ID, unit_key, url)
     for unit_key, url in self.units(0, 10):
         manager.add_entry(source_b, EXPIRATION, TYPE_ID, unit_key, url)
     collection = ContentCatalog.get_collection()
     self.assertEqual(20, collection.find().count())
     manager = ContentCatalogManager()
     manager.purge(source_a)
     self.assertEqual(collection.find({'source_id': source_a}).count(), 0)
     self.assertEqual(collection.find({'source_id': source_b}).count(), 10)
开发者ID:ashcrow,项目名称:pulp,代码行数:14,代码来源:test_content_catalog_manager.py


示例19: populate_catalog

 def populate_catalog(self, source_id, n_start, n_units, checksum='0xAA'):
     _dir = self.populate_content(source_id, n_start, n_units)
     collection = ContentCatalog.get_collection()
     entry_list = []
     for n in range(n_start, n_start + n_units):
         unit_key = {
             'name': 'unit_%d' % n,
             'version': '1.0.%d' % n,
             'release': '1',
             'checksum': checksum
         }
         url = 'file://%s/unit_%d' % (_dir, n)
         entry = ContentCatalog(source_id, EXPIRES, TYPE_ID, unit_key, url)
         entry_list.append(entry)
     for entry in entry_list:
         collection.insert(entry, safe=True)
     return _dir, entry_list
开发者ID:aweiteka,项目名称:pulp,代码行数:17,代码来源:test_content_sources.py


示例20: add_entry

 def add_entry(self, source_id, expires, type_id, unit_key, url):
     """
     Add an entry to the content catalog.
     :param source_id: A content source ID.
     :type source_id: str
     :param expires: The entry expiration in seconds.
     :type expires: int
     :param type_id: The unit type ID.
     :type type_id: str
     :param unit_key: The unit key.
     :type unit_key: dict
     :param url: The download URL.
     :type url: str
     """
     collection = ContentCatalog.get_collection()
     entry = ContentCatalog(source_id, expires, type_id, unit_key, url)
     collection.insert(entry, safe=True)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:17,代码来源:catalog.py



注:本文中的pulp.server.db.model.content.ContentCatalog类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python content.ContentType类代码示例发布时间:2022-05-25
下一篇:
Python consumer.UnitProfile类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap