本文整理汇总了Python中pulp.plugins.types.database.all_type_ids函数的典型用法代码示例。如果您正苦于以下问题：Python all_type_ids函数的具体用法？Python all_type_ids怎么用？Python all_type_ids使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了all_type_ids函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: list_content_types
def list_content_types(self):
    """
    Return the ids of the content types currently defined.

    The lookup is delegated entirely to ``content_types_db.all_type_ids()``.

    @return: list of content type ids
    @rtype: list [str, ...]
    """
    defined_type_ids = content_types_db.all_type_ids()
    return defined_type_ids
开发者ID:nbetm,项目名称:pulp,代码行数:7,代码来源:query.py
示例2: delete_all_orphans
def delete_all_orphans():
    """
    Remove the orphaned content units of every content type.

    Each type id reported by ``content_types_db.all_type_ids()`` is handed, in
    turn, to ``OrphanManager.delete_orphans_by_type``.
    """
    type_ids = content_types_db.all_type_ids()
    for type_id in type_ids:
        OrphanManager.delete_orphans_by_type(type_id)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:7,代码来源:orphan.py
示例3: list_content_types
def list_content_types():
    """
    Return the ids of the supported content types.

    @return: list of content type IDs
    @rtype: list of str
    """
    # Fails with AssertionError when _is_initialized() reports a false value.
    assert _is_initialized()
    supported_type_ids = database.all_type_ids()
    return supported_type_ids
开发者ID:ryanschneider,项目名称:pulp,代码行数:8,代码来源:api.py
示例4: __unit_ids_to_plugin_unit_keys
def __unit_ids_to_plugin_unit_keys(self, unit_ids_by_type, repo_ids):
    """
    Translate unit ids, grouped by content type id, into plugin unit keys.

    Three cases are handled:

    * ``unit_ids_by_type`` is None: every unit associated with any of the given
      repos is considered, for every type id known to the content types database.
    * A type id maps to an empty (falsy) list of unit ids: every unit of that
      type associated with any of the given repos is considered.
    * A type id maps to a non-empty list of unit ids: exactly those units are
      looked up; ``repo_ids`` is not consulted for that type.

    Note that an empty dict is *not* treated like None: it yields an empty result.

    :param unit_ids_by_type: dictionary of <content type id> : <list of unit ids>,
                             or None to consider all units in the given repos
    :type unit_ids_by_type: dict or None
    :param repo_ids: ids of the repos searched when unit ids are not listed
                     explicitly; iterated once per content type
    :type repo_ids: list
    :return: dictionary of <content type id> : <list of plugin unit keys>
    :rtype: dict
    """
    association_query_manager = managers.repo_unit_association_query_manager()
    content_query_manager = managers.content_query_manager()
    result_unit_keys = {}

    def collect_unit_keys(unit_type_id, collection, type_def, unit_ids):
        # Fetch each unit's metadata from the type specific collection, convert
        # it to a plugin unit and record that unit's key under its type id.
        # NOTE(review): find_one presumably returns None for an unknown id and
        # that value is handed to to_plugin_unit unchanged - confirm callers
        # only pass ids that exist.
        pulp_units = [collection.find_one({'_id': unit_id}) for unit_id in unit_ids]
        plugin_unit_keys = [common_utils.to_plugin_unit(unit, type_def).unit_key
                            for unit in pulp_units]
        result_unit_keys.setdefault(unit_type_id, []).extend(plugin_unit_keys)

    if unit_ids_by_type is not None:
        for unit_type_id, unit_ids in unit_ids_by_type.items():
            collection = content_query_manager.get_content_unit_collection(type_id=unit_type_id)
            type_def = content_types_db.type_definition(unit_type_id)

            if unit_ids:
                # Explicit unit ids were given for this type; use them directly.
                collect_unit_keys(unit_type_id, collection, type_def, unit_ids)
                continue

            # No unit ids listed for this type: take every unit of the type
            # that is associated with one of the given repos.
            criteria = UnitAssociationCriteria(unit_fields=['unit_id'])
            for repo_id in repo_ids:
                repo_units = association_query_manager.get_units_by_type(
                    repo_id, unit_type_id, criteria)
                collect_unit_keys(unit_type_id, collection, type_def,
                                  [u['unit_id'] for u in repo_units])
    else:
        # Units are not specified at all: consider all units in the given repos.
        # The type id listing and the per-type collection / type definition do
        # not depend on the repo, so they are looked up once per type instead
        # of once per (repo, type) pair.
        for unit_type_id in content_types_db.all_type_ids():
            collection = content_query_manager.get_content_unit_collection(type_id=unit_type_id)
            type_def = content_types_db.type_definition(unit_type_id)
            for repo_id in repo_ids:
                criteria = UnitAssociationCriteria(type_ids=[unit_type_id],
                                                   unit_fields=['unit_id', 'unit_type_id'])
                repo_units = association_query_manager.get_units(repo_id, criteria)
                collect_unit_keys(unit_type_id, collection, type_def,
                                  [u['unit_id'] for u in repo_units])

    return result_unit_keys
开发者ID:bartwo,项目名称:pulp,代码行数:58,代码来源:applicability.py
示例5: delete_all_orphans
def delete_all_orphans():
    """
    Remove every orphaned content unit.

    Two passes are made: first over the type ids from
    ``content_types_db.all_type_ids()``, then over the ids from
    ``plugin_api.list_unit_models()``; each pass uses its own
    ``OrphanManager`` deletion call.
    """
    legacy_type_ids = content_types_db.all_type_ids()
    for type_id in legacy_type_ids:
        OrphanManager.delete_orphans_by_type(type_id)

    model_type_ids = plugin_api.list_unit_models()
    for type_id in model_type_ids:
        OrphanManager.delete_orphan_content_units_by_type(type_id)
开发者ID:rbramwell,项目名称:pulp,代码行数:10,代码来源:orphan.py
示例6: list_content_types
def list_content_types():
    """
    List the supported content types.

    The result is the keys of ``_MANAGER.unit_models`` followed by the type ids
    reported by ``database.all_type_ids()``.

    :return: list of content type IDs
    :rtype: list of str
    """
    # Fails with AssertionError when _is_initialized() reports a false value.
    assert _is_initialized()
    # Materialize the keys as a real list: on Python 3 ``keys()`` returns a view
    # object that has no ``extend`` method. On Python 2 this is just a copy.
    types_list = list(_MANAGER.unit_models.keys())
    legacy_types = database.all_type_ids()
    types_list.extend(legacy_types)
    return types_list
开发者ID:jeremycline,项目名称:pulp,代码行数:11,代码来源:api.py
示例7: orphans_summary
def orphans_summary(self):
    """
    Build a per-type summary of the orphaned units: a dictionary of
    content type id -> value returned by ``orphans_count_by_type`` for that id.

    :return: summary of orphaned units
    :rtype: dict
    """
    return dict(
        (type_id, self.orphans_count_by_type(type_id))
        for type_id in content_types_db.all_type_ids()
    )
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:orphan.py
示例8: test_all_type_ids_no_entries
def test_all_type_ids_no_entries(self):
    """
    Check that listing the type ids gives back an empty list (not None)
    when there are no types.
    """
    # Exercise: this test creates no types before listing
    found_ids = types_db.all_type_ids()

    # Check: a genuine list object with no elements
    self.assertTrue(found_ids is not None)
    self.assertTrue(isinstance(found_ids, list))
    self.assertEqual(0, len(found_ids))
开发者ID:ashcrow,项目名称:pulp,代码行数:12,代码来源:test_types_database.py
示例9: generate_all_orphans_with_unit_keys
def generate_all_orphans_with_unit_keys(self):
    """
    Yield the orphaned content units of every content type, one type after
    another.

    The units come from ``generate_orphans_by_type_with_unit_keys``; per the
    original documentation each orphan carries the fields named in its content
    type definition's search indexes.

    :return: generator of orphaned content units
    :rtype: generator
    """
    type_ids = content_types_db.all_type_ids()
    for type_id in type_ids:
        orphans_of_type = self.generate_orphans_by_type_with_unit_keys(type_id)
        for orphan in orphans_of_type:
            yield orphan
开发者ID:bartwo,项目名称:pulp,代码行数:14,代码来源:orphan.py
示例10: generate_all_orphans
def generate_all_orphans(self, fields=None):
    """
    Yield the orphaned content units of every content type, one type after
    another.

    ``fields`` is passed through unchanged to
    ``OrphanManager.generate_orphans_by_type``; per the original documentation,
    when it is not specified only the `_id` field is present on each unit.

    :param fields: list of fields to include in each content unit
    :type fields: list or None
    :return: generator of orphaned content units
    :rtype: generator
    """
    type_ids = content_types_db.all_type_ids()
    for type_id in type_ids:
        orphans_of_type = OrphanManager.generate_orphans_by_type(type_id, fields)
        for orphan in orphans_of_type:
            yield orphan
开发者ID:aweiteka,项目名称:pulp,代码行数:15,代码来源:orphan.py
示例11: delete_all_orphans
def delete_all_orphans(flush=True):
    """
    Remove the orphaned content units of every content type.

    NOTE: `flush` should not be set to False unless you know what you're doing

    :param flush: when true, call ``db_connection.flush_database()`` once after
                  all types have been processed
    :type flush: bool
    """
    for type_id in content_types_db.all_type_ids():
        # Per-type flushing is always disabled here; the single optional flush
        # happens after the loop.
        OrphanManager.delete_orphans_by_type(type_id, flush=False)

    if not flush:
        return
    db_connection.flush_database()
开发者ID:aweiteka,项目名称:pulp,代码行数:15,代码来源:orphan.py
示例12: test_all_type_ids
def test_all_type_ids(self):
    """
    Check that the ids of all stored type definitions are listed.
    """
    # Arrange: store two minimal definitions with ids 'a' and 'b'
    for type_id, label in (('a', 'A'), ('b', 'B')):
        definition = TypeDefinition(type_id, label, label, [], [], [])
        types_db._create_or_update_type(definition)

    # Act
    listed_ids = types_db.all_type_ids()

    # Assert: exactly the two ids created above
    self.assertEqual(2, len(listed_ids))
    self.assertTrue('a' in listed_ids)
    self.assertTrue('b' in listed_ids)
开发者ID:ashcrow,项目名称:pulp,代码行数:16,代码来源:test_types_database.py
示例13: delete_all_orphans
def delete_all_orphans():
    """
    Remove every orphaned content unit and report how many were deleted.

    Two passes are made: first over the type ids from
    ``content_types_db.all_type_ids()``, then over the ids from
    ``plugin_api.list_unit_models()``. Only counts greater than zero are
    recorded; if the same id shows up in both passes, the second pass's count
    replaces the first.

    :return: count of units deleted indexed by content_type_id
    :rtype: dict
    """
    deleted_by_type = {}

    for type_id in content_types_db.all_type_ids():
        deleted = OrphanManager.delete_orphans_by_type(type_id)
        if deleted > 0:
            deleted_by_type[type_id] = deleted

    for type_id in plugin_api.list_unit_models():
        deleted = OrphanManager.delete_orphan_content_units_by_type(type_id)
        if deleted > 0:
            deleted_by_type[type_id] = deleted

    return deleted_by_type
开发者ID:pulp,项目名称:pulp,代码行数:18,代码来源:orphan.py
示例14: __populate_units
def __populate_units(self, unit_ids_by_type, repo_ids):
    """
    Fill in the unit ids for every content type that does not list any.

    * ``unit_ids_by_type`` is None: for every type id known to the content
      types database, collect the distinct unit ids associated with any of the
      given repos.
    * A type id maps to an empty (falsy) list: collect the distinct unit ids of
      that type associated with any of the given repos.
    * A type id maps to a non-empty list: those unit ids are used as given.

    Note that an empty dict is not treated like None: it yields an empty result.

    :param unit_ids_by_type: dictionary of <content type id> : <list of unit ids>,
                             or None
    :type unit_ids_by_type: dict or None
    :param repo_ids: ids of the repos whose unit associations are searched
    :type repo_ids: list
    :return: dictionary of <content type id> : <list of unit ids>
    :rtype: dict
    """
    association_query_manager = managers.repo_unit_association_query_manager()
    populated = {}

    def add_all_repo_units(type_id):
        # Query the associations of this type across the given repos and record
        # each unit id once (the set drops duplicates, so order is not kept).
        criteria = Criteria(filters={"repo_id": {"$in": repo_ids}, "unit_type_id": type_id},
                            fields=['unit_id'])
        associations = association_query_manager.find_by_criteria(criteria)
        found_ids = [association['unit_id'] for association in associations]
        populated.setdefault(type_id, []).extend(list(set(found_ids)))

    if unit_ids_by_type is None:
        # Units are not specified at all: consider every known content type.
        for type_id in content_types_db.all_type_ids():
            add_all_repo_units(type_id)
        return populated

    for type_id, unit_ids in unit_ids_by_type.items():
        if unit_ids:
            populated.setdefault(type_id, []).extend(unit_ids)
        else:
            add_all_repo_units(type_id)
    return populated
开发者ID:kbotc,项目名称:pulp,代码行数:40,代码来源:applicability.py
示例15: POST
def POST(self):
    """
    Determine content applicability.

    body {
        consumer_criteria: <dict> or None,
        repo_criteria: <dict> or None,
        unit_criteria: <dict of type_id : unit_criteria> or None,
        override_config: <dict> or None
    }

    When unit_criteria is missing or empty, every type id known to the content
    types database is used with empty criteria.

    :return: the report from ``find_applicable_units`` with every applicability
             report serialized. Each value of the report, keyed by unit type id,
             is handled as one of two shapes:
               <unit_type_id> : [<ApplicabilityReport>]
             or
               <unit_type_id> : {<consumer_id> : [<ApplicabilityReport>]}
             NOTE(review): the earlier documentation tied these shapes to a
             'report_style' of 'by_units' / 'by_consumer'; this handler does not
             read such a setting itself - confirm against the manager.
    :rtype: dict
    """
    params = self.params()
    consumer_criteria = params.get('consumer_criteria', None)
    repo_criteria = params.get('repo_criteria', None)
    units = params.get('unit_criteria', None)
    override_config = params.get('override_config', None)

    if consumer_criteria:
        consumer_criteria = Criteria.from_client_input(consumer_criteria)
    if repo_criteria:
        repo_criteria = Criteria.from_client_input(repo_criteria)

    # If unit_criteria is not specified, consider all units of all types
    if not units:
        units = dict((type_id, {}) for type_id in content_types_db.all_type_ids())

    # Validate user defined criteria and convert them to Criteria objects;
    # a None entry is treated as empty criteria.
    unit_criteria = {}
    for type_id, raw_criteria in units.items():
        client_input = raw_criteria if raw_criteria is not None else {}
        unit_criteria[type_id] = Criteria.from_client_input(client_input)

    manager = managers.consumer_applicability_manager()
    report = manager.find_applicable_units(consumer_criteria, repo_criteria,
                                           unit_criteria, override_config)

    def serialize_all(reports):
        # Serialize each applicability report in the given sequence.
        return [serialization.consumer.applicability_report(r) for r in reports]

    for unit_type_id, applicability_reports in report.items():
        if isinstance(applicability_reports, list):
            report[unit_type_id] = serialize_all(applicability_reports)
            continue
        # Otherwise the value maps consumer ids to lists of reports.
        for consumer_id, report_list in applicability_reports.items():
            report[unit_type_id][consumer_id] = serialize_all(report_list)

    return self.ok(report)
开发者ID:cliffy94,项目名称:pulp,代码行数:62,代码来源:consumers.py
注：本文中的pulp.plugins.types.database.all_type_ids函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论