• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python repository.RepoPublishResult类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.db.model.repository.RepoPublishResult的典型用法代码示例。如果您正苦于以下问题:Python RepoPublishResult类的具体用法?Python RepoPublishResult怎么用?Python RepoPublishResult使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了RepoPublishResult类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_publish_history_ascending_sort

    def test_publish_history_ascending_sort(self):
        """
        Verify that requesting an ascending sort returns publish history entries ordered
        from the earliest start time to the latest.
        """

        # Setup: a repo with one mock distributor attached
        self.repo_manager.create_repo('test_sort')
        self.distributor_manager.add_distributor('test_sort', 'mock-distributor', {}, True,
                                                 distributor_id='test_dist')

        # Seed five history entries; each starts on an even second and completes on the next one
        timestamp_template = '2013-06-01T12:00:0%sZ'
        for second in (0, 2, 4, 6, 8):
            history_entry = RepoPublishResult.expected_result(
                'test_sort', 'test_dist', 'bar',
                timestamp_template % str(second), timestamp_template % str(second + 1),
                'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(history_entry, safe=True)

        # Each entry must have started strictly before the one that follows it
        history = self.publish_manager.publish_history('test_sort', 'test_dist',
                                                       sort=constants.SORT_ASCENDING)
        self.assertEqual(5, len(history))
        for position in range(4):
            earlier = dateutils.parse_iso8601_datetime(history[position]['started'])
            later = dateutils.parse_iso8601_datetime(history[position + 1]['started'])
            self.assertTrue(earlier < later)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py


示例2: test_publish_history_start_date

    def test_publish_history_start_date(self):
        """
        Verify that the start_date filter only returns publish history entries whose start
        time is at or after the requested date.
        """

        # Setup
        self.repo_manager.create_repo('test_date')
        self.distributor_manager.add_distributor('test_date', 'mock-distributor', {}, True,
                                                 distributor_id='test_dist')

        # Seed three history entries that start at seconds 0, 2 and 4
        timestamp_template = '2013-06-01T12:00:0%sZ'
        for second in (0, 2, 4):
            history_entry = RepoPublishResult.expected_result(
                'test_date', 'test_dist', 'bar',
                timestamp_template % str(second), timestamp_template % str(second + 1),
                'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(history_entry, safe=True)

        # Verify: without a filter every seeded entry comes back
        unfiltered = self.publish_manager.publish_history('test_date', 'test_dist')
        self.assertEqual(3, len(unfiltered))

        # With the filter, only the entries that started at seconds 2 and 4 remain
        start_date = '2013-06-01T12:00:02Z'
        filtered = self.publish_manager.publish_history('test_date', 'test_dist',
                                                        start_date=start_date)
        self.assertEqual(2, len(filtered))
        for entry in filtered:
            entry_started = dateutils.parse_iso8601_datetime(entry['started'])
            cutoff = dateutils.parse_iso8601_datetime(start_date)
            self.assertTrue(entry_started >= cutoff)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py


示例3: _do_publish

    def _do_publish(repo, distributor_id, distributor_instance, transfer_repo, conduit,
                    call_config):

        distributor_coll = RepoDistributor.get_collection()
        publish_result_coll = RepoPublishResult.get_collection()
        repo_id = repo['id']

        # Perform the publish
        publish_start_timestamp = _now_timestamp()
        try:
            # Add the register_sigterm_handler decorator to the publish_repo call, so that we can
            # respond to signals by calling the Distributor's cancel_publish_repo() method.
            publish_repo = register_sigterm_handler(
                distributor_instance.publish_repo, distributor_instance.cancel_publish_repo)
            publish_report = publish_repo(transfer_repo, conduit, call_config)
        except Exception, e:
            publish_end_timestamp = _now_timestamp()

            # Reload the distributor in case the scratchpad is set by the plugin
            repo_distributor = distributor_coll.find_one(
                {'repo_id' : repo_id, 'id' : distributor_id})
            repo_distributor['last_publish'] = publish_end_timestamp
            distributor_coll.save(repo_distributor, safe=True)

            # Add a publish history entry for the run
            result = RepoPublishResult.error_result(
                repo_id, repo_distributor['id'], repo_distributor['distributor_type_id'],
                publish_start_timestamp, publish_end_timestamp, e, sys.exc_info()[2])
            publish_result_coll.save(result, safe=True)

            logger.exception(
                _('Exception caught from plugin during publish for repo [%(r)s]' % {'r' : repo_id}))
            raise PulpExecutionException(), None, sys.exc_info()[2]
开发者ID:preethit,项目名称:pulp-1,代码行数:33,代码来源:publish.py


示例4: test_publish_history_descending_sort

    def test_publish_history_descending_sort(self):
        """
        Verify that requesting a descending sort returns publish history entries ordered
        from the latest start time to the earliest.
        """

        # Setup: a repo with one mock distributor attached
        self.repo_manager.create_repo("test_sort")
        self.distributor_manager.add_distributor("test_sort", "mock-distributor", {}, True, distributor_id="test_dist")

        # Seed five history entries; each starts on an even second and completes on the next one
        timestamp_template = "2013-06-01T12:00:0%sZ"
        for second in (0, 2, 4, 6, 8):
            started = timestamp_template % str(second)
            completed = timestamp_template % str(second + 1)
            history_entry = RepoPublishResult.expected_result(
                "test_sort", "test_dist", "bar", started, completed,
                "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(history_entry, safe=True)

        # Each entry must have started strictly after the one that follows it
        history = self.publish_manager.publish_history("test_sort", "test_dist", sort=constants.SORT_DESCENDING)
        self.assertEqual(5, len(history))
        for position in range(4):
            newer = dateutils.parse_iso8601_datetime(history[position]["started"])
            older = dateutils.parse_iso8601_datetime(history[position + 1]["started"])
            self.assertTrue(newer > older)
开发者ID:jdob,项目名称:pulp,代码行数:30,代码来源:test_repo_publish_manager.py


示例5: test_publish_history_end_date

    def test_publish_history_end_date(self):
        """
        Verify that the end_date filter only returns publish history entries whose start
        time is at or before the requested date.
        """

        # Setup
        self.repo_manager.create_repo("test_date")
        self.distributor_manager.add_distributor("test_date", "mock-distributor", {}, True, distributor_id="test_dist")

        # Seed three history entries that start at seconds 0, 2 and 4
        timestamp_template = "2013-06-01T12:00:0%sZ"
        for second in (0, 2, 4):
            started = timestamp_template % str(second)
            completed = timestamp_template % str(second + 1)
            history_entry = RepoPublishResult.expected_result(
                "test_date", "test_dist", "bar", started, completed,
                "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(history_entry, safe=True)

        # Only the entries that started at seconds 0 and 2 fall at or before the cutoff
        end_date = "2013-06-01T12:00:03Z"
        filtered = self.publish_manager.publish_history("test_date", "test_dist", end_date=end_date)
        self.assertEqual(2, len(filtered))
        for entry in filtered:
            entry_started = dateutils.parse_iso8601_datetime(entry["started"])
            cutoff = dateutils.parse_iso8601_datetime(end_date)
            self.assertTrue(entry_started <= cutoff)
开发者ID:jdob,项目名称:pulp,代码行数:29,代码来源:test_repo_publish_manager.py


示例6: _do_publish

    def _do_publish(self, repo, distributor_id, distributor_instance, transfer_repo, conduit, call_config):

        distributor_coll = RepoDistributor.get_collection()
        publish_result_coll = RepoPublishResult.get_collection()
        repo_id = repo['id']

        # Perform the publish
        publish_start_timestamp = _now_timestamp()
        try:
            publish_report = distributor_instance.publish_repo(transfer_repo, conduit, call_config)
        except Exception, e:
            publish_end_timestamp = _now_timestamp()

            # Reload the distributor in case the scratchpad is set by the plugin
            repo_distributor = distributor_coll.find_one({'repo_id' : repo_id, 'id' : distributor_id})
            repo_distributor['last_publish'] = publish_end_timestamp
            distributor_coll.save(repo_distributor, safe=True)

            # Add a publish history entry for the run
            result = RepoPublishResult.error_result(repo_id, repo_distributor['id'], repo_distributor['distributor_type_id'],
                                                    publish_start_timestamp, publish_end_timestamp, e, sys.exc_info()[2])
            publish_result_coll.save(result, safe=True)

            _LOG.exception(_('Exception caught from plugin during publish for repo [%(r)s]' % {'r' : repo_id}))
            raise PulpExecutionException(), None, sys.exc_info()[2]
开发者ID:fdammeke,项目名称:pulp,代码行数:25,代码来源:publish.py


示例7: add_result

def add_result(repo_id, dist_id, offset):
    """
    Insert a successful publish history entry for the given repo and distributor.

    The entry starts at the current UTC time and completes ``offset`` days after that.

    :param repo_id: repo id stored on the history entry
    :param dist_id: distributor id stored on the history entry
    :param offset: number of days between the entry's start and completion times
    """
    start_time = dateutils.now_utc_datetime_with_tzinfo()
    finish_time = start_time + datetime.timedelta(days=offset)
    started_iso = dateutils.format_iso8601_datetime(start_time)
    completed_iso = dateutils.format_iso8601_datetime(finish_time)

    history_entry = RepoPublishResult.expected_result(
        repo_id, dist_id, 'bar', started_iso, completed_iso,
        'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
    RepoPublishResult.get_collection().insert(history_entry, safe=True)
开发者ID:credativ,项目名称:pulp,代码行数:8,代码来源:test_publish.py


示例8: _do_publish

def _do_publish(repo_obj, dist_id, dist_inst, transfer_repo, conduit, call_config):
    """
    Publish the repository using the given distributor.

    If the plugin raises, or returns a report whose success_flag is falsey, an error entry is
    saved to the publish history collection, the exception is logged and then re-raised.

    :param repo_obj: repository object
    :type  repo_obj: pulp.server.db.model.Repository
    :param dist_id: identifies the distributor
    :type  dist_id: str
    :param dist_inst: instance of the distributor
    :type  dist_inst: dict
    :param transfer_repo: dict representation of a repo for the plugins to use
    :type  transfer_repo: pulp.plugins.model.Repository
    :param conduit: allows the plugin to interact with core pulp
    :type  conduit: pulp.plugins.conduits.repo_publish.RepoPublishConduit
    :param call_config: allows the plugin to retrieve values
    :type  call_config: pulp.plugins.config.PluginCallConfiguration

    :return: publish result containing information about the publish
    :rtype:  pulp.server.db.model.repository.RepoPublishResult

    :raises pulp_exceptions.PulpCodedException: if the publish report's success flag is falsey
    """
    distributor_coll = RepoDistributor.get_collection()
    publish_result_coll = RepoPublishResult.get_collection()
    publish_start_timestamp = _now_timestamp()
    try:
        # Add the register_sigterm_handler decorator to the publish_repo call, so that we can
        # respond to signals by calling the Distributor's cancel_publish_repo() method.
        publish_repo = register_sigterm_handler(dist_inst.publish_repo,
                                                dist_inst.cancel_publish_repo)
        publish_report = publish_repo(transfer_repo, conduit, call_config)

        # A report that carries a falsey success_flag is turned into a coded exception so that
        # it goes through the same failure bookkeeping as an exception raised by the plugin.
        report_failed = (publish_report is not None and
                         hasattr(publish_report, 'success_flag') and
                         not publish_report.success_flag)
        if report_failed:
            raise pulp_exceptions.PulpCodedException(
                error_code=error_codes.PLP0034, repository_id=repo_obj.repo_id,
                distributor_id=dist_id
            )

    except Exception as e:
        publish_end_timestamp = _now_timestamp()

        # Reload the distributor in case the scratchpad is set by the plugin
        repo_distributor = distributor_coll.find_one(
            {'repo_id': repo_obj.repo_id, 'id': dist_id})
        distributor_coll.save(repo_distributor, safe=True)

        # Add a publish history entry for the run
        result = RepoPublishResult.error_result(
            repo_obj.repo_id, repo_distributor['id'], repo_distributor['distributor_type_id'],
            publish_start_timestamp, publish_end_timestamp, e, sys.exc_info()[2])
        publish_result_coll.save(result, safe=True)

        # Translate the template first and interpolate afterwards: formatting inside _()
        # would hand gettext a msgid containing the repo id, which no catalog can match.
        _logger.exception(
            _('Exception caught from plugin during publish for repo [%(r)s]')
            % {'r': repo_obj.repo_id})
        raise
开发者ID:zjhuntin,项目名称:pulp,代码行数:56,代码来源:repository.py


示例9: add_result

def add_result(repo_id, dist_id, offset):
    """
    Insert a successful publish history entry for the given repo and distributor.

    The entry starts at the current time in the timezone returned by dateutils.local_tz()
    and completes ``offset`` days after that.

    :param repo_id: repo id stored on the history entry
    :param dist_id: distributor id stored on the history entry
    :param offset: number of days between the entry's start and completion times
    """
    start_time = datetime.datetime.now(dateutils.local_tz())
    finish_time = start_time + datetime.timedelta(days=offset)
    started_iso = dateutils.format_iso8601_datetime(start_time)
    completed_iso = dateutils.format_iso8601_datetime(finish_time)

    history_entry = RepoPublishResult.expected_result(
        repo_id, dist_id, "bar", started_iso, completed_iso,
        "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS)
    RepoPublishResult.get_collection().insert(history_entry, safe=True)
开发者ID:jdob,项目名称:pulp,代码行数:14,代码来源:test_repo_publish_manager.py


示例10: test_publish_failure_report

    def test_publish_failure_report(self):
        """
        Publish with a plugin that reports a graceful failure and check the history entry
        that gets recorded for it.
        """
        # Setup
        publish_config = {'foo': 'bar'}
        self.repo_manager.create_repo('repo-1')
        self.distributor_manager.add_distributor('repo-1', 'mock-distributor', publish_config, False,
                                                 distributor_id='dist-1')

        failure_report = PublishReport(False, 'Summary of the publish', 'Details of the publish')
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = failure_report

        # Test
        self.publish_manager.publish('repo-1', 'dist-1', None)

        # Verify: exactly one history entry, flagged as failed, with report text but no error data
        history = list(RepoPublishResult.get_collection().find({'repo_id': 'repo-1'}))
        self.assertEqual(1, len(history))
        entry = history[0]
        self.assertEqual('repo-1', entry['repo_id'])
        self.assertEqual('dist-1', entry['distributor_id'])
        self.assertEqual('mock-distributor', entry['distributor_type_id'])
        for field in ('started', 'completed'):
            self.assertTrue(entry[field] is not None)
        self.assertEqual(RepoPublishResult.RESULT_FAILED, entry['result'])
        for field in ('summary', 'details'):
            self.assertTrue(entry[field] is not None)
        for field in ('error_message', 'exception', 'traceback'):
            self.assertTrue(entry[field] is None)

        # Cleanup
        mock_plugins.reset()
开发者ID:hennessy80,项目名称:pulp,代码行数:31,代码来源:test_repo_publish_manager.py


示例11: __init__

    def __init__(self, repo, publish_conduit, config, distributor_type):
        """
        Set up the publish state for this distributor and register the publish steps.

        :param repo: Pulp managed Yum repository
        :type  repo: pulp.plugins.model.Repository
        :param publish_conduit: Conduit providing access to relative Pulp functionality
        :type  publish_conduit: pulp.plugins.conduits.repo_publish.RepoPublishConduit
        :param config: Pulp configuration for the distributor
        :type  config: pulp.plugins.config.PluginCallConfiguration
        :param distributor_type: The type of the distributor that is being published
        :type distributor_type: str

        :ivar last_published: last time this distributor published the repo
        :ivar last_deleted: last time a unit was removed from this repository
        :ivar repo: repository being operated on
        :ivar predistributor: distributor object that is associated with this distributor. Its
                              publish history affects the type of publish that is performed
        :ivar predist_history: publish history entries of the predistributor that started at or
                               after last_published; an empty list when there is no predistributor
        :ivar remote_path: value returned by get_remote_repo_path()
        :ivar symlink_list: list of symlinks to rsync
        :ivar content_unit_file_list: list of content units to rsync
        :ivar symlink_src: path to directory containing all symlinks
        """

        super(Publisher, self).__init__("Repository publish", repo,
                                        publish_conduit, config,
                                        distributor_type=distributor_type)

        # NOTE(review): self.repo is read here before it is assigned below, so it is presumably
        # already set by the parent initializer -- confirm.
        distributor = Distributor.objects.get_or_404(repo_id=self.repo.id,
                                                     distributor_id=publish_conduit.distributor_id)
        self.last_published = distributor["last_publish"]
        self.last_deleted = repo.last_unit_removed
        self.repo = repo
        self.predistributor = self._get_predistributor()

        # The history query below compares against an ISO 8601 string, so convert the last
        # publish time when there is one.
        if self.last_published:
            string_date = dateutils.format_iso8601_datetime(self.last_published)
        else:
            string_date = None
        if self.predistributor:
            # NOTE(review): string_date may be None here, which makes the query {"$gte": None}
            # -- confirm that is the intended behavior for a never-published distributor.
            search_params = {'repo_id': repo.id,
                             'distributor_id': self.predistributor["id"],
                             'started': {"$gte": string_date}}
            self.predist_history = RepoPublishResult.get_collection().find(search_params)
        else:
            self.predist_history = []

        self.remote_path = self.get_remote_repo_path()

        # Only a fast-forward publish gets a date range filter; it spans from this distributor's
        # last publish to the predistributor's last publish (open-ended without a predistributor).
        if self.is_fastforward():
            start_date = self.last_published
            end_date = None
            if self.predistributor:
                end_date = self.predistributor["last_publish"]
            date_filter = self.create_date_range_filter(start_date=start_date, end_date=end_date)
        else:
            date_filter = None

        self.symlink_list = []
        self.content_unit_file_list = []
        self.symlink_src = os.path.join(self.get_working_dir(), '.relative/')

        self._add_necesary_steps(date_filter=date_filter, config=config)
开发者ID:pcreech,项目名称:pulp,代码行数:60,代码来源:publish.py


示例12: publish_history

def publish_history(start_date, end_date, repo_id, distributor_id):
    """
    Look up the publish history entries recorded for a repo/distributor pair, optionally
    restricted to entries whose start time falls inside a date range.

    :param start_date: if specified, no events prior to this date will be returned.
    :type  start_date: iso8601 datetime string
    :param end_date: if specified, no events after this date will be returned.
    :type  end_date: iso8601 datetime string
    :param repo_id: identifies the repo
    :type  repo_id: str
    :param distributor_id: identifies the distributor to retrieve history for
    :type  distributor_id: str

    :return: object containing publish history results
    :rtype:  pymongo.cursor.Cursor

    :raise pulp_exceptions.MissingResource: if repo/distributor pair is invalid
    """
    # Validate the pair first; the repo lookup is called only for its validation effect
    model.Repository.objects.get_repo_or_missing_resource(repo_id)
    distributor_spec = {'repo_id': repo_id, 'id': distributor_id}
    if RepoDistributor.get_collection().find_one(distributor_spec) is None:
        raise pulp_exceptions.MissingResource(distributor_id)

    # Keep only the bounds that were actually supplied
    candidate_bounds = (('$gte', start_date), ('$lte', end_date))
    started_bounds = dict((operator, bound) for operator, bound in candidate_bounds if bound)

    query = {'repo_id': repo_id, 'distributor_id': distributor_id}
    if started_bounds:
        query['started'] = started_bounds
    return RepoPublishResult.get_collection().find(query)
开发者ID:zjhuntin,项目名称:pulp,代码行数:32,代码来源:repository.py


示例13: test_publish_with_error

    def test_publish_with_error(self):
        """
        Publish with a plugin that raises and check both the distributor's last publish
        time and the error entry recorded in the publish history.
        """

        # Setup: make the mock plugin raise when asked to publish
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = Exception()

        self.repo_manager.create_repo("gonna-bail")
        self.distributor_manager.add_distributor("gonna-bail", "mock-distributor", {}, False, distributor_id="bad-dist")

        self.assertRaises(Exception, self.publish_manager.publish, "gonna-bail", "bad-dist")

        # Verify: the distributor still exists and its last publish time was updated
        stored_distributor = RepoDistributor.get_collection().find_one(
            {"repo_id": "gonna-bail", "id": "bad-dist"})

        self.assertTrue(stored_distributor is not None)
        self.assertTrue(assert_last_sync_time(stored_distributor["last_publish"]))

        # Verify: exactly one history entry, flagged as an error, with error data but no report text
        history = list(RepoPublishResult.get_collection().find({"repo_id": "gonna-bail"}))
        self.assertEqual(1, len(history))
        entry = history[0]
        self.assertEqual("gonna-bail", entry["repo_id"])
        self.assertEqual("bad-dist", entry["distributor_id"])
        self.assertEqual("mock-distributor", entry["distributor_type_id"])
        for field in ("started", "completed"):
            self.assertTrue(entry[field] is not None)
        self.assertEqual(RepoPublishResult.RESULT_ERROR, entry["result"])
        for field in ("summary", "details"):
            self.assertTrue(entry[field] is None)
        for field in ("error_message", "exception", "traceback"):
            self.assertTrue(entry[field] is not None)

        # Cleanup
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = None
开发者ID:lzap,项目名称:pulp,代码行数:35,代码来源:test_publish.py


示例14: test_publish_failure_report

    def test_publish_failure_report(self):
        """
        Publish with a plugin that reports a graceful failure and check the history entry
        that gets recorded for it.
        """
        # Setup
        publish_config = {"foo": "bar"}
        self.repo_manager.create_repo("repo-1")
        self.distributor_manager.add_distributor("repo-1", "mock-distributor", publish_config, False,
                                                 distributor_id="dist-1")

        failure_report = PublishReport(False, "Summary of the publish", "Details of the publish")
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = failure_report

        # Test
        self.publish_manager.publish("repo-1", "dist-1", None)

        # Verify: exactly one history entry, flagged as failed, with report text but no error data
        history = list(RepoPublishResult.get_collection().find({"repo_id": "repo-1"}))
        self.assertEqual(1, len(history))
        entry = history[0]
        self.assertEqual("repo-1", entry["repo_id"])
        self.assertEqual("dist-1", entry["distributor_id"])
        self.assertEqual("mock-distributor", entry["distributor_type_id"])
        for field in ("started", "completed"):
            self.assertTrue(entry[field] is not None)
        self.assertEqual(RepoPublishResult.RESULT_FAILED, entry["result"])
        for field in ("summary", "details"):
            self.assertTrue(entry[field] is not None)
        for field in ("error_message", "exception", "traceback"):
            self.assertTrue(entry[field] is None)

        # Cleanup
        mock_plugins.reset()
开发者ID:jdob,项目名称:pulp,代码行数:35,代码来源:test_repo_publish_manager.py


示例15: test_publish

    def test_publish(self, mock_finished, mock_started):
        """
        Publish through a correctly configured distributor and check the database state,
        the recorded history entry, the plugin invocation and the started/finished hooks.
        """

        # Setup: two distributors on the same repo so we can prove only one is invoked
        publish_config = {"foo": "bar"}
        self.repo_manager.create_repo("repo-1")
        for type_id, dist_id in (("mock-distributor", "dist-1"), ("mock-distributor-2", "dist-2")):
            self.distributor_manager.add_distributor(
                "repo-1", type_id, publish_config, False, distributor_id=dist_id
            )

        # Test
        distributor_instance, call_config = \
            self.publish_manager._get_distributor_instance_and_config("repo-1", "dist-1")
        self.publish_manager.publish("repo-1", "dist-1", distributor_instance, call_config, None)

        # Verify

        #   Database
        stored_distributor = RepoDistributor.get_collection().find_one(
            {"repo_id": "repo-1", "id": "dist-1"})
        self.assertTrue(stored_distributor["last_publish"] is not None)
        self.assertTrue(assert_last_sync_time(stored_distributor["last_publish"]))

        #   History
        history = list(RepoPublishResult.get_collection().find({"repo_id": "repo-1"}))
        self.assertEqual(1, len(history))
        entry = history[0]
        self.assertEqual("repo-1", entry["repo_id"])
        self.assertEqual("dist-1", entry["distributor_id"])
        self.assertEqual("mock-distributor", entry["distributor_type_id"])
        for field in ("started", "completed"):
            self.assertTrue(entry[field] is not None)
        self.assertEqual(RepoPublishResult.RESULT_SUCCESS, entry["result"])
        for field in ("summary", "details"):
            self.assertTrue(entry[field] is not None)
        for field in ("error_message", "exception", "traceback"):
            self.assertTrue(entry[field] is None)

        #   Call into the correct distributor
        positional = mock_plugins.MOCK_DISTRIBUTOR.publish_repo.call_args[0]

        self.assertEqual("repo-1", positional[0].id)
        self.assertTrue(positional[1] is not None)
        self.assertEqual({}, positional[2].plugin_config)
        self.assertEqual(publish_config, positional[2].repo_plugin_config)
        self.assertEqual({}, positional[2].override_config)

        self.assertEqual(0, mock_plugins.MOCK_DISTRIBUTOR_2.publish_repo.call_count)

        #   Started and finished hooks each fired once for this repo
        self.assertEqual(1, mock_started.call_count)
        self.assertEqual("repo-1", mock_started.call_args[0][0])

        self.assertEqual(1, mock_finished.call_count)
        self.assertEqual("repo-1", mock_finished.call_args[0][0]["repo_id"])
开发者ID:ehelms,项目名称:pulp,代码行数:58,代码来源:test_repo_publish_manager.py


示例16: test_publish

    def test_publish(self, mock_finished, mock_started, mock_get_working_directory, dt):
        """
        Publish through a correctly configured distributor and check the database state,
        the recorded history entry, the plugin invocation and the started/finished hooks.
        """

        # Setup: two distributors on the same repo so we can prove only one is invoked
        publish_config = {'foo': 'bar'}
        self.repo_manager.create_repo('repo-1')
        for type_id, dist_id in (('mock-distributor', 'dist-1'), ('mock-distributor-2', 'dist-2')):
            self.distributor_manager.add_distributor('repo-1', type_id, publish_config,
                                                     False, distributor_id=dist_id)
        # Pin the mocked utcnow() so the stored last_publish value can be compared exactly
        dt.utcnow.return_value = 1234

        # Test
        self.publish_manager.publish('repo-1', 'dist-1', None)

        # Verify

        #   Database
        stored_distributor = RepoDistributor.get_collection().find_one(
            {'repo_id': 'repo-1', 'id': 'dist-1'})
        self.assertTrue(stored_distributor['last_publish'] is not None)
        self.assertEqual(stored_distributor['last_publish'], dt.utcnow.return_value)

        #   History
        history = list(RepoPublishResult.get_collection().find({'repo_id': 'repo-1'}))
        self.assertEqual(1, len(history))
        entry = history[0]
        self.assertEqual('repo-1', entry['repo_id'])
        self.assertEqual('dist-1', entry['distributor_id'])
        self.assertEqual('mock-distributor', entry['distributor_type_id'])
        for field in ('started', 'completed'):
            self.assertTrue(entry[field] is not None)
        self.assertEqual(RepoPublishResult.RESULT_SUCCESS, entry['result'])
        for field in ('summary', 'details'):
            self.assertTrue(entry[field] is not None)
        for field in ('error_message', 'exception', 'traceback'):
            self.assertTrue(entry[field] is None)

        #   Call into the correct distributor
        positional = mock_plugins.MOCK_DISTRIBUTOR.publish_repo.call_args[0]

        self.assertEqual('repo-1', positional[0].id)
        self.assertTrue(positional[1] is not None)
        self.assertEqual({}, positional[2].plugin_config)
        self.assertEqual(publish_config, positional[2].repo_plugin_config)
        self.assertEqual({}, positional[2].override_config)

        self.assertEqual(0, mock_plugins.MOCK_DISTRIBUTOR_2.publish_repo.call_count)

        #   Started and finished hooks each fired once for this repo
        self.assertEqual(1, mock_started.call_count)
        self.assertEqual('repo-1', mock_started.call_args[0][0])

        self.assertEqual(1, mock_finished.call_count)
        self.assertEqual('repo-1', mock_finished.call_args[0][0]['repo_id'])
开发者ID:credativ,项目名称:pulp,代码行数:57,代码来源:test_publish.py


示例17: test_publish_no_plugin_report

    def test_publish_no_plugin_report(self):
        """
        Publish with a plugin whose publish_repo returns None and check that the recorded
        history entry uses 'Unknown' for both its summary and its details.
        """

        # Setup
        self.repo_manager.create_repo('sloppy')
        self.distributor_manager.add_distributor('sloppy', 'mock-distributor', {}, True,
                                                 distributor_id='slop')

        # The plugin hands back nothing instead of a report
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = None

        # Test
        self.publish_manager.publish('sloppy', 'slop')

        # Verify
        history = list(RepoPublishResult.get_collection().find({'repo_id': 'sloppy'}))
        self.assertEqual(1, len(history))
        entry = history[0]
        for field in ('summary', 'details'):
            self.assertEqual('Unknown', entry[field])
开发者ID:hennessy80,项目名称:pulp,代码行数:19,代码来源:test_repo_publish_manager.py


示例18: test_publish_no_plugin_report

    def test_publish_no_plugin_report(self):
        """
        Publish with a plugin whose publish_repo returns None and check that the recorded
        history entry uses "Unknown" for both its summary and its details.
        """

        # Setup
        self.repo_manager.create_repo("sloppy")
        self.distributor_manager.add_distributor("sloppy", "mock-distributor", {}, True, distributor_id="slop")

        # The plugin hands back nothing instead of a report
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = None

        # Test
        self.publish_manager.publish("sloppy", "slop")

        # Verify
        history = list(RepoPublishResult.get_collection().find({"repo_id": "sloppy"}))
        self.assertEqual(1, len(history))
        entry = history[0]
        for field in ("summary", "details"):
            self.assertEqual("Unknown", entry[field])
开发者ID:jdob,项目名称:pulp,代码行数:19,代码来源:test_repo_publish_manager.py


示例19: publish_history

    def publish_history(self, repo_id, distributor_id, limit=None):
        """
        Fetches the publish history entries recorded for a repo/distributor pair,
        ordered by completion time from newest to oldest. An empty list is
        returned when nothing has been recorded.

        @param repo_id: identifies the repo
        @type  repo_id: str

        @param distributor_id: identifies the distributor to retrieve history for
        @type  distributor_id: str

        @param limit: maximum number of results to return; defaults to 10 when None
        @type  limit: int

        @return: list of publish history result instances
        @rtype:  list of L{pulp.server.db.model.repository.RepoPublishResult}

        @raise MissingResource: if repo_id does not reference a valid repo, or if
                                distributor_id is not found for that repo
        """

        # Validation: both the repo and the distributor on that repo must exist
        if Repo.get_collection().find_one({'id': repo_id}) is None:
            raise MissingResource(repo_id)

        distributor_spec = {'repo_id': repo_id, 'id': distributor_id}
        if RepoDistributor.get_collection().find_one(distributor_spec) is None:
            raise MissingResource(distributor_id)

        # Default here for each of REST API calls into here
        max_entries = 10 if limit is None else limit

        # Retrieve the entries
        history_spec = {'repo_id': repo_id, 'distributor_id': distributor_id}
        results = RepoPublishResult.get_collection().find(history_spec)
        results.limit(max_entries)
        results.sort('completed', pymongo.DESCENDING)

        return list(results)
开发者ID:ehelms,项目名称:pulp,代码行数:38,代码来源:publish.py


示例20: test_publish_with_error

    def test_publish_with_error(self, mock_get_working_directory):
        """
        Publish with a plugin that raises and check that the distributor's last publish
        time stays unset while an error entry is recorded in the publish history.
        """

        # Setup: make the mock plugin raise when asked to publish
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = Exception()

        self.repo_manager.create_repo('gonna-bail')
        self.distributor_manager.add_distributor('gonna-bail', 'mock-distributor', {}, False,
                                                 distributor_id='bad-dist')

        self.assertRaises(Exception, self.publish_manager.publish,
                          'gonna-bail', 'bad-dist')

        # Verify: the distributor still exists and was never marked as published
        stored_distributor = RepoDistributor.get_collection().find_one(
            {'repo_id': 'gonna-bail', 'id': 'bad-dist'})

        self.assertTrue(stored_distributor is not None)
        self.assertEqual(stored_distributor['last_publish'], None)

        # Verify: exactly one history entry, flagged as an error, with error data but no report text
        history = list(RepoPublishResult.get_collection().find({'repo_id': 'gonna-bail'}))
        self.assertEqual(1, len(history))
        entry = history[0]
        self.assertEqual('gonna-bail', entry['repo_id'])
        self.assertEqual('bad-dist', entry['distributor_id'])
        self.assertEqual('mock-distributor', entry['distributor_type_id'])
        for field in ('started', 'completed'):
            self.assertTrue(entry[field] is not None)
        self.assertEqual(RepoPublishResult.RESULT_ERROR, entry['result'])
        for field in ('summary', 'details'):
            self.assertTrue(entry[field] is None)
        for field in ('error_message', 'exception', 'traceback'):
            self.assertTrue(entry[field] is not None)

        # Cleanup
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = None
开发者ID:credativ,项目名称:pulp,代码行数:38,代码来源:test_publish.py



注:本文中的pulp.server.db.model.repository.RepoPublishResult类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python repository.RepoSyncResult类代码示例发布时间:2022-05-25
下一篇:
Python repository.RepoImporter类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap