本文整理汇总了Python中pulp.common.dateutils.parse_iso8601_datetime函数的典型用法代码示例。如果您正苦于以下问题:Python parse_iso8601_datetime函数的具体用法?Python parse_iso8601_datetime怎么用?Python parse_iso8601_datetime使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_iso8601_datetime函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_spawned_task_status
def test_spawned_task_status(self, mock_request):
async_result = AsyncResult('foo-id')
retval = tasks.TaskResult(error=PulpException('error-foo'),
result='bar')
retval.spawned_tasks = [async_result]
task_id = str(uuid.uuid4())
args = [1, 'b', 'iii']
kwargs = {'1': 'for the money', 'tags': ['test_tags'], 'routing_key': WORKER_2}
mock_request.called_directly = False
task_status = TaskStatus(task_id).save()
self.assertEqual(task_status['state'], 'waiting')
self.assertEqual(task_status['finish_time'], None)
task = tasks.Task()
task.on_success(retval, task_id, args, kwargs)
new_task_status = TaskStatus.objects(task_id=task_id).first()
self.assertEqual(new_task_status['state'], 'finished')
self.assertEqual(new_task_status['result'], 'bar')
self.assertEqual(new_task_status['error']['description'], 'error-foo')
self.assertFalse(new_task_status['finish_time'] is None)
# Make sure that parse_iso8601_datetime is able to parse the finish_time without errors
dateutils.parse_iso8601_datetime(new_task_status['finish_time'])
self.assertEqual(new_task_status['spawned_tasks'], ['foo-id'])
开发者ID:hgschmie,项目名称:pulp,代码行数:27,代码来源:test_tasks.py
示例2: test_on_success_handler_async_result
def test_on_success_handler_async_result(self, mock_request):
"""
Make sure that overridden on_success handler updates task status correctly
"""
retval = AsyncResult('foo-id')
task_id = str(uuid.uuid4())
args = [1, 'b', 'iii']
kwargs = {'1': 'for the money', 'tags': ['test_tags'], 'queue': RESERVED_WORKER_2}
mock_request.called_directly = False
task_status = TaskStatusManager.create_task_status(task_id, 'some_queue')
self.assertEqual(task_status['state'], 'waiting')
self.assertEqual(task_status['finish_time'], None)
task = tasks.Task()
task.on_success(retval, task_id, args, kwargs)
new_task_status = TaskStatusManager.find_by_task_id(task_id)
self.assertEqual(new_task_status['state'], 'finished')
self.assertEqual(new_task_status['result'], None)
self.assertFalse(new_task_status['finish_time'] is None)
# Make sure that parse_iso8601_datetime is able to parse the finish_time without errors
dateutils.parse_iso8601_datetime(new_task_status['finish_time'])
self.assertEqual(new_task_status['spawned_tasks'], ['foo-id'])
开发者ID:jbennett7,项目名称:pulp,代码行数:25,代码来源:test_tasks.py
示例3: test_publish_history_end_date
def test_publish_history_end_date(self):
# Setup
self.repo_manager.create_repo("test_date")
self.distributor_manager.add_distributor("test_date", "mock-distributor", {}, True, distributor_id="test_dist")
# Create three consecutive publish entries
date_string = "2013-06-01T12:00:0%sZ"
for i in range(0, 6, 2):
r = RepoPublishResult.expected_result(
"test_date",
"test_dist",
"bar",
date_string % str(i),
date_string % str(i + 1),
"test-summary",
"test-details",
RepoPublishResult.RESULT_SUCCESS,
)
RepoPublishResult.get_collection().insert(r, safe=True)
# Verify that all entries retrieved have dates prior to the given end date
end_date = "2013-06-01T12:00:03Z"
end_entries = self.publish_manager.publish_history("test_date", "test_dist", end_date=end_date)
# Confirm the dates of the retrieved entries are earlier than or equal to the requested date
self.assertEqual(2, len(end_entries))
for entries in end_entries:
retrieved = dateutils.parse_iso8601_datetime(entries["started"])
given_end = dateutils.parse_iso8601_datetime(end_date)
self.assertTrue(retrieved <= given_end)
开发者ID:jdob,项目名称:pulp,代码行数:29,代码来源:test_repo_publish_manager.py
示例4: validate
def validate(self, value):
super(ISO8601StringField, self).validate(value)
try:
dateutils.parse_iso8601_datetime(value)
except ISO8601Error, e:
self.error(str(e))
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:7,代码来源:fields.py
示例5: test_publish_history_start_date
def test_publish_history_start_date(self):
# Setup
self.repo_manager.create_repo('test_date')
self.distributor_manager.add_distributor('test_date', 'mock-distributor', {}, True,
distributor_id='test_dist')
# Create three consecutive publish entries
date_string = '2013-06-01T12:00:0%sZ'
for i in range(0, 6, 2):
r = RepoPublishResult.expected_result(
'test_date', 'test_dist', 'bar', date_string % str(i), date_string % str(i + 1),
'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
RepoPublishResult.get_collection().insert(r, safe=True)
# Verify
self.assertEqual(3, len(self.publish_manager.publish_history('test_date', 'test_dist')))
start_date = '2013-06-01T12:00:02Z'
start_entries = self.publish_manager.publish_history('test_date', 'test_dist',
start_date=start_date)
# Confirm the dates of the retrieved entries are later than or equal to the requested date
self.assertEqual(2, len(start_entries))
for entries in start_entries:
retrieved = dateutils.parse_iso8601_datetime(entries['started'])
given_start = dateutils.parse_iso8601_datetime(start_date)
self.assertTrue(retrieved >= given_start)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py
示例6: test_update_task_status
def test_update_task_status(self):
"""
Tests the successful operation of update_task_status().
"""
task_id = self.get_random_uuid()
queue = 'special_queue'
tags = ['test-tag1', 'test-tag2']
state = 'waiting'
TaskStatusManager.create_task_status(task_id, queue, tags, state)
now = datetime.now(dateutils.utc_tz())
start_time = dateutils.format_iso8601_datetime(now)
delta = {'start_time': start_time,
'state': 'running',
'disregard': 'ignored',
'progress_report': {'report-id': 'my-progress'}}
updated = TaskStatusManager.update_task_status(task_id, delta)
task_status = TaskStatusManager.find_by_task_id(task_id)
self.assertEqual(task_status['start_time'], delta['start_time'])
# Make sure that parse_iso8601_datetime is able to parse the start_time without errors
dateutils.parse_iso8601_datetime(task_status['start_time'])
self.assertEqual(task_status['state'], delta['state'])
self.assertEqual(task_status['progress_report'], delta['progress_report'])
self.assertEqual(task_status['queue'], queue)
self.assertEqual(updated['start_time'], delta['start_time'])
self.assertEqual(updated['state'], delta['state'])
self.assertEqual(updated['progress_report'], delta['progress_report'])
self.assertTrue('disregard' not in updated)
self.assertTrue('disregard' not in task_status)
开发者ID:aweiteka,项目名称:pulp,代码行数:30,代码来源:test_task_status_manager.py
示例7: test_get
def test_get(self, mock_path, mock_ok, mock_utils_get):
call = ScheduledCall('PT1M', 'pulp.tasks.frequent')
mock_utils_get.return_value = [call]
ret = self.controller._get(call.id)
schedule = mock_ok.call_args[0][0]
self.assertEqual(ret, mock_ok.return_value)
self.assertEqual(len(mock_ok.call_args[0]), 1)
# spot-check the schedule
self.assertEqual(schedule['_id'], call.id)
self.assertEqual(schedule['schedule'], 'PT1M')
self.assertEqual(schedule['task'], 'pulp.tasks.frequent')
self.assertEqual(schedule['_href'], mock_path.return_value)
# next_run is calculated on-demand, and there is a small chance that it
# will be re-calculated in the call.for_display() call as 1 second later
# than it was calculated above. Thus we will test that equality here
# with a tolerance of 1 second
for_display = call.for_display()
call_next_run = dateutils.parse_iso8601_datetime(call.next_run)
display_next_run = dateutils.parse_iso8601_datetime(for_display['next_run'])
self.assertTrue(display_next_run - call_next_run <= timedelta(seconds=1))
# now check overall equality with the actual for_display value
del schedule['_href']
del schedule['next_run']
del for_display['next_run']
self.assertEqual(schedule, for_display)
# make sure we called the manager layer correctly
mock_utils_get.assert_called_once_with([call.id])
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:33,代码来源:test_schedule.py
示例8: test_publish_history_ascending_sort
def test_publish_history_ascending_sort(self):
"""
Tests use the sort parameter to sort the results in ascending order by start time
"""
# Setup
self.repo_manager.create_repo('test_sort')
self.distributor_manager.add_distributor('test_sort', 'mock-distributor', {}, True,
distributor_id='test_dist')
# Create some consecutive publish entries
date_string = '2013-06-01T12:00:0%sZ'
for i in range(0, 10, 2):
r = RepoPublishResult.expected_result(
'test_sort', 'test_dist', 'bar', date_string % str(i), date_string % str(i + 1),
'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
RepoPublishResult.get_collection().insert(r, safe=True)
# Test that returned entries are in ascending order by time
entries = self.publish_manager.publish_history('test_sort', 'test_dist',
sort=constants.SORT_ASCENDING)
self.assertEqual(5, len(entries))
for i in range(0, 4):
first = dateutils.parse_iso8601_datetime(entries[i]['started'])
second = dateutils.parse_iso8601_datetime(entries[i + 1]['started'])
self.assertTrue(first < second)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py
示例9: test_sync_history_end_date
def test_sync_history_end_date(self):
"""
Tests the functionality of requesting sync history before a given date
"""
# Setup
self.repo_manager.create_repo('test_repo')
# A date string to fake some dates
date_string = '2013-06-01T12:00:0%sZ'
# Create 3 entries, with each date entry one second later
for i in range(0, 6, 2):
r = RepoSyncResult.expected_result('test_repo', 'foo', 'bar', date_string % str(i),
date_string % str(i + 1), 1, 1, 1, '', '',
RepoSyncResult.RESULT_SUCCESS)
RepoSyncResult.get_collection().save(r, safe=True)
# Verify three entries in test_repo
self.assertEqual(3, len(self.sync_manager.sync_history('test_repo')))
# Retrieve the first two entries
end_date = '2013-06-01T12:00:03Z'
end_entries = self.sync_manager.sync_history('test_repo', end_date=end_date)
# Confirm the dates of the retrieved entries are earlier than or equal to the requested date
self.assertEqual(2, len(end_entries))
for entry in end_entries:
retrieved = dateutils.parse_iso8601_datetime(entry['started'])
given_end = dateutils.parse_iso8601_datetime(end_date)
self.assertTrue(retrieved <= given_end)
开发者ID:beav,项目名称:pulp,代码行数:26,代码来源:test_sync.py
示例10: test_task_status_update
def test_task_status_update(self):
"""
Tests the successful operation of task status update.
"""
task_id = self.get_random_uuid()
worker_name = 'special_worker_name'
tags = ['test-tag1', 'test-tag2']
state = 'waiting'
TaskStatus(task_id, worker_name, tags, state).save()
now = datetime.now(dateutils.utc_tz())
start_time = dateutils.format_iso8601_datetime(now)
delta = {'start_time': start_time,
'state': 'running',
'progress_report': {'report-id': 'my-progress'}}
TaskStatus.objects(task_id=task_id).update_one(
set__start_time=delta['start_time'], set__state=delta['state'],
set__progress_report=delta['progress_report'])
task_status = TaskStatus.objects(task_id=task_id).first()
self.assertEqual(task_status['start_time'], delta['start_time'])
# Make sure that parse_iso8601_datetime is able to parse the start_time without errors
dateutils.parse_iso8601_datetime(task_status['start_time'])
self.assertEqual(task_status['state'], delta['state'])
self.assertEqual(task_status['progress_report'], delta['progress_report'])
self.assertEqual(task_status['worker_name'], worker_name)
开发者ID:credativ,项目名称:pulp,代码行数:26,代码来源:test_dispatch.py
示例11: test_publish_history_descending_sort
def test_publish_history_descending_sort(self):
"""
Tests use the sort parameter to sort the results in descending order by start time
"""
# Setup
self.repo_manager.create_repo("test_sort")
self.distributor_manager.add_distributor("test_sort", "mock-distributor", {}, True, distributor_id="test_dist")
# Create some consecutive publish entries
date_string = "2013-06-01T12:00:0%sZ"
for i in range(0, 10, 2):
r = RepoPublishResult.expected_result(
"test_sort",
"test_dist",
"bar",
date_string % str(i),
date_string % str(i + 1),
"test-summary",
"test-details",
RepoPublishResult.RESULT_SUCCESS,
)
RepoPublishResult.get_collection().insert(r, safe=True)
# Test that returned entries are in descending order by time
entries = self.publish_manager.publish_history("test_sort", "test_dist", sort=constants.SORT_DESCENDING)
self.assertEqual(5, len(entries))
for i in range(0, 4):
first = dateutils.parse_iso8601_datetime(entries[i]["started"])
second = dateutils.parse_iso8601_datetime(entries[i + 1]["started"])
self.assertTrue(first > second)
开发者ID:jdob,项目名称:pulp,代码行数:30,代码来源:test_repo_publish_manager.py
示例12: test_on_failure_handler
def test_on_failure_handler(self, mock_request):
"""
Make sure that overridden on_failure handler updates task status correctly
"""
exc = Exception()
task_id = str(uuid.uuid4())
args = [1, 'b', 'iii']
kwargs = {'1': 'for the money', 'tags': ['test_tags']}
class EInfo(object):
"""
on_failure handler expects an instance of celery's ExceptionInfo class
as one of the attributes. It stores string representation of traceback
in it's traceback instance variable. This is a stub to imitate that behavior.
"""
def __init__(self):
self.traceback = "string_repr_of_traceback"
einfo = EInfo()
mock_request.called_directly = False
task_status = TaskStatusManager.create_task_status(task_id, 'some_queue')
self.assertEqual(task_status['state'], 'waiting')
self.assertEqual(task_status['finish_time'], None)
self.assertEqual(task_status['traceback'], None)
task = tasks.Task()
task.on_failure(exc, task_id, args, kwargs, einfo)
new_task_status = TaskStatusManager.find_by_task_id(task_id)
self.assertEqual(new_task_status['state'], 'error')
self.assertFalse(new_task_status['finish_time'] is None)
# Make sure that parse_iso8601_datetime is able to parse the finish_time without errors
dateutils.parse_iso8601_datetime(new_task_status['finish_time'])
self.assertEqual(new_task_status['traceback'], einfo.traceback)
开发者ID:jbennett7,项目名称:pulp,代码行数:34,代码来源:test_tasks.py
示例13: test_with_past_runs
def test_with_past_runs(self, mock_time):
# setup an hourly call that first ran not quite 2 hours ago, ran again
# less than one hour ago, and should be scheduled to run at the end of
# this hour
mock_time.return_value = 1389389758.547976
call = ScheduledCall('2014-01-10T20:00Z/PT1H', 'pulp.tasks.dosomething',
total_run_count=2, last_run_at='2014-01-10T21:00Z')
next_run = call.calculate_next_run()
self.assertEqual(dateutils.parse_iso8601_datetime('2014-01-10T22:00Z'),
dateutils.parse_iso8601_datetime(next_run))
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:test_dispatch.py
示例14: iso8601_datetime_validator
def iso8601_datetime_validator(x):
"""
Validates that a user-entered value is a correct iso8601 date
:param x: input value to be validated
:type x: str
:raise ValueError: if the input is not a valid iso8601 string
"""
try:
dateutils.parse_iso8601_datetime(x)
except Exception:
raise ValueError(_('value must be a valid iso8601 string (yyyy-mm-ddThh:mm:ssZ)'))
开发者ID:alanoe,项目名称:pulp,代码行数:13,代码来源:validators.py
示例15: _calculate_times
def _calculate_times(self):
"""
Calculates and returns several time-related values that tend to be needed
at the same time.
:return: tuple of numbers described below...
now_s: current time as seconds since the epoch
first_run_s: time of the first run as seconds since the epoch,
calculated based on self.first_run
since_first_s: how many seconds have elapsed since the first
run
run_every_s: how many seconds should elapse between runs of
this schedule
last_scheduled_run_s: the most recent time at which this
schedule should have run based on its schedule, as
seconds since the epoch
expected_runs: number of runs that should have happened based
on the first_run time and the interval
:rtype: tuple
"""
now_s = time.time()
first_run_dt = dateutils.to_utc_datetime(dateutils.parse_iso8601_datetime(self.first_run))
first_run_s = calendar.timegm(first_run_dt.utctimetuple())
since_first_s = now_s - first_run_s
run_every_s = timedelta_seconds(self.as_schedule_entry().schedule.run_every)
# don't want this to be negative
expected_runs = max(int(since_first_s / run_every_s), 0)
last_scheduled_run_s = first_run_s + expected_runs * run_every_s
return now_s, first_run_s, since_first_s, run_every_s, last_scheduled_run_s, expected_runs
开发者ID:aweiteka,项目名称:pulp,代码行数:31,代码来源:dispatch.py
示例16: from_progress_report
def from_progress_report(cls, report):
"""
Parses the output from the build_progress_report method into an instance
of this class. The intention is to use this client-side to reconstruct
the instance as it is retrieved from the server.
The build_final_report call on instances returned from this call will
not function as it requires the server-side conduit to be provided.
Additionally, any exceptions and tracebacks will be a text representation
instead of formal objects.
:param report: progress report retrieved from the server's task
:type report: dict
:return: instance populated with the state in the report
:rtype: ISOProgressReport
"""
# Restore the state transition times to datetime objects
for key, value in report['state_times'].items():
report['state_times'][key] = parse_iso8601_datetime(value)
# python 2.4 does not support unicode keys so convert to string
new_report = {}
for key in report:
new_report[str(key)] = report[key]
r = cls(None, **new_report)
return r
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:27,代码来源:progress.py
示例17: assert_last_sync_time
def assert_last_sync_time(time_in_iso):
now = dateutils.now_utc_datetime_with_tzinfo()
finished = dateutils.parse_iso8601_datetime(time_in_iso)
# Compare them within a threshold since they won't be exact
difference = now - finished
return difference.seconds < 2
开发者ID:beav,项目名称:pulp,代码行数:7,代码来源:test_sync.py
示例18: test_now
def test_now(self):
call = ScheduledCall('PT1H', 'pulp.tasks.dosomething')
now = datetime.utcnow().replace(tzinfo=dateutils.utc_tz())
next_run = dateutils.parse_iso8601_datetime(call.calculate_next_run())
self.assertTrue(next_run - now < timedelta(seconds=1))
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_dispatch.py
示例19: last_publish
def last_publish(self, repo_id, distributor_id):
"""
Returns the timestamp of the last publish call, regardless of its
success or failure. If the repo has never been published, returns None.
@param repo_id: identifies the repo
@type repo_id: str
@param distributor_id: identifies the repo's distributor
@type distributor_id: str
@return: timestamp of the last publish
@rtype: datetime or None
@raise MissingResource: if there is no distributor identified by the
given repo ID and distributor ID
"""
# Validation
coll = RepoDistributor.get_collection()
repo_distributor = coll.find_one({'repo_id' : repo_id, 'id' : distributor_id})
if repo_distributor is None:
raise MissingResource(repo_id)
# Convert to datetime instance
date_str = repo_distributor['last_publish']
if date_str is None:
return date_str
else:
instance = dateutils.parse_iso8601_datetime(date_str)
return instance
开发者ID:fdammeke,项目名称:pulp,代码行数:33,代码来源:publish.py
示例20: test_no_first_run
def test_no_first_run(self):
call = ScheduledCall('PT1M', 'pulp.tasks.dosomething')
first_run = dateutils.parse_iso8601_datetime(call.first_run)
# generously make sure the calculated first_run is within 1 second of now
now = datetime.utcnow().replace(tzinfo=dateutils.utc_tz())
self.assertTrue(abs(now - first_run) < timedelta(seconds=1))
开发者ID:aweiteka,项目名称:pulp,代码行数:8,代码来源:test_dispatch.py
注:本文中的pulp.common.dateutils.parse_iso8601_datetime函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论