本文整理汇总了Python中pulp_smash.api.poll_spawned_tasks函数的典型用法代码示例。如果您正苦于以下问题：Python poll_spawned_tasks函数的具体用法？Python poll_spawned_tasks怎么用？Python poll_spawned_tasks使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了poll_spawned_tasks函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: publish
def publish(cfg, publisher, repo, version_href=None):
    """Publish a repository, or one specific version of a repository.

    :param pulp_smash.config.PulpSmashConfig cfg: Information about the Pulp
        host.
    :param publisher: A dict of information about the publisher of the
        repository to be published. Its ``_href`` is the base of the URL
        the publish request is sent to.
    :param repo: A dict of information about the repository. Only consulted
        when ``version_href`` is ``None``.
    :param version_href: The repository version to be published. When
        omitted, the request names the repository itself.
    :returns: A publication. A dict of information about the just created
        publication.
    :raises IndexError: If the publish call spawned no tasks at all, in
        which case there is no created resource to fetch.
    """
    if version_href is None:
        body = {'repository': repo['_href']}
    else:
        body = {'repository_version': version_href}
    client = api.Client(cfg, api.json_handler)
    call_report = client.post(urljoin(publisher['_href'], 'publish/'), body)

    # As of this writing, Pulp 3 only returns one task. If Pulp 3 starts
    # returning multiple tasks, this may need to be re-written.
    tasks = tuple(api.poll_spawned_tasks(cfg, call_report))
    if not tasks:
        # IndexError is what indexing the empty tuple below would raise
        # anyway; raising it here lets the message describe the real
        # problem.
        raise IndexError(
            'No tasks were spawned in response to API call, so there is no '
            'publication to return. Here is the call report: {}'
            .format(call_report)
        )
    if len(tasks) > 1:
        message = (
            'Multiple tasks were spawned in response to API call. This is '
            'unexpected, and Pulp Smash may handle the response incorrectly. '
            'Here is the tasks generated: {}'
        )
        warnings.warn(message.format(tasks), RuntimeWarning)
    # The publication is the first resource created by the last task.
    return client.get(tasks[-1]['created_resources'][0])
