• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python dateutils.parse_iso8601_datetime函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中pulp.common.dateutils.parse_iso8601_datetime函数的典型用法代码示例。如果您正苦于以下问题:Python parse_iso8601_datetime函数的具体用法?Python parse_iso8601_datetime怎么用?Python parse_iso8601_datetime使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了parse_iso8601_datetime函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_spawned_task_status

    def test_spawned_task_status(self, mock_request):
        """
        Check that on_success() stores the result, the error description and the ids of
        the spawned tasks carried by a TaskResult on the saved TaskStatus.
        """
        spawned = AsyncResult('foo-id')
        task_result = tasks.TaskResult(error=PulpException('error-foo'), result='bar')
        task_result.spawned_tasks = [spawned]

        new_id = str(uuid.uuid4())
        call_args = [1, 'b', 'iii']
        call_kwargs = {'1': 'for the money', 'tags': ['test_tags'], 'routing_key': WORKER_2}
        mock_request.called_directly = False

        # A freshly saved status is expected to be waiting and unfinished
        initial = TaskStatus(new_id).save()
        self.assertEqual(initial['state'], 'waiting')
        self.assertEqual(initial['finish_time'], None)

        tasks.Task().on_success(task_result, new_id, call_args, call_kwargs)

        stored = TaskStatus.objects(task_id=new_id).first()
        self.assertEqual(stored['state'], 'finished')
        self.assertEqual(stored['result'], 'bar')
        self.assertEqual(stored['error']['description'], 'error-foo')
        finish_time = stored['finish_time']
        self.assertTrue(finish_time is not None)
        # parse_iso8601_datetime must accept the finish_time without raising
        dateutils.parse_iso8601_datetime(finish_time)
        self.assertEqual(stored['spawned_tasks'], ['foo-id'])
开发者ID:hgschmie,项目名称:pulp,代码行数:27,代码来源:test_tasks.py


示例2: test_on_success_handler_async_result

    def test_on_success_handler_async_result(self, mock_request):
        """
        Check that on_success() given an AsyncResult as the return value marks the task
        finished, stores no result and records the AsyncResult's id as a spawned task.
        """
        async_retval = AsyncResult('foo-id')

        new_id = str(uuid.uuid4())
        call_args = [1, 'b', 'iii']
        call_kwargs = {'1': 'for the money', 'tags': ['test_tags'], 'queue': RESERVED_WORKER_2}
        mock_request.called_directly = False

        # A newly created status is expected to be waiting and unfinished
        initial = TaskStatusManager.create_task_status(new_id, 'some_queue')
        self.assertEqual(initial['state'], 'waiting')
        self.assertEqual(initial['finish_time'], None)

        tasks.Task().on_success(async_retval, new_id, call_args, call_kwargs)

        stored = TaskStatusManager.find_by_task_id(new_id)
        self.assertEqual(stored['state'], 'finished')
        self.assertEqual(stored['result'], None)
        finish_time = stored['finish_time']
        self.assertTrue(finish_time is not None)
        # parse_iso8601_datetime must accept the finish_time without raising
        dateutils.parse_iso8601_datetime(finish_time)
        self.assertEqual(stored['spawned_tasks'], ['foo-id'])
开发者ID:jbennett7,项目名称:pulp,代码行数:25,代码来源:test_tasks.py


示例3: test_publish_history_end_date

    def test_publish_history_end_date(self):
        """
        Check that publish_history() called with end_date only returns entries whose
        'started' value is earlier than or equal to that date.
        """
        # Setup
        self.repo_manager.create_repo("test_date")
        self.distributor_manager.add_distributor(
            "test_date", "mock-distributor", {}, True, distributor_id="test_dist")
        # Insert three publish entries built from the second pairs (0, 1), (2, 3) and (4, 5)
        timestamp = "2013-06-01T12:00:0%sZ"
        for second in (0, 2, 4):
            earlier = timestamp % str(second)
            later = timestamp % str(second + 1)
            publish_result = RepoPublishResult.expected_result(
                "test_date", "test_dist", "bar", earlier, later,
                "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(publish_result, safe=True)

        # Only the entries up to the requested end date should come back
        end_date = "2013-06-01T12:00:03Z"
        history = self.publish_manager.publish_history(
            "test_date", "test_dist", end_date=end_date)
        self.assertEqual(2, len(history))
        for entry in history:
            entry_start = dateutils.parse_iso8601_datetime(entry["started"])
            limit = dateutils.parse_iso8601_datetime(end_date)
            self.assertTrue(limit >= entry_start)
开发者ID:jdob,项目名称:pulp,代码行数:29,代码来源:test_repo_publish_manager.py


示例4: validate

    def validate(self, value):
        """
        Validate the value with the parent field class and then check that it can be
        parsed as an ISO8601 datetime string.

        A parsing failure (ISO8601Error) is reported through self.error() using the
        exception's message.

        :param value: the value to validate
        """
        super(ISO8601StringField, self).validate(value)

        try:
            dateutils.parse_iso8601_datetime(value)
        # "except X as e" works on Python 2.6+ and Python 3; the old comma form is a
        # syntax error on Python 3.
        except ISO8601Error as e:
            self.error(str(e))
开发者ID:BrnoPCmaniak,项目名称:pulp,代码行数:7,代码来源:fields.py


示例5: test_publish_history_start_date

    def test_publish_history_start_date(self):
        """
        Check that publish_history() called with start_date only returns entries whose
        'started' value is later than or equal to that date.
        """
        # Setup
        self.repo_manager.create_repo('test_date')
        self.distributor_manager.add_distributor(
            'test_date', 'mock-distributor', {}, True, distributor_id='test_dist')
        # Insert three publish entries built from the second pairs (0, 1), (2, 3) and (4, 5)
        timestamp = '2013-06-01T12:00:0%sZ'
        for second in (0, 2, 4):
            earlier = timestamp % str(second)
            later = timestamp % str(second + 1)
            publish_result = RepoPublishResult.expected_result(
                'test_date', 'test_dist', 'bar', earlier, later,
                'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(publish_result, safe=True)

        # Without a filter all three entries are returned
        unfiltered = self.publish_manager.publish_history('test_date', 'test_dist')
        self.assertEqual(3, len(unfiltered))

        # With a start date only the entries from that date onwards should come back
        start_date = '2013-06-01T12:00:02Z'
        history = self.publish_manager.publish_history(
            'test_date', 'test_dist', start_date=start_date)
        self.assertEqual(2, len(history))
        for entry in history:
            entry_start = dateutils.parse_iso8601_datetime(entry['started'])
            limit = dateutils.parse_iso8601_datetime(start_date)
            self.assertTrue(limit <= entry_start)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py


示例6: test_update_task_status

    def test_update_task_status(self):
        """
        Check that update_task_status() applies start_time, state and progress_report
        from the delta, and that the 'disregard' key shows up neither in the returned
        value nor in the stored task status.
        """
        task_id = self.get_random_uuid()
        queue = 'special_queue'
        TaskStatusManager.create_task_status(
            task_id, queue, ['test-tag1', 'test-tag2'], 'waiting')
        timestamp = dateutils.format_iso8601_datetime(datetime.now(dateutils.utc_tz()))
        delta = {
            'start_time': timestamp,
            'state': 'running',
            'disregard': 'ignored',
            'progress_report': {'report-id': 'my-progress'},
        }

        returned = TaskStatusManager.update_task_status(task_id, delta)
        stored = TaskStatusManager.find_by_task_id(task_id)

        # What was persisted
        self.assertEqual(stored['start_time'], delta['start_time'])
        # parse_iso8601_datetime must accept the start_time without raising
        dateutils.parse_iso8601_datetime(stored['start_time'])
        self.assertEqual(stored['state'], delta['state'])
        self.assertEqual(stored['progress_report'], delta['progress_report'])
        self.assertEqual(stored['queue'], queue)

        # What the update call handed back
        for key in ('start_time', 'state', 'progress_report'):
            self.assertEqual(returned[key], delta[key])

        self.assertFalse('disregard' in returned)
        self.assertFalse('disregard' in stored)
开发者ID:aweiteka,项目名称:pulp,代码行数:30,代码来源:test_task_status_manager.py


示例7: test_get

    def test_get(self, mock_path, mock_ok, mock_utils_get):
        """
        Run the controller's _get() for a single ScheduledCall and inspect the one
        positional argument that was handed to mock_ok.
        """
        call = ScheduledCall('PT1M', 'pulp.tasks.frequent')
        mock_utils_get.return_value = [call]

        response = self.controller._get(call.id)
        ok_args = mock_ok.call_args[0]
        rendered = ok_args[0]

        self.assertEqual(response, mock_ok.return_value)
        self.assertEqual(len(ok_args), 1)

        # spot-check the schedule
        self.assertEqual(rendered['_id'], call.id)
        self.assertEqual(rendered['schedule'], 'PT1M')
        self.assertEqual(rendered['task'], 'pulp.tasks.frequent')
        self.assertEqual(rendered['_href'], mock_path.return_value)

        # next_run is computed on demand, so the value produced by for_display() may be
        # one second later than the one read from the call; allow a 1 second tolerance.
        expected = call.for_display()
        next_run_on_call = dateutils.parse_iso8601_datetime(call.next_run)
        next_run_displayed = dateutils.parse_iso8601_datetime(expected['next_run'])
        gap = next_run_displayed - next_run_on_call
        self.assertTrue(gap <= timedelta(seconds=1))

        # drop the keys that cannot be compared exactly, then compare everything else
        del rendered['_href']
        del rendered['next_run']
        del expected['next_run']
        self.assertEqual(rendered, expected)

        # the manager layer must have been asked for exactly this schedule id
        mock_utils_get.assert_called_once_with([call.id])
开发者ID:AndreaGiardini,项目名称:pulp,代码行数:33,代码来源:test_schedule.py


示例8: test_publish_history_ascending_sort

    def test_publish_history_ascending_sort(self):
        """
        Check that publish_history() with sort=SORT_ASCENDING returns entries whose
        'started' values strictly increase.
        """
        # Setup
        self.repo_manager.create_repo('test_sort')
        self.distributor_manager.add_distributor(
            'test_sort', 'mock-distributor', {}, True, distributor_id='test_dist')
        # Insert five publish entries built from the second pairs (0, 1) through (8, 9)
        timestamp = '2013-06-01T12:00:0%sZ'
        for second in (0, 2, 4, 6, 8):
            earlier = timestamp % str(second)
            later = timestamp % str(second + 1)
            publish_result = RepoPublishResult.expected_result(
                'test_sort', 'test_dist', 'bar', earlier, later,
                'test-summary', 'test-details', RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(publish_result, safe=True)

        history = self.publish_manager.publish_history(
            'test_sort', 'test_dist', sort=constants.SORT_ASCENDING)
        self.assertEqual(5, len(history))
        # Every entry must have started before the one that follows it
        for index in range(4):
            current = dateutils.parse_iso8601_datetime(history[index]['started'])
            following = dateutils.parse_iso8601_datetime(history[index + 1]['started'])
            self.assertTrue(following > current)
开发者ID:credativ,项目名称:pulp,代码行数:25,代码来源:test_publish.py


示例9: test_sync_history_end_date

    def test_sync_history_end_date(self):
        """
        Check that sync_history() called with end_date only returns entries whose
        'started' value is earlier than or equal to that date.
        """
        # Setup
        self.repo_manager.create_repo('test_repo')
        # Template used to fake the dates; only the last digit of the seconds varies
        timestamp = '2013-06-01T12:00:0%sZ'
        # Save three sync entries built from the second pairs (0, 1), (2, 3) and (4, 5)
        for second in (0, 2, 4):
            earlier = timestamp % str(second)
            later = timestamp % str(second + 1)
            sync_result = RepoSyncResult.expected_result(
                'test_repo', 'foo', 'bar', earlier, later, 1, 1, 1, '', '',
                RepoSyncResult.RESULT_SUCCESS)
            RepoSyncResult.get_collection().save(sync_result, safe=True)

        # Without a filter all three entries are returned
        unfiltered = self.sync_manager.sync_history('test_repo')
        self.assertEqual(3, len(unfiltered))

        # With an end date only the entries up to that date should come back
        end_date = '2013-06-01T12:00:03Z'
        history = self.sync_manager.sync_history('test_repo', end_date=end_date)
        self.assertEqual(2, len(history))
        for item in history:
            item_start = dateutils.parse_iso8601_datetime(item['started'])
            limit = dateutils.parse_iso8601_datetime(end_date)
            self.assertTrue(limit >= item_start)
开发者ID:beav,项目名称:pulp,代码行数:26,代码来源:test_sync.py


示例10: test_task_status_update

    def test_task_status_update(self):
        """
        Check that update_one() on the TaskStatus queryset persists start_time, state
        and progress_report while leaving worker_name as it was saved.
        """
        task_id = self.get_random_uuid()
        worker_name = 'special_worker_name'
        TaskStatus(task_id, worker_name, ['test-tag1', 'test-tag2'], 'waiting').save()
        timestamp = dateutils.format_iso8601_datetime(datetime.now(dateutils.utc_tz()))
        delta = {
            'start_time': timestamp,
            'state': 'running',
            'progress_report': {'report-id': 'my-progress'},
        }

        matching = TaskStatus.objects(task_id=task_id)
        matching.update_one(
            set__start_time=delta['start_time'],
            set__state=delta['state'],
            set__progress_report=delta['progress_report'])

        stored = TaskStatus.objects(task_id=task_id).first()
        self.assertEqual(stored['start_time'], delta['start_time'])
        # parse_iso8601_datetime must accept the start_time without raising
        dateutils.parse_iso8601_datetime(stored['start_time'])
        self.assertEqual(stored['state'], delta['state'])
        self.assertEqual(stored['progress_report'], delta['progress_report'])
        self.assertEqual(stored['worker_name'], worker_name)
开发者ID:credativ,项目名称:pulp,代码行数:26,代码来源:test_dispatch.py


示例11: test_publish_history_descending_sort

    def test_publish_history_descending_sort(self):
        """
        Check that publish_history() with sort=SORT_DESCENDING returns entries whose
        'started' values strictly decrease.
        """
        # Setup
        self.repo_manager.create_repo("test_sort")
        self.distributor_manager.add_distributor(
            "test_sort", "mock-distributor", {}, True, distributor_id="test_dist")
        # Insert five publish entries built from the second pairs (0, 1) through (8, 9)
        timestamp = "2013-06-01T12:00:0%sZ"
        for second in (0, 2, 4, 6, 8):
            earlier = timestamp % str(second)
            later = timestamp % str(second + 1)
            publish_result = RepoPublishResult.expected_result(
                "test_sort", "test_dist", "bar", earlier, later,
                "test-summary", "test-details", RepoPublishResult.RESULT_SUCCESS)
            RepoPublishResult.get_collection().insert(publish_result, safe=True)

        history = self.publish_manager.publish_history(
            "test_sort", "test_dist", sort=constants.SORT_DESCENDING)
        self.assertEqual(5, len(history))
        # Every entry must have started after the one that follows it
        for index in range(4):
            current = dateutils.parse_iso8601_datetime(history[index]["started"])
            following = dateutils.parse_iso8601_datetime(history[index + 1]["started"])
            self.assertTrue(following < current)
开发者ID:jdob,项目名称:pulp,代码行数:30,代码来源:test_repo_publish_manager.py


示例12: test_on_failure_handler

    def test_on_failure_handler(self, mock_request):
        """
        Check that the overridden on_failure handler moves the task status to 'error'
        and stores a finish time and the traceback string.
        """
        class StubExceptionInfo(object):
            """
            The on_failure handler expects an instance of celery's ExceptionInfo class,
            which keeps a string representation of the traceback in its traceback
            attribute. This stub only imitates that attribute.
            """
            def __init__(self):
                self.traceback = "string_repr_of_traceback"

        failure = Exception()
        new_id = str(uuid.uuid4())
        call_args = [1, 'b', 'iii']
        call_kwargs = {'1': 'for the money', 'tags': ['test_tags']}
        exception_info = StubExceptionInfo()
        mock_request.called_directly = False

        # A newly created status is waiting, unfinished and has no traceback
        initial = TaskStatusManager.create_task_status(new_id, 'some_queue')
        self.assertEqual(initial['state'], 'waiting')
        self.assertEqual(initial['finish_time'], None)
        self.assertEqual(initial['traceback'], None)

        tasks.Task().on_failure(failure, new_id, call_args, call_kwargs, exception_info)

        stored = TaskStatusManager.find_by_task_id(new_id)
        self.assertEqual(stored['state'], 'error')
        finish_time = stored['finish_time']
        self.assertTrue(finish_time is not None)
        # parse_iso8601_datetime must accept the finish_time without raising
        dateutils.parse_iso8601_datetime(finish_time)
        self.assertEqual(stored['traceback'], exception_info.traceback)
开发者ID:jbennett7,项目名称:pulp,代码行数:34,代码来源:test_tasks.py


示例13: test_with_past_runs

    def test_with_past_runs(self, mock_time):
        """
        An hourly call that first ran not quite 2 hours ago and ran again less than one
        hour ago should have its next run at the end of the current hour.
        """
        # the mocked "current time", as seconds since the epoch
        mock_time.return_value = 1389389758.547976
        hourly = ScheduledCall(
            '2014-01-10T20:00Z/PT1H',
            'pulp.tasks.dosomething',
            total_run_count=2,
            last_run_at='2014-01-10T21:00Z')

        calculated = hourly.calculate_next_run()

        expected_dt = dateutils.parse_iso8601_datetime('2014-01-10T22:00Z')
        calculated_dt = dateutils.parse_iso8601_datetime(calculated)
        self.assertEqual(expected_dt, calculated_dt)
开发者ID:aweiteka,项目名称:pulp,代码行数:12,代码来源:test_dispatch.py


示例14: iso8601_datetime_validator

def iso8601_datetime_validator(x):
    """
    Check that a value entered by the user can be parsed as an iso8601 datetime.

    :param x: value to check
    :type  x: str

    :raise ValueError: if parsing the value raises any exception
    """
    try:
        dateutils.parse_iso8601_datetime(x)
    except Exception:
        # Whatever the parser raised, report one translated, user-facing message
        message = _('value must be a valid iso8601 string (yyyy-mm-ddThh:mm:ssZ)')
        raise ValueError(message)
开发者ID:alanoe,项目名称:pulp,代码行数:13,代码来源:validators.py


示例15: _calculate_times

    def _calculate_times(self):
        """
        Compute a group of time-related values that are usually needed together.

        :return:    tuple of numbers, in this order:
                    now_s: current time as seconds since the epoch
                    first_run_s: time of the first run as seconds since the epoch,
                        derived from self.first_run
                    since_first_s: seconds elapsed since the first run
                    run_every_s: seconds that should elapse between runs of this
                        schedule
                    last_scheduled_run_s: most recent time, as seconds since the
                        epoch, at which this schedule should have run
                    expected_runs: number of runs that should have happened given
                        the first_run time and the interval
        :rtype:     tuple
        """
        now_s = time.time()

        parsed_first_run = dateutils.parse_iso8601_datetime(self.first_run)
        first_run_utc = dateutils.to_utc_datetime(parsed_first_run)
        first_run_s = calendar.timegm(first_run_utc.utctimetuple())
        since_first_s = now_s - first_run_s

        interval = self.as_schedule_entry().schedule.run_every
        run_every_s = timedelta_seconds(interval)

        expected_runs = int(since_first_s / run_every_s)
        if expected_runs < 0:
            # never report a negative number of runs
            expected_runs = 0
        last_scheduled_run_s = first_run_s + expected_runs * run_every_s

        return (now_s, first_run_s, since_first_s, run_every_s, last_scheduled_run_s,
                expected_runs)
开发者ID:aweiteka,项目名称:pulp,代码行数:31,代码来源:dispatch.py


示例16: from_progress_report

    def from_progress_report(cls, report):
        """
        Parses the output from the build_progress_report method into an instance
        of this class. The intention is to use this client-side to reconstruct
        the instance as it is retrieved from the server.

        The build_final_report call on instances returned from this call will
        not function as it requires the server-side conduit to be provided.
        Additionally, any exceptions and tracebacks will be a text representation
        instead of formal objects.

        The report passed in is not modified, so the same report can safely be
        passed to this method more than once.

        :param report: progress report retrieved from the server's task; must
                       contain a 'state_times' dict of iso8601 strings
        :type  report: dict
        :return:       instance populated with the state in the report
        :rtype:        ISOProgressReport
        """
        # Restore the state transition times to datetime objects. A new dict is built
        # instead of overwriting report['state_times'] so the caller's data keeps its
        # original string values.
        state_times = dict(
            (key, parse_iso8601_datetime(value))
            for key, value in report['state_times'].items())

        # python 2.4 does not support unicode keys so convert to string
        new_report = dict((str(key), value) for key, value in report.items())
        new_report['state_times'] = state_times

        # None is passed as the first constructor argument
        return cls(None, **new_report)
开发者ID:ATIX-AG,项目名称:pulp_rpm,代码行数:27,代码来源:progress.py


示例17: assert_last_sync_time

def assert_last_sync_time(time_in_iso):
    """
    Report whether an iso8601 timestamp lies within two seconds of the current UTC time.

    :param time_in_iso: iso8601 formatted timestamp to compare against now
    :type  time_in_iso: str
    :return: True if the timestamp is less than 2 seconds away from now, False otherwise
    :rtype:  bool
    """
    now = dateutils.now_utc_datetime_with_tzinfo()
    finished = dateutils.parse_iso8601_datetime(time_in_iso)

    # Compare them within a threshold since they won't be exact. timedelta.seconds is
    # only the seconds component and ignores whole days, so a timestamp that is off by
    # days could still look "close". Check the days component of the absolute
    # difference as well.
    magnitude = abs(now - finished)
    return magnitude.days == 0 and magnitude.seconds < 2
开发者ID:beav,项目名称:pulp,代码行数:7,代码来源:test_sync.py


示例18: test_now

    def test_now(self):
        """
        The next run calculated for a new 'PT1H' call must be less than one second
        after the current UTC time.
        """
        scheduled = ScheduledCall('PT1H', 'pulp.tasks.dosomething')

        utc_now = datetime.utcnow().replace(tzinfo=dateutils.utc_tz())
        next_run_str = scheduled.calculate_next_run()
        next_run = dateutils.parse_iso8601_datetime(next_run_str)

        delay = next_run - utc_now
        self.assertTrue(delay < timedelta(seconds=1))
开发者ID:aweiteka,项目名称:pulp,代码行数:7,代码来源:test_dispatch.py


示例19: last_publish

    def last_publish(self, repo_id, distributor_id):
        """
        Look up when the given distributor last published, whether or not that
        publish succeeded. None is returned if the repo has never been published.

        @param repo_id: identifies the repo
        @type  repo_id: str

        @param distributor_id: identifies the repo's distributor
        @type  distributor_id: str

        @return: timestamp of the last publish
        @rtype:  datetime or None

        @raise MissingResource: if there is no distributor identified by the
                given repo ID and distributor ID
        """

        # Validation
        query = {'repo_id' : repo_id, 'id' : distributor_id}
        distributor = RepoDistributor.get_collection().find_one(query)
        if distributor is None:
            raise MissingResource(repo_id)

        last_publish_str = distributor['last_publish']
        if last_publish_str is None:
            return None

        # Convert the stored string to a datetime instance
        return dateutils.parse_iso8601_datetime(last_publish_str)
开发者ID:fdammeke,项目名称:pulp,代码行数:33,代码来源:publish.py


示例20: test_no_first_run

    def test_no_first_run(self):
        """
        A call created without an explicit first run must get a first_run that is
        within one second of the current UTC time.
        """
        scheduled = ScheduledCall('PT1M', 'pulp.tasks.dosomething')

        first_run = dateutils.parse_iso8601_datetime(scheduled.first_run)

        # a one second window is a generous allowance for the time the test takes
        utc_now = datetime.utcnow().replace(tzinfo=dateutils.utc_tz())
        drift = abs(utc_now - first_run)
        self.assertTrue(drift < timedelta(seconds=1))
开发者ID:aweiteka,项目名称:pulp,代码行数:8,代码来源:test_dispatch.py



注:本文中的pulp.common.dateutils.parse_iso8601_datetime函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap