• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.uuid4函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp_smash.utils.uuid4函数的典型用法代码示例。如果您正苦于以下问题:Python uuid4函数的具体用法?Python uuid4怎么用?Python uuid4使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了uuid4函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setUpClass

    def setUpClass(cls):
        """Intentionally fail at creating several sync schedules for a repo.

        Each schedule tests a different failure scenario.
        """
        super(CreateFailureTestCase, cls).setUpClass()
        href, importer_type_id = cls.create_repo()

        # We'll need these below.
        scheduling_path = _SCHEDULE_PATH.format(importer_type_id)
        scheduling_path_bad = _SCHEDULE_PATH.format(utils.uuid4())
        bad_repo_path = "{}/{}/".format(REPOSITORY_PATH, utils.uuid4())

        # Use client to get paths with bodies. Save responses and status_codes.
        client = api.Client(cls.cfg)
        client.response_handler = api.echo_handler
        paths = (
            urljoin(href, scheduling_path),
            urljoin(href, scheduling_path),
            urljoin(href, scheduling_path),
            urljoin(href, scheduling_path),
            urljoin(href, scheduling_path_bad),
            urljoin(bad_repo_path, scheduling_path),
        )
        cls.bodies = (
            {"schedule": None},  # 400
            {"schedule": "PT30S", "unknown": "parameter"},  # 400
            ["Incorrect data type"],  # 400
            {"missing_required_keys": "schedule"},  # 400
            _SCHEDULE,  # tests incorrect importer in url, 404
            _SCHEDULE,  # tests incorrect repo in url, 404
        )
        cls.responses = tuple((client.post(path, body) for path, body in zip(paths, cls.bodies)))
        cls.status_codes = (400, 400, 400, 400, 404, 404)
开发者ID:seandst,项目名称:pulp-smash,代码行数:34,代码来源:test_schedule_sync.py


示例2: setUpClass

    def setUpClass(cls):
        """Create three repositories and read, update and delete them."""
        super(ReadUpdateDeleteTestCase, cls).setUpClass()
        client = api.Client(cls.cfg, api.json_handler)
        cls.repos = tuple((
            client.post(REPOSITORY_PATH, {'id': utils.uuid4()})
            for _ in range(3)
        ))
        cls.responses = {}
        client.response_handler = api.safe_handler

        # Read the first repo
        path = cls.repos[0]['_href']
        cls.responses['read'] = client.get(path)
        for key in {'importers', 'distributors', 'details'}:
            cls.responses['read_' + key] = client.get(path, params={key: True})

        # Update the second repo
        path = cls.repos[1]['_href']
        cls.update_body = {'delta': {
            key: utils.uuid4() for key in {'description', 'display_name'}
        }}
        cls.responses['update'] = client.put(path, cls.update_body)

        # Delete the third.
        cls.responses['delete'] = client.delete(cls.repos[2]['_href'])
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:26,代码来源:test_repository.py


示例3: test_02_read_invalid_date

 def test_02_read_invalid_date(self):
     """Read a task by an invalid date."""
     with self.assertRaises(HTTPError):
         self.filter_tasks({
             'finished_at': utils.uuid4(),
             'started_at': utils.uuid4()
         })
开发者ID:asmacdo,项目名称:pulp,代码行数:7,代码来源:test_tasks.py


示例4: setUpClass

    def setUpClass(cls):
        """Create three users and read, update and delete them respectively."""
        super(ReadUpdateDeleteTestCase, cls).setUpClass()

        # Create three users and save their attributes.
        client = api.Client(cls.cfg, response_handler=api.json_handler)
        hrefs = [
            client.post(USER_PATH, {'login': utils.uuid4()})['_href']
            for _ in range(3)
        ]

        # Read, update and delete the users, and save the raw responses.
        client.response_handler = api.safe_handler
        cls.update_body = {'delta': {
            'name': utils.uuid4(),
            'password': utils.uuid4(),
            'roles': ['super-users'],
        }}
        cls.responses = {}
        cls.responses['read'] = client.get(hrefs[0])
        cls.responses['update'] = client.put(hrefs[1], cls.update_body)
        cls.responses['delete'] = client.delete(hrefs[2])

        # Read, update and delete the deleted user, and save the raw responses.
        client.response_handler = api.echo_handler
        cls.responses['read deleted'] = client.get(hrefs[2])
        cls.responses['update deleted'] = client.put(hrefs[2], {})
        cls.responses['delete deleted'] = client.delete(hrefs[2])

        # Mark resources to be deleted.
        cls.resources = {hrefs[0], hrefs[1]}
开发者ID:omaciel,项目名称:pulp-smash,代码行数:31,代码来源:test_user.py


示例5: setUpClass

    def setUpClass(cls):
        """Create a repository and add an importer and distributor to it.

        Do the following:

        1. Create a repository.
        2. Read the repository's importers and distributors.
        3. Add an importer and distributor to the repo.
        4. Re-read the repository's importers and distributors.
        """
        super(AddImporterDistributorTestCase, cls).setUpClass()
        if cls.cfg.version >= Version("2.10") and selectors.bug_is_untestable(2082, cls.cfg.version):
            raise SkipTest("https://pulp.plan.io/issues/2082")

        # Steps 1 and 2.
        client = api.Client(cls.cfg, api.json_handler)
        href = client.post(REPOSITORY_PATH, {"id": utils.uuid4()})["_href"]
        cls.resources.add(href)
        cls.pre_imp = client.get(urljoin(href, "importers/"))
        cls.pre_dist = client.get(urljoin(href, "distributors/"))

        # Steps 3 and 4.
        client.response_handler = api.safe_handler
        cls.add_imp = client.post(urljoin(href, "importers/"), {"importer_type_id": "iso_importer"})
        cls.add_dist = client.post(
            urljoin(href, "distributors/"),
            {"distributor_config": {}, "distributor_id": utils.uuid4(), "distributor_type_id": "iso_distributor"},
        )
        client.response_handler = api.json_handler
        cls.post_imp = client.get(urljoin(href, "importers/"))
        cls.post_dist = client.get(urljoin(href, "distributors/"))
开发者ID:pcreech,项目名称:pulp-smash,代码行数:31,代码来源:test_iso_crud.py


示例6: _gen_distributor

def _gen_distributor():
    """Return a semi-random dict for use in creating a YUM distributor."""
    return {
        "auto_publish": False,
        "distributor_id": utils.uuid4(),
        "distributor_type_id": "yum_distributor",
        "distributor_config": {"http": True, "https": True, "relative_url": utils.uuid4() + "/"},
    }
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:8,代码来源:test_broker.py


示例7: test_success

 def test_success(self):
     """Assert the method returns a path when a config file is found."""
     with mock.patch.object(xdg.BaseDirectory, 'load_config_paths') as lcp:
         lcp.return_value = ('an_iterable', 'of_xdg', 'config_paths')
         with mock.patch.object(os.path, 'isfile') as isfile:
             isfile.return_value = True
             # pylint:disable=protected-access
             config._get_config_file_path(utils.uuid4(), utils.uuid4())
     self.assertGreater(isfile.call_count, 0)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:9,代码来源:test_config.py


示例8: setUpClass

    def setUpClass(cls):
        """Create three repositories and read, update and delete them."""
        super(ReadUpdateDeleteTestCase, cls).setUpClass()
        cls.bodies = {
            'read': {
                'distributors': [_DISTRIBUTOR],
                'id': utils.uuid4(),
                'importer_config': {},
                'importer_type_id': 'iso_importer',
                'notes': {'this': 'one'},
            },
            'update': {  # like read, minus notes…
                'description': utils.uuid4(),  # plus this
                'display_name': utils.uuid4(),  # and this
                'distributors': [_DISTRIBUTOR],
                'id': utils.uuid4(),
                'importer_config': {},
                'importer_type_id': 'iso_importer',
            },
            'delete': {  # like read…
                'description': utils.uuid4(),  # plus this
                'display_name': utils.uuid4(),  # and this
                'distributors': [_DISTRIBUTOR],
                'id': utils.uuid4(),
                'importer_config': {},
                'importer_type_id': 'iso_importer',
                'notes': {utils.uuid4(): utils.uuid4()},
            },
        }
        cls.update_body = {'delta': {
            key: utils.uuid4() for key in ('description', 'display_name')
        }}
        cls.responses = {}

        # Create repositories.
        client = api.Client(cls.cfg, api.json_handler)
        repos = {
            key: client.post(REPOSITORY_PATH, body)
            for key, body in cls.bodies.items()
        }
        for key in {'read', 'update'}:
            cls.resources.add(repos[key]['_href'])

        # Read, update and delete the repositories.
        client.response_handler = api.safe_handler
        cls.responses['read'] = client.get(repos['read']['_href'])
        for key in {'importers', 'distributors', 'details'}:
            cls.responses['read_' + key] = client.get(
                repos['read']['_href'],
                params={key: True},
            )
        cls.responses['update'] = client.put(
            repos['update']['_href'],
            cls.update_body,
        )
        cls.responses['delete'] = client.delete(repos['delete']['_href'])
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:56,代码来源:test_iso_crud.py


示例9: test_failures

 def test_failures(self):
     """Assert the  method raises an exception when no config is found."""
     with mock.patch.object(xdg.BaseDirectory, 'load_config_paths') as lcp:
         lcp.return_value = ('an_iterable', 'of_xdg', 'config_paths')
         with mock.patch.object(os.path, 'isfile') as isfile:
             isfile.return_value = False
             with self.assertRaises(exceptions.ConfigFileNotFoundError):
                 # pylint:disable=protected-access
                 config._get_config_file_path(utils.uuid4(), utils.uuid4())
     self.assertGreater(isfile.call_count, 0)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:10,代码来源:test_config.py


示例10: _gen_distributor

def _gen_distributor():
    """Return a semi-random dict for use in creating a YUM distributor."""
    return {
        'auto_publish': False,
        'distributor_id': utils.uuid4(),
        'distributor_type_id': 'yum_distributor',
        'distributor_config': {
            'http': True,
            'https': True,
            'relative_url': utils.uuid4() + '/',
        },
    }
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:12,代码来源:test_sync_publish.py


示例11: setUpClass

    def setUpClass(cls):
        """Create several schedules.

        Each schedule is created to test a different failure scenario.
        """
        super(CreateFailureTestCase, cls).setUpClass()
        client = api.Client(cls.cfg)

        # Create a repo with a valid feed and sync it
        body = gen_repo()
        body['importer_config']['feed'] = RPM_FEED_URL
        repo = client.post(REPOSITORY_PATH, body).json()
        cls.resources.add(repo['_href'])
        utils.sync_repo(cls.cfg, repo['_href'])

        # Add a distibutor
        distributor = gen_distributor()
        client.post(
            urljoin(repo['_href'], 'distributors/'),
            distributor
        )
        client.response_handler = api.echo_handler
        cls.bodies = (
            {'schedule': None},  # 400
            {'unknown': 'parameter', 'schedule': 'PT30S'},  # 400
            ['Incorrect data type'],  # 400
            {'missing_required_keys': 'schedule'},  # 400
            {'schedule': 'PT30S'},  # tests incorrect distributor in url, 404
            {'schedule': 'PT30S'},  # tests incorrect repo in url, 404
        )
        scheduling_url = '/'.join([
            'distributors', distributor['distributor_id'], 'schedules/publish/'
        ])
        bad_distributor_url = '/'.join([
            'distributors', utils.uuid4(), 'schedules/publish/'
        ])
        bad_repo_path = '/'.join([REPOSITORY_PATH, utils.uuid4()])
        cls.paths = (
            urljoin(repo['_href'], scheduling_url),
            urljoin(repo['_href'], scheduling_url),
            urljoin(repo['_href'], scheduling_url),
            urljoin(repo['_href'], scheduling_url),
            urljoin(repo['_href'], bad_distributor_url),
            urljoin(bad_repo_path, scheduling_url)
        )
        cls.status_codes = (400, 400, 400, 400, 404, 404)
        cls.responses = [
            client.post(path, req_body) for path, req_body in zip(
                cls.paths, cls.bodies)
        ]
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:50,代码来源:test_schedule_publish.py


示例12: _gen_attrs

def _gen_attrs():
    """Generate attributes for populating a ``ServerConfig``.

    Example usage: ``ServerConfig(**_gen_attrs())``.

    :returns: A dict. It populates all attributes in a ``ServerConfig``.
    """
    attrs = {
        key: utils.uuid4() for key in ('base_url', 'cli_transport', 'verify')
    }
    attrs['auth'] = [utils.uuid4() for _ in range(2)]
    attrs['version'] = '.'.join(
        type('')(random.randint(1, 150)) for _ in range(4)
    )
    return attrs
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:15,代码来源:test_config.py


示例13: setUpClass

    def setUpClass(cls):
        """Create a value for the rsync distrib's ``remote`` config section.

        Using the same config for each of the test methods allows the test
        methods to behave more similarly.
        """
        cls.cfg = config.get_config()
        ssh_user = utils.uuid4()[:12]
        cls.remote = {
            'host': 'example.com',
            'root': '/home/' + ssh_user,
            'ssh_identity_file': '/' + utils.uuid4(),
            'ssh_user': ssh_user,
        }
        cls._remote = cls.remote.copy()
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:15,代码来源:test_rsync_distributor.py


示例14: test_sync_downloaded_content

    def test_sync_downloaded_content(self):
        """Create two repositories with the same feed, and sync them serially.

        More specifically, this test creates two puppet repositories with
        identical feeds, syncs them serially, and verifies that both have equal
        non-zero content unit counts.
        """
        cfg = config.get_config()
        if selectors.bug_is_untestable(1937, cfg.version):
            self.skipTest('https://pulp.plan.io/issues/1937')
        utils.pulp_admin_login(cfg)

        # Create two repos, schedule them for deletion, and sync them.
        client = cli.Client(cfg)
        repo_ids = [utils.uuid4() for _ in range(2)]
        for repo_id in repo_ids:
            client.run((
                'pulp-admin puppet repo create '
                '--repo-id {} --feed {} --queries {}'
            ).format(repo_id, PUPPET_FEED, PUPPET_QUERY).split())
            self.addCleanup(client.run, (
                'pulp-admin puppet repo delete --repo-id {}'
            ).format(repo_id).split())
            client.run((
                'pulp-admin puppet repo sync run --repo-id {}'
            ).format(repo_id).split())

        # Verify the number of puppet modules in each repository.
        unit_counts = [
            get_num_units_in_repo(cfg, repo_id) for repo_id in repo_ids
        ]
        for i, unit_count in enumerate(unit_counts):
            with self.subTest(i=i):
                self.assertGreater(unit_count, 0)
        self.assertEqual(unit_counts[0], unit_counts[1])
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:35,代码来源:test_puppet_sync.py


示例15: test_force_sync

    def test_force_sync(self):
        """Test whether one can force Pulp to perform a full sync."""
        cfg = config.get_config()
        if selectors.bug_is_untestable(1982, cfg.version):
            self.skipTest("https://pulp.plan.io/issues/1982")

        # Create and sync a repository.
        client = cli.Client(cfg)
        repo_id = utils.uuid4()
        client.run("pulp-admin rpm repo create --repo-id {} --feed {}".format(repo_id, RPM_SIGNED_FEED_URL).split())
        self.addCleanup(client.run, "pulp-admin rpm repo delete --repo-id {}".format(repo_id).split())
        sync_repo(cfg, repo_id)

        # Delete a random RPM
        rpms = self._list_rpms(cfg)
        client.run("{} rm -rf {}".format("sudo" if not is_root(cfg) else "", random.choice(rpms)).split())
        with self.subTest(comment="Verify the RPM was removed."):
            self.assertEqual(len(self._list_rpms(cfg)), len(rpms) - 1)

        # Sync the repository *without* force_sync.
        sync_repo(cfg, repo_id)
        with self.subTest(comment="Verify the RPM has not been restored."):
            self.assertEqual(len(self._list_rpms(cfg)), len(rpms) - 1)

        # Sync the repository again
        sync_repo(cfg, repo_id, force_sync=True)
        with self.subTest(comment="Verify the RPM has been restored."):
            self.assertEqual(len(self._list_rpms(cfg)), len(rpms))
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:28,代码来源:test_sync.py


示例16: gen_repo

def gen_repo():
    """Return a semi-random dict that used for creating a Docker repo."""
    return {
        'id': utils.uuid4(), 'importer_config': {},
        'importer_type_id': 'docker_importer',
        'notes': {'_repo-type': 'docker-repo'},
    }
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:7,代码来源:utils.py


示例17: setUpModule

def setUpModule():  # pylint:disable=invalid-name
    """Possibly skip tests. Create and sync an RPM repository.

    Skip tests in this module if the RPM plugin is not installed on the target
    Pulp server. Then create an RPM repository with a feed and sync it. Test
    cases may copy data from this repository but should **not** change it.
    """
    set_up_module()
    cfg = config.get_config()
    client = cli.Client(config.get_config())

    # log in, then create repository
    utils.pulp_admin_login(cfg)
    global _REPO_ID  # pylint:disable=global-statement
    _REPO_ID = utils.uuid4()
    client.run(
        'pulp-admin rpm repo create --repo-id {} --feed {}'
        .format(_REPO_ID, constants.RPM_FEED_URL).split()
    )

    # If setUpModule() fails, tearDownModule() isn't run. In addition, we can't
    # use addCleanup(), as it's an instance method. If this set-up procedure
    # grows, consider implementing a stack of tear-down steps instead.
    try:
        client.run(
            'pulp-admin rpm repo sync run --repo-id {}'
            .format(_REPO_ID).split()
        )
    except subprocess.CalledProcessError:
        client.run(
            'pulp-admin rpm repo delete --repo-id {}'.format(_REPO_ID).split()
        )
        raise
开发者ID:danuzclaudes,项目名称:redhat-summer-internship,代码行数:33,代码来源:test_copy_units.py


示例18: make_repo

    def make_repo(self, cfg, remote):
        """Create a repository with an importer and pair of distributors.

        Create an RPM repository with:

        * A yum importer with a valid feed.
        * A yum distributor.
        * An RPM rsync distributor referencing the yum distributor.

        In addition, schedule the repository for deletion.

        :param pulp_smash.config.ServerConfig cfg: Information about the Pulp
            server being targeted.
        :param remote: A dict for the RPM rsync distributor's ``remote``
            section.
        :returns: The repository's href, as a string.
        """
        api_client = api.Client(cfg, api.json_handler)
        body = gen_repo()
        body['importer_config']['feed'] = RPM_FEED_URL
        body['distributors'] = [gen_distributor()]
        body['distributors'].append({
            'distributor_id': utils.uuid4(),
            'distributor_type_id': 'rpm_rsync_distributor',
            'distributor_config': {
                'predistributor_id': body['distributors'][0]['distributor_id'],
                'remote': remote,
            }
        })
        repo_href = api_client.post(REPOSITORY_PATH, body)['_href']
        self.addCleanup(api_client.delete, repo_href)
        return repo_href
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:32,代码来源:test_rsync_distributor.py


示例19: setUpClass

 def setUpClass(cls):
     """Create a content source."""
     super(RefreshAndDeleteContentSourcesTestCase, cls).setUpClass()
     cls.cfg = config.get_config()
     if cls.cfg.version < Version('2.8.6'):
         raise unittest.SkipTest('This test requires at least 2.8.6')
     pulp_admin_login(cls.cfg)
     cls.client = cli.Client(cls.cfg)
     cls.content_source_id = uuid4()
     content_source_path = generate_content_source(
         cls.cfg,
         cls.content_source_id,
         enabled='1',
         type='yum',
         base_url=RPM_SIGNED_FEED_URL,
     )
     sudo = '' if is_root(cls.cfg) else 'sudo '
     cls.responses = [
         cls.client.run(
             'pulp-admin content sources refresh'.split()
         ),
         _get_content_source_ids(cls.cfg),
         cls.client.run(
             'pulp-admin content sources refresh --source-id {}'
             .format(cls.content_source_id).split()
         ),
     ]
     cls.client.run(
         '{}rm -f {}'.format(sudo, content_source_path).split())
     cls.responses.append(_get_content_source_ids(cls.cfg))
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:30,代码来源:test_content_sources.py


示例20: test_01_create_task

 def test_01_create_task(self):
     """Create a task."""
     repo = self.client.post(REPO_PATH, gen_repo())
     self.addCleanup(self.client.delete, repo['_href'])
     attrs = {'description': utils.uuid4()}
     response = self.client.patch(repo['_href'], attrs)
     self.task.update(self.client.get(response['task']))
开发者ID:asmacdo,项目名称:pulp,代码行数:7,代码来源:test_tasks.py



注:本文中的pulp_smash.utils.uuid4函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pulsar.arbiter函数代码示例发布时间:2022-05-25
下一篇:
Python utils.sync_repo函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap