• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.gen_distributor函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp_smash.tests.rpm.api_v2.utils.gen_distributor函数的典型用法代码示例。如果您正苦于以下问题：Python gen_distributor函数的具体用法？Python gen_distributor怎么用？Python gen_distributor使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了gen_distributor函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: make_repo

    def make_repo(self, cfg, remote):
        """Create an RPM repository wired up for rsync publishing.

        The repository is created with a feed of ``RPM_FEED_URL``, a
        distributor produced by ``gen_distributor()``, and a second
        distributor of type ``rpm_rsync_distributor`` whose
        ``predistributor_id`` points at the first one. The repository is
        registered for deletion through ``addCleanup()``.

        :param pulp_smash.config.ServerConfig cfg: Information about the Pulp
            server being targeted.
        :param remote: The value stored under the rsync distributor's
            ``remote`` configuration key.
        :returns: The ``_href`` of the newly created repository.
        """
        client = api.Client(cfg, api.json_handler)

        repo_body = gen_repo()
        repo_body['importer_config']['feed'] = RPM_FEED_URL

        yum_distributor = gen_distributor()
        rsync_distributor = {
            'distributor_id': utils.uuid4(),
            'distributor_type_id': 'rpm_rsync_distributor',
            'distributor_config': {
                # The rsync distributor publishes what this one produced.
                'predistributor_id': yum_distributor['distributor_id'],
                'remote': remote,
            },
        }
        repo_body['distributors'] = [yum_distributor, rsync_distributor]

        href = client.post(REPOSITORY_PATH, repo_body)['_href']
        self.addCleanup(client.delete, href)
        return href
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:32,代码来源:test_rsync_distributor.py


示例2: setUpClass

    def setUpClass(cls):
        """Create a repo, give it an erratum, publish, and fetch updateinfo.

        Steps performed:

        1. Create an RPM repository with a feed and one distributor.
        2. Sync the repository and upload ``cls.erratum`` into it.
        3. Publish the repository through its first distributor.
        4. Store the parsed ``updateinfo`` repodata as ``cls.updateinfo``.
        """
        super(UploadErratumTestCase, cls).setUpClass()
        cls.erratum = gen_erratum()
        client = api.Client(cls.cfg, api.json_handler)

        # Step 1: repository with feed and distributor.
        repo_body = gen_repo()
        repo_body['importer_config']['feed'] = RPM_FEED_URL
        repo_body['distributors'] = [gen_distributor()]
        repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']
        cls.resources.add(repo_href)

        # Step 2: populate the repository.
        utils.sync_repo(cls.cfg, repo_href)
        utils.upload_import_erratum(cls.cfg, cls.erratum, repo_href)

        # Step 3: re-read the repo with details to learn the distributor.
        detailed_repo = client.get(repo_href, params={'details': True})
        yum_distributor = detailed_repo['distributors'][0]
        publish_url = urljoin(detailed_repo['_href'], 'actions/publish/')
        client.post(publish_url, {'id': yum_distributor['id']})

        # Step 4: fetch and parse updateinfo.xml from the published tree.
        cls.updateinfo = get_repomd_xml(
            cls.cfg,
            urljoin('/pulp/repos/', yum_distributor['config']['relative_url']),
            'updateinfo',
        )
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:34,代码来源:test_upload_publish.py


示例3: make_repo

    def make_repo(self, cfg, dist_cfg_updates):
        """Create an RPM repository with a yum and an rsync distributor.

        The repository uses ``RPM_SIGNED_FEED_URL`` as its feed. Its first
        distributor comes from ``gen_distributor()``; its second is an
        ``rpm_rsync_distributor`` whose ``predistributor_id`` refers to the
        first. The repository is registered for deletion through
        ``addCleanup()``.

        :param pulp_smash.config.ServerConfig cfg: Information about the Pulp
            server being targeted.
        :param dist_cfg_updates: A dict merged into the rsync distributor's
            ``distributor_config``. Callers are expected to supply at least
            a ``remote`` entry.
        :returns: The response to a detailed (``details=True``) read of the
            new repository.
        """
        client = api.Client(cfg, api.json_handler)

        repo_body = gen_repo()
        repo_body['importer_config']['feed'] = RPM_SIGNED_FEED_URL

        yum_distributor = gen_distributor()
        rsync_distributor = {
            'distributor_id': utils.uuid4(),
            'distributor_type_id': 'rpm_rsync_distributor',
            'distributor_config': {
                'predistributor_id': yum_distributor['distributor_id'],
            },
        }
        # Caller-supplied settings are layered on top of the base config.
        rsync_distributor['distributor_config'].update(dist_cfg_updates)
        repo_body['distributors'] = [yum_distributor, rsync_distributor]

        href = client.post(REPOSITORY_PATH, repo_body)['_href']
        self.addCleanup(client.delete, href)
        return client.get(href, params={'details': True})
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:33,代码来源:test_rsync_distributor.py


示例4: setUpClass

    def setUpClass(cls):
        """Publish a fresh repository and parse its ``repomd.xml``.

        A repository is created, a distributor from ``gen_distributor()`` is
        attached to it, and the repository is published. The resulting
        ``repodata/repomd.xml`` is then fetched with ``xml_handler`` and
        stored as ``cls.root_element``.
        """
        super(RepoMDTestCase, cls).setUpClass()
        client = api.Client(cls.cfg, api.json_handler)

        # Create the repository and remember it for later deletion.
        repo_href = client.post(REPOSITORY_PATH, gen_repo())['_href']
        cls.resources.add(repo_href)

        # Attach a distributor, then publish through it.
        distributors_url = urljoin(repo_href, 'distributors/')
        yum_distributor = client.post(distributors_url, gen_distributor())
        publish_url = urljoin(repo_href, 'actions/publish/')
        client.post(publish_url, {'id': yum_distributor['id']})

        # Switch handlers so the next response is parsed as XML.
        client.response_handler = xml_handler
        published_root = urljoin(
            '/pulp/repos/',
            yum_distributor['config']['relative_url'],
        )
        cls.root_element = client.get(
            urljoin(published_root, 'repodata/repomd.xml')
        )
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:29,代码来源:test_repomd.py


示例5: test_publish_override_config

    def test_publish_override_config(self):
        """Publish with ``packages_directory`` given as an override option.

        A distributor with default options is added to the repository. The
        publish request carries ``packages_directory`` in its
        ``override_config``. Every package href found in the published
        primary XML must live in that directory.
        """
        if selectors.bug_is_untestable(1976, self.cfg.version):
            self.skipTest('https://pulp.plan.io/issues/1976')

        client = api.Client(self.cfg, api.json_handler)
        distributors_url = urljoin(self.repo_href, 'distributors/')
        distributor = client.post(distributors_url, gen_distributor())

        # A random directory name cannot collide with a default location.
        packages_dir = utils.uuid4()
        publish_body = {
            'id': distributor['id'],
            'override_config': {'packages_directory': packages_dir},
        }
        client.post(urljoin(self.repo_href, 'actions/publish/'), publish_body)

        package_hrefs = get_package_hrefs(
            get_parse_repodata_primary_xml(self.cfg, distributor)
        )
        self.assertGreater(len(package_hrefs), 0)
        for package_href in package_hrefs:
            with self.subTest(package_href=package_href):
                parent_dir = os.path.dirname(package_href)
                self.assertEqual(parent_dir, packages_dir)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:26,代码来源:test_packages_directory.py


示例6: setUpClass

    def setUpClass(cls):
        """Upload package groups into a new repository and publish it.

        Two package groups (``minimal`` and ``realistic``) are generated and
        imported; the tasks spawned by each import are kept in
        ``cls.tasks``. After publishing, the ``group`` repodata is fetched
        and stored as ``cls.root_element``.
        """
        super(UploadPackageGroupsTestCase, cls).setUpClass()
        client = api.Client(cls.cfg, api.json_handler)

        # Repository plus distributor.
        repo = client.post(REPOSITORY_PATH, gen_repo())
        cls.resources.add(repo['_href'])
        distributors_url = urljoin(repo['_href'], 'distributors/')
        yum_distributor = client.post(distributors_url, gen_distributor())

        # Import each generated group and collect the spawned tasks.
        cls.package_groups = {
            'minimal': _gen_minimal_group(),
            'realistic': _gen_realistic_group(),
        }
        cls.tasks = {}
        for group_name, group in cls.package_groups.items():
            call_report = _upload_import_package_group(cls.cfg, repo, group)
            spawned = api.poll_spawned_tasks(cls.cfg, call_report)
            cls.tasks[group_name] = tuple(spawned)

        # Publish, then read back the 'group' (a.k.a. 'comps') repodata.
        publish_url = urljoin(repo['_href'], 'actions/publish/')
        client.post(publish_url, {'id': yum_distributor['id']})
        published_root = urljoin(
            '/pulp/repos/',
            yum_distributor['config']['relative_url'],
        )
        cls.root_element = get_repomd_xml(cls.cfg, published_root, 'group')
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:34,代码来源:test_comps_xml.py


示例7: setUpClass

    def setUpClass(cls):
        """Schedule a recurring publish and read the schedule back.

        Steps performed:

        1. Create a repository with a feed of ``RPM_FEED_URL`` and sync it.
        2. Add a distributor and create a publish schedule with an interval
           of ``PT2M``.
        3. Sleep for 130 seconds.
        4. Read the schedule and store the response as ``cls.response``.
        """
        super(ScheduledPublishTestCase, cls).setUpClass()
        client = api.Client(cls.cfg, api.json_handler)

        # Step 1: repository with feed, synced.
        repo_body = gen_repo()
        repo_body['importer_config']['feed'] = RPM_FEED_URL
        repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']
        cls.resources.add(repo_href)
        utils.sync_repo(cls.cfg, repo_href)

        # Step 2: the schedule lives beneath the distributor's own path.
        distributor_body = gen_distributor()
        client.post(urljoin(repo_href, 'distributors/'), distributor_body)
        relative_schedule_path = '/'.join((
            'distributors',
            distributor_body['distributor_id'],
            'schedules/publish/',
        ))
        schedule = client.post(
            urljoin(repo_href, relative_schedule_path),
            {'schedule': 'PT2M'},
        )

        # Step 3: give the scheduled publish time to fire.
        time.sleep(130)

        # Step 4: read the schedule.
        cls.response = client.get(schedule['_href'])
开发者ID:seandst,项目名称:pulp-smash,代码行数:33,代码来源:test_schedule_publish.py


示例8: setUpClass

    def setUpClass(cls):
        """Publish a repository and parse the resulting ``repomd.xml``.

        The class is skipped when ``check_issue_2277()`` returns a true
        value. Otherwise a repository with one distributor is created and
        published, and its ``repodata/repomd.xml`` is fetched with
        ``xml_handler`` and stored as ``cls.root_element``.
        """
        super(RepoMDTestCase, cls).setUpClass()
        if check_issue_2277(cls.cfg):
            raise unittest.SkipTest('https://pulp.plan.io/issues/2277')

        client = api.Client(cls.cfg, api.json_handler)

        # Create the repository, then re-read it with distributor details.
        repo_body = gen_repo()
        repo_body['distributors'] = [gen_distributor()]
        created = client.post(REPOSITORY_PATH, repo_body)
        repo = client.get(created['_href'], params={'details': True})
        cls.resources.add(repo['_href'])
        utils.publish_repo(cls.cfg, repo)

        # Switch handlers so the next response is parsed as XML.
        client.response_handler = xml_handler
        relative_url = repo['distributors'][0]['config']['relative_url']
        published_root = urljoin('/pulp/repos/', relative_url)
        cls.root_element = client.get(
            urljoin(published_root, 'repodata/repomd.xml')
        )
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:29,代码来源:test_repomd.py


示例9: setUpClass

    def setUpClass(cls):
        """Create a repo, import updates, publish, and fetch updateinfo.

        ``cls.tasks`` and ``cls.errata`` start out as empty dicts before
        ``cls.import_updates()`` is called. The parsed ``updateinfo``
        repodata is stored as ``cls.updateinfo_tree``.
        """
        super(UpdateInfoTestCase, cls).setUpClass()
        cls.tasks = {}
        cls.errata = {}
        client = api.Client(cls.cfg, api.json_handler)

        # Repository used by the tests in this class.
        repo = client.post(REPOSITORY_PATH, gen_repo())
        cls.resources.add(repo['_href'])

        # Attach a distributor to publish through later.
        distributors_url = urljoin(repo['_href'], 'distributors/')
        yum_distributor = client.post(distributors_url, gen_distributor())

        # Import the updates before publishing.
        cls.import_updates(client, repo)

        # Publish and read back the updateinfo repodata.
        publish_url = urljoin(repo['_href'], 'actions/publish/')
        client.post(publish_url, {'id': yum_distributor['id']})
        published_root = urljoin(
            '/pulp/repos/',
            yum_distributor['config']['relative_url'],
        )
        cls.updateinfo_tree = cls.get_repodata_xml(
            published_root,
            'updateinfo',
        )
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:29,代码来源:test_updateinfo.py


示例10: health_check

    def health_check(self):
        """Sync and publish a repository, then compare a published RPM.

        A repository fed from ``RPM_FEED_URL`` is created, synced and
        published. The file named ``RPM`` is downloaded both from the
        published repository and from the feed, and the two payloads must be
        equal.
        """
        client = api.Client(self.cfg, api.json_handler)

        repo_body = gen_repo()
        repo_body['importer_config']['feed'] = RPM_FEED_URL
        repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']
        # Deletion goes through a separate client with the default handler.
        self.addCleanup(api.Client(self.cfg).delete, repo_href)

        sync_url = urljoin(repo_href, 'actions/sync/')
        client.post(sync_url, {'override_config': {}})

        distributors_url = urljoin(repo_href, 'distributors/')
        distributor = client.post(distributors_url, gen_distributor())
        publish_url = urljoin(repo_href, 'actions/publish/')
        client.post(publish_url, {'id': distributor['id']})

        # Raw responses are needed from here on, not decoded JSON.
        client.response_handler = api.safe_handler
        published_root = urljoin(
            '/pulp/repos/',
            distributor['config']['relative_url'],
        )
        published_rpm = client.get(urljoin(published_root, RPM)).content

        # The published RPM must match the one served by the feed.
        upstream_rpm = client.get(urljoin(RPM_FEED_URL, RPM)).content
        self.assertEqual(upstream_rpm, published_rpm)
开发者ID:shubham90,项目名称:pulp-smash,代码行数:27,代码来源:test_broker.py


示例11: setUpClass

    def setUpClass(cls):
        """Create and publish a repository, then fetch its ``repomd.xml``.

        ``cls.tasks`` and ``cls.errata`` are initialised to empty dicts. The
        parsed ``repodata/repomd.xml`` is stored as ``cls.repomd_tree``.
        """
        super(RepoMDTestCase, cls).setUpClass()
        cls.tasks = {}
        cls.errata = {}
        json_client = api.Client(cls.cfg, api.json_handler)

        # Repository used by the tests in this class.
        repo_href = json_client.post(REPOSITORY_PATH, gen_repo())['_href']
        cls.resources.add(repo_href)

        # Attach a distributor and publish through it.
        distributors_url = urljoin(repo_href, 'distributors/')
        yum_distributor = json_client.post(distributors_url, gen_distributor())
        publish_url = urljoin(repo_href, 'actions/publish/')
        json_client.post(publish_url, {'id': yum_distributor['id']})

        # Work out where repomd.xml was published.
        published_root = urljoin(
            '/pulp/repos/',
            yum_distributor['config']['relative_url'],
        )
        repomd_url = urljoin(published_root, 'repodata/repomd.xml')

        # A second client is used so the response is parsed as XML.
        xml_client = api.Client(cls.cfg, xml_handler)
        cls.repomd_tree = xml_client.get(repomd_url)
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:30,代码来源:test_repomd.py


示例12: setUpClass

    def setUpClass(cls):
        """Create a publish schedule for a synced repository.

        Steps performed:

        1. Create a repository with a feed of ``RPM_FEED_URL`` and sync it.
        2. Add a distributor to the repository.
        3. Create a publish schedule with an interval of ``PT30S``, storing
           the raw response as ``cls.response`` and its decoded body as
           ``cls.attrs``.
        """
        super(CreateSuccessTestCase, cls).setUpClass()
        # No handler is passed, so responses are decoded explicitly below.
        client = api.Client(cls.cfg)

        # Step 1: repository with feed, synced.
        repo_body = gen_repo()
        repo_body['importer_config']['feed'] = RPM_FEED_URL
        repo_href = client.post(REPOSITORY_PATH, repo_body).json()['_href']
        cls.resources.add(repo_href)
        utils.sync_repo(cls.cfg, repo_href)

        # Step 2: distributor.
        distributor_body = gen_distributor()
        distributors_url = urljoin(repo_href, 'distributors/')
        client.post(distributors_url, distributor_body)

        # Step 3: the schedule lives beneath the distributor's own path.
        schedule_suffix = '{}/schedules/publish/'.format(
            distributor_body['distributor_id']
        )
        schedules_url = urljoin(distributors_url, schedule_suffix)
        cls.response = client.post(schedules_url, {'schedule': 'PT30S'})
        cls.attrs = cls.response.json()
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:35,代码来源:test_schedule_publish.py


示例13: setUpClass

    def setUpClass(cls):
        """Bind a newly created consumer to a newly created distributor.

        A consumer, a repository and a distributor on that repository are
        created. The binding request body is kept as ``cls.request`` and the
        server's reply as ``cls.response``.
        """
        super(BindConsumerTestCase, cls).setUpClass()
        client = api.Client(cls.cfg, api.json_handler)

        # Create the consumer, the repository and the distributor.
        cls.consumer = client.post(CONSUMER_PATH, {'id': utils.uuid4()})
        repo_href = client.post(REPOSITORY_PATH, gen_repo())['_href']
        distributors_url = urljoin(repo_href, 'distributors/')
        distributor = client.post(distributors_url, gen_distributor())
        cls.resources.add(repo_href)

        # Bind; the raw response is wanted, so switch handlers first.
        client.response_handler = api.safe_handler
        consumer_url = urljoin(
            CONSUMER_PATH,
            cls.consumer['consumer']['id'] + '/',
        )
        bindings_url = urljoin(consumer_url, 'bindings/')
        cls.request = {
            'binding_config': {'B': 21},
            'distributor_id': distributor['id'],
            'notify_agent': False,
            'repo_id': distributor['repo_id'],
        }
        cls.response = client.post(bindings_url, cls.request)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:33,代码来源:test_consumer.py


示例14: _create_distributor

def _create_distributor(
        server_config, href, distributor_type_id, checksum_type=None):
    """Add a distributor of the given type to the entity at ``href``.

    :param server_config: Server configuration handed to ``api.Client``.
    :param href: Path of the entity that receives the distributor.
    :param distributor_type_id: Value stored under ``distributor_type_id``
        in the request body.
    :param checksum_type: When not ``None``, stored under ``checksum_type``
        in the distributor's configuration.
    :returns: The JSON-decoded response body.
    """
    request_body = gen_distributor()
    request_body['distributor_type_id'] = distributor_type_id
    if checksum_type is not None:
        request_body['distributor_config']['checksum_type'] = checksum_type
    distributors_url = urljoin(href, 'distributors/')
    response = api.Client(server_config).post(distributors_url, request_body)
    return response.json()
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:9,代码来源:test_export.py


示例15: setUpClass

    def setUpClass(cls):  # pylint:disable=arguments-differ
        """Create two repositories, the first feeding the second.

        Steps performed:

        1. Create a repository fed from ``RPM_FEED_URL``, sync it, add a
           distributor and publish it.
        2. Create a second repository whose feed is the first repository's
           published location, with ``retain_old_count=0`` and SSL
           validation turned off.
        3. Sync the second repository.
        4. Read both repositories into ``cls.repos`` and mark them for
           deletion.

        The responses to the sync, distributor and publish requests are kept
        in ``cls.responses``.
        """
        super(RetainOldCountTestCase, cls).setUpClass()
        # No handler is passed, so responses are decoded explicitly below.
        client = api.Client(cls.cfg)
        cls.responses = {}
        repo_hrefs = []

        # Step 1a: first repository, synced from the upstream feed.
        first_body = gen_repo()
        first_body['importer_config']['feed'] = RPM_FEED_URL
        first_repo = client.post(REPOSITORY_PATH, first_body).json()
        repo_hrefs.append(first_repo['_href'])
        cls.responses['first sync'] = client.post(
            urljoin(repo_hrefs[0], 'actions/sync/'),
            {'override_config': {}},
        )

        # Step 1b: add a distributor and publish through it.
        cls.responses['distribute'] = client.post(
            urljoin(repo_hrefs[0], 'distributors/'),
            gen_distributor(),
        )
        cls.responses['publish'] = client.post(
            urljoin(repo_hrefs[0], 'actions/publish/'),
            {'id': cls.responses['distribute'].json()['id']},
        )

        # Steps 2 and 3: second repository, fed by the first one's published
        # location. SSL validation is disabled for a practical reason: each
        # HTTPS feed must have a certificate to work, which is burdensome to
        # arrange here.
        second_body = gen_repo()
        second_importer = second_body['importer_config']
        second_importer['feed'] = urljoin(
            cls.cfg.base_url,
            _PUBLISH_DIR +
            cls.responses['distribute'].json()['config']['relative_url'],
        )
        second_importer['retain_old_count'] = 0  # see docstring
        second_importer['ssl_validation'] = False
        second_repo = client.post(REPOSITORY_PATH, second_body).json()
        repo_hrefs.append(second_repo['_href'])
        cls.responses['second sync'] = client.post(
            urljoin(repo_hrefs[1], 'actions/sync/'),
            {'override_config': {}},
        )

        # Step 4: read both repositories and schedule their deletion.
        fetched_repos = []
        for repo_href in repo_hrefs:
            fetched_repos.append(client.get(repo_href).json())
        cls.repos = fetched_repos
        cls.resources.update(set(repo_hrefs))
开发者ID:rohanpm,项目名称:pulp-smash,代码行数:57,代码来源:test_retain_old_count.py


示例16: test_all

    def test_all(self):
        """Sync a repo whose updateinfo file has multiple pkglist sections.

        Do the following:

        1. Create and sync a repository with an importer and distributor.
           Ensure the importer's feed is set to
           :data:`pulp_smash.constants.RPM_PKGLISTS_UPDATEINFO_FEED_URL`.
        2. Publish the repository, and fetch and parse its updateinfo file.
        3. Verify the updateinfo contains the correct number of ``<pkglist>``
           sections, with the correct contents in each.

        The test is skipped when issue 2227 is untestable on the targeted
        Pulp version.
        """
        cfg = config.get_config()
        # The skip message must reference the same issue the selector checks;
        # it previously pointed at issue 2277.
        if selectors.bug_is_untestable(2227, cfg.version):
            self.skipTest('https://pulp.plan.io/issues/2227')

        # Create and sync a repository.
        client = api.Client(cfg, api.json_handler)
        body = gen_repo()
        body['importer_config']['feed'] = RPM_PKGLISTS_UPDATEINFO_FEED_URL
        body['distributors'] = [gen_distributor()]
        repo = client.post(REPOSITORY_PATH, body)
        self.addCleanup(client.delete, repo['_href'])
        utils.sync_repo(cfg, repo['_href'])

        # Publish the repository, and fetch and parse its updateinfo file.
        repo = client.get(repo['_href'], params={'details': True})
        self.assertEqual(len(repo['distributors']), 1, repo['distributors'])
        distributor = repo['distributors'][0]
        client.post(
            urljoin(repo['_href'], 'actions/publish/'),
            {'id': distributor['id']},
        )
        root_element = get_repomd_xml(
            cfg,
            urljoin('/pulp/repos/', distributor['config']['relative_url']),
            'updateinfo'
        )

        # Verify the contents of the updateinfo file. The serialized tree is
        # attached to every assertion so a failure shows what was published.
        debug = ElementTree.tostring(root_element)
        pkglists = root_element.find('update').findall('pkglist')
        self.assertEqual(len(pkglists), 3, debug)

        # Each pkglist is expected to hold one collection with a unique name.
        collections = [pkglist.find('collection') for pkglist in pkglists]
        names = {collection.find('name').text for collection in collections}
        self.assertEqual(names, {'1', '2', '3'}, debug)

        # Each collection is expected to reference one distinct package file.
        packages = {
            collection.find('package').find('filename').text
            for collection in collections
        }
        self.assertEqual(packages, {
            'penguin-0.9.1-1.noarch.rpm',
            'shark-0.1-1.noarch.rpm',
            'walrus-5.21-1.noarch.rpm',
        }, debug)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:57,代码来源:test_updateinfo.py


示例17: setUpClass

 def setUpClass(cls):
     """Create a repository with a feed and distributor, then sync it.

     The detailed (``details=True``) view of the repository is stored as
     ``cls.repo``.
     """
     super(ForceFullTestCase, cls).setUpClass()
     client = api.Client(cls.cfg, api.json_handler)

     repo_body = gen_repo()
     repo_body['importer_config']['feed'] = RPM_FEED_URL
     repo_body['distributors'] = [gen_distributor()]
     repo_href = client.post(REPOSITORY_PATH, repo_body)['_href']
     cls.resources.add(repo_href)

     utils.sync_repo(cls.cfg, repo_href)
     cls.repo = client.get(repo_href, params={'details': True})
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:11,代码来源:test_force_full.py


示例18: _create_sync_repo

    def _create_sync_repo(self, cfg):
        """Create and sync a repository. Return a detailed dict of repo info.

        The repository is fed from ``RPM_UNSIGNED_FEED_URL`` and has a single
        distributor. It is scheduled for deletion with ``addCleanup()``.

        :param cfg: Server configuration handed to ``api.Client`` and
            ``utils.sync_repo``.
        :returns: The response to a detailed (``details=True``) read of the
            repository, performed after the sync.
        """
        client = api.Client(cfg, api.json_handler)
        body = gen_repo()
        body['importer_config']['feed'] = RPM_UNSIGNED_FEED_URL
        body['distributors'] = [gen_distributor()]
        repo = client.post(REPOSITORY_PATH, body)
        self.addCleanup(client.delete, repo['_href'])
        utils.sync_repo(cfg, repo['_href'])
        # One detailed read suffices; the same GET used to be issued twice
        # in a row, with the first result used only to look up ``_href``.
        return client.get(repo['_href'], params={'details': True})
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:14,代码来源:test_repomd.py


示例19: _create_repo

def _create_repo(server_config, download_policy):
    """Create an auto-publishing RPM repository with a download policy.

    The importer uses ``RPM_SIGNED_FEED_URL`` as its feed. The single
    distributor has ``auto_publish`` enabled and uses the repository's ID as
    its ``relative_url``.

    :param server_config: Server configuration handed to ``api.Client``.
    :param download_policy: Value stored under the importer's
        ``download_policy`` key.
    :returns: The JSON-decoded response body.
    """
    repo_body = gen_repo()
    importer_config = repo_body['importer_config']
    importer_config['download_policy'] = download_policy
    importer_config['feed'] = RPM_SIGNED_FEED_URL

    auto_distributor = gen_distributor()
    auto_distributor['auto_publish'] = True
    auto_distributor['distributor_config']['relative_url'] = repo_body['id']
    repo_body['distributors'] = [auto_distributor]

    response = api.Client(server_config).post(REPOSITORY_PATH, repo_body)
    return response.json()
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:14,代码来源:test_download_policies.py


示例20: test_all

    def test_all(self):
        """Publish a repository with the repoview feature off, on, and off.

        After each publish the distributor's published URL is fetched and the
        response's redirect history is inspected: no redirect is expected
        without repoview, and one redirect to ``repoview/index.html`` is
        expected with it.
        """
        cfg = config.get_config()
        if cfg.version < Version('2.9'):
            self.skipTest('https://pulp.plan.io/issues/189')

        # Create a repository with a distributor and upload one RPM into it.
        client = api.Client(cfg, api.json_handler)
        repo_body = gen_repo()
        repo_body['distributors'] = [gen_distributor()]
        repo_href = client.post(constants.REPOSITORY_PATH, repo_body)['_href']
        self.addCleanup(client.delete, repo_href)
        rpm = utils.http_get(constants.RPM_UNSIGNED_URL)
        utils.upload_import_unit(cfg, rpm, 'rpm', repo_href)

        # Look up the distributor and where it publishes to.
        distributor = client.get(urljoin(repo_href, 'distributors/'))[0]
        published_url = urljoin(
            '/pulp/repos/',
            distributor['config']['relative_url'],
        )
        publish_url = urljoin(repo_href, 'actions/publish/')

        # First publish: default options. Raw responses are needed from here
        # on so the redirect history can be examined.
        client.response_handler = api.safe_handler
        client.post(publish_url, {'id': distributor['id']})
        response = client.get(published_url)
        with self.subTest(comment='first publish'):
            self.assertEqual(len(response.history), 0, response.history)

        # Second publish: repoview switched on through override options.
        repoview_overrides = {
            'generate_sqlite': True,
            'repoview': True,
        }
        client.post(publish_url, {
            'id': distributor['id'],
            'override_config': repoview_overrides,
        })
        response = client.get(published_url)
        with self.subTest(comment='second publish'):
            self.assertEqual(len(response.history), 1, response.history)
            redirected_from = response.history[0].request.url
            self.assertEqual(
                response.request.url,
                urljoin(redirected_from, 'repoview/index.html')
            )

        # Third publish: default options again.
        if selectors.bug_is_untestable(2349, cfg.version):
            self.skipTest('https://pulp.plan.io/issues/2349')
        client.post(publish_url, {'id': distributor['id']})
        response = client.get(published_url)
        with self.subTest(comment='third publish'):
            self.assertEqual(len(response.history), 0, response.history)
开发者ID:release-engineering,项目名称:pulp-smash,代码行数:49,代码来源:test_repoview.py



注:本文中的pulp_smash.tests.rpm.api_v2.utils.gen_distributor函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.gen_repo函数代码示例发布时间:2022-05-25
下一篇:
Python selectors.bug_is_untestable函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap