• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python models.MigrationModule类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.server.db.migrate.models.MigrationModule的典型用法代码示例。如果您正苦于以下问题:Python MigrationModule类的具体用法?Python MigrationModule怎么用?Python MigrationModule使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了MigrationModule类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_migrate

    def test_migrate(self, m_get_collection):
        """
        Test last_updated and last_override_config fields added.
        """
        collection = Mock()
        found = [
            # these three should trigger a save
            {LAST_SYNC: '2016-05-04T18:19:01Z', LAST_UPDATED: '2016-05-03T18:19:01Z'},
            {LAST_SYNC: '2016-05-04T18:20:01Z'},
            {},
            # this one should not trigger a save
            {LAST_OVERRIDE_CONFIG: '2016-05-04T18:20:01Z', LAST_UPDATED: '2016-05-03T18:19:01Z'},
        ]
        collection.find.return_value = deepcopy(found)
        m_get_collection.return_value = collection

        # test
        module = MigrationModule(MIGRATION)._module
        module.migrate()

        # validation
        m_get_collection.assert_called_once_with('repo_importers')
        collection.find.assert_called_once_with()
        self.assertTrue(LAST_UPDATED in dist for dist in collection.save.call_args_list)
        self.assertTrue(LAST_OVERRIDE_CONFIG in dist for dist in collection.save.call_args_list)
        self.assertEqual(
            len(collection.save.call_args_list), 3)
开发者ID:alexxa,项目名称:pulp,代码行数:27,代码来源:test_0027_importer_schema_change.py


示例2: test_migrate

    def test_migrate(self, m_get_collection, now_utc_datetime):
        """
        Test last_updated and last_override_config fields added.
        """
        collection = Mock()
        found = [
            {LAST_PUBLISH: '2016-05-04T18:19:01Z', LAST_UPDATED: '2016-05-03T18:19:01Z'},
            {LAST_PUBLISH: '2016-05-04T18:20:01Z'},
            {},
        ]
        collection.find.return_value = deepcopy(found)
        m_get_collection.return_value = collection

        # test
        module = MigrationModule(MIGRATION)._module
        module.migrate()

        # validation
        m_get_collection.assert_called_once_with('repo_distributors')
        collection.find.assert_called_once_with()
        now_utc_datetime.assert_called_once_with()
        self.assertTrue(LAST_UPDATED in dist for dist in collection.save.call_args_list)
        self.assertTrue(LAST_OVERRIDE_CONFIG in dist for dist in collection.save.call_args_list)
        self.assertEqual(
            len(collection.save.call_args_list), 2)
开发者ID:alexxa,项目名称:pulp,代码行数:25,代码来源:test_0024_distributor_schema_change.py


示例3: test_migration

 def test_migration(self):
     # setup
     collection = Bind.get_collection()
     for n in range(0, MAX_BINDINGS):
         if n % 2 == 0:
             conf = {ID: n}
         else:
             conf = None
         binding = {
             ID: n,
             CONSUMER_ID: n,
             REPO_ID: n,
             DISTRIBUTOR_ID: n,
             BINDING_CONFIG: conf,
             NOTIFY_AGENT: True,
         }
         collection.save(binding, safe=True)
     # migrate
     module = MigrationModule(MIGRATION)._module
     module.migrate()
     # verify
     bindings = list(collection.find({}))
     self.assertEqual(len(bindings), MAX_BINDINGS)
     for binding in bindings:
         conf = binding[BINDING_CONFIG]
         bind_id = binding[ID]
         if bind_id % 2 == 0:
             # untouched
             self.assertEqual(conf, {ID: bind_id})
         else:
             # fixed
             self.assertEqual(conf, {})
开发者ID:credativ,项目名称:pulp,代码行数:32,代码来源:test_0006_binding_config.py


示例4: test_upgrade_idempotency

    def test_upgrade_idempotency(self):
        """
        Simplest way to check the migration can run twice is simply to run it twice. The
        primary goal is to make sure an exception isn't raised.
        """

        # Setup
        coll = Bind.get_collection()

        for counter in range(0, 3):
            bind_dict = {
                'consumer_id': 'consumer_%s' % counter,
                'repo_id': 'repo_%s' % counter,
                'distributor_id': 'distributor_%s' % counter,
            }

            coll.insert(bind_dict)

        # Test
        module = MigrationModule('pulp.server.db.migrations.0003_bind_additions')._module
        module.migrate()
        module.migrate()

        # Verify
        bindings = coll.find()
        for b in bindings:
            self.assertTrue('notify_agent' in b)
            self.assertEqual(b['notify_agent'], True)
            self.assertTrue('binding_config' in b)
            self.assertEqual(b['binding_config'], None)
开发者ID:jeremycline,项目名称:pulp,代码行数:30,代码来源:test_0003_bind_additions.py


示例5: test_migrate

    def test_migrate(self, distributor, parse_iso8601_datetime):
        collection = Mock()
        found = [
            {LAST_PUBLISH: '2015-04-28T18:19:01Z'},
            {LAST_PUBLISH: datetime.now()},
            {LAST_PUBLISH: '2015-04-28T18:20:01Z'},
            {LAST_PUBLISH: datetime.now()},
        ]
        parsed = [1, 2]
        collection.find.return_value = deepcopy(found)
        distributor.get_collection.return_value = collection
        parse_iso8601_datetime.side_effect = parsed

        # test
        module = MigrationModule(MIGRATION)._module
        module.migrate()

        # validation
        distributor.get_collection.assert_called_once_with()
        collection.find.assert_called_once_with()
        self.assertEqual(
            parse_iso8601_datetime.call_args_list,
            [
                call(found[0][LAST_PUBLISH]),
                call(found[2][LAST_PUBLISH]),
            ])
        self.assertEqual(
            collection.save.call_args_list,
            [
                call({LAST_PUBLISH: parsed[0]}, safe=True),
                call({LAST_PUBLISH: parsed[1]}, safe=True)
            ])
开发者ID:credativ,项目名称:pulp,代码行数:32,代码来源:test_0017_distributor_list_published.py


示例6: test_update_called

    def test_update_called(self, mock_get_collection):
        module = MigrationModule('pulp.server.db.migrations.0002_rename_http_notifier')._module
        module.migrate()

        # make sure the correct mongo query is being passed down
        mock_get_collection.return_value.update.assert_called_once_with(
            {'notifier_type_id': 'rest-api'}, {'$set': {'notifier_type_id': 'http'}}
        )
开发者ID:jeremycline,项目名称:pulp,代码行数:8,代码来源:test_0002_rename_http_notifier.py


示例7: test_del_agent_queues_deletes_all_existing_queues

    def test_del_agent_queues_deletes_all_existing_queues(self):
        self.fake_broker.getQueue.side_effect = [Mock(), Mock()]

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._del_agent_queues(self.fake_broker)

        expected_calls = [call(self.fake_broker, 'dog'), call(self.fake_broker, 'cat')]
        self.mock_del_queue_catch_queue_in_use_exc.assert_has_calls(expected_calls)
开发者ID:ipanova,项目名称:pulp,代码行数:9,代码来源:test_0009_qpid_queues.py


示例8: test_del_agent_queues_skips_existing_queues

    def test_del_agent_queues_skips_existing_queues(self):
        self.fake_broker.getQueue.side_effect = [None, None]

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._del_agent_queues(self.fake_broker)

        self.fake_broker.getQueue.assert_has_calls([call('dog'), call('cat')])
        self.assertTrue(not self.mock_del_queue_catch_queue_in_use_exc.called)
开发者ID:ipanova,项目名称:pulp,代码行数:9,代码来源:test_0009_qpid_queues.py


示例9: test_time_to_utc_on_collection_skips_utc

    def test_time_to_utc_on_collection_skips_utc(self, mock_connection):
        migration = MigrationModule(MIGRATION)._module
        collection = mock_connection.get_collection.return_value
        unit = {'bar': '2014-07-09T11:09:07Z'}
        collection.find.return_value = [unit]

        migration.update_time_to_utc_on_collection('foo', 'bar')
        mock_connection.get_collection.assert_called_once_with('foo')
        self.assertFalse(collection.save.called)
开发者ID:alanoe,项目名称:pulp,代码行数:9,代码来源:test_migration_0010.py


示例10: test_migrate_agent_queues

    def test_migrate_agent_queues(self, fake_add_agent_queues, fake_del_agent_queues):
        fake_broker = Mock()

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._migrate_agent_queues(fake_broker)

        # validation
        fake_add_agent_queues.assert_called_with(fake_broker)
        fake_del_agent_queues.assert_called_with(fake_broker)
开发者ID:jbennett7,项目名称:pulp,代码行数:10,代码来源:test_migration_0009.py


示例11: test_migrate

 def test_migrate(self, mock_connection):
     """
     Test the schema change happens like it should.
     """
     role_schema = copy.deepcopy(CURRENT)
     migration = MigrationModule(MIGRATION)._module
     collection = mock_connection.get_collection.return_value
     collection.find.return_value = role_schema
     migration.migrate()
     self.assertEquals(role_schema, TARGET)
开发者ID:alanoe,项目名称:pulp,代码行数:10,代码来源:test_migration_0013.py


示例12: test_idempotence

 def test_idempotence(self, mock_connection):
     """
     Test the idempotence of the migration
     """
     role_schema = copy.deepcopy(TARGET)
     migration = MigrationModule(MIGRATION)._module
     collection = mock_connection.get_collection.return_value
     collection.find.return_value = role_schema
     migration.migrate()
     self.assertFalse(collection.save.called)
     self.assertEquals(role_schema, TARGET)
开发者ID:alanoe,项目名称:pulp,代码行数:11,代码来源:test_migration_0013.py


示例13: test_time_to_utc_on_collection

    def test_time_to_utc_on_collection(self, mock_connection):
        migration = MigrationModule(MIGRATION)._module
        collection = mock_connection.get_collection.return_value
        unit = {'bar': '2014-07-09T11:09:07-04:00'}
        utc_value = '2014-07-09T15:09:07Z'
        collection.find.return_value = [unit]

        migration.update_time_to_utc_on_collection('foo', 'bar')

        mock_connection.get_collection.assert_called_once_with('foo')
        self.assertEquals(unit['bar'], utc_value)
        collection.save.assert_called_once_with(unit)
开发者ID:alanoe,项目名称:pulp,代码行数:12,代码来源:test_migration_0010.py


示例14: test_migrate

    def test_migrate(self, mock_connection, mock_update):
        """
        Verify that only known & valid collections are updated
        """
        migration = MigrationModule(MIGRATION)._module
        collection_list = ['repo_distributors']

        mock_connection.get_database.return_value.collection_names.return_value = collection_list

        migration.migrate()

        mock_update.assert_called_once_with('repo_distributors', 'last_publish')
开发者ID:alanoe,项目名称:pulp,代码行数:12,代码来源:test_migration_0010.py


示例15: test_migrate_reply_queue_not_found_does_not_delete_or_add_reply_queue

    def test_migrate_reply_queue_not_found_does_not_delete_or_add_reply_queue(self):
        fake_broker = Mock()
        fake_broker.getQueue.return_value = None

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._migrate_reply_queue(fake_broker)

        # validation
        fake_broker.getQueue.assert_called_with(Services.REPLY_QUEUE)
        self.assertTrue(not self.mock_del_queue_catch_queue_in_use_exc.called)
        self.assertTrue(not fake_broker.addQueue.called)
开发者ID:ipanova,项目名称:pulp,代码行数:12,代码来源:test_0009_qpid_queues.py


示例16: test_migrate_reply_queue_not_found

    def test_migrate_reply_queue_not_found(self):
        fake_queue = Mock()
        fake_broker = Mock()
        fake_broker.getQueue.return_value = None

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._migrate_reply_queue(fake_broker)

        # validation
        fake_broker.getQueue.assert_called_with(Services.REPLY_QUEUE)
        self.assertFalse(fake_broker.called)
        self.assertFalse(fake_broker.called)
开发者ID:jbennett7,项目名称:pulp,代码行数:13,代码来源:test_migration_0009.py


示例17: test_migrate

 def test_migrate(self, mock_connection):
     """
     Test the schema change happens like it should.
     """
     permissions_schema = [{"resource": "/", "id": "5356d55b37382030f4a80b5e",
                            "users": {"admin": [0, 1, 2, 3, 4]}}]
     new_schema = [{"resource": "/", "id": "5356d55b37382030f4a80b5e",
                   "users": [{"username": "admin", "permissions": [0, 1, 2, 3, 4]}]}]
     migration = MigrationModule(MIGRATION)._module
     collection = mock_connection.get_collection.return_value
     collection.find.return_value = permissions_schema
     migration.migrate()
     self.assertEquals(permissions_schema, new_schema)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:13,代码来源:test_migration_0011.py


示例18: test_add_agent_queues_cat_only

    def test_add_agent_queues_cat_only(self, fake_get):
        fake_collection = Mock()
        fake_collection.find = Mock(return_value=[{'id': 'dog'}, {'id': 'cat'}])
        fake_get.return_value = fake_collection
        fake_broker = Mock()
        fake_broker.getQueue.side_effect = [None, Mock()]

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._add_agent_queues(fake_broker)

        fake_broker.getQueue.assert_any('pulp.agent.dog')
        fake_broker.getQueue.assert_any('pulp.agent.cat')
        fake_broker.addQueue.assert_called__once_with('pulp.agent.dog', durable=True)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:14,代码来源:test_migration_0009.py


示例19: test_del_agent_queues_cat_only

    def test_del_agent_queues_cat_only(self, fake_get):
        fake_collection = Mock()
        fake_collection.find = Mock(return_value=[{'id': 'dog'}, {'id': 'cat'}])
        fake_get.return_value = fake_collection
        fake_broker = Mock()
        fake_broker.getQueue.side_effect = [None, Mock()]

        # test
        migration = MigrationModule(MIGRATION)._module
        migration._del_agent_queues(fake_broker)

        fake_broker.getQueue.assert_any('dog')
        fake_broker.getQueue.assert_any('cat')
        fake_broker.delQueue.assert_calle_with('cat')
开发者ID:jbennett7,项目名称:pulp,代码行数:14,代码来源:test_migration_0009.py


示例20: test

 def test(self):
     # migrate
     module = MigrationModule(MIGRATION)._module
     module.migrate()
     # validation
     for collection in [connection.get_collection(n) for n in TEST_COLLECTIONS]:
         for unit in collection.find({}):
             self.assertTrue(LAST_UPDATED in unit)
             unit_id = unit[ID]
             last_updated = unit[LAST_UPDATED]
             if unit_id % 2 == 0:
                 self.assertEqual(last_updated, 1)
             else:
                 self.assertTrue(isinstance(last_updated, float))
开发者ID:credativ,项目名称:pulp,代码行数:14,代码来源:test_0005_unit_last_updated.py



注:本文中的pulp.server.db.migrate.models.MigrationModule类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python model.TaskStatus类代码示例发布时间:2022-05-25
下一篇:
Python models.get_migration_packages函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap