本文整理汇总了Python中pulp_smash.tests.rpm.api_v2.utils.gen_repo函数的典型用法代码示例。如果您正苦于以下问题:Python gen_repo函数的具体用法?Python gen_repo怎么用?Python gen_repo使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了gen_repo函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUpModule
def setUpModule():  # pylint:disable=invalid-name
    """Skip the module when Pulp issue 1991 is untestable, else build fixtures.

    Download the signed and unsigned packages into ``_SIGNED_PACKAGES`` and
    ``_UNSIGNED_PACKAGES``, then create one repository per package set and
    upload every downloaded package into it. DRPMs are only downloaded when
    Pulp issue 1806 is testable. The created repositories are recorded in
    ``_REPOS`` under the keys ``'signed'`` and ``'unsigned'``.

    If creating a repository or uploading a package fails, both package
    dicts are emptied, every repository recorded so far is deleted, and the
    original error is re-raised.
    """
    cfg = config.get_config()
    if selectors.bug_is_untestable(1991, cfg.version):
        raise unittest.SkipTest('https://pulp.plan.io/issues/1991')
    set_up_module()

    # Download the packages up front.
    _SIGNED_PACKAGES['rpm'] = utils.http_get(RPM_URL)
    _SIGNED_PACKAGES['srpm'] = utils.http_get(SRPM_URL)
    _UNSIGNED_PACKAGES['rpm'] = utils.http_get(RPM_UNSIGNED_URL)
    _UNSIGNED_PACKAGES['srpm'] = utils.http_get(SRPM_UNSIGNED_URL)
    if selectors.bug_is_testable(1806, cfg.version):
        _SIGNED_PACKAGES['drpm'] = utils.http_get(DRPM_URL)
        _UNSIGNED_PACKAGES['drpm'] = utils.http_get(DRPM_UNSIGNED_URL)

    # One repository per package set: the signed one first, then the
    # unsigned one.
    client = api.Client(cfg, api.json_handler)
    package_sets = (
        ('signed', _SIGNED_PACKAGES),
        ('unsigned', _UNSIGNED_PACKAGES),
    )
    try:
        for label, packages in package_sets:
            repo = client.post(REPOSITORY_PATH, gen_repo())
            _REPOS[label] = repo
            for type_id, pkg in packages.items():
                utils.upload_import_unit(cfg, pkg, type_id, repo['_href'])
    except:  # bare on purpose: undo the set-up for any error, then re-raise
        _SIGNED_PACKAGES.clear()
        _UNSIGNED_PACKAGES.clear()
        while _REPOS:
            client.delete(_REPOS.popitem()[1]['_href'])
        raise
