本文整理汇总了Python中pulp_smash.utils.sync_repo函数的典型用法代码示例。如果您正苦于以下问题:Python sync_repo函数的具体用法?Python sync_repo怎么用?Python sync_repo使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sync_repo函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_all
def test_all(self):
"""Execute the test case business logic."""
cfg = config.get_config()
self.check_issue_2363(cfg)
repo_href = self.create_repo(cfg, RPM_MIRRORLIST_BAD, _gen_rel_url())
with self.assertRaises(TaskReportError):
utils.sync_repo(cfg, repo_href)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:7,代码来源:test_mirrorlist.py
示例2: setUpClass
def setUpClass(cls):
"""Create an RPM repository, sync it, and remove some units from it.
After creating and syncing an RPM repository, we walk through the unit
type IDs listed in
:data:`pulp_smash.tests.rpm.api_v2.test_unassociate.RemoveUnitsTestCase.TYPE_IDS`
and remove on unit of each kind from the repository. We verify Pulp's
behaviour by recording repository contents pre and post removal.
"""
super(RemoveUnitsTestCase, cls).setUpClass()
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
repo = client.post(REPOSITORY_PATH, body)
cls.resources.add(repo['_href'])
utils.sync_repo(cls.cfg, repo['_href'])
# Remove one unit of each type.
cls.units_before = _search_units(cls.cfg, repo['_href'], cls.TYPE_IDS)
cls.units_removed = []
for type_id in cls.TYPE_IDS:
unit = random.choice(_get_units_by_type(cls.units_before, type_id))
cls.units_removed.append(unit)
_remove_unit(cls.cfg, repo['_href'], unit)
cls.units_after = _search_units(cls.cfg, repo['_href'], cls.TYPE_IDS)
开发者ID:seandst,项目名称:pulp-smash,代码行数:25,代码来源:test_unassociate.py
示例3: setUpClass
def setUpClass(cls):
"""Upload an erratum to a repo, publish, and download the erratum.
Do the following:
1. Create an RPM repository with a distributor.
2. Upload an erratum to the repository.
3. Publish the repository.
4. Fetch the repository's ``updateinfo.xml`` file.
"""
super(UploadErratumTestCase, cls).setUpClass()
cls.erratum = gen_erratum()
# Create an RPM repository with a feed and distributor.
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
body['distributors'] = [gen_distributor()]
repo = client.post(REPOSITORY_PATH, body)
cls.resources.add(repo['_href'])
# Sync content into the repository, and give it an erratum.
utils.sync_repo(cls.cfg, repo['_href'])
utils.upload_import_erratum(cls.cfg, cls.erratum, repo['_href'])
repo = client.get(repo['_href'], params={'details': True})
# Publish the repository, and fetch and parse updateinfo.xml
distributor = repo['distributors'][0]
client.post(
urljoin(repo['_href'], 'actions/publish/'),
{'id': distributor['id']},
)
path = urljoin('/pulp/repos/', distributor['config']['relative_url'])
cls.updateinfo = get_repomd_xml(cls.cfg, path, 'updateinfo')
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:34,代码来源:test_upload_publish.py
示例4: setUpModule
def setUpModule(): # pylint:disable=invalid-name
"""Possibly skip the tests in this module. Create and sync an RPM repo.
Skip this module of tests if Pulp is older than version 2.9. (See `Pulp
#1724`_.) Then create an RPM repository with a feed and sync it. Test cases
may copy data from this repository but should **not** change it.
.. _Pulp #1724: https://pulp.plan.io/issues/1724
"""
set_up_module()
cfg = config.get_config()
if cfg.version < Version('2.9'):
raise unittest.SkipTest('This module requires Pulp 2.9 or greater.')
if check_issue_2277(cfg):
raise unittest.SkipTest('https://pulp.plan.io/issues/2277')
# Create and sync a repository.
client = api.Client(cfg, api.json_handler)
_CLEANUP.append((client.delete, [ORPHANS_PATH], {}))
body = gen_repo()
body['importer_config']['feed'] = RPM_SIGNED_FEED_URL
_REPO.clear()
_REPO.update(client.post(REPOSITORY_PATH, body))
_CLEANUP.append((client.delete, [_REPO['_href']], {}))
try:
utils.sync_repo(cfg, _REPO['_href'])
except (exceptions.CallReportError, exceptions.TaskReportError,
exceptions.TaskTimedOutError):
tearDownModule()
raise
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:30,代码来源:test_no_op_publish.py
示例5: setUpClass
def setUpClass(cls):
"""Create, sync and publish a repository. Fetch its ``comps.xml``."""
super(SyncRepoTestCase, cls).setUpClass()
client = api.Client(cls.cfg, api.json_handler)
# Create a repo.
body = gen_repo()
body['importer_config']['feed'] = RPM_SIGNED_FEED_URL
body['distributors'] = [gen_distributor()]
repo = client.post(REPOSITORY_PATH, body)
cls.resources.add(repo['_href'])
# Sync and publish the repo.
repo = client.get(repo['_href'], params={'details': True})
utils.sync_repo(cls.cfg, repo['_href'])
client.post(
urljoin(repo['_href'], 'actions/publish/'),
{'id': repo['distributors'][0]['id']},
)
repo = client.get(repo['_href'], params={'details': True})
# Fetch and parse comps.xml.
dist = repo['distributors'][0]
dist_url = urljoin('/pulp/repos/', dist['config']['relative_url'])
cls.root_element = get_repomd_xml(cls.cfg, dist_url, 'group')
cls.xml_as_str = ElementTree.tostring(cls.root_element)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:26,代码来源:test_comps_xml.py
示例6: setUpClass
def setUpClass(cls):
"""Create a schedule to publish a repo, verify the ``total_run_count``.
Do the following:
1. Create a repository with a valid feed
2. Sync it
3. Schedule publish to run every 2 minutes
4. Wait for 130 seconds and read the schedule to get the number of
"publish" runs
"""
super(ScheduledPublishTestCase, cls).setUpClass()
client = api.Client(cls.cfg, api.json_handler)
# Create a repo with a valid feed and sync it
body = gen_repo()
body["importer_config"]["feed"] = RPM_FEED_URL
repo = client.post(REPOSITORY_PATH, body)
cls.resources.add(repo["_href"])
utils.sync_repo(cls.cfg, repo["_href"])
# Schedule a publish to run every 2 minutes
distributor = gen_distributor()
client.post(urljoin(repo["_href"], "distributors/"), distributor)
scheduling_url = "/".join(["distributors", distributor["distributor_id"], "schedules/publish/"])
schedule_path = urljoin(repo["_href"], scheduling_url)
schedule = client.post(schedule_path, {"schedule": "PT2M"})
# Wait for publish to run
time.sleep(130)
# Read the schedule
cls.response = client.get(schedule["_href"])
开发者ID:seandst,项目名称:pulp-smash,代码行数:33,代码来源:test_schedule_publish.py
示例7: test_all
def test_all(self):
"""Create, sync and publish an OSTree repository.
Verify that:
* The distributor's ``last_publish`` attribute is ``None`` after the
sync. This demonstrates that ``auto_publish`` correctly defaults to
``False``.
* The distributor's ``last_publish`` attribute is not ``None`` after
the publish.
"""
cfg = config.get_config()
client = api.Client(cfg, api.json_handler)
# Create a repository.
body = gen_repo()
body['importer_config']['feed'] = OSTREE_FEED
body['importer_config']['branches'] = [OSTREE_BRANCH]
body['distributors'].append(gen_distributor())
repo = client.post(REPOSITORY_PATH, body)
self.addCleanup(client.delete, repo['_href'])
# Sync the repository.
utils.sync_repo(cfg, repo['_href'])
repo = client.get(repo['_href'], params={'details': True})
with self.subTest(comment='verify last_publish after sync'):
self.assertIsNone(repo['distributors'][0]['last_publish'])
# Publish the repository.
utils.publish_repo(cfg, repo)
repo = client.get(repo['_href'], params={'details': True})
with self.subTest(comment='verify last_publish after publish'):
self.assertIsNotNone(repo['distributors'][0]['last_publish'])
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:33,代码来源:test_publish.py
示例8: setUpClass
def setUpClass(cls):
"""Create an RPM repository with a valid feed and sync it.
Do the following:
1. Reset Pulp, including the Squid cache.
2. Create a repository with the "on demand" download policy.
3. Sync and publish the repository.
4. Download an RPM from the published repository.
5. Download the same RPM to ensure it is served by the cache.
"""
super(OnDemandTestCase, cls).setUpClass()
# Ensure `locally_stored_units` is 0 before we start.
utils.reset_squid(cls.cfg)
utils.reset_pulp(cls.cfg)
# Create, sync and publish a repository.
repo = _create_repo(cls.cfg, 'on_demand')
cls.resources.add(repo['_href'])
utils.sync_repo(cls.cfg, repo['_href'])
# Read the repository.
client = api.Client(cls.cfg)
cls.repo = client.get(repo['_href'], params={'details': True}).json()
# Download the same RPM twice.
path = urljoin('/pulp/repos/', repo['id'] + '/')
path = urljoin(path, RPM)
cls.rpm = client.get(path)
cls.same_rpm = client.get(path)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:31,代码来源:test_download_policies.py
示例9: setUpClass
def setUpClass(cls):
"""Create a schedule to publish the repository.
Do the following:
1. Create a repository with a valid feed
2. Sync it
3. Schedule publish to run every 30 seconds
"""
super(CreateSuccessTestCase, cls).setUpClass()
client = api.Client(cls.cfg)
# Create a repo with a valid feed and sync it
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
repo = client.post(REPOSITORY_PATH, body).json()
cls.resources.add(repo['_href'])
utils.sync_repo(cls.cfg, repo['_href'])
# Schedule a publish to run every 30 seconds
distributor = gen_distributor()
distributor_url = urljoin(repo['_href'], 'distributors/')
client.post(
distributor_url,
distributor
)
scheduling_url = urljoin(
distributor_url,
'{}/schedules/publish/'.format(distributor['distributor_id']),
)
cls.response = client.post(
scheduling_url,
{'schedule': 'PT30S'}
)
cls.attrs = cls.response.json()
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:35,代码来源:test_schedule_publish.py
示例10: setUpClass
def setUpClass(cls):
"""Create, sync and delete an RPM repository.
Doing this provides orphans that the remaining test methods can make
use of. If this method fails, it's possible that other repositories
exist with references to the same content units.
"""
super(OrphansTestCase, cls).setUpClass()
# Create orphans.
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
repo = client.post(REPOSITORY_PATH, body)
try:
utils.sync_repo(cls.cfg, repo['_href'])
finally:
client.delete(repo['_href'])
# Verify that orphans are present. Support for langpack content units
# was added in Pulp 2.9.
orphans = client.get(ORPHANS_PATH)
expected_count = 39
if cls.cfg.version >= Version('2.9'):
expected_count += 1
actual_count = _count_orphans(orphans)
if expected_count != actual_count:
# We can't use fail(), as it's an instance method.
raise AssertionError(
'Test case setup failed. We attempted to create {} orphans, '
'but actually created {}. Orphans: {}'
.format(expected_count, actual_count, orphans)
)
开发者ID:elyezer,项目名称:pulp-smash,代码行数:33,代码来源:test_orphan_remove.py
示例11: setUpModule
def setUpModule(): # pylint:disable=invalid-name
"""Possibly skip the tests in this module. Create and sync an RPM repo.
Skip this module of tests if Pulp is older than version 2.9. (See `Pulp
#1724`_.) Then create an RPM repository with a feed and sync it. Test cases
may copy data from this repository but should **not** change it.
.. _Pulp #1724: https://pulp.plan.io/issues/1724
"""
set_up_module()
cfg = config.get_config()
if cfg.version < Version("2.9"):
raise unittest2.SkipTest("This module requires Pulp 2.9 or greater.")
# Create and sync a repository. If this set-up procedure grows, consider
# implementing a stack of tear-down actions
client = api.Client(cfg, api.json_handler)
body = gen_repo()
body["importer_config"]["feed"] = RPM_FEED_URL
global _REPO # pylint:disable=global-statement
_REPO = client.post(REPOSITORY_PATH, body)
try:
utils.sync_repo(cfg, _REPO["_href"])
except (exceptions.CallReportError, exceptions.TaskReportError, exceptions.TaskTimedOutError):
client.delete(_REPO["_href"])
raise
开发者ID:pcreech,项目名称:pulp-smash,代码行数:26,代码来源:test_no_op_publish.py
示例12: test_all
def test_all(self):
"""Sync a repo whose updateinfo file has multiple pkglist sections.
Do the following:
1. Create and sync a repository with an importer and distributor.
Ensure the importer's feed is set to
:data:`pulp_smash.constants.RPM_PKGLISTS_UPDATEINFO_FEED_URL`.
2. Publish the repository, and fetch and parse its updateinfo file.
3. Verify the updateinfo contains the correct number of ``<pkglists>``
sections, with the correct contents in each.
"""
cfg = config.get_config()
if selectors.bug_is_untestable(2227, cfg.version):
self.skipTest('https://pulp.plan.io/issues/2277')
# Create and sync a repository.
client = api.Client(cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_PKGLISTS_UPDATEINFO_FEED_URL
body['distributors'] = [gen_distributor()]
repo = client.post(REPOSITORY_PATH, body)
self.addCleanup(client.delete, repo['_href'])
utils.sync_repo(cfg, repo['_href'])
# Publish the repository, and fetch and parse its updateinfo file.
repo = client.get(repo['_href'], params={'details': True})
self.assertEqual(len(repo['distributors']), 1, repo['distributors'])
distributor = repo['distributors'][0]
client.post(
urljoin(repo['_href'], 'actions/publish/'),
{'id': distributor['id']},
)
root_element = get_repomd_xml(
cfg,
urljoin('/pulp/repos/', distributor['config']['relative_url']),
'updateinfo'
)
# Verify the contents of the updateinfo file.
debug = ElementTree.tostring(root_element)
pkglists = root_element.find('update').findall('pkglist')
self.assertEqual(len(pkglists), 3, debug)
collections = [pkglist.find('collection') for pkglist in pkglists]
names = {collection.find('name').text for collection in collections}
self.assertEqual(names, {'1', '2', '3'}, debug)
packages = {
collection.find('package').find('filename').text
for collection in collections
}
self.assertEqual(packages, {
'penguin-0.9.1-1.noarch.rpm',
'shark-0.1-1.noarch.rpm',
'walrus-5.21-1.noarch.rpm',
}, debug)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:57,代码来源:test_updateinfo.py
示例13: setUpClass
def setUpClass(cls):
"""Create a repository with a feed and sync it."""
super(PackagesDirectoryTestCase, cls).setUpClass()
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
cls.repo_href = client.post(REPOSITORY_PATH, body)['_href']
cls.resources.add(cls.repo_href)
utils.sync_repo(cls.cfg, cls.repo_href)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:9,代码来源:test_packages_directory.py
示例14: setUpClass
def setUpClass(cls):
"""Create and sync a repository."""
if inspect.getmro(cls)[0] == BaseSearchTestCase:
raise unittest.SkipTest("Abstract base class.")
super(BaseSearchTestCase, cls).setUpClass()
body = gen_repo()
body["importer_config"]["feed"] = cls.get_feed_url()
cls.repo = api.Client(cls.cfg).post(REPOSITORY_PATH, body).json()
cls.resources.add(cls.repo["_href"])
utils.sync_repo(cls.cfg, cls.repo["_href"])
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:10,代码来源:test_search.py
示例15: setUpClass
def setUpClass(cls):
"""Create a repository with a feed and sync it."""
super(PackagesDirectoryTestCase, cls).setUpClass()
if check_issue_2277(cls.cfg):
raise unittest.SkipTest('https://pulp.plan.io/issues/2277')
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_SIGNED_FEED_URL
cls.repo_href = client.post(REPOSITORY_PATH, body)['_href']
cls.resources.add(cls.repo_href)
utils.sync_repo(cls.cfg, cls.repo_href)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:11,代码来源:test_packages_directory.py
示例16: setUpClass
def setUpClass(cls):
"""Create and sync a repository."""
super(ForceFullTestCase, cls).setUpClass()
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
body['distributors'] = [gen_distributor()]
repo = client.post(REPOSITORY_PATH, body)
cls.resources.add(repo['_href'])
utils.sync_repo(cls.cfg, repo['_href'])
cls.repo = client.get(repo['_href'], params={'details': True})
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:11,代码来源:test_force_full.py
示例17: setUpClass
def setUpClass(cls): # pylint:disable=arguments-differ
"""Create two repositories, first is feed of second one.
Provides server config and set of iterable to delete. Following steps
are executed:
1. Create repository foo with feed, sync and publish it.
2. Create repository bar with foo as a feed with
``retain_old_count=0``.
3. Run sync of repo foo.
4. Get information on both repositories.
"""
super(RetainOldCountTestCase, cls).setUpClass()
if selectors.bug_is_untestable(2277, cls.cfg.version):
raise unittest.SkipTest('https://pulp.plan.io/issues/2277')
client = api.Client(cls.cfg)
cls.responses = {}
hrefs = [] # repository hrefs
# Create and sync the first repository.
body = gen_repo()
body['importer_config']['feed'] = RPM_FEED_URL
hrefs.append(client.post(REPOSITORY_PATH, body).json()['_href'])
cls.responses['first sync'] = utils.sync_repo(cls.cfg, hrefs[0])
# Add distributor and publish
cls.responses['distribute'] = client.post(
urljoin(hrefs[0], 'distributors/'),
gen_distributor(),
)
cls.responses['publish'] = client.post(
urljoin(hrefs[0], 'actions/publish/'),
{'id': cls.responses['distribute'].json()['id']},
)
# Create and sync the second repository. Ensure it fetches content from
# the first, and that the `retain_old_count` option is set correctly.
# We disable SSL validation for a practical reason: each HTTPS feed
# must have a certificate to work, which is burdensome to do here.
body = gen_repo()
body['importer_config']['feed'] = urljoin(
cls.cfg.base_url,
_PUBLISH_DIR +
cls.responses['distribute'].json()['config']['relative_url'],
)
body['importer_config']['retain_old_count'] = 0 # see docstring
body['importer_config']['ssl_validation'] = False
hrefs.append(client.post(REPOSITORY_PATH, body).json()['_href'])
cls.responses['second sync'] = utils.sync_repo(cls.cfg, hrefs[1])
# Read the repositories and mark them for deletion.
cls.repos = [client.get(href).json() for href in hrefs]
cls.resources.update(set(hrefs))
开发者ID:elyezer,项目名称:pulp-smash,代码行数:53,代码来源:test_retain_old_count.py
示例18: _create_sync_repo
def _create_sync_repo(self, feed_url):
"""Create a repository with the given feed and sync it.
Return the repository's href.
"""
self.addCleanup(self.client.delete, ORPHANS_PATH)
body = gen_repo()
body['importer_config']['feed'] = feed_url
repo_href = self.client.post(REPOSITORY_PATH, body)['_href']
self.addCleanup(self.client.delete, repo_href)
utils.sync_repo(self.cfg, repo_href)
return repo_href
开发者ID:elyezer,项目名称:pulp-smash,代码行数:12,代码来源:test_signatures_saved_for_packages.py
示例19: _create_sync_repo
def _create_sync_repo(self, cfg):
"""Create and sync a repository. Return a detailed dict of repo info.
Also, schedule the repository for deletion with ``addCleanup()``.
"""
client = api.Client(cfg, api.json_handler)
body = gen_repo()
body['importer_config']['feed'] = RPM_UNSIGNED_FEED_URL
body['distributors'] = [gen_distributor()]
repo = client.post(REPOSITORY_PATH, body)
self.addCleanup(client.delete, repo['_href'])
utils.sync_repo(cfg, repo['_href'])
repo = client.get(repo['_href'], params={'details': True})
return client.get(repo['_href'], params={'details': True})
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:14,代码来源:test_repomd.py
示例20: setUpClass
def setUpClass(cls):
"""Create an RPM repo, sync it, delete the repo, remove orphans."""
super(OrphanRemoveAllTestCase, cls).setUpClass()
client = api.Client(cls.cfg, api.json_handler)
body = gen_repo()
body["importer_config"]["feed"] = RPM_FEED_URL
repo = client.post(REPOSITORY_PATH, body)
utils.sync_repo(cls.cfg, repo["_href"])
cls.num_orphans_pre_repo_del = _count_orphans(client)
client.delete(repo["_href"])
cls.num_orphans_post_repo_del = _count_orphans(client)
client.delete(ORPHANS_PATH)
cls.num_orphans_after_rm = _count_orphans(client)
开发者ID:seandst,项目名称:pulp-smash,代码行数:14,代码来源:test_orphan_remove.py
注:本文中的pulp_smash.utils.sync_repo函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论