• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python time.sleep函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中time.sleep函数的典型用法代码示例。如果您正苦于以下问题:Python sleep函数的具体用法?Python sleep怎么用?Python sleep使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了sleep函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: reconnect

    def reconnect(self, source, event, args, message):
        '''
        Handle a lost connection by retrying with a growing delay.

        If the bot was killed (``event == nu.BOT_KILL``) the bot is closed
        and no reconnection is attempted. Otherwise, while fewer than
        ``self.max_reconnects`` attempts have been made, sleep for
        ``(attempt number) * 60`` seconds, reconnect and re-send the USER
        and NICK commands. Once the limit is reached the bot is closed.

        source, event, args, message -- event handler arguments; only
        ``event`` and ``message`` are used here.
        '''
        # if we have been killed, don't attempt a reconnect
        # TODO : rejoin channels we were supposed to be in
        if event == nu.BOT_KILL:
            self.log.info("No reconnection attempt due to being killed")
            self.close()
            # Stop here: falling through would log an error and reconnect
            # anyway, which is exactly what the kill case must not do.
            return

        self.log.error("Lost connection to server:{0}".format(message))
        if self.times_reconnected >= self.max_reconnects:
            self.log.error("Unable to reconnect to server on third attempt")
            self.close()
            return

        # The wait grows by one minute with every attempt already made.
        delay = (self.times_reconnected + 1) * 60
        self.log.info(
            u"Sleeping before reconnection attempt, {0} seconds".format(delay)
        )
        time.sleep(delay)
        self.registered = False
        self.times_reconnected += 1
        self.log.info(u"Attempting reconnection, attempt no: {0}".format(self.times_reconnected))
        # set up events to connect and send USER and NICK commands
        self.irc.connect(self.network, self.port)
        self.irc.user(self.nick, self.realname)
        self.irc.nick(self.nick)
开发者ID:optimumtact,项目名称:simplepybot,代码行数:28,代码来源:commandbot.py


示例2: FetchOrCreateItem

	def FetchOrCreateItem(cls,**params):
		"""
		Return the entry matching the config & results in ``params``,
		creating it when no such entry exists yet.

		The lookup key comes from ``cls._CalculateKey(**params)``. The
		class-level cache is consulted first; on a miss the database is
		queried with ``get_or_create`` inside a savepoint. A DatabaseError
		or IntegrityError rolls the savepoint back and the attempt is
		repeated after 0.1 seconds, with no upper bound on the retries.
		``servername`` and ``part_of_run`` are left out of the creation
		defaults. Only entries that already existed are put into the cache.
		"""
		key = cls._CalculateKey(**params)
		if key in cls.__cache:
			return cls.__cache[key]

		defaults = dict(params)
		for excluded in ("servername", "part_of_run"):
			defaults.pop(excluded, None)

		while True:
			try:
				sid = transaction.savepoint()
				item, created = ProbeCommonResultEntry.objects.get_or_create(key=key, defaults=defaults)
				transaction.savepoint_commit(sid)
			except (DatabaseError, IntegrityError):
				# Undo the failed attempt, pause briefly, then try again.
				transaction.savepoint_rollback(sid)
				time.sleep(0.1)
			else:
				break

		if not created:
			cls.__cache[key] = item
		return item
开发者ID:AbhinavBansal,项目名称:tlsprober,代码行数:33,代码来源:models.py


示例3: FetchOrCreate

	def FetchOrCreate(cls,s):
		"""
		Return the entry for the set of intolerances ``s``.

		The class-level cache is checked first. On a miss the database is
		queried with ``get_or_create`` inside a savepoint; a DatabaseError
		or IntegrityError rolls the savepoint back and the attempt is
		repeated after 0.1 seconds, with no upper bound on the retries.
		Only entries that already existed are put into the cache.
		"""
		if s in cls.__cache:
			return cls.__cache[s]

		while True:
			try:
				sid = transaction.savepoint()
				entry, created = cls.objects.get_or_create(intolerant_for_extension=s)
				transaction.savepoint_commit(sid)
			except (DatabaseError, IntegrityError):
				# Undo the failed attempt, pause briefly, then try again.
				transaction.savepoint_rollback(sid)
				time.sleep(0.1)
			else:
				break

		if not created:
			cls.__cache[s] = entry
		return entry
开发者ID:AbhinavBansal,项目名称:tlsprober,代码行数:27,代码来源:models.py


示例4: __init__

    def __init__(self, serial_number=None, label=None):
        """Open the FTDI device and set the controller defaults.

        Args:
            serial_number (str): S/N of the device
            label (str): optional name of the device

        Raises:
            Exception: when one of the checked ftdi calls returns a non-zero
                status; the message is the ftdi error string.
        """
        super(Controller, self).__init__()

        ftdi = pylibftdi.Device(mode='b', device_id=serial_number)
        ftdi.baudrate = 115200

        def _require_ok(status):
            # Any status other than 0 is treated as a failure.
            if status != 0:
                raise Exception(ftdi.ftdi_fn.ftdi_get_error_string())

        # 50 ms pause used after configuring the line and after flushing.
        settle = 50.0 / 1000

        _require_ok(ftdi.ftdi_fn.ftdi_set_line_property(
            8,  # number of bits
            1,  # number of stop bits
            0,  # no parity
        ))
        time.sleep(settle)
        ftdi.flush(pylibftdi.FLUSH_BOTH)
        time.sleep(settle)

        # skipping reset part since it looks like pylibftdi does it already

        # this is pulled from ftdi.h
        SIO_RTS_CTS_HS = (0x1 << 8)
        _require_ok(ftdi.ftdi_fn.ftdi_setflowctrl(SIO_RTS_CTS_HS))
        _require_ok(ftdi.ftdi_fn.ftdi_setrts(1))

        self.serial_number = serial_number
        self.label = label
        self._device = ftdi

        # some conservative limits
        self.max_velocity = 0.3      # mm/s
        self.max_acceleration = 0.3  # mm/s/s

        # These define how an encoder count translates into position,
        # velocity and acceleration, e.g. 1 mm equals 1 * self.position_scale.
        # They are None on purpose - this class should never be used as is.
        self.position_scale = None
        self.velocity_scale = None
        self.acceleration_scale = None

        # linear (distance) range of the controller, in mm
        self.linear_range = (0,10)

        # whether or not the software limit on position is applied
        self.soft_limits = True

        # Messages that arrive asynchronously are kept here. For example,
        # while waiting for a "move completed" message after a move, any
        # other message received in the meantime is placed in this queue.
        self.message_queue = []
开发者ID:weinshec,项目名称:pyAPT,代码行数:60,代码来源:controller.py


示例5: SocketClient

def SocketClient(address):
    '''
    Return a connection object connected to the socket given by `address`.

    The connect call is retried every 0.01 seconds for as long as it fails
    with ECONNREFUSED and `_check_timeout` has not reported the timeout as
    expired. Any other socket error, or an expired timeout, is logged with
    `debug` and re-raised.

    The returned `Connection` wraps a duplicate of the socket's file
    descriptor. The socket itself is always closed before returning or
    raising, so a failed connect no longer leaks it.
    '''
    family = address_type(address)
    s = socket.socket(getattr(socket, family))
    try:
        t = _init_timeout()

        while 1:
            try:
                s.connect(address)
            except socket.error as exc:
                if get_errno(exc) != errno.ECONNREFUSED or _check_timeout(t):
                    debug('failed to connect to address %s', address)
                    raise
                time.sleep(0.01)
            else:
                break

        # The duplicate stays valid after the original socket is closed.
        fd = duplicate(s.fileno())
    finally:
        s.close()

    return Connection(fd)
开发者ID:whoshuu,项目名称:billiard,代码行数:25,代码来源:connection.py


示例6: get_users

def get_users():
    
    # each time ran, clean the user_list
    with open('user_list.txt', 'w'):
        pass
    
    count = 0

    # let's try and get a list of users some how.  
    r = praw.Reddit('User-Agent: user_list (by /u/XjCrazy09)')
    
    # check to see if user already exists.  Because if so they have already been scraped. 
    while count < 100:
        submissions = r.get_random_subreddit().get_top(limit=None)
        print "Running..."
        for i in submissions: 
            print i.author.name
            # run a tally
            count+=1 
            with open('user_list.txt', 'a') as output:
                output.write(i.author.name + "\n")
        print "Finished... \n"
        print "count: ", count
        time.sleep(5)
        
    usersList()
开发者ID:XjCrazy09,项目名称:RedditPasswords,代码行数:26,代码来源:main.py


示例7: task2

def task2(ident):
	global running
	for i in range(numtrips):
		if ident == 0:
			# give it a good chance to enter the next
			# barrier before the others are all out
			# of the current one
			delay = 0.001
		else:
			rmutex.acquire()
			delay = random.random() * numtasks
			rmutex.release()
		if verbose:
		    print 'task', ident, 'will run for', round(delay, 1), 'sec'
		time.sleep(delay)
		if verbose:
		    print 'task', ident, 'entering barrier', i
		bar.enter()
		if verbose:
		    print 'task', ident, 'leaving barrier', i
	mutex.acquire()
	running = running - 1
	if running == 0:
		done.release()
	mutex.release()
开发者ID:Claruarius,项目名称:stblinux-2.6.37,代码行数:25,代码来源:test_thread.py


示例8: loop

def loop():
	"""Run one polling cycle between the server and the Arduino.

	After a one second pause, data from `get_from_server` is read. When it
	is non-empty it is parsed as a shelf number, passed to `light_shelf`,
	and the cycle pauses for 5 more seconds. Data that is not a number is
	reported and skipped.

	Then tag data from `get_tags_from_arduino` is split on '|' into
	'a:b' items; for each non-empty item a beep is played and both parts
	are sent to `tag_to_server` as integers.
	"""
	time.sleep(1)

	print('Check if server has stuff, and if so send to Arduino.')
	# When the Server **sends** data:
	# Write to the Arduino's Serial port.
	# Currently, timeout = 1
	data = get_from_server()
	if len(data) > 0:
		print("Got data: %s" % data)
		print('sending to arduino light?')
		# Only the int() parse is guarded. A bare except here also caught
		# failures of light_shelf() and Ctrl-C during the sleep, and
		# reported all of them as "not a number".
		try:
			shelf_num = int(data)
		except ValueError:
			print('oops not a number!')
		else:
			light_shelf(shelf_num)
			time.sleep(5)
			print('done sleeping')

	# When the Arduino **sends** data:
	# POST to the Server.
	print('listening to arduino')
	tags_data = get_tags_from_arduino()
	print(tags_data)
	# Parse tags
	if tags_data is not None:
		print(tags_data)
		for tag in tags_data.split('|'):
			if not tag:
				continue
			tmp = tag.split(':')
			print(tmp)
			call(['afplay','beep-2.wav'])
			tag_to_server(int(tmp[0]), int(tmp[1]))
开发者ID:bonniee,项目名称:bib-web-2,代码行数:35,代码来源:daemon.py


示例9: _listen

def _listen():
    """Subscribe to SUB_ADDR and process messages until `_stop` is set.

    Each pass of the loop refreshes the subscriptions with
    `_update_subscriptions` and then reads with `_read_from_zmq`. On the
    way out the poller registration, the socket and the context are
    released, and the module-level `_thread` is reset to None in every
    case so that others can see this thread has ended.
    """
    global _stop, _thread
    try:
        ctx = zmq.Context()
        subscriber = ctx.socket(zmq.SUB)
        subscriber.connect(SUB_ADDR)
        subscriber.setsockopt(zmq.LINGER, 0)

        time.sleep(0.1)  # wait to avoid exception about poller
        poller = zmq.Poller()
        poller.register(subscriber, zmq.POLLIN)
        try:
            topics_by_intf = {}
            while True:
                if _stop:
                    break
                topics_by_intf = _update_subscriptions(subscriber, topics_by_intf)
                _read_from_zmq(subscriber, poller)
        finally:
            poller.unregister(subscriber)
            subscriber.close()
            ctx.term()
    finally:
        _thread = None  # signals that this thread has ended
开发者ID:yubars,项目名称:mbb,代码行数:25,代码来源:pathinfo.py


示例10: readData

def readData():
    """Record the host temperature, then sample per-second info 60 times.

    The temperature from `get_temperature` is stored in `rdObj.hostTemp`.
    Each of the 60 samples is given a one second slot: after
    `get_per_sec_info` returns, the function sleeps for whatever is left
    of that second. A sample that takes longer than its slot is followed
    immediately by the next one.
    """
    global rdObj
    rdObj.hostTemp = get_temperature()
    for _ in range(60):
        started = time.time()
        get_per_sec_info()
        remaining = 1 - (time.time() - started)
        # time.sleep() rejects negative values, which is what a slow
        # sample would otherwise hand to it.
        if remaining > 0:
            time.sleep(remaining)
开发者ID:abhinavk,项目名称:Virtproj,代码行数:7,代码来源:bgservice.py


示例11: get

    def get(self):
        """Remove the current user's link named by the ``name`` parameter.

        Without a ``name`` parameter, or without a logged-in user, a
        "danger" flash message is set and the request is redirected to "/".
        Otherwise the user's container is loaded (and created first if it
        does not exist), the first link whose name matches is removed, the
        container is saved, and after a one second pause a "success" flash
        message is set before redirecting to "/".
        """
        wanted = self.request.get("name")
        if not wanted:
            self.set_flash("danger", "forbidden-access")
            self.redirect("/")
            return

        user = users.get_current_user()
        if not user:
            self.set_flash("danger", "not-logged-in")
            self.redirect("/")
            return

        containers = Container.query(Container.user == user)
        if containers.iter().has_next():
            cont = containers.iter().next()
        else:
            cont = Container(user = user)
            cont.put()

        # Only the first match is removed; the loop stops right after it.
        for position, link in enumerate(cont.links):
            if link.name == wanted:
                cont.links.pop(position)
                break

        cont.put()
        time.sleep(1)
        self.set_flash("success", "link-deleted")
        self.redirect("/")
开发者ID:oknalv,项目名称:linky,代码行数:28,代码来源:deletelink.py


示例12: _stop_instance

 def _stop_instance(self, instance, fast):
     """Stop ``instance`` and wait until it reaches the goal state.

     The elastic IP, if one is set, is disassociated first. The instance
     is stopped unless it is already SHUTTINGDOWN or TERMINATED. With
     ``fast`` true the wait ends at SHUTTINGDOWN or TERMINATED, otherwise
     only at TERMINATED. The state is polled every
     ``self._poll_resolution`` seconds, a progress message is logged
     whenever the waited time is a multiple of 60, and the total wait is
     logged at the end.
     """
     if self.elastic_ip is not None:
         self.conn.disassociate_address(self.elastic_ip.public_ip)
     instance.update()
     if instance.state not in (SHUTTINGDOWN, TERMINATED):
         instance.stop()
         log.msg('%s %s terminating instance %s' %
                 (self.__class__.__name__, self.slavename, instance.id))

     if not fast:
         goal = (TERMINATED,)
     else:
         goal = (SHUTTINGDOWN, TERMINATED)
         instance.update()

     step = self._poll_resolution
     waited = 0
     while instance.state not in goal:
         time.sleep(step)
         waited += step
         if waited % 60 == 0:
             log.msg(
                 '%s %s has waited %d minutes for instance %s to end' %
                 (self.__class__.__name__, self.slavename, waited//60,
                  instance.id))
         instance.update()

     log.msg('%s %s instance %s %s '
             'after about %d minutes %d seconds' %
             (self.__class__.__name__, self.slavename,
              instance.id, goal, waited//60, waited%60))
开发者ID:aloisiojr,项目名称:buildbot,代码行数:28,代码来源:ec2buildslave.py


示例13: __check_ssh_agent

def __check_ssh_agent():
  """Return True when an ssh-agent with a usable identity is available.

  True is returned without checking on win32 and when the
  `_kCheckSSHAgent` attribute is off. Otherwise `ssh-add -l` is run for
  at most one second; the result is True only if no error was reported
  and the combined output is non-empty and contains neither
  'no identities' nor 'not open'.
  """
  # There's no way to do this w/ putty/pageant and that's OK because
  # they don't hang up on prompting for passwords
  if sys.platform == 'win32':
    return True

  app = wingapi.gApplication
  if not app.fSingletons.fFileAttribMgr[_kCheckSSHAgent]:
    return True

  handler = app.AsyncExecuteCommandLine('ssh-add', os.getcwd(), '-l')
  deadline = time.time() + 1.0
  while not handler.Iterate() and time.time() < deadline:
    time.sleep(0.01)

  stdout, stderr, err, status = handler.Terminate()
  if err is not None:
    return False

  out = stdout + stderr
  if len(out) == 0:
    return False
  return 'no identities' not in out and 'not open' not in out
开发者ID:aorfi,项目名称:2015-2016-Backup,代码行数:25,代码来源:cvs.py


示例14: add_engines

def add_engines(n=1, profile='iptest', total=False):
    """add a number of engines to a given profile.

    If total is True, then already running engines are counted, and only
    the additional engines necessary (if any) are started.

    Every started launcher is also appended to the module-level
    `launchers` list. The function waits until the client sees all new
    engines and returns the list of launchers it started.

    Raises RuntimeError when a started engine exits before connecting, or
    when the engines have not all connected after 15 seconds. The client
    used for counting is closed on every exit path, including these.
    """
    rc = Client(profile=profile)
    try:
        base = len(rc)

        if total:
            n = max(n - base, 0)

        eps = []
        for _ in range(n):
            ep = TestProcessLauncher()
            ep.cmd_and_args = ipengine_cmd_argv + [
                '--profile=%s' % profile,
                '--InteractiveShell.colors=nocolor'
                ]
            ep.start()
            launchers.append(ep)
            eps.append(ep)

        tic = time.time()
        while len(rc) < base+n:
            # poll() returning something other than None means the
            # launcher's process has already ended.
            if any(ep.poll() is not None for ep in eps):
                raise RuntimeError("A test engine failed to start.")
            elif time.time()-tic > 15:
                raise RuntimeError("Timeout waiting for engines to connect.")
            time.sleep(.1)
    finally:
        rc.close()
    return eps
开发者ID:marcjaxa,项目名称:EMAworkbench,代码行数:31,代码来源:test_ema_ipyparallel.py


示例15: next_page

def next_page(reg=re.compile("friends.*&startindex=")):
    """Follow the first link whose href matches ``reg``.

    Returns True after following the link and pausing 0.2 seconds, or
    False when the browser reports no matching link.
    """
    link = browser.get_link(href=reg)
    if link:
        browser.follow_link(link)
        time.sleep(0.2) # just to be sure
        return True
    return False
开发者ID:subv32,项目名称:facelurk,代码行数:7,代码来源:facelurk.py


示例16: run_example_spark_job

def run_example_spark_job(work_dir, timeout=25):
    """Runs a Spark job and checks the result.

    The pi.py example is submitted through spark-submit to the Mesos
    master, with stdout and stderr redirected to `s_stdout.txt` and
    `s_stderr.txt` inside `work_dir`. Closing these files and killing a
    still-running job are registered with `register_exit`.

    Args:
        work_dir: directory that receives the two output files.
        timeout: how long to wait for the job, in seconds; it is checked
            once per second.

    Returns:
        True when the job ended in time and its stdout contains
        'Pi is roughly 3', otherwise False.
    """
    print('Starting Spark job')
    stdout = open(os.path.join(work_dir, 's_stdout.txt'), 'w')
    stderr = open(os.path.join(work_dir, 's_stderr.txt'), 'w')
    register_exit(lambda: stdout.close())
    register_exit(lambda: stderr.close())

    spark = subprocess.Popen([
        os.path.join(spark_path(), 'bin/spark-submit'),
        '--master', 'mesos://%s' % MESOS_MASTER_CIDR,
        os.path.join(spark_path(), 'examples/src/main/python/pi.py'), '5'],
        stdin=None,
        stdout=stdout,
        stderr=stderr)
    register_exit(lambda: spark.kill() if spark.poll() is None else '')

    # `> 0` rather than truthiness: a negative or fractional timeout never
    # becomes exactly 0 and would keep the loop alive until the job ends.
    while timeout > 0 and spark.poll() is None:
        time.sleep(1)
        timeout -= 1

    # Decide on the process state, not the counter, so that a job which
    # finished during the last sleep is not reported as timed out.
    if spark.poll() is None:
        return False

    with open(os.path.join(work_dir, 's_stdout.txt'), 'r') as f:
        result = f.read()
        return 'Pi is roughly 3' in result
开发者ID:kaysoky,项目名称:mesos-integration,代码行数:30,代码来源:spark_utils.py


示例17: waitFinish

    def waitFinish(self):
        """
        Wait for the job queue to drain, then shut both thread pools down.

        The queue count is polled every 0.5 seconds. If an error state is
        seen while waiting, the remaining jobs are killed and the method
        returns at once without joining the pools.

        Two thread pools are in play: the Run pool, which executes all the
        testers, and the Status pool, which prints tester statuses. The
        Status pool always holds the last item to be printed, so the Run
        pool is closed and joined first and the Status pool second.
        Derived schedulers are notified afterwards.
        """
        try:
            while self.job_queue_count > 0:
                if self.error_state:
                    # One of our children died :( so exit uncleanly (do not join)
                    self.killRemaining()
                    return
                sleep(0.5)

            # Order matters: Run pool first, Status pool last.
            for pool in (self.run_pool, self.status_pool):
                pool.close()
                pool.join()

            # Notify derived schedulers we are exiting
            self.notifyFinishedSchedulers()

        except KeyboardInterrupt:
            # This method is called by the main thread, and therefore we must
            # print the appropriate keyboard interrupt response.
            self.killRemaining()
            print('\nExiting due to keyboard interrupt...')
开发者ID:zachmprince,项目名称:moose,代码行数:32,代码来源:Scheduler.py


示例18: _child_main_loop

	def _child_main_loop(self, queue):
		"""Check the login state forever and log in again when needed.

		On every pass a fixed status URL is fetched. If the response body
		starts with 'OK', 'Already logined' is put on ``queue``. Otherwise
		'Need login' is put on ``queue`` and the values from config.yaml,
		extended with fixed form fields, are posted to the login URL.
		The loop then sleeps for the ``threadtime`` value from config.yaml,
		which is read again on every pass.

		queue -- object with a ``put`` method that receives the status text.

		This method does not return.
		"""
		def _load_config():
			# NOTE(review): yaml.load without an explicit Loader can build
			# arbitrary objects. Acceptable only while config.yaml is a
			# trusted local file; consider yaml.safe_load.
			with open('config.yaml') as config_file:
				return yaml.load(config_file.read().decode('utf-8'))

		# Nothing below changes between passes, so it is built once.
		status_url = "http://geekhost.net/OK"
		login_url = 'https://auth-wlc.ntwk.dendai.ac.jp/login.html'
		ok_pattern = re.compile(r'OK')

		while True:
			response = urllib.urlopen(status_url)
			try:
				data = response.read()
			finally:
				response.close()

			if ok_pattern.match(data):
				queue.put('Already logined')
			else:
				queue.put('Need login')
				pd = _load_config()
				pd['buttonClicked'] = '4'
				pd['redirect_url'] = 'http://google.com/'
				pd["err_flag"] = "0"
				pd["err_msg"] = ""
				pd["info_flag"] = "0"
				pd["info_msg"] = ""
				params = urllib.urlencode(pd)
				print(repr(params))
				# The reply is not used; close it so the connection is freed.
				urllib.urlopen(login_url, params).close()
			# after that, just sleep until the next check
			time.sleep(_load_config()['threadtime'])
开发者ID:ikuradon,项目名称:tdulogin,代码行数:25,代码来源:multiprocess.py


示例19: test_appliance_replicate_database_disconnection_with_backlog

def test_appliance_replicate_database_disconnection_with_backlog(request, virtualcenter_provider,
                                                                 appliance):
    """Tests a database disconnection with backlog

    Metadata:
        test_flag: replication
    """
    # NOTE(review): the docstring above carries a "Metadata:" section that
    # looks machine-readable - confirm before rewording it.
    appl1, appl2 = get_replication_appliances()
    replication_conf = appliance.server.zone.region.replication

    # Both appliances are destroyed when the test is torn down.
    def finalize():
        appl1.destroy()
        appl2.destroy()
    request.addfinalizer(finalize)
    appl1.ipapp.browser_steal = True
    with appl1.ipapp:
        configure_db_replication(appl2.hostname)
        # Replication is up and running, now stop the DB on the replication parent
        # The provider is created first, then appl2's DB service is kept
        # down for 60 seconds before it is started again.
        virtualcenter_provider.create()
        appl2.db.stop_db_service()
        sleep(60)
        appl2.db.start_db_service()
        # Poll the replication status every 10 seconds for up to 360
        # seconds, refreshing appl1's browser after each failed check.
        wait_for(replication_conf.get_replication_status, fail_condition=False, num_sec=360,
                 delay=10, fail_func=appl1.server.browser.refresh, message="get_replication_status")
        assert replication_conf.get_replication_status()
        wait_for_a_provider()

    # Second half: the provider must also be visible from appl2.
    appl2.ipapp.browser_steal = True
    with appl2.ipapp:
        wait_for_a_provider()
        assert virtualcenter_provider.exists
开发者ID:jkandasa,项目名称:integration_tests,代码行数:31,代码来源:test_appliance_replication.py


示例20: led_idle

 def led_idle(self):
     """Broadcast the LED idle packet; always returns True.

     In software-only mode nothing is sent. Otherwise sync is switched
     to 0, the PACKET_LED_IDLE packet is broadcast, and after a 0.01
     second pause sync is switched back to 1.
     """
     if not self.software_only:
         self.sync(0)
         self.send_packet8(DEST_BROADCAST, PACKET_LED_IDLE, 0)
         sleep(.01)
         self.sync(1)
     return True
开发者ID:pmich,项目名称:bartendro,代码行数:7,代码来源:driver.py



注:本文中的time.sleep函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python time.sleep_ms函数代码示例发布时间:2022-05-27
下一篇:
Python time.should_receive函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap