本文整理汇总了Python中pulp.common.dateutils.parse_iso8601_interval函数的典型用法代码示例。如果您正苦于以下问题:Python parse_iso8601_interval函数的具体用法?Python parse_iso8601_interval怎么用?Python parse_iso8601_interval使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了parse_iso8601_interval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_updated_scheduled_next_run
def test_updated_scheduled_next_run(self):
call_request = CallRequest(itinerary_call)
interval = datetime.timedelta(minutes=2)
now = datetime.datetime.now(tz=dateutils.utc_tz())
old_schedule = dateutils.format_iso8601_interval(interval, now)
scheduled_id = self.scheduler.add(call_request, old_schedule)
self.assertNotEqual(scheduled_id, None)
scheduled_call = self.scheduled_call_collection.find_one({'_id': ObjectId(scheduled_id)})
self.assertNotEqual(scheduled_call, None)
old_interval, start_time = dateutils.parse_iso8601_interval(old_schedule)[:2]
start_time = dateutils.to_naive_utc_datetime(start_time)
self.assertEqual(scheduled_call['last_run'], None)
self.assertEqual(scheduled_call['first_run'], start_time + old_interval)
self.assertEqual(scheduled_call['next_run'], start_time + old_interval)
interval = datetime.timedelta(minutes=1)
new_schedule = dateutils.format_iso8601_interval(interval, now)
self.scheduler.update(scheduled_id, schedule=new_schedule)
updated_scheduled_call = self.scheduled_call_collection.find_one({'_id': ObjectId(scheduled_id)})
new_interval = dateutils.parse_iso8601_interval(new_schedule)[0]
self.assertEqual(updated_scheduled_call['last_run'], None)
self.assertEqual(updated_scheduled_call['first_run'], start_time + old_interval)
self.assertEqual(updated_scheduled_call['next_run'], start_time + new_interval)
开发者ID:ashcrow,项目名称:pulp,代码行数:32,代码来源:test_dispatch_scheduler.py
示例2: is_valid_schedule
def is_valid_schedule(schedule):
"""
Validate an iso8601 interval schedule.
@param schedule: schedule string to validate
@return: True if the schedule is valid, False otherwise
@rtype: bool
"""
if not isinstance(schedule, basestring):
return False
try:
dateutils.parse_iso8601_interval(schedule)
except isodate.ISO8601Error:
return False
return True
开发者ID:jlsherrill,项目名称:pulp,代码行数:14,代码来源:scheduler.py
示例3: _load_repo_extras
def _load_repo_extras(repo, repos=None):
config = get_config()
repoapi = RepositoryAPI()
repo["url"] = os.path.join(config.cds.baseurl, repo["relative_path"])
repo["parent"] = None
repo["children"] = []
if repos is None:
repos = getattr(threading.local(), "repos", dict())
for repo2 in repos.values():
if repo2 == repo:
continue
elif repo["id"] in repo2["clone_ids"]:
# the clone_id attribute is broken, but we check it anyway
# just in case it gets fixed some day
repo["parent"] = repo2
elif repo2["id"] in repo["clone_ids"]:
repo["children"].append(repo2)
elif (
repo["source"] and repo["source"]["type"] == "local" and repo["source"]["url"].endswith("/%s" % repo2["id"])
):
# the child syncs from a local repo that ends with
# /<parent repo id>
repo["parent"] = repo2
elif (
repo2["source"]
and repo2["source"]["type"] == "local"
and repo2["source"]["url"].endswith("/%s" % repo["id"])
):
repo["children"].append(repo2)
repo["keys"] = dict()
for key in repoapi.listkeys(repo["id"]):
repo["keys"][os.path.basename(key)] = "%s/%s" % (config.cds.keyurl, key)
if repo["parent"]:
repo["updates"] = has_updates(repo)
if repo["last_sync"] and repo["sync_schedule"]:
repo["next_sync"] = format_iso8601_datetime(
parse_iso8601_datetime(repo["last_sync"]) + parse_iso8601_interval(repo["sync_schedule"])[0]
)
elif repo["sync_schedule"]:
repo["next_sync"] = format_iso8601_datetime(parse_iso8601_interval(repo["sync_schedule"])[1])
else:
repo["next_sync"] = None
repo["groupid"].sort()
开发者ID:stpierre,项目名称:sponge,代码行数:49,代码来源:repo.py
示例4: test_interval_recurrences
def test_interval_recurrences(self):
d = datetime.timedelta(hours=4, minutes=2, seconds=59)
c = 4
s = dateutils.format_iso8601_interval(d, recurrences=c)
i, t, r = dateutils.parse_iso8601_interval(s)
self.assertEqual(d, i)
self.assertEqual(c, r)
开发者ID:ehelms,项目名称:pulp,代码行数:7,代码来源:test_dateutils.py
示例5: test_interval_start_time
def test_interval_start_time(self):
d = datetime.timedelta(minutes=2)
t = datetime.datetime(year=2014, month=11, day=5, hour=0, minute=23)
s = dateutils.format_iso8601_interval(d, t)
i, e, r = dateutils.parse_iso8601_interval(s)
self.assertEqual(d, i)
self.assertEqual(t, e)
开发者ID:ehelms,项目名称:pulp,代码行数:7,代码来源:test_dateutils.py
示例6: update
def update(self, schedule_id, **schedule_updates):
"""
Update a scheduled call request
Valid schedule updates:
* call_request
* schedule
* failure_threshold
* remaining_runs
* enabled
@param schedule_id: id of the schedule for the call request
@type schedule_id: str
@param schedule_updates: updates for scheduled call
@type schedule_updates: dict
"""
if isinstance(schedule_id, basestring):
schedule_id = ObjectId(schedule_id)
scheduled_call_collection = ScheduledCall.get_collection()
if scheduled_call_collection.find_one(schedule_id) is None:
raise pulp_exceptions.MissingResource(schedule=str(schedule_id))
validate_schedule_updates(schedule_updates)
call_request = schedule_updates.pop('call_request', None)
if call_request is not None:
schedule_updates['serialized_call_request'] = call_request.serialize()
schedule = schedule_updates.get('schedule', None)
if schedule is not None:
interval, start, runs = dateutils.parse_iso8601_interval(schedule)
schedule_updates.setdefault('remaining_runs', runs) # honor explicit update
# XXX (jconnor) it'd be nice to update the next_run if the schedule
# has changed, but it requires mucking with the internals of the
# of the scheduled call instance, which is all encapsulated in the
# ScheduledCall constructor
# the next_run field will be correctly updated after the next run
scheduled_call_collection.update({'_id': schedule_id}, {'$set': schedule_updates}, safe=True)
开发者ID:ehelms,项目名称:pulp,代码行数:33,代码来源:scheduler.py
示例7: interval_iso6801_validator
def interval_iso6801_validator(x):
"""
Validates that a user-entered value is a correct iso8601 date with
an interval. This call will raise an exception to be passed to the CLI
framework if it is invalid; there is no return otherwise.
:param x: input value to be validated
:type x: str
"""
# These are meant to be used with okaara which expects either ValueError or
# TypeError for a graceful failure, so catch any parsing errors and raise
# the appropriate new error.
try:
dateutils.parse_iso8601_interval(x)
except Exception:
raise ValueError(_('value must be a valid iso8601 string with an interval'))
开发者ID:pkilambi,项目名称:pulp,代码行数:17,代码来源:validators.py
示例8: test_interval_full
def test_interval_full(self):
i1 = datetime.timedelta(hours=100)
t1 = datetime.datetime(year=2, month=6, day=20, hour=2, minute=22, second=46)
r1 = 5
s = dateutils.format_iso8601_interval(i1, t1, r1)
i2, t2, r2 = dateutils.parse_iso8601_interval(s)
self.assertEqual(i1, i2)
self.assertEqual(t1, t2)
self.assertEqual(r1, r2)
开发者ID:ehelms,项目名称:pulp,代码行数:9,代码来源:test_dateutils.py
示例9: interval_iso6801_validator
def interval_iso6801_validator(x):
"""
Validates that a user-entered value is a correct iso8601 date with
an interval.
:param x: input value to be validated
:type x: str
:raise ValueError: if the input is not a valid iso8601 string
"""
# These are meant to be used with okaara which expects either ValueError or
# TypeError for a graceful failure, so catch any parsing errors and raise
# the appropriate new error.
try:
dateutils.parse_iso8601_interval(x)
except Exception:
raise ValueError(_('value must be a valid iso8601 string with an interval'))
开发者ID:alanoe,项目名称:pulp,代码行数:18,代码来源:validators.py
示例10: convert_schedule
def convert_schedule(save_func, call):
"""
Converts one scheduled call from the old schema to the new
:param save_func: a function that takes one parameter, a dictionary that
represents the scheduled call in its new schema. This
function should save the call to the database.
:type save_func: function
:param call: dictionary representing the scheduled call in its old
schema
:type call: dict
"""
call.pop('call_exit_states', None)
call['total_run_count'] = call.pop('call_count')
call['iso_schedule'] = call['schedule']
interval, start_time, occurrences = dateutils.parse_iso8601_interval(call['schedule'])
# this should be a pickled instance of celery.schedules.schedule
call['schedule'] = pickle.dumps(schedule(interval))
call_request = call.pop('serialized_call_request')
# we are no longer storing these pickled.
# these are cast to a string because python 2.6 sometimes fails to
# deserialize json from unicode.
call['args'] = pickle.loads(str(call_request['args']))
call['kwargs'] = pickle.loads(str(call_request['kwargs']))
# keeping this pickled because we don't really know how to use it yet
call['principal'] = call_request['principal']
# this always get calculated on-the-fly now
call.pop('next_run', None)
first_run = call['first_run'].replace(tzinfo=dateutils.utc_tz())
call['first_run'] = dateutils.format_iso8601_datetime(first_run)
last_run = call.pop('last_run')
if last_run:
last_run_at = last_run.replace(tzinfo=dateutils.utc_tz())
call['last_run_at'] = dateutils.format_iso8601_datetime(last_run_at)
else:
call['last_run_at'] = None
call['task'] = NAMES_TO_TASKS[call_request['callable_name']]
# this is a new field that is used to determine when the scheduler needs to
# re-read the collection of schedules.
call['last_updated'] = time.time()
# determine if this is a consumer-related schedule, which we can only identify
# by the consumer resource tag. If it is, save that tag value in the new
# "resource" field, which is the new way that we will identify the
# relationship between a schedule and some other object. This is not
# necessary for repos, because we have a better method above for identifying
# them (move_scheduled_syncs).
tags = call_request.get('tags', [])
for tag in tags:
if tag.startswith('pulp:consumer:'):
call['resource'] = tag
break
save_func(call)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:57,代码来源:0007_scheduled_task_conversion.py
示例11: test_future
def test_future(self, mock_time):
mock_time.return_value = 1389307330.966561
call = ScheduledCall('2014-01-19T17:15Z/PT1H', 'pulp.tasks.dosomething')
next_run = call.calculate_next_run()
# make sure the next run is equal to the specified first run.
# don't want to compare a generated ISO8601 string directly, because there
# could be subtle variations that are valid but break string equality.
self.assertEqual(dateutils.parse_iso8601_interval(call.iso_schedule)[1],
dateutils.parse_iso8601_datetime(next_run))
开发者ID:aweiteka,项目名称:pulp,代码行数:11,代码来源:test_dispatch.py
示例12: _calculate_next_run
def _calculate_next_run(scheduled_call):
# rip-off from scheduler module
if scheduled_call['remaining_runs'] == 0:
return None
last_run = scheduled_call['last_run']
if last_run is None:
return scheduled_call['first_run']
now = datetime.utcnow()
interval = dateutils.parse_iso8601_interval(scheduled_call['schedule'])[0]
next_run = last_run
while next_run < now:
next_run = dateutils.add_interval_to_datetime(interval, next_run)
return next_run
开发者ID:bartwo,项目名称:pulp,代码行数:13,代码来源:all_repos.py
示例13: update
def update(schedule_id, delta):
"""
Updates the schedule with unique ID schedule_id. This only allows updating
of fields in ScheduledCall.USER_UPDATE_FIELDS.
:param schedule_id: a unique ID for a schedule
:type schedule_id: basestring
:param delta: a dictionary of keys with values that should be modified
on the schedule.
:type delta: dict
:return: instance of ScheduledCall representing the post-update state
:rtype ScheduledCall
:raise exceptions.UnsupportedValue
:raise exceptions.MissingResource
"""
unknown_keys = set(delta.keys()) - ScheduledCall.USER_UPDATE_FIELDS
if unknown_keys:
raise exceptions.UnsupportedValue(list(unknown_keys))
delta['last_updated'] = time.time()
# bz 1139703 - if we update iso_schedule, update the pickled object as well
if 'iso_schedule' in delta:
interval, start_time, occurrences = dateutils.parse_iso8601_interval(delta['iso_schedule'])
delta['schedule'] = pickle.dumps(CelerySchedule(interval))
# set first_run and next_run so that the schedule update will take effect
new_schedule_call = ScheduledCall(delta['iso_schedule'], 'dummytaskname')
delta['first_run'] = new_schedule_call.first_run
delta['next_run'] = new_schedule_call.next_run
try:
spec = {'_id': ObjectId(schedule_id)}
except InvalidId:
# During schedule update, MissingResource should be raised even if
# schedule_id is invalid object_id.
raise exceptions.MissingResource(schedule_id=schedule_id)
schedule = ScheduledCall.get_collection().find_and_modify(
query=spec, update={'$set': delta}, new=True)
if schedule is None:
raise exceptions.MissingResource(schedule_id=schedule_id)
return ScheduledCall.from_db(schedule)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:44,代码来源:utils.py
示例14: update
def update(self, schedule_id, **updated_schedule_options):
"""
Update and existing scheduled call.
Supported update schedule options:
* call_request: new itinerary call request instance
* schedule: new ISO8601 interval string
* failure_threshold: new failure threshold integer
* remaining_runs: new remaining runs count integer
* enabled: new enabled flag boolean
:param schedule_id: unique identifier of the scheduled call
:type schedule_id: str or pulp.server.compat.ObjectID
:param updated_schedule_options: updated options for this scheduled call
:raises: pulp.server.exceptions.MissingResource if the corresponding scheduled call does not exist
:raises: pulp.server.exceptions.UnsupportedValue if unsupported schedule options are passed in
:raises: pulp.server.exceptions.InvalidValue if any of the options are invalid
"""
if isinstance(schedule_id, basestring):
schedule_id = ObjectId(schedule_id)
if self.scheduled_call_collection.find_one(schedule_id) is None:
raise pulp_exceptions.MissingResource(schedule=str(schedule_id))
validate_updated_schedule_options(updated_schedule_options)
call_request = updated_schedule_options.pop('call_request', None)
if isinstance(call_request, call.CallRequest):
updated_schedule_options['serialized_call_request'] = call_request.serialize()
schedule = updated_schedule_options.get('schedule', None)
if schedule is not None:
runs = dateutils.parse_iso8601_interval(schedule)[2]
updated_schedule_options.setdefault('remaining_runs', runs) # honor explicit update
updated_schedule_options['next_run'] = self.calculate_first_run(schedule)
self.scheduled_call_collection.update({'_id': schedule_id}, {'$set': updated_schedule_options}, safe=True)
开发者ID:ashcrow,项目名称:pulp,代码行数:41,代码来源:scheduler.py
示例15: calculate_next_run
def calculate_next_run(self, scheduled_call):
"""
Calculate the next run datetime of a scheduled call
@param scheduled_call: scheduled call to schedule
@type scheduled_call: dict
@return: datetime of scheduled call's next run or None if there is no next run
@rtype: datetime.datetime or None
"""
if scheduled_call['remaining_runs'] == 0:
return None
last_run = scheduled_call['last_run']
if last_run is None:
return scheduled_call['first_run'] # this was calculated by the model constructor
now = datetime.datetime.utcnow()
interval = dateutils.parse_iso8601_interval(scheduled_call['schedule'])[0]
next_run = last_run
while next_run < now:
next_run += interval
return next_run
开发者ID:jlsherrill,项目名称:pulp,代码行数:21,代码来源:scheduler.py
示例16: _is_valid_schedule
def _is_valid_schedule(schedule):
"""
Test that a schedule string is in the ISO8601 interval format
:param schedule: schedule string
:type schedule: str
:return: True if the schedule is in the ISO8601 format, False otherwise
:rtype: bool
"""
if not isinstance(schedule, basestring):
return False
try:
interval, start_time, runs = dateutils.parse_iso8601_interval(schedule)
except isodate.ISO8601Error:
return False
if runs is not None and runs <= 0:
return False
return True
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:23,代码来源:utils.py
示例17: __init__
def __init__(self, call_request, schedule, failure_threshold=None, last_run=None, enabled=True):
super(ScheduledCall, self).__init__()
schedule_tag = resource_tag(dispatch_constants.RESOURCE_SCHEDULE_TYPE, str(self._id))
call_request.tags.append(schedule_tag)
interval, start, runs = dateutils.parse_iso8601_interval(schedule)
now = datetime.utcnow()
zero = timedelta(seconds=0)
start = start and dateutils.to_naive_utc_datetime(start)
self.serialized_call_request = call_request.serialize()
self.schedule = schedule
self.failure_threshold = failure_threshold
self.consecutive_failures = 0
self.first_run = start or now
# NOTE using != because ordering comparison with a Duration is not allowed
while interval != zero and self.first_run <= now:
# try to schedule the first run in the future
self.first_run = dateutils.add_interval_to_datetime(interval, self.first_run)
self.last_run = last_run and dateutils.to_naive_utc_datetime(last_run)
self.next_run = None # will calculated and set by the scheduler
self.remaining_runs = runs
self.enabled = enabled
开发者ID:domcleal,项目名称:pulp,代码行数:23,代码来源:dispatch.py
示例18: _insert_scheduled_v2_repo
def _insert_scheduled_v2_repo(self, repo_id, schedule):
importer_id = ObjectId()
schedule_id = ObjectId()
importer_doc = {'importer_id': importer_id,
'importer_type_id': yum_repos.YUM_IMPORTER_TYPE_ID,
'scheduled_syncs': [str(schedule_id)]}
self.tmp_test_db.database.repo_importers.update({'repo_id': repo_id}, {'$set': importer_doc}, safe=True)
call_request = CallRequest(sync_with_auto_publish_itinerary, [repo_id], {'overrides': {}})
interval, start, recurrences = dateutils.parse_iso8601_interval(schedule)
scheduled_call_doc = {'_id': schedule_id,
'id': str(schedule_id),
'serialized_call_request': call_request.serialize(),
'schedule': schedule,
'failure_threshold': None,
'consecutive_failures': 0,
'first_run': start or datetime.datetime.utcnow(),
'next_run': None,
'last_run': None,
'remaining_runs': recurrences,
'enabled': True}
scheduled_call_doc['next_run'] = all_repos._calculate_next_run(scheduled_call_doc)
self.tmp_test_db.database.scheduled_calls.insert(scheduled_call_doc, safe=True)
开发者ID:domcleal,项目名称:pulp,代码行数:24,代码来源:test_db_all_repos.py
示例19: calculate_next_run
def calculate_next_run(scheduled_call):
"""
Given a schedule call, calculate when it should be run next.
:param scheduled_call: scheduled call
:type scheduled_call: bson.BSON or pulp.server.db.model.dispatch.ScheduledCall
:return: when the scheduled call should be run next
:rtype: datetime.datetime
"""
last_run = scheduled_call['last_run']
if last_run is None:
return scheduled_call['first_run']
now = datetime.datetime.utcnow()
interval = dateutils.parse_iso8601_interval(scheduled_call['schedule'])[0]
next_run = last_run
while next_run < now:
next_run = dateutils.add_interval_to_datetime(interval, next_run)
return next_run
开发者ID:ashcrow,项目名称:pulp,代码行数:24,代码来源:scheduler.py
示例20: __init__
def __init__(self, call_request, schedule, failure_threshold=None, last_run=None, enabled=True):
super(ScheduledCall, self).__init__()
# add custom scheduled call tag to call request
schedule_tag = resource_tag(dispatch_constants.RESOURCE_SCHEDULE_TYPE, str(self._id))
call_request.tags.append(schedule_tag)
self.serialized_call_request = call_request.serialize()
self.schedule = schedule
self.enabled = enabled
self.failure_threshold = failure_threshold
self.consecutive_failures = 0
# scheduling fields
self.first_run = None # will be calculated and set by the scheduler
self.last_run = last_run and dateutils.to_naive_utc_datetime(last_run)
self.next_run = None # will be calculated and set by the scheduler
self.remaining_runs = dateutils.parse_iso8601_interval(schedule)[2]
# run-time call group metadata for tracking success or failure
self.call_count = 0
self.call_exit_states = []
开发者ID:ashcrow,项目名称:pulp,代码行数:24,代码来源:dispatch.py
注:本文中的pulp.common.dateutils.parse_iso8601_interval函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论