本文整理汇总了Python中ptime.sleep函数的典型用法代码示例。如果您正苦于以下问题:Python sleep函数的具体用法?Python sleep怎么用?Python sleep使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sleep函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_cancel
def test_cancel(self):
    """Check that a cancelled task does not fire while its siblings do.

    Ten tasks carrying the values 0-9 are queued with a delay argument of
    .1. The task carrying 5 is cancelled before the sleep, and the final
    assertion expects every value except 5 in ``self.callback_order``.
    """
    def fire_ready_tasks():
        # Fire every task the manager hands out until it returns None.
        # Both drain phases share this helper so they cannot drift apart
        # (the first phase used to call a differently spelled method).
        while True:
            task = self.task_m.consume_task()
            if task is None:
                break
            task.fire_callbacks()

    for i in xrange(5):
        self.task_m.add(Task(.1, self.callback_f, i))
    c_task = Task(.1, self.callback_f, 5)
    self.task_m.add(c_task)
    for i in xrange(6, 10):
        self.task_m.add(Task(.1, self.callback_f, i))

    # First drain, right after queueing: nothing is expected to have fired.
    fire_ready_tasks()
    logger.debug('%s' % self.callback_order)
    assert self.callback_order == []

    ok_(not c_task.cancelled)
    c_task.cancel()
    ok_(c_task.cancelled)

    # Second drain, after the sleep: everything but the cancelled task.
    time.sleep(.1)
    fire_ready_tasks()
    logger.debug('%s' % self.callback_order)
    assert self.callback_order == [0, 1, 2, 3, 4, 6, 7, 8, 9]
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:26,代码来源:test_minitwisted.py
示例2: test_different_delay
def test_different_delay(self):
    """Queue tasks with mixed delays and check each polling round's order.

    NOTICE: this test might fail if your configuration
    (interpreter/processor) is too slow.
    """
    delays = (1, 1, 1, .5, 1, 1, 2, 1, 1, 1,
              1, 1.5, 1, 1, 1, 1, .3)
    # One expected callback order per polling round. The values 9 and 14
    # appear in none of them: those two tasks are cancelled at the end of
    # every round.
    expected_rounds = (
        [],
        ['a', 16, 3, 'b'],
        ['a', 0, 1, 2, 4, 5, 7, 8, 10, 12, 13, 15, 'c', 'b'],
        ['a', 11, 'c', 'b'],
        ['a', 6, 'c', 'b'],
    )

    # Build every task first, then queue them, in index order.
    tasks = []
    for value, delay in enumerate(delays):
        tasks.append(Task(delay, self.callback_f, value))
    for queued in tasks:
        self.task_m.add(queued)

    for round_no, expected in enumerate(expected_rounds):
        # Fire whatever the manager hands out until it returns None.
        ready = self.task_m.consume_task()
        while ready is not None:
            ready.fire_callbacks()
            ready = self.task_m.consume_task()
        logger.debug('#: %d, result: %s, expected: %s' % (
            round_no, self.callback_order, expected))
        assert self.callback_order == expected

        # Reset the record and queue three fresh tasks for later rounds.
        self.callback_order = []
        for delay, label in ((0, 'a'), (.5, 'b'), (1, 'c')):
            self.task_m.add(Task(delay, self.callback_f, label))

        time.sleep(.5)
        tasks[9].cancel()
        tasks[14].cancel()
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:34,代码来源:test_minitwisted.py
示例3: test_capture
def test_capture(self):
self.reactor.start_capture()
ts1 = time.time()
time.sleep(tc.TASK_INTERVAL/2)
# out > DATAGRAM1 (main_loop)
self.reactor.run_one_step()
ts2 = time.time()
incoming_datagram = Datagram(DATA1, tc.SERVER_ADDR)
self.reactor.s.put_datagram_received(incoming_datagram)
time.sleep(tc.TASK_INTERVAL/2)
self.reactor.run_one_step()
# in < incoming_datagram (socket)
# out > DATAGRAM3 (on_datagram_received)
captured_msgs = self.reactor.stop_and_get_capture()
eq_(len(captured_msgs), 3)
for msg in captured_msgs:
print msg
assert ts1 < captured_msgs[0][0] < ts2
eq_(captured_msgs[0][1], tc.SERVER_ADDR)
eq_(captured_msgs[0][2], True) #outgoing
eq_(captured_msgs[0][3], DATA1)
assert captured_msgs[1][0] > ts2
eq_(captured_msgs[1][1], DATAGRAM1.addr)
eq_(captured_msgs[1][2], False) #incoming
eq_(captured_msgs[1][3], DATAGRAM1.data)
assert captured_msgs[2][0] > captured_msgs[1][0]
eq_(captured_msgs[2][1], DATAGRAM3.addr)
eq_(captured_msgs[2][2], True) #outgoing
eq_(captured_msgs[2][3], DATAGRAM3.data)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:30,代码来源:test_minitwisted.py
示例4: stop
def stop(self):
    """Raise ``stop_flag`` under the lock, then sleep one task interval."""
    with self._lock:
        self.stop_flag = True
    # Presumably gives the worker time to notice the flag; nothing here
    # checks that it actually did.
    time.sleep(self.task_interval)
开发者ID:salekseev,项目名称:freestream,代码行数:8,代码来源:minitwisted.py
示例5: test_get_expired_value
def test_get_expired_value(self):
    """A stored peer is no longer returned after ``time.sleep(30)``."""
    key, peer = KEYS[0], PEERS[0]
    self.t.put(key, peer)
    eq_(self.t.num_keys, 1)
    eq_(self.t.num_peers, 1)

    time.sleep(30)

    eq_(self.t.get(key), [])
    # num_keys is not asserted after expiry: the original author left that
    # check disabled.
    eq_(self.t.num_peers, 0)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:8,代码来源:test_tracker.py
示例6: test_network_callback
def test_network_callback(self):
    """Send DATA to the server address and check the first datagram recorded.

    After one task interval the oldest entry of ``self.datagrams_received``
    must equal ``(DATA, tc.CLIENT_ADDR)``.
    """
    self.client_r.sendto(DATA, tc.SERVER_ADDR)
    time.sleep(tc.TASK_INTERVAL)
    with self.lock:
        first_datagram = self.datagrams_received.pop(0)
    expected = (DATA, tc.CLIENT_ADDR)
    logger.debug('first_datagram: %s, %s' % (first_datagram, expected))
    # Compare for equality. The previous "assert x, y" form only checked
    # that first_datagram was truthy and used the tuple as the message.
    # NOTE(review): assumes datagrams_received holds (data, addr) tuples,
    # as the debug line's side-by-side output suggests -- confirm.
    assert first_datagram == expected
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:9,代码来源:test_minitwisted.py
示例7: test_callback_list
def test_callback_list(self):
    """A task given a list of two callbacks records [1, 2] once fired."""
    callbacks = [self._callback1, self._callback2]
    self.task_m.add(Task(tc.TASK_INTERVAL/2, callbacks, 1, 2))

    # Polled immediately, the manager hands out nothing.
    ok_(self.task_m.consume_task() is None)
    eq_(self.callback_order, [])

    time.sleep(tc.TASK_INTERVAL)
    ready = self.task_m.consume_task()
    ready.fire_callbacks()
    eq_(self.callback_order, [1,2])
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:9,代码来源:test_minitwisted.py
示例8: test_recvfrom
def test_recvfrom(self):
    """Check that the datagram callback does not fire when recvfrom fails.

    A second reactor sends one datagram to the client address. After one
    task interval ``self.callback_fired`` must still be false.
    """
    self.r.start()
    try:
        r2 = ThreadedReactor()
        r2.listen_udp(tc.SERVER_ADDR[1], lambda x,y:None)
        logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
        r2.sendto('z', tc.CLIENT_ADDR)
        # self.r will call recvfrom (which raises socket.error)
        time.sleep(tc.TASK_INTERVAL)
        ok_(not self.callback_fired)
    finally:
        # Stop the reactor even when the assertion above fails, so the
        # started reactor is not left running after a failed test.
        self.r.stop()
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:10,代码来源:test_minitwisted.py
示例9: test_call_main_loop
def test_call_main_loop(self):
    """Expect one main_loop call after the first wait and two after the next."""
    time.sleep(tc.TASK_INTERVAL)
    # main_loop is called right away
    with self.lock:
        calls_so_far = self.main_loop_call_counter
    # FIXME: this assert fails sometimes!!!!
    eq_(calls_so_far, 1)

    time.sleep(0.1 + tc.TASK_INTERVAL)
    with self.lock:
        calls_so_far = self.main_loop_call_counter
    # FIXME: this crashes when recompiling
    eq_(calls_so_far, 2)
开发者ID:csasm,项目名称:pymdht,代码行数:10,代码来源:test_minitwisted.py
示例10: recvfrom
def recvfrom(self, buffer_size):
    """Return ``(data, addr)`` of the oldest queued datagram, polling for it.

    The queue is checked up to nine times, sleeping a tenth of
    ``tc.TASK_INTERVAL`` before each check. ``buffer_size`` is accepted
    but not used.

    Raises:
        socket.timeout: when no datagram showed up in any of the checks.
    """
    pause = tc.TASK_INTERVAL / 10
    for _ in xrange(9):
        time.sleep(pause)
        with self.lock:
            if self.datagrams_received:
                pending = self.datagrams_received.pop(0)
            else:
                pending = None
        if pending:
            return (pending.data, pending.addr)
    raise socket.timeout
开发者ID:csasm,项目名称:pymdht,代码行数:10,代码来源:test_minitwisted.py
示例11: test_sendto
def test_sendto(self):
    """Wait for main_loop to send, then check the reactor survived the error."""
    def wait_until(condition):
        # Poll once per task interval; there is no upper bound on the wait.
        while not condition():
            time.sleep(tc.TASK_INTERVAL)

    logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
    assert not self.main_loop_send_called
    # The reactor is not started here (the explicit start call was disabled
    # by the original author); the test just waits for it to be running.
    wait_until(lambda: self.r.running)
    wait_until(lambda: self.main_loop_send_called)
    assert self.r.s.error_raised
    # the reactor did not crash
    assert self.r.running
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:10,代码来源:test_minitwisted.py
示例12: stop
def stop(self):
    """Raise ``stop_flag`` under the lock, then sleep one task interval.

    NOTE(review): the original author was unsure whether the thread can be
    resumed afterwards.
    """
    with self._lock:
        self.stop_flag = True
    # wait a little for the thread to end
    time.sleep(self.task_interval)
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:10,代码来源:minitwisted.py
示例13: test_failed_join
def test_failed_join(self):
    """Stopping the reactor after scheduling a very long callback must raise."""
    self.lock = threading.RLock()
    reactor = self.reactor = ThreadedReactor(
        self._main_loop,
        tc.CLIENT_PORT,
        self._on_datagram_received,
        task_interval=tc.TASK_INTERVAL)
    reactor.s = _SocketMock(tc.TASK_INTERVAL)
    # The reactor is not started here (the start call was disabled by the
    # original author).
    reactor.call_asap(self._very_long_callback)
    time.sleep(tc.TASK_INTERVAL*2)
    assert_raises(Exception, reactor.stop)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:11,代码来源:test_minitwisted.py
示例14: test_on_datagram_received_callback
def test_on_datagram_received_callback(self):
# This is equivalent to sending a datagram to reactor
self.s.put_datagram_received(Datagram(DATA1, tc.SERVER_ADDR))
datagram = Datagram(DATA1, tc.SERVER_ADDR)
print "--------------", datagram, datagram.data, datagram.addr
time.sleep(tc.TASK_INTERVAL * 1)
with self.lock:
datagram = self.datagrams_received.pop(0)
print "popped>>>>>>>>>>>>>>>", datagram
eq_(datagram.data, DATA1)
eq_(datagram.addr, tc.SERVER_ADDR)
开发者ID:csasm,项目名称:pymdht,代码行数:11,代码来源:test_minitwisted.py
示例15: stop
def stop(self):
    """Clear ``running`` and wait for the thread to finish.

    The thread is joined with a timeout of twenty task intervals. If it is
    still alive, the method logs that, sleeps the same amount and joins
    once more.

    Raises:
        Exception: when the thread is still alive after the second join.
    """
    self.running = False
    grace = self.task_interval*20

    self.join(grace)
    if not self.isAlive():
        return

    logger.info('minitwisted thread still alive. Wait a little more')
    time.sleep(grace)
    self.join(grace)
    if self.isAlive():
        raise Exception('Minitwisted thread is still alive!')
开发者ID:fege,项目名称:Thesis-Project,代码行数:11,代码来源:minitwisted.py
示例16: test_ping_with_timeout
def test_ping_with_timeout(self):
    """An unanswered ping shows up in the querier's timed-out queries."""
    # Client creates a query and registers it
    ping_msg = clients_msg_f.outgoing_ping_query(tc.SERVER_NODE)
    bencoded_msg = self.querier.register_queries([ping_msg])
    # Client sends bencoded_msg
    time.sleep(3)
    # The server never responds and the timeout is triggered.
    # Element 1 of the returned value is the collection checked here.
    timed_out = self.querier.get_timeout_queries()[1]
    eq_(len(timed_out), 1)
    assert timed_out[0] is ping_msg
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:12,代码来源:test_querier.py
示例17: test_call_main_loop
def test_call_main_loop(self):
    """Step the reactor by hand and count the main_loop calls after each step."""
    step = self.reactor.run_one_step

    eq_(self.main_loop_call_counter, 0)

    # main_loop is called right away
    step()
    eq_(self.main_loop_call_counter, 1)

    # no events: stepping again does not change the counter
    step()
    eq_(self.main_loop_call_counter, 1)

    # main_loop is called again after sleeping main_loop_delay
    time.sleep(self.main_loop_delay)
    step()
    eq_(self.main_loop_call_counter, 2)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:12,代码来源:test_minitwisted.py
示例18: test_call_asap
def test_call_asap(self):
    """Values passed through ``call_asap`` are recorded in call order."""
    with self.lock:
        eq_(self.callback_values, [])

    self.reactor.call_asap(self._callback, 0)
    time.sleep(tc.TASK_INTERVAL * 2)
    with self.lock:
        eq_(self.callback_values, [0])

    # Schedule the values 1..4 one at a time; after each wait the record
    # must hold every value scheduled so far.
    for value in xrange(1, 5):
        self.reactor.call_asap(self._callback, value)
        time.sleep(tc.TASK_INTERVAL * 3)
        with self.lock:
            eq_(self.callback_values, list(range(value + 1)))
开发者ID:csasm,项目名称:pymdht,代码行数:13,代码来源:test_minitwisted.py
示例19: test_block_flood
def test_block_flood(self):
    """Datagrams beyond MAX_PACKETS_PER_PERIOD do not reach the callback.

    FLOOD_LIMIT datagrams are queued and all are expected to be recorded.
    Ten more are queued afterwards and the recorded count must stay the same.
    """
    from floodbarrier import MAX_PACKETS_PER_PERIOD as FLOOD_LIMIT

    def inject(count):
        # Queue `count` identical datagrams on the mock socket.
        for _ in xrange(count):
            self.s.put_datagram_received(Datagram(DATA1, tc.SERVER_ADDR))

    def recorded():
        # Number of datagrams recorded so far, read under the lock.
        with self.lock:
            return len(self.datagrams_received)

    inject(FLOOD_LIMIT)
    time.sleep(tc.TASK_INTERVAL * 5)
    eq_(recorded(), FLOOD_LIMIT)

    inject(10)
    time.sleep(tc.TASK_INTERVAL * 3)
    eq_(recorded(), FLOOD_LIMIT)
    logger.warning("TESTING LOGS ** IGNORE EXPECTED WARNING **")
开发者ID:csasm,项目名称:pymdht,代码行数:14,代码来源:test_minitwisted.py
示例20: test_error_received
def test_error_received(self):
    """An incoming error message is matched to the ping query it answers."""
    # Client creates a query and registers it
    msg = message.OutgoingPingQuery(tc.SERVER_NODE, tc.CLIENT_ID)
    bencoded_msg = self.querier.register_queries([msg])
    # Client sends bencoded_msg
    time.sleep(1)

    # Server gets bencoded_msg and creates an error response stamped with
    # the query's tid
    error_out = message.OutgoingErrorMsg(tc.CLIENT_NODE, message.GENERIC_E)
    bencoded_r = error_out.stamp(msg.tid)

    # The client receives the bencoded message
    error_in = message.IncomingMsg(Datagram(bencoded_r, tc.SERVER_ADDR))
    assert self.querier.get_related_query(error_in) is msg
开发者ID:csasm,项目名称:pymdht,代码行数:15,代码来源:test_querier.py
注:本文中的ptime.sleep函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论