本文整理汇总了Python中pulp_puppet.common.model.Module类的典型用法代码示例。如果您正苦于以下问题:Python Module类的具体用法?Python Module怎么用?Python Module使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Module类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_update_from_json
def test_update_from_json(self):
# Setup
module = Module('jdob-valid', '1.0.0', 'jdob')
# Test
module.update_from_json(VALID_MODULE_METADATA_JSON)
# Verify
self.assert_valid_module(module)
开发者ID:jlsherrill,项目名称:pulp,代码行数:9,代码来源:test_common_model.py
示例2: handle_uploaded_unit
def handle_uploaded_unit(repo, type_id, unit_key, metadata, file_path, conduit):
"""
Handles an upload unit request to the importer. This call is responsible
for moving the unit from its temporary location where Pulp stored the
upload to the final storage location (as dictated by Pulp) for the unit.
This call will also update the database in Pulp to reflect the unit
and its association to the repository.
:param repo: repository into which the unit is being uploaded
:type repo: pulp.plugins.model.Repository
:param type_id: type of unit being uploaded
:type type_id: str
:param unit_key: unique identifier for the unit
:type unit_key: dict
:param metadata: extra data about the unit
:type metadata: dict
:param file_path: temporary location of the uploaded file
:type file_path: str
:param conduit: for calls back into Pulp
:type conduit: pulp.plugins.conduit.upload.UploadConduit
"""
if type_id != constants.TYPE_PUPPET_MODULE:
raise NotImplementedError()
# Create a module with unit_key if supplied
initial_module = None
if unit_key:
initial_module = Module.from_dict(unit_key)
# Extract the metadata from the module
extracted_data = metadata_parser.extract_metadata(file_path, repo.working_dir, initial_module)
checksum = metadata_parser.calculate_checksum(file_path)
# Create a module from the metadata
module = Module.from_json(extracted_data)
module.checksum = checksum
# Create the Pulp unit
type_id = constants.TYPE_PUPPET_MODULE
unit_key = module.unit_key()
unit_metadata = module.unit_metadata()
relative_path = constants.STORAGE_MODULE_RELATIVE_PATH % module.filename()
unit = conduit.init_unit(type_id, unit_key, unit_metadata, relative_path)
# Copy from the upload temporary location into where Pulp wants it to live
shutil.copy(file_path, unit.storage_path)
# Save the unit into the destination repository
conduit.save_unit(unit)
return {'success_flag': True, 'summary': '', 'details': {}}
开发者ID:asmacdo,项目名称:pulp_puppet,代码行数:53,代码来源:upload.py
示例3: test_generate_unit_key
def test_generate_unit_key(self):
# Test
key = self.command.generate_unit_key(self.filename)
# Verify
expected_key = Module.generate_unit_key('valid', '1.0.0', 'jdob')
self.assertEqual(key, expected_key)
开发者ID:ehelms,项目名称:pulp,代码行数:7,代码来源:test_extension_upload.py
示例4: NegativeMetadataTests
class NegativeMetadataTests(unittest.TestCase):
def setUp(self):
self.author = "jdob"
self.name = None # set in test itself
self.version = "1.0.0"
self.module = Module(self.name, self.version, self.author)
self.module_dir = os.path.join(DATA_DIR, "bad-modules")
self.tmp_dir = tempfile.mkdtemp(prefix="puppet-metadata-tests")
def tearDown(self):
if os.path.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)
def test_extract_metadata_bad_tarball(self):
# Setup
self.module.name = "empty"
filename = os.path.join(self.module_dir, self.module.filename())
# Test
try:
metadata.extract_metadata(self.module, filename, self.tmp_dir)
self.fail()
except metadata.ExtractionException, e:
self.assertEqual(e.module_filename, filename)
开发者ID:ehelms,项目名称:pulp,代码行数:26,代码来源:test_importer_metadata.py
示例5: _import_modules
def _import_modules(self, inventory, module_paths):
"""
Import the puppet modules (tarballs) at the specified paths.
:param inventory: A module inventory object.
:type inventory: Inventory
:param module_paths: A list of paths to puppet module files.
:type module_paths: list
:return: A list of the imported module unit keys.
:rtype: list
"""
imported_modules = []
for module_path in module_paths:
if self.canceled:
return []
puppet_manifest = self._extract_metadata(module_path)
module = Module.from_json(puppet_manifest)
if inventory.already_associated(module):
# Decrement the total number of modules we're importing
self.report.modules_total_count -= 1
continue
_LOG.info(IMPORT_MODULE % dict(mod=module_path))
imported_modules.append(module.unit_key())
self._add_module(module_path, module)
self.report.modules_finished_count += 1
self.report.update_progress()
# Write the report, making sure we don't overwrite the a failure in _fetch_modules
if self.report.modules_state != constants.STATE_FAILED:
self.report.modules_state = constants.STATE_SUCCESS
self.report.modules_execution_time = time() - self.started_fetch_modules
self.report.update_progress()
return imported_modules
开发者ID:asmacdo,项目名称:pulp_puppet,代码行数:34,代码来源:directory.py
示例6: _generate_metadata
def _generate_metadata(self, modules):
"""
Generates the repository metadata document for all modules in the
:type modules: list of pulp.plugins.model.AssociatedUnit
"""
_logger.info('Generating metadata for repository <%s>' % self.repo.id)
# Convert the Pulp data types into the local model
metadata = RepositoryMetadata()
for m in modules:
combined = copy.copy(m.unit_key)
combined.update(m.metadata)
module = Module.from_dict(combined)
metadata.modules.append(module)
# Write the JSON representation of the metadata to the repository
json_metadata = metadata.to_json()
build_dir = self._build_dir()
metadata_file = os.path.join(build_dir, constants.REPO_METADATA_FILENAME)
f = open(metadata_file, 'w')
f.write(json_metadata)
f.close()
开发者ID:bmbouter,项目名称:pulp_puppet,代码行数:25,代码来源:publish.py
示例7: NegativeMetadataTests
class NegativeMetadataTests(unittest.TestCase):
def setUp(self):
self.author = 'jdob'
self.name = None # set in test itself
self.version = '1.0.0'
self.module = Module(self.name, self.version, self.author)
self.module_dir = os.path.join(DATA_DIR, 'bad-modules')
self.tmp_dir = tempfile.mkdtemp(prefix='puppet-metadata-tests')
def tearDown(self):
if os.path.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)
def test_extract_metadata_bad_tarball(self):
# Setup
self.module.name = 'empty'
filename = os.path.join(self.module_dir, self.module.filename())
# Test
try:
metadata.extract_metadata(self.module, filename, self.tmp_dir)
self.fail()
except metadata.ExtractionException, e:
self.assertEqual(e.module_filename, filename)
self.assertEqual(e.property_names[0], filename)
self.assertTrue(isinstance(e, InvalidValue))
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:29,代码来源:test_importer_metadata.py
示例8: setUp
def setUp(self):
self.author = 'jdob'
self.name = 'valid'
self.version = '1.0.0'
self.module = Module(self.name, self.version, self.author)
self.module_dir = os.path.join(DATA_DIR, 'good-modules', 'jdob-valid', 'pkg')
self.tmp_dir = tempfile.mkdtemp(prefix='puppet-metadata-tests')
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:9,代码来源:test_importer_metadata.py
示例9: test_from_dict
def test_from_dict(self):
# Setup
data = json.loads(VALID_MODULE_METADATA_JSON)
# Test
module = Module.from_dict(data)
# Verify
self.assert_valid_module(module)
开发者ID:jlsherrill,项目名称:pulp,代码行数:9,代码来源:test_common_model.py
示例10: test_generate_unit_key_complex_version
def test_generate_unit_key_complex_version(self):
filename = os.path.join(MODULES_DIR, 'jdob-valid-1.0.0-rc1.tar.gz')
# Test
key = self.command.generate_unit_key(filename)
# Verify
expected_key = Module.generate_unit_key('valid', '1.0.0-rc1', 'jdob')
self.assertEqual(key, expected_key)
开发者ID:asmacdo,项目名称:pulp_puppet,代码行数:9,代码来源:test_extension_upload.py
示例11: _do_import_modules
def _do_import_modules(self, metadata):
"""
Actual logic of the import. This method will do a best effort per module;
if an individual module fails it will be recorded and the import will
continue. This method will only raise an exception in an extreme case
where it cannot react and continue.
"""
def unit_key_str(unit_key_dict):
"""
Converts the unit key dict form into a single string that can be
used as the key in a dict lookup.
"""
template = '%s-%s-%s'
return template % (encode_unicode(unit_key_dict['name']),
encode_unicode(unit_key_dict['version']),
encode_unicode(unit_key_dict['author']))
downloader = self._create_downloader()
self.downloader = downloader
# Ease lookup of modules
modules_by_key = dict([(unit_key_str(m.unit_key()), m) for m in metadata.modules])
# Collect information about the repository's modules before changing it
module_criteria = UnitAssociationCriteria(type_ids=[constants.TYPE_PUPPET_MODULE])
existing_units = self.sync_conduit.get_units(criteria=module_criteria)
existing_modules = [Module.from_unit(x) for x in existing_units]
existing_module_keys = [unit_key_str(m.unit_key()) for m in existing_modules]
new_unit_keys = self._resolve_new_units(existing_module_keys, modules_by_key.keys())
remove_unit_keys = self._resolve_remove_units(existing_module_keys, modules_by_key.keys())
# Once we know how many things need to be processed, we can update the
# progress report
self.progress_report.modules_total_count = len(new_unit_keys)
self.progress_report.modules_finished_count = 0
self.progress_report.modules_error_count = 0
self.progress_report.update_progress()
# Add new units
for key in new_unit_keys:
if self._canceled:
break
module = modules_by_key[key]
try:
self._add_new_module(downloader, module)
self.progress_report.modules_finished_count += 1
except Exception, e:
self.progress_report.add_failed_module(module, e, sys.exc_info()[2])
self.progress_report.update_progress()
开发者ID:hjensas,项目名称:pulp_puppet,代码行数:52,代码来源:forge.py
示例12: test_from_json
def test_from_json(self):
# Setup
data = json.loads(VALID_MODULE_METADATA_JSON)
# Test
module = Module.from_json(data)
# Verify
self.assertEqual(module.name, "valid")
self.assertEqual(module.author, "jdob")
module.name = "jdob-valid" # rename the module to use the assert
self.assert_valid_module(module)
开发者ID:asmacdo,项目名称:pulp_puppet,代码行数:13,代码来源:test_common_model.py
示例13: test_extract_metadata
def test_extract_metadata(self):
# Setup
filename = os.path.join(self.module_dir, self.module.filename())
# Test
metadata_json = metadata.extract_metadata(filename, self.tmp_dir)
self.module = Module.from_json(metadata_json)
# Verify
self.assertEqual(self.module.name, 'valid')
self.assertEqual(self.module.version, '1.0.0')
self.assertEqual(self.module.author, 'jdob')
self._assert_test_module_metadata()
开发者ID:jeremycline,项目名称:pulp_puppet,代码行数:14,代码来源:test_metadata.py
示例14: test_extract_metadata_no_module
def test_extract_metadata_no_module(self, mkdtemp):
# Setup
filename = os.path.join(self.module_dir, self.module.filename())
extraction_dir = os.path.join(self.tmp_dir, "1234")
mkdtemp.return_value = extraction_dir
metadata_json = metadata.extract_metadata(filename, self.tmp_dir)
self.module = Module.from_json(metadata_json)
# Verify
self.assertEqual(self.module.name, 'valid')
self.assertEqual(self.module.version, '1.0.0')
self.assertEqual(self.module.author, 'jdob')
self._assert_test_module_metadata()
self.assertTrue(not os.path.exists(extraction_dir))
开发者ID:jeremycline,项目名称:pulp_puppet,代码行数:17,代码来源:test_metadata.py
示例15: test_from_json_old_name
def test_from_json_old_name(self):
"""
Test that the Module.from_json method handles the old naming style
"""
# Setup
metadata = {
'name': 'oldauthor/oldmodule',
'version': '0.1.0',
}
# Test
module = Module.from_json(metadata)
# Verify
self.assertEqual(module.author, 'oldauthor')
self.assertEqual(module.name, 'oldmodule')
self.assertEqual(module.version, '0.1.0')
开发者ID:asmacdo,项目名称:pulp_puppet,代码行数:17,代码来源:test_common_model.py
示例16: _associated
def _associated(conduit):
"""
Retrieve the modules associated with a repository, build a set containing
the unit_key for each and return it.
:param conduit: Provides access to relevant Pulp functionality.
:type conduit: pulp.plugins.conduits.unit_import.ImportUnitConduit
:return: The set of unit keys.
:rtype: set
"""
key_set = set()
criteria = UnitAssociationCriteria(
type_ids=[constants.TYPE_PUPPET_MODULE], unit_fields=Module.UNIT_KEY_NAMES)
units = conduit.get_units(criteria=criteria, as_generator=True)
for unit in units:
module = Module.from_unit(unit)
key_set.add(tuple(module.unit_key().items()))
return key_set
开发者ID:asmacdo,项目名称:pulp_puppet,代码行数:18,代码来源:directory.py
示例17: test_extract_metadata_non_standard_packaging
def test_extract_metadata_non_standard_packaging(self):
# Setup
self.module = Module('misnamed', '1.0.0', 'ldob')
self.module_dir = os.path.join(DATA_DIR, 'bad-modules')
filename = os.path.join(self.module_dir, self.module.filename())
# Test
metadata.extract_metadata(self.module, filename, self.tmp_dir)
# Verify - contains the same module as jdob-valid-1.0.0, so this is safe
self.assertEqual(self.module.name, 'misnamed')
self.assertEqual(self.module.version, '1.0.0')
self.assertEqual(self.module.author, 'ldob')
self._assert_test_module_metadata()
extraction_root = os.path.join(self.tmp_dir, self.module.author)
self.assertTrue(not os.path.exists(extraction_root))
开发者ID:abhaychrungoo,项目名称:pulp_puppet,代码行数:18,代码来源:test_importer_metadata.py
示例18: handle_uploaded_unit
def handle_uploaded_unit(type_id, unit_key, metadata, file_path, conduit):
"""
Handles an upload unit request to the importer. This call is responsible
for moving the unit from its temporary location where Pulp stored the
upload to the final storage location (as dictated by Pulp) for the unit.
This call will also update the database in Pulp to reflect the unit
and its association to the repository.
:param type_id: type of unit being uploaded
:type type_id: str
:param unit_key: unique identifier for the unit
:type unit_key: dict
:param metadata: extra data about the unit
:type metadata: dict
:param file_path: temporary location of the uploaded file
:type file_path: str
:param conduit: for calls back into Pulp
:type conduit: pulp.plugins.conduit.upload.UploadConduit
"""
if type_id != constants.TYPE_PUPPET_MODULE:
raise NotImplementedError()
# Create a module out of the uploaded metadata
combined = copy.copy(unit_key)
combined.update(metadata)
module = Module.from_dict(combined)
# Create the Pulp unit
type_id = constants.TYPE_PUPPET_MODULE
unit_key = module.unit_key()
unit_metadata = module.unit_metadata()
relative_path = constants.STORAGE_MODULE_RELATIVE_PATH % module.filename()
unit = conduit.init_unit(type_id, unit_key, unit_metadata, relative_path)
# Copy from the upload temporary location into where Pulp wants it to live
shutil.copy(file_path, unit.storage_path)
# Save the unit into the destination repository
conduit.save_unit(unit)
开发者ID:jlsherrill,项目名称:pulp,代码行数:41,代码来源:upload.py
示例19: _import_modules
def _import_modules(self, module_paths):
"""
Import the puppet modules (tarballs) at the specified paths. This will also handle
removing any modules in the local repository if they are no longer present on remote
repository and the 'remove_missing' config value is True.
:param module_paths: A list of paths to puppet module files.
:type module_paths: list
"""
criteria = UnitAssociationCriteria(type_ids=[constants.TYPE_PUPPET_MODULE],
unit_fields=Module.UNIT_KEY_NAMES)
local_units = self.conduit.get_units(criteria=criteria)
local_unit_keys = [unit.unit_key for unit in local_units]
remote_unit_keys = []
for module_path in module_paths:
if self.canceled:
return
puppet_manifest = self._extract_metadata(module_path)
module = Module.from_json(puppet_manifest)
remote_unit_keys.append(module.unit_key())
# Even though we've already basically processed this unit, not doing this makes the
# progress reporting confusing because it shows Pulp always importing all the modules.
if module.unit_key() in local_unit_keys:
self.report.modules_total_count -= 1
continue
_logger.debug(IMPORT_MODULE % dict(mod=module_path))
self._add_module(module_path, module)
self.report.modules_finished_count += 1
self.report.update_progress()
# Write the report, making sure we don't overwrite a failure in _fetch_modules
if self.report.modules_state not in constants.COMPLETE_STATES:
self.report.modules_state = constants.STATE_SUCCESS
self.report.modules_execution_time = time() - self.started_fetch_modules
self.report.update_progress()
remove_missing = self.config.get_boolean(constants.CONFIG_REMOVE_MISSING)
if remove_missing is None:
remove_missing = constants.DEFAULT_REMOVE_MISSING
if remove_missing:
self._remove_missing(local_units, remote_unit_keys)
开发者ID:bmbouter,项目名称:pulp_puppet,代码行数:43,代码来源:directory.py
示例20: test_extract_metadata_non_standard_packaging
def test_extract_metadata_non_standard_packaging(self, mkdtemp):
# Setup
self.module = Module('misnamed', '1.0.0', 'ldob')
self.module_dir = os.path.join(DATA_DIR, 'bad-modules')
filename = os.path.join(self.module_dir, self.module.filename())
extraction_dir = os.path.join(self.tmp_dir, "test")
mkdtemp.return_value = extraction_dir
# Test
metadata_json = metadata.extract_metadata(filename, self.tmp_dir)
self.module.update_from_dict(metadata_json)
# Verify - contains the same module as jdob-valid-1.0.0, so this is safe
self.assertEqual(self.module.name, 'misnamed')
self.assertEqual(self.module.version, '1.0.0')
self.assertEqual(self.module.author, 'ldob')
self._assert_test_module_metadata()
self.assertTrue(not os.path.exists(extraction_dir))
开发者ID:jeremycline,项目名称:pulp_puppet,代码行数:20,代码来源:test_metadata.py
注:本文中的pulp_puppet.common.model.Module类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论