• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python times.format函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 times.format 函数的典型用法代码示例。如果您正苦于以下问题：Python format 函数的具体用法？Python format 怎么用？Python format 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了 format 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。

示例1: save

    def save(self):
        """Write this job's fields into the Redis hash stored at ``self.key``.

        ``created_at`` is always written. Every other field is written only
        when the matching attribute is not ``None`` (``meta`` only when it is
        truthy). Timestamps go through ``times.format(..., "UTC")``; the job
        tuple, the result and ``meta`` go through ``dumps``.
        """
        redis_key = self.key

        def as_utc(moment):
            return times.format(moment, "UTC")

        fields = {"created_at": as_utc(self.created_at)}

        # The serialized call is only stored once a function name is known.
        if self.func_name is not None:
            fields["data"] = dumps(self.job_tuple)

        # Attributes copied verbatim.
        for name in ("origin", "description"):
            value = getattr(self, name)
            if value is not None:
                fields[name] = value

        # Timestamps, formatted like ``created_at``.
        for name in ("enqueued_at", "ended_at"):
            moment = getattr(self, name)
            if moment is not None:
                fields[name] = as_utc(moment)

        if self._result is not None:
            fields["result"] = dumps(self._result)

        for name in ("exc_info", "timeout", "result_ttl"):
            value = getattr(self, name)
            if value is not None:
                fields[name] = value

        if self._status is not None:
            fields["status"] = self._status

        if self.meta:
            fields["meta"] = dumps(self.meta)

        self.connection.hmset(redis_key, fields)
开发者ID:toxster,项目名称:rq,代码行数:31,代码来源:job.py


示例2: dump

    def dump(self):
        """Build and return a dict serialization of the current job instance.

        ``created_at`` is always present, using ``times.now()`` when the
        attribute is falsy. All other entries are included only when the
        corresponding attribute is not ``None`` (``meta`` only when truthy).
        """
        def utc(moment):
            return times.format(moment, 'UTC')

        serialized = {'created_at': utc(self.created_at or times.now())}

        def put(field, value, convert=None):
            # Skip unset values; optionally transform the value before storing.
            if value is not None:
                serialized[field] = value if convert is None else convert(value)

        if self.func_name is not None:
            serialized['data'] = dumps(self.job_tuple)

        put('origin', self.origin)
        put('description', self.description)
        put('enqueued_at', self.enqueued_at, utc)
        put('ended_at', self.ended_at, utc)
        put('result', self._result, dumps)
        put('exc_info', self.exc_info)
        put('timeout', self.timeout)
        put('result_ttl', self.result_ttl)
        put('status', self._status)
        put('dependency_id', self._dependency_id)

        if self.meta:
            serialized['meta'] = dumps(self.meta)

        return serialized
开发者ID:alonisser,项目名称:rq,代码行数:31,代码来源:job.py


示例3: on_callback

    def on_callback(self, request):
        """Handle a webhook request that publishes a posted document.

        The request must be a POST whose ``secret`` query parameter equals
        ``self.bot.config.draftin_secret``. The JSON ``payload`` form field is
        rendered through ``self.template`` and written to
        ``<documents_dir>/<slug>.md``; afterwards the configured update
        command is run and its outcome is reported via ``request.respond``.
        """
        # Guard clauses: reject anything that is not an authorised POST.
        if request.method != 'POST':
            request.respond('This hook only supports POST method.')
            return

        if request.GET.get('secret', [None])[0] != self.bot.config.draftin_secret:
            request.respond('Wrong secret was specified')
            return

        payload = anyjson.deserialize(request.POST['payload'][0])
        title = payload['name']
        content = payload['content']
        slug = slugify(title)
        created_at = times.to_universal(payload['created_at'])
        updated_at = times.to_universal(payload['updated_at'])
        timezone = self.bot.config.timezone
        stamp_format = '%Y-%m-%d %H:%M'

        document_path = os.path.join(self.bot.config.documents_dir,
                                     slug + '.md')
        with open(document_path, 'w') as f:
            post_content = self.template.format(
                title=title,
                content=content,
                slug=slug,
                created_at=times.format(created_at, timezone, stamp_format),
                updated_at=times.format(updated_at, timezone, stamp_format))
            f.write(post_content.encode('utf-8'))

        try:
            # NOTE(review): the command comes from bot configuration and is
            # run through the shell; keep it out of reach of untrusted input.
            subprocess.check_output(self.bot.config.update_command,
                                    stderr=subprocess.STDOUT,
                                    shell=True)
        except subprocess.CalledProcessError as e:
            # The captured output is a byte string: decode it (tolerating
            # invalid sequences) instead of encoding it, which would raise
            # UnicodeDecodeError on any non-ASCII output.
            request.respond(u'I tried to update a blog, but there was an error: ' + e.output.decode('utf-8', 'replace'))
        else:
            request.respond('Done, published')
开发者ID:svetlyak40wt,项目名称:thebot-draftin,代码行数:34,代码来源:thebot_draftin.py


示例4: save

    def save(self):
        """Store the job's fields in the Redis hash found at ``self.key``.

        ``created_at`` is always stored; the remaining fields are stored only
        when their attribute is not ``None``. The result is stored as-is.
        """
        redis_key = self.key

        def to_utc(moment):
            return times.format(moment, 'UTC')

        record = {'created_at': to_utc(self.created_at)}

        if self.func_name is not None:
            record['data'] = dumps(self.job_tuple)

        # (hash field, attribute name, optional formatter), in write order.
        optional_fields = (
            ('origin', 'origin', None),
            ('description', 'description', None),
            ('enqueued_at', 'enqueued_at', to_utc),
            ('ended_at', 'ended_at', to_utc),
            ('result', '_result', None),
            ('exc_info', 'exc_info', None),
            ('timeout', 'timeout', None),
        )
        for field, attribute, formatter in optional_fields:
            value = getattr(self, attribute)
            if value is None:
                continue
            record[field] = value if formatter is None else formatter(value)

        self.connection.hmset(redis_key, record)
开发者ID:MorsCode,项目名称:rq,代码行数:25,代码来源:job.py


示例5: save

    def save(self, pipeline=None):
        """Write the job's fields to the Redis hash stored at ``self.key``.

        :param pipeline: optional object exposing ``hmset``; when given it is
            used instead of ``self.connection``.

        ``created_at`` is always written (``times.now()`` is used when the
        attribute is falsy); other fields are written only when set.
        """
        redis_key = self.key
        if pipeline is not None:
            target = pipeline
        else:
            target = self.connection

        def utc(moment):
            return times.format(moment, 'UTC')

        fields = {'created_at': utc(self.created_at or times.now())}

        if self.func_name is not None:
            fields['data'] = dumps(self.job_tuple)

        for name in ('origin', 'description'):
            value = getattr(self, name)
            if value is not None:
                fields[name] = value

        for name in ('enqueued_at', 'ended_at'):
            moment = getattr(self, name)
            if moment is not None:
                fields[name] = utc(moment)

        if self._result is not None:
            fields['result'] = dumps(self._result)

        for name in ('exc_info', 'timeout', 'result_ttl'):
            value = getattr(self, name)
            if value is not None:
                fields[name] = value

        if self._status is not None:
            fields['status'] = self._status

        # ``meta`` is only stored when it is non-empty.
        if self.meta:
            fields['meta'] = dumps(self.meta)

        target.hmset(redis_key, fields)
开发者ID:sempr,项目名称:rq,代码行数:32,代码来源:job.py


示例6: test_clean_rq

 def test_clean_rq(self):
     """Running ``clean_rq`` leaves two of the three seeded job hashes."""
     conn = get_redis_connection()

     def job_count():
         return len(conn.keys("rq:job:*"))

     self.assertEqual(job_count(), 0)

     # Seed three hashes: one without a timestamp, one created now and one
     # created ten days ago.
     conn.hmset("rq:job:abc", {"bar": "baz"})
     fresh_stamp = times.format(times.now(), "UTC")
     conn.hmset("rq:job:def", {"created_at": fresh_stamp})
     old_stamp = times.format(times.now() - timedelta(days=10), "UTC")
     conn.hmset("rq:job:123", {"created_at": old_stamp})
     self.assertEqual(job_count(), 3)

     call_command("clean_rq")
     self.assertEqual(job_count(), 2)
开发者ID:bdyck,项目名称:feedhq,代码行数:9,代码来源:test_update_queue.py


示例7: test_format_without_tzinfo

 def test_format_without_tzinfo(self):
     """Format times without timezone info.

     Formats ``self.sometime_univ`` for three ``pytz`` timezone objects and
     checks the rendered local time and UTC offset of each.
     """
     dt = self.sometime_univ
     auckland = pytz.timezone('Pacific/Auckland')
     est = pytz.timezone('EST')
     ams = pytz.timezone('Europe/Amsterdam')
     # ``assertEqual`` replaces the deprecated ``assertEquals`` alias.
     self.assertEqual(times.format(dt, auckland), '2012-02-02 00:56:31+1300')
     self.assertEqual(times.format(dt, ams), '2012-02-01 12:56:31+0100')
     self.assertEqual(times.format(dt, est), '2012-02-01 06:56:31-0500')
开发者ID:bwghughes,项目名称:times,代码行数:9,代码来源:test_times.py


示例8: test_format_without_tzinfo

 def test_format_without_tzinfo(self):  # noqa
     """Format times without timezone info.

     Formats ``self.sometime_univ`` for three timezone names given as strings
     and checks the resulting ISO 8601 style output of each.
     """
     dt = self.sometime_univ
     auckland = 'Pacific/Auckland'
     est = 'EST'
     ams = 'Europe/Amsterdam'
     # ``assertEqual`` replaces the deprecated ``assertEquals`` alias.
     self.assertEqual(times.format(dt, auckland),
                      '2012-02-02T00:56:31+13:00')
     self.assertEqual(times.format(dt, ams), '2012-02-01T12:56:31+01:00')
     self.assertEqual(times.format(dt, est), '2012-02-01T06:56:31-05:00')
开发者ID:gionniboy,项目名称:times,代码行数:10,代码来源:test_times.py


示例9: test_clean_rq

 def test_clean_rq(self):
     """Running ``clean_rq`` leaves two of the three seeded job hashes."""
     client = redis.Redis(**settings.REDIS)
     pattern = 'rq:job:*'
     self.assertEqual(len(client.keys(pattern)), 0)

     # One hash without a timestamp, one stamped now, one stamped ten days ago.
     client.hmset('rq:job:abc', {'bar': 'baz'})
     stamped_now = {'created_at': times.format(times.now(), 'UTC')}
     client.hmset('rq:job:def', stamped_now)
     ten_days_ago = times.now() - timedelta(days=10)
     client.hmset('rq:job:123',
                  {'created_at': times.format(ten_days_ago, 'UTC')})
     self.assertEqual(len(client.keys(pattern)), 3)

     call_command('clean_rq')
     self.assertEqual(len(client.keys(pattern)), 2)
开发者ID:gjxlu,项目名称:feedhq,代码行数:11,代码来源:test_update_queue.py


示例10: facebook_event

def facebook_event(id):
    """Push the event to Facebook when applicable, then go to ``google_event``.

    When ``event.is_facebook_involved()`` is true the event is created on
    Facebook (or updated if it already has a ``facebook_id``) and pending
    contacts are invited. A Facebook connection or OAuth error redirects to
    the authorization URL instead. In every other case the response is a
    redirect to the ``google_event`` view for the same event.
    """
    event = current_user.event_or_404(id)

    if not event.is_facebook_involved():
        return redirect(url_for('google_event', id=event.id))

    try:
        api = facebook.create_api()
        start_time = times.format(event.starts_at, current_user.timezone,
                                  '%Y-%m-%dT%H:%M:%S')
        payload = {
            'name': event.name,
            'description': event.description or '',
            'location': event.venue or '',
            'start_time': start_time,
        }

        if not event.facebook_id:
            # First push: create the event and remember the returned id.
            created = api.post(path='/events', **payload)
            with db.transaction:
                event.facebook_id = created['id']
        else:
            api.post(path='/' + event.facebook_id, **payload)

        pending = list(event.contacts_facebook_to_invite)
        if pending:
            user_ids = ','.join(contact.facebook_id for contact in pending)
            invite_path = '/' + event.facebook_id + '/invited?users=' + user_ids
            api.post(path=invite_path)
            with db.transaction:
                for contact in pending:
                    event.set_invitation_sent(contact)

    except (facebook.ConnectionError, facebook.OAuthError):
        authorize_url = facebook.create_authorize_url(
            action_url=url_for('facebook_event', id=event.id),
            error_url=url_for('edit_event', id=event.id),
            scope='create_event'
        )
        return redirect(authorize_url)

    return redirect(url_for('google_event', id=event.id))
开发者ID:fdvoracek,项目名称:oleander,代码行数:34,代码来源:events.py


示例11: convert

 def convert(tzs):
     """Apply ``tzoffset`` to ``tzs`` according to its type.

     ``tzoffset`` is not defined in this block (presumably it comes from the
     enclosing scope -- confirm against the surrounding code).

     * strings are passed to ``times.format(tzs, tzoffset)``;
     * ints get ``int(3600*tzoffset*1000)`` added;
     * lists are mapped element-wise through this function;
     * anything else yields ``None``.
     """
     if isinstance(tzs, basestring):
         return times.format(tzs, tzoffset)
     if isinstance(tzs, int):
         shift = int(3600*tzoffset*1000)
         return tzs + shift
     if isinstance(tzs, list):
         return map(convert, tzs)
     return None
开发者ID:prachi,项目名称:raining,代码行数:7,代码来源:whale.py


示例12: test_convert_unix_time_to_datetime

    def test_convert_unix_time_to_datetime(self):  # noqa
        """Can convert from UNIX time to universal time.

        Checks the ``datetime`` returned by ``times.from_unix`` and how that
        value is formatted for UTC, Europe/Amsterdam and Pacific/Auckland.
        """
        unix_time = 1328257004.456  # as returned by time.time()
        # ``assertEqual`` replaces the deprecated ``assertEquals`` alias.
        self.assertEqual(
            times.from_unix(unix_time),
            datetime(2012, 2, 3, 8, 16, 44, 456000)
        )

        self.assertEqual(
            times.format(times.from_unix(unix_time), 'UTC'),
            '2012-02-03T08:16:44.456000+00:00')
        self.assertEqual(
            times.format(times.from_unix(unix_time), 'Europe/Amsterdam'),
            '2012-02-03T09:16:44.456000+01:00')
        self.assertEqual(
            times.format(times.from_unix(unix_time), 'Pacific/Auckland'),
            '2012-02-03T21:16:44.456000+13:00')
开发者ID:Mondego,项目名称:pyreco,代码行数:17,代码来源:allPythonContent.py


示例13: register_death

 def register_death(self):
     """Record this worker's death in Redis.

     Queues, on one pipeline: removal of the worker key from the workers
     set, a ``death`` timestamp on the worker hash, and ``expire(key, 60)``.
     """
     self.log.debug('Registering death')
     with self.connection._pipeline() as pipe:
         # Setting ``self.state = 'dead'`` is avoided here because that
         # would roll back the pipeline.
         pipe.srem(self.redis_workers_keys, self.key)
         died_at = times.format(times.now(), 'UTC')
         pipe.hset(self.key, 'death', died_at)
         pipe.expire(self.key, 60)
         pipe.execute()
开发者ID:aburan28,项目名称:rq,代码行数:10,代码来源:worker.py


示例14: tz_choices

def tz_choices():
    """Build ``(timezone, label)`` choices for use in forms.

    Each label is the current ``HH:MM`` time in that timezone followed by the
    timezone name with its ``/``-separated parts reversed and underscores
    replaced by spaces. The result is sorted by label.
    """
    def describe(tz):
        readable = ', '.join(reversed(tz.split('/'))).replace('_', ' ')
        clock = times.format(times.now(), tz, '%H:%M')
        return (tz, clock + u' – ' + readable)

    return sorted([describe(tz) for tz in common_timezones],
                  key=lambda pair: pair[1])
开发者ID:fdvoracek,项目名称:oleander,代码行数:11,代码来源:forms.py


示例15: save

    def save(self):
        """Write the job's fields to the Redis hash stored at ``self.key``.

        ``created_at`` is always written and the other standard fields only
        when set. In addition, every entry of the instance ``__dict__`` whose
        name is not in ``JOB_ATTRS`` is written as well.
        """
        redis_key = self.key

        def utc(moment):
            return times.format(moment, 'UTC')

        fields = {'created_at': utc(self.created_at)}

        if self.func_name is not None:
            fields['data'] = dumps(self.job_tuple)

        for name in ('origin', 'description'):
            value = getattr(self, name)
            if value is not None:
                fields[name] = value

        for name in ('enqueued_at', 'ended_at'):
            moment = getattr(self, name)
            if moment is not None:
                fields[name] = utc(moment)

        if self._result is not None:
            fields['result'] = dumps(self._result)

        for name in ('exc_info', 'timeout', 'result_ttl'):
            value = getattr(self, name)
            if value is not None:
                fields[name] = value

        if self._status is not None:
            fields['status'] = self._status

        # Store additional attributes from the job instance into Redis. This
        # is done so that third party libraries using RQ can store additional
        # data directly on ``Job`` instances. For example:
        #
        #     job = Job.create(func)
        #     job.foo = 'bar'
        #     job.save()  # Will persist the 'foo' attribute
        extra_names = set(self.__dict__).difference(JOB_ATTRS)
        for name in extra_names:
            fields[name] = getattr(self, name)

        self.connection.hmset(redis_key, fields)
开发者ID:nnjpp,项目名称:rq,代码行数:40,代码来源:job.py


示例16: convert

def convert(tzs, tzoffset=None):
    """Shift ``tzs`` by ``tzoffset``, dispatching on the type of ``tzs``.

    :param tzs: a ``datetime``, string, int or list of those.
    :param tzoffset: offset value, or ``'system'`` to derive it from
        ``time.timezone``. A falsy offset returns ``tzs`` unchanged.
        NOTE(review): the ``/100`` arithmetic suggests an ``HHMM``-style
        number such as ``100`` for one hour -- confirm with callers.
    :returns: the shifted value; ``None`` for unsupported types.
    """
    if tzoffset == 'system':
        tzoffset = time.timezone / -(60*60) * 100

    if not tzoffset:
        return tzs

    if isinstance(tzs, datetime):
        return tzs + timedelta(hours=float(tzoffset)/100)

    if isinstance(tzs, basestring):
        return times.format(tzs, int(tzoffset))

    if isinstance(tzs, int):
        return tzs + int(3600*float(tzoffset)/100)

    if isinstance(tzs, list):
        def shift(item):
            return convert(item, float(tzoffset))
        return map(shift, tzs)

    return None
开发者ID:prachi,项目名称:raining,代码行数:13,代码来源:periods.py


示例17: perform_job

    def perform_job(self, job):
        """Run ``job`` and store its outcome; return whether it succeeded.

        Will/should only be called inside the work horse's process.

        On any exception the job status is set to ``Status.FAILED``, the
        exception is passed to ``self.handle_exception`` and ``False`` is
        returned. Otherwise the pickled result, status and end time are
        written to the job hash (or the job is deleted when the effective
        ``result_ttl`` is 0) and ``True`` is returned.
        """
        status_line = 'Processing %s from %s since %s' % (
            job.func_name, job.origin, time.time())
        self.procline(status_line)

        try:
            with death_penalty_after(job.timeout or 180):
                rv = job.perform()

            # Pickling happens inside the same try block so that a pickling
            # failure goes through the same exception handling as the job.
            pickled_rv = dumps(rv)
            job._status = Status.FINISHED
            job.ended_at = times.now()
        except:
            # The public setter is used here to update Redis immediately.
            job.status = Status.FAILED
            self.handle_exception(job, *sys.exc_info())
            return False

        if rv is not None:
            self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
        else:
            self.log.info('Job OK')

        # The effective result_ttl decides how long the result is kept:
        # - 0: clean up the job immediately.
        # - positive: expire the job after that many seconds.
        # - negative: set no expiry at all (persist forever).
        result_ttl = job.result_ttl
        if result_ttl is None:
            result_ttl = self.default_result_ttl

        if result_ttl == 0:
            job.delete()
            self.log.info('Result discarded immediately.')
            return True

        pipe = self.connection.pipeline()
        pipe.hset(job.key, 'result', pickled_rv)
        pipe.hset(job.key, 'status', job._status)
        pipe.hset(job.key, 'ended_at', times.format(job.ended_at, 'UTC'))
        if result_ttl > 0:
            pipe.expire(job.key, result_ttl)
            self.log.info('Result is kept for %d seconds.' % result_ttl)
        else:
            self.log.warning('Result will never expire, clean up result key manually.')
        pipe.execute()

        return True
开发者ID:nzinfo,项目名称:rq,代码行数:51,代码来源:worker.py


示例18: register_birth

 def register_birth(self):  # noqa
     """Record this worker's birth in Redis.

     :raises ValueError: when the worker key already exists and carries no
         ``death`` field.
     """
     self.log.debug('Registering birth of worker %s' % (self.name,))

     if self.connection.exists(self.key):
         # An existing hash without a 'death' field blocks registration.
         if not self.connection.hexists(self.key, 'death'):
             raise ValueError("There exists an active worker named '%s' "
                              "already." % (self.name,))

     worker_key = self.key
     queue_list = ','.join(self.queue_names())
     with self.connection._pipeline() as pipe:
         pipe.delete(worker_key)
         born_at = times.format(times.now(), 'UTC')
         pipe.hset(worker_key, 'birth', born_at)
         pipe.hset(worker_key, 'queues', queue_list)
         pipe.sadd(self.redis_workers_keys, worker_key)
         pipe.expire(worker_key, self.default_worker_ttl)
         pipe.execute()
开发者ID:aburan28,项目名称:rq,代码行数:16,代码来源:worker.py


示例19: serialize_date

def serialize_date(dt):
    """Format ``dt`` with ``times.format`` in the timezone from ``get_tz()``.

    ``None`` is passed through unchanged.
    """
    return None if dt is None else times.format(dt, get_tz())
开发者ID:randomknowledge,项目名称:rqworker_dashboard,代码行数:5,代码来源:json.py


示例20: default

 def default(self, o):
     """Serialize date-like objects; defer everything else to the parent.

     ``datetime.datetime`` values are formatted via ``times.format`` with
     ``'Zulu'``; plain ``datetime.date`` values use ``isoformat()``.
     """
     # ``datetime.datetime`` subclasses ``datetime.date``, so anything that is
     # not a date cannot be a datetime either.
     if not isinstance(o, datetime.date):
         return super(JSONDateTimeMixin, self).default(o)
     if isinstance(o, datetime.datetime):
         return times.format(o, 'Zulu')
     return o.isoformat()
开发者ID:pombredanne,项目名称:flask-arrest,代码行数:6,代码来源:__init__.py



注:本文中的times.format函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python times.now函数代码示例发布时间:2022-05-27
下一篇:
Python timers.timing函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap