本文整理汇总了Python中pulp.plugins.util.verification.sanitize_checksum_type函数的典型用法代码示例。如果您正苦于以下问题:Python sanitize_checksum_type函数的具体用法?Python sanitize_checksum_type怎么用?Python sanitize_checksum_type使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sanitize_checksum_type函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: rpm_search_dicts
def rpm_search_dicts(self):
ret = []
for collection in self.pkglist:
for package in collection.get("packages", []):
if len(package.get("sum") or []) == 2:
checksum = package["sum"][1]
checksumtype = verification.sanitize_checksum_type(package["sum"][0])
elif "sums" in package and "type" in package:
# these are the field names we get from an erratum upload.
# I have no idea why they are different.
checksum = package["sums"]
checksumtype = verification.sanitize_checksum_type(package["type"])
else:
checksum = None
checksumtype = None
rpm = RPM(
name=package["name"],
epoch=package["epoch"],
version=package["version"],
release=package["release"],
arch=package["arch"],
checksum=checksum,
checksumtype=checksumtype,
)
unit_key = rpm.unit_key
for key in ["checksum", "checksumtype"]:
if unit_key[key] is None:
del unit_key[key]
ret.append(unit_key)
return ret
开发者ID:aeria,项目名称:pulp_rpm,代码行数:30,代码来源:models.py
示例2: rpm_search_dicts
def rpm_search_dicts(self):
ret = []
for collection in self.pkglist:
for package in collection.get('packages', []):
if len(package.get('sum') or []) == 2:
checksum = package['sum'][1]
checksumtype = verification.sanitize_checksum_type(package['sum'][0])
elif 'sums' in package and 'type' in package:
# these are the field names we get from an erratum upload.
# I have no idea why they are different.
checksum = package['sums']
checksumtype = verification.sanitize_checksum_type(package['type'])
else:
checksum = None
checksumtype = None
rpm = RPM(name=package['name'], epoch=package['epoch'],
version=package['version'], release=package['release'],
arch=package['arch'], checksum=checksum,
checksumtype=checksumtype)
unit_key = rpm.unit_key
for key in ['checksum', 'checksumtype']:
if unit_key[key] is None:
del unit_key[key]
ret.append(unit_key)
return ret
开发者ID:bkearney,项目名称:pulp_rpm,代码行数:25,代码来源:models.py
示例3: process_repomd_data_element
def process_repomd_data_element(data_element):
"""
Process the data elements of the repomd.xml file.
This returns a file information dictionary with the following keys:
* `name`: name of the element
* `relative_path`: the path of the metadata file, relative to the repository URL
* `checksum`: dictionary of `algorithm` and `hex_digest` keys and values
* `size`: size of the metadata file, in bytes
* `timestamp`: unix timestamp of the file's creation, as a float
* `open_checksum`: optional checksum dictionary of uncompressed metadata file
* `open_size`: optional size of the uncompressed metadata file, in bytes
:param data_element: XML data element parsed from the repomd.xml file
:return: file_info dictionary
:rtype: dict
"""
file_info = deepcopy(FILE_INFO_SKEL)
file_info['name'] = data_element.attrib['type']
location_element = data_element.find(LOCATION_TAG)
if location_element is not None:
file_info['relative_path'] = location_element.attrib['href']
checksum_element = data_element.find(CHECKSUM_TAG)
if checksum_element is not None:
checksum_type = verification.sanitize_checksum_type(checksum_element.attrib['type'])
file_info['checksum']['algorithm'] = checksum_type
file_info['checksum']['hex_digest'] = checksum_element.text
size_element = data_element.find(SIZE_TAG)
if size_element is not None:
file_info['size'] = int(size_element.text)
timestamp_element = data_element.find(TIMESTAMP_TAG)
if timestamp_element is not None:
file_info['timestamp'] = float(timestamp_element.text)
open_checksum_element = data_element.find(OPEN_CHECKSUM_TAG)
if open_checksum_element is not None:
checksum_type = verification.sanitize_checksum_type(open_checksum_element.attrib['type'])
file_info['open_checksum']['algorithm'] = checksum_type
file_info['open_checksum']['hex_digest'] = open_checksum_element.text
open_size_element = data_element.find(OPEN_SIZE_TAG)
if open_size_element is not None:
file_info['open_size'] = int(open_size_element.text)
for child in data_element.getchildren():
child.clear()
data_element.clear()
return file_info
开发者ID:Mahendra-Shivaswamy-Devops,项目名称:pulp_rpm,代码行数:56,代码来源:metadata.py
示例4: validate
def validate(self, value):
"""
Validates that value is a checksumtype known to pulp platform
:param value: The value to validate
:type value: basestring
:return: None
"""
super(ChecksumTypeStringField, self).validate(value)
verification.sanitize_checksum_type(value)
开发者ID:Mahendra-Shivaswamy-Devops,项目名称:pulp_rpm,代码行数:11,代码来源:fields.py
示例5: import_unknown_metadata_files
def import_unknown_metadata_files(self, metadata_files):
"""
Import metadata files whose type is not known to us. These are any files
that we are not already parsing.
:param metadata_files: object containing access to all metadata files
:type metadata_files: pulp_rpm.plugins.importers.yum.repomd.metadata.MetadataFiles
"""
for metadata_type, file_info in metadata_files.metadata.iteritems():
if metadata_type not in metadata_files.KNOWN_TYPES:
checksum_type = file_info['checksum']['algorithm']
checksum_type = verification.sanitize_checksum_type(checksum_type)
unit_metadata = {
'checksum': file_info['checksum']['hex_digest'],
'checksum_type': checksum_type,
}
model = models.YumMetadataFile(metadata_type,
self.sync_conduit.repo_id,
unit_metadata)
relative_path = os.path.join(model.relative_dir,
os.path.basename(file_info['local_path']))
unit = self.sync_conduit.init_unit(models.YumMetadataFile.TYPE, model.unit_key,
model.metadata, relative_path)
shutil.copyfile(file_info['local_path'], unit.storage_path)
self.sync_conduit.save_unit(unit)
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:26,代码来源:sync.py
示例6: process_download_reports
def process_download_reports(self, reports):
"""
Once downloading is complete, add information about each file to this
model instance. This is required before saving the new unit.
:param reports: list of successful download reports
:type reports: list(pulp.common.download.report.DownloadReport)
"""
# TODO: maybe this shouldn't be in common
metadata_files = self.metadata.setdefault('files', [])
for report in reports:
# the following data model is mostly intended to match what the
# previous importer generated.
metadata_files.append({
'checksum': report.data['checksum'],
'checksumtype': verification.sanitize_checksum_type(report.data['checksumtype']),
'downloadurl': report.url,
'filename': os.path.basename(report.data['relativepath']),
'fileName': os.path.basename(report.data['relativepath']),
'item_type': self.TYPE,
'pkgpath': os.path.join(
constants.DISTRIBUTION_STORAGE_PATH,
self.id,
os.path.dirname(report.data['relativepath']),
),
'relativepath': report.data['relativepath'],
'savepath': report.destination,
'size': report.total_bytes,
})
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:29,代码来源:models.py
示例7: process_package_element
def process_package_element(element):
"""
Process one XML block from prestodelta.xml and return a models.DRPM instance
:param element: object representing one "DRPM" block from the XML file
:type element: xml.etree.ElementTree.Element
:return: models.DRPM instance for the XML block
:rtype: pulp_rpm.plugins.db.models.DRPM
"""
delta = element.find("delta")
filename = delta.find("filename")
sequence = delta.find("sequence")
size = delta.find("size")
checksum = delta.find("checksum")
checksum_type = verification.sanitize_checksum_type(checksum.attrib["type"])
return models.DRPM.from_package_info(
{
"type": "drpm",
"new_package": element.attrib["name"],
"epoch": element.attrib["epoch"],
"version": element.attrib["version"],
"release": element.attrib["release"],
"arch": element.attrib["arch"],
"oldepoch": delta.attrib["oldepoch"],
"oldversion": delta.attrib["oldversion"],
"oldrelease": delta.attrib["oldrelease"],
"filename": filename.text,
"sequence": sequence.text,
"size": int(size.text),
"checksum": checksum.text,
"checksumtype": checksum_type,
}
)
开发者ID:hjensas,项目名称:pulp_rpm,代码行数:35,代码来源:presto.py
示例8: process_package_element
def process_package_element(element):
"""
Process one XML block from prestodelta.xml and return a models.DRPM instance
:param element: object representing one "DRPM" block from the XML file
:type element: xml.etree.ElementTree.Element
:return: models.DRPM instance for the XML block
:rtype: pulp_rpm.plugins.db.models.DRPM
"""
delta = element.find('delta')
filename = delta.find('filename')
sequence = delta.find('sequence')
size = delta.find('size')
checksum = delta.find('checksum')
checksum_type = verification.sanitize_checksum_type(checksum.attrib['type'])
return models.DRPM.from_package_info({
'type': 'drpm',
'new_package': element.attrib['name'],
'epoch': element.attrib['epoch'],
'version': element.attrib['version'],
'release': element.attrib['release'],
'arch': element.attrib['arch'],
'oldepoch': delta.attrib['oldepoch'],
'oldversion': delta.attrib['oldversion'],
'oldrelease': delta.attrib['oldrelease'],
'filename': filename.text,
'sequence': sequence.text,
'size': int(size.text),
'checksum': checksum.text,
'checksumtype': checksum_type,
})
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:33,代码来源:presto.py
示例9: import_unknown_metadata_files
def import_unknown_metadata_files(self, metadata_files):
"""
Import metadata files whose type is not known to us. These are any files
that we are not already parsing.
:param metadata_files: object containing access to all metadata files
:type metadata_files: pulp_rpm.plugins.importers.yum.repomd.metadata.MetadataFiles
"""
for metadata_type, file_info in metadata_files.metadata.iteritems():
if metadata_type not in metadata_files.KNOWN_TYPES:
file_path = file_info['local_path']
checksum_type = file_info['checksum']['algorithm']
checksum_type = verification.sanitize_checksum_type(checksum_type)
checksum = file_info['checksum']['hex_digest']
# Find an existing model
model = models.YumMetadataFile.objects.filter(
data_type=metadata_type,
repo_id=self.repo.repo_id).first()
# If an existing model, use that
if model:
model.checksum = checksum
model.checksum_type = checksum_type
else:
# Else, create a new mode
model = models.YumMetadataFile(
data_type=metadata_type,
repo_id=self.repo.repo_id,
checksum=checksum,
checksum_type=checksum_type)
model.set_storage_path(os.path.basename(file_path))
model.save_and_import_content(file_path)
# associate/re-associate model to the repo
repo_controller.associate_single_unit(self.repo, model)
开发者ID:dokuhebi,项目名称:pulp_rpm,代码行数:35,代码来源:sync.py
示例10: test_nothing_necessary
def test_nothing_necessary(self):
"""
Assert that the method doesn't change the checksum_type when it's not needed.
"""
checksum_type = verification.sanitize_checksum_type('sha512')
self.assertEqual(checksum_type, 'sha512')
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:7,代码来源:test_verification.py
示例11: test_none
def test_none(self):
"""
Assert correct behavior when the checksum_type is None.
"""
checksum_type = verification.sanitize_checksum_type(None)
self.assertEqual(checksum_type, None)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:7,代码来源:test_verification.py
示例12: test_SHA256_to_sha256
def test_SHA256_to_sha256(self):
"""
Assert that "SHA256" is converted to "sha256".
"""
checksum_type = verification.sanitize_checksum_type('SHA256')
self.assertEqual(checksum_type, 'sha256')
开发者ID:alanoe,项目名称:pulp,代码行数:7,代码来源:test_verification.py
示例13: process_package_element
def process_package_element(element):
"""
Process one XML block from prestodelta.xml and return a models.DRPM instance
:param element: object representing one "DRPM" block from the XML file
:type element: xml.etree.ElementTree.Element
:return: models.DRPM instance for the XML block
:rtype: pulp_rpm.plugins.db.models.DRPM
"""
delta = element.find('delta')
filename = delta.find('filename')
sequence = delta.find('sequence')
size = delta.find('size')
checksum = delta.find('checksum')
checksum_type = verification.sanitize_checksum_type(checksum.attrib['type'])
return models.DRPM(
new_package=element.attrib['name'],
epoch=element.attrib['epoch'],
version=element.attrib['version'],
release=element.attrib['release'],
arch=element.attrib['arch'],
oldepoch=delta.attrib['oldepoch'],
oldversion=delta.attrib['oldversion'],
oldrelease=delta.attrib['oldrelease'],
filename=filename.text,
sequence=sequence.text,
size=int(size.text),
checksum=checksum.text,
checksumtype=checksum_type)
开发者ID:Mahendra-Shivaswamy-Devops,项目名称:pulp_rpm,代码行数:31,代码来源:presto.py
示例14: test_ShA_to_sha1
def test_ShA_to_sha1(self):
"""
Assert that "ShA" is converted to "sha1".
"""
checksum_type = verification.sanitize_checksum_type('ShA')
self.assertEqual(checksum_type, 'sha1')
开发者ID:alanoe,项目名称:pulp,代码行数:7,代码来源:test_verification.py
示例15: process_download_reports
def process_download_reports(self, reports):
"""
Once downloading is complete, add information about each file to this
model instance. This is required before saving the new unit.
:param reports: list of successful download reports
:type reports: list(pulp.common.download.report.DownloadReport)
"""
if not isinstance(self.files, list):
self.files = []
for report in reports:
# the following data model is mostly intended to match what the
# previous importer generated.
self.files.append({
'checksum': report.data['checksum'],
'checksumtype': verification.sanitize_checksum_type(report.data['checksumtype']),
'downloadurl': report.url,
'filename': os.path.basename(report.data['relativepath']),
'fileName': os.path.basename(report.data['relativepath']),
'item_type': "distribution",
'pkgpath': os.path.join(
self._storage_path, os.path.dirname(report.data['relativepath']),
),
'relativepath': report.data['relativepath'],
'savepath': report.destination,
'size': report.total_bytes,
})
开发者ID:dkliban,项目名称:pulp_rpm,代码行数:28,代码来源:models.py
示例16: _migrate_rpmlike_units
def _migrate_rpmlike_units(unit_type):
"""
This function performs the migration on RPMs, DRPMs, and SRPMs. These all have the same schema
when it comes to checksumtype, so they can be treated the same way.
:param unit_type: The unit_type_id, as found in pulp_rpm.common.ids.
:type unit_type: basestring
"""
repos = connection.get_collection('repos')
repo_content_units = connection.get_collection('repo_content_units')
unit_collection = connection.get_collection('units_%s' % unit_type)
for unit in unit_collection.find():
try:
sanitized_type = verification.sanitize_checksum_type(unit['checksumtype'])
if sanitized_type != unit['checksumtype']:
# Let's see if we can get away with changing its checksumtype to the sanitized
# value. If this works, we won't have to do anything else.
unit_collection.update({'_id': unit['_id']},
{'$set': {'checksumtype': sanitized_type}})
except errors.DuplicateKeyError:
# Looks like there is already an identical unit with the sanitized checksum type. This
# means we need to remove the current unit, but first we will need to change any
# references to this unit to point to the other.
conflicting_unit = unit_collection.find_one(
{'name': unit['name'], 'epoch': unit['epoch'], 'version': unit['version'],
'release': unit['release'], 'arch': unit['arch'], 'checksum': unit['checksum'],
'checksumtype': sanitized_type})
for rcu in repo_content_units.find({'unit_type_id': unit_type, 'unit_id': unit['_id']}):
# Now we must either switch the rcu from pointing to unit to pointing to
# conflicting_unit, or delete the rcu if there is already one in the same repo.
try:
msg = _('Updating %(repo_id)s to contain %(type)s %(conflicting)s instead of '
'%(old_id)s.')
msg = msg % {'repo_id': rcu['repo_id'], 'type': unit_type,
'conflicting': conflicting_unit['_id'], 'old_id': unit['_id']}
_logger.debug(msg)
repo_content_units.update({'_id': rcu['_id']},
{'$set': {'unit_id': conflicting_unit['_id']}})
except errors.DuplicateKeyError:
# We will delete this RepoContentUnit since the sha1 RPM is already in the
# repository.
msg = _('Removing %(type)s %(old_id)s from repo %(repo_id)s since it conflicts '
'with %(conflicting)s.')
msg = msg % {'repo_id': rcu['repo_id'], 'type': unit_type,
'conflicting': conflicting_unit['_id'], 'old_id': unit['_id']}
_logger.debug(msg)
repo_content_units.remove({'_id': rcu['_id']})
# In this case, we now need to decrement the repository's "content_unit_counts"
# for this unit_type by one, since we removed a unit from a repository.
repos.update(
{'id': rcu['repo_id']},
{'$inc': {'content_unit_counts.%s' % unit_type: -1}})
# Now that we have removed or altered all references to the "sha" Unit, we need to
# remove it since it is a duplicate.
unit_collection.remove({'_id': unit['_id']})
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:56,代码来源:0017_merge_sha_sha1.py
示例17: _migrate_yum_metadata_files
def _migrate_yum_metadata_files():
"""
Migrate each YumMetadataFile to use the new sanitized checksum_type. This is mostly similar to
_migrate_rpmlike_units, except that the checksum type field name is checksum_type instead of
checksumtype, and there can't be any collisions since the checksum type isn't part of this
unit's unit_key. This means we don't have to worry about the repo_content_units table.
"""
collection = connection.get_collection('units_yum_repo_metadata_file')
for unit in collection.find():
sanitized_type = verification.sanitize_checksum_type(unit['checksum_type'])
if sanitized_type != unit['checksum_type']:
collection.update({'_id': unit['_id']},
{'$set': {'checksum_type': sanitized_type}})
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:13,代码来源:0017_merge_sha_sha1.py
示例18: update_unit_files
def update_unit_files(unit, files):
"""
Update the *files* list on the unit.
:param unit: A distribution model object.
:type unit: pulp_rpm.plugins.db.models.Distribution
:param files: List of distribution files.
:type files: list
"""
_list = []
if not isinstance(unit.files, list):
_list = list(unit.files)
for _file in files:
_list.append({
RELATIVE_PATH: _file[RELATIVE_PATH],
CHECKSUM: _file[CHECKSUM],
CHECKSUM_TYPE: verification.sanitize_checksum_type(_file[CHECKSUM_TYPE])})
unit.files = _list
开发者ID:goosemania,项目名称:pulp_rpm,代码行数:18,代码来源:treeinfo.py
示例19: process_successful_download_reports
def process_successful_download_reports(unit, reports):
"""
Once downloading is complete, add information about each file to this
model instance. This is required before saving the new unit.
:param reports: list of successful pulp.common.download.report.DownloadReport
:type reports: list
"""
files = []
if not isinstance(unit.files, list):
files = list(unit.files)
for report in reports:
_file = report.data
files.append({
RELATIVE_PATH: _file[RELATIVE_PATH],
CHECKSUM: _file[CHECKSUM],
CHECKSUM_TYPE: verification.sanitize_checksum_type(_file[CHECKSUM_TYPE])})
unit.files = files
开发者ID:goosemania,项目名称:pulp_rpm,代码行数:18,代码来源:treeinfo.py
示例20: _migrate_errata
def _migrate_errata():
"""
Visit each errata and check its references RPMs for the checksum type, sanitizing it if
necessary. Since these sums aren't part of the unit key for erratum, this will not cause any
collisions. The erratum also do not reference RPMs by unit_id, but by unit_key, so this is
important.
"""
errata = connection.get_collection('units_erratum')
for erratum in errata.find():
changed = False
pkglist = erratum.get('pkglist', [])
for collection in pkglist:
for package in collection.get('packages', []):
if 'sum' in package and package['sum']:
sanitized_type = verification.sanitize_checksum_type(package['sum'][0])
if sanitized_type != package['sum'][0]:
package['sum'][0] = sanitized_type
changed = True
if changed:
errata.update({'_id': erratum['_id']},
{'$set': {'pkglist': pkglist}})
开发者ID:AndreaGiardini,项目名称:pulp_rpm,代码行数:21,代码来源:0017_merge_sha_sha1.py
注:本文中的pulp.plugins.util.verification.sanitize_checksum_type函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论