开发者ID:dralley,项目名称:pulp-smash,代码行数:30,代码来源:utils.py
示例2: setUpClass
def setUpClass(cls):
    """Create two puppet repositories and sync each of them.

    Both repositories use the same feed. They differ only in their query:
    the second one uses the first query with every ``-`` replaced by ``_``.
    The raw sync responses are stored in ``cls.reports`` and the polled
    tasks in ``cls.tasks``.
    """
    super(SyncValidFeedTestCase, cls).setUpClass()
    utils.reset_pulp(cls.cfg)  # See: https://pulp.plan.io/issues/1406

    bodies = tuple(_gen_repo() for _ in range(2))
    queries = (_PUPPET_QUERY, _PUPPET_QUERY.replace('-', '_'))
    for body, query in zip(bodies, queries):
        body['importer_config'] = {
            'feed': _PUPPET_FEED,
            'queries': [query],
        }

    client = api.Client(cls.cfg, api.json_handler)
    repos = [client.post(REPOSITORY_PATH, body) for body in bodies]
    hrefs = {repo['_href'] for repo in repos}
    cls.resources.update(hrefs)

    # Trigger repository sync and collect completed tasks.
    cls.reports = []  # raw responses to "start syncing" commands
    cls.tasks = []  # completed tasks
    client.response_handler = api.echo_handler
    for repo in repos:
        sync_path = urljoin(repo['_href'], 'actions/sync/')
        report = client.post(sync_path)
        report.raise_for_status()
        cls.reports.append(report)
        cls.tasks.extend(api.poll_spawned_tasks(cls.cfg, report.json()))
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:25,代码来源:test_sync_publish.py
示例3: setUpClass
def setUpClass(cls):
    """Create an RPM repository, upload package groups, and publish.

    The generated package groups are kept in ``cls.package_groups``, the
    tasks spawned by each import in ``cls.tasks`` (same keys), and the
    fetched ``group`` repodata in ``cls.root_element``.
    """
    super(UploadPackageGroupsTestCase, cls).setUpClass()

    # Create a repository and add a distributor to it.
    client = api.Client(cls.cfg, api.json_handler)
    repo = client.post(REPOSITORY_PATH, gen_repo())
    repo_href = repo['_href']
    cls.resources.add(repo_href)
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )

    # Generate several package groups and import them into the repository.
    cls.package_groups = {
        'minimal': _gen_minimal_group(),
        'realistic': _gen_realistic_group(),
    }
    cls.tasks = {}
    for name, group in cls.package_groups.items():
        call_report = _upload_import_package_group(cls.cfg, repo, group)
        cls.tasks[name] = tuple(
            api.poll_spawned_tasks(cls.cfg, call_report)
        )

    # Publish the repository once every group has been imported.
    publish_path = urljoin(repo_href, 'actions/publish/')
    client.post(publish_path, {'id': distributor['id']})

    # Fetch the generated repodata of type 'group' (a.k.a. 'comps')
    repo_url = urljoin('/pulp/repos/', distributor['config']['relative_url'])
    cls.root_element = get_repomd_xml(cls.cfg, repo_url, 'group')
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:34,代码来源:test_comps_xml.py
示例4: setUpClass
def setUpClass(cls):
    """Create an RPM repository with a valid feed and sync it.

    Do the following:

    1. Reset Squid and Pulp.
    2. Create a repository with the "background" download policy.
    3. Sync the repository, recording the repository's details and the
       tasks spawned by the sync.
    4. Download an RPM from the repository.

    The test case is skipped when Pulp issue 1905 is untestable and the
    host is RHEL 6.
    """
    super(BackgroundTestCase, cls).setUpClass()
    if selectors.bug_is_untestable(1905, cls.cfg.version):
        if _os_is_rhel6(cls.cfg):
            raise unittest.SkipTest('https://pulp.plan.io/issues/1905')

    # Required to ensure content is actually downloaded.
    utils.reset_squid(cls.cfg)
    utils.reset_pulp(cls.cfg)

    # Create and sync a repository.
    repo = _create_repo(cls.cfg, 'background')
    repo_href = repo['_href']
    cls.resources.add(repo_href)
    sync_response = utils.sync_repo(cls.cfg, repo_href)
    report = sync_response.json()

    # Record the state of the repository itself after the sync, and the
    # tasks spawned when syncing the repository.
    client = api.Client(cls.cfg)
    repo_details = client.get(repo_href, params={'details': True})
    cls.repo = repo_details.json()
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, report))

    # Download an RPM.
    repo_path = urljoin('/pulp/repos/', repo['id'] + '/')
    cls.rpm = client.get(urljoin(repo_path, RPM))
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:34,代码来源:test_download_policies.py
示例5: setUpClass
def setUpClass(cls):
    """Create an OSTree repository with a valid feed and branch, and sync it.

    The sync response is stored in ``cls.report`` and the tasks it spawned
    in ``cls.tasks``.
    """
    super(SyncTestCase, cls).setUpClass()

    body = gen_repo()
    importer_config = body['importer_config']
    importer_config['feed'] = OSTREE_FEED
    importer_config['branches'] = [OSTREE_BRANCH]

    creation = api.Client(cls.cfg).post(REPOSITORY_PATH, body)
    repo_href = creation.json()['_href']
    cls.resources.add(repo_href)

    cls.report = utils.sync_repo(cls.cfg, repo_href)
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, cls.report.json()))
开发者ID:seandst,项目名称:pulp-smash,代码行数:10,代码来源:test_sync_publish.py
示例6: test_01_force_full_false
def test_01_force_full_false(self):
    """Publish the repository and set ``force_full`` to false.

    A full publish should occur: the ``rpms`` step of the first task
    yielded while polling must have processed more than zero units.
    """
    report = self.publish_repo(force_full=False)
    task = next(api.poll_spawned_tasks(self.cfg, report))
    rpms_step = self.get_step(task['result']['details'], 'rpms')
    self.assertGreater(rpms_step['num_processed'], 0, rpms_step)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:10,代码来源:test_force_full.py
