本文整理汇总了 Python 中 pulp.server.managers.factory.content_upload_manager 函数的典型用法代码示例。如果您正苦于以下问题：Python content_upload_manager 函数的具体用法？Python content_upload_manager 怎么用？Python content_upload_manager 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 content_upload_manager 函数的 16 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_syntactic_sugar_methods
def test_syntactic_sugar_methods(self):
    """
    Check that each convenience accessor on the factory returns its manager type.

    After ``factory.initialize()`` every accessor is called with no arguments and
    the result is checked with ``isinstance`` against the paired class.
    """
    # Setup
    factory.initialize()

    # (accessor, expected manager class), listed in the order they are checked
    expectations = (
        (factory.authentication_manager, AuthenticationManager),
        (factory.cert_generation_manager, CertGenerationManager),
        (factory.certificate_manager, CertificateManager),
        (factory.password_manager, PasswordManager),
        (factory.permission_manager, PermissionManager),
        (factory.permission_query_manager, PermissionQueryManager),
        (factory.role_manager, RoleManager),
        (factory.role_query_manager, RoleQueryManager),
        (factory.user_manager, UserManager),
        (factory.user_query_manager, UserQueryManager),
        (factory.repo_manager, RepoManager),
        (factory.repo_unit_association_manager, RepoUnitAssociationManager),
        (factory.repo_publish_manager, RepoPublishManager),
        (factory.repo_query_manager, RepoQueryManager),
        (factory.repo_sync_manager, RepoSyncManager),
        (factory.content_manager, ContentManager),
        (factory.content_query_manager, ContentQueryManager),
        (factory.content_upload_manager, ContentUploadManager),
        (factory.consumer_manager, ConsumerManager),
        (factory.topic_publish_manager, TopicPublishManager),
    )

    # Test
    for accessor, manager_class in expectations:
        self.assertTrue(isinstance(accessor(), manager_class))
开发者ID:credativ,项目名称:pulp,代码行数:29,代码来源:test_factory.py
示例2: put
def put(self, request, upload_id, offset):
    """
    Write the request body into an existing upload, starting at the given offset.

    :param request: WSGI request object, body contains bits to upload
    :type  request: django.core.handlers.wsgi.WSGIRequest
    :param upload_id: id of the initialized upload
    :type  upload_id: str
    :param offset: place in the uploaded file to start writing
    :type  offset: str of an integer

    :return: response containing null
    :rtype:  django.http.HttpResponse

    :raises: pulp.server.exceptions.MissingResource if upload ID does not exist
    :raises: InvalidValue if offset cannot be converted to an integer
    """
    # The offset arrives as text; reject anything int() cannot parse.
    try:
        start = int(offset)
    except ValueError:
        raise InvalidValue(["offset"])

    # If the upload ID doesn't exist, either because it was not initialized
    # or was deleted, the call to the manager will raise missing resource.
    factory.content_upload_manager().save_data(upload_id, start, request.body)

    return generate_json_response(None)
开发者ID:credativ,项目名称:pulp,代码行数:28,代码来源:content.py
示例3: setUp
def setUp(self):
    """
    Run the base test setup, install the mock plugins and fetch the upload,
    repo and importer managers from the manager factory.
    """
    base.PulpServerTests.setUp(self)
    mock_plugins.install()

    # Managers used by the tests, all obtained from the same factory module
    managers = manager_factory
    self.upload_manager = managers.content_upload_manager()
    self.repo_manager = managers.repo_manager()
    self.importer_manager = managers.repo_importer_manager()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:7,代码来源:test_server_managers_content_upload.py
示例4: POST
def POST(self, repo_id):
    """
    Import an uploaded unit into the given repository.

    :param repo_id: The id of the repository the upload should be imported into
    :type  repo_id: basestring

    :return: A json serialized dictionary with two keys. 'success_flag' indexes a boolean
             value that indicates whether the import was successful, and 'summary' will
             contain the summary as reported by the Importer.
    :rtype:  basestring
    """
    # Collect user input; the first three values are read by subscript,
    # 'unit_metadata' is optional and removed from the params if present.
    body = self.params()
    upload_id, unit_type_id, unit_key = (
        body['upload_id'], body['unit_type_id'], body['unit_key'])
    unit_metadata = body.pop('unit_metadata', None)

    # Coordinator configuration
    repo_type = dispatch_constants.RESOURCE_REPOSITORY_TYPE
    tags = [resource_tag(repo_type, repo_id), action_tag('import_upload')]

    # Positional arguments handed to import_uploaded_unit
    import_args = [repo_id, unit_type_id, unit_key, unit_metadata, upload_id]

    manager = manager_factory.content_upload_manager()
    call_request = CallRequest(manager.import_uploaded_unit, import_args,
                               tags=tags, archive=True)
    call_request.updates_resource(repo_type, repo_id)

    return self.ok(execution.execute(call_request))
开发者ID:ashcrow,项目名称:pulp,代码行数:30,代码来源:repositories.py
示例5: POST
def POST(self, repo_id):
    """
    Build and execute a call request that imports an uploaded unit into a repository.

    :param repo_id: id of the repository named in the request's resources and tags
    :return: whatever ``execution.execute_ok`` returns for the call request
    """
    # Collect user input; 'unit_metadata' is optional, the rest are read by subscript
    body = self.params()
    upload_id, unit_type_id, unit_key = (
        body["upload_id"], body["unit_type_id"], body["unit_key"])
    unit_metadata = body.pop("unit_metadata", None)

    # Coordinator configuration: the repository is marked with an update operation
    repo_type = dispatch_constants.RESOURCE_REPOSITORY_TYPE
    resources = {repo_type: {repo_id: dispatch_constants.RESOURCE_UPDATE_OPERATION}}
    tags = [resource_tag(repo_type, repo_id), action_tag("import_upload")]

    # Positional arguments handed to import_uploaded_unit
    import_args = [repo_id, unit_type_id, unit_key, unit_metadata, upload_id]

    manager = manager_factory.content_upload_manager()
    call_request = CallRequest(manager.import_uploaded_unit, import_args,
                               resources=resources, tags=tags, archive=True)

    return execution.execute_ok(self, call_request)
开发者ID:ehelms,项目名称:pulp,代码行数:25,代码来源:repositories.py
示例6: setUp
def setUp(self):
    """
    Run the parent setup, recreate the upload storage directory from scratch
    and install the dummy plugins.
    """
    super(BaseUploadTest, self).setUp()

    self.upload_manager = manager_factory.content_upload_manager()

    # Remove any existing storage directory, then create it again
    storage_dir = self.upload_manager._upload_storage_dir()
    if os.path.exists(storage_dir):
        shutil.rmtree(storage_dir)
    os.makedirs(storage_dir)

    dummy_plugins.install()
开发者ID:ashcrow,项目名称:pulp,代码行数:11,代码来源:test_contents_controller.py
示例7: get
def get(self, request):
    """
    Respond with the upload ids reported by the content upload manager.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest

    :return: Serialized response containing a dict with the key "upload_ids"
    :rtype:  django.http.HttpResponse
    """
    upload_ids = factory.content_upload_manager().list_upload_ids()
    payload = {"upload_ids": upload_ids}
    return generate_json_response(payload)
开发者ID:credativ,项目名称:pulp,代码行数:13,代码来源:content.py
示例8: setUp
def setUp(self):
    """
    Run the base test setup, install the mock plugins, fetch the managers used
    by the tests and recreate the upload storage directory from scratch.
    """
    base.PulpServerTests.setUp(self)
    mock_plugins.install()

    # Managers used by the tests, all obtained from the same factory module
    managers = manager_factory
    self.upload_manager = managers.content_upload_manager()
    self.repo_manager = managers.repo_manager()
    self.importer_manager = managers.repo_importer_manager()

    # Remove any existing storage directory, then create it again
    storage_dir = self.upload_manager._upload_storage_dir()
    if os.path.exists(storage_dir):
        shutil.rmtree(storage_dir)
    os.makedirs(storage_dir)
开发者ID:ryanschneider,项目名称:pulp,代码行数:13,代码来源:test_content_upload_manager.py
示例9: delete
def delete(self, request, upload_id):
    """
    Delete a single upload through the content upload manager.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest
    :param upload_id: id of the upload to be deleted
    :type  upload_id: str

    :return: response with None
    :rtype:  django.http.HttpResponse
    """
    factory.content_upload_manager().delete_upload(upload_id)
    return generate_json_response(None)
开发者ID:credativ,项目名称:pulp,代码行数:15,代码来源:content.py
示例10: post
def post(self, request, *args, **kwargs):
    """
    Initialize an upload and respond with its href and id.

    :param request: WSGI request object
    :type  request: django.core.handlers.wsgi.WSGIRequest

    :return: Redirect response built from a JSON response with the keys
             "_href" and "upload_id"
    :rtype:  django.http.HttpResponse
    """
    manager = factory.content_upload_manager()
    new_upload_id = manager.initialize_upload()

    # The same href goes into the response body and the redirect
    href = reverse("content_upload_resource", kwargs={"upload_id": new_upload_id})
    body = {"_href": href, "upload_id": new_upload_id}

    return generate_redirect_response(generate_json_response(body), href)
开发者ID:credativ,项目名称:pulp,代码行数:15,代码来源:content.py
示例11: PUT
def PUT(self, upload_id, offset):
    """
    Save the request data into an upload, starting at the given offset.

    :param upload_id: id of the upload to write into
    :param offset: start position; must be convertible with int()

    :raises InvalidValue: if offset cannot be converted to an integer
    """
    try:
        start = int(offset)
    except ValueError:
        raise InvalidValue(['offset'])

    manager = factory.content_upload_manager()
    payload = self.data()

    # If the upload ID doesn't exist, either because it was not initialized
    # or was deleted, the call to the manager will raise missing resource.
    manager.save_data(upload_id, start, payload)

    return self.ok(None)
开发者ID:jlsherrill,项目名称:pulp,代码行数:15,代码来源:contents.py
示例12: test_syntactic_sugar_methods
def test_syntactic_sugar_methods(self):
    """
    Check that each convenience accessor on the factory returns its manager type.

    Every accessor is called with no arguments and the result is checked with
    ``isinstance`` against the paired class.
    """
    # (accessor, expected manager class), listed in the order they are checked
    expectations = (
        (factory.repo_manager, RepoManager),
        (factory.repo_unit_association_manager, RepoUnitAssociationManager),
        (factory.repo_publish_manager, RepoPublishManager),
        (factory.repo_query_manager, RepoQueryManager),
        (factory.repo_sync_manager, RepoSyncManager),
        (factory.content_manager, ContentManager),
        (factory.content_query_manager, ContentQueryManager),
        (factory.content_upload_manager, ContentUploadManager),
        (factory.consumer_manager, ConsumerManager),
    )

    # Test
    for accessor, manager_class in expectations:
        self.assertTrue(isinstance(accessor(), manager_class))
开发者ID:stpierre,项目名称:pulp,代码行数:15,代码来源:test_manager_factory.py
示例13: POST
def POST(self, repo_id):
    """
    Build and execute a call request that imports an uploaded unit into a repository.

    The result of the execution is discarded; the response carries None.

    :param repo_id: id of the repository named in the request's tags and
                    marked as the updated resource
    :return: the result of ``self.ok(None)``
    """
    # Collect user input; 'unit_metadata' is optional, the rest are read by subscript
    body = self.params()
    upload_id, unit_type_id, unit_key = (
        body['upload_id'], body['unit_type_id'], body['unit_key'])
    unit_metadata = body.pop('unit_metadata', None)

    # Coordinator configuration
    repo_type = dispatch_constants.RESOURCE_REPOSITORY_TYPE
    tags = [resource_tag(repo_type, repo_id), action_tag('import_upload')]

    # Positional arguments handed to import_uploaded_unit
    import_args = [repo_id, unit_type_id, unit_key, unit_metadata, upload_id]

    manager = manager_factory.content_upload_manager()
    call_request = CallRequest(manager.import_uploaded_unit, import_args,
                               tags=tags, archive=True)
    call_request.updates_resource(repo_type, repo_id)

    execution.execute(call_request)
    return self.ok(None)
开发者ID:hennessy80,项目名称:pulp,代码行数:21,代码来源:repositories.py
示例14: DELETE
def DELETE(self, upload_id):
    """
    Delete the upload with the given id through the content upload manager.

    :param upload_id: id of the upload to delete
    :return: the result of ``self.ok(None)``
    """
    factory.content_upload_manager().delete_upload(upload_id)
    return self.ok(None)
开发者ID:jlsherrill,项目名称:pulp,代码行数:5,代码来源:contents.py
示例15: POST
def POST(self):
    """
    Initialize a new upload and respond with its link and id.

    :return: the result of ``self.created`` called with the upload's href and
             a dict holding the keys '_href' and 'upload_id'
    """
    upload_id = factory.content_upload_manager().initialize_upload()

    # The same href is used as the location and inside the response body
    href = serialization.link.child_link_obj(upload_id)['_href']
    body = {'_href': href, 'upload_id': upload_id}

    return self.created(href, body)
开发者ID:jlsherrill,项目名称:pulp,代码行数:5,代码来源:contents.py
示例16: GET
def GET(self):
    """
    Respond with the upload ids reported by the content upload manager.

    :return: the result of ``self.ok`` called with a dict holding the key 'upload_ids'
    """
    upload_ids = factory.content_upload_manager().list_upload_ids()
    return self.ok({'upload_ids': upload_ids})
开发者ID:jlsherrill,项目名称:pulp,代码行数:5,代码来源:contents.py
注：本文中的 pulp.server.managers.factory.content_upload_manager 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论