开发者ID:elyezer,项目名称:pulp-smash,代码行数:34,代码来源:test_signatures_checked_for_copies.py
示例2: setUpClass
def setUpClass(cls):  # pylint:disable=arguments-differ
    """Create two repositories, the first one feeding the second one.

    Every response received along the way is kept in ``cls.responses``, and
    the final state of both repositories is kept in ``cls.repos``. The
    following steps are executed:

    1. Create a repository with a feed, sync it, add a distributor to it
       and publish it.
    2. Create a second repository whose feed is the published location of
       the first one, with ``retain_old_count=0`` and SSL validation
       disabled.
    3. Sync the second repository.
    4. Read both repositories and mark them for deletion.
    """
    super(RetainOldCountTestCase, cls).setUpClass()
    client = api.Client(cls.cfg)
    cls.responses = {}

    # First repository: create it and sync it from the RPM feed.
    first_body = gen_repo()
    first_body['importer_config']['feed'] = RPM_FEED_URL
    first_href = client.post(REPOSITORY_PATH, first_body).json()['_href']
    cls.responses['first sync'] = client.post(
        urljoin(first_href, 'actions/sync/'),
        {'override_config': {}},
    )

    # First repository: add a distributor, then publish through it.
    cls.responses['distribute'] = client.post(
        urljoin(first_href, 'distributors/'),
        gen_distributor(),
    )
    distributor = cls.responses['distribute'].json()
    cls.responses['publish'] = client.post(
        urljoin(first_href, 'actions/publish/'),
        {'id': distributor['id']},
    )

    # Second repository: it must fetch content from the first one, with the
    # `retain_old_count` option set. SSL validation is disabled for a
    # practical reason: each HTTPS feed must have a certificate to work,
    # which is burdensome to arrange here.
    second_body = gen_repo()
    importer_config = second_body['importer_config']
    importer_config['feed'] = urljoin(
        cls.cfg.base_url,
        _PUBLISH_DIR + distributor['config']['relative_url'],
    )
    importer_config['retain_old_count'] = 0  # see docstring
    importer_config['ssl_validation'] = False
    second_href = client.post(REPOSITORY_PATH, second_body).json()['_href']
    cls.responses['second sync'] = client.post(
        urljoin(second_href, 'actions/sync/'),
        {'override_config': {}},
    )

    # Read both repositories and mark them for deletion.
    hrefs = [first_href, second_href]
    cls.repos = [client.get(href).json() for href in hrefs]
    cls.resources.update(set(hrefs))
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:57,代码来源:test_retain_old_count.py
示例3: setUpClass
def setUpClass(cls):
    """Publish an RPM repository and parse its ``repomd.xml`` file.

    The steps are:

    1. Create an RPM repository, add a yum distributor to it, and publish
       the repository through that distributor.
    2. Fetch the ``repomd.xml`` file from the distributor and store the
       handler's result in ``cls.root_element``.
    """
    super(RepoMDTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)

    # Step 1: repository, distributor, publish.
    repo_href = client.post(REPOSITORY_PATH, gen_repo())['_href']
    cls.resources.add(repo_href)
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )
    client.post(
        urljoin(repo_href, 'actions/publish/'),
        {'id': distributor['id']},
    )

    # Step 2: from here on, responses are passed to xml_handler instead of
    # the JSON handler.
    client.response_handler = xml_handler
    repomd_path = urljoin(
        urljoin('/pulp/repos/', distributor['config']['relative_url']),
        'repodata/repomd.xml',
    )
    cls.root_element = client.get(repomd_path)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:29,代码来源:test_repomd.py
示例4: setUpClass
def setUpClass(cls):
    """Create an RPM repository, import package groups into it and publish.

    The generated package groups are stored in ``cls.package_groups``, the
    tasks spawned by each import in ``cls.tasks`` (same keys), and the
    'group' repodata fetched after publishing in ``cls.root_element``.
    """
    super(UploadPackageGroupsTestCase, cls).setUpClass()

    # Create a repository and give it a distributor.
    client = api.Client(cls.cfg, api.json_handler)
    repo = client.post(REPOSITORY_PATH, gen_repo())
    repo_href = repo['_href']
    cls.resources.add(repo_href)
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )

    # Generate the package groups and import each of them, keeping the
    # tasks every import spawns.
    groups = {
        'minimal': _gen_minimal_group(),
        'realistic': _gen_realistic_group(),
    }
    cls.package_groups = groups
    cls.tasks = {}
    for name in groups:
        report = _upload_import_package_group(cls.cfg, repo, groups[name])
        cls.tasks[name] = tuple(api.poll_spawned_tasks(cls.cfg, report))

    # Publish once every group has been imported.
    client.post(
        urljoin(repo_href, 'actions/publish/'),
        {'id': distributor['id']},
    )

    # Fetch the generated repodata of type 'group' (a.k.a. 'comps').
    published_path = urljoin(
        '/pulp/repos/',
        distributor['config']['relative_url'],
    )
    cls.root_element = get_repomd_xml(cls.cfg, published_path, 'group')
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:34,代码来源:test_comps_xml.py
示例5: setUpClass
def setUpClass(cls):
    """Publish an RPM repository and parse its ``repomd.xml`` file.

    The whole test case is skipped when ``check_issue_2277`` reports that
    Pulp issue 2277 affects the targeted system. Otherwise:

    1. Create an RPM repository that has a yum distributor and publish it.
    2. Fetch the ``repomd.xml`` file from the distributor and store the
       handler's result in ``cls.root_element``.
    """
    super(RepoMDTestCase, cls).setUpClass()
    if check_issue_2277(cls.cfg):
        raise unittest.SkipTest('https://pulp.plan.io/issues/2277')

    # Step 1: the distributor is created together with the repository, so
    # the repository is re-read with details to learn about it.
    client = api.Client(cls.cfg, api.json_handler)
    body = gen_repo()
    body['distributors'] = [gen_distributor()]
    created = client.post(REPOSITORY_PATH, body)
    repo = client.get(created['_href'], params={'details': True})
    cls.resources.add(repo['_href'])
    utils.publish_repo(cls.cfg, repo)

    # Step 2: responses are passed to xml_handler from here on.
    client.response_handler = xml_handler
    relative_url = repo['distributors'][0]['config']['relative_url']
    repomd_path = urljoin(
        urljoin('/pulp/repos/', relative_url),
        'repodata/repomd.xml',
    )
    cls.root_element = client.get(repomd_path)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:29,代码来源:test_repomd.py
示例6: setUpClass
def setUpClass(cls):
    """Publish a yum repository containing some updates.

    ``cls.tasks`` and ``cls.errata`` start out as empty dicts. The
    ``updateinfo`` repodata returned by ``cls.get_repodata_xml`` after
    publishing is stored in ``cls.updateinfo_tree``.
    """
    super(UpdateInfoTestCase, cls).setUpClass()
    cls.tasks = {}
    cls.errata = {}
    client = api.Client(cls.cfg, api.json_handler)

    # Create the repository this test case works with.
    repo = client.post(REPOSITORY_PATH, gen_repo())
    repo_href = repo['_href']
    cls.resources.add(repo_href)

    # Give the repository a yum distributor.
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )

    # Import some errata, then ask for the repository to be published.
    cls.import_updates(client, repo)
    client.post(
        urljoin(repo_href, 'actions/publish/'),
        {'id': distributor['id']},
    )

    published_url = urljoin(
        '/pulp/repos/',
        distributor['config']['relative_url'],
    )
    cls.updateinfo_tree = cls.get_repodata_xml(published_url, 'updateinfo')
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:29,代码来源:test_updateinfo.py
示例7: setUpClass
def setUpClass(cls):
    """Create and publish a repository, then fetch its ``repomd.xml``.

    ``cls.tasks`` and ``cls.errata`` start out as empty dicts. The fetched
    ``repomd.xml``, as returned by a client using ``xml_handler``, is stored
    in ``cls.repomd_tree``.
    """
    super(RepoMDTestCase, cls).setUpClass()
    cls.tasks = {}
    cls.errata = {}
    json_client = api.Client(cls.cfg, api.json_handler)

    # Create the repository this test case works with.
    repo_href = json_client.post(REPOSITORY_PATH, gen_repo())['_href']
    cls.resources.add(repo_href)

    # Give the repository a yum distributor and publish through it.
    distributor = json_client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )
    json_client.post(
        urljoin(repo_href, 'actions/publish/'),
        {'id': distributor['id']},
    )

    # Fetch repomd.xml with a separate client that uses xml_handler.
    published_url = urljoin(
        '/pulp/repos/',
        distributor['config']['relative_url'],
    )
    repomd_url = urljoin(published_url, 'repodata/repomd.xml')
    xml_client = api.Client(cls.cfg, xml_handler)
    cls.repomd_tree = xml_client.get(repomd_url)
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:30,代码来源:test_repomd.py
示例8: make_repo
def make_repo(self, cfg, remote):
    """Create a repository with an importer and a pair of distributors.

    The RPM repository is created with:

    * A yum importer with a valid feed.
    * A yum distributor.
    * An RPM rsync distributor that references the yum distributor through
      its ``predistributor_id`` option.

    The repository is also scheduled for deletion via ``addCleanup``.

    :param pulp_smash.config.ServerConfig cfg: Information about the Pulp
        server being targeted.
    :param remote: A dict for the RPM rsync distributor's ``remote``
        section.
    :returns: The repository's href, as a string.
    """
    api_client = api.Client(cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL

    yum_distributor = gen_distributor()
    rsync_distributor = {
        'distributor_id': utils.uuid4(),
        'distributor_type_id': 'rpm_rsync_distributor',
        'distributor_config': {
            'predistributor_id': yum_distributor['distributor_id'],
            'remote': remote,
        },
    }
    body['distributors'] = [yum_distributor, rsync_distributor]

    repo_href = api_client.post(REPOSITORY_PATH, body)['_href']
    self.addCleanup(api_client.delete, repo_href)
    return repo_href
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:32,代码来源:test_rsync_distributor.py
示例9: setUpModule
def setUpModule():  # pylint:disable=invalid-name
    """Possibly skip the tests in this module. Create and sync an RPM repo.

    The module is skipped when Pulp is older than version 2.9. (See `Pulp
    #1724`_.) Otherwise an RPM repository with a feed is created, stored in
    the module-level ``_REPO`` and synced. If the sync fails with one of the
    handled errors, the repository is deleted again before the error
    propagates. Test cases may copy data from this repository but should
    **not** change it.

    .. _Pulp #1724: https://pulp.plan.io/issues/1724
    """
    global _REPO  # pylint:disable=global-statement
    set_up_module()
    cfg = config.get_config()
    if cfg.version < Version("2.9"):
        raise unittest2.SkipTest("This module requires Pulp 2.9 or greater.")

    # Create and sync a repository. If this set-up procedure grows, consider
    # implementing a stack of tear-down actions.
    client = api.Client(cfg, api.json_handler)
    body = gen_repo()
    body["importer_config"]["feed"] = RPM_FEED_URL
    _REPO = client.post(REPOSITORY_PATH, body)
    repo_href = _REPO["_href"]
    try:
        utils.sync_repo(cfg, repo_href)
    except (
            exceptions.CallReportError,
            exceptions.TaskReportError,
            exceptions.TaskTimedOutError,
    ):
        client.delete(repo_href)
        raise
开发者ID:pcreech,项目名称:pulp-smash,代码行数:26,代码来源:test_no_op_publish.py
示例10: setUpClass
def setUpClass(cls):
    """Create an RPM repository, sync it, and remove some units from it.

    After creating and syncing an RPM repository, we walk through the unit
    type IDs listed in
    :data:`pulp_smash.tests.rpm.api_v2.test_unassociate.RemoveAssociatedUnits.TYPE_IDS`
    and remove one unit of each kind from the repository. The repository
    contents are recorded before the removal (``cls.before_units``) and
    after it (``cls.after_units``), and the removed units are recorded in
    ``cls.removed_units``, so that Pulp's behaviour can be verified.
    """
    super(RemoveAssociatedUnits, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, body)['_href']
    cls.resources.add(repo_href)
    client.post(
        urljoin(repo_href, 'actions/sync/'),
        {'override_config': {}},
    )

    def list_units():
        """Map each type ID to the repository's current units of it."""
        return {
            type_id: _list_repo_units_of_type(client, repo_href, type_id)
            for type_id in cls.TYPE_IDS
        }

    # List the starting content.
    cls.before_units = list_units()

    # Remove the first listed unit of each type, remembering which unit it
    # was for later assertions.
    cls.removed_units = {}
    for type_id, units in cls.before_units.items():
        first_unit = units[0]
        cls.removed_units[type_id] = first_unit
        _remove_unit(client, repo_href, type_id, first_unit)

    # List the final content.
    cls.after_units = list_units()
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:35,代码来源:test_unassociate.py
示例11: setUpClass
def setUpClass(cls):
    """Bind a consumer to a distributor.

    The steps are:

    1. Add a consumer.
    2. Add a repository.
    3. Add a distributor to the repository.
    4. Bind the consumer to the distributor.

    The consumer is stored in ``cls.consumer``, the body of the binding
    request in ``cls.request`` and the response to it in ``cls.response``.
    """
    super(BindConsumerTestCase, cls).setUpClass()

    # Steps 1-3
    client = api.Client(cls.cfg, api.json_handler)
    cls.consumer = client.post(CONSUMER_PATH, {'id': utils.uuid4()})
    repo_href = client.post(REPOSITORY_PATH, gen_repo())['_href']
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )
    cls.resources.add(repo_href)

    # Step 4: the binding request goes through api.safe_handler rather than
    # the JSON handler, and its response is stored as returned.
    client.response_handler = api.safe_handler
    consumer_path = urljoin(
        CONSUMER_PATH,
        cls.consumer['consumer']['id'] + '/',
    )
    bindings_path = urljoin(consumer_path, 'bindings/')
    cls.request = {
        'binding_config': {'B': 21},
        'distributor_id': distributor['id'],
        'notify_agent': False,
        'repo_id': distributor['repo_id'],
    }
    cls.response = client.post(bindings_path, cls.request)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:33,代码来源:test_consumer.py
示例12: health_check
def health_check(self):
    """Execute step three of the test plan.

    Create a repository with a feed, sync it, add a distributor and publish
    the repository. Then download one RPM from the published repository and
    assert that its content equals the content of the same RPM downloaded
    from the feed. The repository is scheduled for deletion.
    """
    client = api.Client(self.cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, body)['_href']
    self.addCleanup(api.Client(self.cfg).delete, repo_href)

    # Sync, add a distributor, publish.
    client.post(
        urljoin(repo_href, 'actions/sync/'),
        {'override_config': {}},
    )
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )
    client.post(
        urljoin(repo_href, 'actions/publish/'),
        {'id': distributor['id']},
    )

    # The downloads need the response's raw content, so the JSON handler is
    # swapped out for api.safe_handler.
    client.response_handler = api.safe_handler
    published_url = urljoin(
        '/pulp/repos/',
        distributor['config']['relative_url'],
    )
    pulp_rpm = client.get(urljoin(published_url, RPM)).content

    # Does this RPM match the original RPM?
    feed_rpm = client.get(urljoin(RPM_FEED_URL, RPM)).content
    self.assertEqual(feed_rpm, pulp_rpm)
开发者ID:shubham90,项目名称:pulp-smash,代码行数:27,代码来源:test_broker.py
示例13: setUpClass
def setUpClass(cls):
    """Upload an erratum to a repo, publish, and download the erratum.

    The steps are:

    1. Create an RPM repository with a feed and a distributor.
    2. Sync the repository and upload an erratum to it.
    3. Publish the repository.
    4. Fetch the repository's ``updateinfo`` repodata.

    The generated erratum is stored in ``cls.erratum`` and the fetched
    repodata in ``cls.updateinfo``.
    """
    super(UploadErratumTestCase, cls).setUpClass()
    cls.erratum = gen_erratum()

    # Step 1
    client = api.Client(cls.cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    body['distributors'] = [gen_distributor()]
    repo_href = client.post(REPOSITORY_PATH, body)['_href']
    cls.resources.add(repo_href)

    # Step 2
    utils.sync_repo(cls.cfg, repo_href)
    utils.upload_import_erratum(cls.cfg, cls.erratum, repo_href)

    # Step 3: re-read the repository with details to find the distributor
    # that was created along with it.
    repo = client.get(repo_href, params={'details': True})
    distributor = repo['distributors'][0]
    client.post(
        urljoin(repo['_href'], 'actions/publish/'),
        {'id': distributor['id']},
    )

    # Step 4
    published_path = urljoin(
        '/pulp/repos/',
        distributor['config']['relative_url'],
    )
    cls.updateinfo = get_repomd_xml(cls.cfg, published_path, 'updateinfo')
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:34,代码来源:test_upload_publish.py
示例14: setUpModule
def setUpModule():  # pylint:disable=invalid-name
    """Possibly skip the tests in this module. Create and sync an RPM repo.

    The module is skipped when Pulp is older than version 2.9 (see `Pulp
    #1724`_) or when ``check_issue_2277`` reports that Pulp issue 2277
    affects the targeted system. Otherwise an RPM repository with a feed is
    created, stored in ``_REPO`` and synced. If the sync fails with one of
    the handled errors, ``tearDownModule`` is called before the error
    propagates. Test cases may copy data from this repository but should
    **not** change it.

    .. _Pulp #1724: https://pulp.plan.io/issues/1724
    """
    set_up_module()
    cfg = config.get_config()
    if cfg.version < Version('2.9'):
        raise unittest.SkipTest('This module requires Pulp 2.9 or greater.')
    if check_issue_2277(cfg):
        raise unittest.SkipTest('https://pulp.plan.io/issues/2277')

    # Each _CLEANUP entry is a (callable, args, kwargs) tuple.
    # NOTE(review): presumably tearDownModule replays these entries; confirm.
    client = api.Client(cfg, api.json_handler)
    _CLEANUP.append((client.delete, [ORPHANS_PATH], {}))

    # Create the repository and register its deletion.
    body = gen_repo()
    body['importer_config']['feed'] = RPM_SIGNED_FEED_URL
    _REPO.clear()
    _REPO.update(client.post(REPOSITORY_PATH, body))
    repo_href = _REPO['_href']
    _CLEANUP.append((client.delete, [repo_href], {}))

    # Sync the repository.
    try:
        utils.sync_repo(cfg, repo_href)
    except (
            exceptions.CallReportError,
            exceptions.TaskReportError,
            exceptions.TaskTimedOutError,
    ):
        tearDownModule()
        raise
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:30,代码来源:test_no_op_publish.py
示例15: setUpClass
def setUpClass(cls):
    """Create a schedule to publish a repo, verify the ``total_run_count``.

    The steps are:

    1. Create a repository with a valid feed.
    2. Sync it.
    3. Add a distributor and schedule a publish to run every 2 minutes.
    4. Wait for 130 seconds, then read the schedule back into
       ``cls.response`` so that the number of "publish" runs can be
       checked.
    """
    super(ScheduledPublishTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)

    # Create a repo with a valid feed and sync it.
    body = gen_repo()
    body["importer_config"]["feed"] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, body)["_href"]
    cls.resources.add(repo_href)
    utils.sync_repo(cls.cfg, repo_href)

    # Schedule a publish to run every 2 minutes.
    distributor = gen_distributor()
    client.post(urljoin(repo_href, "distributors/"), distributor)
    schedule_segments = [
        "distributors",
        distributor["distributor_id"],
        "schedules/publish/",
    ]
    schedule_path = urljoin(repo_href, "/".join(schedule_segments))
    schedule = client.post(schedule_path, {"schedule": "PT2M"})

    # Wait slightly longer than one schedule interval for publish to run.
    time.sleep(130)

    # Read the schedule.
    cls.response = client.get(schedule["_href"])
