本文整理汇总了Python中time.perf_counter函数的典型用法代码示例。如果您正苦于以下问题：Python perf_counter函数的具体用法？Python perf_counter怎么用？Python perf_counter使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了perf_counter函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: benchfindindices
def benchfindindices(arraycode):
    """Time a pure-Python scan against ``arrayfunc.findindices``.

    Returns a tuple ``(python_seconds, arrayfunc_seconds, ratio)`` where the
    first two are the mean time per iteration and the ratio is the first
    divided by the second.
    """
    # Per-test iteration counts, chosen so both timings run a comparable
    # amount of wall time.
    counts = calibrationdata['findindices']
    py_reps = counts[0]
    af_reps = counts[1]

    # All-zero input with a single matching value in the last slot.
    target = comptype(arraycode, 10)
    data = array.array(arraycode, itertools.repeat(0, ARRAYSIZE))
    data[-1] = target
    dataout = array.array('q', itertools.repeat(0, ARRAYSIZE))

    # Pure-Python reference loop.
    began = time.perf_counter()
    for _ in range(py_reps):
        found = 0
        for value in data:
            if value == target:
                # Writes the running match count into the next output slot.
                dataout[found] = found
                found += 1
    pythontime = (time.perf_counter() - began) / py_reps

    # Fresh output buffer for the arrayfunc run.
    dataout = array.array('q', itertools.repeat(0, ARRAYSIZE))

    began = time.perf_counter()
    for _ in range(af_reps):
        x = arrayfunc.findindices(arrayfunc.aops.af_eq, data, dataout, target)
    functime = (time.perf_counter() - began) / af_reps

    return (pythontime, functime, pythontime / functime)
开发者ID:m1griffin,项目名称:arrayfunc,代码行数:35,代码来源:benchfuncs.py
示例2: run
def run(self):
    """Drive the "neighbour_active" output from the input variables.

    Loops while ``self.alive`` is true. Each pass takes the largest input
    value as the activity level. The output is switched on (after a two
    second pause) when that level reaches ``config["active_thres"]`` and the
    node is not locked out. It is switched off, and the node locked out,
    once it has been on for longer than ``config["active_period"]`` or the
    level falls below ``config["deactive_thres"]``. The lock-out is lifted
    when the level is below ``config["deactive_thres"]``.
    """
    switched_on_at = perf_counter()
    locked_out = False

    while self.alive:
        # Survey the neighbours: highest reading and how many are active.
        peak = 0
        active_count = 0
        for source in self.in_var.values():
            if source.val > peak:
                peak = source.val
            if source.val > self.config["active_thres"]:
                active_count += 1

        if self.out_var["neighbour_active"].val:
            expired = perf_counter() - switched_on_at > self.config["active_period"]
            if expired or peak < self.config["deactive_thres"]:
                self.out_var["neighbour_active"].val = False
                locked_out = True
        elif not locked_out and peak >= self.config["active_thres"]:
            sleep(2.0)
            self.out_var["neighbour_active"].val = True
            switched_on_at = perf_counter()

        # Release the lock-out once activity has dropped far enough.
        if locked_out and peak < self.config["deactive_thres"]:
            locked_out = False

        sleep(max(0, self.messenger.estimated_msg_period * 10))
开发者ID:rbgorbet,项目名称:Hylozoic-Series-3,代码行数:35,代码来源:neighbourhood_node.py
示例3: _generate_statements
def _generate_statements(stmt, args, iterations, duration):
if duration is None:
yield from itertools.repeat((stmt, args), iterations or 100)
else:
now = perf_counter()
while perf_counter() - now < duration:
yield (stmt, args)
开发者ID:mikethebeer,项目名称:cr8,代码行数:7,代码来源:engine.py
示例4: format_msg
def format_msg(self, msg, dpth=3, ops='+fun:ln +wait==') :
    """Decorate a trace message with timing and caller information.

    Markers recognised inside ``msg``:
      ``(==``  start timing a new interval (its start time is pushed onto
               ``self.stms``);
      ``==>``  report the time elapsed in the most recent open interval;
      ``==)``  report that time and close the interval;
      ``=}}``  discard all open intervals.

    Args:
        msg:  Text to decorate.
        dpth: Index into ``inspect.stack()`` of the frame reported as the
              caller.
        ops:  Option string. ``+fun:ln`` prefixes the caller's name and line
              number; ``+wait==`` enables the ``==>`` / ``==)`` / ``=}}``
              handling.

    Returns:
        The decorated message with every '¬' replaced by ``c9`` and every
        '¶' replaced by ``c10``.
    """
    if '(==' in msg :
        # Start timing a new interval.
        self.stms = self.stms + [perf_counter()]
        msg = msg.replace( '(==', '(==[' + Tr.format_tm(0) + ']' )

    if '+fun:ln' in ops :
        # Stack layout: 0-format_msg, 1-Tr.log|Tr.TrLiver, 2-log, 3-needed func
        frCaller = inspect.stack()[dpth]
        try:
            cls = frCaller[0].f_locals['self'].__class__.__name__ + '.'
        except Exception:
            # The caller has no usable 'self' local, so no class prefix.
            # Narrowed from a bare except so that KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            cls = ''
        fun = (cls + frCaller[3]).replace('.__init__','()')
        ln = frCaller[2]
        msg = '[{}]{}{}:{} '.format( Tr.format_tm( perf_counter() - self.tm ), self.gap, fun, ln ) + msg
    else :
        msg = '[{}]{}'.format( Tr.format_tm( perf_counter() - self.tm ), self.gap ) + msg

    if '+wait==' in ops :
        if ( '==)' in msg or '==>' in msg ) and len(self.stms)>0 :
            # Finish or continue timing the latest interval and show its
            # duration right after the marker.
            sign = '==)' if '==)' in msg else '==>'
            stm = '[{}]'.format( Tr.format_tm( perf_counter() - self.stms[-1] ) )
            msg = msg.replace( sign, sign+stm )
            if '==)' in msg :
                del self.stms[-1]

        if '=}}' in msg :
            # Cancel all running measurements.
            self.stms = []

    return msg.replace('¬',c9).replace('¶',c10)
开发者ID:heX16,项目名称:CudaText,代码行数:33,代码来源:cd_plug_lib.py
示例5: update
def update(self):
    """Advance the simulation until it has caught up with real time.

    Does nothing while ``self.paused`` is set. Otherwise steps the swarm and
    the predator by ``self.dt`` for as long as this call is within
    ``self.update_time_out`` seconds and simulated time trails real time.
    When ``self.run_max_speed`` is set, exactly one step is taken per call.

    Returns:
        ``"terminate"`` once a termination condition holds (simulation
        time-out, no agents left, or the predator killed); otherwise ``None``.
    """
    if self.paused:
        return None

    call_started = time.perf_counter()

    def behind_schedule():
        # Within this call's time budget AND simulated time trails real time.
        return ((time.perf_counter() - call_started) < self.update_time_out and
                self.sim_time < (time.perf_counter() - self.real_time_start))

    keep_stepping = behind_schedule() or self.run_max_speed
    while keep_stepping:
        self.sim_time += self.dt
        self.swarm.update()
        self.predator.update()

        finished = (self.sim_time > self.time_out or      # simulation timed out
                    len(self.swarm.agents) == 0 or        # every agent is dead
                    self.predator.is_kill)                # predator is dead
        if finished:
            health_sum = sum(agent.health for agent in self.swarm.agents)
            print("time = ", self.sim_time)
            print("survivor_count = ", len(self.swarm.agents))
            print("total_agent_health = ", health_sum)
            print("predator_health = ", self.predator.health)
            return "terminate"

        keep_stepping = behind_schedule() and not self.run_max_speed
开发者ID:tdm5923,项目名称:AIProject_2014,代码行数:29,代码来源:environment.py
示例6: main
def main():
    """Time a selection sort over the first ``n`` lines of a text file.

    Command line: ``python3 project_3.py <text_file> <n>``.

    Prints the requested and actually loaded line counts, the first ten
    words before and after sorting, and the elapsed sort time in seconds.
    Exits with status 1 when the argument count is wrong.
    """
    from itertools import islice

    if len(sys.argv) != 3:
        print('Error, must supply 2 arguments.\n\n' +
              'Usage: python3 project_3.py <text_file> <n>')
        sys.exit(1)
    filename = sys.argv[1]
    n = int(sys.argv[2])

    # Read up to n words from the file. The context manager closes the file
    # even if reading fails, and islice stops at end-of-file instead of
    # padding the list with empty strings when the file has fewer than n lines.
    with open(filename, 'r') as input_file:
        word_list = [line.rstrip() for line in islice(input_file, max(n, 0))]

    # Print input information and the first 10 words of the input list.
    print('requested n = {}'.format(n))
    # Report what was really read, which can be fewer than requested.
    print("loaded {} lines from '{}'".format(len(word_list), filename))
    print_ten_words(word_list)

    print('\nselection sort...')
    # Run and time the selection sort algorithm.
    start = time.perf_counter()
    sorted_list = selection_sort(word_list)
    end = time.perf_counter()

    # Print run time information and the first 10 words of the sorted list.
    print_ten_words(sorted_list)
    print('\nelapsed time: {} seconds'.format(end - start))
开发者ID:honeytoast,项目名称:Algorithms-Sorting,代码行数:30,代码来源:selection_sort.py
示例7: main
def main():
    """Time a selection or merge sort over the first ``n`` lines of a file.

    Command line: ``python3 <script> <text file> <selection||merge> <n>``.

    Prints the input summary, the first ten items before and after sorting,
    and the elapsed sort time in seconds. Exits with status 1 when the
    argument count is wrong or the sort type is not recognised.
    """
    if len(sys.argv) != 4:
        print('error: you must supply exactly four arguments\n\n' +
              'usage: python3 <Python source code file> <text file> <selection||merge> <n>')
        sys.exit(1)
    filename = sys.argv[1]
    sort_type = sys.argv[2]
    n = int(sys.argv[3])

    # Resolve the algorithm up front. Previously an unrecognised sort type
    # fell through both branches and crashed with a NameError on
    # sorted_list/start/end after the input had already been loaded.
    if sort_type == "selection":
        banner, sorter = 'Selection Sort [O(n2)]...', selection_sort
    elif sort_type == "merge":
        banner, sorter = 'Merge Sort [O(n log n)]...', merge_sort
    else:
        print('error: unknown sort type "' + sort_type +
              '"; expected "selection" or "merge"')
        sys.exit(1)

    input_list = read_file(filename, n)
    print('Requested n = ' + str(n))
    print('Loaded ' + str(n) + ' lines from "' + filename + '"')
    first_ten(input_list)

    print(banner)
    start = time.perf_counter()
    sorted_list = sorter(input_list)
    end = time.perf_counter()

    first_ten(sorted_list)
    print('Elapsed time: ' + str(end-start) + " seconds")
开发者ID:jaredkotoff,项目名称:Algorithms,代码行数:29,代码来源:project3.py
示例8: _get_related_entity_ids
def _get_related_entity_ids(session, entity_filter):
    """Return the distinct recorded entity ids accepted by ``entity_filter``.

    The query is attempted up to ``RETRIES`` times. Each SQLAlchemy error is
    logged; the last one is re-raised, earlier ones are followed by a wait of
    ``QUERY_RETRY_WAIT`` before the next attempt.
    """
    from homeassistant.components.recorder.models import States
    from homeassistant.components.recorder.util import \
        RETRIES, QUERY_RETRY_WAIT
    from sqlalchemy.exc import SQLAlchemyError
    import time

    began = time.perf_counter()
    distinct_ids = session.query(States).with_entities(States.entity_id).distinct()

    for attempt in range(RETRIES):
        try:
            matches = []
            for row in distinct_ids:
                if entity_filter(row.entity_id):
                    matches.append(row.entity_id)

            if _LOGGER.isEnabledFor(logging.DEBUG):
                # Only measure when the figure will actually be logged.
                took = time.perf_counter() - began
                _LOGGER.debug(
                    'fetching %d distinct domain/entity_id pairs took %fs',
                    len(matches),
                    took)

            return matches
        except SQLAlchemyError as err:
            _LOGGER.error("Error executing query: %s", err)

            out_of_attempts = attempt == RETRIES - 1
            if out_of_attempts:
                raise
            time.sleep(QUERY_RETRY_WAIT)
开发者ID:chilicheech,项目名称:home-assistant,代码行数:31,代码来源:__init__.py
示例9: update_properties
def update_properties(self):
    """Handle data changes for node values.

    Copies the primary value's data into ``self._state``. When
    ``self.refresh_on_update`` is set and more than 30 seconds have passed
    since ``self.last_update``, the timestamp is renewed and the node is
    asked for its state.
    """
    self._state = self.values.primary.data

    if not self.refresh_on_update:
        return

    refresh_due = time.perf_counter() - self.last_update > 30
    if not refresh_due:
        return

    self.last_update = time.perf_counter()
    self.node.request_state()
开发者ID:BaptisteSim,项目名称:home-assistant,代码行数:7,代码来源:zwave.py
示例10: run
def run(in_globals=False):
    """Run the editor's source, time it, then request a CPython run.

    Args:
        in_globals: When true the source is executed without an explicit
            namespace; otherwise it runs in a fresh, empty dict.

    Returns:
        1 if the source ran without raising an ``Exception``, 0 otherwise.
    """
    global output
    # Clear the console element before the run.
    doc["console"].value = ''
    src = editor.getValue()
    if storage is not None:
        # Keep the source under the "py_src" storage key.
        storage["py_src"] = src
    t0 = time.perf_counter()
    try:
        if(in_globals):
            # No namespace given: exec runs the code in the current scope,
            # i.e. this function's frame and its module globals.
            exec(src)
        else:
            ns = {}
            exec(src, ns)
        state = 1
    except Exception as exc:
        # Report the failure on stderr and flag it in the return value.
        traceback.print_exc(file=sys.stderr)
        state = 0
    # Capture whatever the console element holds after the run.
    output = doc["console"].value
    # Elapsed time is measured in seconds and printed in milliseconds.
    print('Brython: %6.2f ms' % ((time.perf_counter() - t0) * 1000.0))
    # run with CPython
    req = ajax.ajax()
    req.bind('complete',on_complete)
    req.set_timeout(4,err_msg)
    req.open('POST','/cgi-bin/speed.py',True)
    req.set_header('content-type','application/x-www-form-urlencoded')
    req.send({'src':src})
    return state
开发者ID:Fireheart182,项目名称:brython,代码行数:31,代码来源:editor.py
示例11: _delay_accordingly
def _delay_accordingly(self, current_clock):
scheduling_interval = 0.05 # Seconds till reschedule of process
elapsed_time = time.perf_counter() - self._last_process_time
if elapsed_time < scheduling_interval:
# FIXME: Not very content with this. Limits our resolution a lot.
# Should be fine for interactive use but will suck for
# interfacing with say an outside circuit. Not that I expect
# us to do a PWM but 20Hz _if_ things go right isn't that
# great. It's totally fixable though. Should switch to a
# min{next_event, control_proc_schedule} with busy loop for
# small delays. In any case I'm not sure how much much
# timing precision we can squeeze out this anyways.
time.sleep(scheduling_interval - elapsed_time)
# Set target clock independently of history. This prevents the
# simulation from trying to "catch" up. Imho the right way for
# interactive control. If this behavior is wanted we should switch
# from a delta to an absolute simulation time calculation.
target_clock = current_clock + \
self._simulation_rate * scheduling_interval
self._last_process_time = time.perf_counter()
return target_clock, self._last_process_time + scheduling_interval
开发者ID:hacst,项目名称:LogikSimPython,代码行数:25,代码来源:controller.py
示例12: run_performance_test
def run_performance_test(self, src):
    """Time ``src`` under Brython, record it, and request a CPython timing.

    Args:
        src: Python source text to execute.

    Returns:
        1 if ``src`` ran without raising an ``Exception``, 0 otherwise.
    """
    # NOTE: local names are left untouched because exec() without a
    # namespace runs the source in this function's scope.
    #execute code
    t0 = time.perf_counter()
    try:
        exec(src)
        state = 1
    except Exception:
        traceback.print_exc(file=sys.stderr)
        # Previously `state` stayed unassigned on failure, so the final
        # `return state` raised UnboundLocalError instead of reporting it.
        state = 0
    # Elapsed time is recorded in whole milliseconds.
    self.add_brython_result(int((time.perf_counter() - t0) * 1000.0))
    # Normalise line endings before storing the source with the timings.
    src = src.replace('\r\n','\n')
    self._timings[self._filename]['code'] = src

    def err_msg(*args):
        # Timeout handler: log whatever the request reports to the console.
        from javascript import console
        console.log(args)

    #also run this for cpython via cgi-bin
    # run with CPython
    req = ajax.ajax()
    req.bind('complete',self.on_cpython_complete)
    req.set_timeout(4,err_msg)
    req.open('POST','/cgi-bin/script_timer.py',False)
    req.set_header('content-type','application/x-www-form-urlencoded')
    req.send({'filename': self._filename})
    return state
开发者ID:BrythonServer,项目名称:brython,代码行数:28,代码来源:perf_bookkeeping.py
示例13: timeblock
def timeblock(label):
    """Yield once, then print ``label`` and the seconds spent at the yield.

    The report is printed from a ``finally`` clause, so it is also produced
    when the code running at the yield point raises.
    """
    began = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - began
        print('{}:{}'.format(label, elapsed))
开发者ID:jishandong,项目名称:python-learn,代码行数:7,代码来源:time_block.py
示例14: benchasumov
def benchasumov(arraycode):
    """Time the built-in ``sum`` against ``arrayfunc.asum`` called with ``ov=True``.

    Returns a tuple ``(python_seconds, arrayfunc_seconds, ratio)`` where the
    first two are the mean time per iteration and the ratio is the first
    divided by the second.
    """
    # Per-test iteration counts, chosen so both timings run a comparable
    # amount of wall time.
    counts = calibrationdata['asum']
    py_reps = counts[0]
    af_reps = counts[1]

    # All-zero input with a single non-zero value in the last slot.
    last_value = comptype(arraycode, 10)
    data = array.array(arraycode, itertools.repeat(0, ARRAYSIZE))
    data[-1] = last_value

    # Pure-Python reference.
    began = time.perf_counter()
    for _ in range(py_reps):
        x = sum(data)
    pythontime = (time.perf_counter() - began) / py_reps

    # arrayfunc implementation.
    began = time.perf_counter()
    for _ in range(af_reps):
        x = arrayfunc.asum(data, ov=True)
    functime = (time.perf_counter() - began) / af_reps

    return (pythontime, functime, pythontime / functime)
开发者ID:m1griffin,项目名称:arrayfunc,代码行数:28,代码来源:benchfuncs.py
示例15: run
def run(self):
    """Run the main loop: fixed-step updates, one render and input poll per pass.

    Real time elapsed since the previous pass is added to an accumulator,
    capped at ``update_interval_ms * update_per_frame_limit`` so a long stall
    cannot trigger an unbounded burst of updates. ``self.update`` is then
    called once per interval until the schedule has caught up with the
    accumulator. The loop ends when the close key is read or when
    ``self.process_input`` returns a falsy value.
    """
    # NOTE(review): the names say milliseconds but perf_counter() returns
    # seconds, so confirm the unit of update_interval_ms with the caller.
    UPDATE_INTERVAL_MS = self.update_interval_ms
    UPDATE_PER_FRAME_LIMIT = self.update_per_frame_limit
    clock = 0
    previous_time = perf_counter()
    next_update = 0
    proceed = True

    while proceed:
        current_time = perf_counter()
        if current_time - previous_time > (UPDATE_INTERVAL_MS * UPDATE_PER_FRAME_LIMIT):
            # Too far behind: advance by the capped amount only.
            clock += UPDATE_INTERVAL_MS * UPDATE_PER_FRAME_LIMIT
        else:
            clock += current_time - previous_time

        # Run every update that is due.
        while clock >= next_update:
            time_elapsed = UPDATE_INTERVAL_MS
            time_current = next_update
            self.update(time_elapsed, time_current)
            next_update += UPDATE_INTERVAL_MS

        previous_time = perf_counter()
        self.render()

        if terminal.has_input():
            key = terminal.read()
            # Compare key codes by value. The former `is` test relied on
            # object identity, which is not guaranteed for equal integers.
            if key == terminal.TK_CLOSE:
                proceed = False
            else:
                proceed = self.process_input(key)
开发者ID:NoahTheDuke,项目名称:roguelike,代码行数:27,代码来源:rl_2.py
示例16: preprocess_signals_and_peaks
def preprocess_signals_and_peaks():
    """Configure the preprocessor, then time noise estimation and signal processing.

    Prints the DataIO object, the seconds spent in ``estimate_signals_noise``
    and ``run_signalprocessor``, and finally the catalogue constructor.
    """
    dataio = DataIO(dirname=dirname)
    catalogueconstructor = CatalogueConstructor(dataio=dataio)
    print(dataio)

    preprocessor_params = dict(
        chunksize=1024,
        memory_mode='memmap',

        # signal preprocessor ('numpy' is the alternative engine)
        signalpreprocessor_engine='opencl',
        highpass_freq=300,
        common_ref_removal=True,
        backward_chunksize=1280,

        # peak detector ('opencl' is the alternative engine)
        peakdetector_engine='numpy',
        peak_sign='-',
        relative_threshold=7,
        peak_span=0.0005,
    )
    catalogueconstructor.set_preprocessor_params(**preprocessor_params)

    began = time.perf_counter()
    catalogueconstructor.estimate_signals_noise(seg_num=0, duration=10.)
    finished = time.perf_counter()
    print('estimate_signals_noise', finished-began)

    began = time.perf_counter()
    catalogueconstructor.run_signalprocessor(duration=60.)
    finished = time.perf_counter()
    print('run_signalprocessor', finished-began)

    print(catalogueconstructor)
开发者ID:samuelgarcia,项目名称:tridesclous,代码行数:35,代码来源:example_kampff_dataset.py
示例17: search
def _match_files(terms, index, use_or):
    """Return the set of files matching ``terms``; unindexed terms are ignored."""
    postings = [index[term] for term in terms if term in index]
    if not postings:
        return set()
    if use_or:
        return set().union(*postings)
    # AND: once the intersection is empty it must stay empty. The previous
    # code re-seeded the result from the next term in that case.
    return set(postings[0]).intersection(*postings[1:])


def search(shelve_to_process, shelve_key):
    """Interactively search an index stored in a shelve file.

    Prompts for queries until '.' is entered. Query words are separated by
    single spaces; the words "and" and "or" are operators, not search terms.
    A query containing "or" and no "and" is an OR search, every other query
    is an AND search. Matching files and the execution time in milliseconds
    are printed for each query.

    Args:
        shelve_to_process: Path of the shelve file to open.
        shelve_key: Key in the shelf holding the mapping from term to files.

    Raises:
        KeyError: If ``shelve_key`` is not present in the shelf.
    """
    # The context manager closes the shelf, which was previously left open.
    with shelve.open(shelve_to_process) as s:
        indexedFiles = s[shelve_key]

    while True:
        query = input("query (type . to exit):")
        if query == '.':
            break

        # Start timing here, per query. The timer used to start once before
        # the first prompt, so it included typing time and kept growing
        # across queries.
        start_time = time.perf_counter() * 1000

        words = query.split(" ")
        hasOr = "and" not in words and "or" in words
        # Remove the delimiter terms now.
        terms = [w for w in words if w not in ("and", "or")]

        # Unindexed terms are skipped in both modes. The OR branch used to
        # raise KeyError on them.
        goodFiles = _match_files(terms, indexedFiles, hasOr)

        # Finish timing of search functionality.
        end_time = time.perf_counter() * 1000

        # Print the output.
        word = "OR" if hasOr else "AND"
        print("Performing " + word +
              " search for: {\'" + '\', \''.join(terms) + "\'}")
        if len(goodFiles) == 0:
            print("Sorry, no files found with that query!")
        else:
            for gf in goodFiles:
                print("Found at", gf)
        print("Execution time:", int(end_time - start_time))
开发者ID:bmarshallpf,项目名称:Python,代码行数:60,代码来源:searcher.py
示例18: wrapped_function
def wrapped_function(*args, **kwargs):
    """Call the wrapped function, log how long it took, and return its result.

    A log record is emitted when no ``slow`` threshold is set, or when the
    call took longer than ``slow`` seconds. Functions whose name starts with
    "task_" are logged with a "TASK" prefix, all others with "FUNC".
    """
    began = time.perf_counter()
    result = function(*args, **kwargs)
    duration_ms = int((time.perf_counter() - began) * 1000)

    extra_fields = {
        "task_name": function.__module__ + "." + function.__name__,
        "duration": duration_ms,
        # 'task_args' rather than 'args': 'args' would clash with an
        # attribute of LogRecord.
        "task_args": args,
        "task_kwargs": kwargs,
    }

    # With a threshold set, stay quiet unless the call exceeded it.
    # Both sides are compared in milliseconds.
    if slow and not duration_ms > int(slow * 1000):
        return result

    template = (
        "TASK: function=%s duration=%sms"
        if function.__name__.startswith("task_")
        else "FUNC: function=%s duration=%sms"
    )
    logger.info(
        template,
        function.__name__,
        duration_ms,
        extra=extra_fields,
    )
    return result
开发者ID:DBeath,项目名称:flask-feedrsub,代码行数:30,代码来源:decorators.py
示例19: get_next
def get_next(self):
    """Return the next recorded item once enough real time has passed.

    Items come from ``self.store_iterator`` as ``(tm, val)`` pairs. The first
    item is returned immediately. A later item is returned only when the
    scaled real time since the last returned item is at least the gap
    between their ``tm`` values; until then it is kept in
    ``self.item_for_next_get`` and ``None`` is returned.

    Returns:
        ``self.get_parsed(val)`` for a due item, otherwise ``None``. ``None``
        is also returned, and ``self.is_playing`` cleared, when fetching from
        the iterator raises.
    """
    held = self.item_for_next_get

    if held is not None:
        # An item fetched on an earlier call is still waiting for its turn.
        tm, val = held
        # print(2, tm)
        now = time.perf_counter() * self.TIME_FACTOR
        due = (now - self.last_returned_at[0]) >= (tm - self.last_returned_at[1])
        if not due:
            return None
        # Sufficient time has passed.
        self.last_returned_at = (now, tm)
        self.item_for_next_get = None
        return self.get_parsed(val)

    try:
        tm, val = next(self.store_iterator)
    except Exception as ex:
        self.is_playing = False
        return None

    tm = int(tm)
    # print(1, tm)
    now = time.perf_counter() * self.TIME_FACTOR

    if self.last_returned_at is not None:
        due = (now - self.last_returned_at[0]) >= (tm - self.last_returned_at[1])
        if not due:
            # Keep the item to return at a later get.
            self.item_for_next_get = (tm, val)
            return None

    # Either the very first item, or sufficient time has passed.
    self.last_returned_at = (now, tm)
    return self.get_parsed(val)
开发者ID:michaeldboyd,项目名称:indy-plenum,代码行数:31,代码来源:recorder.py
示例20: main
def main():
    """Bootload the image named on the command line and report the run time.

    Returns:
        1 if bootloading raised a bootloader error, 0 otherwise.
    """
    args = parser.parse_args()

    if args.logging_config:
        import logging
        import logging.config
        logging.config.fileConfig(args.logging_config)

    # Choose the timer once. Only the selected attribute is looked up, so
    # time.clock is never touched when six.PY3 is true.
    timer = time.perf_counter if six.PY3 else time.clock

    began = timer()
    data = cyacd.BootloaderData.read(args.image)
    session = make_session(args, data.checksum_type)
    bl = BootloaderHost(session, sys.stdout)

    try:
        bl.bootload(
            data,
            seek_permission(
                args.downgrade,
                "Device version %d is greater than local version %d. Flash anyway? (Y/N)"),
            seek_permission(
                args.newapp,
                "Device app ID %d is different from local app ID %d. Flash anyway? (Y/N)"))
    except (protocol.BootloaderError, BootloaderError) as e:
        print ("Unhandled error: {}".format(e))
        return 1

    finished = timer()
    print ("Total running time {0:02.2f}s".format(finished-began))
    return 0
开发者ID:cowo78,项目名称:cyflash,代码行数:33,代码来源:bootload.py
注：本文中的time.perf_counter函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论