本文整理汇总了 Python 中 pulp.server.db.migrate.models.MigrationModule 类的典型用法代码示例。如果您正苦于以下问题：Python MigrationModule 类的具体用法？Python MigrationModule 怎么用？Python MigrationModule 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了 MigrationModule 类的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: test_migrate
def test_migrate(self, m_get_collection):
    """
    Test that last_updated and last_override_config are added to importers.

    Three of the four stubbed documents lack at least one of the two fields
    and are expected to be saved; the fourth already carries both and is
    expected to be left alone.

    :param m_get_collection: mock standing in for the collection lookup
                             (injected by a patch decorator that is not
                             visible in this snippet)
    """
    collection = Mock()
    found = [
        # these three should trigger a save
        {LAST_SYNC: '2016-05-04T18:19:01Z', LAST_UPDATED: '2016-05-03T18:19:01Z'},
        {LAST_SYNC: '2016-05-04T18:20:01Z'},
        {},
        # this one should not trigger a save
        {LAST_OVERRIDE_CONFIG: '2016-05-04T18:20:01Z', LAST_UPDATED: '2016-05-03T18:19:01Z'},
    ]
    # hand the migration a copy so `found` itself is never mutated
    collection.find.return_value = deepcopy(found)
    m_get_collection.return_value = collection

    # test
    module = MigrationModule(MIGRATION)._module
    module.migrate()

    # validation
    m_get_collection.assert_called_once_with('repo_importers')
    collection.find.assert_called_once_with()
    self.assertEqual(len(collection.save.call_args_list), 3)
    # Passing a generator expression to assertTrue() always succeeds because
    # the generator object itself is truthy, so inspect each save() call.
    for args, _kwargs in collection.save.call_args_list:
        # NOTE(review): assumes the document is save()'s first positional
        # argument -- confirm against the migration.
        saved = args[0]
        self.assertTrue(LAST_UPDATED in saved)
        self.assertTrue(LAST_OVERRIDE_CONFIG in saved)
开发者ID:alexxa,项目名称:pulp,代码行数:27,代码来源:test_0027_importer_schema_change.py
示例2: test_migrate
def test_migrate(self, m_get_collection, now_utc_datetime):
    """
    Test that the distributor schema fields are added by the migration.

    Of the three stubbed documents only the first already has LAST_UPDATED;
    exactly two saves are expected.

    :param m_get_collection: mock standing in for the collection lookup
    :param now_utc_datetime: mock for the "current time" helper; it is
                             expected to be called exactly once
    (both are injected by patch decorators not visible in this snippet)
    """
    collection = Mock()
    found = [
        {LAST_PUBLISH: '2016-05-04T18:19:01Z', LAST_UPDATED: '2016-05-03T18:19:01Z'},
        {LAST_PUBLISH: '2016-05-04T18:20:01Z'},
        {},
    ]
    # hand the migration a copy so `found` itself is never mutated
    collection.find.return_value = deepcopy(found)
    m_get_collection.return_value = collection

    # test
    module = MigrationModule(MIGRATION)._module
    module.migrate()

    # validation
    m_get_collection.assert_called_once_with('repo_distributors')
    collection.find.assert_called_once_with()
    now_utc_datetime.assert_called_once_with()
    self.assertEqual(len(collection.save.call_args_list), 2)
    # Passing a generator expression to assertTrue() always succeeds because
    # the generator object itself is truthy, so inspect each save() call.
    for args, _kwargs in collection.save.call_args_list:
        # NOTE(review): assumes the document is save()'s first positional
        # argument, and that every saved document is meant to carry both
        # fields (as the original assertions implied) -- confirm against
        # the migration.
        saved = args[0]
        self.assertTrue(LAST_UPDATED in saved)
        self.assertTrue(LAST_OVERRIDE_CONFIG in saved)
开发者ID:alexxa,项目名称:pulp,代码行数:25,代码来源:test_0024_distributor_schema_change.py
示例3: test_migration
def test_migration(self):
    """
    Seed bindings whose config alternates between a dict and None, run the
    migration, then check every stored config.
    """
    # setup
    bind_collection = Bind.get_collection()
    for index in range(MAX_BINDINGS):
        # even ids get a real config, odd ids get None
        config = {ID: index} if index % 2 == 0 else None
        bind_collection.save(
            {
                ID: index,
                CONSUMER_ID: index,
                REPO_ID: index,
                DISTRIBUTOR_ID: index,
                BINDING_CONFIG: config,
                NOTIFY_AGENT: True,
            },
            safe=True)

    # migrate
    MigrationModule(MIGRATION)._module.migrate()

    # verify
    stored = list(bind_collection.find({}))
    self.assertEqual(len(stored), MAX_BINDINGS)
    for document in stored:
        config = document[BINDING_CONFIG]
        doc_id = document[ID]
        # even ids must be untouched; odd ids (seeded with None) must now be {}
        expected = {ID: doc_id} if doc_id % 2 == 0 else {}
        self.assertEqual(config, expected)
开发者ID:credativ,项目名称:pulp,代码行数:32,代码来源:test_0006_binding_config.py
示例4: test_upgrade_idempotency
def test_upgrade_idempotency(self):
    """
    Run the migration twice back to back. The main point is that the second
    run does not raise; the resulting documents are checked as well.
    """
    # Setup
    bind_collection = Bind.get_collection()
    for index in range(3):
        bind_collection.insert({
            'consumer_id': 'consumer_%s' % index,
            'repo_id': 'repo_%s' % index,
            'distributor_id': 'distributor_%s' % index,
        })

    # Test
    migration = MigrationModule('pulp.server.db.migrations.0003_bind_additions')._module
    for _ in range(2):
        migration.migrate()

    # Verify
    expectations = (('notify_agent', True), ('binding_config', None))
    for document in bind_collection.find():
        for field, expected in expectations:
            self.assertTrue(field in document)
            self.assertEqual(document[field], expected)
开发者ID:jeremycline,项目名称:pulp,代码行数:30,代码来源:test_0003_bind_additions.py
示例5: test_migrate
def test_migrate(self, distributor, parse_iso8601_datetime):
    """
    Documents whose LAST_PUBLISH is a string are expected to be parsed and
    saved; documents whose LAST_PUBLISH is already a datetime are not.

    :param distributor: mock whose get_collection() yields the stub collection
    :param parse_iso8601_datetime: mock for the ISO-8601 parser
    """
    documents = [
        {LAST_PUBLISH: '2015-04-28T18:19:01Z'},
        {LAST_PUBLISH: datetime.now()},
        {LAST_PUBLISH: '2015-04-28T18:20:01Z'},
        {LAST_PUBLISH: datetime.now()},
    ]
    parsed_values = [1, 2]
    mongo_collection = Mock()
    mongo_collection.find.return_value = deepcopy(documents)
    distributor.get_collection.return_value = mongo_collection
    # each parser call returns the next canned value
    parse_iso8601_datetime.side_effect = parsed_values

    # test
    MigrationModule(MIGRATION)._module.migrate()

    # validation
    distributor.get_collection.assert_called_once_with()
    mongo_collection.find.assert_called_once_with()
    # only the two string-valued documents (positions 0 and 2) are parsed
    expected_parse_calls = [call(documents[position][LAST_PUBLISH]) for position in (0, 2)]
    self.assertEqual(parse_iso8601_datetime.call_args_list, expected_parse_calls)
    expected_saves = [call({LAST_PUBLISH: value}, safe=True) for value in parsed_values]
    self.assertEqual(mongo_collection.save.call_args_list, expected_saves)
开发者ID:credativ,项目名称:pulp,代码行数:32,代码来源:test_0017_distributor_list_published.py
示例6: test_update_called
def test_update_called(self, mock_get_collection):
    """
    Run the notifier-rename migration and check the single update() it issues.

    :param mock_get_collection: mock standing in for the collection lookup
    """
    migration = MigrationModule('pulp.server.db.migrations.0002_rename_http_notifier')._module
    migration.migrate()

    # make sure the correct mongo query is being passed down
    query = {'notifier_type_id': 'rest-api'}
    modifier = {'$set': {'notifier_type_id': 'http'}}
    update = mock_get_collection.return_value.update
    update.assert_called_once_with(query, modifier)
开发者ID:jeremycline,项目名称:pulp,代码行数:8,代码来源:test_0002_rename_http_notifier.py
示例7: test_del_agent_queues_deletes_all_existing_queues
def test_del_agent_queues_deletes_all_existing_queues(self):
    """
    With getQueue() returning a queue for both lookups, the delete helper is
    expected to be invoked for 'dog' and then 'cat'.
    """
    broker = self.fake_broker
    broker.getQueue.side_effect = [Mock(), Mock()]

    # test
    MigrationModule(MIGRATION)._module._del_agent_queues(broker)

    # validation
    expected_calls = [call(broker, queue_name) for queue_name in ('dog', 'cat')]
    self.mock_del_queue_catch_queue_in_use_exc.assert_has_calls(expected_calls)
开发者ID:ipanova,项目名称:pulp,代码行数:9,代码来源:test_0009_qpid_queues.py
示例8: test_del_agent_queues_skips_existing_queues
def test_del_agent_queues_skips_existing_queues(self):
    """
    With getQueue() stubbed to return None for both lookups, the delete
    helper must never be invoked.
    """
    broker = self.fake_broker
    broker.getQueue.side_effect = [None, None]

    # test
    MigrationModule(MIGRATION)._module._del_agent_queues(broker)

    # validation
    lookups = [call(queue_name) for queue_name in ('dog', 'cat')]
    broker.getQueue.assert_has_calls(lookups)
    self.assertFalse(self.mock_del_queue_catch_queue_in_use_exc.called)
开发者ID:ipanova,项目名称:pulp,代码行数:9,代码来源:test_0009_qpid_queues.py
示例9: test_time_to_utc_on_collection_skips_utc
def test_time_to_utc_on_collection_skips_utc(self, mock_connection):
    """
    A timestamp that already ends in 'Z' must not cause a save.

    :param mock_connection: mock standing in for the db connection module
    """
    migration = MigrationModule(MIGRATION)._module
    stub_collection = mock_connection.get_collection.return_value
    stub_collection.find.return_value = [{'bar': '2014-07-09T11:09:07Z'}]

    migration.update_time_to_utc_on_collection('foo', 'bar')

    mock_connection.get_collection.assert_called_once_with('foo')
    self.assertFalse(stub_collection.save.called)
开发者ID:alanoe,项目名称:pulp,代码行数:9,代码来源:test_migration_0010.py
示例10: test_migrate_agent_queues
def test_migrate_agent_queues(self, fake_add_agent_queues, fake_del_agent_queues):
    """
    _migrate_agent_queues() is expected to hand the broker to both the add
    and the delete helper.

    :param fake_add_agent_queues: mock replacing the add helper
    :param fake_del_agent_queues: mock replacing the delete helper
    """
    broker = Mock()

    # test
    MigrationModule(MIGRATION)._module._migrate_agent_queues(broker)

    # validation
    for helper in (fake_add_agent_queues, fake_del_agent_queues):
        helper.assert_called_with(broker)
开发者ID:jbennett7,项目名称:pulp,代码行数:10,代码来源:test_migration_0009.py
示例11: test_migrate
def test_migrate(self, mock_connection):
    """
    Test the schema change happens like it should.

    A deep copy of CURRENT is served by the stub collection's find(); after
    the migration that same object is compared with TARGET, so the test
    relies on the documents being modified in place.

    :param mock_connection: mock standing in for the db connection module
    """
    role_schema = copy.deepcopy(CURRENT)
    migration = MigrationModule(MIGRATION)._module
    collection = mock_connection.get_collection.return_value
    collection.find.return_value = role_schema
    migration.migrate()
    # assertEquals is a deprecated alias (removed in Python 3.12)
    self.assertEqual(role_schema, TARGET)
开发者ID:alanoe,项目名称:pulp,代码行数:10,代码来源:test_migration_0013.py
示例12: test_idempotence
def test_idempotence(self, mock_connection):
    """
    Test the idempotence of the migration.

    Documents that already match TARGET are served by the stub collection;
    the migration must neither save nor alter them.

    :param mock_connection: mock standing in for the db connection module
    """
    role_schema = copy.deepcopy(TARGET)
    migration = MigrationModule(MIGRATION)._module
    collection = mock_connection.get_collection.return_value
    collection.find.return_value = role_schema
    migration.migrate()
    self.assertFalse(collection.save.called)
    # assertEquals is a deprecated alias (removed in Python 3.12)
    self.assertEqual(role_schema, TARGET)
开发者ID:alanoe,项目名称:pulp,代码行数:11,代码来源:test_migration_0013.py
示例13: test_time_to_utc_on_collection
def test_time_to_utc_on_collection(self, mock_connection):
    """
    A timestamp with a -04:00 offset is expected to be rewritten in place to
    its UTC ('Z') equivalent and the document saved exactly once.

    :param mock_connection: mock standing in for the db connection module
    """
    migration = MigrationModule(MIGRATION)._module
    collection = mock_connection.get_collection.return_value
    unit = {'bar': '2014-07-09T11:09:07-04:00'}
    utc_value = '2014-07-09T15:09:07Z'
    collection.find.return_value = [unit]
    migration.update_time_to_utc_on_collection('foo', 'bar')
    mock_connection.get_collection.assert_called_once_with('foo')
    # assertEquals is a deprecated alias (removed in Python 3.12)
    self.assertEqual(unit['bar'], utc_value)
    collection.save.assert_called_once_with(unit)
开发者ID:alanoe,项目名称:pulp,代码行数:12,代码来源:test_migration_0010.py
示例14: test_migrate
def test_migrate(self, mock_connection, mock_update):
    """
    Verify that only known & valid collections are updated.

    :param mock_connection: mock standing in for the db connection module
    :param mock_update: mock replacing the per-collection update helper
    """
    migration = MigrationModule(MIGRATION)._module
    # the database reports a single collection name
    database = mock_connection.get_database.return_value
    database.collection_names.return_value = ['repo_distributors']

    migration.migrate()

    mock_update.assert_called_once_with('repo_distributors', 'last_publish')
开发者ID:alanoe,项目名称:pulp,代码行数:12,代码来源:test_migration_0010.py
示例15: test_migrate_reply_queue_not_found_does_not_delete_or_add_reply_queue
def test_migrate_reply_queue_not_found_does_not_delete_or_add_reply_queue(self):
    """
    When getQueue() returns None for the reply queue, neither the delete
    helper nor addQueue() may be called.
    """
    broker = Mock()
    broker.getQueue.return_value = None

    # test
    MigrationModule(MIGRATION)._module._migrate_reply_queue(broker)

    # validation
    broker.getQueue.assert_called_with(Services.REPLY_QUEUE)
    self.assertFalse(self.mock_del_queue_catch_queue_in_use_exc.called)
    self.assertFalse(broker.addQueue.called)
开发者ID:ipanova,项目名称:pulp,代码行数:12,代码来源:test_0009_qpid_queues.py
示例16: test_migrate_reply_queue_not_found
def test_migrate_reply_queue_not_found(self):
    """
    Check _migrate_reply_queue() when getQueue() returns None for the reply
    queue: the broker must be queried for it and nothing else should happen.
    """
    fake_broker = Mock()
    fake_broker.getQueue.return_value = None
    # test
    migration = MigrationModule(MIGRATION)._module
    migration._migrate_reply_queue(fake_broker)
    # validation
    fake_broker.getQueue.assert_called_with(Services.REPLY_QUEUE)
    self.assertFalse(fake_broker.called)
    # NOTE(review): the original repeated the assertion above verbatim;
    # presumably the add/delete paths were what it meant to cover -- confirm
    # these are the broker methods the migration uses.
    self.assertFalse(fake_broker.addQueue.called)
    self.assertFalse(fake_broker.delQueue.called)
开发者ID:jbennett7,项目名称:pulp,代码行数:13,代码来源:test_migration_0009.py
示例17: test_migrate
def test_migrate(self, mock_connection):
    """
    Test the schema change happens like it should.

    The "users" value is expected to change from a mapping of
    username -> permission list into a list of
    {"username": ..., "permissions": ...} dicts. The comparison is made on
    the same list object served by find(), so the test relies on the
    documents being modified in place.

    :param mock_connection: mock standing in for the db connection module
    """
    permissions_schema = [{"resource": "/", "id": "5356d55b37382030f4a80b5e",
                           "users": {"admin": [0, 1, 2, 3, 4]}}]
    new_schema = [{"resource": "/", "id": "5356d55b37382030f4a80b5e",
                   "users": [{"username": "admin", "permissions": [0, 1, 2, 3, 4]}]}]
    migration = MigrationModule(MIGRATION)._module
    collection = mock_connection.get_collection.return_value
    collection.find.return_value = permissions_schema
    migration.migrate()
    # assertEquals is a deprecated alias (removed in Python 3.12)
    self.assertEqual(permissions_schema, new_schema)
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:13,代码来源:test_migration_0011.py
示例18: test_add_agent_queues_cat_only
def test_add_agent_queues_cat_only(self, fake_get):
    """
    Two consumers ('dog', 'cat') are served by the stub collection and
    getQueue() returns None for the first lookup and a queue for the second.
    Only the 'dog' agent queue is expected to be added.

    :param fake_get: mock standing in for the collection lookup
    """
    fake_collection = Mock()
    fake_collection.find = Mock(return_value=[{'id': 'dog'}, {'id': 'cat'}])
    fake_get.return_value = fake_collection
    fake_broker = Mock()
    fake_broker.getQueue.side_effect = [None, Mock()]
    # test
    migration = MigrationModule(MIGRATION)._module
    migration._add_agent_queues(fake_broker)
    # validation
    # The original called `assert_any` and `assert_called__once_with`; neither
    # is a Mock assertion, so they auto-created child mocks and verified
    # nothing. Use the real assertion methods.
    fake_broker.getQueue.assert_any_call('pulp.agent.dog')
    fake_broker.getQueue.assert_any_call('pulp.agent.cat')
    fake_broker.addQueue.assert_called_once_with('pulp.agent.dog', durable=True)
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:14,代码来源:test_migration_0009.py
示例19: test_del_agent_queues_cat_only
def test_del_agent_queues_cat_only(self, fake_get):
    """
    Two consumers ('dog', 'cat') are served by the stub collection and
    getQueue() returns None for the first lookup and a queue for the second.
    The 'cat' queue is expected to be deleted.

    :param fake_get: mock standing in for the collection lookup
    """
    fake_collection = Mock()
    fake_collection.find = Mock(return_value=[{'id': 'dog'}, {'id': 'cat'}])
    fake_get.return_value = fake_collection
    fake_broker = Mock()
    fake_broker.getQueue.side_effect = [None, Mock()]
    # test
    migration = MigrationModule(MIGRATION)._module
    migration._del_agent_queues(fake_broker)
    # validation
    # The original called `assert_any` and `assert_calle_with`; neither is a
    # Mock assertion, so they auto-created child mocks and verified nothing.
    # Use the real assertion methods.
    fake_broker.getQueue.assert_any_call('dog')
    fake_broker.getQueue.assert_any_call('cat')
    fake_broker.delQueue.assert_called_with('cat')
开发者ID:jbennett7,项目名称:pulp,代码行数:14,代码来源:test_migration_0009.py
示例20: test
def test(self):
    """
    Run the migration, then check LAST_UPDATED on every unit of every test
    collection: units with an even id must hold 1, units with an odd id must
    hold a float.
    """
    # migrate
    MigrationModule(MIGRATION)._module.migrate()

    # validation
    collections = [connection.get_collection(name) for name in TEST_COLLECTIONS]
    for unit_collection in collections:
        for document in unit_collection.find({}):
            self.assertTrue(LAST_UPDATED in document)
            doc_id = document[ID]
            stamp = document[LAST_UPDATED]
            if doc_id % 2 != 0:
                self.assertTrue(isinstance(stamp, float))
                continue
            self.assertEqual(stamp, 1)
开发者ID:credativ,项目名称:pulp,代码行数:14,代码来源:test_0005_unit_last_updated.py
注：本文中的 pulp.server.db.migrate.models.MigrationModule 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论