示例7: setUpClass
def setUpClass(cls):
    """Create an RPM repository with an invalid feed and sync it.

    The feed is whatever ``utils.uuid4()`` returns. The raw sync response
    is stored in ``cls.report`` and the polled tasks in ``cls.tasks``.
    """
    super(SyncInvalidFeedTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)

    repo_body = gen_repo()
    repo_body["importer_config"]["feed"] = utils.uuid4()
    repo_href = client.post(REPOSITORY_PATH, repo_body)["_href"]

    # Switch handlers so the sync response is kept as-is.
    client.response_handler = api.echo_handler
    sync_path = urljoin(repo_href, "actions/sync/")
    cls.report = client.post(sync_path)
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, cls.report.json()))
    cls.resources.add(repo_href)
开发者ID:pcreech,项目名称:pulp-smash,代码行数:11,代码来源:test_sync_publish.py
示例8: test_task_progress_report
def test_task_progress_report(self):
    """Assert no task's progress report contains error details.

    Other assertions about the final state of each task are handled by the
    client's response handler. (For more information, see the source of
    :func:`pulp_smash.api.safe_handler`.)
    """
    # Poll every task to completion before asserting anything.
    finished = tuple(api.poll_spawned_tasks(self.cfg, self.report.json()))
    for index, task in enumerate(finished):
        with self.subTest(i=index):
            importer_report = task['progress_report']['yum_importer']
            content_report = importer_report['content']
            self.assertEqual(content_report['error_details'], [], task)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:12,代码来源:test_sync_publish.py
示例9: setUpClass
def setUpClass(cls):
    """Create an OSTree repository with a valid feed and branch, and sync it.

    The test case is skipped when Pulp issue 1934 is untestable. The sync
    response is stored in ``cls.report`` and its tasks in ``cls.tasks``.
    """
    super(SyncTestCase, cls).setUpClass()
    bug_untestable = selectors.bug_is_untestable(1934, cls.cfg.version)
    if bug_untestable:
        raise unittest2.SkipTest('https://pulp.plan.io/issues/1934')

    body = gen_repo()
    importer_config = body['importer_config']
    importer_config['feed'] = OSTREE_FEED
    importer_config['branches'] = [OSTREE_BRANCH]

    creation = api.Client(cls.cfg).post(REPOSITORY_PATH, body)
    repo_href = creation.json()['_href']
    cls.resources.add(repo_href)

    cls.report = utils.sync_repo(cls.cfg, repo_href)
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, cls.report.json()))
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:12,代码来源:test_sync_publish.py
示例10: test_01_force_full_false
def test_01_force_full_false(self):
    """Publish the repository and set ``force_full`` to false.

    A full publish should occur: the ``rpms`` step of the first task
    yielded while polling must have processed more than zero units.
    """
    publish_body = {
        'id': self.repo['distributors'][0]['id'],
        'override_config': {'force_full': False},
    }
    response = utils.publish_repo(self.cfg, self.repo, publish_body)
    task = next(api.poll_spawned_tasks(self.cfg, response.json()))
    rpms_step = self.get_step(task['result']['details'], 'rpms')
    self.assertGreater(rpms_step['num_processed'], 0, rpms_step)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:13,代码来源:test_force_full.py
示例11: _sync_repo
def _sync_repo(server_config, href):
    """Sync a repository and wait for the sync to complete.

    Verify only the call report's status code. Do not verify each individual
    task, as the default response handler does.

    :param server_config: Information about the Pulp server being targeted.
    :param href: The path to the repository to sync.
    :returns: A ``(call_report, tasks)`` pair, where ``tasks`` is a tuple of
        the tasks yielded while polling.
    """
    client = api.Client(server_config, api.echo_handler)
    sync_path = urljoin(href, 'actions/sync/')
    call_report = client.post(sync_path, {'override_config': {}})
    call_report.raise_for_status()
    spawned = api.poll_spawned_tasks(server_config, call_report.json())
    return call_report, tuple(spawned)
开发者ID:seandst,项目名称:pulp-smash,代码行数:13,代码来源:test_sync_publish.py
示例12: setUpClass
def setUpClass(cls):
    """Create an RPM repository with an invalid feed and sync it.

    The feed is whatever ``utils.uuid4()`` returns. The raw sync response
    is stored in ``cls.report`` and the polled tasks in ``cls.tasks``.
    """
    super(SyncInvalidFeedTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)

    repo_body = _gen_repo()
    repo_body['importer_config']['feed'] = utils.uuid4()
    repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']

    # Only the call report's status code is checked from here on.
    client.response_handler = api.echo_handler
    cls.report = client.post(
        urljoin(repo_href, 'actions/sync/'),
        {'override_config': {}},
    )
    cls.report.raise_for_status()
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, cls.report.json()))
    cls.resources.add(repo_href)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:13,代码来源:test_sync_publish.py