开发者ID:seandst,项目名称:pulp-smash,代码行数:33,代码来源:test_schedule_publish.py
示例16: setUpClass
def setUpClass(cls):
    """Create, sync and delete an RPM repository.

    Doing this provides orphans that the remaining test methods can make
    use of. The number of orphans found afterwards is compared with the
    expected number, and an ``AssertionError`` is raised on a mismatch. If
    that happens, it's possible that other repositories exist with
    references to the same content units.
    """
    super(OrphansTestCase, cls).setUpClass()

    # Create orphans. The repository is deleted whether or not the sync
    # succeeds.
    client = api.Client(cls.cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, body)['_href']
    try:
        utils.sync_repo(cls.cfg, repo_href)
    finally:
        client.delete(repo_href)

    # Verify that orphans are present. Support for langpack content units
    # was added in Pulp 2.9, hence the one extra orphan from then on.
    orphans = client.get(ORPHANS_PATH)
    expected_count = 40 if cls.cfg.version >= Version('2.9') else 39
    actual_count = _count_orphans(orphans)
    if actual_count == expected_count:
        return

    # We can't use fail(), as it's an instance method.
    raise AssertionError(
        'Test case setup failed. We attempted to create {} orphans, '
        'but actually created {}. Orphans: {}'
        .format(expected_count, actual_count, orphans)
    )
开发者ID:elyezer,项目名称:pulp-smash,代码行数:33,代码来源:test_orphan_remove.py
示例17: setUpClass
def setUpClass(cls):
    """Create a schedule to publish the repository.

    The steps are:

    1. Create a repository with a valid feed.
    2. Sync it.
    3. Add a distributor and schedule a publish to run every 30 seconds.

    The response to the scheduling request is stored in ``cls.response`` and
    its decoded JSON body in ``cls.attrs``.
    """
    super(CreateSuccessTestCase, cls).setUpClass()
    client = api.Client(cls.cfg)

    # Create a repo with a valid feed and sync it.
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, body).json()['_href']
    cls.resources.add(repo_href)
    utils.sync_repo(cls.cfg, repo_href)

    # Add a distributor to the repository.
    distributor = gen_distributor()
    distributors_url = urljoin(repo_href, 'distributors/')
    client.post(distributors_url, distributor)

    # Schedule a publish to run every 30 seconds.
    schedules_url = urljoin(
        distributors_url,
        '{}/schedules/publish/'.format(distributor['distributor_id']),
    )
    cls.response = client.post(schedules_url, {'schedule': 'PT30S'})
    cls.attrs = cls.response.json()
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:35,代码来源:test_schedule_publish.py
示例18: setUpClass
def setUpClass(cls):
    """Create an RPM repository, sync it, and remove some units from it.

    After creating and syncing an RPM repository, we walk through the unit
    type IDs listed in
    :data:`pulp_smash.tests.rpm.api_v2.test_unassociate.RemoveUnitsTestCase.TYPE_IDS`
    and remove one randomly chosen unit of each kind from the repository.
    The repository contents are recorded before the removal
    (``cls.units_before``) and after it (``cls.units_after``), and the
    removed units are recorded in ``cls.units_removed``, so that Pulp's
    behaviour can be verified.
    """
    super(RemoveUnitsTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, body)['_href']
    cls.resources.add(repo_href)
    utils.sync_repo(cls.cfg, repo_href)

    # Remove one unit of each type.
    cls.units_before = _search_units(cls.cfg, repo_href, cls.TYPE_IDS)
    cls.units_removed = []
    for type_id in cls.TYPE_IDS:
        candidates = _get_units_by_type(cls.units_before, type_id)
        unit = random.choice(candidates)
        cls.units_removed.append(unit)
        _remove_unit(cls.cfg, repo_href, unit)
    cls.units_after = _search_units(cls.cfg, repo_href, cls.TYPE_IDS)
开发者ID:seandst,项目名称:pulp-smash,代码行数:25,代码来源:test_unassociate.py
示例19: make_repo
def make_repo(self, cfg, dist_cfg_updates):
    """Create a repository with an importer and a pair of distributors.

    The RPM repository is created with:

    * A yum importer with a valid feed.
    * A yum distributor.
    * An RPM rsync distributor that references the yum distributor through
      its ``predistributor_id`` option.

    The repository is also scheduled for deletion via ``addCleanup``.

    :param pulp_smash.config.ServerConfig cfg: Information about the Pulp
        server being targeted.
    :param dist_cfg_updates: A dict to be merged into the RPM rsync
        distributor's ``distributor_config`` dict. At a minimum, this
        argument should have a value of ``{'remote': {…}}``.
    :returns: A detailed dict of information about the repo.
    """
    api_client = api.Client(cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_SIGNED_FEED_URL

    yum_distributor = gen_distributor()
    rsync_distributor = {
        'distributor_id': utils.uuid4(),
        'distributor_type_id': 'rpm_rsync_distributor',
        'distributor_config': {
            'predistributor_id': yum_distributor['distributor_id'],
        },
    }
    # Merged last, so the caller's options win over the default above.
    rsync_distributor['distributor_config'].update(dist_cfg_updates)
    body['distributors'] = [yum_distributor, rsync_distributor]

    repo_href = api_client.post(REPOSITORY_PATH, body)['_href']
    self.addCleanup(api_client.delete, repo_href)
    return api_client.get(repo_href, params={'details': True})
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:33,代码来源:test_rsync_distributor.py
示例20: _create_repository
def _create_repository(cfg, importer_config):
    """Create an RPM repository that uses the given importer configuration.

    The ``importer_config`` produced by ``gen_repo`` is replaced wholesale
    by the ``importer_config`` argument.

    :param cfg: The configuration handed to ``api.Client``.
    :param importer_config: The value to send as the repository's
        ``importer_config``.
    :returns: The ``_href`` of the newly created repository.
    """
    body = gen_repo()
    body['importer_config'] = importer_config
    response = api.Client(cfg).post(REPOSITORY_PATH, body)
    return response.json()['_href']
开发者ID:elyezer,项目名称:pulp-smash,代码行数:8,代码来源:test_signatures_checked_for_uploads.py
注:本文中的pulp_smash.tests.rpm.api_v2.utils.gen_repo函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论