• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python dateutils.parse_iso8601_interval函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.common.dateutils.parse_iso8601_interval函数的典型用法代码示例。如果您正苦于以下问题:Python parse_iso8601_interval函数的具体用法?Python parse_iso8601_interval怎么用?Python parse_iso8601_interval使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse_iso8601_interval函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_updated_scheduled_next_run

    def test_updated_scheduled_next_run(self):
        call_request = CallRequest(itinerary_call)
        interval = datetime.timedelta(minutes=2)
        now = datetime.datetime.now(tz=dateutils.utc_tz())
        old_schedule = dateutils.format_iso8601_interval(interval, now)

        scheduled_id = self.scheduler.add(call_request, old_schedule)

        self.assertNotEqual(scheduled_id, None)

        scheduled_call = self.scheduled_call_collection.find_one({'_id': ObjectId(scheduled_id)})

        self.assertNotEqual(scheduled_call, None)

        old_interval, start_time = dateutils.parse_iso8601_interval(old_schedule)[:2]
        start_time = dateutils.to_naive_utc_datetime(start_time)

        self.assertEqual(scheduled_call['last_run'], None)
        self.assertEqual(scheduled_call['first_run'], start_time + old_interval)
        self.assertEqual(scheduled_call['next_run'], start_time + old_interval)

        interval = datetime.timedelta(minutes=1)
        new_schedule = dateutils.format_iso8601_interval(interval, now)

        self.scheduler.update(scheduled_id, schedule=new_schedule)
        updated_scheduled_call = self.scheduled_call_collection.find_one({'_id': ObjectId(scheduled_id)})

        new_interval = dateutils.parse_iso8601_interval(new_schedule)[0]

        self.assertEqual(updated_scheduled_call['last_run'], None)
        self.assertEqual(updated_scheduled_call['first_run'], start_time + old_interval)
        self.assertEqual(updated_scheduled_call['next_run'], start_time + new_interval)
开发者ID:ashcrow,项目名称:pulp,代码行数:32,代码来源:test_dispatch_scheduler.py


示例2: is_valid_schedule

def is_valid_schedule(schedule):
    """
    Validate an iso8601 interval schedule.
    @param schedule: schedule string to validate
    @return: True if the schedule is valid, False otherwise
    @rtype:  bool
    """
    if not isinstance(schedule, basestring):
        return False
    try:
        dateutils.parse_iso8601_interval(schedule)
    except isodate.ISO8601Error:
        return False
    return True
开发者ID:jlsherrill,项目名称:pulp,代码行数:14,代码来源:scheduler.py


示例3: _load_repo_extras

def _load_repo_extras(repo, repos=None):
    config = get_config()
    repoapi = RepositoryAPI()
    repo["url"] = os.path.join(config.cds.baseurl, repo["relative_path"])

    repo["parent"] = None
    repo["children"] = []
    if repos is None:
        repos = getattr(threading.local(), "repos", dict())

    for repo2 in repos.values():
        if repo2 == repo:
            continue
        elif repo["id"] in repo2["clone_ids"]:
            # the clone_id attribute is broken, but we check it anyway
            # just in case it gets fixed some day
            repo["parent"] = repo2
        elif repo2["id"] in repo["clone_ids"]:
            repo["children"].append(repo2)
        elif (
            repo["source"] and repo["source"]["type"] == "local" and repo["source"]["url"].endswith("/%s" % repo2["id"])
        ):
            # the child syncs from a local repo that ends with
            # /<parent repo id>
            repo["parent"] = repo2
        elif (
            repo2["source"]
            and repo2["source"]["type"] == "local"
            and repo2["source"]["url"].endswith("/%s" % repo["id"])
        ):
            repo["children"].append(repo2)

    repo["keys"] = dict()
    for key in repoapi.listkeys(repo["id"]):
        repo["keys"][os.path.basename(key)] = "%s/%s" % (config.cds.keyurl, key)

    if repo["parent"]:
        repo["updates"] = has_updates(repo)

    if repo["last_sync"] and repo["sync_schedule"]:
        repo["next_sync"] = format_iso8601_datetime(
            parse_iso8601_datetime(repo["last_sync"]) + parse_iso8601_interval(repo["sync_schedule"])[0]
        )
    elif repo["sync_schedule"]:
        repo["next_sync"] = format_iso8601_datetime(parse_iso8601_interval(repo["sync_schedule"])[1])
    else:
        repo["next_sync"] = None

    repo["groupid"].sort()
开发者ID:stpierre,项目名称:sponge,代码行数:49,代码来源:repo.py


示例4: test_interval_recurrences

 def test_interval_recurrences(self):
     d = datetime.timedelta(hours=4, minutes=2, seconds=59)
     c = 4
     s = dateutils.format_iso8601_interval(d, recurrences=c)
     i, t, r = dateutils.parse_iso8601_interval(s)
     self.assertEqual(d, i)
     self.assertEqual(c, r)
开发者ID:ehelms,项目名称:pulp,代码行数:7,代码来源:test_dateutils.py


示例5: test_interval_start_time

 def test_interval_start_time(self):
     d = datetime.timedelta(minutes=2)
     t = datetime.datetime(year=2014, month=11, day=5, hour=0, minute=23)
     s = dateutils.format_iso8601_interval(d, t)
     i, e, r = dateutils.parse_iso8601_interval(s)
     self.assertEqual(d, i)
     self.assertEqual(t, e)
开发者ID:ehelms,项目名称:pulp,代码行数:7,代码来源:test_dateutils.py


示例6: update

 def update(self, schedule_id, **schedule_updates):
     """
     Update a scheduled call request
     Valid schedule updates:
      * call_request
      * schedule
      * failure_threshold
      * remaining_runs
      * enabled
     @param schedule_id: id of the schedule for the call request
     @type  schedule_id: str
     @param schedule_updates: updates for scheduled call
     @type  schedule_updates: dict
     """
     if isinstance(schedule_id, basestring):
         schedule_id = ObjectId(schedule_id)
     scheduled_call_collection = ScheduledCall.get_collection()
     if scheduled_call_collection.find_one(schedule_id) is None:
         raise pulp_exceptions.MissingResource(schedule=str(schedule_id))
     validate_schedule_updates(schedule_updates)
     call_request = schedule_updates.pop('call_request', None)
     if call_request is not None:
         schedule_updates['serialized_call_request'] = call_request.serialize()
     schedule = schedule_updates.get('schedule', None)
     if schedule is not None:
         interval, start, runs = dateutils.parse_iso8601_interval(schedule)
         schedule_updates.setdefault('remaining_runs', runs) # honor explicit update
         # XXX (jconnor) it'd be nice to update the next_run if the schedule
         # has changed, but it requires mucking with the internals of the
         # of the scheduled call instance, which is all encapsulated in the
         # ScheduledCall constructor
         # the next_run field will be correctly updated after the next run
     scheduled_call_collection.update({'_id': schedule_id}, {'$set': schedule_updates}, safe=True)
开发者ID:ehelms,项目名称:pulp,代码行数:33,代码来源:scheduler.py


示例7: interval_iso6801_validator

def interval_iso6801_validator(x):
    """
    Validates that a user-entered value is a correct iso8601 date with
    an interval. This call will raise an exception to be passed to the CLI
    framework if it is invalid; there is no return otherwise.

    :param x: input value to be validated
    :type  x: str
    """

    # These are meant to be used with okaara which expects either ValueError or
    # TypeError for a graceful failure, so catch any parsing errors and raise
    # the appropriate new error.
    try:
        dateutils.parse_iso8601_interval(x)
    except Exception:
        raise ValueError(_('value must be a valid iso8601 string with an interval'))
开发者ID:pkilambi,项目名称:pulp,代码行数:17,代码来源:validators.py


示例8: test_interval_full

 def test_interval_full(self):
     i1 = datetime.timedelta(hours=100)
     t1 = datetime.datetime(year=2, month=6, day=20, hour=2, minute=22, second=46)
     r1 = 5
     s = dateutils.format_iso8601_interval(i1, t1, r1)
     i2, t2, r2 = dateutils.parse_iso8601_interval(s)
     self.assertEqual(i1, i2)
     self.assertEqual(t1, t2)
     self.assertEqual(r1, r2)
开发者ID:ehelms,项目名称:pulp,代码行数:9,代码来源:test_dateutils.py


示例9: interval_iso6801_validator

def interval_iso6801_validator(x):
    """
    Validates that a user-entered value is a correct iso8601 date with
    an interval.

    :param x: input value to be validated
    :type  x: str

    :raise ValueError: if the input is not a valid iso8601 string
    """

    # These are meant to be used with okaara which expects either ValueError or
    # TypeError for a graceful failure, so catch any parsing errors and raise
    # the appropriate new error.
    try:
        dateutils.parse_iso8601_interval(x)
    except Exception:
        raise ValueError(_('value must be a valid iso8601 string with an interval'))
开发者ID:alanoe,项目名称:pulp,代码行数:18,代码来源:validators.py


示例10: convert_schedule

def convert_schedule(save_func, call):
    """
    Converts one scheduled call from the old schema to the new

    :param save_func:   a function that takes one parameter, a dictionary that
                        represents the scheduled call in its new schema. This
                        function should save the call to the database.
    :type  save_func:   function
    :param call:        dictionary representing the scheduled call in its old
                        schema
    :type  call:        dict
    """
    call.pop('call_exit_states', None)
    call['total_run_count'] = call.pop('call_count')

    call['iso_schedule'] = call['schedule']
    interval, start_time, occurrences = dateutils.parse_iso8601_interval(call['schedule'])
    # this should be a pickled instance of celery.schedules.schedule
    call['schedule'] = pickle.dumps(schedule(interval))

    call_request = call.pop('serialized_call_request')
    # we are no longer storing these pickled.
    # these are cast to a string because python 2.6 sometimes fails to
    # deserialize json from unicode.
    call['args'] = pickle.loads(str(call_request['args']))
    call['kwargs'] = pickle.loads(str(call_request['kwargs']))
    # keeping this pickled because we don't really know how to use it yet
    call['principal'] = call_request['principal']
    # this always get calculated on-the-fly now
    call.pop('next_run', None)
    first_run = call['first_run'].replace(tzinfo=dateutils.utc_tz())
    call['first_run'] = dateutils.format_iso8601_datetime(first_run)
    last_run = call.pop('last_run')
    if last_run:
        last_run_at = last_run.replace(tzinfo=dateutils.utc_tz())
        call['last_run_at'] = dateutils.format_iso8601_datetime(last_run_at)
    else:
        call['last_run_at'] = None
    call['task'] = NAMES_TO_TASKS[call_request['callable_name']]

    # this is a new field that is used to determine when the scheduler needs to
    # re-read the collection of schedules.
    call['last_updated'] = time.time()

    # determine if this is a consumer-related schedule, which we can only identify
    # by the consumer resource tag. If it is, save that tag value in the new
    # "resource" field, which is the new way that we will identify the
    # relationship between a schedule and some other object. This is not
    # necessary for repos, because we have a better method above for identifying
    # them (move_scheduled_syncs).
    tags = call_request.get('tags', [])
    for tag in tags:
        if tag.startswith('pulp:consumer:'):
            call['resource'] = tag
            break

    save_func(call)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:57,代码来源:0007_scheduled_task_conversion.py


示例11: test_future

    def test_future(self, mock_time):
        mock_time.return_value = 1389307330.966561
        call = ScheduledCall('2014-01-19T17:15Z/PT1H', 'pulp.tasks.dosomething')

        next_run = call.calculate_next_run()

        # make sure the next run is equal to the specified first run.
        # don't want to compare a generated ISO8601 string directly, because there
        # could be subtle variations that are valid but break string equality.
        self.assertEqual(dateutils.parse_iso8601_interval(call.iso_schedule)[1],
                         dateutils.parse_iso8601_datetime(next_run))
开发者ID:aweiteka,项目名称:pulp,代码行数:11,代码来源:test_dispatch.py


示例12: _calculate_next_run

def _calculate_next_run(scheduled_call):
    # rip-off from scheduler module
    if scheduled_call['remaining_runs'] == 0:
        return None
    last_run = scheduled_call['last_run']
    if last_run is None:
        return scheduled_call['first_run']
    now = datetime.utcnow()
    interval = dateutils.parse_iso8601_interval(scheduled_call['schedule'])[0]
    next_run = last_run
    while next_run < now:
        next_run = dateutils.add_interval_to_datetime(interval, next_run)
    return next_run
开发者ID:bartwo,项目名称:pulp,代码行数:13,代码来源:all_repos.py


示例13: update

def update(schedule_id, delta):
    """
    Updates the schedule with unique ID schedule_id. This only allows updating
    of fields in ScheduledCall.USER_UPDATE_FIELDS.

    :param schedule_id: a unique ID for a schedule
    :type  schedule_id: basestring
    :param delta:       a dictionary of keys with values that should be modified
                        on the schedule.
    :type  delta:       dict

    :return:    instance of ScheduledCall representing the post-update state
    :rtype      ScheduledCall

    :raise  exceptions.UnsupportedValue
    :raise  exceptions.MissingResource
    """
    unknown_keys = set(delta.keys()) - ScheduledCall.USER_UPDATE_FIELDS
    if unknown_keys:
        raise exceptions.UnsupportedValue(list(unknown_keys))

    delta['last_updated'] = time.time()

    # bz 1139703 - if we update iso_schedule, update the pickled object as well
    if 'iso_schedule' in delta:
        interval, start_time, occurrences = dateutils.parse_iso8601_interval(delta['iso_schedule'])
        delta['schedule'] = pickle.dumps(CelerySchedule(interval))

        # set first_run and next_run so that the schedule update will take effect
        new_schedule_call = ScheduledCall(delta['iso_schedule'], 'dummytaskname')
        delta['first_run'] = new_schedule_call.first_run
        delta['next_run'] = new_schedule_call.next_run

    try:
        spec = {'_id': ObjectId(schedule_id)}
    except InvalidId:
        # During schedule update, MissingResource should be raised even if
        # schedule_id is invalid object_id.
        raise exceptions.MissingResource(schedule_id=schedule_id)
    schedule = ScheduledCall.get_collection().find_and_modify(
        query=spec, update={'$set': delta}, new=True)
    if schedule is None:
        raise exceptions.MissingResource(schedule_id=schedule_id)
    return ScheduledCall.from_db(schedule)
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:44,代码来源:utils.py


示例14: update

    def update(self, schedule_id, **updated_schedule_options):
        """
        Update and existing scheduled call.

        Supported update schedule options:

         * call_request: new itinerary call request instance
         * schedule: new ISO8601 interval string
         * failure_threshold: new failure threshold integer
         * remaining_runs: new remaining runs count integer
         * enabled: new enabled flag boolean

        :param schedule_id: unique identifier of the scheduled call
        :type  schedule_id: str or pulp.server.compat.ObjectID
        :param updated_schedule_options: updated options for this scheduled call
        :raises: pulp.server.exceptions.MissingResource if the corresponding scheduled call does not exist
        :raises: pulp.server.exceptions.UnsupportedValue if unsupported schedule options are passed in
        :raises: pulp.server.exceptions.InvalidValue if any of the options are invalid
        """

        if isinstance(schedule_id, basestring):
            schedule_id = ObjectId(schedule_id)

        if self.scheduled_call_collection.find_one(schedule_id) is None:
            raise pulp_exceptions.MissingResource(schedule=str(schedule_id))

        validate_updated_schedule_options(updated_schedule_options)

        call_request = updated_schedule_options.pop('call_request', None)

        if isinstance(call_request, call.CallRequest):
            updated_schedule_options['serialized_call_request'] = call_request.serialize()

        schedule = updated_schedule_options.get('schedule', None)

        if schedule is not None:
            runs = dateutils.parse_iso8601_interval(schedule)[2]
            updated_schedule_options.setdefault('remaining_runs', runs) # honor explicit update
            updated_schedule_options['next_run'] = self.calculate_first_run(schedule)

        self.scheduled_call_collection.update({'_id': schedule_id}, {'$set': updated_schedule_options}, safe=True)
开发者ID:ashcrow,项目名称:pulp,代码行数:41,代码来源:scheduler.py


示例15: calculate_next_run

    def calculate_next_run(self, scheduled_call):
        """
        Calculate the next run datetime of a scheduled call
        @param scheduled_call: scheduled call to schedule
        @type  scheduled_call: dict
        @return: datetime of scheduled call's next run or None if there is no next run
        @rtype:  datetime.datetime or None
        """
        if scheduled_call['remaining_runs'] == 0:
            return None

        last_run = scheduled_call['last_run']
        if last_run is None:
            return scheduled_call['first_run'] # this was calculated by the model constructor

        now = datetime.datetime.utcnow()
        interval = dateutils.parse_iso8601_interval(scheduled_call['schedule'])[0]
        next_run = last_run
        while next_run < now:
            next_run += interval
        return next_run
开发者ID:jlsherrill,项目名称:pulp,代码行数:21,代码来源:scheduler.py


示例16: _is_valid_schedule

def _is_valid_schedule(schedule):
    """
    Test that a schedule string is in the ISO8601 interval format

    :param schedule: schedule string
    :type schedule: str
    :return: True if the schedule is in the ISO8601 format, False otherwise
    :rtype:  bool
    """

    if not isinstance(schedule, basestring):
        return False

    try:
        interval, start_time, runs = dateutils.parse_iso8601_interval(schedule)

    except isodate.ISO8601Error:
        return False

    if runs is not None and runs <= 0:
        return False

    return True
开发者ID:CUXIDUMDUM,项目名称:pulp,代码行数:23,代码来源:utils.py


示例17: __init__

    def __init__(self, call_request, schedule, failure_threshold=None, last_run=None, enabled=True):
        super(ScheduledCall, self).__init__()

        schedule_tag = resource_tag(dispatch_constants.RESOURCE_SCHEDULE_TYPE, str(self._id))
        call_request.tags.append(schedule_tag)
        interval, start, runs = dateutils.parse_iso8601_interval(schedule)
        now = datetime.utcnow()
        zero = timedelta(seconds=0)
        start = start and dateutils.to_naive_utc_datetime(start)

        self.serialized_call_request = call_request.serialize()
        self.schedule = schedule
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0
        self.first_run = start or now
        # NOTE using != because ordering comparison with a Duration is not allowed
        while interval != zero and self.first_run <= now:
            # try to schedule the first run in the future
            self.first_run = dateutils.add_interval_to_datetime(interval, self.first_run)
        self.last_run = last_run and dateutils.to_naive_utc_datetime(last_run)
        self.next_run = None # will calculated and set by the scheduler
        self.remaining_runs = runs
        self.enabled = enabled
开发者ID:domcleal,项目名称:pulp,代码行数:23,代码来源:dispatch.py


示例18: _insert_scheduled_v2_repo

    def _insert_scheduled_v2_repo(self, repo_id, schedule):
        importer_id = ObjectId()
        schedule_id = ObjectId()

        importer_doc = {'importer_id': importer_id,
                        'importer_type_id': yum_repos.YUM_IMPORTER_TYPE_ID,
                        'scheduled_syncs': [str(schedule_id)]}
        self.tmp_test_db.database.repo_importers.update({'repo_id': repo_id}, {'$set': importer_doc}, safe=True)

        call_request = CallRequest(sync_with_auto_publish_itinerary, [repo_id], {'overrides': {}})
        interval, start, recurrences = dateutils.parse_iso8601_interval(schedule)
        scheduled_call_doc = {'_id': schedule_id,
                              'id': str(schedule_id),
                              'serialized_call_request': call_request.serialize(),
                              'schedule': schedule,
                              'failure_threshold': None,
                              'consecutive_failures': 0,
                              'first_run': start or datetime.datetime.utcnow(),
                              'next_run': None,
                              'last_run': None,
                              'remaining_runs': recurrences,
                              'enabled': True}
        scheduled_call_doc['next_run'] = all_repos._calculate_next_run(scheduled_call_doc)
        self.tmp_test_db.database.scheduled_calls.insert(scheduled_call_doc, safe=True)
开发者ID:domcleal,项目名称:pulp,代码行数:24,代码来源:test_db_all_repos.py


示例19: calculate_next_run

    def calculate_next_run(scheduled_call):
        """
        Given a schedule call, calculate when it should be run next.

        :param scheduled_call: scheduled call
        :type  scheduled_call: bson.BSON or pulp.server.db.model.dispatch.ScheduledCall
        :return: when the scheduled call should be run next
        :rtype:  datetime.datetime
        """

        last_run = scheduled_call['last_run']

        if last_run is None:
            return scheduled_call['first_run']

        now = datetime.datetime.utcnow()
        interval = dateutils.parse_iso8601_interval(scheduled_call['schedule'])[0]

        next_run = last_run

        while next_run < now:
            next_run = dateutils.add_interval_to_datetime(interval, next_run)

        return next_run
开发者ID:ashcrow,项目名称:pulp,代码行数:24,代码来源:scheduler.py


示例20: __init__

    def __init__(self, call_request, schedule, failure_threshold=None, last_run=None, enabled=True):
        super(ScheduledCall, self).__init__()

        # add custom scheduled call tag to call request
        schedule_tag = resource_tag(dispatch_constants.RESOURCE_SCHEDULE_TYPE, str(self._id))
        call_request.tags.append(schedule_tag)

        self.serialized_call_request = call_request.serialize()

        self.schedule = schedule
        self.enabled = enabled

        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0

        # scheduling fields
        self.first_run = None # will be calculated and set by the scheduler
        self.last_run = last_run and dateutils.to_naive_utc_datetime(last_run)
        self.next_run = None # will be calculated and set by the scheduler
        self.remaining_runs = dateutils.parse_iso8601_interval(schedule)[2]

        # run-time call group metadata for tracking success or failure
        self.call_count = 0
        self.call_exit_states = []
开发者ID:ashcrow,项目名称:pulp,代码行数:24,代码来源:dispatch.py



注:本文中的pulp.common.dateutils.parse_iso8601_interval函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python dateutils.utc_tz函数代码示例发布时间:2022-05-25
下一篇:
Python dateutils.parse_iso8601_datetime函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap