本文整理汇总了Python中pulp_smash.utils.uuid4函数的典型用法代码示例。如果您正苦于以下问题:Python uuid4函数的具体用法?Python uuid4怎么用?Python uuid4使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了uuid4函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setUpClass
def setUpClass(cls):
    """Post six invalid sync-schedule requests and record what comes back.

    Each request exercises a different failure scenario. The request bodies,
    the responses and the expected status codes are stored on the class as
    parallel sequences: ``cls.bodies``, ``cls.responses`` and
    ``cls.status_codes``.
    """
    super(CreateFailureTestCase, cls).setUpClass()
    href, importer_type_id = cls.create_repo()

    # Relative schedule paths: one for the importer returned by
    # create_repo(), one for a random importer ID, plus a path naming a
    # random repository ID.
    valid_schedule_path = _SCHEDULE_PATH.format(importer_type_id)
    bogus_schedule_path = _SCHEDULE_PATH.format(utils.uuid4())
    bogus_repo_path = "{}/{}/".format(REPOSITORY_PATH, utils.uuid4())

    client = api.Client(cls.cfg)
    client.response_handler = api.echo_handler

    # The first four requests share one URL and differ only in their bodies.
    valid_url = urljoin(href, valid_schedule_path)
    urls = [valid_url] * 4
    urls.append(urljoin(href, bogus_schedule_path))
    urls.append(urljoin(bogus_repo_path, valid_schedule_path))

    cls.bodies = (
        {"schedule": None},  # 400
        {"schedule": "PT30S", "unknown": "parameter"},  # 400
        ["Incorrect data type"],  # 400
        {"missing_required_keys": "schedule"},  # 400
        _SCHEDULE,  # tests incorrect importer in url, 404
        _SCHEDULE,  # tests incorrect repo in url, 404
    )
    responses = []
    for url, body in zip(urls, cls.bodies):
        responses.append(client.post(url, body))
    cls.responses = tuple(responses)
    cls.status_codes = (400, 400, 400, 400, 404, 404)
开发者ID:seandst,项目名称:pulp-smash,代码行数:34,代码来源:test_schedule_sync.py
示例2: setUpClass
def setUpClass(cls):
    """Create three repositories, then read one, update one and delete one.

    The created repositories are stored in ``cls.repos``, the body of the
    update request in ``cls.update_body`` and every response in the
    ``cls.responses`` dict.
    """
    super(ReadUpdateDeleteTestCase, cls).setUpClass()
    client = api.Client(cls.cfg, api.json_handler)
    created = []
    for _ in range(3):
        created.append(client.post(REPOSITORY_PATH, {'id': utils.uuid4()}))
    cls.repos = tuple(created)
    cls.responses = {}
    client.response_handler = api.safe_handler

    # First repo: one plain read, then one read per query parameter.
    read_href = cls.repos[0]['_href']
    cls.responses['read'] = client.get(read_href)
    for key in {'importers', 'distributors', 'details'}:
        query = {key: True}
        cls.responses['read_' + key] = client.get(read_href, params=query)

    # Second repo: give it a random description and display name.
    update_href = cls.repos[1]['_href']
    delta = {key: utils.uuid4() for key in {'description', 'display_name'}}
    cls.update_body = {'delta': delta}
    cls.responses['update'] = client.put(update_href, cls.update_body)

    # Third repo: delete it.
    delete_href = cls.repos[2]['_href']
    cls.responses['delete'] = client.delete(delete_href)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:26,代码来源:test_repository.py
示例3: test_02_read_invalid_date
def test_02_read_invalid_date(self):
    """Filter tasks by random strings instead of dates; expect ``HTTPError``."""
    bogus_dates = {
        'finished_at': utils.uuid4(),
        'started_at': utils.uuid4(),
    }
    with self.assertRaises(HTTPError):
        self.filter_tasks(bogus_dates)
开发者ID:asmacdo,项目名称:pulp,代码行数:7,代码来源:test_tasks.py
示例4: setUpClass
def setUpClass(cls):
    """Create three users, then read one, update one and delete one.

    Afterwards the deleted user is read, updated and deleted again. All
    responses are stored in the ``cls.responses`` dict, and the two users
    that still exist are recorded in ``cls.resources``.
    """
    super(ReadUpdateDeleteTestCase, cls).setUpClass()

    # Create the users, keeping only their hrefs.
    client = api.Client(cls.cfg, response_handler=api.json_handler)
    hrefs = []
    for _ in range(3):
        user = client.post(USER_PATH, {'login': utils.uuid4()})
        hrefs.append(user['_href'])
    read_href, update_href, delete_href = hrefs

    # Exercise one user per operation.
    client.response_handler = api.safe_handler
    delta = {
        'name': utils.uuid4(),
        'password': utils.uuid4(),
        'roles': ['super-users'],
    }
    cls.update_body = {'delta': delta}
    cls.responses = {}
    cls.responses['read'] = client.get(read_href)
    cls.responses['update'] = client.put(update_href, cls.update_body)
    cls.responses['delete'] = client.delete(delete_href)

    # Repeat every operation against the user that was just deleted.
    client.response_handler = api.echo_handler
    cls.responses['read deleted'] = client.get(delete_href)
    cls.responses['update deleted'] = client.put(delete_href, {})
    cls.responses['delete deleted'] = client.delete(delete_href)

    # Only the first two users are left to be cleaned up.
    cls.resources = {read_href, update_href}
开发者ID:omaciel,项目名称:pulp-smash,代码行数:31,代码来源:test_user.py
示例5: setUpClass
def setUpClass(cls):
    """Create a repository, then attach an importer and a distributor.

    Steps:

    1. Create a repository.
    2. Read the repository's importers and distributors.
    3. Add an importer and a distributor to the repository.
    4. Read the repository's importers and distributors again.

    Raises ``SkipTest`` when the server is at least version 2.10 and issue
    2082 is reported as untestable.
    """
    super(AddImporterDistributorTestCase, cls).setUpClass()
    version = cls.cfg.version
    if version >= Version("2.10") and selectors.bug_is_untestable(2082, version):
        raise SkipTest("https://pulp.plan.io/issues/2082")

    # Steps 1 and 2.
    client = api.Client(cls.cfg, api.json_handler)
    repo = client.post(REPOSITORY_PATH, {"id": utils.uuid4()})
    href = repo["_href"]
    cls.resources.add(href)
    importers_url = urljoin(href, "importers/")
    distributors_url = urljoin(href, "distributors/")
    cls.pre_imp = client.get(importers_url)
    cls.pre_dist = client.get(distributors_url)

    # Step 3.
    client.response_handler = api.safe_handler
    cls.add_imp = client.post(importers_url, {"importer_type_id": "iso_importer"})
    distributor = {
        "distributor_config": {},
        "distributor_id": utils.uuid4(),
        "distributor_type_id": "iso_distributor",
    }
    cls.add_dist = client.post(distributors_url, distributor)

    # Step 4.
    client.response_handler = api.json_handler
    cls.post_imp = client.get(importers_url)
    cls.post_dist = client.get(distributors_url)
开发者ID:pcreech,项目名称:pulp-smash,代码行数:31,代码来源:test_iso_crud.py
示例6: _gen_distributor
def _gen_distributor():
    """Build a YUM distributor definition with a random ID and relative URL.

    :returns: A dict with the distributor's ID, type, configuration and
        ``auto_publish`` flag.
    """
    distributor_id = utils.uuid4()
    distributor_config = {
        "http": True,
        "https": True,
        "relative_url": utils.uuid4() + "/",
    }
    return {
        "auto_publish": False,
        "distributor_id": distributor_id,
        "distributor_type_id": "yum_distributor",
        "distributor_config": distributor_config,
    }
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:8,代码来源:test_broker.py
示例7: test_success
def test_success(self):
    """Look up a config file while every candidate path "exists".

    ``load_config_paths`` and ``os.path.isfile`` are both patched; the test
    passes when the lookup consulted ``os.path.isfile`` at least once.
    """
    patch_paths = mock.patch.object(xdg.BaseDirectory, 'load_config_paths')
    patch_isfile = mock.patch.object(os.path, 'isfile')
    with patch_paths as lcp, patch_isfile as isfile:
        lcp.return_value = ('an_iterable', 'of_xdg', 'config_paths')
        isfile.return_value = True
        # pylint:disable=protected-access
        config._get_config_file_path(utils.uuid4(), utils.uuid4())
    self.assertGreater(isfile.call_count, 0)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:9,代码来源:test_config.py
示例8: setUpClass
def setUpClass(cls):
    """Create three ISO repositories, then read, update and delete them.

    One repository is created per entry in ``cls.bodies``. The "read" and
    "update" repositories are added to ``cls.resources``; the "delete"
    repository is deleted by this method. All responses are stored in the
    ``cls.responses`` dict.
    """
    super(ReadUpdateDeleteTestCase, cls).setUpClass()
    cls.bodies = {
        # Has notes, but neither a description nor a display name.
        'read': {
            'distributors': [_DISTRIBUTOR],
            'id': utils.uuid4(),
            'importer_config': {},
            'importer_type_id': 'iso_importer',
            'notes': {'this': 'one'},
        },
        # Has a description and a display name, but no notes.
        'update': {
            'description': utils.uuid4(),
            'display_name': utils.uuid4(),
            'distributors': [_DISTRIBUTOR],
            'id': utils.uuid4(),
            'importer_config': {},
            'importer_type_id': 'iso_importer',
        },
        # Has a description, a display name and a random note.
        'delete': {
            'description': utils.uuid4(),
            'display_name': utils.uuid4(),
            'distributors': [_DISTRIBUTOR],
            'id': utils.uuid4(),
            'importer_config': {},
            'importer_type_id': 'iso_importer',
            'notes': {utils.uuid4(): utils.uuid4()},
        },
    }
    delta = {}
    for key in ('description', 'display_name'):
        delta[key] = utils.uuid4()
    cls.update_body = {'delta': delta}
    cls.responses = {}

    # Create one repository per body.
    client = api.Client(cls.cfg, api.json_handler)
    repos = {}
    for key, body in cls.bodies.items():
        repos[key] = client.post(REPOSITORY_PATH, body)
    for key in {'read', 'update'}:
        cls.resources.add(repos[key]['_href'])

    # Read the first repository, with and without query parameters.
    client.response_handler = api.safe_handler
    read_href = repos['read']['_href']
    cls.responses['read'] = client.get(read_href)
    for key in {'importers', 'distributors', 'details'}:
        query = {key: True}
        cls.responses['read_' + key] = client.get(read_href, params=query)

    # Update the second repository and delete the third.
    update_href = repos['update']['_href']
    cls.responses['update'] = client.put(update_href, cls.update_body)
    delete_href = repos['delete']['_href']
    cls.responses['delete'] = client.delete(delete_href)
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:56,代码来源:test_iso_crud.py
示例9: test_failures
def test_failures(self):
    """Look up a config file while no candidate path "exists".

    ``load_config_paths`` and ``os.path.isfile`` are both patched. The
    lookup must raise ``ConfigFileNotFoundError`` and must have consulted
    ``os.path.isfile`` at least once.
    """
    patch_paths = mock.patch.object(xdg.BaseDirectory, 'load_config_paths')
    patch_isfile = mock.patch.object(os.path, 'isfile')
    with patch_paths as lcp, patch_isfile as isfile:
        lcp.return_value = ('an_iterable', 'of_xdg', 'config_paths')
        isfile.return_value = False
        with self.assertRaises(exceptions.ConfigFileNotFoundError):
            # pylint:disable=protected-access
            config._get_config_file_path(utils.uuid4(), utils.uuid4())
    # Checked after the assertRaises block: a statement following the
    # raising call inside that block would never run.
    self.assertGreater(isfile.call_count, 0)
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:10,代码来源:test_config.py
示例10: _gen_distributor
def _gen_distributor():
    """Build a YUM distributor definition with a random ID and relative URL.

    :returns: A dict with the distributor's ID, type, configuration and
        ``auto_publish`` flag.
    """
    distributor_id = utils.uuid4()
    relative_url = utils.uuid4() + '/'
    distributor_config = {
        'http': True,
        'https': True,
        'relative_url': relative_url,
    }
    return {
        'auto_publish': False,
        'distributor_id': distributor_id,
        'distributor_type_id': 'yum_distributor',
        'distributor_config': distributor_config,
    }
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:12,代码来源:test_sync_publish.py
示例11: setUpClass
def setUpClass(cls):
    """Post six invalid publish-schedule requests and record the results.

    A repository is created with a feed, synced and given a distributor.
    Six schedule-creation requests are then posted, each exercising a
    different failure scenario. The request paths, bodies, expected status
    codes and responses are stored on the class as parallel sequences:
    ``cls.paths``, ``cls.bodies``, ``cls.status_codes`` and
    ``cls.responses``.
    """
    super(CreateFailureTestCase, cls).setUpClass()
    client = api.Client(cls.cfg)

    # Create a repo with a valid feed and sync it.
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    repo = client.post(REPOSITORY_PATH, body).json()
    cls.resources.add(repo['_href'])
    utils.sync_repo(cls.cfg, repo['_href'])

    # Add a distributor.
    distributor = gen_distributor()
    client.post(urljoin(repo['_href'], 'distributors/'), distributor)

    client.response_handler = api.echo_handler
    cls.bodies = (
        {'schedule': None},  # 400
        {'unknown': 'parameter', 'schedule': 'PT30S'},  # 400
        ['Incorrect data type'],  # 400
        {'missing_required_keys': 'schedule'},  # 400
        {'schedule': 'PT30S'},  # tests incorrect distributor in url, 404
        {'schedule': 'PT30S'},  # tests incorrect repo in url, 404
    )
    scheduling_url = '/'.join([
        'distributors', distributor['distributor_id'], 'schedules/publish/'
    ])
    bad_distributor_url = '/'.join([
        'distributors', utils.uuid4(), 'schedules/publish/'
    ])
    # The trailing slash is required: urljoin() replaces the last path
    # segment of a base URL that does not end in "/", which would drop the
    # random repository ID from the final request path.
    bad_repo_path = '{}/{}/'.format(REPOSITORY_PATH, utils.uuid4())
    cls.paths = (
        urljoin(repo['_href'], scheduling_url),
        urljoin(repo['_href'], scheduling_url),
        urljoin(repo['_href'], scheduling_url),
        urljoin(repo['_href'], scheduling_url),
        urljoin(repo['_href'], bad_distributor_url),
        urljoin(bad_repo_path, scheduling_url),
    )
    cls.status_codes = (400, 400, 400, 400, 404, 404)
    cls.responses = [
        client.post(path, req_body)
        for path, req_body in zip(cls.paths, cls.bodies)
    ]
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:50,代码来源:test_schedule_publish.py
示例12: _gen_attrs
def _gen_attrs():
    """Generate attributes for populating a ``ServerConfig``.

    Example usage: ``ServerConfig(**_gen_attrs())``.

    :returns: A dict with random ``base_url``, ``cli_transport`` and
        ``verify`` strings, a two-item ``auth`` list and a four-part dotted
        ``version`` string.
    """
    attrs = {}
    for key in ('base_url', 'cli_transport', 'verify'):
        attrs[key] = utils.uuid4()
    attrs['auth'] = [utils.uuid4(), utils.uuid4()]
    # type('') is the type of a plain string literal in this module.
    to_text = type('')
    version_parts = [to_text(random.randint(1, 150)) for _ in range(4)]
    attrs['version'] = '.'.join(version_parts)
    return attrs
开发者ID:bowlofeggs,项目名称:pulp-smash,代码行数:15,代码来源:test_config.py
示例13: setUpClass
def setUpClass(cls):
    """Build a ``remote`` config section for the rsync distributor.

    The section is stored as ``cls.remote``, and a shallow copy of it is
    kept as ``cls._remote``. Sharing one section between the test methods
    lets them behave more similarly.
    """
    cls.cfg = config.get_config()
    ssh_user = utils.uuid4()[:12]
    home_dir = '/home/' + ssh_user
    identity_file = '/' + utils.uuid4()
    cls.remote = {
        'host': 'example.com',
        'root': home_dir,
        'ssh_identity_file': identity_file,
        'ssh_user': ssh_user,
    }
    cls._remote = cls.remote.copy()
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:15,代码来源:test_rsync_distributor.py
示例14: test_sync_downloaded_content
def test_sync_downloaded_content(self):
    """Sync two puppet repositories that share a feed, one after the other.

    Two puppet repositories with identical feeds are created and synced
    serially. The test then checks that each holds more than zero units
    and that both hold the same number of units. It is skipped when issue
    1937 is reported as untestable.
    """
    cfg = config.get_config()
    if selectors.bug_is_untestable(1937, cfg.version):
        self.skipTest('https://pulp.plan.io/issues/1937')
    utils.pulp_admin_login(cfg)

    # Create each repo, schedule its deletion, then sync it.
    create_tmpl = (
        'pulp-admin puppet repo create --repo-id {} --feed {} --queries {}'
    )
    delete_tmpl = 'pulp-admin puppet repo delete --repo-id {}'
    sync_tmpl = 'pulp-admin puppet repo sync run --repo-id {}'
    client = cli.Client(cfg)
    repo_ids = [utils.uuid4() for _ in range(2)]
    for repo_id in repo_ids:
        create_cmd = create_tmpl.format(repo_id, PUPPET_FEED, PUPPET_QUERY)
        client.run(create_cmd.split())
        self.addCleanup(client.run, delete_tmpl.format(repo_id).split())
        client.run(sync_tmpl.format(repo_id).split())

    # Compare the number of units held by each repository.
    unit_counts = []
    for repo_id in repo_ids:
        unit_counts.append(get_num_units_in_repo(cfg, repo_id))
    for i, unit_count in enumerate(unit_counts):
        with self.subTest(i=i):
            self.assertGreater(unit_count, 0)
    self.assertEqual(unit_counts[0], unit_counts[1])
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:35,代码来源:test_puppet_sync.py
示例15: test_force_sync
def test_force_sync(self):
    """Test whether one can force Pulp to perform a full sync.

    A repository is created and synced, one randomly chosen RPM is removed
    with ``rm -rf``, and the repository is synced twice more: first
    normally, then with ``force_sync=True``. The RPM count is checked after
    each step. The test is skipped when issue 1982 is reported as
    untestable.
    """
    cfg = config.get_config()
    if selectors.bug_is_untestable(1982, cfg.version):
        self.skipTest("https://pulp.plan.io/issues/1982")

    # Create a repository, schedule its deletion and sync it.
    client = cli.Client(cfg)
    repo_id = utils.uuid4()
    create_cmd = "pulp-admin rpm repo create --repo-id {} --feed {}".format(
        repo_id, RPM_SIGNED_FEED_URL
    )
    client.run(create_cmd.split())
    delete_cmd = "pulp-admin rpm repo delete --repo-id {}".format(repo_id)
    self.addCleanup(client.run, delete_cmd.split())
    sync_repo(cfg, repo_id)

    # Remove one RPM, chosen at random.
    rpms = self._list_rpms(cfg)
    sudo = "" if is_root(cfg) else "sudo"
    victim = random.choice(rpms)
    client.run("{} rm -rf {}".format(sudo, victim).split())
    with self.subTest(comment="Verify the RPM was removed."):
        self.assertEqual(len(self._list_rpms(cfg)), len(rpms) - 1)

    # A sync without force_sync should leave the RPM missing.
    sync_repo(cfg, repo_id)
    with self.subTest(comment="Verify the RPM has not been restored."):
        self.assertEqual(len(self._list_rpms(cfg)), len(rpms) - 1)

    # A forced sync should bring the RPM back.
    sync_repo(cfg, repo_id, force_sync=True)
    with self.subTest(comment="Verify the RPM has been restored."):
        self.assertEqual(len(self._list_rpms(cfg)), len(rpms))
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:28,代码来源:test_sync.py
示例16: gen_repo
def gen_repo():
    """Return a semi-random dict for use in creating a Docker repository."""
    repo = {'id': utils.uuid4()}
    repo['importer_config'] = {}
    repo['importer_type_id'] = 'docker_importer'
    repo['notes'] = {'_repo-type': 'docker-repo'}
    return repo
开发者ID:BrnoPCmaniak,项目名称:pulp-smash,代码行数:7,代码来源:utils.py
示例17: setUpModule
def setUpModule():  # pylint:disable=invalid-name
    """Possibly skip tests. Create and sync an RPM repository.

    ``set_up_module()`` runs first. Then an RPM repository with a feed is
    created and synced, and its ID is stored in the module-level
    ``_REPO_ID``. Test cases may copy data from this repository but should
    **not** change it.

    If the sync command fails, the repository is deleted and the error is
    re-raised.
    """
    global _REPO_ID  # pylint:disable=global-statement
    set_up_module()
    cfg = config.get_config()
    client = cli.Client(config.get_config())

    # Log in, then create the repository.
    utils.pulp_admin_login(cfg)
    _REPO_ID = utils.uuid4()
    create_cmd = 'pulp-admin rpm repo create --repo-id {} --feed {}'.format(
        _REPO_ID, constants.RPM_FEED_URL
    )
    client.run(create_cmd.split())

    # If setUpModule() fails, tearDownModule() isn't run, and addCleanup()
    # is unavailable because it's an instance method. So clean up by hand.
    # If this set-up procedure grows, consider implementing a stack of
    # tear-down steps instead.
    sync_cmd = 'pulp-admin rpm repo sync run --repo-id {}'.format(_REPO_ID)
    try:
        client.run(sync_cmd.split())
    except subprocess.CalledProcessError:
        delete_cmd = 'pulp-admin rpm repo delete --repo-id {}'.format(_REPO_ID)
        client.run(delete_cmd.split())
        raise
开发者ID:danuzclaudes,项目名称:redhat-summer-internship,代码行数:33,代码来源:test_copy_units.py
示例18: make_repo
def make_repo(self, cfg, remote):
    """Create a repository with an importer and a pair of distributors.

    The repository is created with:

    * An importer whose feed is ``RPM_FEED_URL``.
    * A distributor produced by ``gen_distributor()``.
    * An RPM rsync distributor that names the first distributor as its
      predistributor.

    The repository is also scheduled for deletion.

    :param pulp_smash.config.ServerConfig cfg: Information about the Pulp
        server being targeted.
    :param remote: A dict for the RPM rsync distributor's ``remote``
        section.
    :returns: The repository's href, as a string.
    """
    api_client = api.Client(cfg, api.json_handler)
    body = gen_repo()
    body['importer_config']['feed'] = RPM_FEED_URL
    predistributor = gen_distributor()
    rsync_distributor = {
        'distributor_id': utils.uuid4(),
        'distributor_type_id': 'rpm_rsync_distributor',
        'distributor_config': {
            'predistributor_id': predistributor['distributor_id'],
            'remote': remote,
        },
    }
    body['distributors'] = [predistributor, rsync_distributor]
    repo = api_client.post(REPOSITORY_PATH, body)
    repo_href = repo['_href']
    self.addCleanup(api_client.delete, repo_href)
    return repo_href
开发者ID:danuzclaudes,项目名称:pulp-smash,代码行数:32,代码来源:test_rsync_distributor.py
示例19: setUpClass
def setUpClass(cls):
    """Create a content source, refresh it, then remove it.

    ``cls.responses`` ends up holding four items, in this order: the result
    of refreshing all content sources, the content source IDs, the result
    of refreshing only the new content source, and the content source IDs
    after the source's file has been removed.

    Raises ``unittest.SkipTest`` when the server is older than 2.8.6.
    """
    super(RefreshAndDeleteContentSourcesTestCase, cls).setUpClass()
    cls.cfg = config.get_config()
    if cls.cfg.version < Version('2.8.6'):
        raise unittest.SkipTest('This test requires at least 2.8.6')
    pulp_admin_login(cls.cfg)
    cls.client = cli.Client(cls.cfg)
    cls.content_source_id = uuid4()
    content_source_path = generate_content_source(
        cls.cfg,
        cls.content_source_id,
        enabled='1',
        type='yum',
        base_url=RPM_SIGNED_FEED_URL,
    )
    sudo = '' if is_root(cls.cfg) else 'sudo '

    refresh_all = 'pulp-admin content sources refresh'
    refresh_one = 'pulp-admin content sources refresh --source-id {}'.format(
        cls.content_source_id
    )
    cls.responses = []
    cls.responses.append(cls.client.run(refresh_all.split()))
    cls.responses.append(_get_content_source_ids(cls.cfg))
    cls.responses.append(cls.client.run(refresh_one.split()))

    # Remove the content source's file, then list the source IDs again.
    remove_cmd = '{}rm -f {}'.format(sudo, content_source_path)
    cls.client.run(remove_cmd.split())
    cls.responses.append(_get_content_source_ids(cls.cfg))
开发者ID:PulpQE,项目名称:pulp-smash,代码行数:30,代码来源:test_content_sources.py
示例20: test_01_create_task
def test_01_create_task(self):
    """Create a task by patching a new repository's description.

    The repository is scheduled for deletion, and the task referenced by
    the patch response is fetched and merged into ``self.task``.
    """
    repo = self.client.post(REPO_PATH, gen_repo())
    repo_href = repo['_href']
    self.addCleanup(self.client.delete, repo_href)
    new_attrs = {'description': utils.uuid4()}
    response = self.client.patch(repo_href, new_attrs)
    task = self.client.get(response['task'])
    self.task.update(task)
开发者ID:asmacdo,项目名称:pulp,代码行数:7,代码来源:test_tasks.py
注:本文中的pulp_smash.utils.uuid4函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论