• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python factory.content_upload_manager函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.managers.factory.content_upload_manager函数的典型用法代码示例。如果您正苦于以下问题:Python content_upload_manager函数的具体用法?Python content_upload_manager怎么用?Python content_upload_manager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了content_upload_manager函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_syntactic_sugar_methods

    def test_syntactic_sugar_methods(self):
        """
        Tests the syntactic sugar methods for retrieving specific managers.
        """
        # Setup
        factory.initialize()

        # Test
        self.assertTrue(isinstance(factory.authentication_manager(), AuthenticationManager))
        self.assertTrue(isinstance(factory.cert_generation_manager(), CertGenerationManager))
        self.assertTrue(isinstance(factory.certificate_manager(), CertificateManager))
        self.assertTrue(isinstance(factory.password_manager(), PasswordManager))
        self.assertTrue(isinstance(factory.permission_manager(), PermissionManager))
        self.assertTrue(isinstance(factory.permission_query_manager(), PermissionQueryManager))
        self.assertTrue(isinstance(factory.role_manager(), RoleManager))
        self.assertTrue(isinstance(factory.role_query_manager(), RoleQueryManager))
        self.assertTrue(isinstance(factory.user_manager(), UserManager))
        self.assertTrue(isinstance(factory.user_query_manager(), UserQueryManager))
        self.assertTrue(isinstance(factory.repo_manager(), RepoManager))
        self.assertTrue(isinstance(factory.repo_unit_association_manager(),
                                   RepoUnitAssociationManager))
        self.assertTrue(isinstance(factory.repo_publish_manager(), RepoPublishManager))
        self.assertTrue(isinstance(factory.repo_query_manager(), RepoQueryManager))
        self.assertTrue(isinstance(factory.repo_sync_manager(), RepoSyncManager))
        self.assertTrue(isinstance(factory.content_manager(), ContentManager))
        self.assertTrue(isinstance(factory.content_query_manager(), ContentQueryManager))
        self.assertTrue(isinstance(factory.content_upload_manager(), ContentUploadManager))
        self.assertTrue(isinstance(factory.consumer_manager(), ConsumerManager))
        self.assertTrue(isinstance(factory.topic_publish_manager(), TopicPublishManager))
开发者ID:credativ,项目名称:pulp,代码行数:29,代码来源:test_factory.py


示例2: put

    def put(self, request, upload_id, offset):
        """
        Upload to a specific file upload.

        :param request:   WSGI request object, body contains bits to upload
        :type  request:   django.core.handlers.wsgi.WSGIRequest
        :param upload_id: id of the initialized upload
        :type  upload_id: str
        :param offset:    place in the uploaded file to start writing
        :type  offset:    str of an integer
        :return:          response containing null
        :rtype:           django.http.HttpResponse

        :raises:          pulp.server.exceptions.MissingResource if upload ID does not exist
        :raises:          InvalidValue if offset cannot be converted to an integer
        """

        try:
            offset = int(offset)
        except ValueError:
            raise InvalidValue(["offset"])

        upload_manager = factory.content_upload_manager()

        # If the upload ID doesn't exists, either because it was not initialized
        # or was deleted, the call to the manager will raise missing resource
        upload_manager.save_data(upload_id, offset, request.body)
        return generate_json_response(None)
开发者ID:credativ,项目名称:pulp,代码行数:28,代码来源:content.py


示例3: setUp

    def setUp(self):
        base.PulpServerTests.setUp(self)
        mock_plugins.install()

        self.upload_manager = manager_factory.content_upload_manager()
        self.repo_manager = manager_factory.repo_manager()
        self.importer_manager = manager_factory.repo_importer_manager()
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:7,代码来源:test_server_managers_content_upload.py


示例4: POST

    def POST(self, repo_id):
        """
        Import an uploaded unit into the given repository.

        :param repo_id: The id of the repository the upload should be imported into
        :type  repo_id: basestring
        :return:        A json serialized dictionary with two keys. 'success_flag' indexes a boolean
                        value that indicates whether the import was successful, and 'summary' will
                        contain the summary as reported by the Importer.
        :rtype:         basestring
        """
        # Collect user input
        params = self.params()
        upload_id = params['upload_id']
        unit_type_id = params['unit_type_id']
        unit_key = params['unit_key']
        unit_metadata = params.pop('unit_metadata', None)

        # Coordinator configuration
        tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
                action_tag('import_upload')]

        upload_manager = manager_factory.content_upload_manager()
        call_request = CallRequest(upload_manager.import_uploaded_unit,
            [repo_id, unit_type_id, unit_key, unit_metadata, upload_id],
            tags=tags, archive=True)
        call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)

        report = execution.execute(call_request)
        return self.ok(report)
开发者ID:ashcrow,项目名称:pulp,代码行数:30,代码来源:repositories.py


示例5: POST

    def POST(self, repo_id):

        # Collect user input
        params = self.params()
        upload_id = params["upload_id"]
        unit_type_id = params["unit_type_id"]
        unit_key = params["unit_key"]
        unit_metadata = params.pop("unit_metadata", None)

        # Coordinator configuration
        resources = {
            dispatch_constants.RESOURCE_REPOSITORY_TYPE: {repo_id: dispatch_constants.RESOURCE_UPDATE_OPERATION}
        }
        tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id), action_tag("import_upload")]

        upload_manager = manager_factory.content_upload_manager()
        call_request = CallRequest(
            upload_manager.import_uploaded_unit,
            [repo_id, unit_type_id, unit_key, unit_metadata, upload_id],
            resources=resources,
            tags=tags,
            archive=True,
        )

        return execution.execute_ok(self, call_request)
开发者ID:ehelms,项目名称:pulp,代码行数:25,代码来源:repositories.py


示例6: setUp

    def setUp(self):
        super(BaseUploadTest, self).setUp()
        self.upload_manager = manager_factory.content_upload_manager()

        upload_storage_dir = self.upload_manager._upload_storage_dir()

        if os.path.exists(upload_storage_dir):
            shutil.rmtree(upload_storage_dir)
        os.makedirs(upload_storage_dir)

        dummy_plugins.install()
开发者ID:ashcrow,项目名称:pulp,代码行数:11,代码来源:test_contents_controller.py


示例7: get

    def get(self, request):
        """
        Return a serialized response containing a dict with a list of upload_ids.

        :param request: WSGI request object
        :type request: django.core.handlers.wsgi.WSGIRequest

        :return: Serialized response containing a list of upload ids
        :rtype: django.http.HttpResponse
        """
        upload_manager = factory.content_upload_manager()
        upload_ids = upload_manager.list_upload_ids()
        return generate_json_response({"upload_ids": upload_ids})
开发者ID:credativ,项目名称:pulp,代码行数:13,代码来源:content.py


示例8: setUp

    def setUp(self):
        base.PulpServerTests.setUp(self)
        mock_plugins.install()

        self.upload_manager = manager_factory.content_upload_manager()
        self.repo_manager = manager_factory.repo_manager()
        self.importer_manager = manager_factory.repo_importer_manager()

        upload_storage_dir = self.upload_manager._upload_storage_dir()

        if os.path.exists(upload_storage_dir):
            shutil.rmtree(upload_storage_dir)
        os.makedirs(upload_storage_dir)
开发者ID:ryanschneider,项目名称:pulp,代码行数:13,代码来源:test_content_upload_manager.py


示例9: delete

    def delete(self, request, upload_id):
        """
        Delete a single upload.

        :param request: WSGI request object
        :type  request: django.core.handlers.wsgi.WSGIRequest
        :param upload_id: id of the upload to be deleted
        :type  upload_id: str

        :return: response with None
        :rtype: django.http.HttpResponse
        """
        upload_manager = factory.content_upload_manager()
        upload_manager.delete_upload(upload_id)
        return generate_json_response(None)
开发者ID:credativ,项目名称:pulp,代码行数:15,代码来源:content.py


示例10: post

    def post(self, request, *args, **kwargs):
        """
        Initialize an upload and return a serialized dict containing the upload data.

        :param request: WSGI request object
        :type request: django.core.handlers.wsgi.WSGIRequest
        :return : Serialized response containing a url to delete an upload and a unique id.
        :rtype : django.http.HttpResponse
        """
        upload_manager = factory.content_upload_manager()
        upload_id = upload_manager.initialize_upload()
        href = reverse("content_upload_resource", kwargs={"upload_id": upload_id})
        response = generate_json_response({"_href": href, "upload_id": upload_id})
        response_redirect = generate_redirect_response(response, href)
        return response_redirect
开发者ID:credativ,项目名称:pulp,代码行数:15,代码来源:content.py


示例11: PUT

    def PUT(self, upload_id, offset):

        # If the upload ID doesn't exists, either because it was not initialized
        # or was deleted, the call to the manager will raise missing resource

        try:
            offset = int(offset)
        except ValueError:
            raise InvalidValue(['offset'])

        upload_manager = factory.content_upload_manager()
        data = self.data()
        upload_manager.save_data(upload_id, offset, data)

        return self.ok(None)
开发者ID:jlsherrill,项目名称:pulp,代码行数:15,代码来源:contents.py


示例12: test_syntactic_sugar_methods

    def test_syntactic_sugar_methods(self):
        """
        Tests the syntactic sugar methods for retrieving specific managers.
        """

        # Test
        self.assertTrue(isinstance(factory.repo_manager(), RepoManager))
        self.assertTrue(isinstance(factory.repo_unit_association_manager(), RepoUnitAssociationManager))
        self.assertTrue(isinstance(factory.repo_publish_manager(), RepoPublishManager))
        self.assertTrue(isinstance(factory.repo_query_manager(), RepoQueryManager))
        self.assertTrue(isinstance(factory.repo_sync_manager(), RepoSyncManager))
        self.assertTrue(isinstance(factory.content_manager(), ContentManager))
        self.assertTrue(isinstance(factory.content_query_manager(), ContentQueryManager))
        self.assertTrue(isinstance(factory.content_upload_manager(), ContentUploadManager))
        self.assertTrue(isinstance(factory.consumer_manager(), ConsumerManager))
开发者ID:stpierre,项目名称:pulp,代码行数:15,代码来源:test_manager_factory.py


示例13: POST

    def POST(self, repo_id):

        # Collect user input
        params = self.params()
        upload_id = params['upload_id']
        unit_type_id = params['unit_type_id']
        unit_key = params['unit_key']
        unit_metadata = params.pop('unit_metadata', None)

        # Coordinator configuration
        tags = [resource_tag(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id),
                action_tag('import_upload')]

        upload_manager = manager_factory.content_upload_manager()
        call_request = CallRequest(upload_manager.import_uploaded_unit,
            [repo_id, unit_type_id, unit_key, unit_metadata, upload_id],
            tags=tags, archive=True)
        call_request.updates_resource(dispatch_constants.RESOURCE_REPOSITORY_TYPE, repo_id)

        execution.execute(call_request)
        return self.ok(None)
开发者ID:hennessy80,项目名称:pulp,代码行数:21,代码来源:repositories.py


示例14: DELETE

    def DELETE(self, upload_id):
        upload_manager = factory.content_upload_manager()
        upload_manager.delete_upload(upload_id)

        return self.ok(None)
开发者ID:jlsherrill,项目名称:pulp,代码行数:5,代码来源:contents.py


示例15: POST

 def POST(self):
     upload_manager = factory.content_upload_manager()
     upload_id = upload_manager.initialize_upload()
     location = serialization.link.child_link_obj(upload_id)
     return self.created(location['_href'], {'_href' : location['_href'], 'upload_id' : upload_id})
开发者ID:jlsherrill,项目名称:pulp,代码行数:5,代码来源:contents.py


示例16: GET

    def GET(self):
        upload_manager = factory.content_upload_manager()
        upload_ids = upload_manager.list_upload_ids()

        return self.ok({'upload_ids' : upload_ids})
开发者ID:jlsherrill,项目名称:pulp,代码行数:5,代码来源:contents.py



注:本文中的pulp.server.managers.factory.content_upload_manager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python factory.initialize函数代码示例发布时间:2022-05-25
下一篇:
Python factory.content_query_manager函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap