本文整理汇总了 Python 中 timer.timer 函数的典型用法代码示例。如果您正苦于以下问题:Python timer 函数的具体用法?Python timer 怎么用?Python timer 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了timer函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: fetch_people_page
def fetch_people_page(conn, username, page = 1):
    """Fetch one page of a user's answer listing over an open connection.

    Requests ``/people/<username>/answers?page=<page>`` on ``conn``, prints
    the status code together with the elapsed and running-average time, and
    returns the response body.

    Args:
        conn: Connection object providing ``request()`` and ``getresponse()``.
        username: User name inserted into the URL path.
        page: Page number of the listing; defaults to 1.

    Returns:
        Whatever ``response.read()`` returns for a 200 response, otherwise
        ``None`` (the user is then marked ``FETCH_FAIL`` through ``dbhelper``).

    Raises:
        socket.timeout: Re-raised when the request itself times out.
    """
    url_page = "{}?page={:d}".format("/people/{}/answers".format(username), page)
    print("\n{}\t".format(url_page), end='')
    sys.stdout.flush()

    # The return value of this first call is discarded; the second call below
    # supplies the value printed as milliseconds (presumably start/stop of a
    # stopwatch -- confirm against the timer module).
    timer.timer()
    try:
        conn.request("GET", url_page)
    except socket.timeout:
        print('wow! timeout')
        raise
    response = conn.getresponse()
    t = timer.timer()
    avg = int(get_average(t, 'user page'))

    code = response.status
    print("[{}]\t{} ms\tAvg: {} ms".format(code, t, avg))

    if code == 200:
        return response.read()

    # Every non-200 status is handled the same way apart from the message
    # printed to the console.
    # NOTE(review): the log text below contains no placeholders, so it is
    # logged literally -- confirm whether interpolation was intended.
    slog("user username fetch fail, code code")
    dbhelper.update_user_by_name(username, {'fetch': dbhelper.FETCH_FAIL})
    if code == 404:
        print( "没有这个用户", username)
    else:
        print( "奇奇怪怪的返回码", code)
    return None
开发者ID:linzelong,项目名称:zhihu-archive,代码行数:28,代码来源:zhihu.py
示例2: exec_readlines_test_filter_stderr
def exec_readlines_test_filter_stderr(self):
    """Test execReadlines and filter_stderr.

    The same shell script (two lines on stdout, one on stderr) is run twice:
    once with default arguments, where the stderr line must show up between
    the stdout lines, and once with ``filter_stderr=True``, where it must be
    missing. In both runs the iterator has to raise ``StopIteration`` after
    the last expected line.
    """
    script = """#!/bin/sh
echo "one"
echo "two" >&2
echo "three"
exit 0
"""
    # (extra keyword arguments for execReadlines, lines expected in order)
    scenarios = (
        # stderr is normally included
        ({}, ("one", "two", "three")),
        # filter_stderr removes the middle line
        ({"filter_stderr": True}, ("one", "three")),
    )

    for extra_kwargs, expected_lines in scenarios:
        with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
            testscript.write(script)
            testscript.flush()

            # timer(5) is used as a context manager around the subprocess
            # work (presumably a 5-second timeout guard -- confirm).
            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name],
                                                  **extra_kwargs)
                for expected in expected_lines:
                    self.assertEqual(next(rl_iterator), expected)
                self.assertRaises(StopIteration, rl_iterator.__next__)
开发者ID:jaymzh,项目名称:anaconda,代码行数:35,代码来源:iutil_test.py
示例3: start_program_reset_handlers_test
def start_program_reset_handlers_test(self):
    """Test the reset_handlers parameter of startProgram.

    Starts the same never-ending shell script twice. With the default
    arguments the child must die from SIGPIPE; with ``reset_handlers=False``
    SIGPIPE must have no effect, so the following SIGTERM is the signal that
    ends the process.
    """
    # Text mode is required: the script is written as ``str`` and the default
    # mode of NamedTemporaryFile ("w+b") only accepts bytes on Python 3.
    with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
        testscript.write("""#!/bin/sh
# Just hang out and do nothing, forever
while true ; do sleep 1 ; done
""")
        testscript.flush()

        # Start a program with reset_handlers
        proc = iutil.startProgram(["/bin/sh", testscript.name])

        with timer(5):
            # Kill with SIGPIPE and check that python's SIG_IGN was not
            # inherited. The process should die on the signal.
            proc.send_signal(signal.SIGPIPE)
            proc.communicate()
            self.assertEqual(proc.returncode, -(signal.SIGPIPE))

        # Start another copy without reset_handlers
        proc = iutil.startProgram(["/bin/sh", testscript.name], reset_handlers=False)

        with timer(5):
            # Kill with SIGPIPE, then SIGTERM, and make sure SIGTERM was the
            # one that worked.
            proc.send_signal(signal.SIGPIPE)
            proc.terminate()
            proc.communicate()
            self.assertEqual(proc.returncode, -(signal.SIGTERM))
示例4: saveAnswer
def saveAnswer(conn, username, answer_link_list, dblock):
    """Download, parse and store every answer in ``answer_link_list``.

    Each link must look like ``/question/<qid>/answer/<aid>``. The page is
    fetched with ``get_url``, parsed with ``parse_answer_pure`` and written
    through ``dbhelper`` while ``dblock`` is held. After the loop a summary
    line with the average fetch time and the success ratio is printed, if at
    least one fetch was timed.

    Args:
        conn: Not used by this function; kept for interface compatibility.
        username: Author name stored with each answer.
        answer_link_list: Iterable of answer URLs (paths).
        dblock: Context manager guarding the database writes.

    Returns:
        ``False`` as soon as a fetched page is empty, otherwise ``None``.

    Raises:
        Exception: If a link does not match the expected URL pattern.
    """
    regex = re.compile(r'^/question/(\d+)/answer/(\d+)')
    success_ratio = None
    avg = None

    for url in answer_link_list:
        matches = regex.search(url)
        if matches is None:
            raise Exception('url not good')
        qid, aid = matches.group(1, 2)

        slog("\t{}".format(url))
        sys.stdout.flush()

        timer.timer('saveAnswer')
        content = get_url(url)

        # Record the outcome BEFORE skipping a failed fetch. Previously this
        # ran only after the ``continue`` below, so a failure (0) could never
        # be recorded and the ratio was always built from successes.
        success_ratio = get_average(0 if content is None else 1, 'success_ratio')
        if content is None:
            # NOTE(review): the 'saveAnswer' timer is not read on this path,
            # exactly as before -- confirm that timer.timer tolerates this.
            continue

        t = timer.timer('saveAnswer')
        avg = int(get_average(t))
        slog("\t{} ms".format(t))

        if len(content) == 0:
            slog("content is empty\n")
            slog("url [code] empty")
            return False

        question, descript, content, vote = parse_answer_pure(content)
        slog("{}\t^{}\t{}".format(url, vote, question))

        with dblock:
            dbhelper.saveQuestion(qid, question, descript)
            dbhelper._saveAnswer(aid, qid, username, content, vote)

    if success_ratio is not None and avg is not None:
        success_ratio = int(success_ratio*100)
        print("\tAvg: {} ms\tsuccess_ratio: {}%".format(avg, success_ratio))
示例5: run
def run(fn, name, imshow=False):
    """Compile and time ``fn`` with each JIT wrapper and check its result.

    For both ``jit`` (prefix ``parakeet-``) and ``autojit`` (prefix
    ``numba-``) the wrapped function is first called on a 1x1 slice of the
    module-level ``image`` inside a ``-compile`` timer, then on the full
    image. The result is compared with ``scipy_result`` unless
    ``running_pypy`` is set. A failure of one wrapper is reported and does
    not stop the other one.

    Args:
        fn: Function taking ``(image, k)``.
        name: Label used in the printed header and in the timer names.
        imshow: When true, display the input, the result and the reference
            result with pylab.
    """
    # Parenthesised single-argument prints produce the same output on
    # Python 2 and Python 3; the former bare ``print`` statement is a silent
    # no-op expression on Python 3.
    print("")
    print("--- %s" % (name,))

    for (prefix, wrapper) in [('parakeet-', jit), ('numba-', autojit)]:
        try:
            wrapped_fn = wrapper(fn)
            with timer(prefix + name + '-compile', True):
                wrapped_fn(image[:1, :1], k)
            with timer(prefix + name, False):
                result = wrapped_fn(image, k)
            if imshow:
                import pylab
                pylab.imshow(image)
                pylab.figure()
                pylab.imshow(result)
                pylab.figure()
                pylab.imshow(scipy_result)
                pylab.show()
            if not running_pypy:
                assert allclose(result, scipy_result)
        except Exception:
            # ``Exception`` instead of a bare ``except``: KeyboardInterrupt
            # (and SystemExit) propagate without an explicit re-raise.
            import sys
            print("%s failed" % (prefix+name))
            print(sys.exc_info()[1])
示例6: exec_readlines_test_signals
def exec_readlines_test_signals(self):
    """Test execReadlines and signal receipt.

    The script sends SIGHUP to its parent (this process) between its first
    and second output line. The output must arrive complete in both cases:
    when SIGHUP is ignored and when it is caught by a Python handler. The
    previous SIGHUP disposition is restored after each case.
    """
    script = """#!/bin/sh
echo "one"
kill -HUP $PPID
echo "two"
echo -n "three"
exit 0
"""

    def _hup_handler(signum, frame):
        pass

    # First the ignored signal, then the caught signal.
    for disposition in (signal.SIG_IGN, _hup_handler):
        old_HUP_handler = signal.signal(signal.SIGHUP, disposition)
        try:
            with tempfile.NamedTemporaryFile(mode="wt") as testscript:
                testscript.write(script)
                testscript.flush()

                with timer(5):
                    rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                    for expected in ("one", "two", "three"):
                        self.assertEqual(next(rl_iterator), expected)
                    self.assertRaises(StopIteration, rl_iterator.__next__)
        finally:
            signal.signal(signal.SIGHUP, old_HUP_handler)
示例7: start_qt_app
def start_qt_app(config):
    """Run the Qt tray application on a Qt-backed asyncio event loop.

    Creates the ``QApplication`` and a quamash ``QEventLoop``, installs that
    loop as the asyncio event loop, builds the main window and then runs the
    ``timer`` coroutine until it completes or is cancelled by the GUI's
    ``closeApp`` signal. Exits the process with status 1 when no system tray
    is available.

    Args:
        config: Passed unchanged to ``timer(loop, config, gui_connection)``.
    """
    import sys
    from PyQt5.QtWidgets import QApplication, QSystemTrayIcon, QMessageBox
    from quamash import QEventLoop
    from ui.main import Window
    from ui.qt_gui_connection import qSignal

    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    if not QSystemTrayIcon.isSystemTrayAvailable():
        QMessageBox.critical(None, "Systray",
                             "I couldn't detect any system tray on this system.")
        sys.exit(1)

    QApplication.setQuitOnLastWindowClosed(False)

    gui_connection = qSignal()
    # Bound to a local name for the lifetime of this function (presumably so
    # the window object stays alive while the loop runs -- confirm).
    window = Window(gui_connection)

    def closeApp():
        """Cancel every task of ``loop`` and stop the loop."""
        print("Close app signal")
        # asyncio.Task.all_tasks() was removed in Python 3.9; prefer the
        # module-level asyncio.all_tasks() (3.7+) and fall back for older
        # interpreters. The loop is passed explicitly because this callback
        # is invoked from a Qt signal.
        all_tasks = getattr(asyncio, "all_tasks", None)
        if all_tasks is None:
            all_tasks = asyncio.Task.all_tasks
        for task in all_tasks(loop):
            print(task)
            task.cancel()
        loop.stop()

    gui_connection.closeApp.connect(closeApp)

    with loop:
        try:
            loop.run_until_complete(timer(loop, config, gui_connection))
        except asyncio.CancelledError:
            # Raised when closeApp cancelled the timer task: normal shutdown.
            pass
开发者ID:Aksem,项目名称:KoffeeBreak,代码行数:35,代码来源:main.py
示例8: walk
def walk (self, x, y):
    """Move one tile in the given direction if no walk is in progress.

    Does nothing when both components are zero or while ``self.timer`` exists
    and is not yet complete. Otherwise the position is advanced by ``(x, y)``,
    the step and pixel offsets are recorded, the orientation and matching
    animation are selected (horizontal movement takes precedence over
    vertical), the animation is played and a new timer is stored.

    Args:
        x: Horizontal step; must be -1, 0 or 1.
        y: Vertical step; must be -1, 0 or 1.
    """
    assert x in (1, -1, 0)
    assert y in (1, -1, 0)

    if x == 0 and y == 0:
        return
    if self.timer is not None and not self.timer.is_complete():
        # Previous step is still running.
        return

    self.x += x
    self.y += y
    self.__x = x
    self.__y = y
    self.__offset_x = common.TILE_SIZE * x
    self.__offset_y = common.TILE_SIZE * y

    if x < 0:
        facing = 'left'
    elif x > 0:
        facing = 'right'
    elif y > 0:
        facing = 'down'
    else:
        facing = 'up'
    self.ori = facing
    # The animation attributes carry the same names as the orientations.
    anim = getattr(self, facing)

    anim.play(WALK_MOVEMENT_TIME)
    self.timer = timer(WALK_MOVEMENT_TIME)
开发者ID:marlosdm,项目名称:Maze-Of-Kindred,代码行数:34,代码来源:player.py
示例9: exec_readlines_auto_kill_test
def exec_readlines_auto_kill_test(self):
    """Test execReadlines with reading only part of the output

    The script prints "hey" forever. Two lines are read, then the iterator is
    deleted; the underlying process must have terminated afterwards.
    """
    # Text mode: the script is written as ``str``, which the default binary
    # mode of NamedTemporaryFile rejects on Python 3.
    with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
        testscript.write("""#!/bin/sh
# Output forever
while true; do
echo hey
done
""")
        testscript.flush()

        with timer(5):
            rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])

            # Save the process context
            proc = rl_iterator._proc

            # Read two lines worth. The next() builtin works with both the
            # Python 2 (.next) and Python 3 (.__next__) iterator protocol.
            self.assertEqual(next(rl_iterator), "hey")
            self.assertEqual(next(rl_iterator), "hey")

            # Delete the iterator and wait for the process to be killed
            del rl_iterator
            proc.communicate()

        # Check that the process is gone
        self.assertIsNotNone(proc.poll())
开发者ID:cyclefusion,项目名称:anaconda,代码行数:28,代码来源:iutil_test.py
示例10: _stepFade
def _stepFade(self, final=False):
#Figure out what step we're on
secondsSoFar = (datetime.now() - self.fadeStartTime).total_seconds()
prog = min(secondsSoFar / self.fadeSeconds, 1)
self.currentStep = int(round(prog * self.numSteps))
#print ("%3.2f%%, step %d/%d"%(prog, self.currentStep, self.numSteps))
#set colors to correct output
self.r = self.last_r + int(prog * self.delta_r)
self.g = self.last_g + int(prog * self.delta_g)
self.b = self.last_b + int(prog * self.delta_b)
self.apply()
# Are we done yet?
if self.currentStep < self.numSteps:
# if not, set up the next timeout
self.fadeTimer = timer(
timeout = datetime.now() + timedelta(seconds=self.secondsPerStep),
callback = self._stepFade,
res = self.secondsPerStep)
self.fadeTimer.start()
elif not final:
#This is the second-to-last step!
# make sure we get to do the last step (because we
# don't want rounding or CPU delays to make us stop at 96.7%..
self._stepFade(final=True)
else:
# call the callback if there is one, then reset it so
# it doens't get called again if it's not set in the future
self.fadeCallback and self.fadeCallback()
开发者ID:fastfieros,项目名称:sunrise,代码行数:35,代码来源:sunrise.py
示例11: test_still_running
def test_still_running():
    """Start an endless shell loop, watch it, kill it and wait for a signal."""
    endless_loop = ["/bin/sh", "-c", "while true; do sleep 1; done"]
    with timer(5):
        # Run something forever so we can kill it
        proc = iutil.startProgram(endless_loop)
        iutil.watchProcess(proc, "test1")
        proc.kill()

        # Wait for the SIGCHLD
        signal.pause()
开发者ID:cyclefusion,项目名称:anaconda,代码行数:8,代码来源:iutil_test.py
示例12: pvalue
def pvalue(log_pmf, s0, L, desired_beta):
    """Compute $log((exp(log_pmf)**L)[s0:])$, such that the relative error
    to the exact answer is less than or equal to $desired_beta$.

    The cheap path is tried first: bounds computed on the unshifted pmf, then
    bounds computed on the pmf shifted by ``theta``. If neither is accepted
    by ``_check_sfft_pvalue``, the result comes from ``conv_power`` on the
    shifted pmf, which is then unshifted and summed from ``s0`` onwards.

    Returns ``NEG_INF`` when ``s0`` lies at or beyond the length of the
    L-fold convolution.
    """
    total_len, _ = utils.iterated_convolution_lengths(len(log_pmf), L)
    if s0 >= total_len:
        return NEG_INF

    # Attempt 1: no shift (theta = 0, log-mgf = 0.0).
    preshift_bounds = _bounds(log_pmf, log_pmf, 0, 0.0, s0, L, desired_beta)
    _, p_lower_preshift, p_upper_preshift = preshift_bounds
    preshift_ok, preshift_pval = _check_sfft_pvalue(p_lower_preshift,
                                                    p_upper_preshift,
                                                    desired_beta)
    if preshift_ok:
        logging.debug(' pre-shift sfft worked %.20f', preshift_pval)
        return preshift_pval

    with timer('computing theta'):
        theta = _compute_theta(log_pmf, s0, L)
    logging.debug('raw theta %s', theta)

    # TODO: too-large or negative theta causes numerical instability,
    # so this is a huge hack
    theta = utils.clamp(theta, 0, THETA_LIMIT)
    shifted_pmf, log_mgf = utils.shift(log_pmf, theta)
    beta = desired_beta / 2.0

    # Attempt 2: bounds on the shifted pmf.
    # NOTE(review): whether the check below was originally inside the
    # 'bounds' timer cannot be told from this copy; it only affects what is
    # timed, not the result.
    with timer('bounds'):
        log_delta, p_lower, p_upper = _bounds(log_pmf, shifted_pmf, theta, log_mgf,
                                              s0, L, desired_beta)
    shifted_ok, shifted_pval = _check_sfft_pvalue(p_lower, p_upper, desired_beta)
    logging.debug('theta %s, log_mgf %s, beta %s, log delta %s', theta, log_mgf, beta, log_delta)
    if shifted_ok:
        logging.debug(' sfft worked %.20f', shifted_pval)
        return shifted_pval

    # Fallback: explicit convolution power of the shifted pmf.
    delta = np.exp(log_delta)
    conv = conv_power(shifted_pmf, L, beta, delta)
    unshifted = utils.unshift(conv, theta, (log_mgf, L))
    pval = utils.log_sum(unshifted[s0:])
    logging.debug(' sis pvalue %.20f', pval)
    return pval
开发者ID:huonw,项目名称:sisfft-py,代码行数:43,代码来源:sisfft.py
示例13: main
def main(input, multi, compression):
    """Convert a PSD file, or every ``.psd`` file in a directory, to EXR.

    For each file the layer names are obtained from ``extract_layers`` (one
    name per line). Every non-empty layer is exported, compressed and its
    temporary file cleaned up; layer numbers count empty names as well. With
    ``multi`` set, ``exr_multipart`` is additionally called per file.

    Args:
        input: Path to a file or a directory. A directory is scanned
            (non-recursively) for regular files ending in ``.psd``.
        multi: When true, also call ``exr_multipart`` for each file.
        compression: Passed to ``export_layer`` and ``exr_compression``.

    Returns:
        None. If ``input`` is neither a file nor a directory, a message is
        printed and nothing else happens.
    """
    if not (os.path.isfile(input) or os.path.isdir(input)):
        print("Not a PSD document. Skipping.")
        return

    start = timer()

    if os.path.isdir(input):
        # Formerly collected through a local named ``list``, which shadowed
        # the builtin.
        candidates = (os.path.join(input, entry)
                      for entry in os.listdir(input)
                      if os.path.splitext(entry)[1] == ".psd")
        files = [path for path in candidates if os.path.isfile(path)]
    else:
        files = [input]

    print("Found {} files to convert.".format(len(files)))

    for f in files:
        layers = extract_layers(f)
        layer_names = layers.split("\n")
        # Numbering starts at 1 and also advances over skipped empty names.
        for i, layer in enumerate(layer_names, start=1):
            layer = layer.strip()
            if layer == "":
                print("Skipping empty layer name. Likely flattened compatibility layer.")
            else:
                print("layer {}: {}".format(i, layer))
                tmpfile = export_layer(i, layer, f, compression)
                exr_compression(tmpfile, compression)
                cleanup(tmpfile)
        if multi:
            exr_multipart(layer_names, f)

    # Only reached when ``start`` has been set, i.e. for a valid input path.
    timer(start, "PSD To EXR Conversion")
开发者ID:AlexKucera,项目名称:tinkertoys,代码行数:43,代码来源:convert_psd_to_exr.py
示例14: consumer
def consumer(index, filename, start, end, u):
    """Parse one slice of a log file and report how many lines were handled.

    Every line produced by ``iterate_log(filename, start, end)`` is passed to
    ``parser.proto``; the job timer is then stopped with the job label and
    the line count.

    Args:
        index: Job number, used in the timer label and in the return value.
        filename: Log file handed to ``iterate_log``.
        start: Start of the slice, handed to ``iterate_log``.
        end: End of the slice, handed to ``iterate_log``.
        u: Not used by this function; kept for interface compatibility.

    Returns:
        Tuple ``(index, number_of_lines_parsed)``.
    """
    parser = apparser.apparser()
    t = timer.timer()

    # Stays 0 when the slice yields no lines.
    i = 0
    for i, line in enumerate(iterate_log(filename, start, end), 1):
        parser.proto(line)

    t.stop("job %d" % index, i)
    return (index, i)
开发者ID:T0aD,项目名称:pyawstats,代码行数:21,代码来源:para_test.py
示例15: exec_readlines_test_normal_output
def exec_readlines_test_normal_output(self):
    """Test the output of execReadlines.

    Two scripts are run: one whose last line ends with a newline and one
    whose last line does not (``echo -n``). In both cases the iterator must
    yield "one", "two", "three" and then raise ``StopIteration``.
    """
    scripts = (
        # Regular-looking output
        """#!/bin/sh
echo "one"
echo "two"
echo "three"
exit 0
""",
        # Output with no end of line
        """#!/bin/sh
echo "one"
echo "two"
echo -n "three"
exit 0
""",
    )

    for script in scripts:
        with tempfile.NamedTemporaryFile(mode="w+t") as testscript:
            testscript.write(script)
            testscript.flush()

            with timer(5):
                rl_iterator = iutil.execReadlines("/bin/sh", [testscript.name])
                for expected in ("one", "two", "three"):
                    self.assertEqual(next(rl_iterator), expected)
                self.assertRaises(StopIteration, rl_iterator.__next__)
开发者ID:nandakishore1006,项目名称:anaconda,代码行数:40,代码来源:iutil_test.py
示例16: main
def main():
    """Start the background jobs and run the GUI main loop.

    Launches the screensaver in a daemon thread, starts one timer that runs
    ``get_forecasts`` with an interval argument of 3600 and one that runs
    ``read_sensors`` with an interval argument of 60, then builds the
    interface and blocks in ``mainloop()``.
    """

    def get_forecasts():
        """Get forecasts from all sources and save them to the DB."""
        forecasts = wfs.wfs().get_all_sources()
        database = db.db()
        database.insert_wfs_data(forecasts)
        database.close_connection()

    def read_sensors():
        """Read data from all sensors and save it to the DB."""
        readings = sensors.sensors().read_all()
        database = db.db()
        database.insert_sensor_data(readings)
        database.close_connection()

    # Screensaver
    screensaver_thread = Thread(target = pir.screensaver)
    screensaver_thread.daemon = True
    screensaver_thread.start()

    # Get data from forecasts each hour
    forecast_timer = timer.timer(3600, get_forecasts)
    forecast_timer.start()

    # Get data from sensors each minute
    sensor_timer = timer.timer(60, read_sensors)
    sensor_timer.start()

    # Interface
    root = gui.Interface()
    root.mainloop()
开发者ID:spalk,项目名称:dimeteo,代码行数:40,代码来源:dimeteo.py
示例17: __init__
def __init__(self,website,query,max_results = 500):
    """Set up a paged Google search for ``query`` restricted to ``website``.

    Builds the search URL, derives the page budget from ``max_results``
    (100 results per page, rounded up), creates the delay timer and the
    browser driver, opens the first URL and waits one second.

    Args:
        website: Site passed to ``google_search_url``.
        query: Search terms passed to ``google_search_url``.
        max_results: Upper bound on the number of results; defaults to 500.
    """
    self.url = self.base_url = google_search_url(website,query)

    # Paging state
    self.max_results = max_results
    self.max_pages = math.ceil(max_results / 100.)
    self.page_number = 1
    self.results_list = []

    self.timer = timer.timer(mean = 3.13,sd = 0.62, floor = 15.12, ceiling = 46.1)

    self.website = website
    self.query = query

    # The driver has to exist before the first URL can be opened.
    self.driver = setup_google_driver()
    self.open_first_url()
    time.sleep(1)
    self.terminated = False
开发者ID:danjump,项目名称:recipe_optimizer,代码行数:14,代码来源:google_search.py
示例18: start_program_stdout_test
def start_program_stdout_test(self):
    """Test redirecting stdout with startProgram.

    Runs ``/bin/echo`` with its stdout redirected into a temporary file and
    checks that the file afterwards contains the marker text.
    """
    marker_text = "yo wassup man"

    # Create a temporary file that will be written by the program. Text mode
    # makes read() return ``str`` so that it compares equal to marker_text on
    # Python 3 as well (the default mode is binary).
    with tempfile.NamedTemporaryFile(mode="w+t") as testfile:
        # Open a new copy of the file so that the child doesn't close and
        # delete the NamedTemporaryFile. The ``with`` block closes this
        # second handle again; it used to be left open.
        with open(testfile.name, 'w') as stdout:
            with timer(5):
                proc = iutil.startProgram(["/bin/echo", marker_text], stdout=stdout)
                proc.communicate()

        # Rewind testfile and look for the text
        testfile.seek(0, os.SEEK_SET)
        self.assertEqual(testfile.read().strip(), marker_text)
开发者ID:cyclefusion,项目名称:anaconda,代码行数:16,代码来源:iutil_test.py
示例19: __init__
def __init__(self, cf):
    """Create the tray icon, show a startup message if any, start the timer.

    Args:
        cf: Configuration object. This method reads ``cf.result['name']`` and
            calls ``cf.findMessageByCondition``; it is also handed to the
            timer.
    """
    super(TaskBarIcon, self).__init__()
    self.__config__ = cf
    self.mute = False

    # Set icon
    icon = res.icon.GetIcon()
    self.SetIcon(icon, self.__config__.result['name'])

    # Choose a startup message. random.choice raises IndexError on an empty
    # sequence, in which case the empty list stands for "no message".
    try:
        msg = random.choice(self.__config__.findMessageByCondition({'startup': True}))
    except IndexError:
        msg = []
    if msg != []:
        text = msg.chooseMessage()
        self.show(text)

    # Set a timer
    self.__timer__ = timer.timer(self.__config__, self)
    self.__timer__.start()
开发者ID:LYZhelloworld,项目名称:Meidochan,代码行数:17,代码来源:taskicon.py
示例20: watch_process_test
def watch_process_test(self):
    """Test watchProcess

    Two cases are checked: killing a watched, still-running process must
    surface as ``iutil.ExitError``, and asking to watch a process that has
    already exited must raise ``iutil.ExitError`` as well.
    """
    endless_loop = ["/bin/sh", "-c", "while true; do sleep 1; done"]

    def test_still_running():
        with timer(5):
            # Run something forever so we can kill it
            proc = iutil.startProgram(endless_loop)
            iutil.watchProcess(proc, "test1")
            proc.kill()

            # Wait for the SIGCHLD
            signal.pause()

    self.assertRaises(iutil.ExitError, test_still_running)

    # Make sure watchProcess checks that the process has not already exited
    with timer(5):
        finished = iutil.startProgram(["true"])
        finished.communicate()
    self.assertRaises(iutil.ExitError, iutil.watchProcess, finished, "test2")
开发者ID:cyclefusion,项目名称:anaconda,代码行数:18,代码来源:iutil_test.py
注:本文中的timer.timer函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论