• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python ptime.sleep函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中ptime.sleep函数的典型用法代码示例。如果您正苦于以下问题:Python sleep函数的具体用法?Python sleep怎么用?Python sleep使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了sleep函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_cancel

 def test_cancel(self):
     """Check that a cancelled task never fires its callback.

     Ten tasks carrying the ids 0-9 are added with a delay of .1; the one
     carrying id 5 is cancelled before the delay has elapsed. After
     sleeping .1, draining the task manager must have produced every id
     except 5 in ``self.callback_order``.
     """
     for i in xrange(5):
         self.task_m.add(Task(.1, self.callback_f, i))
     c_task = Task(.1, self.callback_f, 5)
     self.task_m.add(c_task)
     for i in xrange(6, 10):
         self.task_m.add(Task(.1, self.callback_f, i))
     # First drain: the assertion below expects that nothing has fired yet.
     while True:
         task = self.task_m.consume_task()
         if task is None:
             break
         # Same method name as the second drain loop uses.
         task.fire_callbacks()
     logger.debug('%s', self.callback_order)
     assert self.callback_order == []
     ok_(not c_task.cancelled)
     c_task.cancel()
     ok_(c_task.cancelled)

     time.sleep(.1)
     # Second drain: every task except the cancelled one should fire.
     while True:
         task = self.task_m.consume_task()
         if task is None:
             break
         task.fire_callbacks()
     logger.debug('%s', self.callback_order)
     assert self.callback_order == [0, 1, 2, 3, 4, 6, 7, 8, 9]
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:26,代码来源:test_minitwisted.py


示例2: test_different_delay

    def test_different_delay(self):
        """Check the firing order of tasks scheduled with different delays.

        Seventeen tasks (ids 0-16) are added with the delays listed in
        ``task_delays``. The test then runs one round per entry of
        ``expected_list``: it drains every task the manager hands out,
        compares ``self.callback_order`` with the expected entry, resets
        it, adds three extra tasks ('a', 'b', 'c' with delays 0, .5 and 1),
        sleeps .5 and cancels tasks 9 and 14.
        """
        # NOTICE: this test might fail if your configuration
        # (interpreter/processor) is too slow
        
        # One delay per task id (index == id); presumably seconds, like
        # the time.sleep() call below -- TODO confirm against Task.
        task_delays = (1, 1, 1, .5, 1, 1, 2, 1, 1, 1,
                       1, 1.5, 1, 1, 1, 1, .3)
                       
        # Expected content of self.callback_order after each successive
        # drain. Neither 9 nor 14 appears in any entry.
        expected_list = ([],
                         ['a', 16, 3, 'b'], # 9 is cancelled
                         ['a', 0, 1, 2, 4, 5, 7, 8, 10, 12, 13, 15, 'c', 'b'],
                         ['a', 11, 'c', 'b'],
                         ['a', 6, 'c', 'b'],
            )
        tasks = [Task(delay, self.callback_f, i) \
                 for i, delay in enumerate(task_delays)]
        for task in tasks:
            self.task_m.add(task)

        for i, expected in enumerate(expected_list):
            # Drain: fire every task consume_task() returns until it
            # returns None.
            while True:
                task = self.task_m.consume_task()
                if task is None:
                    break
                task.fire_callbacks()
            logger.debug('#: %d, result: %s, expected: %s' % (i,
                                              self.callback_order, expected))
            assert self.callback_order == expected
            # Start the next round with an empty record and three fresh
            # tasks whose ids are the letters seen in expected_list.
            self.callback_order = []
            self.task_m.add(Task(0, self.callback_f, 'a'))
            self.task_m.add(Task(.5, self.callback_f, 'b'))
            self.task_m.add(Task(1, self.callback_f, 'c'))
            time.sleep(.5)
            # NOTE(review): both cancel() calls run in every round. The
            # "too late" remark below conflicts with expected_list, which
            # omits 9 as well as 14 -- confirm which comment is stale.
            tasks[9].cancel() # too late (already fired)
            tasks[14].cancel() # should be cancelled
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:34,代码来源:test_minitwisted.py


示例3: test_capture

    def test_capture(self):
        """Capture traffic across two reactor steps and verify the records.

        Three captured entries are expected. Each entry is indexed as
        (timestamp, address, outgoing flag, payload) by the checks below.
        """
        reactor = self.reactor
        reactor.start_capture()
        before_first_step = time.time()
        time.sleep(tc.TASK_INTERVAL/2)
        # out > DATAGRAM1 (main_loop)
        reactor.run_one_step()
        after_first_step = time.time()
        reactor.s.put_datagram_received(Datagram(DATA1, tc.SERVER_ADDR))
        time.sleep(tc.TASK_INTERVAL/2)
        # in < incoming datagram (socket)
        # out > DATAGRAM3 (on_datagram_received)
        reactor.run_one_step()
        captured = reactor.stop_and_get_capture()

        eq_(len(captured), 3)
        for entry in captured:
            print(entry)

        first = captured[0]
        assert before_first_step < first[0] < after_first_step
        eq_(first[1], tc.SERVER_ADDR)
        eq_(first[2], True)  # outgoing
        eq_(first[3], DATA1)

        second = captured[1]
        assert second[0] > after_first_step
        eq_(second[1], DATAGRAM1.addr)
        eq_(second[2], False)  # incoming
        eq_(second[3], DATAGRAM1.data)

        third = captured[2]
        assert third[0] > second[0]
        eq_(third[1], DATAGRAM3.addr)
        eq_(third[2], True)  # outgoing
        eq_(third[3], DATAGRAM3.data)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:30,代码来源:test_minitwisted.py


示例4: stop

    def stop(self):
        """Set ``self.stop_flag`` under the lock, then sleep one task interval."""
        guard = self._lock
        guard.acquire()
        try:
            self.stop_flag = True
        finally:
            guard.release()

        # Pause for one task interval after raising the flag.
        pause = self.task_interval
        time.sleep(pause)
开发者ID:salekseev,项目名称:freestream,代码行数:8,代码来源:minitwisted.py


示例5: test_get_expired_value

 def test_get_expired_value(self):
     """Store one peer, wait 30 seconds, and expect the key to yield no peers."""
     tracker = self.t
     tracker.put(KEYS[0], PEERS[0])
     eq_(tracker.num_keys, 1)
     eq_(tracker.num_peers, 1)

     time.sleep(30)

     eq_(tracker.get(KEYS[0]), [])
     # num_keys is deliberately not checked here (the check was disabled
     # by the original author).
     eq_(tracker.num_peers, 0)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:8,代码来源:test_tracker.py


示例6: test_network_callback

 def test_network_callback(self):
     """Send one datagram and check what the receiving side recorded.

     After one task interval, the first entry of
     ``self.datagrams_received`` must equal ``(DATA, tc.CLIENT_ADDR)``.
     """
     self.client_r.sendto(DATA, tc.SERVER_ADDR)
     time.sleep(tc.TASK_INTERVAL)
     expected = (DATA, tc.CLIENT_ADDR)
     with self.lock:
         first_datagram = self.datagrams_received.pop(0)
         logger.debug('first_datagram: %s, %s' % (
                 first_datagram,
                 expected))
         # Compare for equality; ``assert x, y`` would only test the
         # truthiness of x and use y as the failure message.
         assert first_datagram == expected
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:9,代码来源:test_minitwisted.py


示例7: test_callback_list

 def test_callback_list(self):
     """A task built with a list of two callbacks fires both, in order."""
     callbacks = [self._callback1, self._callback2]
     self.task_m.add(Task(tc.TASK_INTERVAL/2, callbacks, 1, 2))

     # Nothing is handed out before the delay has elapsed.
     ok_(self.task_m.consume_task() is None)
     eq_(self.callback_order, [])

     time.sleep(tc.TASK_INTERVAL)
     due_task = self.task_m.consume_task()
     due_task.fire_callbacks()
     eq_(self.callback_order, [1, 2])
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:9,代码来源:test_minitwisted.py


示例8: test_recvfrom

 def test_recvfrom(self):
     """The callback must not fire when recvfrom fails on self.r.

     A second reactor sends a one-byte datagram towards the client
     address; after one task interval ``self.callback_fired`` must still
     be false. ``self.r`` is stopped even if the assertion fails, so a
     failing run does not leave the started reactor behind.
     """
     self.r.start()
     try:
         r2 = ThreadedReactor()
         r2.listen_udp(tc.SERVER_ADDR[1], lambda x, y: None)
         logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
         r2.sendto('z', tc.CLIENT_ADDR)
         # self.r will call recvfrom (which raises socket.error)
         time.sleep(tc.TASK_INTERVAL)
         ok_(not self.callback_fired)
     finally:
         self.r.stop()
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:10,代码来源:test_minitwisted.py


示例9: test_call_main_loop

 def test_call_main_loop(self):
     """The main-loop counter reads 1 after one interval and 2 after the next wait."""
     time.sleep(tc.TASK_INTERVAL)
     # main_loop is called right away
     with self.lock:
         calls_so_far = self.main_loop_call_counter
     # FIXME: this assert fails sometimes!!!!
     eq_(calls_so_far, 1)

     time.sleep(0.1 + tc.TASK_INTERVAL)
     with self.lock:
         calls_so_far = self.main_loop_call_counter
     # FIXME: this crashes when recompiling
     eq_(calls_so_far, 2)
开发者ID:csasm,项目名称:pymdht,代码行数:10,代码来源:test_minitwisted.py


示例10: recvfrom

 def recvfrom(self, buffer_size):
     """Return ``(data, addr)`` of the next queued datagram or time out.

     Polls ``self.datagrams_received`` up to nine times, sleeping a tenth
     of ``tc.TASK_INTERVAL`` before each poll. ``buffer_size`` is accepted
     but not used. Raises ``socket.timeout`` when no truthy datagram was
     obtained.
     """
     for _attempt in xrange(9):
         time.sleep(tc.TASK_INTERVAL / 10)
         with self.lock:
             queue = self.datagrams_received
             popped = queue.pop(0) if queue else None
         if popped:
             return (popped.data, popped.addr)
     raise socket.timeout
开发者ID:csasm,项目名称:pymdht,代码行数:10,代码来源:test_minitwisted.py


示例11: test_sendto

    def test_sendto(self):
        """The reactor keeps running after its socket raised an error on send."""
        logger.critical('TESTING: IGNORE CRITICAL MESSAGE')
        assert not self.main_loop_send_called

        def wait_until(condition):
            # Poll once per task interval until the condition holds.
            while not condition():
                time.sleep(tc.TASK_INTERVAL)

        # The reactor is not started here; the test waits for it to be running.
        wait_until(lambda: self.r.running)
        wait_until(lambda: self.main_loop_send_called)

        assert self.r.s.error_raised
        assert self.r.running # the reactor did not crash
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:10,代码来源:test_minitwisted.py


示例12: stop

 def stop(self):
     """Stop the thread. Whether it can be resumed afterwards is unclear."""
     # Explicit acquire/release is kept on purpose: the equivalent
     # ``with`` form was left disabled by the original author
     # (presumably for older interpreters -- confirm before changing).
     guard = self._lock
     guard.acquire()
     try:
         self.stop_flag = True
     finally:
         guard.release()
     # wait a little for the thread to end
     pause = self.task_interval
     time.sleep(pause)
开发者ID:csasm,项目名称:Look-MLKademlia,代码行数:10,代码来源:minitwisted.py


示例13: test_failed_join

    def test_failed_join(self):
        """Stopping a reactor that is busy in a very long callback must raise."""
        self.lock = threading.RLock()
        reactor = ThreadedReactor(
            self._main_loop,
            tc.CLIENT_PORT,
            self._on_datagram_received,
            task_interval=tc.TASK_INTERVAL,
        )
        self.reactor = reactor
        reactor.s = _SocketMock(tc.TASK_INTERVAL)
        # The reactor is not started explicitly in this test.
        reactor.call_asap(self._very_long_callback)
        time.sleep(tc.TASK_INTERVAL*2)
        assert_raises(Exception, reactor.stop)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:11,代码来源:test_minitwisted.py


示例14: test_on_datagram_received_callback

 def test_on_datagram_received_callback(self):
     # This is equivalent to sending a datagram to reactor
     self.s.put_datagram_received(Datagram(DATA1, tc.SERVER_ADDR))
     datagram = Datagram(DATA1, tc.SERVER_ADDR)
     print "--------------", datagram, datagram.data, datagram.addr
     time.sleep(tc.TASK_INTERVAL * 1)
     with self.lock:
         datagram = self.datagrams_received.pop(0)
         print "popped>>>>>>>>>>>>>>>", datagram
         eq_(datagram.data, DATA1)
         eq_(datagram.addr, tc.SERVER_ADDR)
开发者ID:csasm,项目名称:pymdht,代码行数:11,代码来源:test_minitwisted.py


示例15: stop

    def stop(self):
        """Stop the thread. It cannot be resumed afterwards.

        Clears ``self.running`` and joins the thread with a timeout of
        twenty task intervals. If the thread is still alive, logs that,
        sleeps for the same period and joins once more.

        Raises:
            Exception: if the thread is still alive after the waits.
        """
        self.running = False
        grace_period = self.task_interval * 20
        self.join(grace_period)
        if self.isAlive():
            logger.info('minitwisted thread still alive. Wait a little more')
            time.sleep(grace_period)
            self.join(grace_period)
        if self.isAlive():
            # Call form of raise: valid on both Python 2 and Python 3.
            raise Exception('Minitwisted thread is still alive!')
开发者ID:fege,项目名称:Thesis-Project,代码行数:11,代码来源:minitwisted.py


示例16: test_ping_with_timeout

 def test_ping_with_timeout(self):
     """A registered ping query that gets no response is reported as timed out."""
     # Client creates a query
     ping_msg = clients_msg_f.outgoing_ping_query(tc.SERVER_NODE)
     # Client registers query (the return value is not needed here)
     self.querier.register_queries([ping_msg])
     # Client sends the message; the server never responds, so after
     # the wait the timeout is triggered
     time.sleep(3)
     timed_out = self.querier.get_timeout_queries()[1]
     eq_(len(timed_out), 1)
     assert timed_out[0] is ping_msg
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:12,代码来源:test_querier.py


示例17: test_call_main_loop

 def test_call_main_loop(self):
     """Step the reactor by hand and watch the main-loop call counter."""
     eq_(self.main_loop_call_counter, 0)
     reactor = self.reactor

     # main_loop is called right away
     reactor.run_one_step()
     eq_(self.main_loop_call_counter, 1)

     # no events
     reactor.run_one_step()
     eq_(self.main_loop_call_counter, 1)

     # main_loop is called again after sleeping for main_loop_delay
     time.sleep(self.main_loop_delay)
     reactor.run_one_step()
     eq_(self.main_loop_call_counter, 2)
开发者ID:GlobalSquare,项目名称:pymdht,代码行数:12,代码来源:test_minitwisted.py


示例18: test_call_asap

    def test_call_asap(self):
        """Values passed through ``call_asap`` are recorded in call order."""
        reactor = self.reactor
        with self.lock:
            eq_(self.callback_values, [])

        reactor.call_asap(self._callback, 0)
        time.sleep(tc.TASK_INTERVAL * 2)
        with self.lock:
            eq_(self.callback_values, [0])

        for value in xrange(1, 5):
            reactor.call_asap(self._callback, value)
            time.sleep(tc.TASK_INTERVAL * 3)
            with self.lock:
                # After scheduling `value`, 0..value must all be recorded.
                eq_(self.callback_values, list(range(value + 1)))
开发者ID:csasm,项目名称:pymdht,代码行数:13,代码来源:test_minitwisted.py


示例19: test_block_flood

    def test_block_flood(self):
        """Datagrams beyond the flood limit never reach ``datagrams_received``."""
        from floodbarrier import MAX_PACKETS_PER_PERIOD as FLOOD_LIMIT

        def inject_one():
            # Queue one more copy of the same datagram on the socket.
            self.s.put_datagram_received(Datagram(DATA1, tc.SERVER_ADDR))

        # Fill up to the limit: all of these are expected to be received.
        for _ in xrange(FLOOD_LIMIT):
            inject_one()
        time.sleep(tc.TASK_INTERVAL * 5)
        with self.lock:
            eq_(len(self.datagrams_received), FLOOD_LIMIT)

        # Ten more, one at a time: the received count must not grow.
        for _ in xrange(10):
            inject_one()
            time.sleep(tc.TASK_INTERVAL * 3)
            with self.lock:
                eq_(len(self.datagrams_received), FLOOD_LIMIT)
                logger.warning("TESTING LOGS ** IGNORE EXPECTED WARNING **")
开发者ID:csasm,项目名称:pymdht,代码行数:14,代码来源:test_minitwisted.py


示例20: test_error_received

 def test_error_received(self):
     """An incoming error message is matched to the ping query it answers."""
     # Client creates a query
     ping_query = message.OutgoingPingQuery(tc.SERVER_NODE, tc.CLIENT_ID)
     # Client registers query (the return value is not needed here)
     self.querier.register_queries([ping_query])
     # Client sends the message
     time.sleep(1)
     # Server gets the message and creates an error response stamped
     # with the query's tid
     error_out = message.OutgoingErrorMsg(tc.CLIENT_NODE, message.GENERIC_E)
     bencoded_error = error_out.stamp(ping_query.tid)
     # The client receives the bencoded message
     incoming = message.IncomingMsg(Datagram(bencoded_error, tc.SERVER_ADDR))
     assert self.querier.get_related_query(incoming) is ping_query
开发者ID:csasm,项目名称:pymdht,代码行数:15,代码来源:test_querier.py



注:本文中的ptime.sleep函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ptime.time函数代码示例发布时间:2022-05-25
下一篇:
Python timeseries.Timeseries类代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap