本文整理汇总了 Python 中 pulp.server.db.model.repository.RepoPublishResult 类的典型用法代码示例。如果您正苦于以下问题:Python RepoPublishResult 类的具体用法?Python RepoPublishResult 怎么用?Python RepoPublishResult 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了RepoPublishResult类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_publish_history_ascending_sort
def test_publish_history_ascending_sort(self):
    """
    Check that publish_history honours sort=constants.SORT_ASCENDING.

    Five publish results with increasing 'started' timestamps are inserted; the
    returned entries must then be strictly increasing by 'started'.
    """
    # Setup: one repo with one distributor to hang the history on
    self.repo_manager.create_repo('test_sort')
    self.distributor_manager.add_distributor(
        'test_sort', 'mock-distributor', {}, True, distributor_id='test_dist')

    # Entries start at seconds 0, 2, 4, 6 and 8 and each completes one second later
    date_string = '2013-06-01T12:00:0%sZ'
    for second in range(0, 10, 2):
        started = date_string % str(second)
        completed = date_string % str(second + 1)
        publish_result = RepoPublishResult.expected_result(
            'test_sort', 'test_dist', 'bar', started, completed,
            'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
        RepoPublishResult.get_collection().insert(publish_result, safe=True)

    entries = self.publish_manager.publish_history(
        'test_sort', 'test_dist', sort=constants.SORT_ASCENDING)
    self.assertEqual(5, len(entries))

    # Every adjacent pair must be ordered oldest first
    for earlier, later in zip(entries[:4], entries[1:5]):
        first = dateutils.parse_iso8601_datetime(earlier['started'])
        second = dateutils.parse_iso8601_datetime(later['started'])
        self.assertTrue(first < second)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py
示例2: test_publish_history_start_date
def test_publish_history_start_date(self):
    """
    Check that publish_history drops entries that started before start_date.

    Three entries starting at seconds 0, 2 and 4 are inserted; asking for everything
    from second 2 onwards must return exactly two entries.
    """
    # Setup
    self.repo_manager.create_repo('test_date')
    self.distributor_manager.add_distributor(
        'test_date', 'mock-distributor', {}, True, distributor_id='test_dist')

    date_string = '2013-06-01T12:00:0%sZ'
    for second in range(0, 6, 2):
        started = date_string % str(second)
        completed = date_string % str(second + 1)
        publish_result = RepoPublishResult.expected_result(
            'test_date', 'test_dist', 'bar', started, completed,
            'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
        RepoPublishResult.get_collection().insert(publish_result, safe=True)

    # Verify: unfiltered history holds all three entries
    unfiltered = self.publish_manager.publish_history('test_date', 'test_dist')
    self.assertEqual(3, len(unfiltered))

    start_date = '2013-06-01T12:00:02Z'
    start_entries = self.publish_manager.publish_history(
        'test_date', 'test_dist', start_date=start_date)
    self.assertEqual(2, len(start_entries))

    # Each returned entry must have started at or after the requested date
    given_start = dateutils.parse_iso8601_datetime(start_date)
    for entry in start_entries:
        retrieved = dateutils.parse_iso8601_datetime(entry['started'])
        self.assertTrue(retrieved >= given_start)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py
示例3: _do_publish
def _do_publish(repo, distributor_id, distributor_instance, transfer_repo, conduit,
call_config):
distributor_coll = RepoDistributor.get_collection()
publish_result_coll = RepoPublishResult.get_collection()
repo_id = repo['id']
# Perform the publish
publish_start_timestamp = _now_timestamp()
try:
# Add the register_sigterm_handler decorator to the publish_repo call, so that we can
# respond to signals by calling the Distributor's cancel_publish_repo() method.
publish_repo = register_sigterm_handler(
distributor_instance.publish_repo, distributor_instance.cancel_publish_repo)
publish_report = publish_repo(transfer_repo, conduit, call_config)
except Exception, e:
publish_end_timestamp = _now_timestamp()
# Reload the distributor in case the scratchpad is set by the plugin
repo_distributor = distributor_coll.find_one(
{'repo_id' : repo_id, 'id' : distributor_id})
repo_distributor['last_publish'] = publish_end_timestamp
distributor_coll.save(repo_distributor, safe=True)
# Add a publish history entry for the run
result = RepoPublishResult.error_result(
repo_id, repo_distributor['id'], repo_distributor['distributor_type_id'],
publish_start_timestamp, publish_end_timestamp, e, sys.exc_info()[2])
publish_result_coll.save(result, safe=True)
logger.exception(
_('Exception caught from plugin during publish for repo [%(r)s]' % {'r' : repo_id}))
raise PulpExecutionException(), None, sys.exc_info()[2]
开发者ID:preethit,项目名称:pulp-1,代码行数:33,代码来源:publish.py
示例4: test_publish_history_descending_sort
def test_publish_history_descending_sort(self):
    """
    Check that publish_history honours sort=constants.SORT_DESCENDING.

    Five publish results with increasing 'started' timestamps are inserted; the
    returned entries must then be strictly decreasing by 'started'.
    """
    # Setup
    self.repo_manager.create_repo("test_sort")
    self.distributor_manager.add_distributor(
        "test_sort", "mock-distributor", {}, True, distributor_id="test_dist"
    )

    # Entries start at seconds 0, 2, 4, 6 and 8 and each completes one second later
    date_string = "2013-06-01T12:00:0%sZ"
    for second in range(0, 10, 2):
        started = date_string % str(second)
        completed = date_string % str(second + 1)
        publish_result = RepoPublishResult.expected_result(
            "test_sort", "test_dist", "bar", started, completed,
            "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS,
        )
        RepoPublishResult.get_collection().insert(publish_result, safe=True)

    entries = self.publish_manager.publish_history(
        "test_sort", "test_dist", sort=constants.SORT_DESCENDING
    )
    self.assertEqual(5, len(entries))

    # Every adjacent pair must be ordered newest first
    for newer, older in zip(entries[:4], entries[1:5]):
        first = dateutils.parse_iso8601_datetime(newer["started"])
        second = dateutils.parse_iso8601_datetime(older["started"])
        self.assertTrue(first > second)
开发者ID:jdob,项目名称:pulp,代码行数:30,代码来源:test_repo_publish_manager.py
示例5: test_publish_history_end_date
def test_publish_history_end_date(self):
    """
    Check that publish_history drops entries that started after end_date.

    Three entries starting at seconds 0, 2 and 4 are inserted; asking for everything
    up to second 3 must return exactly two entries.
    """
    # Setup
    self.repo_manager.create_repo("test_date")
    self.distributor_manager.add_distributor(
        "test_date", "mock-distributor", {}, True, distributor_id="test_dist"
    )

    date_string = "2013-06-01T12:00:0%sZ"
    for second in range(0, 6, 2):
        started = date_string % str(second)
        completed = date_string % str(second + 1)
        publish_result = RepoPublishResult.expected_result(
            "test_date", "test_dist", "bar", started, completed,
            "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS,
        )
        RepoPublishResult.get_collection().insert(publish_result, safe=True)

    # Verify
    end_date = "2013-06-01T12:00:03Z"
    end_entries = self.publish_manager.publish_history(
        "test_date", "test_dist", end_date=end_date
    )
    self.assertEqual(2, len(end_entries))

    # Each returned entry must have started at or before the requested date
    given_end = dateutils.parse_iso8601_datetime(end_date)
    for entry in end_entries:
        retrieved = dateutils.parse_iso8601_datetime(entry["started"])
        self.assertTrue(retrieved <= given_end)
开发者ID:jdob,项目名称:pulp,代码行数:29,代码来源:test_repo_publish_manager.py
示例6: _do_publish
def _do_publish(self, repo, distributor_id, distributor_instance, transfer_repo, conduit, call_config):
distributor_coll = RepoDistributor.get_collection()
publish_result_coll = RepoPublishResult.get_collection()
repo_id = repo['id']
# Perform the publish
publish_start_timestamp = _now_timestamp()
try:
publish_report = distributor_instance.publish_repo(transfer_repo, conduit, call_config)
except Exception, e:
publish_end_timestamp = _now_timestamp()
# Reload the distributor in case the scratchpad is set by the plugin
repo_distributor = distributor_coll.find_one({'repo_id' : repo_id, 'id' : distributor_id})
repo_distributor['last_publish'] = publish_end_timestamp
distributor_coll.save(repo_distributor, safe=True)
# Add a publish history entry for the run
result = RepoPublishResult.error_result(repo_id, repo_distributor['id'], repo_distributor['distributor_type_id'],
publish_start_timestamp, publish_end_timestamp, e, sys.exc_info()[2])
publish_result_coll.save(result, safe=True)
_LOG.exception(_('Exception caught from plugin during publish for repo [%(r)s]' % {'r' : repo_id}))
raise PulpExecutionException(), None, sys.exc_info()[2]
开发者ID:fdammeke,项目名称:pulp,代码行数:25,代码来源:publish.py
示例7: add_result
def add_result(repo_id, dist_id, offset):
    """
    Insert a successful publish history entry for the given repo and distributor.

    The entry starts at the time returned by dateutils.now_utc_datetime_with_tzinfo()
    and completes `offset` days after that.

    :param repo_id: repo id stored on the entry
    :param dist_id: distributor id stored on the entry
    :param offset: number of days between the 'started' and 'completed' timestamps
    """
    started = dateutils.now_utc_datetime_with_tzinfo()
    completed = started + datetime.timedelta(days=offset)

    started_iso = dateutils.format_iso8601_datetime(started)
    completed_iso = dateutils.format_iso8601_datetime(completed)

    entry = RepoPublishResult.expected_result(
        repo_id, dist_id, 'bar', started_iso, completed_iso,
        'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
    RepoPublishResult.get_collection().insert(entry, safe=True)
开发者ID:credativ,项目名称:pulp,代码行数:8,代码来源:test_publish.py
示例8: _do_publish
def _do_publish(repo_obj, dist_id, dist_inst, transfer_repo, conduit, call_config):
"""
Publish the repository using the given distributor.
:param repo_obj: repository object
:type repo_obj: pulp.server.db.model.Repository
:param dist_id: identifies the distributor
:type dist_id: str
:param dist_inst: instance of the distributor
:type dist_inst: dict
:param transfer_repo: dict representation of a repo for the plugins to use
:type transfer_repo: pulp.plugins.model.Repository
:param conduit: allows the plugin to interact with core pulp
:type conduit: pulp.plugins.conduits.repo_publish.RepoPublishConduit
:param call_config: allows the plugin to retrieve values
:type call_config: pulp.plugins.config.PluginCallConfiguration
:return: publish result containing information about the publish
:rtype: pulp.server.db.model.repository.RepoPublishResult
:raises pulp_exceptions.PulpCodedException: if the publish report's success flag is falsey
"""
distributor_coll = RepoDistributor.get_collection()
publish_result_coll = RepoPublishResult.get_collection()
publish_start_timestamp = _now_timestamp()
try:
# Add the register_sigterm_handler decorator to the publish_repo call, so that we can
# respond to signals by calling the Distributor's cancel_publish_repo() method.
publish_repo = register_sigterm_handler(dist_inst.publish_repo,
dist_inst.cancel_publish_repo)
publish_report = publish_repo(transfer_repo, conduit, call_config)
if publish_report is not None and hasattr(publish_report, 'success_flag') \
and not publish_report.success_flag:
raise pulp_exceptions.PulpCodedException(
error_code=error_codes.PLP0034, repository_id=repo_obj.repo_id,
distributor_id=dist_id
)
except Exception, e:
publish_end_timestamp = _now_timestamp()
# Reload the distributor in case the scratchpad is set by the plugin
repo_distributor = distributor_coll.find_one(
{'repo_id': repo_obj.repo_id, 'id': dist_id})
distributor_coll.save(repo_distributor, safe=True)
# Add a publish history entry for the run
result = RepoPublishResult.error_result(
repo_obj.repo_id, repo_distributor['id'], repo_distributor['distributor_type_id'],
publish_start_timestamp, publish_end_timestamp, e, sys.exc_info()[2])
publish_result_coll.save(result, safe=True)
_logger.exception(
_('Exception caught from plugin during publish for repo [%(r)s]'
% {'r': repo_obj.repo_id}))
raise
开发者ID:zjhuntin,项目名称:pulp,代码行数:56,代码来源:repository.py
示例9: add_result
def add_result(repo_id, dist_id, offset):
    """
    Insert a successful publish history entry for the given repo and distributor.

    The entry starts at datetime.now() in the timezone returned by
    dateutils.local_tz() and completes `offset` days after that.

    :param repo_id: repo id stored on the entry
    :param dist_id: distributor id stored on the entry
    :param offset: number of days between the 'started' and 'completed' timestamps
    """
    started = datetime.datetime.now(dateutils.local_tz())
    completed = started + datetime.timedelta(days=offset)

    started_iso = dateutils.format_iso8601_datetime(started)
    completed_iso = dateutils.format_iso8601_datetime(completed)

    entry = RepoPublishResult.expected_result(
        repo_id, dist_id, "bar", started_iso, completed_iso,
        "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS,
    )
    RepoPublishResult.get_collection().insert(entry, safe=True)
开发者ID:jdob,项目名称:pulp,代码行数:14,代码来源:test_repo_publish_manager.py
示例10: test_publish_failure_report
def test_publish_failure_report(self):
    """
    Tests a publish call that indicates a graceful failure.

    The mock distributor returns a PublishReport constructed with False as its first
    argument; the history entry written for the run must carry RESULT_FAILED, a
    summary and details, and no error message, exception or traceback.
    """
    # Setup
    publish_config = {'foo': 'bar'}
    self.repo_manager.create_repo('repo-1')
    self.distributor_manager.add_distributor(
        'repo-1', 'mock-distributor', publish_config, False, distributor_id='dist-1')
    mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = PublishReport(
        False, 'Summary of the publish', 'Details of the publish')

    try:
        # Test
        self.publish_manager.publish('repo-1', 'dist-1', None)

        # Verify
        entries = list(RepoPublishResult.get_collection().find({'repo_id': 'repo-1'}))
        self.assertEqual(1, len(entries))
        self.assertEqual('repo-1', entries[0]['repo_id'])
        self.assertEqual('dist-1', entries[0]['distributor_id'])
        self.assertEqual('mock-distributor', entries[0]['distributor_type_id'])
        self.assertTrue(entries[0]['started'] is not None)
        self.assertTrue(entries[0]['completed'] is not None)
        self.assertEqual(RepoPublishResult.RESULT_FAILED, entries[0]['result'])
        self.assertTrue(entries[0]['summary'] is not None)
        self.assertTrue(entries[0]['details'] is not None)
        self.assertTrue(entries[0]['error_message'] is None)
        self.assertTrue(entries[0]['exception'] is None)
        self.assertTrue(entries[0]['traceback'] is None)
    finally:
        # Cleanup must run even when an assertion above fails; otherwise the
        # configured return value stays on the shared mock.
        mock_plugins.reset()
开发者ID:hennessy80,项目名称:pulp,代码行数:31,代码来源:test_repo_publish_manager.py
示例11: __init__
def __init__(self, repo, publish_conduit, config, distributor_type):
    """
    Set up the publish: load the distributor's publish state, collect the
    predistributor's publish history and register the publish steps.

    :param repo: Pulp managed Yum repository
    :type repo: pulp.plugins.model.Repository
    :param publish_conduit: Conduit providing access to relative Pulp functionality
    :type publish_conduit: pulp.plugins.conduits.repo_publish.RepoPublishConduit
    :param config: Pulp configuration for the distributor
    :type config: pulp.plugins.config.PluginCallConfiguration
    :param distributor_type: The type of the distributor that is being published
    :type distributor_type: str

    :ivar last_published: 'last_publish' value of this distributor's record
    :ivar last_deleted: repo.last_unit_removed, the last time a unit was removed
                        from this repository
    :ivar repo: repository being operated on
    :ivar predistributor: distributor object that is associated with this distributor.
                          Its publish history affects the type of publish performed
    :ivar predist_history: publish history entries of the predistributor that started
                           at or after last_published; an empty list if there is no
                           predistributor
    :ivar remote_path: result of get_remote_repo_path()
    :ivar symlink_list: list of symlinks to rsync
    :ivar content_unit_file_list: list of content units to rsync
    :ivar symlink_src: path to directory containing all symlinks
    """
    super(Publisher, self).__init__("Repository publish", repo,
                                    publish_conduit, config,
                                    distributor_type=distributor_type)

    distributor = Distributor.objects.get_or_404(
        repo_id=self.repo.id, distributor_id=publish_conduit.distributor_id)
    self.last_published = distributor["last_publish"]
    self.last_deleted = repo.last_unit_removed
    self.repo = repo
    self.predistributor = self._get_predistributor()

    # The history query compares against an ISO-8601 string (or None if never published)
    string_date = None
    if self.last_published:
        string_date = dateutils.format_iso8601_datetime(self.last_published)

    if not self.predistributor:
        self.predist_history = []
    else:
        history_query = {
            'repo_id': repo.id,
            'distributor_id': self.predistributor["id"],
            'started': {"$gte": string_date},
        }
        self.predist_history = RepoPublishResult.get_collection().find(history_query)

    self.remote_path = self.get_remote_repo_path()

    # A fast-forward publish is limited to a date range; otherwise no filter is applied
    date_filter = None
    if self.is_fastforward():
        end_date = self.predistributor["last_publish"] if self.predistributor else None
        date_filter = self.create_date_range_filter(start_date=self.last_published,
                                                    end_date=end_date)

    self.symlink_list = []
    self.content_unit_file_list = []
    self.symlink_src = os.path.join(self.get_working_dir(), '.relative/')

    self._add_necesary_steps(date_filter=date_filter, config=config)
开发者ID:pcreech,项目名称:pulp,代码行数:60,代码来源:publish.py
示例12: publish_history
def publish_history(start_date, end_date, repo_id, distributor_id):
    """
    Returns a cursor containing the publish history entries for the given repo and
    distributor, optionally restricted by the entries' 'started' timestamp.

    :param start_date: if specified, no events prior to this date will be returned.
    :type  start_date: iso8601 datetime string
    :param end_date: if specified, no events after this date will be returned.
    :type  end_date: iso8601 datetime string
    :param repo_id: identifies the repo
    :type  repo_id: str
    :param distributor_id: identifies the distributor to retrieve history for
    :type  distributor_id: str

    :return: object containing publish history results
    :rtype:  pymongo.cursor.Cursor

    :raise pulp_exceptions.MissingResource: if no distributor with this id exists on the
           repo; the repo check itself is delegated to get_repo_or_missing_resource
    """
    # Validate the repo first, then the repo/distributor pair
    model.Repository.objects.get_repo_or_missing_resource(repo_id)
    distributor = RepoDistributor.get_collection().find_one(
        {'repo_id': repo_id, 'id': distributor_id})
    if distributor is None:
        raise pulp_exceptions.MissingResource(distributor_id)

    # Only the bounds that were actually supplied become part of the query
    started_bounds = {}
    if start_date:
        started_bounds['$gte'] = start_date
    if end_date:
        started_bounds['$lte'] = end_date

    query = {'repo_id': repo_id, 'distributor_id': distributor_id}
    if started_bounds:
        query['started'] = started_bounds

    return RepoPublishResult.get_collection().find(query)
开发者ID:zjhuntin,项目名称:pulp,代码行数:32,代码来源:repository.py
示例13: test_publish_with_error
def test_publish_with_error(self):
    """
    Tests a publish when the plugin raises an error.

    The distributor's 'last_publish' must still be set, and the history entry for the
    run must carry RESULT_ERROR with an error message, exception and traceback but no
    summary or details.
    """
    # Setup
    mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = Exception()

    try:
        self.repo_manager.create_repo("gonna-bail")
        self.distributor_manager.add_distributor(
            "gonna-bail", "mock-distributor", {}, False, distributor_id="bad-dist"
        )

        self.assertRaises(Exception, self.publish_manager.publish, "gonna-bail", "bad-dist")

        # Verify
        repo_distributor = RepoDistributor.get_collection().find_one(
            {"repo_id": "gonna-bail", "id": "bad-dist"}
        )
        self.assertTrue(repo_distributor is not None)
        self.assertTrue(assert_last_sync_time(repo_distributor["last_publish"]))

        entries = list(RepoPublishResult.get_collection().find({"repo_id": "gonna-bail"}))
        self.assertEqual(1, len(entries))
        self.assertEqual("gonna-bail", entries[0]["repo_id"])
        self.assertEqual("bad-dist", entries[0]["distributor_id"])
        self.assertEqual("mock-distributor", entries[0]["distributor_type_id"])
        self.assertTrue(entries[0]["started"] is not None)
        self.assertTrue(entries[0]["completed"] is not None)
        self.assertEqual(RepoPublishResult.RESULT_ERROR, entries[0]["result"])
        self.assertTrue(entries[0]["summary"] is None)
        self.assertTrue(entries[0]["details"] is None)
        self.assertTrue(entries[0]["error_message"] is not None)
        self.assertTrue(entries[0]["exception"] is not None)
        self.assertTrue(entries[0]["traceback"] is not None)
    finally:
        # Cleanup must run even when an assertion above fails; otherwise the shared
        # mock keeps raising.
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = None
开发者ID:lzap,项目名称:pulp,代码行数:35,代码来源:test_publish.py
示例14: test_publish_failure_report
def test_publish_failure_report(self):
    """
    Tests a publish call that indicates a graceful failure.

    The mock distributor returns a PublishReport constructed with False as its first
    argument; the history entry written for the run must carry RESULT_FAILED, a
    summary and details, and no error message, exception or traceback.
    """
    # Setup
    publish_config = {"foo": "bar"}
    self.repo_manager.create_repo("repo-1")
    self.distributor_manager.add_distributor(
        "repo-1", "mock-distributor", publish_config, False, distributor_id="dist-1"
    )
    mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = PublishReport(
        False, "Summary of the publish", "Details of the publish"
    )

    try:
        # Test
        self.publish_manager.publish("repo-1", "dist-1", None)

        # Verify
        entries = list(RepoPublishResult.get_collection().find({"repo_id": "repo-1"}))
        self.assertEqual(1, len(entries))
        self.assertEqual("repo-1", entries[0]["repo_id"])
        self.assertEqual("dist-1", entries[0]["distributor_id"])
        self.assertEqual("mock-distributor", entries[0]["distributor_type_id"])
        self.assertTrue(entries[0]["started"] is not None)
        self.assertTrue(entries[0]["completed"] is not None)
        self.assertEqual(RepoPublishResult.RESULT_FAILED, entries[0]["result"])
        self.assertTrue(entries[0]["summary"] is not None)
        self.assertTrue(entries[0]["details"] is not None)
        self.assertTrue(entries[0]["error_message"] is None)
        self.assertTrue(entries[0]["exception"] is None)
        self.assertTrue(entries[0]["traceback"] is None)
    finally:
        # Cleanup must run even when an assertion above fails; otherwise the
        # configured return value stays on the shared mock.
        mock_plugins.reset()
开发者ID:jdob,项目名称:pulp,代码行数:35,代码来源:test_repo_publish_manager.py
示例15: test_publish
def test_publish(self, mock_finished, mock_started):
    """
    Publish through the first of two distributors and check the distributor record,
    the history entry, the plugin call arguments and the started/finished hooks.
    """
    # Setup: two distributors on the same repo, only dist-1 gets published
    publish_config = {"foo": "bar"}
    self.repo_manager.create_repo("repo-1")
    for type_id, dist_id in (("mock-distributor", "dist-1"), ("mock-distributor-2", "dist-2")):
        self.distributor_manager.add_distributor(
            "repo-1", type_id, publish_config, False, distributor_id=dist_id
        )

    # Test
    distributor, config = self.publish_manager._get_distributor_instance_and_config(
        "repo-1", "dist-1"
    )
    self.publish_manager.publish("repo-1", "dist-1", distributor, config, None)

    # Verify: distributor record
    repo_distributor = RepoDistributor.get_collection().find_one(
        {"repo_id": "repo-1", "id": "dist-1"}
    )
    last_publish = repo_distributor["last_publish"]
    self.assertTrue(last_publish is not None)
    self.assertTrue(assert_last_sync_time(last_publish))

    # Verify: exactly one successful history entry
    entries = list(RepoPublishResult.get_collection().find({"repo_id": "repo-1"}))
    self.assertEqual(1, len(entries))
    entry = entries[0]
    self.assertEqual("repo-1", entry["repo_id"])
    self.assertEqual("dist-1", entry["distributor_id"])
    self.assertEqual("mock-distributor", entry["distributor_type_id"])
    for populated in ("started", "completed"):
        self.assertTrue(entry[populated] is not None)
    self.assertEqual(RepoPublishResult.RESULT_SUCCESS, entry["result"])
    for populated in ("summary", "details"):
        self.assertTrue(entry[populated] is not None)
    for unset in ("error_message", "exception", "traceback"):
        self.assertTrue(entry[unset] is None)

    # Verify: the call went to the correct distributor with the expected arguments
    call_args = mock_plugins.MOCK_DISTRIBUTOR.publish_repo.call_args[0]
    plugin_repo, plugin_conduit, plugin_call_config = call_args[0], call_args[1], call_args[2]
    self.assertEqual("repo-1", plugin_repo.id)
    self.assertTrue(plugin_conduit is not None)
    self.assertEqual({}, plugin_call_config.plugin_config)
    self.assertEqual(publish_config, plugin_call_config.repo_plugin_config)
    self.assertEqual({}, plugin_call_config.override_config)

    self.assertEqual(0, mock_plugins.MOCK_DISTRIBUTOR_2.publish_repo.call_count)

    # Verify: started/finished hooks each fired once for this repo
    self.assertEqual(1, mock_started.call_count)
    self.assertEqual("repo-1", mock_started.call_args[0][0])

    self.assertEqual(1, mock_finished.call_count)
    self.assertEqual("repo-1", mock_finished.call_args[0][0]["repo_id"])
开发者ID:ehelms,项目名称:pulp,代码行数:58,代码来源:test_repo_publish_manager.py
示例16: test_publish
def test_publish(self, mock_finished, mock_started, mock_get_working_directory, dt):
    """
    Publish through the first of two distributors and check the distributor record,
    the history entry, the plugin call arguments and the started/finished hooks.

    dt.utcnow is pinned to 1234 so the stored 'last_publish' can be compared exactly.
    """
    # Setup: two distributors on the same repo, only dist-1 gets published
    publish_config = {'foo': 'bar'}
    self.repo_manager.create_repo('repo-1')
    for type_id, dist_id in (('mock-distributor', 'dist-1'), ('mock-distributor-2', 'dist-2')):
        self.distributor_manager.add_distributor(
            'repo-1', type_id, publish_config, False, distributor_id=dist_id)
    dt.utcnow.return_value = 1234

    # Test
    self.publish_manager.publish('repo-1', 'dist-1', None)

    # Verify: distributor record
    repo_distributor = RepoDistributor.get_collection().find_one(
        {'repo_id': 'repo-1', 'id': 'dist-1'})
    self.assertTrue(repo_distributor['last_publish'] is not None)
    self.assertEqual(repo_distributor['last_publish'], dt.utcnow.return_value)

    # Verify: exactly one successful history entry
    entries = list(RepoPublishResult.get_collection().find({'repo_id': 'repo-1'}))
    self.assertEqual(1, len(entries))
    entry = entries[0]
    self.assertEqual('repo-1', entry['repo_id'])
    self.assertEqual('dist-1', entry['distributor_id'])
    self.assertEqual('mock-distributor', entry['distributor_type_id'])
    for populated in ('started', 'completed'):
        self.assertTrue(entry[populated] is not None)
    self.assertEqual(RepoPublishResult.RESULT_SUCCESS, entry['result'])
    for populated in ('summary', 'details'):
        self.assertTrue(entry[populated] is not None)
    for unset in ('error_message', 'exception', 'traceback'):
        self.assertTrue(entry[unset] is None)

    # Verify: the call went to the correct distributor with the expected arguments
    call_args = mock_plugins.MOCK_DISTRIBUTOR.publish_repo.call_args[0]
    plugin_repo, plugin_conduit, plugin_call_config = call_args[0], call_args[1], call_args[2]
    self.assertEqual('repo-1', plugin_repo.id)
    self.assertTrue(plugin_conduit is not None)
    self.assertEqual({}, plugin_call_config.plugin_config)
    self.assertEqual(publish_config, plugin_call_config.repo_plugin_config)
    self.assertEqual({}, plugin_call_config.override_config)

    self.assertEqual(0, mock_plugins.MOCK_DISTRIBUTOR_2.publish_repo.call_count)

    # Verify: started/finished hooks each fired once for this repo
    self.assertEqual(1, mock_started.call_count)
    self.assertEqual('repo-1', mock_started.call_args[0][0])

    self.assertEqual(1, mock_finished.call_count)
    self.assertEqual('repo-1', mock_finished.call_args[0][0]['repo_id'])
开发者ID:credativ,项目名称:pulp,代码行数:57,代码来源:test_publish.py
示例17: test_publish_no_plugin_report
def test_publish_no_plugin_report(self):
    """
    Publish with a plugin whose publish_repo returns None instead of a report.

    The history entry must then record 'Unknown' for both summary and details.
    """
    # Setup
    self.repo_manager.create_repo('sloppy')
    self.distributor_manager.add_distributor(
        'sloppy', 'mock-distributor', {}, True, distributor_id='slop')
    # The plugin hands back no report at all
    mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = None

    # Test
    self.publish_manager.publish('sloppy', 'slop')

    # Verify
    history = list(RepoPublishResult.get_collection().find({'repo_id': 'sloppy'}))
    self.assertEqual(1, len(history))
    for field in ('summary', 'details'):
        self.assertEqual('Unknown', history[0][field])
开发者ID:hennessy80,项目名称:pulp,代码行数:19,代码来源:test_repo_publish_manager.py
示例18: test_publish_no_plugin_report
def test_publish_no_plugin_report(self):
    """
    Publish with a plugin whose publish_repo returns None instead of a report.

    The history entry must then record "Unknown" for both summary and details.
    """
    # Setup
    self.repo_manager.create_repo("sloppy")
    self.distributor_manager.add_distributor(
        "sloppy", "mock-distributor", {}, True, distributor_id="slop"
    )
    # The plugin hands back no report at all
    mock_plugins.MOCK_DISTRIBUTOR.publish_repo.return_value = None

    # Test
    self.publish_manager.publish("sloppy", "slop")

    # Verify
    history = list(RepoPublishResult.get_collection().find({"repo_id": "sloppy"}))
    self.assertEqual(1, len(history))
    for field in ("summary", "details"):
        self.assertEqual("Unknown", history[0][field])
开发者ID:jdob,项目名称:pulp,代码行数:19,代码来源:test_repo_publish_manager.py
示例19: publish_history
def publish_history(self, repo_id, distributor_id, limit=None):
    """
    Returns publish history entries for the given repo and distributor, sorted
    descending by their 'completed' value. If there are no entries, an empty list
    is returned.

    @param repo_id: identifies the repo
    @type  repo_id: str

    @param distributor_id: identifies the distributor to retrieve history for
    @type  distributor_id: str

    @param limit: maximum number of results to return; 10 is used when None
    @type  limit: int

    @return: list of publish history result instances
    @rtype:  list of L{pulp.server.db.model.repository.RepoPublishResult}

    @raise MissingResource: if repo_id does not reference a valid repo, or if the
                            repo has no distributor with the given id
    """
    # Validation: the repo must exist, and so must the distributor on that repo
    if Repo.get_collection().find_one({'id': repo_id}) is None:
        raise MissingResource(repo_id)

    distributor_query = {'repo_id': repo_id, 'id': distributor_id}
    if RepoDistributor.get_collection().find_one(distributor_query) is None:
        raise MissingResource(distributor_id)

    if limit is None:
        limit = 10  # default here for each of REST API calls into here

    # Retrieve the entries
    history_query = {'repo_id': repo_id, 'distributor_id': distributor_id}
    cursor = RepoPublishResult.get_collection().find(history_query)
    cursor.limit(limit)
    cursor.sort('completed', pymongo.DESCENDING)

    return list(cursor)
开发者ID:ehelms,项目名称:pulp,代码行数:38,代码来源:publish.py
示例20: test_publish_with_error
def test_publish_with_error(self, mock_get_working_directory):
    """
    Tests a publish when the plugin raises an error.

    The distributor's 'last_publish' must stay None, and the history entry for the run
    must carry RESULT_ERROR with an error message, exception and traceback but no
    summary or details.
    """
    # Setup
    mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = Exception()

    try:
        self.repo_manager.create_repo('gonna-bail')
        self.distributor_manager.add_distributor('gonna-bail', 'mock-distributor', {}, False,
                                                 distributor_id='bad-dist')

        self.assertRaises(Exception, self.publish_manager.publish,
                          'gonna-bail', 'bad-dist')

        # Verify
        repo_distributor = RepoDistributor.get_collection().find_one(
            {'repo_id': 'gonna-bail', 'id': 'bad-dist'})
        self.assertTrue(repo_distributor is not None)
        self.assertEqual(repo_distributor['last_publish'], None)

        entries = list(RepoPublishResult.get_collection().find({'repo_id': 'gonna-bail'}))
        self.assertEqual(1, len(entries))
        self.assertEqual('gonna-bail', entries[0]['repo_id'])
        self.assertEqual('bad-dist', entries[0]['distributor_id'])
        self.assertEqual('mock-distributor', entries[0]['distributor_type_id'])
        self.assertTrue(entries[0]['started'] is not None)
        self.assertTrue(entries[0]['completed'] is not None)
        self.assertEqual(RepoPublishResult.RESULT_ERROR, entries[0]['result'])
        self.assertTrue(entries[0]['summary'] is None)
        self.assertTrue(entries[0]['details'] is None)
        self.assertTrue(entries[0]['error_message'] is not None)
        self.assertTrue(entries[0]['exception'] is not None)
        self.assertTrue(entries[0]['traceback'] is not None)
    finally:
        # Cleanup must run even when an assertion above fails; otherwise the shared
        # mock keeps raising.
        mock_plugins.MOCK_DISTRIBUTOR.publish_repo.side_effect = None
开发者ID:credativ,项目名称:pulp,代码行数:38,代码来源:test_publish.py
注:本文中的 pulp.server.db.model.repository.RepoPublishResult 类示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论