• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python api.poll_spawned_tasks函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp_smash.api.poll_spawned_tasks函数的典型用法代码示例。如果您正苦于以下问题:Python poll_spawned_tasks函数的具体用法?Python poll_spawned_tasks怎么用?Python poll_spawned_tasks使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了poll_spawned_tasks函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: publish

def publish(cfg, publisher, repo, version_href=None):
    """Publish a repository.

    :param pulp_smash.config.PulpSmashConfig cfg: Information about the Pulp
        host.
    :param publisher: A dict of information about the publisher of the
        repository to be published.
    :param repo: A dict of information about the repository. Only read when
        ``version_href`` is ``None``.
    :param version_href: The repository version to be published. When given,
        the request body references this version instead of ``repo``.
    :returns: A publication. A dict of information about the just created
        publication.
    :raises IndexError: If the call spawned no tasks, or if the last task
        lists no created resources.
    """
    if version_href is None:
        body = {'repository': repo['_href']}
    else:
        body = {'repository_version': version_href}
    client = api.Client(cfg, api.json_handler)
    call_report = client.post(urljoin(publisher['_href'], 'publish/'), body)
    # As of this writing, Pulp 3 only returns one task. If Pulp 3 starts
    # returning multiple tasks, this may need to be re-written.
    tasks = tuple(api.poll_spawned_tasks(cfg, call_report))
    if len(tasks) != 1:
        # This branch also fires when *zero* tasks were spawned, so the
        # message reports the actual count rather than claiming "multiple".
        message = (
            'Expected exactly one task to be spawned in response to API '
            'call, but {} were spawned. This is unexpected, and Pulp Smash '
            'may handle the response incorrectly. Here are the tasks '
            'generated: {}'
        )
        message = message.format(len(tasks), tasks)
        warnings.warn(message, RuntimeWarning)
    # The publication is the first resource created by the last task.
    return client.get(tasks[-1]['created_resources'][0])
开发者ID:dralley,项目名称:pulp-smash,代码行数:30,代码来源:utils.py


示例2: setUpClass

    def setUpClass(cls):
        """Create two puppet repositories and sync each of them.

        The raw responses to the sync requests are collected in
        ``cls.reports`` and every task yielded while polling them is
        collected in ``cls.tasks``.
        """
        super(SyncValidFeedTestCase, cls).setUpClass()
        utils.reset_pulp(cls.cfg)  # See: https://pulp.plan.io/issues/1406

        # Build one request body per query; both share the same feed.
        queries = (_PUPPET_QUERY, _PUPPET_QUERY.replace('-', '_'))
        bodies = []
        for query in queries:
            body = _gen_repo()
            body['importer_config'] = {
                'feed': _PUPPET_FEED,
                'queries': [query],
            }
            bodies.append(body)

        # Create the repositories and register them for clean-up.
        client = api.Client(cls.cfg, api.json_handler)
        repos = []
        for body in bodies:
            repos.append(client.post(REPOSITORY_PATH, body))
        hrefs = {repo['_href'] for repo in repos}
        cls.resources.update(hrefs)

        # Trigger repository sync and collect completed tasks.
        cls.reports = []  # raw responses to "start syncing" commands
        cls.tasks = []  # completed tasks
        client.response_handler = api.echo_handler
        for repo in repos:
            sync_path = urljoin(repo['_href'], 'actions/sync/')
            response = client.post(sync_path)
            response.raise_for_status()
            cls.reports.append(response)
            cls.tasks.extend(
                api.poll_spawned_tasks(cls.cfg, response.json())
            )
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:25,代码来源:test_sync_publish.py


示例3: setUpClass

    def setUpClass(cls):
        """Build an RPM repository, import package groups, then publish.

        The tasks polled for each import are stored in ``cls.tasks`` under
        the same keys as ``cls.package_groups``. The result of fetching the
        ``group`` repodata is stored in ``cls.root_element``.
        """
        super(UploadPackageGroupsTestCase, cls).setUpClass()

        # A repository, registered for clean-up, with one distributor.
        client = api.Client(cls.cfg, api.json_handler)
        repo = client.post(REPOSITORY_PATH, gen_repo())
        repo_href = repo['_href']
        cls.resources.add(repo_href)
        distributors_path = urljoin(repo_href, 'distributors/')
        distributor = client.post(distributors_path, gen_distributor())

        # Generate the package groups and import each into the repository,
        # remembering the tasks each import spawned.
        cls.package_groups = dict(
            minimal=_gen_minimal_group(),
            realistic=_gen_realistic_group(),
        )
        cls.tasks = {}
        for name, group in cls.package_groups.items():
            call_report = _upload_import_package_group(cls.cfg, repo, group)
            spawned = api.poll_spawned_tasks(cls.cfg, call_report)
            cls.tasks[name] = tuple(spawned)

        # Publish the repository through the distributor created above.
        publish_path = urljoin(repo_href, 'actions/publish/')
        client.post(publish_path, {'id': distributor['id']})

        # Fetch the generated repodata of type 'group' (a.k.a. 'comps')
        relative_url = distributor['config']['relative_url']
        repo_url = urljoin('/pulp/repos/', relative_url)
        cls.root_element = get_repomd_xml(cls.cfg, repo_url, 'group')
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:34,代码来源:test_comps_xml.py


示例4: setUpClass

    def setUpClass(cls):
        """Create and sync a "background" repository, then fetch an RPM.

        Steps, in order:

        1. Skip if bug 1905 is untestable and the host is RHEL 6.
        2. Reset Squid and Pulp.
        3. Create a repository via ``_create_repo(cfg, 'background')`` and
           sync it.
        4. Record the repository's details in ``cls.repo`` and the tasks
           spawned by the sync in ``cls.tasks``.
        5. Download an RPM from the repository into ``cls.rpm``.
        """
        super(BackgroundTestCase, cls).setUpClass()
        if selectors.bug_is_untestable(1905, cls.cfg.version):
            if _os_is_rhel6(cls.cfg):
                raise unittest.SkipTest('https://pulp.plan.io/issues/1905')

        # Required to ensure content is actually downloaded.
        utils.reset_squid(cls.cfg)
        utils.reset_pulp(cls.cfg)

        # Create the repository, register it for clean-up and sync it.
        repo = _create_repo(cls.cfg, 'background')
        repo_href = repo['_href']
        cls.resources.add(repo_href)
        sync_report = utils.sync_repo(cls.cfg, repo_href).json()

        # Read the repository's state first, then collect the tasks that
        # the sync spawned.
        client = api.Client(cls.cfg)
        details_response = client.get(repo_href, params={'details': True})
        cls.repo = details_response.json()
        cls.tasks = tuple(api.poll_spawned_tasks(cls.cfg, sync_report))

        # Download an RPM.
        repo_path = urljoin('/pulp/repos/', repo['id'] + '/')
        cls.rpm = client.get(urljoin(repo_path, RPM))
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:34,代码来源:test_download_policies.py


示例5: setUpClass

 def setUpClass(cls):
     """Create an OSTree repository with a feed and a branch, and sync it.

     The sync response is kept in ``cls.report`` and the tasks yielded
     while polling it are kept in ``cls.tasks``.
     """
     super(SyncTestCase, cls).setUpClass()

     # Point the importer at the OSTree feed and branch.
     body = gen_repo()
     importer_config = body['importer_config']
     importer_config['feed'] = OSTREE_FEED
     importer_config['branches'] = [OSTREE_BRANCH]

     # Create the repository and register it for clean-up.
     client = api.Client(cls.cfg)
     repo = client.post(REPOSITORY_PATH, body).json()
     repo_href = repo['_href']
     cls.resources.add(repo_href)

     # Sync, then wait for every spawned task.
     cls.report = utils.sync_repo(cls.cfg, repo_href)
     spawned = api.poll_spawned_tasks(cls.cfg, cls.report.json())
     cls.tasks = tuple(spawned)
开发者ID:seandst,项目名称:pulp-smash,代码行数:10,代码来源:test_sync_publish.py


示例6: test_01_force_full_false

    def test_01_force_full_false(self):
        """Publish with ``force_full`` set to false.

        A full publish should occur, so the "rpms" step of the first task
        yielded for the publish must have processed more than zero units.
        """
        report = self.publish_repo(force_full=False)
        spawned = api.poll_spawned_tasks(self.cfg, report)
        first_task = next(spawned)
        rpms_step = self.get_step(first_task['result']['details'], 'rpms')
        self.assertGreater(rpms_step['num_processed'], 0, rpms_step)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:10,代码来源:test_force_full.py


示例7: setUpClass

 def setUpClass(cls):
     """Create an RPM repository whose feed is a random UUID, and sync it.

     The raw sync response is kept in ``cls.report`` and the tasks yielded
     while polling it are kept in ``cls.tasks``.
     """
     super(SyncInvalidFeedTestCase, cls).setUpClass()

     # Create the repository with a feed that is just a UUID string.
     body = gen_repo()
     body["importer_config"]["feed"] = utils.uuid4()
     client = api.Client(cls.cfg, api.json_handler)
     repo = client.post(REPOSITORY_PATH, body)
     repo_href = repo["_href"]

     # Switch handlers so the sync response is returned untouched.
     client.response_handler = api.echo_handler
     sync_path = urljoin(repo_href, "actions/sync/")
     cls.report = client.post(sync_path)
     spawned = api.poll_spawned_tasks(cls.cfg, cls.report.json())
     cls.tasks = tuple(spawned)
     cls.resources.add(repo_href)
开发者ID:pcreech,项目名称:pulp-smash,代码行数:11,代码来源:test_sync_publish.py


示例8: test_task_progress_report

    def test_task_progress_report(self):
        """Assert that no task's progress report lists error details.

        Only the yum importer's ``error_details`` are checked here. Other
        assertions about the final state of each task are handled by the
        client's response handler. (For more information, see the source of
        :func:`pulp_smash.api.safe_handler`.)
        """
        # Poll every task up front, before any sub-test starts.
        tasks = tuple(api.poll_spawned_tasks(self.cfg, self.report.json()))
        for index, task in enumerate(tasks):
            with self.subTest(i=index):
                importer_report = task['progress_report']['yum_importer']
                content = importer_report['content']
                self.assertEqual(content['error_details'], [], task)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:12,代码来源:test_sync_publish.py


示例9: setUpClass

 def setUpClass(cls):
     """Create an OSTree repository with a feed and a branch, and sync it.

     The whole class is skipped when bug 1934 is untestable on the Pulp
     version under test. The sync response is kept in ``cls.report`` and
     the polled tasks are kept in ``cls.tasks``.
     """
     super(SyncTestCase, cls).setUpClass()
     untestable = selectors.bug_is_untestable(1934, cls.cfg.version)
     if untestable:
         raise unittest2.SkipTest('https://pulp.plan.io/issues/1934')

     # Point the importer at the OSTree feed and branch.
     body = gen_repo()
     importer_config = body['importer_config']
     importer_config['feed'] = OSTREE_FEED
     importer_config['branches'] = [OSTREE_BRANCH]

     # Create the repository and register it for clean-up.
     client = api.Client(cls.cfg)
     repo = client.post(REPOSITORY_PATH, body).json()
     repo_href = repo['_href']
     cls.resources.add(repo_href)

     # Sync, then wait for every spawned task.
     cls.report = utils.sync_repo(cls.cfg, repo_href)
     spawned = api.poll_spawned_tasks(cls.cfg, cls.report.json())
     cls.tasks = tuple(spawned)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:12,代码来源:test_sync_publish.py


示例10: test_01_force_full_false

    def test_01_force_full_false(self):
        """Publish with ``force_full`` overridden to false.

        A full publish should occur, so the "rpms" step of the first task
        yielded for the publish must have processed more than zero units.
        """
        # Publish through the repository's first distributor.
        distributor_id = self.repo['distributors'][0]['id']
        publish_body = {
            'id': distributor_id,
            'override_config': {'force_full': False}
        }
        response = utils.publish_repo(self.cfg, self.repo, publish_body)
        report = response.json()

        first_task = next(api.poll_spawned_tasks(self.cfg, report))
        rpms_step = self.get_step(first_task['result']['details'], 'rpms')
        self.assertGreater(rpms_step['num_processed'], 0, rpms_step)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:13,代码来源:test_force_full.py


示例11: _sync_repo

def _sync_repo(server_config, href):
    """Start a sync of the repository at ``href`` and wait for it to end.

    The echo handler is used, so only the call report's status code is
    verified here; individual tasks are not verified the way the default
    response handler would. Return ``call_report, tasks``.
    """
    client = api.Client(server_config, api.echo_handler)
    sync_path = urljoin(href, 'actions/sync/')
    call_report = client.post(sync_path, {'override_config': {}})
    call_report.raise_for_status()
    spawned = api.poll_spawned_tasks(server_config, call_report.json())
    return call_report, tuple(spawned)
开发者ID:seandst,项目名称:pulp-smash,代码行数:13,代码来源:test_sync_publish.py


示例12: setUpClass

 def setUpClass(cls):
     """Create an RPM repository whose feed is a random UUID, and sync it.

     The raw sync response is kept in ``cls.report`` and the tasks yielded
     while polling it are kept in ``cls.tasks``.
     """
     super(SyncInvalidFeedTestCase, cls).setUpClass()

     # Create the repository with a feed that is just a UUID string.
     body = _gen_repo()
     body['importer_config']['feed'] = utils.uuid4()
     client = api.Client(cls.cfg, api.json_handler)
     repo = client.post(REPOSITORY_PATH, body)
     repo_href = repo['_href']

     # Switch handlers so the sync response is returned untouched, and
     # check only its HTTP status.
     client.response_handler = api.echo_handler
     sync_path = urljoin(repo_href, 'actions/sync/')
     cls.report = client.post(sync_path, {'override_config': {}})
     cls.report.raise_for_status()
     spawned = api.poll_spawned_tasks(cls.cfg, cls.report.json())
     cls.tasks = tuple(spawned)
     cls.resources.add(repo_href)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:13,代码来源:test_sync_publish.py


示例13: test_no_change_in_second_sync

    def test_no_change_in_second_sync(self):
        """Verify that syncing a second time reports no changes.

        If the repository has not changed, the sync must spawn exactly one
        task, and that task must report zero added, removed and updated
        units.
        """
        sync_report = utils.sync_repo(self.cfg, self.repo_href)
        spawned = api.poll_spawned_tasks(self.cfg, sync_report.json())
        tasks = tuple(spawned)
        with self.subTest(comment='spawned tasks'):
            self.assertEqual(len(tasks), 1)

        # Look the task up inside each sub-test so that a missing task is
        # reported per counter rather than aborting the whole test.
        counters = ('added_count', 'removed_count', 'updated_count')
        for counter in counters:
            with self.subTest(comment=counter):
                self.assertEqual(tasks[0]['result'][counter], 0)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:13,代码来源:test_sync_publish.py


示例14: setUpClass

    def setUpClass(cls):
        """Create a puppet repository with a bogus feed URL and sync it.

        The feed is ``http://`` followed by a random UUID. The raw sync
        response is kept in ``cls.report`` and the polled tasks are kept, as
        a list, in ``cls.tasks``.
        """
        super(SyncInvalidFeedTestCase, cls).setUpClass()

        # Create the repository and register it for clean-up.
        body = gen_repo()
        body['importer_config'] = {'feed': 'http://' + utils.uuid4()}
        client = api.Client(cls.cfg, api.json_handler)
        repo = client.post(REPOSITORY_PATH, body)
        repo_href = repo['_href']
        cls.resources.add(repo_href)

        # Trigger a repository sync and collect completed tasks.
        client.response_handler = api.echo_handler
        sync_path = urljoin(repo_href, 'actions/sync/')
        cls.report = client.post(sync_path)
        cls.report.raise_for_status()
        spawned = api.poll_spawned_tasks(cls.cfg, cls.report.json())
        cls.tasks = list(spawned)
开发者ID:elyezer,项目名称:pulp-smash,代码行数:14,代码来源:test_sync_publish.py


示例15: _create_sync_repo

def _create_sync_repo(server_config, body):
    """Create a repository from ``body``, sync it, and wait for the sync.

    :param server_config: Passed to :class:`api.Client` and to
        :func:`api.poll_spawned_tasks`.
    :param body: The request body used to create the repository.
    :returns: A ``(href, call_report, tasks)`` tuple: the new repository's
        ``_href``, the raw response to the sync request, and a tuple of the
        tasks yielded while polling that response.
    """
    # Create repository.
    client = api.Client(server_config, api.json_handler)
    repo_href = client.post(REPOSITORY_PATH, body)['_href']

    # Sync repository and collect task statuses. The echo handler returns
    # the response untouched, so its status is checked explicitly.
    client.response_handler = api.echo_handler
    sync_path = urljoin(repo_href, 'actions/sync/')
    call_report = client.post(sync_path, {'override_config': {}})
    call_report.raise_for_status()
    spawned = api.poll_spawned_tasks(server_config, call_report.json())
    return repo_href, call_report, tuple(spawned)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:14,代码来源:test_sync_publish.py


示例16: test_02_force_full_omit

    def test_02_force_full_omit(self):
        """Publish the repository without passing ``force_full``.

        A fast-forward publish should occur, so the "rpms" step of the first
        task yielded for the publish must have processed zero units. This
        test targets `Pulp #1966`_.

        .. _Pulp #1966: https://pulp.plan.io/issues/1966
        """
        # The bug is only consulted for Pulp 2.9 and newer.
        if self.cfg.version >= Version('2.9'):
            if selectors.bug_is_untestable(1966, self.cfg.version):
                self.skipTest('https://pulp.plan.io/issues/1966')

        report = self.publish_repo()
        spawned = api.poll_spawned_tasks(self.cfg, report)
        first_task = next(spawned)
        rpms_step = self.get_step(first_task['result']['details'], 'rpms')
        self.assertEqual(rpms_step['num_processed'], 0, rpms_step)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:15,代码来源:test_force_full.py


示例17: setUpClass

 def setUpClass(cls):
     """Create an RPM repo with a feed, sync it, and read the repo back.

     The raw sync response is kept in ``cls.report``, the polled tasks in
     ``cls.tasks`` and the repository as read after the sync in
     ``cls.repo``.
     """
     super(SyncValidFeedTestCase, cls).setUpClass()

     # Create the repository.
     body = gen_repo()
     body['importer_config']['feed'] = RPM_FEED_URL
     client = api.Client(cls.cfg, api.json_handler)
     repo = client.post(REPOSITORY_PATH, body)
     repo_href = repo['_href']

     # Sync with the echo handler so the raw response is returned, and
     # check only its HTTP status.
     client.response_handler = api.echo_handler
     sync_path = urljoin(repo_href, 'actions/sync/')
     cls.report = client.post(sync_path, {'override_config': {}})
     cls.report.raise_for_status()
     spawned = api.poll_spawned_tasks(cls.cfg, cls.report.json())
     cls.tasks = tuple(spawned)

     # Back to the JSON handler to read the repository after the sync.
     client.response_handler = api.json_handler
     cls.repo = client.get(repo_href)
     cls.resources.add(repo_href)
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:15,代码来源:test_sync_publish.py


示例18: verify_sync

    def verify_sync(self, cfg, call_report):
        """Check that a request to sync the Python repository succeeded.

        Assert that:

        * The call report has an HTTP 202 status code.
        * No step in the ``python_importer`` progress report of any spawned
          task lists error details.
        """
        self.assertEqual(call_report.status_code, 202)
        # Poll every task up front, before any sub-test starts.
        spawned = api.poll_spawned_tasks(cfg, call_report.json())
        tasks = tuple(spawned)
        for index, task in enumerate(tasks):
            for step in task['progress_report']['python_importer']:
                with self.subTest(i=index):
                    self.assertEqual(step['error_details'], [], task)
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:16,代码来源:test_sync_publish.py


示例19: setUpClass

    def setUpClass(cls):
        """Create an RPM repository, import errata into it, and publish it.

        In order:

        1. Generate a pair of errata and store them in ``cls.errata``.
        2. Create an RPM repository and add a distributor to it.
        3. Upload each erratum and import it into the repository, storing
           the polled tasks in ``cls.tasks`` under the erratum's key.
        4. Publish the repository, then fetch ``updateinfo`` via
           ``get_repomd_xml`` and store the result in ``cls.root_element``.
        """
        super(UpdateInfoTestCase, cls).setUpClass()
        cls.errata = dict(
            import_no_pkglist=_gen_errata_no_pkglist(),
            import_typical=_gen_errata_typical(),
        )
        cls.tasks = {}  # {'import_no_pkglist': (…), 'import_typical': (…)}

        # Create a repository and add a yum distributor.
        client = api.Client(cls.cfg, api.json_handler)
        repo = client.post(REPOSITORY_PATH, gen_repo())
        repo_href = repo['_href']
        cls.resources.add(repo_href)
        distributors_path = urljoin(repo_href, 'distributors/')
        distributor = client.post(distributors_path, gen_distributor())

        # Import errata into our repository.
        for name, erratum in cls.errata.items():
            call_report = utils.upload_import_erratum(
                cls.cfg, erratum, repo_href
            )
            spawned = api.poll_spawned_tasks(cls.cfg, call_report)
            cls.tasks[name] = tuple(spawned)

        # Publish the repository.
        publish_path = urljoin(repo_href, 'actions/publish/')
        client.post(publish_path, {'id': distributor['id']})

        # Fetch and parse updateinfo.xml (or updateinfo.xml.gz), via repomd.xml
        relative_url = distributor['config']['relative_url']
        repo_url = urljoin('/pulp/repos/', relative_url)
        cls.root_element = get_repomd_xml(cls.cfg, repo_url, 'updateinfo')
开发者ID:elyezer,项目名称:pulp-smash,代码行数:47,代码来源:test_updateinfo.py


示例20: test_second_publish_tasks

    def test_second_publish_tasks(self):
        """Assert the second publish reports a no-op publish.

        The first task yielded while polling the second call report is
        inspected. Its result summary must match the "skipped" message,
        whose wording depends on the Pulp version under test.
        """
        call_report = self.call_reports[1]
        last_task = next(api.poll_spawned_tasks(self.cfg, call_report))

        # ``self.cfg`` was already dereferenced above, so no ``hasattr``
        # guard is needed before reading the version.
        if self.cfg.version < Version('2.10'):
            summary = 'Skipped. Nothing changed since last publish'
        else:
            summary = (
                'Skipped: Repository content has not changed since last '
                'publish.'
            )
        self.assertEqual(
            last_task['result']['summary'],
            summary,
            last_task,
        )
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:17,代码来源:test_no_op_publish.py



注:本文中的pulp_smash.api.poll_spawned_tasks函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python compat.urljoin函数代码示例发布时间:2022-05-25
下一篇:
Python util.getLogger函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap