本文整理汇总了 Python 中 pulp_smash.pulp3.utils.gen_repo 函数的典型用法代码示例。如果您正苦于以下问题:Python gen_repo 函数的具体用法?Python gen_repo 怎么用?Python gen_repo 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 gen_repo 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_all
def test_all(self):
    """Publish specific repository versions and check what was published.

    1. Build a repository that holds several repository versions.
    2. Publish the repository without naming a version.
    3. Assert that the publication's ``repository_version`` is the last
       version listed for the repository.
    4. Publish again, this time naming a randomly chosen non-latest
       version.
    5. Assert that the publication's ``repository_version`` is the
       version that was supplied.
    6. Assert that ``HTTPError`` is raised when a publish request carries
       both ``repository`` and ``repository_version``.
    """
    cfg = config.get_config()
    client = api.Client(cfg, api.json_handler)

    # Sync a first repository; the content units added in step 1 are read
    # back from RPM_CONTENT_PATH afterwards.
    remote = client.post(RPM_REMOTE_PATH, gen_rpm_remote())
    self.addCleanup(client.delete, remote['_href'])
    synced_repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, synced_repo['_href'])
    sync(cfg, remote, synced_repo)

    publisher = client.post(RPM_PUBLISHER_PATH, gen_rpm_publisher())
    self.addCleanup(client.delete, publisher['_href'])

    # Step 1: one POST to the versions endpoint per content unit.
    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])
    units = client.get(RPM_CONTENT_PATH)['results']
    for unit in units:
        payload = {'add_content_units': [unit['_href']]}
        client.post(repo['_versions_href'], payload)
    version_hrefs = tuple(
        version['_href'] for version in get_versions(repo)
    )
    non_latest = choice(version_hrefs[:-1])

    # Steps 2 and 3
    latest_publication = publish(cfg, publisher, repo)
    self.assertEqual(
        latest_publication['repository_version'],
        version_hrefs[-1],
    )

    # Steps 4 and 5
    older_publication = publish(cfg, publisher, repo, non_latest)
    self.assertEqual(older_publication['repository_version'], non_latest)

    # Step 6
    conflicting_body = {
        'repository': repo['_href'],
        'repository_version': non_latest,
    }
    publish_url = urljoin(publisher['_href'], 'publish/')
    with self.assertRaises(HTTPError):
        client.post(publish_url, conflicting_body)
开发者ID:daviddavis,项目名称:pulp_rpm,代码行数:59,代码来源:test_publish.py
示例2: populate_pulp
def populate_pulp(cfg, url=DOCKER_V2_FEED_URL):
    """Sync docker content into Pulp and return the content listing.

    A remote and a repository are created, synced, and then deleted again
    in a ``finally`` clause, whether or not the sync succeeded.

    :param cfg: Configuration object passed to ``api.Client`` and ``sync``.
    :param url: URL handed to ``gen_remote``. Defaults to
        ``DOCKER_V2_FEED_URL``.
    :returns: The ``results`` entry of a GET on ``DOCKER_CONTENT_PATH``.
    """
    client = api.Client(cfg, api.json_handler)
    remote, repo = {}, {}
    try:
        remote_body = gen_remote(url, upstream_name=DOCKER_UPSTREAM_NAME)
        remote.update(client.post(DOCKER_REMOTE_PATH, remote_body))
        repo.update(client.post(REPO_PATH, gen_repo()))
        sync(cfg, remote, repo)
    finally:
        # A dict that is still empty means its POST never returned, so
        # there is nothing to delete for it.
        for created in (remote, repo):
            if created:
                client.delete(created['_href'])
    return client.get(DOCKER_CONTENT_PATH)['results']
开发者ID:pulp,项目名称:pulp_docker,代码行数:27,代码来源:utils.py
示例3: test_all
def test_all(self):
    """Sync and publish a repo whose ``updateinfo.xml`` has references.

    Targets `Pulp #3998 <https://pulp.plan.io/issues/3998>`_.
    """
    client = self.client

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    remote_body = gen_rpm_remote(url=RPM_REFERENCES_UPDATEINFO_URL)
    remote = client.post(RPM_REMOTE_PATH, remote_body)
    self.addCleanup(client.delete, remote['_href'])

    sync(self.cfg, remote, repo)

    # Re-read the repository so the post-sync state is inspected.
    repo = client.get(repo['_href'])
    self.assertIsNotNone(repo['_latest_version_href'])

    summary = get_content_summary(repo)
    self.assertDictEqual(summary, RPM_FIXTURE_SUMMARY, msg=summary)

    publication = publish(self.cfg, repo)
    self.addCleanup(client.delete, publication['_href'])
开发者ID:pulp,项目名称:pulp_rpm,代码行数:28,代码来源:test_publish.py
示例4: populate_pulp
def populate_pulp(cfg, url=None):
    """Sync file content into Pulp and return the content listing.

    A remote and a repository are created, synced, and then deleted again
    in a ``finally`` clause, whether or not the sync succeeded.

    :param cfg: Configuration object passed to ``api.Client`` and ``sync``.
    :param url: URL handed to ``gen_remote``. When ``None``, it is built by
        joining ``FILE_FIXTURE_URL`` with ``PULP_MANIFEST``.
    :returns: The ``results`` entry of a GET on ``FILE_CONTENT_PATH``.
    """
    feed_url = urljoin(FILE_FIXTURE_URL, 'PULP_MANIFEST') if url is None else url
    client = api.Client(cfg, api.json_handler)
    remote, repo = {}, {}
    try:
        remote.update(client.post(FILE_REMOTE_PATH, gen_remote(feed_url)))
        repo.update(client.post(REPO_PATH, gen_repo()))
        sync(cfg, remote, repo)
    finally:
        # A dict that is still empty means its POST never returned, so
        # there is nothing to delete for it.
        for created in (remote, repo):
            if created:
                client.delete(created['_href'])
    return client.get(FILE_CONTENT_PATH)['results']
开发者ID:bmbouter,项目名称:pulp,代码行数:25,代码来源:utils.py
示例5: test_01_create_task
def test_01_create_task(self):
    """Create a task by patching a repository, then record it.

    The PATCH response's ``task`` href is fetched and merged into
    ``self.task``.
    """
    client = self.client
    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    new_attrs = {'description': utils.uuid4()}
    patch_response = client.patch(repo['_href'], new_attrs)
    task_href = patch_response['task']
    self.task.update(client.get(task_href))
开发者ID:asmacdo,项目名称:pulp,代码行数:7,代码来源:test_tasks.py
示例6: test_all
def test_all(self):
    """Check that an existing repository version cannot be updated.

    1. Create a repository and sync it.
    2. POST to the href of the repository's latest version.
    3. Assert that ``HTTPError`` is raised.
    4. Assert that the repository's latest version href is unchanged.
    """
    cfg = config.get_config()
    client = api.Client(cfg, api.json_handler)

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    remote = client.post(FILE_REMOTE_PATH, gen_file_remote())
    self.addCleanup(client.delete, remote['_href'])

    sync(cfg, remote, repo)

    href_before = client.get(repo['_href'])['_latest_version_href']
    with self.assertRaises(HTTPError):
        client.post(href_before)

    href_after = client.get(repo['_href'])['_latest_version_href']
    self.assertEqual(href_before, href_after)
开发者ID:asmacdo,项目名称:pulp,代码行数:27,代码来源:test_repo_versions.py
示例7: test_exclude_unavailable_projects
def test_exclude_unavailable_projects(self):
    """
    Test that sync works when the excludes specifier names unavailable projects.

    Do the following:

    1. Create a remote whose includes are ``PYTHON_MD_PROJECT_SPECIFIER`` and
       whose excludes are ``PYTHON_UNAVAILABLE_PROJECT_SPECIFIER``.
    2. Sync the remote into a new repository.
    3. Assert that the repository's python content count equals
       ``PYTHON_MD_PACKAGE_COUNT - PYTHON_UNAVAILABLE_PACKAGE_COUNT``.
    """
    client = self.client

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    remote_body = gen_python_remote(
        includes=PYTHON_MD_PROJECT_SPECIFIER,
        excludes=PYTHON_UNAVAILABLE_PROJECT_SPECIFIER,
    )
    remote = client.post(PYTHON_REMOTE_PATH, remote_body)
    self.addCleanup(client.delete, remote['_href'])

    sync(self.cfg, remote, repo)

    # Re-read the repository so the post-sync state is inspected.
    repo = client.get(repo['_href'])
    synced_count = get_content_summary(repo)[PYTHON_CONTENT_NAME]
    self.assertEqual(
        synced_count,
        PYTHON_MD_PACKAGE_COUNT - PYTHON_UNAVAILABLE_PACKAGE_COUNT,
    )
开发者ID:pulp,项目名称:pulp_python,代码行数:29,代码来源:test_sync.py
示例8: test_file_decriptors
def test_file_decriptors(self):
    """Check that no files under ``MEDIA_PATH`` stay open after a sync.

    Targets `Pulp #4073 <https://pulp.plan.io/issues/4073>`_.

    1. Skip the test when ``which lsof`` exits with a non-zero code.
    2. Create a repository and a remote, then sync them.
    3. Run ``lsof -t +D`` on ``MEDIA_PATH``.
    4. Assert that the command printed nothing.
    """
    cli_client = cli.Client(self.cfg, cli.echo_handler)

    lsof_lookup = cli_client.run(('which', 'lsof'))
    if lsof_lookup.returncode != 0:
        raise unittest.SkipTest('lsof package is not present')

    repo = self.client.post(REPO_PATH, gen_repo())
    self.addCleanup(self.client.delete, repo['_href'])

    remote = self.client.post(DEB_REMOTE_PATH, gen_deb_remote())
    self.addCleanup(self.client.delete, remote['_href'])

    sync(self.cfg, remote, repo)

    lsof_cmd = 'lsof -t +D {}'.format(MEDIA_PATH).split()
    lsof_output = cli_client.run(lsof_cmd).stdout
    # Empty stdout means lsof listed no open files.
    self.assertEqual(len(lsof_output), 0, lsof_output)
开发者ID:pulp,项目名称:pulp_deb,代码行数:30,代码来源:test_sync.py
示例9: test_single_request_upload
def test_single_request_upload(self):
    """Upload an RPM to a repository with a single request.

    After the upload, the repository must have a latest version, and Pulp
    must list exactly one artifact (with the uploaded file's SHA-256) and
    exactly one RPM content unit.
    """
    cfg = config.get_config()

    # Pulp does not support single request upload for an RPM that is
    # already present in Pulp, so orphans are removed first.
    delete_orphans(cfg)

    files = {'file': utils.http_get(RPM_UNSIGNED_URL)}
    client = api.Client(cfg, api.page_handler)

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    upload_url = urljoin(BASE_PATH, 'rpm/upload/')
    client.post(upload_url, files=files, data={'repository': repo['_href']})

    # The upload must have produced a repository version.
    repo = client.get(repo['_href'])
    self.assertIsNotNone(repo['_latest_version_href'], repo)

    # Exactly one artifact, whose digest matches the uploaded bytes.
    artifacts = client.get(ARTIFACTS_PATH)
    self.assertEqual(len(artifacts), 1, artifacts)
    expected_sha256 = hashlib.sha256(files['file']).hexdigest()
    self.assertEqual(artifacts[0]['sha256'], expected_sha256, artifacts)

    # Exactly one RPM content unit.
    rpm_units = client.get(RPM_CONTENT_PATH)
    self.assertEqual(len(rpm_units), 1, rpm_units)
开发者ID:dralley,项目名称:pulp_rpm,代码行数:33,代码来源:test_upload.py
示例10: test_all
def test_all(self):
    """Check that one remote and one publisher serve several repositories.

    This exercises the design choice stated in `Pulp #3341`_: publishers
    and remotes hold no FK to a repository, so they can be used with
    different repositories.

    .. _Pulp #3341: https://pulp.plan.io/issues/3341

    1. Create a remote and a publisher.
    2. Create two repositories.
    3. Sync both repositories from the same remote.
    4. Assert that both repositories hold the same content hrefs.
    5. Publish both repositories with the same publisher.
    6. Assert that the publications share a publisher but differ in
       ``repository_version``.
    """
    cfg = config.get_config()
    client = api.Client(cfg, api.json_handler)

    # Steps 1: the shared remote and publisher.
    remote = client.post(FILE_REMOTE_PATH, gen_file_remote())
    self.addCleanup(client.delete, remote['_href'])
    publisher = client.post(FILE_PUBLISHER_PATH, gen_publisher())
    self.addCleanup(client.delete, publisher['_href'])

    # Steps 2 and 3: each repository is re-read after its sync.
    repos = []
    for _ in range(2):
        new_repo = client.post(REPO_PATH, gen_repo())
        self.addCleanup(client.delete, new_repo['_href'])
        sync(cfg, remote, new_repo)
        repos.append(client.get(new_repo['_href']))

    # Step 4
    first_units, second_units = (
        get_content(synced)[FILE_CONTENT_NAME] for synced in repos
    )
    self.assertEqual(
        {unit['_href'] for unit in first_units},
        {unit['_href'] for unit in second_units},
    )

    # Steps 5 and 6
    first_pub, second_pub = (
        publish(cfg, publisher, synced) for synced in repos
    )
    self.assertEqual(first_pub['publisher'], second_pub['publisher'])
    self.assertNotEqual(
        first_pub['repository_version'],
        second_pub['repository_version'],
    )
开发者ID:asmacdo,项目名称:pulp,代码行数:60,代码来源:test_unlinking_repo.py
示例11: test_content
def test_content(self):
    """Test pagination for repository versions.

    Content units are added to a repository one POST at a time from eight
    worker threads. The versions are then listed with ``page_size`` 10,
    and their number is compared with the number of units added.
    """
    # Add content to Pulp, create a repo, and add content to repo. We
    # sample 21 contents, because with page_size set to 10, this produces 3
    # pages, where the three pages have unique combinations of values
    # for the "previous" and "next" links.
    populate_pulp(self.cfg, urljoin(FILE_MANY_FIXTURE_URL, 'PULP_MANIFEST'))
    sample_size = min(FILE_MANY_FIXTURE_COUNT, 21)
    contents = sample(self.client.get(FILE_CONTENT_PATH), sample_size)
    repo = self.client.post(REPO_PATH, gen_repo())
    self.addCleanup(self.client.delete, repo['_href'])

    def add_content():
        """Repeatedly pop an item from ``contents``, and add to repo."""
        while True:
            # Guard only the pop. An IndexError raised while issuing the
            # request must not be mistaken for "nothing left to add".
            try:
                content = contents.pop()
            except IndexError:
                break
            self.client.post(
                repo['_versions_href'],
                {'add_content_units': [content['_href']]}
            )

    threads = tuple(Thread(target=add_content) for _ in range(8))
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Verify pagination works for getting repo versions.
    repo = self.client.get(repo['_href'])
    repo_versions = get_versions(repo, {'page_size': 10})
    self.assertEqual(len(repo_versions), sample_size, repo_versions)
开发者ID:bmbouter,项目名称:pulp,代码行数:34,代码来源:test_pagination.py
示例12: test_sync
def test_sync(self):
    """Sync a repository twice with the docker plugin.

    1. Create a repository and a remote.
    2. Assert that the new repository's ``_latest_version_href`` is
       ``None``.
    3. Sync the remote into the repository.
    4. Assert that ``_latest_version_href`` is no longer ``None``.
    5. Sync a second time.
    6. Assert that ``_latest_version_href`` differs from the value seen
       after the first sync.
    """
    client = self.client

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    remote = client.post(DOCKER_REMOTE_PATH, gen_docker_remote())
    self.addCleanup(client.delete, remote['_href'])

    # First sync.
    self.assertIsNone(repo['_latest_version_href'])
    sync(self.cfg, remote, repo)
    repo = client.get(repo['_href'])
    first_sync_version = repo['_latest_version_href']
    self.assertIsNotNone(first_sync_version)

    # Second sync.
    sync(self.cfg, remote, repo)
    repo = client.get(repo['_href'])
    self.assertNotEqual(first_sync_version, repo['_latest_version_href'])
开发者ID:pulp,项目名称:pulp_docker,代码行数:33,代码来源:test_sync.py
示例13: test_all
def test_all(self):
    """Verify that a sync issued right after a remote update sees the update.

    1. Create a repository, and a remote that points at
       ``FILE_LARGE_FIXTURE_MANIFEST_URL``.
    2. Patch the remote's url to ``FILE_FIXTURE_MANIFEST_URL``.
    3. Run a sync immediately afterwards.
    4. Assert that the remote's url is the patched one.
    5. Assert that the repository's content summary equals
       ``FILE_FIXTURE_SUMMARY``.
    """
    cfg = config.get_config()
    client = api.Client(cfg, api.json_handler)

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    remote_body = gen_file_remote(url=FILE_LARGE_FIXTURE_MANIFEST_URL)
    remote = client.post(FILE_REMOTE_PATH, remote_body)
    self.addCleanup(client.delete, remote['_href'])

    # No waiting between the update and the sync: that is the scenario.
    patched_url = FILE_FIXTURE_MANIFEST_URL
    client.patch(remote['_href'], {'url': patched_url})
    sync(cfg, remote, repo)

    repo = client.get(repo['_href'])
    remote = client.get(remote['_href'])
    self.assertEqual(remote['url'], patched_url)
    self.assertDictEqual(get_content_summary(repo), FILE_FIXTURE_SUMMARY)
开发者ID:asmacdo,项目名称:pulp,代码行数:32,代码来源:test_tasking.py
示例14: populate_pulp
def populate_pulp(cfg, remote=None):
    """
    Add python content to Pulp.

    A remote and a repository are created, synced, and then deleted again
    in a ``finally`` clause, whether or not the sync succeeded.

    Args:
        cfg (pulp_smash.config.PulpSmashConfig): Information about a Pulp application
        remote (dict): Request body used to create the remote. Defaults to the
            result of ``gen_python_remote()``. The dict is not modified.

    Returns:
        list: The ``results`` entry of a GET on ``PYTHON_CONTENT_PATH``.
    """
    if remote is None:
        remote = gen_python_remote()
    client = api.Client(cfg, api.json_handler)
    # Work on a copy so the caller's request body is left untouched.
    remote_info = dict(remote)
    # ``created_remote`` stays empty until the POST returns. The request
    # body itself is never empty, so it cannot be used to decide whether
    # there is a remote to delete.
    created_remote = {}
    repo = {}
    try:
        created_remote.update(client.post(PYTHON_REMOTE_PATH, remote_info))
        remote_info.update(created_remote)
        repo.update(client.post(REPO_PATH, gen_repo()))
        sync(cfg, remote_info, repo)
    finally:
        if created_remote:
            client.delete(created_remote['_href'])
        if repo:
            client.delete(repo['_href'])
    return client.get(PYTHON_CONTENT_PATH)['results']
开发者ID:pulp,项目名称:pulp_python,代码行数:26,代码来源:utils.py
示例15: test_include_unavailable_projects
def test_include_unavailable_projects(self):
    """
    Test that sync works when the includes specifier names unavailable projects.

    Do the following:

    1. Create a remote whose includes are
       ``PYTHON_UNAVAILABLE_PROJECT_SPECIFIER``.
    2. Sync the remote into a new repository.
    3. Assert that the repository's python content count equals
       ``PYTHON_UNAVAILABLE_PACKAGE_COUNT``.
    """
    client = self.client

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    remote_body = gen_python_remote(
        includes=PYTHON_UNAVAILABLE_PROJECT_SPECIFIER
    )
    remote = client.post(PYTHON_REMOTE_PATH, remote_body)
    self.addCleanup(client.delete, remote['_href'])

    sync(self.cfg, remote, repo)

    # Re-read the repository so the post-sync state is inspected.
    repo = client.get(repo['_href'])
    synced_count = get_content_summary(repo)[PYTHON_CONTENT_NAME]
    self.assertEqual(synced_count, PYTHON_UNAVAILABLE_PACKAGE_COUNT)
开发者ID:pulp,项目名称:pulp_python,代码行数:25,代码来源:test_sync.py
示例16: test_all
def test_all(self):
    """Check that a content unit used by a repo version cannot be deleted.

    1. Sync content into a repository.
    2. Try to delete a randomly chosen content unit of that repository
       and assert that ``HTTPError`` is raised.
    3. Assert that the repository holds as many content units after the
       attempt as before it.
    """
    cfg = config.get_config()
    client = api.Client(cfg, api.json_handler)

    remote = client.post(RPM_REMOTE_PATH, gen_rpm_remote())
    self.addCleanup(client.delete, remote['_href'])

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    sync(cfg, remote, repo)
    repo = client.get(repo['_href'])

    units_before = get_content(repo)
    victim_href = choice(units_before)['_href']
    with self.assertRaises(HTTPError):
        client.delete(victim_href)

    units_after = get_content(repo)
    self.assertEqual(len(units_before), len(units_after))
开发者ID:daviddavis,项目名称:pulp_rpm,代码行数:28,代码来源:test_crud_content_unit.py
示例17: test_all
def test_all(self):
    """Sync and publish twice, so the second pass finds content in Pulp.

    Orphans are deleted first. The same remote, repository and publisher
    are then used for two sync-and-publish passes.
    """
    cfg = config.get_config()
    client = api.Client(cfg, api.page_handler)

    # Start from a clean slate: no leftover content on disk or in the
    # database.
    delete_orphans(cfg)

    remote_body = gen_remote(FILE_FIXTURE_MANIFEST_URL)
    remote = client.post(FILE_REMOTE_PATH, remote_body)
    self.addCleanup(client.delete, remote['_href'])

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    publisher = client.post(FILE_PUBLISHER_PATH, gen_publisher())
    self.addCleanup(client.delete, publisher['_href'])

    passes = 2
    for _ in range(passes):
        sync(cfg, remote, repo)
        # Re-read the repository before publishing it.
        repo = client.get(repo['_href'])
        publish(cfg, publisher, repo)
开发者ID:asmacdo,项目名称:pulp,代码行数:25,代码来源:test_content_path.py
示例18: test_different_repository
def test_different_repository(self):
    """Test ``base_version`` across two different repositories.

    1. Create repository A via ``self.create_sync_repo()``.
    2. Create repository B, then create a version for it that names
       repository A's first listed version as ``base_version``.
    3. Assert that repository B's first listed version reports that
       ``base_version``.
    4. Assert that both repositories hold the same file content.
    """

    def sorted_file_content(repository):
        """Return the repo's file content, cleaned and sorted by href."""
        units = [
            self.remove_created_key(unit)
            for unit in get_content(repository)[FILE_CONTENT_NAME]
        ]
        units.sort(key=lambda unit: unit['_href'])
        return units

    # Repository A.
    repo_a = self.create_sync_repo()
    content_a = sorted_file_content(repo_a)
    self.assertIsNone(get_versions(repo_a)[0]['base_version'])

    # Repository A's first listed version serves as the base version.
    base_version = get_versions(repo_a)[0]['_href']

    # Repository B, with a version built on top of that base version.
    repo_b = self.client.post(REPO_PATH, gen_repo())
    self.addCleanup(self.client.delete, repo_b['_href'])
    self.client.post(
        repo_b['_versions_href'],
        {'base_version': base_version},
    )
    repo_b = self.client.get(repo_b['_href'])

    # Repository B's version must point back at repository A's version.
    self.assertEqual(get_versions(repo_b)[0]['base_version'], base_version)

    # Both repositories must hold the same content.
    content_b = sorted_file_content(repo_b)
    self.assertEqual(content_a, content_b, [content_a, content_b])
开发者ID:asmacdo,项目名称:pulp,代码行数:59,代码来源:test_repo_versions.py
示例19: test_all
def test_all(self):
    """Sync two fixtures with the same packages into one repository.

    1. Create a repository and a remote for ``RPM_UNSIGNED_FIXTURE_URL``.
    2. Sync the remote.
    3. Assert that the content summary equals
       ``RPM_FIXTURE_CONTENT_SUMMARY``.
    4. Create a second remote for ``RPM_SIGNED_FIXTURE_URL``.
    5. Sync the second remote into the same repository.
    6. Assert that the content summary is unchanged and that
       ``get_added_content`` returns nothing.
    7. Assert that the collected content differs from the first sync.
    """
    client = api.Client(self.cfg, api.json_handler)

    repo = client.post(REPO_PATH, gen_repo())
    self.addCleanup(client.delete, repo['_href'])

    # First remote: the unsigned RPM fixture.
    unsigned_remote = client.post(
        RPM_REMOTE_PATH,
        gen_rpm_remote(url=RPM_UNSIGNED_FIXTURE_URL),
    )
    self.addCleanup(client.delete, unsigned_remote['_href'])

    # First sync.
    self.assertIsNone(repo['_latest_version_href'])
    sync(self.cfg, unsigned_remote, repo)
    repo = client.get(repo['_href'])
    self.assertDictEqual(
        get_content_summary(repo),
        RPM_FIXTURE_CONTENT_SUMMARY,
    )

    # Remember the content seen after the first sync.
    # NOTE(review): this filter selects type 'packages' while the one
    # after the second sync selects 'update', and both key on
    # 'errata_id'. Confirm that this asymmetry is intended.
    original_packages = {}
    for unit in get_content(repo):
        if unit['type'] == 'packages':
            original_packages[unit['errata_id']] = unit

    # Second remote: a fixture with the same NEVRA but different digests.
    signed_remote = client.post(
        RPM_REMOTE_PATH,
        gen_rpm_remote(url=RPM_SIGNED_FIXTURE_URL),
    )
    self.addCleanup(client.delete, signed_remote['_href'])

    # Second sync.
    sync(self.cfg, signed_remote, repo)
    repo = client.get(repo['_href'])
    self.assertDictEqual(
        get_content_summary(repo),
        RPM_FIXTURE_CONTENT_SUMMARY,
    )
    self.assertEqual(len(get_added_content(repo)), 0)

    # The collected content must differ from the first sync.
    mutated_packages = {}
    for unit in get_content(repo):
        if unit['type'] == 'update':
            mutated_packages[unit['errata_id']] = unit

    self.assertNotEqual(mutated_packages, original_packages)
    self.assertNotEqual(
        mutated_packages[RPM_PACKAGE_NAME]['pkgId'],
        original_packages[RPM_PACKAGE_NAME]['pkgId'],
    )
开发者ID:daviddavis,项目名称:pulp_rpm,代码行数:57,代码来源:test_sync.py
示例20: setUpClass
def setUpClass(cls):
    """Create the variables shared by every test of the class.

    Sets ``cls.cfg``, ``cls.client``, an empty ``cls.publisher`` dict, and
    ``cls.repo``, a freshly created repository.
    """
    # NOTE(review): no ``@classmethod`` decorator is visible above this
    # definition in the excerpt; confirm it is present in the real file.
    cls.cfg = cfg = config.get_config()
    cls.client = client = api.Client(cfg, api.json_handler)
    cls.publisher = {}
    repo_body = gen_repo()
    cls.repo = client.post(REPO_PATH, repo_body)
开发者ID:daviddavis,项目名称:pulp_rpm,代码行数:9,代码来源:test_crud_publishers.py
注:本文中的 pulp_smash.pulp3.utils.gen_repo 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论