示例13: test_no_change_in_second_sync
def test_no_change_in_second_sync(self):
    """Verify that syncing a second time changes nothing.

    If the repository has not changed, then Pulp must report that nothing
    was added, removed or updated by the second sync. Exactly one task is
    expected to be spawned.
    """
    sync_report = utils.sync_repo(self.cfg, self.repo_href)
    spawned = tuple(api.poll_spawned_tasks(self.cfg, sync_report.json()))
    with self.subTest(comment='spawned tasks'):
        self.assertEqual(len(spawned), 1)
    counters = ('added_count', 'removed_count', 'updated_count')
    for counter in counters:
        # Index inside the sub-test so a missing task is reported per
        # counter rather than aborting the whole test.
        with self.subTest(comment=counter):
            self.assertEqual(spawned[0]['result'][counter], 0)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:13,代码来源:test_sync_publish.py
示例14: setUpClass
def setUpClass(cls):
    """Create a puppet repository with an invalid feed and sync it.

    The feed is ``http://`` followed by whatever ``utils.uuid4()`` returns.
    The raw sync response is stored in ``cls.report`` and the polled tasks
    in ``cls.tasks``.
    """
    super(SyncInvalidFeedTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)

    repo_body = gen_repo()
    repo_body['importer_config'] = {'feed': 'http://' + utils.uuid4()}
    repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']
    cls.resources.add(repo_href)

    # Trigger a repository sync and collect completed tasks.
    client.response_handler = api.echo_handler
    sync_path = urljoin(repo_href, 'actions/sync/')
    cls.report = client.post(sync_path)
    cls.report.raise_for_status()
    cls.tasks = list(api.poll_spawned_tasks(cls.cfg, cls.report.json()))
开发者ID:elyezer,项目名称:pulp-smash,代码行数:14,代码来源:test_sync_publish.py
示例15: _create_sync_repo
def _create_sync_repo(server_config, body):
    """Create a repository from ``body``, sync it, and wait for the tasks.

    :param server_config: Information about the Pulp server being targeted.
    :param body: The request body used to create the repository.
    :returns: A ``(repo_href, sync_response, tasks)`` triple, where ``tasks``
        is a tuple of the tasks yielded while polling.
    """
    # Create repository.
    client = api.Client(server_config, api.json_handler)
    repo_href = client.post(REPOSITORY_PATH, body)['_href']

    # Sync repository and collect task statuses.
    client.response_handler = api.echo_handler
    sync_path = urljoin(repo_href, 'actions/sync/')
    sync_response = client.post(sync_path, {'override_config': {}})
    sync_response.raise_for_status()
    spawned = api.poll_spawned_tasks(server_config, sync_response.json())
    return repo_href, sync_response, tuple(spawned)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:14,代码来源:test_sync_publish.py
示例16: test_02_force_full_omit
def test_02_force_full_omit(self):
    """Publish the repository and omit ``force_full``.

    A fast-forward publish should occur: the ``rpms`` step of the first
    task yielded while polling must have processed zero units. This test
    targets `Pulp #1966`_.

    .. _Pulp #1966: https://pulp.plan.io/issues/1966
    """
    if self.cfg.version >= Version('2.9'):
        if selectors.bug_is_untestable(1966, self.cfg.version):
            self.skipTest('https://pulp.plan.io/issues/1966')
    report = self.publish_repo()
    task = next(api.poll_spawned_tasks(self.cfg, report))
    rpms_step = self.get_step(task['result']['details'], 'rpms')
    self.assertEqual(rpms_step['num_processed'], 0, rpms_step)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:15,代码来源:test_force_full.py
