本文整理汇总了Python中time.clock_gettime函数的典型用法代码示例。如果您正苦于以下问题:Python clock_gettime函数的具体用法?Python clock_gettime怎么用?Python clock_gettime使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了clock_gettime函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_pthread_getcpuclockid
def test_pthread_getcpuclockid(self):
clk_id = time.pthread_getcpuclockid(threading.get_ident())
self.assertTrue(type(clk_id) is int)
self.assertNotEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
t1 = time.clock_gettime(clk_id)
t2 = time.clock_gettime(clk_id)
self.assertLessEqual(t1, t2)
开发者ID:1st1,项目名称:cpython,代码行数:7,代码来源:test_time.py
示例2: run
def run(self):
"""
main loop
· run events
· run .update() functions
· update viewport
· and sleep if there's time left
"""
smallju_frame = False
while self.running:
t = time.clock_gettime(time.CLOCK_MONOTONIC)
for ev in pygame.event.get():
ev.state = self.view_proxy
handler.dispatch(ev, self, ev)
if self.view_proxy == "mandel" and self.smallju_show and \
self.smallju.updating and smallju_frame:
self.smallju.update()
self.update = True
elif self.view().updating:
self.view().update()
self.update = True
smallju_frame ^= self.smallju_show
if self.update:
self.draw()
self.update = False
# naively sleep until we expect the new frame
t = time.clock_gettime(time.CLOCK_MONOTONIC) - t
if t < config["interface"]["max_fps"]:
time.sleep(config["interface"]["max_fps"] - t)
开发者ID:mar77i,项目名称:mandelizer,代码行数:29,代码来源:mandelizer.py
示例3: infinite_loop
def infinite_loop(self):
self.debug.write("Successfully launched")
self.gps.wait()
tick = 1
while True:
if self.kill_now:
self.debug.write("Quitting")
self.log.close()
self.uart.close()
self.debug.close()
break
# monotonic clock will not change as system time changes
begin = time.clock_gettime(time.CLOCK_MONOTONIC)
# system time can be converted to date/time stamp
stamp = time.time()
t1 = threading.Thread(target=self.log_data, args=(stamp,))
t1.start()
if tick >= self.cam_period:
tick = 1
t2 = threading.Thread(target=self.take_picture, args=(begin,))
t2.start()
else:
tick = tick + 1
delta = time.clock_gettime(time.CLOCK_MONOTONIC) - begin
if self.serial_period > delta:
time.sleep(self.serial_period - delta)
开发者ID:bjester,项目名称:project-leda,代码行数:33,代码来源:leda.py
示例4: sendTexture
def sendTexture(scene):
for name, target in Splash._targets.items():
if target._texture is None or target._texWriterPath is None:
return
buffer = bytearray()
pixels = [pix for pix in target._texture.pixels]
pixels = numpy.array(pixels)
pixels = (pixels * 255.0).astype(numpy.ubyte)
buffer += pixels.tostring()
currentTime = time.clock_gettime(time.CLOCK_REALTIME) - target._startTime
if target._texWriter is None or target._texture.size[0] != target._texSize[0] or target._texture.size[1] != target._texSize[1]:
try:
from pyshmdata import Writer
os.remove(target._texWriterPath)
except:
pass
if target._texWriter is not None:
del target._texWriter
target._texSize[0] = target._texture.size[0]
target._texSize[1] = target._texture.size[1]
target._texWriter = Writer(path=target._texWriterPath, datatype="video/x-raw, format=(string)RGBA, width=(int){0}, height=(int){1}, framerate=(fraction)1/1".format(target._texSize[0], target._texSize[1]), framesize=len(buffer))
target._texWriter.push(buffer, floor(currentTime * 1e9))
# We send twice to pass through the Splash input buffer
time.sleep(0.1)
currentTime = time.clock_gettime(time.CLOCK_REALTIME) - target._startTime
target._texWriter.push(buffer, floor(currentTime * 1e9))
开发者ID:ameisso,项目名称:splash,代码行数:31,代码来源:operators.py
示例5: cpu_measurement_wrapper
def cpu_measurement_wrapper(*args, **kwargs):
start = time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID)
try:
return f(*args, **kwargs)
finally:
end = time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID)
tracking[1] += 1
tracking[2] += (end - start)
开发者ID:mutability,项目名称:mlat-server,代码行数:8,代码来源:profile.py
示例6: _generic_impl
def _generic_impl(self):
try:
yield from self.connect()
except AuthenticationError as e:
self._statistics.req_401_rsp += 1
self._logger.error("Authentication failed. Bad username/password.")
raise RestError(401, "bad username/password")
except ValueError as e:
self._statistics.req_401_rsp += 1
self._logger.error("Authentication failed. Missing BasicAuth.")
raise RestError(401, "must supply BasicAuth")
# ATTN: special checks for POST and PUT
if self.request.method == "DELETE" and not is_config(self.request.uri):
self._logger.error("Operational data DELETE not supported.")
raise RestError(405, "can't delete operational data")
# ATTN: make json it's own request handler
if self.request.method == "GET" and is_schema_api(self.request.uri):
return 200, "application/vnd.yang.data+json", get_json_schema(self._schema_root, self.request.uri)
# convert url and body into xml payload
body = self.request.body.decode("utf-8")
body_header = self.request.headers.get("Content-Type")
convert_payload = (body, body_header)
try:
if self.request.method in ("GET"):
converted_url = convert_get_request_to_xml(self._schema_root, self.request.uri)
else:
converted_url = self._confd_url_converter.convert(self.request.method, self.request.uri, convert_payload)
except Exception as e:
self._logger.error("invalid url requested %s, %s", self.request.uri, e)
raise RestError(404, "Resource target or resource node not found")
self._logger.debug("converted url: %s" % converted_url)
if self._configuration.log_timing:
sb_start_time = time.clock_gettime(time.CLOCK_REALTIME)
result, netconf_response = yield from self._netconf.execute(self._netconf_operation, self.request.uri, converted_url)
if self._configuration.log_timing:
sb_end_time = time.clock_gettime(time.CLOCK_REALTIME)
self._logger.debug("southbound transaction time: %s " % sb_end_time - sb_start_time)
if result != Result.OK:
code = map_error_to_http_code(result)
raise RestError(code, netconf_response)
# convert response to match the accept type
code, response = self._convert_response_and_get_status(self._netconf_operation, self._accept_type, netconf_response)
return code, self._accept_type.value, response
开发者ID:RIFTIO,项目名称:RIFT.ware,代码行数:53,代码来源:http_handler.py
示例7: main_loop
def main_loop(bar, inputs = None):
# TODO: remove eventinputs again?
#inputs += bar.widget.eventinputs()
if inputs == None:
inputs = global_inputs
global_update = True
def signal_quit(signal, frame):
quit_main_loop()
signal.signal(signal.SIGINT, signal_quit)
signal.signal(signal.SIGTERM, signal_quit)
# main loop
while not core.shutdown_requested() and bar.is_running():
now = time.clock_gettime(time.CLOCK_MONOTONIC)
if bar.widget.maybe_timeout(now):
global_update = True
data_ready = []
if global_update:
painter = bar.painter()
painter.widget(bar.widget)
data_ready = select.select(inputs,[],[], 0.00)[0]
if not data_ready:
#print("REDRAW: " + str(time.clock_gettime(time.CLOCK_MONOTONIC)))
painter.flush()
global_update = False
else:
pass
#print("more data already ready")
if not data_ready:
# wait for new data
next_timeout = now + 360 # wait for at most one hour until the next bar update
to = bar.widget.next_timeout()
if to != None:
next_timeout = min(next_timeout, to)
now = time.clock_gettime(time.CLOCK_MONOTONIC)
next_timeout -= now
next_timeout = max(next_timeout,0.1)
#print("next timeout = " + str(next_timeout))
data_ready = select.select(inputs,[],[], next_timeout)[0]
if core.shutdown_requested():
break
if not data_ready:
pass #print('timeout!')
else:
for x in data_ready:
x.process()
global_update = True
bar.proc.kill()
for i in inputs:
i.kill()
bar.proc.wait()
开发者ID:t-wissmann,项目名称:barpyrus,代码行数:52,代码来源:mainloop.py
示例8: test_pthread_getcpuclockid
def test_pthread_getcpuclockid(self):
clk_id = time.pthread_getcpuclockid(threading.get_ident())
self.assertTrue(type(clk_id) is int)
# when in 32-bit mode AIX only returns the predefined constant
if not platform.system() == "AIX":
self.assertNotEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
elif (sys.maxsize.bit_length() > 32):
self.assertNotEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
else:
self.assertEqual(clk_id, time.CLOCK_THREAD_CPUTIME_ID)
t1 = time.clock_gettime(clk_id)
t2 = time.clock_gettime(clk_id)
self.assertLessEqual(t1, t2)
开发者ID:Eyepea,项目名称:cpython,代码行数:13,代码来源:test_time.py
示例9: ping
def ping(target, timeout=5.0):
request = IcmpEcho(payload=os.urandom(32))
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_ICMP) as s:
s.connect(target.address)
s.settimeout(timeout)
start = time.clock_gettime(time.CLOCK_MONOTONIC)
s.send(request.to_bytes())
response = s.recv(65536)
end = time.clock_gettime(time.CLOCK_MONOTONIC)
response = IcmpEcho.from_bytes(response)
rtt_ms = (end - start) * 1000
print('Got response in {delay:.3f} ms'.format(delay=rtt_ms))
开发者ID:olavmrk,项目名称:python-ping,代码行数:14,代码来源:ping.py
示例10: RunCompilation
def RunCompilation(num):
global total_tasks
global total_time
global end
while True:
start = time.clock_gettime(time.CLOCK_MONOTONIC)
if os.system("./compile.sh /usr/bin/dist-clang/clang++ {} > /dev/null".format(num)) != 0:
break
total_tasks += 1
total_time += time.clock_gettime(time.CLOCK_MONOTONIC) - start
if len(sys.argv) > 2 and total_tasks >= int(sys.argv[2]):
end = time.clock_gettime(time.CLOCK_MONOTONIC)
break
开发者ID:RainM,项目名称:dist-clang,代码行数:15,代码来源:load.py
示例11: test_FDTimer_absolute
def test_FDTimer_absolute(self):
t = timers.FDTimer(timers.CLOCK_REALTIME)
start = time.time()
t.settime(time.clock_gettime(time.CLOCK_REALTIME)+5.0, 0.0, absolute=True)
print(t.read())
t.close()
self.assertAlmostEqual(time.time()-start, 5.0, places=2)
开发者ID:kdart,项目名称:pycopia3,代码行数:7,代码来源:test.py
示例12: poll_timed
def poll_timed(self, timeout : float, clock_id : int = None) -> bytes:
'''
Wait for a message to be broadcasted on the bus.
The caller should make a copy of the received message,
without freeing the original copy, and parse it in a
separate thread. When the new thread has started be
started, the caller of this function should then
either call `Bus.poll_timed` again or `Bus.poll_stop`.
@param timeout:float The time the function shall fail with `os.errno.EAGAIN`,
if it has not already completed
@param clock_id:int? The clock `timeout` is measured in, it must be a
predictable clock, if `None`, `timeout` is measured in
relative time instead of absolute time
@return :bytes The received message
NB! The received message will not be decoded from UTF-8
'''
from native_bus import bus_poll_timed_wrapped
if clock_id is None:
import time
clock_id = time.CLOCK_MONOTONIC_RAW
timeout += time.clock_gettime(clock_id)
(message, e) = bus_poll_timed_wrapped(self.bus, timeout, clock_id)
if message is None:
raise self.__oserror(e)
return message
开发者ID:maandree,项目名称:python-bus,代码行数:26,代码来源:bus.py
示例13: dump_cpu_profiles
def dump_cpu_profiles(tofile=sys.stderr):
elapsed_cpu = time.clock_gettime(time.CLOCK_THREAD_CPUTIME_ID) - baseline_cpu
elapsed_wall = time.monotonic() - baseline_wall
print('Elapsed: {wall:.1f} CPU: {cpu:.1f} ({percent:.0f}%)'.format(
wall=elapsed_wall,
cpu=elapsed_cpu,
percent=100.0 * elapsed_cpu / elapsed_wall), file=tofile)
print('{rank:4s} {name:60s} {count:6s} {persec:6s} {total:8s} {each:8s} {fraction:6s}'.format(
rank='#',
name='Function',
count='Calls',
persec='(/sec)',
total='Total(s)',
each='Each(us)',
fraction="Frac"), file=tofile)
rank = 1
for name, count, total in sorted(_cpu_tracking, key=operator.itemgetter(2), reverse=True):
if count == 0:
break
print('{rank:4d} {name:60s} {count:6d} {persec:6.1f} {total:8.3f} {each:8.0f} {fraction:6.1f}'.format(
rank=rank,
name=name,
count=count,
persec=1.0 * count / elapsed_wall,
total=total,
each=total * 1e6 / count,
fraction=100.0 * total / elapsed_cpu), file=tofile)
rank += 1
tofile.flush()
开发者ID:mutability,项目名称:mlat-server,代码行数:33,代码来源:profile.py
示例14: HandleEvent
def HandleEvent(self, event):
now = time.clock_gettime(time.CLOCK_MONOTONIC)
if not self._first_event:
self._first_event = event
self._first_event_timestamp = now
self._last_event = event
self._last_event_timestamp = now
self._count_by_type[event['type']] += 1
开发者ID:flamingcowtv,项目名称:adsb-tools,代码行数:8,代码来源:sourcestats.py
示例15: main
def main(argv):
"""
Entry point for ntpclient.py.
Arguments:
argv: command line arguments
"""
res = None
quiet = False
args = set(argv)
if {'-q', '--quiet'}.intersection(args):
quiet = True
if '-s' in argv:
server = argv[argv.index('-s')+1]
elif '--server' in argv:
server = argv[argv.index('--server')+1]
elif 'NTPSERVER' in os.environ:
server = os.environ['NTPSERVER']
else:
server = 'pool.ntp.org'
if {'-h', '--help'}.intersection(args):
print('Usage: ntpclient [-h|--help] [-q|--quiet] [-s|--server timeserver]')
print(f'Using time server {server}.')
sys.exit(0)
t1 = time.clock_gettime(time.CLOCK_REALTIME)
ntptime = get_ntp_time(server)
t4 = time.clock_gettime(time.CLOCK_REALTIME)
# It is not guaranteed that the NTP time is *exactly* in the middle of both
# local times. But it is a reasonable simplification.
roundtrip = round(t4 - t1, 4)
localtime = (t1 + t4) / 2
diff = localtime - ntptime
if os.geteuid() == 0:
time.clock_settime(time.CLOCK_REALTIME, ntptime)
res = 'Time set to NTP time.'
localtime = datetime.fromtimestamp(localtime)
ntptime = datetime.fromtimestamp(ntptime)
if not quiet:
print('Using server {}.'.format(server))
print('NTP call took approximately', roundtrip, 's.')
print('Local time value:', localtime.strftime('%a %b %d %H:%M:%S.%f %Y.'))
print('NTP time value:', ntptime.strftime('%a %b %d %H:%M:%S.%f %Y.'),
'±', roundtrip/2, 's.')
print('Local time - ntp time: {:.6f} s.'.format(diff))
if res:
print(res)
开发者ID:rsmith-nl,项目名称:scripts,代码行数:46,代码来源:ntpclient.py
示例16: test_record_event_sequence_sync_has_reasonable_relative_timestamp
def test_record_event_sequence_sync_has_reasonable_relative_timestamp(self):
monotonic_time_first = time.clock_gettime(time.CLOCK_MONOTONIC)
calls = self.call_start_stop_event_sync()
relative_time_first = calls[0][2][2][0][0]
self.interface_mock.ClearCalls()
calls = self.call_start_stop_event_sync()
relative_time_second = calls[0][2][2][1][0]
monotonic_time_second = time.clock_gettime(time.CLOCK_MONOTONIC)
relative_time_difference = relative_time_second - relative_time_first
self.assertLessEqual(0, relative_time_difference)
monotonic_time_difference = 1e9 * \
(monotonic_time_second - monotonic_time_first)
self.assertLessEqual(relative_time_difference,
monotonic_time_difference)
开发者ID:endlessm,项目名称:eos-metrics,代码行数:17,代码来源:test-daemon-integration.py
示例17: test_clock_settime
def test_clock_settime(self):
t = time.clock_gettime(time.CLOCK_REALTIME)
try:
time.clock_settime(time.CLOCK_REALTIME, t)
except PermissionError:
pass
if hasattr(time, "CLOCK_MONOTONIC"):
self.assertRaises(OSError, time.clock_settime, time.CLOCK_MONOTONIC, 0)
开发者ID:juleskt,项目名称:ek128_work,代码行数:9,代码来源:test_time.py
示例18: createRecordData
def createRecordData(timeSec:float, dataSz:int, node, sAddr, nAddr, frames):
return {
'node' : node,
'sz' : dataSz,
'saddr' : sAddr,
'naddr' : nAddr,
'frames': frames,
'time' : timeSec,
'start' : time.clock_gettime(time.CLOCK_MONOTONIC),
}
开发者ID:finaldie,项目名称:skull,代码行数:10,代码来源:skull-addr-remap.py
示例19: handler
def handler(conn):
clean_list = []
while True:
if conn.poll(10):
data = conn.recv()
sh_thread = Process(target=gs_shakalizing, args=data)
sh_thread.start()
sh_thread.join()
clean_list.append((data[1], time.clock_gettime(0)))
curr_time = time.clock_gettime(0)
new_list = []
for (filename, l_time) in clean_list:
delta = curr_time - l_time
if delta > TIME_DELTA:
gen_file = path.join(app.config['UPLOAD_FOLDER'], filename)
remove(gen_file)
else:
new_list.append((filename, l_time))
clean_list = new_list
开发者ID:FreeCX,项目名称:shakalator,代码行数:19,代码来源:main.py
示例20: log
def log(self,sstr,level='msg'):
now=time.strftime("%y/%m/%d %H:%M:%S")
nows=(str(time.clock_gettime(time.CLOCK_REALTIME)).split('.')[1]+'000')[0:3]
head=tail=''
if level=='error':
head='\033[31m'
tail='\033[0m'
output=now+nows+"\033[32m ["+str(level)+"]\033[0m "+":"+head+str(sstr)+tail
print(output,file=sys.stderr)
self.logf.write(output+"\n")
开发者ID:yyd01245,项目名称:overlay_video,代码行数:10,代码来源:test-vdm.py
注:本文中的time.clock_gettime函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论