本文整理汇总了Python中trac.util.datefmt.to_timestamp函数的典型用法代码示例。如果您正苦于以下问题:Python to_timestamp函数的具体用法?Python to_timestamp怎么用?Python to_timestamp使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了to_timestamp函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
if 'mailarchive' in filters:
add_stylesheet(req, 'mailarchive/css/mailarchive.css')
db = self.env.get_db_cnx()
mailarchive_realm = Resource('mailarchive')
cursor = db.cursor()
cursor.execute("SELECT id,category as mlname,utcdate as localdate,"
"fromname,fromaddr , subject FROM mailarc "
"WHERE utcdate>=%s AND utcdate<=%s ",
(to_timestamp(start), to_timestamp(stop)))
for id,category,localdate, fromname, fromaddr,subject in cursor:
#if 'WIKI_VIEW' not in req.perm('wiki', name):
# continue
author = get_author(fromname,fromaddr)
#ctx = context('mailarchive', id)
resource = mailarchive_realm(id=id,version=None)
if 'MAILARCHIVE_VIEW' not in req.perm(resource):
continue
yield ('mailarchive',
datetime.fromtimestamp(localdate, utc),
author or '--',
(resource,(category,author,subject)))
开发者ID:okamototk,项目名称:kanonconductor,代码行数:25,代码来源:web_ui.py
示例2: test_initial_sync
def test_initial_sync(self):
t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
changes = [('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
('trunk/README', Node.FILE, Changeset.ADD, None, None)]
changesets = [Mock(Changeset, 0, '', '', t1,
get_changes=lambda: []),
Mock(Changeset, 1, 'Import', 'joe', t2,
get_changes=lambda: iter(changes))]
repos = Mock(Repository, 'test-repos', None, self.log,
get_changeset=lambda x: changesets[int(x)],
get_oldest_rev=lambda: 0,
get_youngest_rev=lambda: 1,
normalize_rev=lambda x: x,
next_rev=lambda x: int(x) == 0 and 1 or None)
cache = CachedRepository(self.db, repos, None, self.log)
cache.sync()
cursor = self.db.cursor()
cursor.execute("SELECT rev,time,author,message FROM revision")
self.assertEquals(('0', to_timestamp(t1), '', ''), cursor.fetchone())
self.assertEquals(('1', to_timestamp(t2), 'joe', 'Import'), cursor.fetchone())
self.assertEquals(None, cursor.fetchone())
cursor.execute("SELECT rev,path,node_type,change_type,base_path,"
"base_rev FROM node_change")
self.assertEquals(('1', 'trunk', 'D', 'A', None, None),
cursor.fetchone())
self.assertEquals(('1', 'trunk/README', 'F', 'A', None, None),
cursor.fetchone())
self.assertEquals(None, cursor.fetchone())
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:30,代码来源:cache.py
示例3: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
if 'mentions' not in filters:
return
ts_start = to_timestamp(start)
ts_stop = to_timestamp(stop)
def make_event(mention):
ts = mention[4]
return (
'mention-%s' % mention[1],
datetime.fromtimestamp(ts, utc),
None,
mention,
)
db = self.env.get_db_cnx()
cursor = db.cursor()
try:
cursor.execute("SELECT mentioned, location, uri, text, at FROM mentions WHERE at>=%s AND at<=%s",
(ts_start, ts_stop,))
for r in cursor:
yield make_event(r)
except sqlite.OperationalError, e:
# db lock, table doesn't exist, or something else that's hopefully transient
self.env.log.info("Failed to fetch mentions: %s" % str(e))
开发者ID:trac-hacks,项目名称:tracmentions,代码行数:25,代码来源:plugin.py
示例4: get_num_closed_tix
def get_num_closed_tix(db, from_date, at_date, milestone):
"""Returns an integer of the number of close ticket events counted
between from_date to at_date."""
cursor = db.cursor()
args = [to_timestamp(from_date), to_timestamp(at_date)]
milestone_str = ''
if milestone:
args.append(milestone)
milestone_str += 'AND t.milestone = %s'
# Count tickets between two dates (note: does not account for tickets
# that were closed and then reopened between the two dates)
cursor.execute("""
SELECT newvalue
FROM ticket_change tc
INNER JOIN ticket t ON t.id = tc.ticket AND tc.time > %%s
AND tc.time <= %%s AND tc.field == 'status' %s
ORDER BY tc.time""" % milestone_str, args)
closed_count = 0
for (status,) in cursor:
if status == 'closed':
closed_count += 1
return closed_count
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:27,代码来源:ticketstats.py
示例5: _get_num_closed_tix
def _get_num_closed_tix(self, from_date, at_date, req, ticketFilter=""):
"""Returns an integer of the number of close ticket events counted
between from_date to at_date."""
status_map = {
'new': 0,
'reopened': 0,
'assigned': 0,
'closed': 1,
'edit': 0
}
count = 0
db = self.env.get_db_cnx()
cursor = db.cursor()
cursor.execute("""
SELECT t.id, tc.field, tc.time, tc.oldvalue, tc.newvalue,
t.priority
FROM ticket_change tc
INNER JOIN ticket t ON t.id = tc.ticket
INNER JOIN enum p ON p.name = t.priority AND p.type = 'priority'
WHERE tc.time > %s AND tc.time <= %s %s
ORDER BY tc.time
""" % (to_timestamp(from_date), to_timestamp(at_date),
ticketFilter))
for tid, field, time, old, status, priority in cursor:
if field == 'status':
if status in ('new', 'assigned', 'reopened', 'closed', 'edit'):
count += status_map[status]
return count
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:34,代码来源:TicketStatsMacro.py
示例6: update
def update(self, db=None):
assert self.name, 'Cannot update milestone with no name'
if not db:
db = self.env.get_db_cnx()
handle_ta = True
else:
handle_ta = False
self.name = simplify_whitespace(self.name)
cursor = db.cursor()
self.env.log.info('Updating milestone "%s"' % self.name)
cursor.execute("UPDATE milestone SET name=%s,due=%s,"
"completed=%s,description=%s WHERE name=%s",
(self.name, to_timestamp(self.due), to_timestamp(self.completed),
self.description,
self._old_name))
self.env.log.info('Updating milestone field of all tickets '
'associated with milestone "%s"' % self.name)
cursor.execute("UPDATE ticket SET milestone=%s WHERE milestone=%s",
(self.name, self._old_name))
self._old_name = self.name
if handle_ta:
db.commit()
TicketSystem(self.env).reset_ticket_fields()
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:25,代码来源:model.py
示例7: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
"""
Return a list of events in the time range given by the `start` and
`stop` parameters.
The `filters` parameters is a list of the enabled filters, each item
being the name of the tuples returned by `get_timeline_filters`.
Since 0.11, the events are `(kind, date, author, data)` tuples,
where `kind` is a string used for categorizing the event, `date`
is a `datetime` object, `author` is a string and `data` is some
private data that the component will reuse when rendering the event.
When the event has been created indirectly by another module,
like this happens when calling `AttachmentModule.get_timeline_events()`
the tuple can also specify explicitly the provider by returning tuples
of the following form: `(kind, date, author, data, provider)`.
"""
if 'main_git_repository' in filters or \
'cloned_git_repository' in filters:
for event in GitHubEvent.get_commit_by_date(
self.env, to_timestamp(start), to_timestamp(stop), git_url=self.github_url):
if event.is_clone() and 'cloned_git_repository' in filters:
yield ('cloned_git_repository',
datetime.fromtimestamp(event.time, utc),
event.author,
event)
elif not event.is_clone() and 'main_git_repository' in filters:
yield ('main_git_repository',
datetime.fromtimestamp(event.time, utc),
event.author,
event) # TODO: only sent needed data
开发者ID:dinoboff,项目名称:trachub,代码行数:34,代码来源:timeline.py
示例8: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
if isinstance(start, datetime): # Trac>=0.11
from trac.util.datefmt import to_timestamp
start = to_timestamp(start)
stop = to_timestamp(stop)
if 'build' in filters:
add_stylesheet(req, 'BambooTrac/bambootrac.css')
feed = feedparser.parse(self.feed_url, handlers=[self.bAuth, self.dAuth])
for entry in feed.entries:
# check time range
completed = calendar.timegm(entry.date_parsed)
# create timeline entry
if entry.title.find('SUCCESS') >= 0:
message = 'Build finished successfully'
kind = 'bamboo-successful'
else:
message = 'Build failed'
kind = 'bamboo-failed'
fulltitle = entry.title.split(":")
newtitle = fulltitle[0]
href = entry.link
title = entry.title
comment = message + ' at ' + format_datetime(completed)
yield kind, href, newtitle, completed, None, comment
开发者ID:grimsy,项目名称:bamboo-trac-plugin,代码行数:33,代码来源:BambooTracPlugin.py
示例9: get_blog_comments
def get_blog_comments(env, post_name='', from_dt=None, to_dt=None):
""" Returns comments as a list of tuples from search based on
AND input for post_name, and datetime span (from_dt and to_dt):
(post_name, number, comment, author, time)
Instantiate BlogComment objects to get further details of each.
Example of sorting the output by time, newest first:
from trac.util.compat import sorted, itemgetter
comments = get_blog_comments(env)
sorted(comments, key=itemgetter(4), reverse=True) """
# Build the list of WHERE restrictions
args = [post_name and ("name=%s", post_name) or None,
from_dt and ("time>%s", to_timestamp(from_dt)) or None,
to_dt and ("time<%s", to_timestamp(to_dt)) or None]
args = [arg for arg in args if arg]
where_clause = ""
where_values = None
if args:
where_clause = "WHERE " + " AND ".join([arg[0] for arg in args])
where_values = tuple([arg[1] for arg in args])
# Do the SELECT
cnx = env.get_db_cnx()
cursor = cnx.cursor()
sql = "SELECT name, number, comment, author, time " \
"FROM fullblog_comments " + where_clause
env.log.debug("get_blog_comments() SQL: %r (%r)" % (sql, where_values))
cursor.execute(sql, where_values or None)
# Return the items we have found
return [(row[0], row[1], row[2], row[3], to_datetime(row[4], utc))
for row in cursor]
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:32,代码来源:model.py
示例10: save
def save(self, db=None):
"""Save changes or add a new paste."""
if db:
handle_ta = False
else:
handle_ta = True
db = self.env.get_db_cnx()
cursor = db.cursor()
if self.time is None:
self.time = datetime.now(utc)
if self.id is None:
cursor.execute('INSERT INTO pastes (title, author, mimetype, '
'data, time) VALUES (%s, %s, %s, %s, %s)',
(self.title, self.author, self.mimetype, self.data,
to_timestamp(self.time)))
self.id = db.get_last_id(cursor, 'pastes')
else:
cursor.execute('UPDATE pastes SET title=%s, author=%s, mimetype=%s,'
'data=%s, time=%s WHERE id = %s', (
self.title, self.author, self.mimetype, self.data,
to_timestamp(self.time), self.id
))
if handle_ta:
db.commit()
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:27,代码来源:model.py
示例11: test_update_page
def test_update_page(self):
cursor = self.db.cursor()
t = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
cursor.execute("INSERT INTO wiki VALUES(%s,%s,%s,%s,%s,%s,%s,%s)",
('TestPage', 1, to_timestamp(t), 'joe', '::1', 'Bla bla',
'Testing', 0))
page = WikiPage(self.env, 'TestPage')
page.text = 'Bla'
page.save('kate', 'Changing', '192.168.0.101', t2)
self.assertEqual(2, page.resource.version)
cursor.execute("SELECT version,time,author,ipnr,text,comment,"
"readonly FROM wiki WHERE name=%s", ('TestPage',))
self.assertEqual((1, to_timestamp(t), 'joe', '::1', 'Bla bla', 'Testing', 0),
cursor.fetchone())
self.assertEqual((2, to_timestamp(t2), 'kate', '192.168.0.101', 'Bla',
'Changing', 0), cursor.fetchone())
listener = TestWikiChangeListener(self.env)
self.assertEqual((page, 2, t2, 'Changing', 'kate', '192.168.0.101'),
listener.changed[0])
page = WikiPage(self.env, 'TestPage')
history = list(page.get_history())
self.assertEqual(2, len(history))
self.assertEqual((2, t2, 'kate', 'Changing', '192.168.0.101'),
history[0])
self.assertEqual((1, t, 'joe', 'Testing', '::1'), history[1])
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:30,代码来源:model.py
示例12: test_get_changes
def test_get_changes(self):
t1 = datetime(2001, 1, 1, 1, 1, 1, 0, utc)
t2 = datetime(2002, 1, 1, 1, 1, 1, 0, utc)
cursor = self.db.cursor()
cursor.execute("INSERT INTO revision (rev,time,author,message) "
"VALUES (0,%s,'','')", (to_timestamp(t1),))
cursor.execute("INSERT INTO revision (rev,time,author,message) "
"VALUES (1,%s,'joe','Import')", (to_timestamp(t2),))
cursor.executemany("INSERT INTO node_change (rev,path,node_type,"
"change_type,base_path,base_rev) "
"VALUES ('1',%s,%s,%s,%s,%s)",
[('trunk', 'D', 'A', None, None),
('trunk/README', 'F', 'A', None, None)])
cursor.execute("UPDATE system SET value='1' WHERE name='youngest_rev'")
repos = Mock(Repository, 'test-repos', None, self.log,
get_changeset=lambda x: None,
get_youngest_rev=lambda: 1,
get_oldest_rev=lambda: 0,
next_rev=lambda x: None,
normalize_rev=lambda rev: rev)
cache = CachedRepository(self.db, repos, None, self.log)
self.assertEqual('1', cache.youngest_rev)
changeset = cache.get_changeset(1)
self.assertEqual('joe', changeset.author)
self.assertEqual('Import', changeset.message)
self.assertEqual(t2, changeset.date)
changes = changeset.get_changes()
self.assertEqual(('trunk', Node.DIRECTORY, Changeset.ADD, None, None),
changes.next())
self.assertEqual(('trunk/README', Node.FILE, Changeset.ADD, None, None),
changes.next())
self.assertRaises(StopIteration, changes.next)
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:33,代码来源:cache.py
示例13: fetch_tickets
def fetch_tickets(self, tickets, ids, detailed):
for id in ids:
t = { 'id': id }
try:
ticket = model.Ticket(self.env, id)
except:
self.log.error('Failed to fetch ticket %d' % id)
if str(id) in tickets:
delattr(tickets, str(id))
continue
# Get mandatory fields
for field_name in self.mandatory_fields:
t[field_name] = ticket.get_value_or_default(field_name)
if id in detailed:
# Get fields that are are always shown in detail dialog
for field_name in self.always_shown_fields:
if field_name not in t:
t[field_name] = ticket.get_value_or_default(field_name)
# Get user specified extra fields
for field_name in self.fields:
if field_name not in self.mandatory_fields:
t[field_name] = ticket.get_value_or_default(field_name)
# Convert DateTimes to (millisecond) timestamps
if 'time' in t:
t['time'] = to_timestamp(t['time']) * 1000
if 'changetime' in t:
t['changetime'] = to_timestamp(t['changetime']) * 1000
# Get changes and comments and group changes from same action together
t['changelog'] = []
changelog = ticket.get_changelog()
time_entry = None
for log_item in changelog:
current_time = to_timestamp(log_item[0]) * 1000
if time_entry is None or time_entry['time'] < current_time:
if time_entry is not None:
t['changelog'].append(time_entry)
time_entry = {}
time_entry['time'] = current_time
time_entry['author'] = log_item[1]
time_entry['changes'] = []
change_entry = {}
change_entry['field'] = log_item[2]
change_entry['oldValue'] = log_item[3]
change_entry['newValue'] = log_item[4]
time_entry['changes'].append(change_entry)
if time_entry is not None:
t['changelog'].append(time_entry)
tickets[str(id)] = t
开发者ID:GraceH,项目名称:trac-kanban-board,代码行数:56,代码来源:kanbanboardmacro.py
示例14: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
try:
master = BuildBotSystem(self.buildbot_url)
except Exception as e:
print('Error hitting BuildBot', e)
return
# This was a comprehension: the loop is clearer
for build in master.getAllBuildsInInterval(to_timestamp(start), to_timestamp(stop)):
# BuildBot builds are reported as
# (builder_name, num, end, branch, rev, results, text)
print('Reporting build', build)
yield ('build', to_datetime(build[2]), '', build)
开发者ID:rapyuta,项目名称:buildbot,代码行数:12,代码来源:web_ui.py
示例15: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
self.log.debug("Monit: get_timeline_events() called")
conn = self.get_db_cnx()
myfilter = [f for f in filters if f.startswith('monit_')]
event_filter = [k for k,v in srv_types.items() if v in [f.split('_')[1] for f in myfilter]]
self.log.debug("Input: %s, filtered: %s, Types: %s" % (filters, myfilter, event_filter))
if event_filter:
#monit_realm = Resource('monit')
cur = conn.cursor()
sql = "SELECT COUNT(*) AS events FROM event WHERE \
collected_sec >=? AND collected_sec <=? AND type IN (%s)" % ','.join(['?' for e in event_filter])
cur.execute(sql, (to_timestamp(start), to_timestamp(stop))+tuple(event_filter))
self.log.debug("There are currently %s events for the range from %s to %s" % (
cur.fetchall(), start, stop))
sql = "SELECT * FROM event WHERE collected_sec >=? \
AND collected_sec <=? AND type IN (%s)" % ','.join(['?' for e in event_filter])
cur.execute(sql, (to_timestamp(start), to_timestamp(stop))+tuple(event_filter))
events = cur.fetchall()
for evt in events:
self.log.debug("Found monit event %s" % evt)
srv_table = "%s_service" % srv_types[evt['type']]
#self.log.debug("Selecting service entry with %s" )
cur.execute("SELECT * FROM %s WHERE id=? LIMIT 1" % srv_table, (evt['service_id'],))
srv = cur.fetchone()
#self.log.debug("Found service entry %s" % srv)
if srv:
cur.execute("SELECT * FROM monit WHERE id=?", (srv['monit_id'],))
monit = cur.fetchone()
#self.log.debug("Found monit instance %s" % monit)
if monit:
msg = ('monit', datetime.fromtimestamp(evt['collected_sec'], utc),
'[email protected]%s' % monit['localhostname'], (evt, srv, monit))
else:
self.log.warning("No monit entry with id '%s' found while rendering event '%s'." % (
srv['monit_id'], evt['id']))
msg = ('monit', datetime.fromtimestamp(evt['collected_sec'], utc),
'[email protected]', (evt, srv, None))
else:
self.log.warning("No service entry with id '%s' found while rendering event '%s'." % (
evt['service_id'], evt['id']))
msg = ('monit', datetime.fromtimestamp(evt['collected_sec'], utc),
'[email protected]', (evt, None, None))
yield msg
conn.close()
开发者ID:kopernikus,项目名称:systrac,代码行数:52,代码来源:monit.py
示例16: get_annotation_data
def get_annotation_data(self, context):
add_stylesheet(context.req, 'bitten/bitten_coverage.css')
resource = context.resource
# attempt to use the version passed in with the request,
# otherwise fall back to the latest version of this file.
version = context.req.args.get('rev') or resource.version
# get the last change revision for the file so that we can
# pick coverage data as latest(version >= file_revision)
created = context.req.args.get('created') or resource.version
full_path = get_resource_path(resource)
_name, repos, _path = get_repos(self.env, full_path, None)
version_time = to_timestamp(repos.get_changeset(version).date)
if version != created:
created_time = to_timestamp(repos.get_changeset(created).date)
else:
created_time = version_time
self.log.debug("Looking for coverage report for %[email protected]%s [%s:%s]..." % (
full_path, str(resource.version), created, version))
db = self.env.get_db_cnx()
cursor = db.cursor()
cursor.execute("""
SELECT b.id, b.rev, i2.value
FROM bitten_config AS c
INNER JOIN bitten_build AS b ON c.name=b.config
INNER JOIN bitten_report AS r ON b.id=r.build
INNER JOIN bitten_report_item AS i1 ON r.id=i1.report
INNER JOIN bitten_report_item AS i2 ON (i1.item=i2.item
AND i1.report=i2.report)
WHERE i2.name='line_hits'
AND b.rev_time>=%s
AND b.rev_time<=%s
AND i1.name='file'
AND """ + db.concat('c.path', "'/'", 'i1.value') + """=%s
ORDER BY b.rev_time DESC LIMIT 1""" ,
(created_time, version_time, full_path))
row = cursor.fetchone()
if row:
build_id, build_rev, line_hits = row
coverage = line_hits.split()
self.log.debug("Coverage annotate for %[email protected]%s using build %d: %s",
resource.id, build_rev, build_id, coverage)
return coverage
add_warning(context.req, "No coverage annotation found for "
"/%s for revision range [%s:%s]." % (
resource.id.lstrip('/'), version, created))
return []
开发者ID:hefloryd,项目名称:bitten,代码行数:52,代码来源:coverage.py
示例17: get_timeline_events
def get_timeline_events(self, req, start, stop, filters):
if Key.SPRINT not in filters:
return
# prepare select criteria
criteria = {'start': '>=%d' % to_timestamp(start),
'start': '<=%d' % to_timestamp(stop)}
for sprint in self.sp_manager.select(criteria=criteria):
# the first value of the data tuple tells if we're showing
# a start or an end date (True=start, False=end), see next function
if sprint.is_currently_running:
yield(Key.SPRINT, sprint.start, '', (True, sprint))
if sprint.is_closed:
yield(Key.SPRINT, sprint.end, '', (False, sprint))
开发者ID:djangsters,项目名称:agilo,代码行数:13,代码来源:web_ui.py
示例18: get_changesets
def get_changesets(self, start, stop):
db = self.getdb()
cursor = db.cursor()
cursor.execute("SELECT rev FROM revision "
"WHERE time >= %s AND time < %s "
"ORDER BY time DESC, rev DESC",
(to_timestamp(start), to_timestamp(stop)))
for rev, in cursor:
try:
if self.authz.has_permission_for_changeset(rev):
yield self.get_changeset(rev)
except NoSuchChangeset:
pass # skip changesets currently being resync'ed
开发者ID:gdgkyoto,项目名称:kyoto-gtug,代码行数:13,代码来源:cache.py
示例19: get_timeline_events
def get_timeline_events(self, req, start, stop, filters, pid, syllabus_id):
if pid is None:
return
is_multi = isinstance(pid, (list, tuple))
if is_multi:
# TODO:
return
# Worklog changes
show_starts = 'workstart' in filters
show_stops = 'workstop' in filters
if show_starts or show_stops:
add_stylesheet(req, "worklog/worklogplugin.css")
ts_start = to_timestamp(start)
ts_stop = to_timestamp(stop)
ticket_realm = Resource('ticket')
db = self.env.get_read_db()
cursor = db.cursor()
cursor.execute("""
SELECT wl.worker,wl.ticket,wl.time,wl.starttime,wl.comment,wl.kind,t.summary,t.status,t.resolution,t.type
FROM (
SELECT worker, ticket, starttime AS time, starttime, comment, 'start' AS kind
FROM work_log
UNION
SELECT worker, ticket, endtime AS time, starttime, comment, 'stop' AS kind
FROM work_log
) AS wl
JOIN ticket t ON t.id = wl.ticket AND project_id=%s AND wl.time>=%s AND wl.time<=%s
ORDER BY wl.time""", (pid, ts_start, ts_stop))
for worker,tid,ts,ts_start,comment,kind,summary,status,resolution,type in cursor:
ticket = ticket_realm(id=tid)
time = to_datetime(ts)
started = None
if kind == 'start':
if not show_starts:
continue
yield ('workstart', pid, time, worker, (ticket,summary,status,resolution,type, started, ""))
else:
if not show_stops:
continue
started = to_datetime(ts_start)
if comment:
comment = "(Time spent: %s)\n\n%s" % (pretty_timedelta(started, time), comment)
else:
comment = '(Time spent: %s)' % pretty_timedelta(started, time)
yield ('workstop', pid, time, worker, (ticket,summary,status,resolution,type, started, comment))
开发者ID:lexqt,项目名称:EduTracTicketWorklog,代码行数:50,代码来源:timeline_hook.py
示例20: get_pastes
def get_pastes(env, number=None, offset=None, from_dt=None, to_dt=None, db=None):
"""Returns a list of pastes as dicts without data.
One or more filters need to be set:
* number - maximum number of items that may be returned
* offset - number of items to skip in returned results
* from_dt - pasted on or after the given time (datetime object)
* to_dt - pasted before or on the given time (datetime object)
Returns dictionary of the form:
(id, title, author, time)
where time is in UTC.
To get the paste data, use id to instantiate a Paste object."""
db = db or env.get_db_cnx()
cursor = db.cursor()
sql = "SELECT id, title, author, time FROM pastes"
order_clause = " ORDER BY id DESC"
limit_clause = ""
if number:
limit_clause += " LIMIT %s" % number
if offset:
limit_clause += " OFFSET %s" % offset
where_clause = ""
where_values = None
args = [from_dt and ("time>%s", to_timestamp(from_dt)) or None,
to_dt and ("time<%s", to_timestamp(to_dt)) or None]
args = [arg for arg in args if arg] # Get rid of the None values
if args:
where_clause = " WHERE " + " AND ".join([arg[0] for arg in args])
where_values = tuple([arg[1] for arg in args])
sql += where_clause + order_clause + limit_clause
env.log.debug("get_pastes() SQL: %r (%r)" % (sql, where_values))
cursor.execute(sql, where_values)
result = []
for row in cursor:
result.append({
'id': row[0],
'title': row[1],
'author': row[2],
'time': datetime.fromtimestamp(row[3], utc)
})
return result
开发者ID:nyuhuhuu,项目名称:trachacks,代码行数:49,代码来源:model.py
注:本文中的trac.util.datefmt.to_timestamp函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论