本文整理汇总了Python中time.monotonic函数的典型用法代码示例。如果您正苦于以下问题：Python monotonic函数的具体用法？Python monotonic怎么用？Python monotonic使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了monotonic函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: incoming_telemetry
def incoming_telemetry(self, label, telemetry: TelemetryInfo):
    """Record a telemetry sample for ``label`` and notify listeners.

    The sample is stored in ``self.telemetries`` and its arrival time
    (``time.monotonic()``) in ``self.timestamps``, then a per-label
    ``'telemetry'`` event is emitted.  When more than
    ``self.state['overall_telemetry_interval']`` seconds have elapsed since
    the last overall emission, an additional ``'telemetry'`` event with label
    ``None`` and the result of ``self.get_telemetry(None)`` is emitted and the
    time of that emission is remembered.

    :param label: key under which the sample is stored and emitted.
    :param telemetry: the sample itself.
    """
    self.telemetries[label] = telemetry
    self.timestamps[label] = time.monotonic()
    self.emit('telemetry', label, telemetry)

    since_overall = time.monotonic() - self._last_overall_telemetry_emit
    overall_due = since_overall > self.state['overall_telemetry_interval']
    if overall_due:
        self.emit('telemetry', None, self.get_telemetry(None))
        self._last_overall_telemetry_emit = time.monotonic()
开发者ID:awacha,项目名称:cct,代码行数:7,代码来源:telemetry.py
示例2: test_eintr
def test_eintr(wfs, spair):
    """Check that the wait call still lasts about a second under SIGALRM.

    A SIGALRM handler is installed and an interval timer fires it every
    0.1 s while ``wfs(a, read=True, timeout=1)`` runs.  The test asserts the
    call took between 0.9 and 3 seconds and that the handler ran at least
    once.  The previous SIGALRM handler is always restored.

    :param wfs: the wait function under test (presumably a pytest fixture --
        not defined in this block).
    :param spair: a two-item sequence; only the first item is waited on.
    """
    sock, _peer = spair
    alarms_seen = [0]

    def on_alarm(signum, frame):
        assert signum == signal.SIGALRM
        alarms_seen[0] += 1

    previous_handler = signal.signal(signal.SIGALRM, on_alarm)
    try:
        # Nothing is readable yet, so a zero-timeout wait must report "not ready".
        assert not wfs(sock, read=True, timeout=0)

        began = monotonic()
        try:
            # Deliver SIGALRM ten times per second for the duration of the wait.
            signal.setitimer(signal.ITIMER_REAL, 0.1, 0.1)
            # Expected to block for roughly one second despite the signals.
            wfs(sock, read=True, timeout=1)
        finally:
            # Disarm the interval timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
        elapsed = monotonic() - began
        assert 0.9 < elapsed < 3
    finally:
        signal.signal(signal.SIGALRM, previous_handler)

    assert alarms_seen[0] > 0
开发者ID:christophermca,项目名称:dotfiles,代码行数:27,代码来源:test_wait.py
示例3: go
def go():
    """Time one message written through a DEALER connected to a ROUTER.

    A ROUTER connection is bound to ``'tcp://127.0.0.1:*'`` and a DEALER
    connection is connected to the router's first binding.  One two-part
    message is written through the dealer and the coroutine then waits for
    the dealer's "closed" future; what completes that future is decided by
    ``ZmqDealerProtocol``, which is defined outside this block (as are
    ``loop`` and ``count``).

    :return: ``time.monotonic()`` seconds between just before the write and
        the completion of the dealer's future.
    """
    router_done = asyncio.Future()
    dealer_done = asyncio.Future()

    router, _ = yield from loop.create_zmq_connection(
        lambda: ZmqRouterProtocol(router_done),
        zmq.ROUTER,
        bind='tcp://127.0.0.1:*')
    endpoint = next(iter(router.bindings()))
    dealer, _ = yield from loop.create_zmq_connection(
        lambda: ZmqDealerProtocol(count, dealer_done),
        zmq.DEALER,
        connect=endpoint)

    payload = (b'func', b'\0' * 200)

    # Collect garbage before (and after) the timed region -- presumably to
    # keep collector pauses out of the measurement.
    gc.collect()
    began = time.monotonic()
    dealer.write(payload)
    yield from dealer_done
    elapsed = time.monotonic() - began
    gc.collect()

    router.close()
    yield from router_done
    return elapsed
开发者ID:waytai,项目名称:aiozmq,代码行数:25,代码来源:simple.py
示例4: test_call_later_1
def test_call_later_1(self):
    """Exercise ``loop.call_later``: scheduling, cancellation and timing.

    Three callbacks are scheduled 0.05 s out (one of them cancelled right
    away) and one 1000 s out.  The loop is expected to stop after roughly
    0.05 s with only the two live short callbacks having run, in order.
    """
    seen = []

    # NOTE: the callback must keep the name ``cb`` -- the handle repr checked
    # below is asserted to contain '.cb'.
    def cb(inc=10, stop=False):
        seen.append(inc)
        self.assertTrue(self.loop.is_running())
        if stop:
            self.loop.call_soon(self.loop.stop)

    self.loop.call_later(0.05, cb)

    # Scheduled and then cancelled immediately; must never run.
    cancelled = self.loop.call_later(0.05, cb, 100, True)
    self.assertIn('.cb', repr(cancelled))
    cancelled.cancel()
    self.assertIn('cancelled', repr(cancelled))

    self.loop.call_later(0.05, cb, 1, True)
    # Far enough in the future that it should not be called.
    self.loop.call_later(1000, cb, 1000)

    began = time.monotonic()
    self.loop.run_forever()
    elapsed = time.monotonic() - began

    self.assertLess(elapsed, 0.1)
    self.assertGreater(elapsed, 0.04)

    self.assertEqual(seen, [10, 1])
    self.assertFalse(self.loop.is_running())
开发者ID:complyue,项目名称:uvloop,代码行数:30,代码来源:test_base.py
示例5: debug_sql
def debug_sql(self, sql=None, params=None, use_last_executed_query=False, many=False):
    """Time the code run at the ``yield`` and record the query afterwards.

    This is a generator with a single ``yield``.
    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` outside
    this view -- confirm.

    Once the wrapped code finishes (normally or by raising), the elapsed
    ``time.monotonic()`` time is appended together with the SQL to
    ``self.db.queries_log`` and logged at DEBUG level.

    :param sql: the SQL text to record.
    :param params: the query parameters; may be an iterator without ``len``.
    :param use_last_executed_query: when true, replace ``sql`` with
        ``self.db.ops.last_executed_query(self.cursor, sql, params)``.
    :param many: when true, prefix the logged SQL with the number of
        parameter sets (``'?'`` if ``params`` has no length).
    """
    began = time.monotonic()
    try:
        yield
    finally:
        duration = time.monotonic() - began
        if use_last_executed_query:
            sql = self.db.ops.last_executed_query(self.cursor, sql, params)

        if many:
            try:
                times = len(params)
            except TypeError:
                # params could be an iterator.
                times = '?'
            logged_sql = '%s times: %s' % (times, sql)
        else:
            logged_sql = sql

        self.db.queries_log.append({
            'sql': logged_sql,
            'time': '%.3f' % duration,
        })
        logger.debug(
            '(%.3f) %s; args=%s',
            duration,
            sql,
            params,
            extra={'duration': duration, 'sql': sql, 'params': params},
        )
开发者ID:MattBlack85,项目名称:django,代码行数:25,代码来源:utils.py
示例6: check_parallel_module_init
def check_parallel_module_init(self, mock_os):
    """Run ``task`` in many threads at once and assert none reported errors.

    For each thread count in ``(20, 50) * 3`` the ``random`` and
    ``modulefinder`` modules are dropped from ``sys.modules``, the threads
    are started via ``start_threads`` and the ``done`` event is awaited for
    up to ten minutes.  ``task``, ``start_threads``, ``verbose`` and ``imp``
    are defined outside this block.

    :param mock_os: not used by this method.
    :raises unittest.SkipTest: if the import lock is currently held.
    """
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    all_done = threading.Event()
    for thread_count in (20, 50) * 3:
        if verbose:
            print("Trying", thread_count, "threads ...", end=' ')

        # Make sure that random and modulefinder get reimported freshly.
        for stale in ('random', 'modulefinder'):
            sys.modules.pop(stale, None)

        failures = []
        finished = []
        all_done.clear()

        began = time.monotonic()
        workers = (
            threading.Thread(target=task,
                             args=(thread_count, all_done, finished, failures,))
            for _ in range(thread_count)
        )
        with start_threads(workers):
            pass
        completed = all_done.wait(10 * 60)
        elapsed = time.monotonic() - began

        if verbose:
            print("%.1f ms" % (elapsed*1e3), flush=True, end=" ")
        dbg_info = 'done: %s/%s' % (len(finished), thread_count)
        self.assertFalse(failures, dbg_info)
        self.assertTrue(completed, dbg_info)
        if verbose:
            print("OK.")
开发者ID:M31MOTH,项目名称:cpython,代码行数:32,代码来源:test_threaded_import.py
示例7: _get_spad_info
def _get_spad_info(self):
    """Read the reference SPAD count and type from the sensor.

    Based on code from:
    https://github.com/pololu/vl53l0x-arduino/blob/master/VL53L0X.cpp

    The register sequence below is order-sensitive; it is reproduced exactly
    and only the bookkeeping around it is restructured.

    :return: 2-tuple ``(count, is_aperture)`` where ``count`` is the low
        7 bits of register 0x92 and ``is_aperture`` is its top bit as a bool.
    :raises RuntimeError: if ``self.io_timeout_s`` is positive and register
        0x83 stays at 0x00 for at least that many seconds.
    """
    for reg, value in ((0x80, 0x01), (0xFF, 0x01), (0x00, 0x00), (0xFF, 0x06)):
        self._write_u8(reg, value)
    self._write_u8(0x83, self._read_u8(0x83) | 0x04)
    for reg, value in ((0xFF, 0x07), (0x81, 0x01), (0x80, 0x01),
                       (0x94, 0x6b), (0x83, 0x00)):
        self._write_u8(reg, value)

    # Poll register 0x83 until it becomes non-zero (bounded by io_timeout_s
    # when that is positive).
    began = time.monotonic()
    while self._read_u8(0x83) == 0x00:
        timed_out = (self.io_timeout_s > 0
                     and (time.monotonic() - began) >= self.io_timeout_s)
        if timed_out:
            raise RuntimeError('Timeout waiting for VL53L0X!')
    self._write_u8(0x83, 0x01)

    raw = self._read_u8(0x92)
    count = raw & 0x7F
    is_aperture = bool(raw & 0x80)

    for reg, value in ((0x81, 0x00), (0xFF, 0x06)):
        self._write_u8(reg, value)
    self._write_u8(0x83, self._read_u8(0x83) & ~0x04)
    for reg, value in ((0xFF, 0x01), (0x00, 0x01), (0xFF, 0x00), (0x80, 0x00)):
        self._write_u8(reg, value)

    return (count, is_aperture)
开发者ID:eiselekd,项目名称:hw,代码行数:25,代码来源:adafruit_vl53l0x.py
示例8: run
def run(self):
    """Collect queued items into a buffer and transmit them in batches.

    NOTE(review): the nesting below was reconstructed from a copy of this
    method whose indentation had been lost -- confirm against upstream.

    Each pass waits up to two seconds for an item on ``self.work_queue``.
    A received item is appended to ``self.transmit_buffer`` and
    ``self.last_time`` is refreshed.  Once the queue is empty, the
    ``self._transmit`` flag is set if the buffer holds more than one item or
    more than 30 seconds have passed since the last item arrived; with the
    flag set and the queue empty, ``self.transmit()`` is called.  When
    ``self.stopped()`` is true the buffer is transmitted until empty and the
    method returns.
    """
    while True:
        try:
            item = self.work_queue.get(timeout=2)
        except Empty:
            item = None

        if item:
            self.transmit_buffer.append(item)
            self.last_time = time.monotonic()

        idle_for = time.monotonic() - self.last_time
        if self.work_queue.empty() and len(self.transmit_buffer) > 0:
            if idle_for > 30 or len(self.transmit_buffer) > 1:
                self._transmit.set()

        if self._transmit.is_set() and self.work_queue.empty():
            self.transmit()

        if not self.stopped():
            continue

        # Stopping: push out whatever is still buffered, then leave.
        while len(self.transmit_buffer) > 0:
            self.transmit()
        return
开发者ID:bradfier,项目名称:rockclock,代码行数:25,代码来源:transmitter.py
示例9: timer
def timer():
    """Yield a callable that reports how long the timed section took.

    This is a generator with a single ``yield``.
    NOTE(review): presumably decorated with ``contextlib.contextmanager``
    outside this view -- confirm.

    The yielded callable returns ``finish - start`` in ``monotonic()``
    seconds.  The finish time is only recorded when the generator is
    resumed or closed, so calling it earlier raises ``TypeError``
    (``None - float``).
    """
    started = monotonic()
    finished = None

    def elapsed():
        return finished - started

    try:
        yield elapsed
    finally:
        finished = monotonic()
开发者ID:Lothiraldan,项目名称:pyzmq,代码行数:7,代码来源:collect.py
示例10: send_messages
def send_messages(self, message_batch):
    """Send a batch of journal messages to Elasticsearch in one bulk call.

    Each message is decoded as UTF-8 JSON and stamped with a ``timestamp``
    taken from its ``__REALTIME_TIMESTAMP`` field when present, otherwise
    the current UTC time.  Messages are routed to a per-day index named
    ``"<self.index_name>-<date>"``, which is created through
    ``self.create_index_and_mappings`` the first time it is seen.

    :param message_batch: iterable of ``bytes`` objects holding JSON.
    :return: ``True`` when the batch was processed, ``False`` when any
        exception was raised while processing it (a warning is logged), and
        ``None`` when ``self._init_es()`` reports that ES is not available.
    """
    if not self._init_es():
        # Kept as None (not False) so the return value seen by callers is unchanged.
        return None

    start_time = time.monotonic()
    try:
        actions = []
        for msg in message_batch:
            message = json.loads(msg.decode("utf8"))
            # Any "timestamp" already present in the message is always
            # overwritten, so it is not read first.
            if "__REALTIME_TIMESTAMP" in message:
                timestamp = datetime.datetime.utcfromtimestamp(message["__REALTIME_TIMESTAMP"])
            else:
                timestamp = datetime.datetime.utcnow()
            message["timestamp"] = timestamp

            index_name = "{}-{}".format(self.index_name, timestamp.date())
            if index_name not in self.indices:
                self.create_index_and_mappings(index_name)

            actions.append({
                "_index": index_name,
                "_type": "journal_msg",
                "_source": message,
            })

        if actions:
            helpers.bulk(self.es, actions)

        self.log.debug("Sent %d log events to ES, took: %.2fs",
                       len(message_batch), time.monotonic() - start_time)
    except Exception as ex:  # pylint: disable=broad-except
        # Best effort by design: report the failure to the caller instead of raising.
        self.log.warning("Problem sending logs to ES: %r", ex)
        return False
    return True
开发者ID:aiven,项目名称:journalpump,代码行数:32,代码来源:journalpump.py
示例11: spin
def spin(self, timeout=None):
    """
    Runs background processes until timeout expires.
    Note that all processing is implemented in one thread.

    :param timeout: The method will return once this amount of time expires.
                    If None, the method will never return.
                    If zero, the method will handle only those events that are ready, then return immediately.
    """
    if timeout == 0:
        # Non-blocking mode: drain the frames that are already available,
        # then give the scheduler a single poll.
        while True:
            frame = self._can_driver.receive(0)
            if not frame:
                break
            self._recv_frame(frame)
        self._poll_scheduler_and_get_next_deadline()
        return

    if timeout is None:
        deadline = sys.float_info.max
    else:
        deadline = time.monotonic() + timeout

    def run_once():
        wake_at = self._poll_scheduler_and_get_next_deadline()
        if wake_at is None:
            wake_at = sys.float_info.max

        # Wait for a frame until the next scheduled event or the deadline,
        # whichever is first, clamped to the range [0, 1] seconds.
        wait = min(wake_at, deadline) - time.monotonic()
        wait = min(max(wait, 0), 1)

        incoming = self._can_driver.receive(wait)
        if incoming:
            self._recv_frame(incoming)

    # Always run at least once, even if the deadline has already passed.
    run_once()
    while time.monotonic() < deadline:
        run_once()
开发者ID:antoinealb,项目名称:pyuavcan,代码行数:35,代码来源:node.py
示例12: run_baton_query
def run_baton_query(self, baton_binary: BatonBinary, program_arguments: List[str]=None, input_data: Any=None) \
        -> List[Dict]:
    """
    Runs a baton query.
    :param baton_binary: the baton binary to use
    :param program_arguments: arguments to give to the baton binary
    :param input_data: input data to the baton binary
    :return: parsed serialization returned by baton (an empty list if baton produced no output)
    """
    binary_path = os.path.join(self._baton_binaries_directory, baton_binary.value)
    extra_arguments = [] if program_arguments is None else program_arguments
    program_arguments = [binary_path] + extra_arguments

    _logger.info("Running baton command: '%s' with data '%s'" % (program_arguments, input_data))
    began = time.monotonic()
    baton_out = self._run_command(program_arguments, input_data=input_data)
    elapsed = time.monotonic() - began
    _logger.debug("baton output (took %s seconds, wall time): %s" % (elapsed, baton_out))

    if len(baton_out) == 0:
        return []

    if baton_out[0] != '[':
        # If information about multiple files is returned, baton does not return valid JSON - it returns a line
        # separated list of JSON, where each line corresponds to a different file
        baton_out = "[%s]" % baton_out.replace('\n', ',')

    parsed = json.loads(baton_out)
    BatonRunner._raise_any_errors_given_in_baton_out(parsed)
    return parsed
开发者ID:wtsi-hgi,项目名称:python-baton-wrapper,代码行数:32,代码来源:_baton_runner.py
示例13: _select_next_server
def _select_next_server(self):
    """
    Looks up in the server pool for an available server
    and attempts to connect.

    Servers whose ``reconnects`` exceed ``options["max_reconnect_attempts"]``
    are skipped.  The first server whose connection attempt succeeds becomes
    ``self._current_server`` and its stream reader/writer are stored on
    ``self``.  A failed attempt is remembered in ``self._err`` and the next
    server is tried.

    :raises ErrNoServers: if no server in the pool could be connected.
    """
    chosen = None
    now = time.monotonic()
    for server in self._server_pool:
        if server.reconnects > self.options["max_reconnect_attempts"]:
            continue

        # For a server that connected before: sleep for the reconnect wait
        # when the condition below (as written upstream) holds.
        wait_first = (server.did_connect
                      and now > server.last_attempt + self.options["reconnect_time_wait"])
        if wait_first:
            yield from asyncio.sleep(self.options["reconnect_time_wait"], loop=self._loop)

        try:
            server.last_attempt = time.monotonic()
            reader, writer = yield from asyncio.open_connection(
                server.uri.hostname,
                server.uri.port,
                loop=self._loop,
                limit=DEFAULT_BUFFER_SIZE)
            chosen = server
            self._io_reader = reader
            self._io_writer = writer
            server.did_connect = True
            break
        except Exception as exc:
            # Remember the failure and move on to the next server.
            self._err = exc

    if chosen is None:
        raise ErrNoServers
    self._current_server = chosen
开发者ID:habibutsu,项目名称:asyncio-nats,代码行数:30,代码来源:client.py
示例14: dispatch
def dispatch(self, wdelay=0):
""" Run one dispatch round over a snapshot of the event queue.

Increments ``self.round``, copies ``self._queue`` while holding
``self._lock`` and then walks that copy from the front, comparing each
event's ``atime`` with ``time.monotonic()``.

NOTE(review): the indentation of this block was lost before it reached
this file, so the exact nesting (what else runs under the lock, which
branch the ``heapq.heapify`` call belongs to) cannot be read from here --
restore it from the upstream project before relying on the comments below.

:param wdelay: seconds added to "now" when deciding whether the front
event is still too far in the future; also the sleep length in that case.
:return: ``time.monotonic()`` seconds elapsed since the call started.
"""
# Count this round and remember when it started (for the return value).
self.round += 1
start = time.monotonic()
# The copy of the shared queue is taken while the lock is held.
with self._lock:
xqueue = self._queue[:]
while True:
# Nothing left in the snapshot: leave the loop.
if not xqueue:
break
# Always look at the front entry of the snapshot.
event = xqueue[0]
now = time.monotonic()
# Front event is due later than now + wdelay: sleep wdelay and stop.
if event.atime > (now + wdelay):
time.sleep(wdelay)
break
# Front event is due within the next wdelay seconds: sleep until its atime.
elif event.atime <= (now + wdelay) and event.atime > now:
# D("sleep {}".format(event.atime - now))
time.sleep(event.atime - now)
# Front event is due: drop it from the snapshot and invoke it.
else: # event.atime <= now:
xqueue.pop(0)
more = event._call()
# A positive return value reschedules the event ``event.delay`` seconds
# from now; otherwise the front entry of the real queue is removed
# (presumably the same event -- TODO confirm).
if more is not None and more > 0:
event.atime = time.monotonic() + event.delay
else:
self._queue.pop(0)
# Re-establish heap order on the real queue after atime changes/removals.
heapq.heapify(self._queue)
return time.monotonic() - start
开发者ID:hevi9,项目名称:hevi-lib,代码行数:26,代码来源:dispatchers.py
示例15: __next__
def __next__(self):
    """Block until the next slot is due, then schedule the following one.

    While holding ``self.lock``: if ``self.next_yield`` is still in the
    future (by ``time.monotonic()``), sleep for the remaining time.  The
    next slot is then set to the current time plus ``self.interval``.
    Returns ``None``.
    """
    with self.lock:
        now = time.monotonic()
        if self.next_yield > now:
            time.sleep(self.next_yield - now)
            # Re-read the clock: the sleep may have lasted longer than asked.
            now = time.monotonic()
        self.next_yield = now + self.interval
开发者ID:jpn--,项目名称:larch,代码行数:7,代码来源:rate_limiter.py
示例16: _handle_status
def _handle_status(self, msg):
    """
    Reimplemented to refresh the namespacebrowser after kernel
    restarts

    NOTE(review): the nesting below was reconstructed from a copy of this
    method whose indentation had been lost -- confirm against upstream.
    """
    kernel_state = msg['content'].get('execution_state', '')
    parent_type = msg['parent_header'].get('msg_type', '')

    if kernel_state == 'starting':
        # Needed to show how long a kernel has been alive in each console:
        # reset the start time and (re)start the one-second timer.
        client = self.ipyclient
        client.t0 = time.monotonic()
        client.timer.timeout.connect(client.show_time)
        client.timer.start(1000)

        # This handles restarts when the kernel dies unexpectedly.
        if not self._kernel_is_starting:
            self._kernel_is_starting = True
        return

    if kernel_state == 'idle' and parent_type == 'shutdown_request':
        # This handles restarts asked by the user.
        if self.namespacebrowser is not None:
            self.set_namespace_view_settings()
            self.refresh_namespacebrowser()
        self.ipyclient.t0 = time.monotonic()
        return

    # Every other status message is handled by the parent class.
    super(NamepaceBrowserWidget, self)._handle_status(msg)
开发者ID:burrbull,项目名称:spyder,代码行数:26,代码来源:namespacebrowser.py
示例17: should_terminate
def should_terminate(self):
    """Report whether stopping has been pending for too long.

    The first call only records the current ``monotonic()`` time in
    ``self.stopping_start`` and returns ``False``.  Later calls return the
    elapsed time since then once it reaches ``ACTOR_ACTION_TIMEOUT``
    (defined outside this block), and ``False`` before that.
    """
    if self.stopping_start is None:
        self.stopping_start = monotonic()
        return False

    waited = monotonic() - self.stopping_start
    if waited >= ACTOR_ACTION_TIMEOUT:
        return waited
    return False
开发者ID:quantmind,项目名称:pulsar,代码行数:7,代码来源:proxy.py
示例18: execute_cli_command
def execute_cli_command(self, command, timeout=None):
    """Send a CLI command over the serial port and return its text response.

    While the command is being executed, incoming frames will be lost.
    SLCAN driver from PyUAVCAN goes at great lengths to properly separate CLI
    response lines from SLCAN messages in real time with minimal additional
    latency, so use it if you care about this.

    :param command: command text; ``'\\r\\n'`` is appended before sending.
    :param timeout: passed through ``self._resolve_timeout``; if the resolved
        value is ``None`` the wait is effectively unbounded.
    :return: the decoded response with SLCAN lines and the echoed command
        removed, stripped of surrounding whitespace.
    :raises TimeoutException: if ``self.CLI_END_OF_TEXT`` is not read before
        the deadline.
    """
    timeout = self._resolve_timeout(timeout)
    self.port.writeTimeout = timeout

    command += '\r\n'
    self._write(command)

    budget = 999999999 if timeout is None else timeout
    deadline = time.monotonic() + budget
    # Read timeout used for each ``port.read()`` call in the loop below.
    self.port.timeout = 1

    pieces = []
    while True:
        if time.monotonic() > deadline:
            raise TimeoutException('SLCAN CLI response timeout; command: %r' % command)
        piece = self.port.read()
        if piece == self.CLI_END_OF_TEXT:
            break
        if piece:
            pieces.append(piece)

    text = b''.join(pieces).decode()
    # Removing SLCAN lines from response
    without_slcan = re.sub(r'.*\r[^\n]', '', text)
    return without_slcan.strip().replace(command, '')
开发者ID:Zubax,项目名称:drwatson,代码行数:25,代码来源:can.py
示例19: _poll_for_io
def _poll_for_io(self):
    """Wait for I/O readiness and wake tasks whose sleep has expired.

    The selector is polled with a timeout equal to the time remaining until
    the earliest entry in the ``self._sleeping`` heap (or ``None`` when
    nothing is sleeping).  Every task whose file object became ready is
    unregistered and rescheduled.  Expired sleep entries are then popped:
    ``'sleep'`` entries are rescheduled and ``'timeout'`` entries are
    cancelled with ``TimeoutError``.
    """
    if self._sleeping:
        timeout = self._sleeping[0][0] - time.monotonic()
    else:
        timeout = None

    for key, _mask in self._selector.select(timeout):
        ready_task = key.data
        self._selector.unregister(key.fileobj)
        self._reschedule_task(ready_task)

    # Process sleeping tasks (if any)
    if not self._sleeping:
        return

    now = time.monotonic()
    while self._sleeping and self._sleeping[0][0] <= now:
        wake_at, _, sleeper, kind = heapq.heappop(self._sleeping)
        # When a task wakes, verify that the timeout value matches that
        # stored on the task.  If it differs, the task completed its
        # operation, was cancelled, or is no longer concerned with this
        # sleep operation -- in that case, do nothing.
        if wake_at != sleeper.timeout:
            continue
        if kind == 'sleep':
            self._reschedule_task(sleeper)
        elif kind == 'timeout':
            self._cancel_task(sleeper, exc=TimeoutError)
开发者ID:Hellowlol,项目名称:curio,代码行数:26,代码来源:kernel.py
示例20: zipTest
def zipTest(self, f, compression):
    """Write about 6 GiB of data into a ZIP archive and read it back.

    The archive is filled with copies of ``self.data`` stored under the
    names ``testfn0``, ``testfn1``, ...; each member is then read back and
    compared with ``self.data``.  A "still working" message is printed to
    the real stdout every ``_PRINT_WORKING_MSG_INTERVAL`` seconds because
    the test can be really slow.

    Both archive handles are opened in ``with`` blocks so they are closed
    even when a write or an assertion fails part-way through.

    :param f: file name or file object passed to ``zipfile.ZipFile``.
    :param compression: compression method passed to ``zipfile.ZipFile``.
    """
    # The archive will contain enough copies of self.data to reach about
    # 6 GiB of raw data to store.
    filecount = 6*1024**3 // len(self.data)

    next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL

    # Create the ZIP archive.
    with zipfile.ZipFile(f, "w", compression) as zipfp:
        for num in range(filecount):
            zipfp.writestr("testfn%d" % num, self.data)
            # Print still working message since this test can be really slow
            if next_time <= time.monotonic():
                next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
                print((
                    ' zipTest still writing %d of %d, be patient...' %
                    (num, filecount)), file=sys.__stdout__)
                sys.__stdout__.flush()

    # Read the ZIP archive
    with zipfile.ZipFile(f, "r", compression) as zipfp:
        for num in range(filecount):
            self.assertEqual(zipfp.read("testfn%d" % num), self.data)
            # Print still working message since this test can be really slow
            if next_time <= time.monotonic():
                next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
                print((
                    ' zipTest still reading %d of %d, be patient...' %
                    (num, filecount)), file=sys.__stdout__)
                sys.__stdout__.flush()
开发者ID:Victor-Savu,项目名称:cpython,代码行数:32,代码来源:test_zipfile64.py
注：本文中的time.monotonic函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论