本文整理汇总了Python中time.sleep函数的典型用法代码示例。如果您正苦于以下问题:Python sleep函数的具体用法?Python sleep怎么用?Python sleep使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了sleep函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: reconnect
def reconnect(self, source, event, args, message):
'''
Handles disconnection by trying to reconnect the configured number of times
before quitting
'''
# if we have been kicked, don"t attempt a reconnect
# TODO : rejoin channels we were supposed to be in
if event == nu.BOT_KILL:
self.log.info("No reconnection attempt due to being killed")
self.close()
self.log.error("Lost connection to server:{0}".format(message))
if self.times_reconnected >= self.max_reconnects:
self.log.error("Unable to reconnect to server on third attempt")
self.close()
else:
self.log.info(
u"Sleeping before reconnection attempt, {0} seconds".format((self.times_reconnected + 1) * 60)
)
time.sleep((self.times_reconnected + 1) * 60)
self.registered = False
self.times_reconnected += 1
self.log.info(u"Attempting reconnection, attempt no: {0}".format(self.times_reconnected))
# set up events to connect and send USER and NICK commands
self.irc.connect(self.network, self.port)
self.irc.user(self.nick, self.realname)
self.irc.nick(self.nick)
开发者ID:optimumtact,项目名称:simplepybot,代码行数:28,代码来源:commandbot.py
示例2: FetchOrCreateItem
def FetchOrCreateItem(cls,**params):
"""
Either find an existing entry matching the config & results in the
params dictionary, or create a new entry.
"""
key = cls._CalculateKey(**params)
if key in cls.__cache:
return cls.__cache[key];
params1 = dict(params)
params1.pop("servername",None)
params1.pop("part_of_run", None)
while True:
created =False
try:
sid = transaction.savepoint()
item,created = ProbeCommonResultEntry.objects.get_or_create(key=key, defaults=params1)
transaction.savepoint_commit(sid)
except DatabaseError:
transaction.savepoint_rollback(sid)
time.sleep(0.1)
continue
except IntegrityError:
transaction.savepoint_rollback(sid)
time.sleep(0.1)
continue
break;
if not created:
cls.__cache[key] = item
return item
开发者ID:AbhinavBansal,项目名称:tlsprober,代码行数:33,代码来源:models.py
示例3: FetchOrCreate
def FetchOrCreate(cls,s):
"""
Find an entry for this set of intolerances, check the cache first.
Otherwise check the database, if necessary creating a new item.
"""
if s in cls.__cache:
return cls.__cache[s];
while True:
created =False
try:
sid = transaction.savepoint()
e, created = cls.objects.get_or_create(intolerant_for_extension = s)
transaction.savepoint_commit(sid)
except DatabaseError:
transaction.savepoint_rollback(sid)
time.sleep(0.1)
continue
except IntegrityError:
transaction.savepoint_rollback(sid)
time.sleep(0.1)
continue
break;
if not created:
cls.__cache[s]=e
return e;
开发者ID:AbhinavBansal,项目名称:tlsprober,代码行数:27,代码来源:models.py
示例4: __init__
def __init__(self, serial_number=None, label=None):
"""Constructor
Args:
serial_number (str): S/N of the device
label (str): optional name of the device
"""
super(Controller, self).__init__()
dev = pylibftdi.Device(mode='b', device_id=serial_number)
dev.baudrate = 115200
def _checked_c(ret):
if not ret == 0:
raise Exception(dev.ftdi_fn.ftdi_get_error_string())
_checked_c(dev.ftdi_fn.ftdi_set_line_property(8, # number of bits
1, # number of stop bits
0 # no parity
))
time.sleep(50.0/1000)
dev.flush(pylibftdi.FLUSH_BOTH)
time.sleep(50.0/1000)
# skipping reset part since it looks like pylibftdi does it already
# this is pulled from ftdi.h
SIO_RTS_CTS_HS = (0x1 << 8)
_checked_c(dev.ftdi_fn.ftdi_setflowctrl(SIO_RTS_CTS_HS))
_checked_c(dev.ftdi_fn.ftdi_setrts(1))
self.serial_number = serial_number
self.label = label
self._device = dev
# some conservative limits
self.max_velocity = 0.3 # mm/s
self.max_acceleration = 0.3 # mm/s/s
# these define how encode count translates into position, velocity
# and acceleration. e.g. 1 mm is equal to 1 * self.position_scale
# these are set to None on purpose - you should never use this class
# as is.
self.position_scale = None
self.velocity_scale = None
self.acceleration_scale = None
# defines the linear, i.e. distance, range of the controller
# unit is in mm
self.linear_range = (0,10)
# whether or not sofware limit in position is applied
self.soft_limits = True
# the message queue are messages that are sent asynchronously. For
# example if we performed a move, and are waiting for move completed
# message, any other message received in the mean time are place in the
# queue.
self.message_queue = []
开发者ID:weinshec,项目名称:pyAPT,代码行数:60,代码来源:controller.py
示例5: SocketClient
def SocketClient(address):
'''
Return a connection object connected to the socket given by `address`
'''
family = address_type(address)
s = socket.socket(getattr(socket, family))
t = _init_timeout()
while 1:
try:
s.connect(address)
except socket.error as exc:
if get_errno(exc) != errno.ECONNREFUSED or _check_timeout(t):
debug('failed to connect to address %s', address)
raise
time.sleep(0.01)
else:
break
else:
raise
fd = duplicate(s.fileno())
conn = Connection(fd)
s.close()
return conn
开发者ID:whoshuu,项目名称:billiard,代码行数:25,代码来源:connection.py
示例6: get_users
def get_users():
# each time ran, clean the user_list
with open('user_list.txt', 'w'):
pass
count = 0
# let's try and get a list of users some how.
r = praw.Reddit('User-Agent: user_list (by /u/XjCrazy09)')
# check to see if user already exists. Because if so they have already been scraped.
while count < 100:
submissions = r.get_random_subreddit().get_top(limit=None)
print "Running..."
for i in submissions:
print i.author.name
# run a tally
count+=1
with open('user_list.txt', 'a') as output:
output.write(i.author.name + "\n")
print "Finished... \n"
print "count: ", count
time.sleep(5)
usersList()
开发者ID:XjCrazy09,项目名称:RedditPasswords,代码行数:26,代码来源:main.py
示例7: task2
def task2(ident):
global running
for i in range(numtrips):
if ident == 0:
# give it a good chance to enter the next
# barrier before the others are all out
# of the current one
delay = 0.001
else:
rmutex.acquire()
delay = random.random() * numtasks
rmutex.release()
if verbose:
print 'task', ident, 'will run for', round(delay, 1), 'sec'
time.sleep(delay)
if verbose:
print 'task', ident, 'entering barrier', i
bar.enter()
if verbose:
print 'task', ident, 'leaving barrier', i
mutex.acquire()
running = running - 1
if running == 0:
done.release()
mutex.release()
开发者ID:Claruarius,项目名称:stblinux-2.6.37,代码行数:25,代码来源:test_thread.py
示例8: loop
def loop():
time.sleep(1)
print 'Check if server has stuff, and if so send to Arduino.'
# When the Server **sends** data:
# Write to the Arduino's Serial port.
# Currently, timeout = 1
data = get_from_server()
if len(data) > 0:
print "Got data: %s" % data
try:
print 'sending to arduino light?'
shelf_num = int(data)
light_shelf(shelf_num)
time.sleep(5)
print 'done sleeping'
except:
print 'oops not a number!'
# When the Arduino **sends** data:
# POST to the Server.
print 'listening to arduino'
tags_data = get_tags_from_arduino()
print tags_data
# Parse tags
if tags_data is not None:
tag_scan_pairs = []
print tags_data
for tag in tags_data.split('|'):
if len(tag) == 0:
continue
tmp = tag.split(':')
print tmp
call(['afplay','beep-2.wav'])
tag_to_server(int(tmp[0]), int(tmp[1]))
开发者ID:bonniee,项目名称:bib-web-2,代码行数:35,代码来源:daemon.py
示例9: _listen
def _listen():
global _stop
try:
context = zmq.Context()
s_zmq = context.socket(zmq.SUB)
s_zmq.connect(SUB_ADDR)
s_zmq.setsockopt(zmq.LINGER, 0)
time.sleep(0.1) # wait to avoid exception about poller
poller = zmq.Poller()
poller.register(s_zmq, zmq.POLLIN)
try:
intf_to_topic = {}
while not _stop:
intf_to_topic = _update_subscriptions(s_zmq, intf_to_topic)
_read_from_zmq(s_zmq, poller)
finally:
poller.unregister(s_zmq)
s_zmq.close()
context.term()
finally:
global _thread
_thread = None # signals that this thread has ended
开发者ID:yubars,项目名称:mbb,代码行数:25,代码来源:pathinfo.py
示例10: readData
def readData():
global rdObj
rdObj.hostTemp = get_temperature()
for i in range(60):
timebegin = time.time()
get_per_sec_info()
time.sleep(1-(time.time()-timebegin))
开发者ID:abhinavk,项目名称:Virtproj,代码行数:7,代码来源:bgservice.py
示例11: get
def get(self):
if not self.request.get("name"):
self.set_flash("danger", "forbidden-access")
self.redirect("/")
else:
user = users.get_current_user()
if user:
containers = Container.query(Container.user == user)
cont = None
if not containers.iter().has_next():
cont = Container(user = user)
cont.put()
else:
cont = containers.iter().next()
for ind, link in enumerate(cont.links):
if link.name == self.request.get("name"):
cont.links.pop(ind)
break
cont.put()
time.sleep(1)
self.set_flash("success", "link-deleted")
self.redirect("/")
else:
self.set_flash("danger", "not-logged-in")
self.redirect("/")
开发者ID:oknalv,项目名称:linky,代码行数:28,代码来源:deletelink.py
示例12: _stop_instance
def _stop_instance(self, instance, fast):
if self.elastic_ip is not None:
self.conn.disassociate_address(self.elastic_ip.public_ip)
instance.update()
if instance.state not in (SHUTTINGDOWN, TERMINATED):
instance.stop()
log.msg('%s %s terminating instance %s' %
(self.__class__.__name__, self.slavename, instance.id))
duration = 0
interval = self._poll_resolution
if fast:
goal = (SHUTTINGDOWN, TERMINATED)
instance.update()
else:
goal = (TERMINATED,)
while instance.state not in goal:
time.sleep(interval)
duration += interval
if duration % 60 == 0:
log.msg(
'%s %s has waited %d minutes for instance %s to end' %
(self.__class__.__name__, self.slavename, duration//60,
instance.id))
instance.update()
log.msg('%s %s instance %s %s '
'after about %d minutes %d seconds' %
(self.__class__.__name__, self.slavename,
instance.id, goal, duration//60, duration%60))
开发者ID:aloisiojr,项目名称:buildbot,代码行数:28,代码来源:ec2buildslave.py
示例13: __check_ssh_agent
def __check_ssh_agent():
"""Check that an ssh-agent is present and has at least one valid looking
identity loaded into it."""
# There's no way to do this w/ putty/pageant and that's OK because
# they don't hang up on prompting for passwords
if sys.platform == 'win32':
return True
app = wingapi.gApplication
if not app.fSingletons.fFileAttribMgr[_kCheckSSHAgent]:
return True
cmd = 'ssh-add'
handler = app.AsyncExecuteCommandLine(cmd, os.getcwd(), '-l')
end = time.time() + 1.0
while not handler.Iterate() and time.time() < end:
time.sleep(0.01)
stdout, stderr, err, status = handler.Terminate()
if err is None:
out = stdout + stderr
if len(out) > 0 and not out.find('no identities') >= 0 and not out.find('not open') >= 0:
return True
return False
开发者ID:aorfi,项目名称:2015-2016-Backup,代码行数:25,代码来源:cvs.py
示例14: add_engines
def add_engines(n=1, profile='iptest', total=False):
"""add a number of engines to a given profile.
If total is True, then already running engines are counted, and only
the additional engines necessary (if any) are started.
"""
rc = Client(profile=profile)
base = len(rc)
if total:
n = max(n - base, 0)
eps = []
for i in range(n):
ep = TestProcessLauncher()
ep.cmd_and_args = ipengine_cmd_argv + [
'--profile=%s' % profile,
'--InteractiveShell.colors=nocolor'
]
ep.start()
launchers.append(ep)
eps.append(ep)
tic = time.time()
while len(rc) < base+n:
if any([ ep.poll() is not None for ep in eps ]):
raise RuntimeError("A test engine failed to start.")
elif time.time()-tic > 15:
raise RuntimeError("Timeout waiting for engines to connect.")
time.sleep(.1)
rc.close()
return eps
开发者ID:marcjaxa,项目名称:EMAworkbench,代码行数:31,代码来源:test_ema_ipyparallel.py
示例15: next_page
def next_page(reg=re.compile("friends.*&startindex=")):
link = browser.get_link(href=reg)
if not link:
return False
browser.follow_link(link)
time.sleep(0.2) # just to be sure
return True
开发者ID:subv32,项目名称:facelurk,代码行数:7,代码来源:facelurk.py
示例16: run_example_spark_job
def run_example_spark_job(work_dir, timeout=25):
"""Runs a Spark job and checks the result."""
print 'Starting Spark job'
stdout = open(os.path.join(work_dir, 's_stdout.txt'), 'w')
stderr = open(os.path.join(work_dir, 's_stderr.txt'), 'w')
register_exit(lambda: stdout.close())
register_exit(lambda: stderr.close())
spark = subprocess.Popen([
os.path.join(spark_path(), 'bin/spark-submit'),
'--master', 'mesos://%s' % MESOS_MASTER_CIDR,
os.path.join(spark_path(), 'examples/src/main/python/pi.py'), '5'],
stdin=None,
stdout=stdout,
stderr=stderr)
register_exit(lambda: spark.kill() if spark.poll() is None else '')
while timeout:
if spark.poll() is not None:
break
time.sleep(1)
timeout -= 1
if timeout <= 0:
return False
with open(os.path.join(work_dir, 's_stdout.txt'), 'r') as f:
result = f.read()
return 'Pi is roughly 3' in result
开发者ID:kaysoky,项目名称:mesos-integration,代码行数:30,代码来源:spark_utils.py
示例17: waitFinish
def waitFinish(self):
"""
Block while the job queue is not empty. Once empty, this method will begin closing down
the thread pools and perform a join. Once the last thread exits, we return from this
method.
There are two thread pools in play; the Run pool which is executing all the testers,
and the Status pool which is handling the printing of tester statuses. Because the
Status pool will always have the last item needing to be 'printed', we close and join
the Run pool first, and then, we close the Status pool.
"""
try:
while self.job_queue_count > 0:
# One of our children died :( so exit uncleanly (do not join)
if self.error_state:
self.killRemaining()
return
sleep(0.5)
self.run_pool.close()
self.run_pool.join()
self.status_pool.close()
self.status_pool.join()
# Notify derived schedulers we are exiting
self.notifyFinishedSchedulers()
except KeyboardInterrupt:
# This method is called by the main thread, and therefor we must print the appropriate
# keyboard interrupt response.
self.killRemaining()
print('\nExiting due to keyboard interrupt...')
开发者ID:zachmprince,项目名称:moose,代码行数:32,代码来源:Scheduler.py
示例18: _child_main_loop
def _child_main_loop(self, queue):
while True:
url = "http://geekhost.net/OK"
f = urllib.urlopen(url)
data = f.read()
#print data
abcPattern = re.compile(r'OK')
if abcPattern.match(data):
queue.put('Already logined')
else:
queue.put('Need login')
LOGIN_URL = 'https://auth-wlc.ntwk.dendai.ac.jp/login.html'
#LOGIN_URL = 'http://geekhost.net/checkparams.php'
pd = yaml.load(open('config.yaml').read().decode('utf-8'))
pd['buttonClicked'] = '4'
pd['redirect_url'] = 'http://google.com/'
pd["err_flag"] = "0"
pd["err_msg"] = ""
pd["info_flag"] = "0"
pd["info_msg"] = ""
params = urllib.urlencode(pd)
print repr(params)
up = urllib.urlopen(LOGIN_URL, params)
# あとは寝てる
time.sleep(yaml.load(open('config.yaml').read().decode('utf-8'))['threadtime'])
开发者ID:ikuradon,项目名称:tdulogin,代码行数:25,代码来源:multiprocess.py
示例19: test_appliance_replicate_database_disconnection_with_backlog
def test_appliance_replicate_database_disconnection_with_backlog(request, virtualcenter_provider,
appliance):
"""Tests a database disconnection with backlog
Metadata:
test_flag: replication
"""
appl1, appl2 = get_replication_appliances()
replication_conf = appliance.server.zone.region.replication
def finalize():
appl1.destroy()
appl2.destroy()
request.addfinalizer(finalize)
appl1.ipapp.browser_steal = True
with appl1.ipapp:
configure_db_replication(appl2.hostname)
# Replication is up and running, now stop the DB on the replication parent
virtualcenter_provider.create()
appl2.db.stop_db_service()
sleep(60)
appl2.db.start_db_service()
wait_for(replication_conf.get_replication_status, fail_condition=False, num_sec=360,
delay=10, fail_func=appl1.server.browser.refresh, message="get_replication_status")
assert replication_conf.get_replication_status()
wait_for_a_provider()
appl2.ipapp.browser_steal = True
with appl2.ipapp:
wait_for_a_provider()
assert virtualcenter_provider.exists
开发者ID:jkandasa,项目名称:integration_tests,代码行数:31,代码来源:test_appliance_replication.py
示例20: led_idle
def led_idle(self):
if self.software_only: return True
self.sync(0)
self.send_packet8(DEST_BROADCAST, PACKET_LED_IDLE, 0)
sleep(.01)
self.sync(1)
return True
开发者ID:pmich,项目名称:bartendro,代码行数:7,代码来源:driver.py
注:本文中的time.sleep函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论