本文整理汇总了 Python 中 times.format 函数的典型用法代码示例。如果您正苦于以下问题：Python format 函数的具体用法？Python format 怎么用？Python format 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 format 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: save
def save(self):
    """Write this job's fields to the Redis hash stored under ``self.key``.

    ``created_at`` is always written. Every other field is written only
    when the matching attribute is not ``None`` (``meta`` only when it is
    truthy). Timestamps are rendered with ``times.format(..., "UTC")`` and
    ``job_tuple``, ``_result`` and ``meta`` are passed through ``dumps``.
    """
    redis_key = self.key
    utc = "UTC"
    fields = {"created_at": times.format(self.created_at, utc)}

    if self.func_name is not None:
        fields["data"] = dumps(self.job_tuple)

    # Attributes stored verbatim under their own name.
    for name in ("origin", "description"):
        value = getattr(self, name)
        if value is not None:
            fields[name] = value

    # Timestamps are formatted before being stored.
    for name in ("enqueued_at", "ended_at"):
        stamp = getattr(self, name)
        if stamp is not None:
            fields[name] = times.format(stamp, utc)

    if self._result is not None:
        fields["result"] = dumps(self._result)

    for name in ("exc_info", "timeout", "result_ttl"):
        value = getattr(self, name)
        if value is not None:
            fields[name] = value

    if self._status is not None:
        fields["status"] = self._status
    if self.meta:
        fields["meta"] = dumps(self.meta)

    self.connection.hmset(redis_key, fields)
开发者ID:toxster,项目名称:rq,代码行数:31,代码来源:job.py
示例2: dump
def dump(self):
    """Return a dict serialization of the current job instance.

    ``created_at`` is always present and falls back to ``times.now()``
    when the attribute is falsy. All other keys appear only when the
    matching attribute is not ``None`` (``meta`` only when truthy).
    """
    utc = 'UTC'
    serialized = {
        'created_at': times.format(self.created_at or times.now(), utc),
    }

    if self.func_name is not None:
        serialized['data'] = dumps(self.job_tuple)

    # Attributes copied verbatim under their own name.
    for name in ('origin', 'description'):
        value = getattr(self, name)
        if value is not None:
            serialized[name] = value

    # Timestamps are rendered as UTC strings.
    for name in ('enqueued_at', 'ended_at'):
        stamp = getattr(self, name)
        if stamp is not None:
            serialized[name] = times.format(stamp, utc)

    if self._result is not None:
        serialized['result'] = dumps(self._result)

    for name in ('exc_info', 'timeout', 'result_ttl'):
        value = getattr(self, name)
        if value is not None:
            serialized[name] = value

    # Private attributes are exposed without their leading underscore.
    if self._status is not None:
        serialized['status'] = self._status
    if self._dependency_id is not None:
        serialized['dependency_id'] = self._dependency_id

    if self.meta:
        serialized['meta'] = dumps(self.meta)
    return serialized
开发者ID:alonisser,项目名称:rq,代码行数:31,代码来源:job.py
示例3: on_callback
def on_callback(self, request):
    """Handle a webhook call: store the posted document and run the update command.

    Non-POST requests and requests whose ``secret`` query parameter does
    not match ``self.bot.config.draftin_secret`` are answered with an
    explanatory message and nothing is written.

    Otherwise the JSON ``payload`` form field is rendered through
    ``self.template`` and written to ``<documents_dir>/<slug>.md``, after
    which ``self.bot.config.update_command`` is executed. The outcome is
    reported back through ``request.respond``.
    """
    if request.method != 'POST':
        request.respond('This hook only supports POST method.')
        return

    if request.GET.get('secret', [None])[0] != self.bot.config.draftin_secret:
        request.respond('Wrong secret was specified')
        return

    payload = anyjson.deserialize(request.POST['payload'][0])
    title = payload['name']
    content = payload['content']
    slug = slugify(title)
    created_at = times.to_universal(payload['created_at'])
    updated_at = times.to_universal(payload['updated_at'])
    timezone = self.bot.config.timezone

    # Render before opening the file: opening with 'w' truncates it, so a
    # template error must not leave an empty document behind.
    post_content = self.template.format(
        title=title,
        content=content,
        slug=slug,
        created_at=times.format(created_at, timezone, '%Y-%m-%d %H:%M'),
        updated_at=times.format(updated_at, timezone, '%Y-%m-%d %H:%M'))

    document_path = os.path.join(self.bot.config.documents_dir, slug + '.md')
    with open(document_path, 'w') as f:
        f.write(post_content.encode('utf-8'))

    try:
        # NOTE(review): the command comes from the bot configuration and is
        # run through the shell; it must never contain request data.
        subprocess.check_output(self.bot.config.update_command,
                                stderr=subprocess.STDOUT,
                                shell=True)
    except subprocess.CalledProcessError as e:
        # e.output is a byte string; decode it (the original called
        # .encode(), which fails on non-ASCII output) before joining it
        # with the unicode message.
        request.respond(u'I tried to update a blog, but there was an error: '
                        + e.output.decode('utf-8', 'replace'))
    else:
        request.respond('Done, published')
开发者ID:svetlyak40wt,项目名称:thebot-draftin,代码行数:34,代码来源:thebot_draftin.py
示例4: save
def save(self):
    """Write this job's fields to the Redis hash stored under ``self.key``.

    ``created_at`` is always written; every other field only when the
    matching attribute is not ``None``. In this version ``_result`` is
    stored as-is, without going through ``dumps``.
    """
    redis_key = self.key
    utc = 'UTC'
    record = {'created_at': times.format(self.created_at, utc)}

    if self.func_name is not None:
        record['data'] = dumps(self.job_tuple)

    # Attributes stored verbatim under their own name.
    for name in ('origin', 'description'):
        value = getattr(self, name)
        if value is not None:
            record[name] = value

    # Timestamps are rendered as UTC strings.
    for name in ('enqueued_at', 'ended_at'):
        stamp = getattr(self, name)
        if stamp is not None:
            record[name] = times.format(stamp, utc)

    if self._result is not None:
        record['result'] = self._result

    for name in ('exc_info', 'timeout'):
        value = getattr(self, name)
        if value is not None:
            record[name] = value

    self.connection.hmset(redis_key, record)
开发者ID:MorsCode,项目名称:rq,代码行数:25,代码来源:job.py
示例5: save
def save(self, pipeline=None):
    """Write this job's fields to the Redis hash stored under ``self.key``.

    :param pipeline: optional object exposing ``hmset``; when given, the
        write is issued on it instead of on ``self.connection``.

    ``created_at`` is always written and falls back to ``times.now()``
    when the attribute is falsy. Other fields are written only when the
    matching attribute is not ``None`` (``meta`` only when truthy).
    """
    redis_key = self.key
    target = self.connection if pipeline is None else pipeline
    utc = 'UTC'
    record = {
        'created_at': times.format(self.created_at or times.now(), utc),
    }

    if self.func_name is not None:
        record['data'] = dumps(self.job_tuple)

    # Attributes stored verbatim under their own name.
    for name in ('origin', 'description'):
        value = getattr(self, name)
        if value is not None:
            record[name] = value

    # Timestamps are rendered as UTC strings.
    for name in ('enqueued_at', 'ended_at'):
        stamp = getattr(self, name)
        if stamp is not None:
            record[name] = times.format(stamp, utc)

    if self._result is not None:
        record['result'] = dumps(self._result)

    for name in ('exc_info', 'timeout', 'result_ttl'):
        value = getattr(self, name)
        if value is not None:
            record[name] = value

    if self._status is not None:
        record['status'] = self._status
    if self.meta:
        record['meta'] = dumps(self.meta)

    target.hmset(redis_key, record)
开发者ID:sempr,项目名称:rq,代码行数:32,代码来源:job.py
示例6: test_clean_rq
def test_clean_rq(self):
    """``clean_rq`` leaves two of the three seeded ``rq:job:*`` hashes."""
    conn = get_redis_connection()
    pattern = "rq:job:*"

    def job_count():
        return len(conn.keys(pattern))

    self.assertEqual(job_count(), 0)

    # Seed: one hash without created_at, one created now, one created
    # ten days ago.
    conn.hmset("rq:job:abc", {"bar": "baz"})
    fresh = times.format(times.now(), "UTC")
    conn.hmset("rq:job:def", {"created_at": fresh})
    stale = times.format(times.now() - timedelta(days=10), "UTC")
    conn.hmset("rq:job:123", {"created_at": stale})
    self.assertEqual(job_count(), 3)

    # NOTE(review): presumably the ten-day-old job is the one removed;
    # the test only checks the remaining count.
    call_command("clean_rq")
    self.assertEqual(job_count(), 2)
开发者ID:bdyck,项目名称:feedhq,代码行数:9,代码来源:test_update_queue.py
示例7: test_format_without_tzinfo
def test_format_without_tzinfo(self):
    """Format times without timezone info"""
    dt = self.sometime_univ
    auckland = pytz.timezone('Pacific/Auckland')
    est = pytz.timezone('EST')
    ams = pytz.timezone('Europe/Amsterdam')
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(times.format(dt, auckland), '2012-02-02 00:56:31+1300')
    self.assertEqual(times.format(dt, ams), '2012-02-01 12:56:31+0100')
    self.assertEqual(times.format(dt, est), '2012-02-01 06:56:31-0500')
开发者ID:bwghughes,项目名称:times,代码行数:9,代码来源:test_times.py
示例8: test_format_without_tzinfo
def test_format_without_tzinfo(self):  # noqa
    """Format times without timezone info"""
    dt = self.sometime_univ
    auckland = 'Pacific/Auckland'
    est = 'EST'
    ams = 'Europe/Amsterdam'
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(times.format(dt, auckland),
                     '2012-02-02T00:56:31+13:00')
    self.assertEqual(times.format(dt, ams), '2012-02-01T12:56:31+01:00')
    self.assertEqual(times.format(dt, est), '2012-02-01T06:56:31-05:00')
开发者ID:gionniboy,项目名称:times,代码行数:10,代码来源:test_times.py
示例9: test_clean_rq
def test_clean_rq(self):
    """``clean_rq`` leaves two of the three seeded ``rq:job:*`` hashes."""
    conn = redis.Redis(**settings.REDIS)
    pattern = 'rq:job:*'

    def job_count():
        return len(conn.keys(pattern))

    self.assertEqual(job_count(), 0)

    # Seed: one hash without created_at, one created now, one created
    # ten days ago.
    conn.hmset('rq:job:abc', {'bar': 'baz'})
    fresh = times.format(times.now(), 'UTC')
    conn.hmset('rq:job:def', {'created_at': fresh})
    stale = times.format(times.now() - timedelta(days=10), 'UTC')
    conn.hmset('rq:job:123', {'created_at': stale})
    self.assertEqual(job_count(), 3)

    # NOTE(review): presumably the ten-day-old job is the one removed;
    # the test only checks the remaining count.
    call_command('clean_rq')
    self.assertEqual(job_count(), 2)
开发者ID:gjxlu,项目名称:feedhq,代码行数:11,代码来源:test_update_queue.py
示例10: facebook_event
def facebook_event(id):
    """Push the event to Facebook when applicable, then continue to Google.

    When the event involves Facebook, it is created (or updated if it
    already has a ``facebook_id``) through the Facebook API and pending
    contacts are invited. On a Facebook connection or OAuth error the
    user is redirected to the authorization URL instead. In every other
    case the response redirects to the ``google_event`` view.
    """
    event = current_user.event_or_404(id)

    if not event.is_facebook_involved():
        return redirect(url_for('google_event', id=event.id))

    try:
        api = facebook.create_api()
        start_time = times.format(event.starts_at, current_user.timezone,
                                  '%Y-%m-%dT%H:%M:%S')
        payload = {
            'name': event.name,
            'description': event.description or '',
            'location': event.venue or '',
            'start_time': start_time,
        }

        if event.facebook_id:
            # Already published: update the existing Facebook event.
            api.post(path='/' + event.facebook_id, **payload)
        else:
            # First publication: remember the id Facebook assigned.
            created = api.post(path='/events', **payload)
            with db.transaction:
                event.facebook_id = created['id']

        pending = list(event.contacts_facebook_to_invite)
        if pending:
            invitee_ids = ','.join(contact.facebook_id for contact in pending)
            api.post(path='/' + event.facebook_id
                     + '/invited?users=' + invitee_ids)
            with db.transaction:
                for contact in pending:
                    event.set_invitation_sent(contact)

    except (facebook.ConnectionError, facebook.OAuthError):
        authorize_url = facebook.create_authorize_url(
            action_url=url_for('facebook_event', id=event.id),
            error_url=url_for('edit_event', id=event.id),
            scope='create_event'
        )
        return redirect(authorize_url)

    return redirect(url_for('google_event', id=event.id))
开发者ID:fdvoracek,项目名称:oleander,代码行数:34,代码来源:events.py
示例11: convert
def convert(tzs):
    """Apply ``tzoffset`` to ``tzs``, dispatching on its type.

    ``tzoffset`` is not a parameter; it is read from the enclosing scope.

    * string -> ``times.format(tzs, tzoffset)``
    * int    -> ``tzs`` plus ``int(3600 * tzoffset * 1000)``
    * list   -> ``convert`` mapped over the elements
    * anything else -> ``None``
    """
    if isinstance(tzs, basestring):
        return times.format(tzs, tzoffset)
    if isinstance(tzs, int):
        # NOTE(review): 3600 * 1000 looks like hours -> milliseconds; confirm.
        shift = int(3600*tzoffset*1000)
        return tzs + shift
    if isinstance(tzs, list):
        return map(convert, tzs)
    return None
开发者ID:prachi,项目名称:raining,代码行数:7,代码来源:whale.py
示例12: test_convert_unix_time_to_datetime
def test_convert_unix_time_to_datetime(self):  # noqa
    """Can convert from UNIX time to universal time."""
    unix_time = 1328257004.456  # as returned by time.time()
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(
        times.from_unix(unix_time),
        datetime(2012, 2, 3, 8, 16, 44, 456000)
    )
    self.assertEqual(
        times.format(times.from_unix(unix_time), 'UTC'),
        '2012-02-03T08:16:44.456000+00:00')
    self.assertEqual(
        times.format(times.from_unix(unix_time), 'Europe/Amsterdam'),
        '2012-02-03T09:16:44.456000+01:00')
    self.assertEqual(
        times.format(times.from_unix(unix_time), 'Pacific/Auckland'),
        '2012-02-03T21:16:44.456000+13:00')
开发者ID:Mondego,项目名称:pyreco,代码行数:17,代码来源:allPythonContent.py
示例13: register_death
def register_death(self):
    """Record this worker's death in Redis.

    In a single pipeline: removes ``self.key`` from the workers set,
    stores the current UTC time in the ``death`` hash field, and sets the
    worker key to expire after 60 seconds.
    """
    self.log.debug('Registering death')
    with self.connection._pipeline() as pipe:
        # We cannot use self.state = 'dead' here, because that would
        # rollback the pipeline
        pipe.srem(self.redis_workers_keys, self.key)
        died_at = times.format(times.now(), 'UTC')
        pipe.hset(self.key, 'death', died_at)
        pipe.expire(self.key, 60)
        pipe.execute()
开发者ID:aburan28,项目名称:rq,代码行数:10,代码来源:worker.py
示例14: tz_choices
def tz_choices():
    """Build ``(timezone, label)`` pairs for use in forms.

    Each label is the current ``HH:MM`` time in that zone, an en dash,
    and the zone's path components in reverse order with underscores
    replaced by spaces. The pairs are sorted by label.
    """
    def _choice(tz):
        # 'Area/City' -> 'City, Area'
        readable = ', '.join(reversed(tz.split('/'))).replace('_', ' ')
        clock = times.format(times.now(), tz, '%H:%M')
        return (tz, clock + u' – ' + readable)

    pairs = [_choice(tz) for tz in common_timezones]
    return sorted(pairs, key=lambda pair: pair[1])
开发者ID:fdvoracek,项目名称:oleander,代码行数:11,代码来源:forms.py
示例15: save
def save(self):
    """Write this job's fields to the Redis hash stored under ``self.key``.

    ``created_at`` is always written; the other known fields only when
    the matching attribute is not ``None``. Any instance attribute whose
    name is not listed in ``JOB_ATTRS`` is stored as well.
    """
    redis_key = self.key
    utc = 'UTC'
    record = {'created_at': times.format(self.created_at, utc)}

    if self.func_name is not None:
        record['data'] = dumps(self.job_tuple)

    # Attributes stored verbatim under their own name.
    for name in ('origin', 'description'):
        value = getattr(self, name)
        if value is not None:
            record[name] = value

    # Timestamps are rendered as UTC strings.
    for name in ('enqueued_at', 'ended_at'):
        stamp = getattr(self, name)
        if stamp is not None:
            record[name] = times.format(stamp, utc)

    if self._result is not None:
        record['result'] = dumps(self._result)

    for name in ('exc_info', 'timeout', 'result_ttl'):
        value = getattr(self, name)
        if value is not None:
            record[name] = value

    if self._status is not None:
        record['status'] = self._status

    # Store additional attributes from the job instance into Redis, so
    # that third party libraries using RQ can keep extra data directly on
    # ``Job`` instances. For example:
    #
    #     job = Job.create(func)
    #     job.foo = 'bar'
    #     job.save()  # Will persist the 'foo' attribute
    for extra in set(self.__dict__).difference(JOB_ATTRS):
        record[extra] = getattr(self, extra)

    self.connection.hmset(redis_key, record)
开发者ID:nnjpp,项目名称:rq,代码行数:40,代码来源:job.py
示例16: convert
def convert(tzs, tzoffset=None):
    """Shift ``tzs`` by ``tzoffset`` and return the result.

    :param tzs: value to shift. A ``datetime`` gets a ``timedelta`` added,
        an ``int`` gets whole seconds added, a ``list`` is converted
        element by element, and a string is handed to ``times.format``.
        Any other type yields ``None``.
    :param tzoffset: offset in hundredths of an hour (``100`` is one
        hour, ``-50`` is minus thirty minutes), or ``'system'`` to derive
        it from ``time.timezone``. A falsy offset returns ``tzs``
        unchanged.
    """
    if tzoffset == 'system':
        # time.timezone is in seconds west of UTC, hence the sign flip.
        # Float division keeps half-hour zones intact on Python 2, where
        # the original integer division truncated them.
        tzoffset = -time.timezone / 3600.0 * 100
    if not tzoffset:
        return tzs

    hours = float(tzoffset) / 100
    if isinstance(tzs, datetime):
        return tzs + timedelta(hours=hours)
    if isinstance(tzs, int):
        return tzs + int(3600 * float(tzoffset) / 100)
    if isinstance(tzs, list):
        # A real list on both Python 2 and 3 (map() is lazy on Python 3).
        return [convert(tz, float(tzoffset)) for tz in tzs]
    # Checked last: the types above are disjoint from strings, so the
    # order does not change which branch a value takes.
    if isinstance(tzs, basestring):
        return times.format(tzs, int(tzoffset))
    return None
开发者ID:prachi,项目名称:raining,代码行数:13,代码来源:periods.py
示例17: perform_job
def perform_job(self, job):
    """Run ``job`` and store its outcome; return ``True`` on success.

    Will/should only be called inside the work horse's process.

    The job runs under ``death_penalty_after`` with ``job.timeout`` (180
    when unset). If running it or pickling its return value raises, the
    job is marked failed, ``self.handle_exception`` is called and
    ``False`` is returned.
    """
    self.procline('Processing %s from %s since %s' % (
        job.func_name,
        job.origin, time.time()))

    try:
        with death_penalty_after(job.timeout or 180):
            rv = job.perform()

        # Pickle the result in the same try-except block since we need to
        # use the same exc handling when pickling fails
        pickled_rv = dumps(rv)
        job._status = Status.FINISHED
        job.ended_at = times.now()
    except:
        # Use the public setter here, to immediately update Redis
        job.status = Status.FAILED
        self.handle_exception(job, *sys.exc_info())
        return False

    if rv is None:
        outcome = 'Job OK'
    else:
        outcome = 'Job OK, result = %s' % (yellow(unicode(rv)),)
    self.log.info(outcome)

    # How long we persist the job result depends on the value of
    # result_ttl:
    # - If result_ttl is 0, cleanup the job immediately.
    # - If it's a positive number, set the job to expire in X seconds.
    # - If result_ttl is negative, don't set an expiry to it (persist
    #   forever)
    if job.result_ttl is None:
        result_ttl = self.default_result_ttl
    else:
        result_ttl = job.result_ttl

    if result_ttl == 0:
        job.delete()
        self.log.info('Result discarded immediately.')
        return True

    pipe = self.connection.pipeline()
    pipe.hset(job.key, 'result', pickled_rv)
    pipe.hset(job.key, 'status', job._status)
    pipe.hset(job.key, 'ended_at', times.format(job.ended_at, 'UTC'))
    if result_ttl > 0:
        pipe.expire(job.key, result_ttl)
        self.log.info('Result is kept for %d seconds.' % result_ttl)
    else:
        self.log.warning('Result will never expire, clean up result key manually.')
    pipe.execute()
    return True
开发者ID:nzinfo,项目名称:rq,代码行数:51,代码来源:worker.py
示例18: register_birth
def register_birth(self):  # noqa
    """Record this worker's birth in Redis.

    Raises ``ValueError`` when the worker key already exists and has no
    ``death`` field. Otherwise, in a single pipeline, the key is deleted
    and rewritten with the current UTC ``birth`` time and the
    comma-joined queue names, added to the workers set, and given a TTL
    of ``self.default_worker_ttl``.
    """
    self.log.debug('Registering birth of worker %s' % (self.name,))

    already_known = self.connection.exists(self.key)
    if already_known and not self.connection.hexists(self.key, 'death'):
        raise ValueError("There exists an active worker named '%s' "
                         "already." % (self.name,))

    worker_key = self.key
    queue_list = ','.join(self.queue_names())
    with self.connection._pipeline() as pipe:
        pipe.delete(worker_key)
        born_at = times.format(times.now(), 'UTC')
        pipe.hset(worker_key, 'birth', born_at)
        pipe.hset(worker_key, 'queues', queue_list)
        pipe.sadd(self.redis_workers_keys, worker_key)
        pipe.expire(worker_key, self.default_worker_ttl)
        pipe.execute()
开发者ID:aburan28,项目名称:rq,代码行数:16,代码来源:worker.py
示例19: serialize_date
def serialize_date(dt):
    """Format ``dt`` with ``times.format`` in the zone from ``get_tz()``.

    ``None`` is passed through unchanged.
    """
    return None if dt is None else times.format(dt, get_tz())
开发者ID:randomknowledge,项目名称:rqworker_dashboard,代码行数:5,代码来源:json.py
示例20: default
def default(self, o):
    """Serialize date/time objects; defer everything else to the parent.

    ``datetime.datetime`` values go through ``times.format(o, 'Zulu')``,
    plain ``datetime.date`` values through ``isoformat()``.
    """
    # datetime.datetime subclasses datetime.date, so it is tested first.
    if isinstance(o, datetime.datetime):
        return times.format(o, 'Zulu')
    if not isinstance(o, datetime.date):
        return super(JSONDateTimeMixin, self).default(o)
    return o.isoformat()
开发者ID:pombredanne,项目名称:flask-arrest,代码行数:6,代码来源:__init__.py
注：本文中的 times.format 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论