本文整理汇总了Python中pulp.common.dateutils.format_iso8601_datetime函数的典型用法代码示例。如果您正苦于以下问题:Python format_iso8601_datetime函数的具体用法?Python format_iso8601_datetime怎么用?Python format_iso8601_datetime使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了format_iso8601_datetime函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: add_result
def add_result(repo_id, offset):
started = datetime.datetime.now(dateutils.local_tz())
completed = started + datetime.timedelta(days=offset)
r = RepoSyncResult.expected_result(repo_id, 'foo', 'bar', dateutils.format_iso8601_datetime(started),
dateutils.format_iso8601_datetime(completed), 1, 1, 1, '', '',
RepoSyncResult.RESULT_SUCCESS)
RepoSyncResult.get_collection().save(r, safe=True)
开发者ID:cliffy94,项目名称:pulp,代码行数:7,代码来源:test_repo_sync_manager.py
示例2: add_result
def add_result(repo_id, dist_id, offset):
started = dateutils.now_utc_datetime_with_tzinfo()
completed = started + datetime.timedelta(days=offset)
r = RepoPublishResult.expected_result(
repo_id, dist_id, 'bar', dateutils.format_iso8601_datetime(started),
dateutils.format_iso8601_datetime(completed), 'test-summary', 'test-details',
RepoPublishResult.RESULT_SUCCESS)
RepoPublishResult.get_collection().insert(r, safe=True)
开发者ID:credativ,项目名称:pulp,代码行数:8,代码来源:test_publish.py
示例3: test_save_update_with_set_on_insert
def test_save_update_with_set_on_insert(self):
"""
Test the save method with set on insert arguments when the object is already in the
database.
"""
task_id = str(uuid4())
worker_name = 'worker_name'
tags = ['tag_1', 'tag_2']
state = constants.CALL_ACCEPTED_STATE
spawned_tasks = ['foo']
error = {'error': 'some_error'}
progress_report = {'what do we want?': 'progress!', 'when do we want it?': 'now!'}
task_type = 'some.task'
old_start_time = start_time = datetime.now()
finish_time = start_time + timedelta(minutes=5)
start_time = dateutils.format_iso8601_datetime(start_time)
finish_time = dateutils.format_iso8601_datetime(finish_time)
result = None
ts = TaskStatus(
task_id, worker_name, tags, state, spawned_tasks=spawned_tasks, error=error,
progress_report=progress_report, task_type=task_type, start_time=start_time,
finish_time=finish_time, result=result)
# Put the object in the database, and then change some of it settings.
ts.save()
new_worker_name = 'a different_worker'
new_state = constants.CALL_SUSPENDED_STATE
new_start_time = old_start_time + timedelta(minutes=10)
new_start_time = dateutils.format_iso8601_datetime(new_start_time)
ts.worker_name = new_worker_name
ts.state = new_state
ts.start_time = new_start_time
# This should update the worker_name on ts in the database, but should not update the state
# or start_time
ts.save_with_set_on_insert(fields_to_set_on_insert=['state', 'start_time'])
ts = TaskStatus.objects()
# There should only be one TaskStatus in the db
self.assertEqual(len(ts), 1)
ts = ts[0]
# Make sure all the attributes are correct
self.assertEqual(ts['task_id'], task_id)
# Queue should have been updated
self.assertEqual(ts['worker_name'], new_worker_name)
self.assertEqual(ts['tags'], tags)
# state should not have been updated
self.assertEqual(ts['state'], state)
self.assertEqual(ts['error'], error)
self.assertEqual(ts['spawned_tasks'], spawned_tasks)
self.assertEqual(ts['progress_report'], progress_report)
self.assertEqual(ts['task_type'], task_type)
# start_time should not have been updated
self.assertEqual(ts['start_time'], start_time)
self.assertEqual(ts['finish_time'], finish_time)
self.assertEqual(ts['result'], result)
# These are always None
self.assertEqual(ts['traceback'], None)
self.assertEqual(ts['exception'], None)
开发者ID:credativ,项目名称:pulp,代码行数:58,代码来源:test_dispatch.py
示例4: convert_schedule
def convert_schedule(save_func, call):
"""
Converts one scheduled call from the old schema to the new
:param save_func: a function that takes one parameter, a dictionary that
represents the scheduled call in its new schema. This
function should save the call to the database.
:type save_func: function
:param call: dictionary representing the scheduled call in its old
schema
:type call: dict
"""
call.pop('call_exit_states', None)
call['total_run_count'] = call.pop('call_count')
call['iso_schedule'] = call['schedule']
interval, start_time, occurrences = dateutils.parse_iso8601_interval(call['schedule'])
# this should be a pickled instance of celery.schedules.schedule
call['schedule'] = pickle.dumps(schedule(interval))
call_request = call.pop('serialized_call_request')
# we are no longer storing these pickled.
# these are cast to a string because python 2.6 sometimes fails to
# deserialize json from unicode.
call['args'] = pickle.loads(str(call_request['args']))
call['kwargs'] = pickle.loads(str(call_request['kwargs']))
# keeping this pickled because we don't really know how to use it yet
call['principal'] = call_request['principal']
# this always get calculated on-the-fly now
call.pop('next_run', None)
first_run = call['first_run'].replace(tzinfo=dateutils.utc_tz())
call['first_run'] = dateutils.format_iso8601_datetime(first_run)
last_run = call.pop('last_run')
if last_run:
last_run_at = last_run.replace(tzinfo=dateutils.utc_tz())
call['last_run_at'] = dateutils.format_iso8601_datetime(last_run_at)
else:
call['last_run_at'] = None
call['task'] = NAMES_TO_TASKS[call_request['callable_name']]
# this is a new field that is used to determine when the scheduler needs to
# re-read the collection of schedules.
call['last_updated'] = time.time()
# determine if this is a consumer-related schedule, which we can only identify
# by the consumer resource tag. If it is, save that tag value in the new
# "resource" field, which is the new way that we will identify the
# relationship between a schedule and some other object. This is not
# necessary for repos, because we have a better method above for identifying
# them (move_scheduled_syncs).
tags = call_request.get('tags', [])
for tag in tags:
if tag.startswith('pulp:consumer:'):
call['resource'] = tag
break
save_func(call)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:57,代码来源:0007_scheduled_task_conversion.py
示例5: add_result
def add_result(repo_id, dist_id, offset):
started = datetime.datetime.now(dateutils.local_tz())
completed = started + datetime.timedelta(days=offset)
r = RepoPublishResult.expected_result(
repo_id,
dist_id,
"bar",
dateutils.format_iso8601_datetime(started),
dateutils.format_iso8601_datetime(completed),
"test-summary",
"test-details",
RepoPublishResult.RESULT_SUCCESS,
)
RepoPublishResult.get_collection().insert(r, safe=True)
开发者ID:jdob,项目名称:pulp,代码行数:14,代码来源:test_repo_publish_manager.py
示例6: test_save_update_defaults
def test_save_update_defaults(self):
"""
Test the save method with default arguments when the object is already in the database.
"""
task_id = str(uuid4())
worker_name = 'worker_name'
tags = ['tag_1', 'tag_2']
state = constants.CALL_ACCEPTED_STATE
spawned_tasks = ['foo']
error = {'error': 'some_error'}
progress_report = {'what do we want?': 'progress!', 'when do we want it?': 'now!'}
task_type = 'some.task'
start_time = datetime.now()
finish_time = start_time + timedelta(minutes=5)
start_time = dateutils.format_iso8601_datetime(start_time)
finish_time = dateutils.format_iso8601_datetime(finish_time)
result = None
ts = TaskStatus(
task_id, worker_name, tags, state, spawned_tasks=spawned_tasks, error=error,
progress_report=progress_report, task_type=task_type, start_time=start_time,
finish_time=finish_time, result=result)
# Let's go ahead and insert the object
ts.save()
# Now let's alter it a bit, and make sure the alteration makes it to the DB correctly.
new_state = constants.CALL_RUNNING_STATE
ts.state = new_state
# This should update ts in the database
ts.save()
ts = TaskStatus.objects()
# There should only be one TaskStatus in the db
self.assertEqual(len(ts), 1)
ts = ts[0]
# Make sure all the attributes are correct
self.assertEqual(ts['task_id'], task_id)
self.assertEqual(ts['worker_name'], worker_name)
self.assertEqual(ts['tags'], tags)
# The state should have been updated
self.assertEqual(ts['state'], new_state)
self.assertEqual(ts['error'], error)
self.assertEqual(ts['spawned_tasks'], spawned_tasks)
self.assertEqual(ts['progress_report'], progress_report)
self.assertEqual(ts['task_type'], task_type)
self.assertEqual(ts['start_time'], start_time)
self.assertEqual(ts['finish_time'], finish_time)
self.assertEqual(ts['result'], result)
# These are always None
self.assertEqual(ts['traceback'], None)
self.assertEqual(ts['exception'], None)
开发者ID:credativ,项目名称:pulp,代码行数:50,代码来源:test_dispatch.py
示例7: _load_repo_extras
def _load_repo_extras(repo, repos=None):
config = get_config()
repoapi = RepositoryAPI()
repo["url"] = os.path.join(config.cds.baseurl, repo["relative_path"])
repo["parent"] = None
repo["children"] = []
if repos is None:
repos = getattr(threading.local(), "repos", dict())
for repo2 in repos.values():
if repo2 == repo:
continue
elif repo["id"] in repo2["clone_ids"]:
# the clone_id attribute is broken, but we check it anyway
# just in case it gets fixed some day
repo["parent"] = repo2
elif repo2["id"] in repo["clone_ids"]:
repo["children"].append(repo2)
elif (
repo["source"] and repo["source"]["type"] == "local" and repo["source"]["url"].endswith("/%s" % repo2["id"])
):
# the child syncs from a local repo that ends with
# /<parent repo id>
repo["parent"] = repo2
elif (
repo2["source"]
and repo2["source"]["type"] == "local"
and repo2["source"]["url"].endswith("/%s" % repo["id"])
):
repo["children"].append(repo2)
repo["keys"] = dict()
for key in repoapi.listkeys(repo["id"]):
repo["keys"][os.path.basename(key)] = "%s/%s" % (config.cds.keyurl, key)
if repo["parent"]:
repo["updates"] = has_updates(repo)
if repo["last_sync"] and repo["sync_schedule"]:
repo["next_sync"] = format_iso8601_datetime(
parse_iso8601_datetime(repo["last_sync"]) + parse_iso8601_interval(repo["sync_schedule"])[0]
)
elif repo["sync_schedule"]:
repo["next_sync"] = format_iso8601_datetime(parse_iso8601_interval(repo["sync_schedule"])[1])
else:
repo["next_sync"] = None
repo["groupid"].sort()
开发者ID:stpierre,项目名称:sponge,代码行数:49,代码来源:repo.py
示例8: set_task_started
def set_task_started(task_id, timestamp=None):
"""
Update a task's state to reflect that it has started running.
:param task_id: The identity of the task to be updated.
:type task_id: basestring
:param timestamp: The (optional) ISO-8601 finished timestamp (UTC).
:type timestamp: str
"""
collection = TaskStatus.get_collection()
if not timestamp:
now = datetime.now(dateutils.utc_tz())
started = dateutils.format_iso8601_datetime(now)
else:
started = timestamp
select = {
'task_id': task_id
}
update = {
'$set': {'start_time': started}
}
collection.update(select, update, safe=True)
select = {
'task_id': task_id,
'state': {'$in': [constants.CALL_WAITING_STATE, constants.CALL_ACCEPTED_STATE]}
}
update = {
'$set': {'state': constants.CALL_RUNNING_STATE}
}
collection.update(select, update, safe=True)
开发者ID:VuokkoVuorinnen,项目名称:pulp,代码行数:34,代码来源:task_status_manager.py
示例9: set_task_failed
def set_task_failed(task_id, traceback=None, timestamp=None):
"""
Update a task's state to reflect that it has succeeded.
:param task_id: The identity of the task to be updated.
:type task_id: basestring
:ivar traceback: A string representation of the traceback resulting from the task execution.
:type traceback: basestring
:param timestamp: The (optional) ISO-8601 finished timestamp (UTC).
:type timestamp: str
"""
collection = TaskStatus.get_collection()
if not timestamp:
now = datetime.now(dateutils.utc_tz())
finished = dateutils.format_iso8601_datetime(now)
else:
finished = timestamp
update = {
'$set': {
'finish_time': finished,
'state': constants.CALL_ERROR_STATE,
'traceback': traceback
}
}
collection.update({'task_id': task_id}, update, safe=True)
开发者ID:VuokkoVuorinnen,项目名称:pulp,代码行数:27,代码来源:task_status_manager.py
示例10: set_task_succeeded
def set_task_succeeded(task_id, result=None, timestamp=None):
"""
Update a task's state to reflect that it has succeeded.
:param task_id: The identity of the task to be updated.
:type task_id: basestring
:param result: The optional value returned by the task execution.
:type result: anything
:param timestamp: The (optional) ISO-8601 finished timestamp (UTC).
:type timestamp: str
"""
collection = TaskStatus.get_collection()
if not timestamp:
now = datetime.now(dateutils.utc_tz())
finished = dateutils.format_iso8601_datetime(now)
else:
finished = timestamp
update = {
'$set': {
'finish_time': finished,
'state': constants.CALL_FINISHED_STATE,
'result': result
}
}
collection.update({'task_id': task_id}, update, safe=True)
开发者ID:VuokkoVuorinnen,项目名称:pulp,代码行数:27,代码来源:task_status_manager.py
示例11: on_failure
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""
This overrides the error handler run by the worker when the task fails.
It updates state, finish_time and traceback of the relevant task status
for asynchronous tasks. Skip updating status for synchronous tasks.
:param exc: The exception raised by the task.
:param task_id: Unique id of the failed task.
:param args: Original arguments for the executed task.
:param kwargs: Original keyword arguments for the executed task.
:param einfo: celery's ExceptionInfo instance, containing serialized traceback.
"""
if isinstance(exc, PulpCodedException):
_logger.info(_('Task failed : [%(task_id)s] : %(msg)s') %
{'task_id': task_id, 'msg': str(exc)})
_logger.debug(traceback.format_exc())
else:
_logger.info(_('Task failed : [%s]') % task_id)
# celery will log the traceback
if not self.request.called_directly:
now = datetime.now(dateutils.utc_tz())
finish_time = dateutils.format_iso8601_datetime(now)
task_status = TaskStatus.objects.get(task_id=task_id)
task_status['state'] = constants.CALL_ERROR_STATE
task_status['finish_time'] = finish_time
task_status['traceback'] = einfo.traceback
if not isinstance(exc, PulpException):
exc = PulpException(str(exc))
task_status['error'] = exc.to_dict()
task_status.save()
common_utils.delete_working_directory()
开发者ID:hjensas,项目名称:pulp,代码行数:32,代码来源:tasks.py
示例12: __init__
def __init__(self, repo, publish_conduit, config, distributor_type):
"""
:param repo: Pulp managed Yum repository
:type repo: pulp.plugins.model.Repository
:param publish_conduit: Conduit providing access to relative Pulp functionality
:type publish_conduit: pulp.plugins.conduits.repo_publish.RepoPublishConduit
:param config: Pulp configuration for the distributor
:type config: pulp.plugins.config.PluginCallConfiguration
:param distributor_type: The type of the distributor that is being published
:type distributor_type: str
:ivar last_published: last time this distributor published the repo
:ivar last_delete: last time a unit was removed from this repository
:ivar repo: repository being operated on
:ivar predistributor: distributor object that is associated with this distributor. It's
publish history affects the type of publish is performed
:ivar symlink_list: list of symlinks to rsync
:ivar content_unit_file_list: list of content units to rsync
:ivar symlink_src: path to directory containing all symlinks
"""
super(Publisher, self).__init__("Repository publish", repo,
publish_conduit, config,
distributor_type=distributor_type)
distributor = Distributor.objects.get_or_404(repo_id=self.repo.id,
distributor_id=publish_conduit.distributor_id)
self.last_published = distributor["last_publish"]
self.last_deleted = repo.last_unit_removed
self.repo = repo
self.predistributor = self._get_predistributor()
if self.last_published:
string_date = dateutils.format_iso8601_datetime(self.last_published)
else:
string_date = None
if self.predistributor:
search_params = {'repo_id': repo.id,
'distributor_id': self.predistributor["id"],
'started': {"$gte": string_date}}
self.predist_history = RepoPublishResult.get_collection().find(search_params)
else:
self.predist_history = []
self.remote_path = self.get_remote_repo_path()
if self.is_fastforward():
start_date = self.last_published
end_date = None
if self.predistributor:
end_date = self.predistributor["last_publish"]
date_filter = self.create_date_range_filter(start_date=start_date, end_date=end_date)
else:
date_filter = None
self.symlink_list = []
self.content_unit_file_list = []
self.symlink_src = os.path.join(self.get_working_dir(), '.relative/')
self._add_necesary_steps(date_filter=date_filter, config=config)
开发者ID:pcreech,项目名称:pulp,代码行数:60,代码来源:publish.py
示例13: test_build_progress_report
def test_build_progress_report(self):
"""
Test the build_progress_report() method.
"""
state = progress.SyncProgressReport.STATE_ISOS_IN_PROGRESS
state_times = {progress.SyncProgressReport.STATE_ISOS_IN_PROGRESS: datetime.utcnow()}
num_isos = 5
num_isos_finished = 3
iso_error_messages = {'an.iso': "No!"}
error_message = 'This is an error message.'
traceback = 'This is a traceback.'
total_bytes = 1024
finished_bytes = 512
report = progress.SyncProgressReport(
self.conduit, state=state, state_times=state_times, num_isos=num_isos,
num_isos_finished=num_isos_finished, iso_error_messages=iso_error_messages,
error_message=error_message, traceback=traceback, total_bytes=total_bytes,
finished_bytes=finished_bytes)
report = report.build_progress_report()
# Make sure all the appropriate attributes were set
self.assertEqual(report['state'], state)
expected_state_times = {}
for key, value in state_times.items():
expected_state_times[key] = format_iso8601_datetime(value)
self.assertTrue(report['state_times'], expected_state_times)
self.assertEqual(report['num_isos'], num_isos)
self.assertEqual(report['num_isos_finished'], num_isos_finished)
self.assertEqual(report['iso_error_messages'], iso_error_messages)
self.assertEqual(report['error_message'], error_message)
self.assertEqual(report['traceback'], traceback)
self.assertEqual(report['total_bytes'], total_bytes)
self.assertEqual(report['finished_bytes'], finished_bytes)
开发者ID:asmacdo,项目名称:pulp_rpm,代码行数:34,代码来源:test_progress.py
示例14: test_task_status_update_fires_notification
def test_task_status_update_fires_notification(self, mock_send):
"""
Test that update_one() also fires a notification.
"""
task_id = self.get_random_uuid()
worker_name = 'special_worker_name'
tags = ['test-tag1', 'test-tag2']
state = 'waiting'
ts = TaskStatus(task_id, worker_name, tags, state)
ts.save()
# ensure event was fired for save()
mock_send.assert_called_once_with(ts, routing_key="tasks.%s" % task_id)
now = datetime.now(dateutils.utc_tz())
start_time = dateutils.format_iso8601_datetime(now)
delta = {'start_time': start_time,
'state': 'running',
'progress_report': {'report-id': 'my-progress'}}
self.assertEquals(len(mock_send.call_args_list), 1)
TaskStatus.objects(task_id=task_id).update_one(
set__start_time=delta['start_time'], set__state=delta['state'],
set__progress_report=delta['progress_report'])
# ensure event was fired for update_one()
self.assertEquals(len(mock_send.call_args_list), 2)
mock_send.assert_called_with(ts, routing_key="tasks.%s" % task_id)
开发者ID:credativ,项目名称:pulp,代码行数:26,代码来源:test_dispatch.py
示例15: test_task_status_update
def test_task_status_update(self):
"""
Tests the successful operation of task status update.
"""
task_id = self.get_random_uuid()
worker_name = 'special_worker_name'
tags = ['test-tag1', 'test-tag2']
state = 'waiting'
TaskStatus(task_id, worker_name, tags, state).save()
now = datetime.now(dateutils.utc_tz())
start_time = dateutils.format_iso8601_datetime(now)
delta = {'start_time': start_time,
'state': 'running',
'progress_report': {'report-id': 'my-progress'}}
TaskStatus.objects(task_id=task_id).update_one(
set__start_time=delta['start_time'], set__state=delta['state'],
set__progress_report=delta['progress_report'])
task_status = TaskStatus.objects(task_id=task_id).first()
self.assertEqual(task_status['start_time'], delta['start_time'])
# Make sure that parse_iso8601_datetime is able to parse the start_time without errors
dateutils.parse_iso8601_datetime(task_status['start_time'])
self.assertEqual(task_status['state'], delta['state'])
self.assertEqual(task_status['progress_report'], delta['progress_report'])
self.assertEqual(task_status['worker_name'], worker_name)
开发者ID:credativ,项目名称:pulp,代码行数:26,代码来源:test_dispatch.py
示例16: failed
def failed(self, reply):
"""
Notification (reply) indicating an RMI failed.
This information used to update the task status.
:param reply: A failure reply object.
:type reply: gofer.rmi.async.Failed
"""
_logger.info(_('Task RMI (failed): %(r)s'), {'r': reply})
call_context = dict(reply.data)
action = call_context.get('action')
task_id = call_context['task_id']
traceback = reply.xstate['trace']
finished = reply.timestamp
if not finished:
now = datetime.now(dateutils.utc_tz())
finished = dateutils.format_iso8601_datetime(now)
TaskStatus.objects(task_id=task_id).update_one(set__finish_time=finished,
set__state=constants.CALL_ERROR_STATE,
set__traceback=traceback)
if action == 'bind':
ReplyHandler._bind_failed(task_id, call_context)
return
if action == 'unbind':
ReplyHandler._unbind_failed(task_id, call_context)
return
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:28,代码来源:services.py
示例17: __call__
def __call__(self, *args, **kwargs):
"""
This overrides CeleryTask's __call__() method. We use this method
for task state tracking of Pulp tasks.
"""
# Check task status and skip running the task if task state is 'canceled'.
try:
task_status = TaskStatus.objects.get(task_id=self.request.id)
except DoesNotExist:
task_status = None
if task_status and task_status['state'] == constants.CALL_CANCELED_STATE:
_logger.debug("Task cancel received for task-id : [%s]" % self.request.id)
return
# Update start_time and set the task state to 'running' for asynchronous tasks.
# Skip updating status for eagerly executed tasks, since we don't want to track
# synchronous tasks in our database.
if not self.request.called_directly:
now = datetime.now(dateutils.utc_tz())
start_time = dateutils.format_iso8601_datetime(now)
# Using 'upsert' to avoid a possible race condition described in the apply_async method
# above.
TaskStatus.objects(task_id=self.request.id).update_one(
set__state=constants.CALL_RUNNING_STATE, set__start_time=start_time, upsert=True)
# Run the actual task
_logger.debug("Running task : [%s]" % self.request.id)
return super(Task, self).__call__(*args, **kwargs)
开发者ID:hjensas,项目名称:pulp,代码行数:26,代码来源:tasks.py
示例18: succeeded
def succeeded(self, reply):
"""
Notification (reply) indicating an RMI succeeded.
This information is relayed to the task coordinator.
:param reply: A successful reply object.
:type reply: gofer.rmi.async.Succeeded
"""
_logger.info(_('Task RMI (succeeded): %(r)s'), {'r': reply})
call_context = dict(reply.data)
action = call_context.get('action')
task_id = call_context['task_id']
result = dict(reply.retval)
finished = reply.timestamp
if not finished:
now = datetime.now(dateutils.utc_tz())
finished = dateutils.format_iso8601_datetime(now)
TaskStatus.objects(task_id=task_id).update_one(set__finish_time=finished,
set__state=constants.CALL_FINISHED_STATE,
set__result=result)
if action == 'bind':
if result['succeeded']:
ReplyHandler._bind_succeeded(task_id, call_context)
else:
ReplyHandler._bind_failed(task_id, call_context)
return
if action == 'unbind':
if result['succeeded']:
ReplyHandler._unbind_succeeded(call_context)
else:
ReplyHandler._unbind_failed(task_id, call_context)
return
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:33,代码来源:services.py
示例19: serialize
def serialize(self):
data = {}
for field in ('response', 'reasons', 'state', 'task_id', 'task_group_id',
'schedule_id', 'progress', 'result', 'tags'):
data[field] = getattr(self, field)
ex = getattr(self, 'exception')
if ex is not None:
data['exception'] = traceback.format_exception_only(type(ex), ex)
else:
data['exception'] = None
tb = getattr(self, 'traceback')
if tb is not None:
if isinstance(tb, (str, list, tuple)):
data['traceback'] = str(tb)
else:
data['traceback'] = traceback.format_tb(tb)
else:
data['traceback'] = None
for field in ('start_time', 'finish_time'):
dt = getattr(self, field)
if dt is not None:
data[field] = dateutils.format_iso8601_datetime(dt)
else:
data[field] = None
return data
开发者ID:ehelms,项目名称:pulp,代码行数:25,代码来源:call.py
示例20: test_first_run_string
def test_first_run_string(self):
first_run = dateutils.format_iso8601_datetime(
datetime.utcnow().replace(tzinfo=dateutils.utc_tz()) + timedelta(days=1))
call = ScheduledCall('PT1M', 'pulp.tasks.dosomething', first_run=first_run)
self.assertEqual(first_run, call.first_run)
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_dispatch.py
注:本文中的pulp.common.dateutils.format_iso8601_datetime函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论