本文整理汇总了Python中tzlocal.get_localzone函数的典型用法代码示例。如果您正苦于以下问题:Python get_localzone函数的具体用法?Python get_localzone怎么用?Python get_localzone使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_localzone函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _parse_time_range
def _parse_time_range(cls, date, start_time, end_time, now):
parsed_date = cls._parse_date(date, now)
parsed_start = cls._parse_time(start_time)
parsed_end = cls._parse_time(end_time)
if parsed_date is None:
if parsed_start is None:
raise ValueError('Failed to create event: --date or --start options are missing.')
# set date today or tomorrow
dt = now.date()
if (parsed_start.hour, parsed_start.minute) < (now.hour, now.minute):
dt += timedelta(days=1)
else:
if parsed_start is None:
if parsed_end is not None:
raise ValueError('Failed to create event: --date option with --end is set but --start is missing.')
# all-day event
t = get_localzone().localize(datetime(parsed_date.year, parsed_date.month, parsed_date.day))
return EventTime(False, t), EventTime(False, t)
dt = parsed_date
# set start and end event time
start = get_localzone().localize(datetime.combine(dt, parsed_start))
if parsed_end is None:
end = start + cls.DEFAULT_CREATE_DURATION
else:
end = get_localzone().localize(datetime.combine(dt, parsed_end))
if parsed_start > parsed_end:
end += timedelta(days=1)
return EventTime(True, start), EventTime(True, end)
开发者ID:mogproject,项目名称:calendar-cli,代码行数:31,代码来源:setting.py
示例2: get_time_logs
def get_time_logs(self, ignore_marked=True):
"""Get all time logs from toggl
*ignore_marked* entries? if True check for the tags in every
entry and filter them
"""
start_date = datetime.datetime.now(
tz=get_localzone()) - datetime.timedelta(self.time_period)
end_date = datetime.datetime.now(tz=get_localzone())
# Ignore microseconds
start_date = start_date.replace(microsecond=0).isoformat()
end_date = end_date.replace(microsecond=0).isoformat()
payload = {
'start_date': start_date,
'end_date': end_date
}
try:
request_object = requests.get(self.url,
params=payload,
auth=(self.api_key, 'api_token'))
except Exception:
logging.error('Failed to make request')
raise
if request_object.status_code != 200:
logging.error('Failed to fetch time logs please see the response:\n{0}'
.format(request_object.content))
return list()
return self._filter(request_object.json(), ignore_marked)
开发者ID:skarbot,项目名称:utils,代码行数:35,代码来源:toggl.py
示例3: test_small
def test_small(self):
config = get_config(PATH + 'small.conf')
assert config == {
'calendars': {
'home': {'path': os.path.expanduser('~/.calendars/home/'), 'color': 'dark green', 'readonly': False},
'work': {'path': os.path.expanduser('~/.calendars/work/'), 'readonly': True, 'color': ''}},
'sqlite': {'path': os.path.expanduser('~/.local/share/khal/khal.db')},
'locale': {
'local_timezone': get_localzone(),
'default_timezone': get_localzone(),
'timeformat': '%H:%M',
'dateformat': '%d.%m.',
'longdateformat': '%d.%m.%Y',
'datetimeformat': '%d.%m. %H:%M',
'longdatetimeformat': '%d.%m.%Y %H:%M',
'firstweekday': 0,
'encoding': 'utf-8',
'unicode_symbols': True,
},
'default': {
'default_command': 'calendar',
'debug': False,
'default_calendar': 'home',
}
}
开发者ID:gunendu,项目名称:khal,代码行数:25,代码来源:settings_test.py
示例4: test_small
def test_small(self):
config = get_config(PATH + 'small.conf')
comp_config = {
'calendars': {
'home': {'path': os.path.expanduser('~/.calendars/home/'),
'color': 'dark green', 'readonly': False,
'type': 'calendar'},
'work': {'path': os.path.expanduser('~/.calendars/work/'),
'readonly': True, 'color': '',
'type': 'calendar'}},
'sqlite': {'path': os.path.expanduser('~/.local/share/khal/khal.db')},
'locale': {
'local_timezone': get_localzone(),
'default_timezone': get_localzone(),
'timeformat': '%H:%M',
'dateformat': '%d.%m.',
'longdateformat': '%d.%m.%Y',
'datetimeformat': '%d.%m. %H:%M',
'longdatetimeformat': '%d.%m.%Y %H:%M',
'firstweekday': 0,
'encoding': 'utf-8',
'unicode_symbols': True,
'weeknumbers': False,
},
'default': {
'default_calendar': None,
'default_command': 'calendar',
'print_new': 'False',
'show_all_days': False,
'days': 2,
}
}
for key in comp_config:
assert config[key] == comp_config[key]
开发者ID:0mark,项目名称:khal,代码行数:34,代码来源:settings_test.py
示例5: create_from_raw
def create_from_raw(cls):
"""
Creates MinuteData from raw EfergyData
"""
try:
latest_minute = cls.objects.latest('timestamp')
start_time = latest_minute.timestamp
# Why do I have to do this?!?!
start_time = start_time + timedelta(minutes=1)
start_time = start_time.astimezone(get_localzone())
except cls.DoesNotExist:
start_time = pytz.utc.localize(datetime(2000, 1, 1, 0, 0, 0))
last_minute = now().replace(second=0, microsecond=0)
end_time = last_minute.astimezone(get_localzone())
cursor = connection.cursor()
# NOTE: This code is MySQL-specific!
# NOTE: When taking the HOUR() or MINUTE() from a datetime column, it
# is important to explicitly convert the value to the local timezone
# first otherwise, the calculations will be for UTC.
cursor.execute("""
INSERT INTO efergy_minutedata (
SELECT
NULL,
FROM_UNIXTIME(FLOOR(UNIX_TIMESTAMP(`timestamp`)/60)*60) as 'timestamp',
HOUR(CONVERT_TZ(`timestamp`, '+00:00', @@global.time_zone)) * 60 + MINUTE(CONVERT_TZ(`timestamp`, '+00:00', @@global.time_zone)) as 'minute',
TRUNCATE(AVG(`watts`), 6) as 'watts'
FROM `efergy_efergydata`
WHERE `timestamp` >= %s AND `timestamp` < %s
GROUP BY 2
) ON DUPLICATE KEY UPDATE `timestamp`=`timestamp`""",
(start_time, end_time))
开发者ID:mkoistinen,项目名称:joule,代码行数:33,代码来源:models.py
示例6: test_seedset_export
def test_seedset_export(self, mock_rabbit_worker_class):
mock_rabbit_worker = MagicMock(spec=RabbitWorker)
mock_rabbit_worker_class.side_effect = [mock_rabbit_worker]
export = Export.objects.create(user=self.user,
export_type="test_type",
export_format="json",
dedupe=True,
item_date_start=datetime.datetime.now(get_localzone()),
item_date_end=datetime.datetime.now(get_localzone()),
harvest_date_start=datetime.datetime.now(get_localzone()),
harvest_date_end=datetime.datetime.now(get_localzone()))
export.seed_set = self.seedset
export.save()
request_export(export)
# Export start message sent
name, args, kwargs = mock_rabbit_worker.mock_calls[0]
self.assertEqual("send_message", name)
message = args[0]
self.assertEqual(message["id"], export.export_id)
self.assertEqual(message["path"], export.path)
self.assertEqual(message["type"], export.export_type)
self.assertEqual(message["format"], export.export_format)
self.assertTrue(message["dedupe"])
self.assertEqual(iso8601.parse_date(message["item_date_start"]), export.item_date_start)
self.assertEqual(iso8601.parse_date(message["item_date_end"]), export.item_date_end)
self.assertEqual(iso8601.parse_date(message["harvest_date_start"]), export.harvest_date_start)
self.assertEqual(iso8601.parse_date(message["harvest_date_end"]), export.harvest_date_end)
self.assertEqual(message["seedset"]["id"], export.seed_set.seedset_id)
self.assertEqual("export.start.test_platform.test_type", args[1])
开发者ID:Tanych,项目名称:gh-sfm-ui,代码行数:32,代码来源:test_export.py
示例7: read
def read(subject, library, startdate=None, enddate=None, fields=None):
'''
Searches for data requested by params. Returns data as a pandas dataframe, with datetimes as localzone
With no datetimes provided, will return first 100000 results. With startdate only, will provide 100000 results from
the starttdate. With enddate, will provide everything up to enddate.
@param: startdate; datetime; start date to query
@param: enddate; datetime; end date to query
@param: subject; string; what data to query
@param: fields; [str]; fields to return
@param: library; where to store it
@return: data of 'subject' from 'daterange' with timezones converted to local
'''
startdate = addTZ(startdate)
enddate = addTZ(enddate)
date_range = DateRange(startdate, enddate)
df = library.read(subject, date_range, fields)
if df.index.tzinfo is None:
df.index = df.index.tz_localize(pytz.utc).tz_convert(tzlocal.get_localzone())
else:
df.index = df.index.tz_convert(tzlocal.get_localzone())
return df
开发者ID:isentium,项目名称:arctic,代码行数:29,代码来源:tickstore_addons.py
示例8: print_events
def print_events(conn, stack_name, follow, lines=100, from_dt=datetime.fromtimestamp(0, tz=pytz.UTC)):
"""Prints tabulated list of events"""
events_display = []
seen_ids = set()
next_token = None
while True:
events, next_token = get_events(conn, stack_name, next_token)
status = get_stack_status(conn, stack_name)
normalize_events_timestamps(events)
if follow:
events_display = [(ev.timestamp.astimezone(tzlocal.get_localzone()), ev.resource_status, ev.resource_type,
ev.logical_resource_id, ev.resource_status_reason) for ev in events
if ev.event_id not in seen_ids and ev.timestamp >= from_dt]
if len(events_display) > 0:
print(tabulate(events_display, tablefmt='plain'), flush=True)
seen_ids |= set([event.event_id for event in events])
if status not in IN_PROGRESS_STACK_STATES and next_token is None:
break
if next_token is None:
time.sleep(5)
else:
events_display.extend([(event.timestamp.astimezone(tzlocal.get_localzone()), event.resource_status,
event.resource_type, event.logical_resource_id, event.resource_status_reason)
for event in events])
if len(events_display) >= lines or next_token is None:
break
if not follow:
print(tabulate(events_display[:lines], tablefmt='plain'), flush=True)
return status
开发者ID:cfstacks,项目名称:stacks,代码行数:32,代码来源:cf.py
示例9: UtcToLocalTime
def UtcToLocalTime(utctime):
if isinstance(utctime, datetime.datetime):
return utctime.replace(tzinfo=pytz.utc).astimezone(tzlocal.get_localzone())
elif isinstance(utctime, list):
return [ t.replace(tzinfo=pytz.utc).astimezone(tzlocal.get_localzone()) for t in utctime ]
else:
raise Exception('UTC time ' + str(utctime) + ' has unknown type ' + str(type(utctime)))
开发者ID:proebrock,项目名称:flaskplot,代码行数:7,代码来源:app.py
示例10: ls
def ls(s3_url,
recursive=False,
exit_codes: 'exit 1 if there are no results' = True):
"""
list bucket contents
"""
if not s3_url:
for bucket in _retry(_client().list_buckets)()['Buckets']:
yield '%s %s' % (str(bucket['CreationDate'].astimezone(tzlocal.get_localzone()))[:-6],
bucket['Name'])
else:
bucket, *prefix = s3_url.split('s3://')[-1].split('/')
kw = {'Bucket': bucket,
'Prefix': '/'.join(prefix),
'Delimiter': '' if recursive else '/'}
results = False
while True:
resp = _retry(_client().list_objects_v2)(**kw)
logging.debug(pprint.pformat(resp))
for pre in resp.get('CommonPrefixes', []):
results = True
yield 'PRE %s' % pre['Prefix']
for key in resp.get('Contents', []):
results = True
yield '%s %s %s %s' % (str(key['LastModified'].astimezone(tzlocal.get_localzone()))[:-6],
key['Size'],
key['Key'],
key['StorageClass'])
if resp['IsTruncated']:
kw['ContinuationToken'] = resp['NextContinuationToken']
else:
break
if not results and exit_codes:
sys.exit(1)
开发者ID:nathants,项目名称:py-aws,代码行数:34,代码来源:s3.py
示例11: set_heating_trigger
def set_heating_trigger(self, proportion, on):
self.proportional_time = proportion
if self.heating_trigger is not None:
try:
self.heating_trigger.remove()
except JobLookupError as e:
pass
self.heating_trigger = None
if on:
if proportion < self.config['heating_settings']['proportional_heating_interval_minutes']:
run_date = self.time_on + datetime.timedelta(0,self.proportional_time * 60)
logger.info('New proportional time: ' + str(proportion) + '/' + str(self.config['heating_settings']['proportional_heating_interval_minutes']) +\
' mins - will turn off at ' + str(run_date.astimezone(get_localzone())))
self.heating_trigger = self.sched.add_job(\
self.process, trigger='date', run_date=run_date, name='Proportional off at ' + str(run_date.astimezone(get_localzone())))
else:
if proportion > 0:
if self.time_off is None:
self.time_off = pytz.utc.localize(datetime.datetime.utcnow())
run_date = self.time_off + datetime.timedelta(0,(self.config['heating_settings']['proportional_heating_interval_minutes'] - self.proportional_time) * 60)
logger.info('New proportional time: ' + str(proportion) + '/' + str(self.config['heating_settings']['proportional_heating_interval_minutes']) +\
' mins - will turn on at ' + str(run_date.astimezone(get_localzone())))
self.heating_trigger = self.sched.add_job(\
self.process, trigger='date', run_date=run_date, name='Proportional on at ' + str(run_date.astimezone(get_localzone())))
开发者ID:daviessm,项目名称:heating,代码行数:25,代码来源:heating.py
示例12: iscurrent
def iscurrent(self,thread,created_utc,days_old=7):
if thread=="weekly":
check = datetime.utcnow().replace(tzinfo=tz.utc).astimezone(tzlocal.get_localzone())
created = datetime.utcfromtimestamp(created_utc).replace(tzinfo=tz.utc)
created_local = created.astimezone(tzlocal.get_localzone())
if (check - created_local).total_seconds() <= ((days_old*24)+4)*3600:
logging.debug("Thread created within last %s days (+4 hour buffer), %s seconds ago: %s.", days_old, (check - created_local).total_seconds(), created.strftime("%Y-%m-%d %I:%M:%S %p %Z"))
return True
return False
开发者ID:toddrob99,项目名称:Baseball-GDT-Bot,代码行数:9,代码来源:timecheck.py
示例13: time_entries
def time_entries():
week_ago = datetime.datetime.now(tz=tzlocal.get_localzone()).replace(microsecond=0) - datetime.timedelta(days=7)
params = {}
params = {
'start_date': week_ago.isoformat(),
'end_date': datetime.datetime.now(tz=tzlocal.get_localzone()).replace(microsecond=0).isoformat()
}
print(req('time_entries', params, 'GET'))
开发者ID:t0mk,项目名称:titds,代码行数:9,代码来源:titds.py
示例14: updateInfo
def updateInfo(self,force=False):
self.logger.debug("[TvShowSchedule] updateInfo method called")
v30DaysAgo = datetime.datetime.now(tzlocal.get_localzone()) - datetime.timedelta(days=30)
if force:
self['info']['infoUpdate'] = v30DaysAgo
# If update is forced or info is indicated and newer than 30 days ago
if force or self['info'].setdefault('infoUpdate',v30DaysAgo) <= v30DaysAgo:
t = myTvDB.myTvDB()
if 'overview' in t[self.seriesid].data.keys() and t[self.seriesid].data['overview'] is not None:
overview = t[self.seriesid].data['overview']
else:
overview = ""
bannerAvailable = (
self.bannerDir is not None
and 'banner' in t[self.seriesid].data
and t[self.seriesid].data['banner'] is not None
)
episodesList = t[self.seriesid].getEpisodesList()
arrList = [0]
for key in range(1,99):
if key in episodesList.keys():
arrList.append(episodesList[key])
else:
break
episodesList = arrList
info={
'seriesname':t[self.seriesid].data['seriesname'],
'overview':overview,
'infoUpdate':datetime.datetime.now(tzlocal.get_localzone()),
'banner': bannerAvailable,
'episodesList':episodesList
}
self.set(info=info,autoComplete=False)
if bannerAvailable and not self.isBannerUpdated():
self.updateBanner(t[self.seriesid].data['banner'])
if self['season'] * self['episode'] > 0:
try:
t[self.seriesid][self['season']][self['episode']]
except:
self.logger.error("[tvShowSchedule] {0} S{1:02}E{2:02} does not exist. Reset to next episode"
.format(self.seriesid,self['season'],self['episode']))
self.set(season=0,episode=0,status=0,autoComplete=True)
return
episodeData = t[self.seriesid][self['season']][self['episode']]
firstaired = episodeData['firstaired']
if isinstance(firstaired,basestring):
firstaired = dateutil.parser.parse(firstaired)
if firstaired is None:
self.logger.error("[tvShowSchedule] No firstaired for {0}".format(unicode(firstaired)))
raise Exception("No firstaired for {0}".format(unicode(firstaired)))
if firstaired.tzinfo is None or firstaired.tzinfo.utcoffset(firstaired) is None:
firstaired = tzlocal.get_localzone().localize(firstaired)
info['firstaired'] = firstaired
self.set(info=info,autoComplete=False)
self.setDefault()
self.logger.debug("[tvShowSchedule] TvShow has been updated. New value:\n {0}".format(unicode(self.data)))
开发者ID:kavod,项目名称:TvShowWatch-2,代码行数:57,代码来源:tvShowSchedule.py
示例15: __init__
def __init__(self, d, t, interaction, winner, quantity, item, cp=False):
if cp:
if d:
month, day = map(int, d.strip('[] ').split('/'))
else:
month, day = 1,1
if t:
hour, minute = map(int, t.strip('[] ').split(':'))
else:
hour, minute = 0, 0
if tzlocal_present:
self.datetime = tzlocal.get_localzone().localize(
datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute))
else:
self.datetime = datetime.datetime(year=year, month=month, day=day, hour=hour, minute=minute)
else:
if tzlocal_present:
#self.datetime = tzlocal.get_localzone().localize(datetime.datetime.strptime(d+t, '%Y%m%d%H%M%S'))
self.datetime = tzlocal.get_localzone().localize(datetime.datetime(year=int(d[:4]), month=int(d[4:6]),
day=int(d[6:]), hour=int(t[:2]), minute=int(t[2:4]), second=int(t[4:])))
else:
#self.datetime = datetime.datetime.strptime(d+t, '%Y%m%d%H%M%S')
self.datetime = datetime.datetime(year=int(d[:4]), month=int(d[4:6]),
day=int(d[6:]), hour=int(t[:2]), minute=int(t[2:4]), second=int(t[4:]))
self.winner = winner or ''
quantity = quantity or ''
quantity = (int(quantity.strip().replace(',','') or 0) or
int(''.join(item.split(' x ')[1:]).replace(',','') or 1))
item = item.split(' x ')[0].rstrip('!.').rsplit(' erhalten', maxsplit=1)[0]
self.interaction = interaction or ''
if interaction in {'lost', 'placed a bet of', 'discarded', 'spent'}:
self.gain_item = ''
self.gain_value = 0
self.loss_item = item
self.loss_value = quantity * -1
elif interaction == 'sold':
item, gain = item.rsplit(' for ', maxsplit=1)
self.loss_item = item
self.loss_value = -1
quantity, item = gain.split(maxsplit=1)
self.gain_item = item
self.gain_value = int(quantity.replace(',', ''))
elif interaction == "didn't win any":
self.gain_item = item
self.gain_value = 0
self.loss_item = ''
self.loss_value = 0
else:
self.gain_item = item
self.gain_value = quantity
self.loss_item = ''
self.loss_value = 0
开发者ID:TigerhawkT3,项目名称:sto_loot_parser,代码行数:57,代码来源:sto_loot_parser.py
示例16: convert
def convert(self, writers_schema, value):
if not isinstance(value, datetime.datetime):
if isinstance(value, datetime.date):
value = tzlocal.get_localzone().localize(
datetime.datetime(value.year, value.month, value.day, 0, 0, 0, 0))
if value.tzinfo is None:
value = tzlocal.get_localzone().localize(value)
value = (time.mktime(value.utctimetuple()) - EPOCH_TT) + value.microsecond / 1000000.0
return long(value * 1000000)
开发者ID:rbystrit,项目名称:avro_gen,代码行数:10,代码来源:logical.py
示例17: dateoflastweekly
def dateoflastweekly(self):
if self.weeklycheck():
return datetime.utcnow().replace(tzinfo=tz.utc).astimezone(tzlocal.get_localzone()).date()
else:
for d in (datetime.utcnow().replace(tzinfo=tz.utc).astimezone(tzlocal.get_localzone()).date() - timedelta(days=x) for x in range(1, 7)):
if self.weeklycheck(d):
return d
logging.error("Error in dateoflastweekly(), returning today's date...")
return datetime.utcnow().replace(tzinfo=tz.utc).astimezone(tzlocal.get_localzone()).date()
开发者ID:toddrob99,项目名称:Baseball-GDT-Bot,代码行数:10,代码来源:timecheck.py
示例18: ls_versions
def ls_versions(s3_url,
recursive=False,
latest: 'only show the latest version of a key' = False,
version_id: 'include version-ids as the last column of output'=False,
exit_codes: 'exit 1 if there are no results' = True):
"""
list bucket contents, including versions of keys
"""
if not s3_url:
for bucket in _retry(_client().list_buckets)()['Buckets']:
yield '%s %s' % (str(bucket['CreationDate'].astimezone(tzlocal.get_localzone()))[:-6],
bucket['Name'])
else:
bucket, *prefix = s3_url.split('s3://')[-1].split('/')
kw = {'Bucket': bucket,
'Prefix': '/'.join(prefix),
'Delimiter': '' if recursive else '/'}
results = False
while True:
resp = _retry(_client().list_object_versions)(**kw)
logging.debug(pprint.pformat(resp))
for pre in resp.get('CommonPrefixes', []):
results = True
yield 'PRE %s' % pre['Prefix']
for version in resp.get('Versions', []):
if not latest or version['IsLatest']:
results = True
yield '%s %s %s %s %s %s' % (
str(version['LastModified'].astimezone(tzlocal.get_localzone()))[:-6],
version['Size'],
version['Key'],
version['StorageClass'],
'LATEST' if version['IsLatest'] else 'HISTORICAL',
version['VersionId'] if version_id else '',
)
for delete in resp.get('DeleteMarkers', []):
if not latest or delete['IsLatest']:
results = True
yield '%s %s %s %s %s %s' % (
str(delete['LastModified'].astimezone(tzlocal.get_localzone()))[:-6],
'-',
delete['Key'],
'-',
'DELETED' if delete['IsLatest'] else 'HISTORICAL-DELETE',
delete['VersionId'] if version_id else '',
)
if resp['IsTruncated']:
if 'NextKeyMarker' in resp:
kw['KeyMarker'] = resp['NextKeyMarker']
if 'NextVersionIdMarker' in resp:
kw['VersionIdMarker'] = resp['NextVersionIdMarker']
else:
break
if not results and exit_codes:
sys.exit(1)
开发者ID:nathants,项目名称:py-aws,代码行数:55,代码来源:s3.py
示例19: gamecheck
def gamecheck(self,thisgame=1,gamecount=1,just_get_time=False):
if self.games[thisgame].get('gamesub'): return True #game thread is already posted
if self.games[thisgame].get('doubleheader') and str(self.games[thisgame].get('gameNumber'))=='2':
if self.SETTINGS.get('GAME_THREAD').get('HOLD_DH_GAME2_THREAD') and not just_get_time:
if self.games[self.games[thisgame].get('othergame')].get('doubleheader') and not self.games[self.games[thisgame].get('othergame')].get('final'):
logging.info("Holding doubleheader Game %s until Game %s is final, sleeping for 5 seconds...", self.games[thisgame].get('gameNumber'), self.games[self.games[thisgame].get('othergame')].get('gameNumber'))
time.sleep(5)
return False
else:
logging.debug("Doubleheader Game 2 start time: %s; Game 1 start time: %s", self.games[thisgame].get('gameInfo').get('date_object'), self.games[self.games[thisgame].get('othergame')].get('gameInfo').get('date_object'))
if self.games[self.games[thisgame].get('othergame')].get('gameInfo').get('date_object') > self.games[thisgame].get('gameInfo').get('date_object'): #game 1 start time is after game 2 start time
logging.info("Detected doubleheader Game 2 start time is before Game 1 start time. Using Game 1 start time + 3.5 hours for Game 2...")
self.games[thisgame]['gameInfo'].update({'date_object' : self.games[self.games[thisgame].get('othergame')].get('gameInfo').get('date_object') + timedelta(hours=3, minutes=30)}) #use game 1 start time + 3.5 hours for game 2 start time
logging.debug("Game 2 start time: %s; Game 1 start time: %s", self.games[thisgame].get('gameInfo').get('date_object'), self.games[self.games[thisgame].get('othergame')].get('gameInfo').get('date_object'))
if just_get_time:
post_time = self.games[thisgame].get('gameInfo').get('date_object').replace(hour=self.games[thisgame].get('gameInfo').get('date_object').hour - self.SETTINGS.get('GAME_THREAD').get('HOURS_BEFORE'))
if self.SETTINGS.get('GAME_THREAD').get('POST_BY'):
post_by_object = tzlocal.get_localzone().localize(datetime.strptime(datetime.today().strftime("%Y-%m-%d ") + self.SETTINGS.get('GAME_THREAD').get('POST_BY'), "%Y-%m-%d %I%p"))
if post_by_object < post_time: return post_by_object
return post_time
if self.games[thisgame].get('status').get('detailedState').startswith('Postponed') or self.games[thisgame].get('status').get('detailedState').startswith('Suspended') or self.games[thisgame].get('status').get('detailedState').startswith('Cancelled'):
if self.SETTINGS.get('POST_THREAD').get('ENABLED'):
logging.info("Game %s is %s, skipping game thread...",thisgame, self.games[thisgame].get('status').get('detailedState'))
self.games[thisgame].update({'skipflag':True})
else:
logging.info("Game %s is %s, overriding hours before setting for game thread since postgame thread is disabled...", thisgame, self.games[thisgame].get('status').get('detailedState'))
return True #go ahead and post the postgame thread since the game is postponed/suspended/canceled
if self.games[thisgame].get('status',{}).get('abstractGameState') in ['Final','Live']: return True #game has already started (or ended)
while True:
if self.SETTINGS.get('GAME_THREAD').get('POST_BY') and self.pregamecheck(self.SETTINGS.get('GAME_THREAD').get('POST_BY'),persist=False):
logging.info("POST_BY time reached for Game %s game thread...", thisgame)
return True
check = datetime.utcnow().replace(tzinfo=tz.utc).astimezone(tzlocal.get_localzone())
if self.games[thisgame].get('gameInfo').get('date_object_utc').astimezone(tzlocal.get_localzone()) >= check:
if (self.games[thisgame].get('gameInfo').get('date_object_utc').astimezone(tzlocal.get_localzone()) - check).seconds <= (self.SETTINGS.get('GAME_THREAD').get('HOURS_BEFORE') * 60 * 60):
return True
else:
post_time = self.games[thisgame].get('gameInfo').get('date_object_utc').astimezone(tzlocal.get_localzone()).replace(hour=self.games[thisgame].get('gameInfo').get('date_object_utc').astimezone(tzlocal.get_localzone()).hour - self.SETTINGS.get('GAME_THREAD').get('HOURS_BEFORE'))
if self.SETTINGS.get('GAME_THREAD').get('POST_BY'):
post_by_object = tzlocal.get_localzone().localize(datetime.strptime(datetime.today().strftime("%Y-%m-%d ") + self.SETTINGS.get('GAME_THREAD').get('POST_BY'), "%Y-%m-%d %I%p"))
if post_by_object < post_time:
logging.debug("Time to post game thread (based on POST_BY): %s", post_by_object)
else:
logging.debug("Time to post game thread (based on HOURS_BEFORE): %s", post_time)
if gamecount>1:
logging.info("Not time to post Game %s game thread yet, sleeping for 5 seconds...", thisgame)
time.sleep(5)
return False
else:
return True
开发者ID:toddrob99,项目名称:Baseball-GDT-Bot,代码行数:50,代码来源:timecheck.py
示例20: __init__
def __init__(
self,
filelist=None,
period=None,
duration=None
):
if period > 366:
# This is completely arbitrary
raise ValueError("Periods longer than an year are currently not supported")
uid=0
for ctab in self.crontabs:
for job in ctab:
if job.is_valid() and job.is_enabled():
# Creates an event in ical for every valid job occurence in PERIOD
current_period = 0
schedule = job.schedule()
while (current_period < period):
# Calculates the next occurency period from now
next_oc = schedule.get_next().replace(tzinfo=tzlocal.get_localzone())
current_period = (next_oc - datetime.datetime.now(tzlocal.get_localzone())).days
event = icalendar.Event()
event.add("uid", uid)
# We prefer comments over commands, if possible
if job.comment:
event.add("summary", job.comment)
else:
event.add("summary", job.command)
event.add("description" , job.command)
# For readability, all "events" have 10 minutes(see default_settings.py:
event.add("dtstart", next_oc)
event.add("dtend", next_oc + datetime.timedelta(minutes=duration))
event.add("dtstamp", datetime.datetime.now(tzlocal.get_localzone()))
self.calendar.add_component(event)
uid += 1
开发者ID:guhcampos,项目名称:cronical,代码行数:48,代码来源:cronical.py
注:本文中的tzlocal.get_localzone函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论