本文整理汇总了Python中time.wait函数的典型用法代码示例。如果您正苦于以下问题:Python wait函数的具体用法?Python wait怎么用?Python wait使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了wait函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: search
def search(query, num=10, start=0, sleep=True, recent=None,domain=".com"):
if sleep:
wait(1)
url = generate_url(query, str(num), str(start), recent, domain)
soup = BeautifulSoup(requests.get(url).text, "html.parser")
results = Google.scrape_search_result(soup)
related_queries = Google.scrape_related(soup)
raw_total_results = soup.find('div', attrs = {'class' : 'sd'}).string
total_results = 0
if raw_total_results is not None:
for i in raw_total_results:
try:
temp = int(i)
total_results = total_results * 10 + temp
except:
continue
temp = {'results' : results,
'url' : url,
'expected_num' : num,
'received_num' : len(results),
'start' : start,
'search_engine': 'google' + domain,
'related_queries' : related_queries,
'total_results' : total_results,
}
return temp
开发者ID:zatmar,项目名称:py-web-search,代码行数:28,代码来源:google.py
示例2: __init__
def __init__(self, link_uri):
"""
Initializes the control class and executes all needed functions.
"""
# Connect Crazyflie
self.Crazyflie = Crazyflie()
self.Connected = False
self.Connect(link_uri)
while not self.Connected:
wait(0.1)
pass
# Start Program
self.t0 = 0#getTime()
self.Running = True
# Initialize
self.SetInitialState()
self.InitializeReferenceCS()
if Plotting:
self.InitializePlotting()
if GUI:
self.InitializeGUI()
if Animation:
self.InitializeAnimation()
# Run Main Loops
Thread(target=self.MainLoop).start()
if GUI:
Thread(target=self.GUILoop).start()
if Animation:
Thread(target=self.AnimationLoop).start()
开发者ID:OpenGelo,项目名称:drone-tracking,代码行数:34,代码来源:CrazyflieControl.py
示例3: initLoop
def initLoop():
#TODO IRC
while doLoop == True:
for plugin in pluginList:
getattr(pluginList[plugin], "tick")();
time.wait(1/30);
开发者ID:HurbDurb,项目名称:Payay-bot,代码行数:7,代码来源:Bot.py
示例4: search
def search(query, num=10, start=0, sleep=True, recent=None):
if sleep:
wait(1)
url = generate_url(query, str(num), str(start), recent)
soup = BeautifulSoup(requests.get(url).text)
results = Google.scrape_search_result(soup)
related_queries = Google.scrape_related(soup)
raw_total_results = soup.find('div', attrs = {'class' : 'sd'}).string
total_results = 0
for i in raw_total_results:
try:
temp = int(i)
total_results = total_results * 10 + temp
except:
continue
temp = {'results' : results,
'url' : url,
'num' : num,
'start' : start,
'search_engine': 'google',
'related_queries' : related_queries,
'total_results' : total_results,
}
return temp
开发者ID:Franky12,项目名称:py-web-search,代码行数:26,代码来源:google.py
示例5: store
def store(sqlDb, alnScore):
"""store the results in sqlite"""
# pdb.set_trace()
if alnScore:
c = sqlite3.connect(sqlDb)
for aln in alnScore:
row = alnScore[aln]
row += (len(alnScore),)
print row
print "\n\n"
try:
c.execute(
"insert into blast (seq, match_cnt, e_value, perfect, types, positions, matches) values (?,?,?,?,?,?,?)",
row,
)
except:
# rollback the pending transaction
c.rollback()
# wait for the dbase lock
time.wait(0.2)
c.execute(
"insert into blast (seq, match_cnt, e_value, perfect, types, positions) values (?,?,?,?,?,?)", row
)
c.commit()
c.close()
else:
pass
开发者ID:BadDNA,项目名称:seqcap,代码行数:27,代码来源:summaryBlast.py
示例6: set_motor
def set_motor(io_type, port, settings):
try:
if io_type == 'large motor':
i = ev3.LargeMotor(port)
elif io_type == 'medium motor':
i = ev3.MediumMotor(port)
power = int(settings['power'])
if settings['motor_mode'] == 'run forever':
i.run_forever(duty_cycle_sp=power)
time.wait(1) # this will cause server 500 errors when you call run forever because time.wait doesn't exist
# BUT this must be here because it allows run forever to work as it sets off to find something it can't; otherwise the motor just twitches
# I believe this creates a new thread which is why the EV3 can still process new input; it just keeps the motor running
elif settings['motor_mode'] == 'run timed':
time_picked = settings['time']
i.run_timed(time_sp=time_picked, duty_cycle_sp=power) # might also need the time.wait fix; didn't test
elif settings['motor_mode'] == 'stop':
stop_type = settings['stop_type']
i.stop()
elif settings['motor_mode'] == 'reset': # should reset motor encoders, aka I believe changes the position to 0, stops motors
i.reset()
elif settings['motor_mode'] == 'switch':
i.duty_cycle_sp = i.duty_cycle_sp * -1
return "successful set"
except ValueError:
return "Not found"
开发者ID:CEEO-DrEsLab,项目名称:IoT,代码行数:25,代码来源:EV3JSON.py
示例7: search
def search(query, num=10, start=0, sleep=True, recent=None):
results = []
_start = start # Remembers the initial value of start for later use
_url = None
related_queries = None
while len(results) < num:
if sleep: # Prevents loading too many pages too soon
wait(1)
url = generate_url(query, str(start), recent)
if _url is None:
_url = url # Remembers the first url that is generated
soup = BeautifulSoup(requests.get(url).text)
new_results = Bing.scrape_search_result(soup)
results += new_results
start += len(new_results)
if related_queries is None:
related_queries = scrape_related(soup)
results = results[:num]
temp = {'results' : results,
'url' : _url,
'num' : num,
'start' : _start,
'search_engine' : 'bing',
'related_queries' : related_queries,
}
return temp
开发者ID:Franky12,项目名称:py-web-search,代码行数:30,代码来源:bing.py
示例8: searchAndRepostBot
def searchAndRepostBot():
r = praw.Reddit(user_agent = AGENT)
print("Logging in to Reddit...")
try:
r.login(USERNAME, PASSWORD)
except:
print("LOGIN FAILED")
sys.exit()
for SOURCE in SOURCES:
subreddit = r.get_subreddit(SOURCE)
repository = r.get_subreddit(REPOSITORY)
print("Visiting Subreddit...(" + SOURCE + ")")
submissions = subreddit.get_hot(limit=25)
repositorySubmissions = subreddit.get_hot(limit=25)
print("Parsing posts...")
for submission in submissions:
try:
sbody = submission.selftext.lower()
stitle = submission.title.lower()
if any(key.lower() in sbody for key in KEYWORDS or key.lower() in stitle for key in KEYWORDS):
print("Result found: ")
print(submission.url)
print("Posting...")
r.submit(repository, "[X-Post " + SOURCE + "] " + submission.title, submission.url)
time.wait(2)
except AttributeError:
pass
print("DONE")
开发者ID:EB-Github,项目名称:Reddit_Crosspost_bot,代码行数:34,代码来源:KeywordRepostBot.py
示例9: single_acquisition_example
def single_acquisition_example(name, n_events, trigger, trigger_channel):
""" Acquire a set of triggerred single acquisitions for two channels."""
tek_scope = scopes.Tektronix2000(scope_connections.VisaUSB())
# First setup the scope, lock the front panel
tek_scope.lock()
tek_scope.set_active_channel(1)
tek_scope.set_active_channel(2)
tek_scope.set_single_acquisition() # Single signal acquisition mode
tek_scope.set_edge_trigger(trigger, trigger_channel, True) # Falling edge trigger
tek_scope.set_data_mode(49500, 50500)
tek_scope.lock() # Re acquires the preamble
# Now create a HDF5 file and save the meta information
file_name = name + "_" + str(datetime.date.today())
results = utils.HDF5File(file_name, 2)
results.add_meta_data("trigger", trigger)
results.add_meta_data("trigger_channel", trigger_channel)
results.add_meta_data("ch1_timeform", tek_scope.get_timeform(1))
results.add_meta_data("ch2_timeform", tek_scope.get_timeform(2))
results.add_meta_dict(tek_scope.get_preamble(1), "ch1_")
results.add_meta_dict(tek_scope.get_preamble(2), "ch2_")
last_save_time = time.time()
print "Starting data taking at time", time.strftime("%Y-%m-%d %H:%M:%S")
for event in range(0, n_events):
tek_scope.acquire()
try:
results.add_data(tek_scope.get_waveform(1), 1)
results.add_data(tek_scope.get_waveform(2), 2)
except Exception, e:
print "Scope died, acquisition lost."
print e
except visa_exceptions.VisaIOError, e:
print "Serious death"
time.wait(1)
开发者ID:mjmottram,项目名称:AcquireTek,代码行数:34,代码来源:single_acquisition_example.py
示例10: search
def search(self):
urls = []
for page in range(0, self.pages):
url = UrlGenerator(self.query, self.num, (self.start + (10*page)), self.recent, self.site).web_url
urls.append(url)
for url in urls:
if self.sleep:
wait(1)
html = requests.get(url, headers=self.headers).text
soup = BeautifulSoup(html, 'html.parser')
self.big_soup.body.append(soup.body)
results = self.scrape_search_result(self.big_soup)
related_queries = self.scrape_related_queries(self.big_soup)
raw_total_results = self.big_soup.find('div', attrs={'class': 'sd'}).string
total_results = int(raw_total_results.replace('About ', '').replace(' results', '').replace(',', ''))
data = dict()
data['source'] = 'google'
data['expected_num'] = self.num * self.pages
data['received_num'] = len(results)
data['first_page_url'] = urls[0]
data['related_queries'] = related_queries
data['total_results'] = total_results
data['results'] = results
return data
开发者ID:mdonnalley,项目名称:pyGoogleSearch,代码行数:29,代码来源:Google.py
示例11: search
def search(self):
urls = []
for page in range(0, self.pages):
url = UrlGenerator(self.query, self.num, (self.start + (10 * page)), self.recent, self.site).web_url
urls.append(url)
for url in urls:
if self.sleep:
wait(1)
html = requests.get(url, headers=self.headers).text
soup = BeautifulSoup(html, "html.parser")
self.big_soup.body.append(soup.body)
results = self.scrape_search_result(self.big_soup)
related_queries = self.scrape_related_queries(self.big_soup)
raw_total_results = self.big_soup.find("div", attrs={"class": "sd"}).string
total_results = int(raw_total_results.replace("About ", "").replace(" results", "").replace(",", ""))
data = collections.OrderedDict()
data["source"] = "google"
data["expected_num"] = self.num * self.pages
data["received_num"] = len(results)
data["first_page_url"] = urls[0]
data["related_queries"] = related_queries
data["total_results"] = total_results
data["results"] = results
return data
开发者ID:mrmaher,项目名称:pyGoogleSearch,代码行数:29,代码来源:Google.py
示例12: search_news
def search_news(query, num=10, start=0,sleep=True, recent=None, country_code=None, proxies=None):
if sleep:
wait(1)
url = generate_news_url(query, str(num), str(start), country_code, recent)
soup = BeautifulSoup(requests.get(url,proxies).text, "html.parser")
results = Google.scrape_news_result(soup)
raw_total_results = soup.find('div', attrs = {'class' : 'sd'}).string
total_results = 0
for i in raw_total_results:
try:
temp = int(i)
total_results = total_results * 10 + temp
except:
continue
temp = {'results' : results,
'url' : url,
'num' : num,
'start' : start,
'search_engine' : 'google',
'total_results' : total_results,
'country_code': country_code,
}
return temp
开发者ID:BubaVV,项目名称:py-web-search,代码行数:25,代码来源:google.py
示例13: article
def article(self, pageid=None, title=None):
"""
Returns a specific article from Wikipedia,
given its pageid or its title.
Downloads it if necessary
"""
if pageid is None and title is None:
raise Exception('Pageid and title can\'t be None at the same time')
if pageid is None:
d = self.db.articles.find_one({'title': title})
if d is not None:
return d # found it
else:
d = self.db.articles.find_one({'_id': pageid})
if d is not None:
return d # found it
try:
if not(pageid is None):
page = wikipedia.page(pageid=pageid)
else:
page = wikipedia.page(title=title)
except (
wikipedia.exceptions.DisambiguationError,
wikipedia.exceptions.PageError,
wikipedia.exceptions.WikipediaException,
requests.exceptions.RequestException,
ValueError # error decoding JSON response
):
return
try:
time.sleep(0.5)
except:
time.wait(0.5)
# Even if we didn't find pageid or title, it still could be in the DB
# since the title could have changed
try:
d = {
'_id': int(page.pageid),
'title': page.title,
'content': page.content
}
except KeyboardInterrupt: # filter KeyboardInterrupt from here
raise
except Exception:
return # can't add this entry
self.db.articles.update_one(
{'_id': d['_id']},
{'$set': d},
upsert=True
)
return d
开发者ID:aparafita,项目名称:news-similarity,代码行数:60,代码来源:wikipedia.py
示例14: test_ap_ht40_scan
def test_ap_ht40_scan(dev, apdev):
"""HT40 co-ex scan"""
params = { "ssid": "test-ht40",
"channel": "5",
"ht_capab": "[HT40-]"}
hapd = hostapd.add_ap(apdev[0]['ifname'], params)
state = hapd.get_status_field("state")
if state != "HT_SCAN":
time.wait(0.1)
state = hapd.get_status_field("state")
if state != "HT_SCAN":
raise Exception("Unexpected interface state - expected HT_SCAN")
ev = hapd.wait_event(["AP-ENABLED"], timeout=10)
if not ev:
raise Exception("AP setup timed out")
state = hapd.get_status_field("state")
if state != "ENABLED":
raise Exception("Unexpected interface state - expected ENABLED")
freq = hapd.get_status_field("freq")
if freq != "2432":
raise Exception("Unexpected frequency")
pri = hapd.get_status_field("channel")
if pri != "5":
raise Exception("Unexpected primary channel")
sec = hapd.get_status_field("secondary_channel")
if sec != "-1":
raise Exception("Unexpected secondary channel")
dev[0].connect("test-ht40", key_mgmt="NONE", scan_freq=freq)
开发者ID:ArcherHood,项目名称:ti-hostap,代码行数:33,代码来源:test_ap_ht.py
示例15: runFileScriptInSubprocess
def runFileScriptInSubprocess(self,cmd,logfilepath):
# Running another FileScript as a subprocess
apDisplay.printMsg('running FileScript:')
apDisplay.printMsg('------------------------------------------------')
apDisplay.printMsg(cmd)
# stderr=subprocess.PIPE only works with shell=True with python 2.4.
# works on python 2.6. Use shell=True now but shell=True does not
# work with path changed by appionwrapper. It behaves as if the wrapper
# is not used
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout_value = proc.communicate()[0]
while proc.returncode is None:
time.wait(60)
stdout_value = proc.communicate()[0]
try:
logdir = os.path.dirname(logfilepath)
apParam.createDirectory(logdir)
file = open(logfilepath,'w')
except:
apDisplay.printError('Log file can not be created, process did not run.')
file.write(stdout_value)
file.close()
if proc.returncode > 0:
pieces = cmd.split(' ')
apDisplay.printWarning('FileScript %s had an error. Please check its log file: \n%s' % (pieces[0].upper(),logfilepath))
else:
apDisplay.printMsg('FileScript ran successfully')
apDisplay.printMsg('------------------------------------------------')
return proc.returncode
开发者ID:leschzinerlab,项目名称:myami-3.2-freeHand,代码行数:29,代码来源:fileScript.py
示例16: run
def run(self):
while True:
if self.num != end:
self.num += 1
print "Outputting: ", str(self.num)
time.wait(5)
else:
break
开发者ID:rafalcode,项目名称:cwots_python,代码行数:8,代码来源:thre0.py
示例17: Kill
def Kill(self):
print ">>>Killing Drone<<<"
self.Crazyflie.commander.send_setpoint(0, 0, 0, 0)
# Make sure that the last packet leaves before the link is closed
# since the message queue is not flushed before closing
self.Running = False
wait(0.1)
self.Crazyflie.close_link()
开发者ID:OpenGelo,项目名称:drone-tracking,代码行数:8,代码来源:CrazyflieControl.py
示例18: on_error
def on_error(self, status):
if status_code == 420:
self.retryCount += 1
if (self.retryCount > self.retryMax):
return False
else:
time.wait(self.retryTime)
self.retryTime *= 2
return True
开发者ID:rocket-ron,项目名称:Twitter,代码行数:9,代码来源:trackerListener.py
示例19: back
def back(page=2):
if page > 0:
wait(0.5)
click_previous()
page -=1
test_page_and_buttons(page)
return page
开发者ID:lawsofthought,项目名称:wilhelmproject,代码行数:9,代码来源:test_experiment.py
示例20: forward
def forward(page=0):
if page < 2:
wait(0.5)
click_next()
page +=1
test_page_and_buttons(page)
return page
开发者ID:lawsofthought,项目名称:wilhelmproject,代码行数:9,代码来源:test_experiment.py
注:本文中的time.wait函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论