示例17: setUpClass
def setUpClass(cls):
    """Create an RPM repo with a valid feed, sync it, and read the repo.

    The raw sync response is stored in ``cls.report``, the polled tasks in
    ``cls.tasks`` and the repository as read after the sync in ``cls.repo``.
    """
    super(SyncValidFeedTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)

    repo_body = gen_repo()
    repo_body['importer_config']['feed'] = RPM_FEED_URL
    repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']

    # Sync with the echo handler so the response itself can be inspected.
    client.response_handler = api.echo_handler
    cls.report = client.post(
        urljoin(repo_href, 'actions/sync/'),
        {'override_config': {}},
    )
    cls.report.raise_for_status()
    cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, cls.report.json()))

    # Back to decoded JSON for reading the repository.
    client.response_handler = api.json_handler
    cls.repo = client.get(repo_href)
    cls.resources.add(repo_href)
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:15,代码来源:test_sync_publish.py
示例18: verify_sync
def verify_sync(self, cfg, call_report):
    """Verify the call to sync the Python repository succeeded.

    Assert that:

    * The call report has an HTTP 202 status code.
    * None of the tasks spawned by the "sync" request contain errors.

    :param cfg: Information about the Pulp host, passed on to the poller.
    :param call_report: The response to the "sync" request.
    """
    self.assertEqual(call_report.status_code, 202)
    finished = tuple(api.poll_spawned_tasks(cfg, call_report.json()))
    for index, task in enumerate(finished):
        importer_steps = task['progress_report']['python_importer']
        for importer_step in importer_steps:
            # Every step of one task shares that task's index.
            with self.subTest(i=index):
                self.assertEqual(importer_step['error_details'], [], task)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:16,代码来源:test_sync_publish.py
示例19: setUpClass
def setUpClass(cls):
    """Create an RPM repository, upload errata, and publish the repository.

    More specifically, do the following:

    1. Create an RPM repository.
    2. Add a YUM distributor.
    3. Generate a pair of errata. Upload them to Pulp and import them into
       the repository.
    4. Publish the repository. Fetch the ``updateinfo.xml`` file from the
       distributor (via ``repomd.xml``), and parse it.

    The errata are kept in ``cls.errata``, the tasks spawned by each import
    in ``cls.tasks`` (same keys), and the parsed XML in ``cls.root_element``.
    """
    super(UpdateInfoTestCase, cls).setUpClass()
    cls.errata = {
        'import_no_pkglist': _gen_errata_no_pkglist(),
        'import_typical': _gen_errata_typical(),
    }
    cls.tasks = {}  # {'import_no_pkglist': (…), 'import_typical': (…)}

    # Create a repository and add a yum distributor.
    client = api.Client(cls.cfg, api.json_handler)
    repo = client.post(REPOSITORY_PATH, gen_repo())
    repo_href = repo['_href']
    cls.resources.add(repo_href)
    distributor = client.post(
        urljoin(repo_href, 'distributors/'),
        gen_distributor(),
    )

    # Import errata into our repository.
    for name, erratum in cls.errata.items():
        call_report = utils.upload_import_erratum(cls.cfg, erratum, repo_href)
        cls.tasks[name] = tuple(
            api.poll_spawned_tasks(cls.cfg, call_report)
        )

    # Publish the repository once all errata have been imported.
    publish_path = urljoin(repo_href, 'actions/publish/')
    client.post(publish_path, {'id': distributor['id']})

    # Fetch and parse updateinfo.xml (or updateinfo.xml.gz), via repomd.xml
    repo_url = urljoin('/pulp/repos/', distributor['config']['relative_url'])
    cls.root_element = get_repomd_xml(cls.cfg, repo_url, 'updateinfo')
开发者ID:elyezer,项目名称:pulp-smash,代码行数:47,代码来源:test_updateinfo.py
示例20: test_second_publish_tasks
def test_second_publish_tasks(self):
    """Assert the second publish's task reports a no-op publish.

    The task inspected is the first one yielded while polling the second
    call report. The expected summary text depends on the Pulp version:
    versions before 2.10 use a different wording.
    """
    call_report = self.call_reports[1]
    last_task = next(api.poll_spawned_tasks(self.cfg, call_report))
    # ``self.cfg`` was already dereferenced above, so no ``hasattr`` guard
    # is needed before reading the version.
    if self.cfg.version < Version('2.10'):
        summary = 'Skipped. Nothing changed since last publish'
    else:
        summary = (
            'Skipped: Repository content has not changed since last '
            'publish.'
        )
    self.assertEqual(last_task['result']['summary'], summary, last_task)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:17,代码来源:test_no_op_publish.py
注：本文中的pulp_smash.api.poll_spawned_tasks函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论