• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python distributor.ISODistributor类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp_rpm.plugins.distributors.export_distributor.distributor.ISODistributor的典型用法代码示例。如果您正苦于以下问题:Python ISODistributor类的具体用法?Python ISODistributor怎么用?Python ISODistributor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了ISODistributor类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_publish_repo

    def test_publish_repo(self, mock_validate, export_publisher, mock_ensure_downloaded):
        """
        Assert that publish_repo returns the value configured on the mocked publisher's
        process_lifecycle method.

        The mock arguments are presumably injected by mock.patch decorators that sit outside
        this excerpt -- confirm against the full test module.
        """
        mock_validate.return_value = (True, None)
        distributor = ISODistributor()
        export_publisher.return_value = mock.Mock()
        export_publisher.return_value.process_lifecycle.return_value = 'foo'

        # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
        self.assertEqual('foo', distributor.publish_repo(self.repo, self.conduit, self.config))
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:8,代码来源:test_distributor.py


示例2: test_publish_repo

    def test_publish_repo(self, mock_validate, export_publisher):
        """
        Assert that publish_repo returns the value configured on the mocked publisher's
        publish method.

        The mock arguments are presumably injected by mock.patch decorators that sit outside
        this excerpt -- confirm against the full test module.
        """
        mock_validate.return_value = (True, None)
        distributor = ISODistributor()
        export_publisher.return_value = mock.Mock()
        export_publisher.return_value.publish.return_value = 'foo'

        # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
        self.assertEqual('foo', distributor.publish_repo(self.repo, self.conduit, self.config))
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:8,代码来源:test_distributor.py


示例3: test_cancel_publish_repo

    def test_cancel_publish_repo(self):
        """
        cancel_publish_repo should invoke cancel() on the distributor's publisher
        """
        # Setup: give the distributor a mock publisher to cancel
        iso_distributor = ISODistributor()
        iso_distributor._publisher = mock.Mock()

        # Test
        iso_distributor.cancel_publish_repo()
        cancel_was_called = iso_distributor._publisher.cancel.called
        self.assertTrue(cancel_was_called)
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:10,代码来源:test_distributor.py


示例4: test_cancel_publish_repo

    def test_cancel_publish_repo(self, mock_cancel_createrepo):
        """
        cancel_publish_repo should pass the distributor's working directory to the mocked
        createrepo cancellation helper exactly once
        """
        # Setup
        iso_distributor = ISODistributor()
        iso_distributor.working_dir = '/working/dir'

        # Test
        iso_distributor.cancel_publish_repo()
        expected_dir = iso_distributor.working_dir
        mock_cancel_createrepo.assert_called_once_with(expected_dir)
开发者ID:preethit,项目名称:pulp_rpm,代码行数:10,代码来源:test_distributor.py


示例5: test_set_progress

    def test_set_progress(self):
        """
        set_progress should forward the id and status to the supplied progress callback
        """
        # Setup
        progress_callback = mock.Mock()
        iso_distributor = ISODistributor()
        report_args = ('id', 'status')

        # Test
        iso_distributor.set_progress(report_args[0], report_args[1], progress_callback)
        progress_callback.assert_called_once_with(*report_args)
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:11,代码来源:test_distributor.py


示例6: TestPublishIsos

class TestPublishIsos(unittest.TestCase):
    """
    Exercises ISODistributor._publish_isos with export_utils.publish_isos replaced by a mock.

    Each test checks which directories are handed to shutil.rmtree and which arguments are
    forwarded to export_utils.publish_isos.
    """
    def setUp(self):
        self.distributor = ISODistributor()
        self.repo = Repository('repo_id', working_dir='/working/dir')
        self.config = {PUBLISH_HTTP_KEYWORD: True, PUBLISH_HTTPS_KEYWORD: True}

        # Keep the real publish_isos so tearDown can put it back
        self.publish_iso = export_utils.publish_isos
        export_utils.publish_isos = mock.Mock()

    def tearDown(self):
        export_utils.publish_isos = self.publish_iso

    def _expected_publish_dirs(self):
        """Return the (http, https) publish directories built from the repo id."""
        return (os.path.join(EXPORT_HTTP_DIR, self.repo.id),
                os.path.join(EXPORT_HTTPS_DIR, self.repo.id))

    @mock.patch('shutil.rmtree', autospec=True)
    def test_publish_isos(self, mock_rmtree):
        """
        With http and https publishing enabled, both publish directories are removed and then
        passed on to publish_isos
        """
        # Setup
        http_dir, https_dir = self._expected_publish_dirs()
        call_config = PluginCallConfiguration({}, self.config)

        # Test
        self.distributor._publish_isos(self.repo, call_config)
        self.assertEqual(2, mock_rmtree.call_count)
        first_call, second_call = mock_rmtree.call_args_list
        self.assertEqual(http_dir, first_call[0][0])
        self.assertEqual(https_dir, second_call[0][0])
        export_utils.publish_isos.assert_called_once_with(
            self.repo.working_dir, self.repo.id, http_dir, https_dir, None, None)

    @mock.patch('shutil.rmtree', autospec=True)
    def test_publish_http_https_false(self, mock_rmtree):
        """
        With http and https publishing disabled, both publish directories are still removed but
        publish_isos receives None for the http and https directories
        """
        # Setup
        self.config[PUBLISH_HTTP_KEYWORD] = False
        self.config[PUBLISH_HTTPS_KEYWORD] = False
        call_config = PluginCallConfiguration({}, self.config)
        self.distributor._publish_isos(self.repo, call_config)
        http_dir, https_dir = self._expected_publish_dirs()

        # Test
        self.assertEqual(2, mock_rmtree.call_count)
        first_call, second_call = mock_rmtree.call_args_list
        self.assertEqual(http_dir, first_call[0][0])
        self.assertEqual(https_dir, second_call[0][0])
        export_utils.publish_isos.assert_called_once_with(
            self.repo.working_dir, self.repo.id, None, None, None, None)
开发者ID:preethit,项目名称:pulp_rpm,代码行数:54,代码来源:test_distributor.py


示例7: test_set_progress_no_callback

    def test_set_progress_no_callback(self):
        """
        Assert that set_progress does not attempt to call the callback when it is None
        """
        # Setup
        distributor = ISODistributor()

        # Test
        try:
            distributor.set_progress('id', 'status', None)
        except (AttributeError, TypeError):
            # Calling None raises TypeError ('NoneType' object is not callable), so it must be
            # caught for this failure message to ever be reported. AttributeError is kept so the
            # originally guarded case is still reported the same way.
            self.fail('set_progress should not try to call None')
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:12,代码来源:test_distributor.py


示例8: setUp

    def setUp(self):
        """
        Create the distributor, repository and config fixtures, then replace
        export_utils.publish_isos with a mock (the original is kept on self.publish_iso)
        """
        self.distributor = ISODistributor()
        self.repo = Repository('repo_id', working_dir='/working/dir')
        self.config = {
            PUBLISH_HTTP_KEYWORD: True,
            PUBLISH_HTTPS_KEYWORD: True,
        }

        # Save the real function before swapping in the mock
        self.publish_iso = export_utils.publish_isos
        export_utils.publish_isos = mock.Mock()
开发者ID:preethit,项目名称:pulp_rpm,代码行数:7,代码来源:test_distributor.py


示例9: test_validate_config

    def test_validate_config(self):
        """
        Test the validate_config method in ISODistributor, which just hands the config off to a helper
        method in export_utils
        """
        # Setup
        validate_config = export_utils.validate_export_config
        export_utils.validate_export_config = mock.MagicMock()
        try:
            distributor = ISODistributor()

            # Test. All validate_config should do is hand the config argument to the export_utils
            # validator
            distributor.validate_config(None, 'config', None)
            export_utils.validate_export_config.assert_called_once_with('config')
        finally:
            # Clean up. Restore the real validator even when an assertion above fails, so the
            # mock cannot leak into other tests.
            export_utils.validate_export_config = validate_config
开发者ID:pombredanne,项目名称:rcm-pulp-rpm,代码行数:16,代码来源:test_export_distributor.py


示例10: test_metadata

 def test_metadata(self):
     """
     Test the overridden metadata method in ISODistributor

     Checks that the reported id is TYPE_ID_DISTRIBUTOR_EXPORT and that the reported types
     match the expected set of type ids, ignoring order.
     """
     metadata = ISODistributor.metadata()
     expected_types = [TYPE_ID_RPM, TYPE_ID_SRPM, TYPE_ID_DRPM, TYPE_ID_ERRATA,
                       TYPE_ID_DISTRO, TYPE_ID_PKG_CATEGORY, TYPE_ID_PKG_GROUP]
     # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
     self.assertEqual(metadata['id'], TYPE_ID_DISTRIBUTOR_EXPORT)
     self.assertEqual(set(expected_types), set(metadata['types']))
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:9,代码来源:test_distributor.py


示例11: test_distributor_removed

    def test_distributor_removed(self, mock_configuration):
        """
        distributor_removed should delete the repo's http and https export directories and the
        master publish directory
        """
        # Directory layout rooted at the test's working directory
        master_dir = os.path.join(self.working_dir, 'master')
        http_export_root = os.path.join(self.working_dir, 'http')
        https_export_root = os.path.join(self.working_dir, 'https')
        repo_working_dir = os.path.join(self.working_dir, 'repodir')
        self.repo.working_dir = repo_working_dir

        # Point the mocked configuration at that layout
        mock_configuration.get_master_publish_dir.return_value = master_dir
        mock_configuration.HTTP_EXPORT_DIR = http_export_root
        mock_configuration.HTTPS_EXPORT_DIR = https_export_root

        # Create the per-repo export directories plus the master and working directories
        http_repo_dir = os.path.join(http_export_root, self.repo.id)
        https_repo_dir = os.path.join(https_export_root, self.repo.id)
        for path in (http_repo_dir, https_repo_dir, master_dir, repo_working_dir):
            os.makedirs(path)

        ISODistributor().distributor_removed(self.repo, self.config)

        for removed_path in (http_repo_dir, https_repo_dir, master_dir):
            self.assertFalse(os.path.exists(removed_path))
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:22,代码来源:test_distributor.py


示例12: TestPublishRepo

class TestPublishRepo(unittest.TestCase):
    """
    Tests publish_repo in ISODistributor
    """
    def setUp(self):
        """
        Build the distributor, repo, conduit and config fixtures, then replace the export_utils,
        util, shutil and os callables used by these tests with mocks. The real callables are
        stored on self first.
        """
        self.config_dict = {PUBLISH_HTTP_KEYWORD: False, PUBLISH_HTTPS_KEYWORD: True}

        # Distributor under test, with _publish_isos stubbed out
        self.distributor = ISODistributor()
        self.distributor._publish_isos = mock.Mock(spec=ISODistributor._publish_isos)

        # Call arguments shared by the tests
        self.repo = Repository(id='repo-id', working_dir='/working/dir')
        self.mock_conduit = mock.Mock(spec=RepoPublishConduit)
        self.config = PluginCallConfiguration({}, self.config_dict)

        # Phase 1: remember every real callable before anything is replaced
        self.cleanup_working_dir = export_utils.cleanup_working_dir
        self.validate_export_config = export_utils.validate_export_config
        self.export_complete_repo = export_utils.export_complete_repo
        self.export_incremental = export_utils.export_incremental_content
        self.retrieve_repo_config = export_utils.retrieve_repo_config
        self.generate_listing_files = util.generate_listing_files
        self.rmtree = shutil.rmtree
        self.makdirs = os.makedirs

        # Phase 2: install the mocks, with canned return values where the tests rely on them
        export_utils.cleanup_working_dir = mock.Mock(spec=export_utils.cleanup_working_dir)
        export_utils.validate_export_config = mock.Mock(return_value=(True, None))
        export_utils.export_complete_repo = mock.Mock(return_value=({}, {'errors': []}))
        export_utils.export_incremental_content = mock.Mock(return_value=({}, {'errors': ()}))
        export_utils.retrieve_repo_config = mock.Mock(return_value=('/working/dir/repo', None))
        util.generate_listing_files = mock.Mock()
        shutil.rmtree = mock.Mock(spec=shutil.rmtree)
        os.makedirs = mock.Mock(spec=os.makedirs)

    def tearDown(self):
        """
        Put back the real os, shutil, util and export_utils callables saved on self
        """
        # Standard library and util helpers
        os.makedirs = self.makdirs
        shutil.rmtree = self.rmtree
        util.generate_listing_files = self.generate_listing_files

        # export_utils helpers
        export_utils.retrieve_repo_config = self.retrieve_repo_config
        export_utils.export_incremental_content = self.export_incremental
        export_utils.export_complete_repo = self.export_complete_repo
        export_utils.validate_export_config = self.validate_export_config
        export_utils.cleanup_working_dir = self.cleanup_working_dir

    def test_failed_override_config(self):
        """
        publish_repo must raise PulpDataException when config validation reports a failure
        """
        # Setup: make the mocked validator reject the configuration
        export_utils.validate_export_config.return_value = (False, 'failed validation')

        # Test
        with self.assertRaises(PulpDataException):
            self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)

    def test_working_dir_cleanup(self):
        """
        Publishing should remove the repo working directory (ignoring errors) and recreate it,
        each exactly once
        """
        self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)

        working_dir = self.repo.working_dir
        shutil.rmtree.assert_called_once_with(working_dir, ignore_errors=True)
        os.makedirs.assert_called_once_with(working_dir)

    def test_export_with_export_dir(self):
        """
        When the config carries an export directory, _publish_isos must not be called and a
        success report is built once
        """
        # Setup: add an export directory to the config
        self.config_dict[EXPORT_DIRECTORY_KEYWORD] = '/my/export/dir'
        export_config = PluginCallConfiguration({}, self.config_dict)

        # Test
        self.distributor.publish_repo(self.repo, self.mock_conduit, export_config)
        publish_isos_calls = self.distributor._publish_isos.call_count
        success_reports = self.mock_conduit.build_success_report.call_count
        self.assertEqual(0, publish_isos_calls)
        self.assertEqual(1, success_reports)

    def test_export_iso_publish(self):
        """
        Without an export directory in the config, _publish_isos is called once with the repo
        and the config, and a success report is built once
        """
        self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)

        publish_isos = self.distributor._publish_isos
        self.assertEqual(1, publish_isos.call_count)
        positional_args = publish_isos.call_args[0]
        self.assertEqual(self.repo, positional_args[0])
        self.assertEqual(self.config, positional_args[1])
        self.assertEqual(1, self.mock_conduit.build_success_report.call_count)

    def test_export_complete_repo(self):
        """
        Test that when a date filter doesn't exist, export_complete_repo is called

        Only the first two positional arguments of that call are checked in the lines shown
        here: the repo id and the repo directory.
        """
        self.distributor.publish_repo(self.repo, self.mock_conduit, self.config)
        # export_complete_repo must be invoked exactly once
        self.assertEqual(1, export_utils.export_complete_repo.call_count)
        # First positional argument: the repo id
        self.assertEqual('repo-id', export_utils.export_complete_repo.call_args[0][0])
        # Second positional argument: the directory to export into
        self.assertEqual('/working/dir/repo', export_utils.export_complete_repo.call_args[0][1])
#.........这里部分代码省略.........
开发者ID:preethit,项目名称:pulp_rpm,代码行数:101,代码来源:test_distributor.py



注:本文中的pulp_rpm.plugins.distributors.export_distributor.distributor.ISODistributor类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python export_utils.validate_export_config函数代码示例发布时间:2022-05-25
下一篇:
Python repolib.bind函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap