本文整理汇总了Python中urllib.request.urlopen函数的典型用法代码示例。如果您正苦于以下问题:Python urlopen函数的具体用法?Python urlopen怎么用?Python urlopen使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了urlopen函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: delete_note
def delete_note(self, note_id):
""" method to permanently delete a note
Arguments:
- note_id (string): key of the note to trash
Returns:
A tuple `(note, status)`
- note (dict): an empty dict or an error message
- status (int): 0 on sucesss and -1 otherwise
"""
# notes have to be trashed before deletion
note, status = self.trash_note(note_id)
if (status == -1):
return note, status
params = '/{0}?auth={1}&email={2}'.format(str(note_id), self.get_token(),
self.username)
request = Request(url=DATA_URL+params, method='DELETE')
try:
urllib2.urlopen(request)
except IOError as e:
return e, -1
return {}, 0
开发者ID:enra64,项目名称:SimpleSimpleNoteClient,代码行数:26,代码来源:sn.py
示例2: testPasswordProtectedSite
def testPasswordProtectedSite(self):
support.requires('network')
with support.transient_internet('mueblesmoraleda.com'):
url = 'http://mueblesmoraleda.com'
robots_url = url + "/robots.txt"
# First check the URL is usable for our purposes, since the
# test site is a bit flaky.
try:
urlopen(robots_url)
except HTTPError as e:
if e.code not in {401, 403}:
self.skipTest(
"%r should return a 401 or 403 HTTP error, not %r"
% (robots_url, e.code))
else:
self.skipTest(
"%r should return a 401 or 403 HTTP error, not succeed"
% (robots_url))
parser = urllib.robotparser.RobotFileParser()
parser.set_url(url)
try:
parser.read()
except URLError:
self.skipTest('%s is unavailable' % url)
self.assertEqual(parser.can_fetch("*", robots_url), False)
开发者ID:AlexHorlenko,项目名称:ironpython3,代码行数:25,代码来源:test_robotparser.py
示例3: __query_website
def __query_website(d):
""" Communicate with the CMD website """
webserver = 'http://stev.oapd.inaf.it'
print('Interrogating {0}...'.format(webserver))
# url = webserver + '/cgi-bin/cmd_2.8'
url = webserver + '/cgi-bin/cmd'
q = urlencode(d)
# print('Query content: {0}'.format(q))
if py3k:
req = request.Request(url, q.encode('utf8'))
c = urlopen(req).read().decode('utf8')
else:
c = urlopen(url, q).read()
aa = re.compile('output\d+')
fname = aa.findall(c)
if len(fname) > 0:
url = '{0}/~lgirardi/tmp/{1}.dat'.format(webserver, fname[0])
print('Downloading data...{0}'.format(url))
bf = urlopen(url)
r = bf.read()
typ = file_type(r, stream=True)
if typ is not None:
r = zlib.decompress(bytes(r), 15 + 32)
return r
else:
# print(c)
print(url + q)
if "errorwarning" in c:
p = __CMD_Error_Parser()
p.feed(c)
print('\n', '\n'.join(p.data).strip())
raise RuntimeError('Server Response is incorrect')
开发者ID:hypergravity,项目名称:bopy,代码行数:32,代码来源:cmd.py
示例4: ping_google
def ping_google(sitemap_url=None, ping_url=PING_URL):
"""
Alerts Google that the sitemap for the current site has been updated.
If sitemap_url is provided, it should be an absolute path to the sitemap
for this site -- e.g., '/sitemap.xml'. If sitemap_url is not provided, this
function will attempt to deduce it by using urlresolvers.reverse().
"""
if sitemap_url is None:
try:
# First, try to get the "index" sitemap URL.
sitemap_url = urlresolvers.reverse("django.contrib.sitemaps.views.index")
except urlresolvers.NoReverseMatch:
try:
# Next, try for the "global" sitemap URL.
sitemap_url = urlresolvers.reverse("django.contrib.sitemaps.views.sitemap")
except urlresolvers.NoReverseMatch:
pass
if sitemap_url is None:
raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.")
from django.contrib.sites.models import Site
current_site = Site.objects.get_current()
url = "http://%s%s" % (current_site.domain, sitemap_url)
params = urlencode({"sitemap": url})
urlopen("%s?%s" % (ping_url, params))
开发者ID:streeter,项目名称:django,代码行数:27,代码来源:__init__.py
示例5: get_fb_post_json
def get_fb_post_json(user):
"""
:param user: 유저 객체
:return: 유저의 모든 게시물 json 의 url
"""
url = 'https://graph.facebook.com/me?access_token=%s&fields=posts' % user.access_token
json_data = json.loads(urlopen(url).read())
for article in json_data['posts']['data']:
article['image'] = get_fb_images_from_article(user, article['id'])
all_post_data = json_data['posts']['data']
url = json_data['posts']['paging']['next']
while True:
json_data = json.loads(urlopen(url).read())
if len(json_data['data']) == 0:
break
url = json_data['paging']['next']
for article in json_data['data']:
article['image'] = get_fb_images_from_article(user, article['id'])
all_post_data.append(json_data['data'])
return all_post_data
开发者ID:Kcrong,项目名称:SNS-article-parser,代码行数:30,代码来源:fb_parser.py
示例6: getStats
def getStats(gen):
#f = open('stats.html', 'r+')
#s = f.read()
#f.close()
if (gen == 1):
s = urlopen("http://bulbapedia.bulbagarden.net/wiki/List_of_Pok%C3%A9mon_by_base_stats_%28Generation_I%29").read().decode("utf-8")
elif (gen < 6):
s = urlopen("http://bulbapedia.bulbagarden.net/wiki/List_of_Pok%C3%A9mon_by_base_stats_%28Generation_II-V%29").read().decode("utf-8")
else:
s = urlopen("http://bulbapedia.bulbagarden.net/wiki/List_of_Pok%C3%A9mon_by_base_stats_%28Generation_VI-present%29").read().decode("utf-8")
#step1 = s.split('<')
step1 = s.splitlines()
step2 = [x for x in step1 if (('FF5959' in x) or
('F5AC78' in x) or
('FAE078' in x) or
('9DB7F5' in x) or
('A7DB8D' in x) or
('FA92B2' in x) or
('(Pokémon)' in x))]
step3 = removeABs(step2)
step4 = [x[1:] for x in step3]
step5 = toDict(step4)
return step5
开发者ID:GYD102,项目名称:pokeStats,代码行数:26,代码来源:getStuff.py
示例7: run
def run(self):
global lock
global num
while not self.work_queue.empty(): # 队列非空时,一直循环
url = self.work_queue.get() # 取出一条数据
try:
try:
r = request.urlopen(url["url"], timeout=60) # 下载图片,超时为60秒
except:
r = request.urlopen(url["url"], timeout=120) # 如果超时,再次下载,超时为120秒
if "Content-Type" in r.info():
fileName = os.path.join(
self.fold, replace(url["name"] + "." + r.info()["Content-Type"].split("image/")[1])
) # 根据查看返回的“Content-Type”来判断图片格式,然后生成保存路径
if lock.acquire(): # 线程同步
print("开始下载第" + str(num) + "张照片")
if os.path.exists(fileName):
# 图片名称若存在,则重命名图片名称
fileName = os.path.join(
self.fold,
replace("重命名_图片_" + str(num) + "." + r.info()["Content-Type"].split("image/")[1]),
)
num = num + 1
lock.release()
f = open(fileName, "wb")
f.write(r.read())
f.close()
except:
print(url["url"] + ":下载超时!")
开发者ID:yumige,项目名称:WebDataAnalysis,代码行数:31,代码来源:get-data-from-qq.py
示例8: wmo_importer
def wmo_importer(url='https://raw.githubusercontent.com/flyingeek/editolido/gh-pages/ext-sources/nsd_bbsss.txt'):
# http://tgftp.nws.noaa.gov/data/nsd_bbsss.txt
if PY2:
delimiter = b';'
data = urlopen(url)
else:
delimiter = ';'
import codecs
data = codecs.iterdecode(urlopen(url), 'utf-8')
reader = csv.reader(data, delimiter=delimiter, quoting=csv.QUOTE_NONE)
def geo_normalize(value):
# recognize NSEW or undefined (which is interpreted as North)
orientation = value[-1]
sign = -1 if orientation in 'SW' else 1
coords = value if orientation not in 'NEWS' else value[:-1]
coords += '-0-0' # ensure missing seconds or minutes are 0
degrees, minutes, seconds = map(float, coords.split('-', 3)[:3])
return sign * (degrees + (minutes / 60) + (seconds / 3600))
not_airport = '----'
for row in reader:
name = row[0] + row[1] if row[2] == not_airport else row[2]
yield name, row[0] + row[1], geo_normalize(row[8]), geo_normalize(row[7])
开发者ID:flyingeek,项目名称:editolido,代码行数:25,代码来源:geoindex.py
示例9: test_MockApp_assert_called_once_with_two_calls
def test_MockApp_assert_called_once_with_two_calls():
app = MockApp()
with mock_server(app) as port:
urlopen('http://127.0.0.1:%d/hello' % port)
urlopen('http://127.0.0.1:%d/world' % port)
assert_raises(AssertionError, lambda:
app.assert_called_once_with('GET /world'))
开发者ID:djmitche,项目名称:webmock,代码行数:7,代码来源:test_webmock.py
示例10: test_MockApp_assert_has_calls_unordered_fails
def test_MockApp_assert_has_calls_unordered_fails():
app = MockApp()
with mock_server(app) as port:
urlopen('http://127.0.0.1:%d/hello' % port)
urlopen('http://127.0.0.1:%d/world' % port)
assert_raises(AssertionError, lambda:
app.assert_has_calls(['GET /cruel', 'GET /planet'], any_order=True))
开发者ID:djmitche,项目名称:webmock,代码行数:7,代码来源:test_webmock.py
示例11: a
def a(url):
file = url.split('/')[-1]
u = urlopen(url)
meta = u.info()
file_size = int(meta.get_all("Content-Length")[0])
file_dl = 0
block_sz = 8192
if os.path.exists(file) and file_size == os.path.getsize(file):
print("The file '%s' already exist." % file)
exit()
elif os.path.exists(file) and file_size != os.path.getsize(file):
print("Resuming Download")
f = open(file, "ab")
dld = os.path.getsize(file)
print("Downloading: {} Bytes: {}".format(file, file_size))
while True:
buffer = u.read(dld)
if not buffer:
break
req = Request(url)
req.headers['Range'] = 'bytes=%s-%s' % (dld, file_size)
buffer = urlopen(req).read()
file_dl += len(buffer)
f.write(buffer)
remain = dld * 100./ file_size
status = "\r%10d [%3.2f%%]" % (file_dl, file_dl * remain / file_size)
status = status + chr(8)*(len(status)+1)
time.sleep(1)
sys.stdout.write(status)
sys.stdout.flush()
f.close()
print("File: %s Downloaded Successfully" % (file))
exit()
f = open(file, 'wb')
print("Downloading: {} Bytes: {}".format(file, file_size))
while True:
buffer = u.read(block_sz)
if not buffer:
break
file_dl += len(buffer)
f.write(buffer)
status = "\r%10d [%3.2f%%]" % (file_dl, file_dl * 100. / file_size)
status = status + chr(8)*(len(status)+1)
time.sleep(1)
sys.stdout.write(status)
sys.stdout.flush()
f.close()
print("File: %s Downloaded Successfully" % (file))
开发者ID:danycoro,项目名称:pGet,代码行数:60,代码来源:pGet.py
示例12: create_app
def create_app(name, engine):
"""
Create a Skeleton application (needs internet connection to github)
"""
try:
if engine.lower() == "sqlalchemy":
url = urlopen(SQLA_REPO_URL)
dirname = "Flask-AppBuilder-Skeleton-master"
elif engine.lower() == "mongoengine":
url = urlopen(MONGOENGIE_REPO_URL)
dirname = "Flask-AppBuilder-Skeleton-me-master"
zipfile = ZipFile(BytesIO(url.read()))
zipfile.extractall()
os.rename(dirname, name)
click.echo(click.style("Downloaded the skeleton app, good coding!", fg="green"))
return True
except Exception as e:
click.echo(click.style("Something went wrong {0}".format(e), fg="red"))
if engine.lower() == "sqlalchemy":
click.echo(
click.style(
"Try downloading from {0}".format(SQLA_REPO_URL), fg="green"
)
)
elif engine.lower() == "mongoengine":
click.echo(
click.style(
"Try downloading from {0}".format(MONGOENGIE_REPO_URL), fg="green"
)
)
return False
开发者ID:dpgaspar,项目名称:Flask-AppBuilder,代码行数:31,代码来源:cli.py
示例13: main
def main():
width = 550
height = 550
print("Updating...")
with urlopen("http://himawari8-dl.nict.go.jp/himawari8/img/D531106/latest.json") as latest_json:
latest = strptime(loads(latest_json.read().decode("utf-8"))["date"], "%Y-%m-%d %H:%M:%S")
print("Latest version: {} GMT\n".format(strftime("%Y/%m/%d/%H:%M:%S", latest)))
url_format = "http://himawari8.nict.go.jp/img/D531106/{}d/{}/{}_{}_{}.png"
png = Image.new('RGB', (width*level, height*level))
print("Downloading tiles: 0/{} completed".format(level*level), end="\r")
for x in range(level):
for y in range(level):
with urlopen(url_format.format(level, width, strftime("%Y/%m/%d/%H%M%S", latest), x, y)) as tile_w:
tiledata = tile_w.read()
tile = Image.open(BytesIO(tiledata))
png.paste(tile, (width*x, height*y, width*(x+1), height*(y+1)))
print("Downloading tiles: {}/{} completed".format(x*level + y + 1, level*level), end="\r")
print("\nDownloaded\n")
makedirs(split(output_file)[0], exist_ok=True)
png.save(output_file, "PNG")
call(["feh", "--bg-fill", "--no-fehbg", output_file])
print("Done!\n")
开发者ID:adnidor,项目名称:himawaripy,代码行数:32,代码来源:himawaripy.py
示例14: submit_request
def submit_request(self, request, return_response=False):
'''submit_request will make the request,
via a stream or not. If return response is True, the
response is returned as is without further parsing.
Given a 401 error, the update_token function is called
to try the request again, and only then the error returned.
'''
try:
response = urlopen(request)
# If we have an HTTPError, try to follow the response
except HTTPError as error:
# Case 1: we have an http 401 error, and need to refresh token
bot.debug('Http Error with code %s' % (error.code))
if error.code == 401:
self.update_token(response=error)
try:
request = self.prepare_request(request.get_full_url(),
headers=self.headers)
response = urlopen(request)
except HTTPError as error:
bot.debug('Http Error with code %s' % (error.code))
return error
else:
return error
return response
开发者ID:yqin,项目名称:singularity,代码行数:30,代码来源:base.py
示例15: vola_importer
def vola_importer(url="https://raw.githubusercontent.com/flyingeek/editolido/gh-pages/ext-sources/vola_legacy_report.txt"):
# https://oscar.wmo.int/oscar/vola/vola_legacy_report.txt
if PY2:
delimiter = b'\t'
data = urlopen(url)
else:
delimiter = '\t'
import codecs
data = codecs.iterdecode(urlopen(url), 'utf-8')
reader = csv.reader(data, delimiter=delimiter, quoting=csv.QUOTE_NONE)
def geo_normalize(value):
# recognize NSEW or undefined (which is interpreted as North)
orientation = value[-1]
sign = -1 if orientation in 'SW' else 1
coords = value if orientation not in 'NEWS' else value[:-1]
coords += ' 0 0' # ensure missing seconds or minutes are 0
degrees, minutes, seconds = map(float, coords.split(' ', 3)[:3])
return sign * (degrees + (minutes / 60) + (seconds / 3600))
headers = next(reader)
for row in reader:
name = row[5]
if not name:
continue
yield name, geo_normalize(row[9]), geo_normalize(row[8]), row[28].split(', ')
开发者ID:flyingeek,项目名称:editolido,代码行数:26,代码来源:geoindex.py
示例16: get_from_wiki
def get_from_wiki(file_name):
"""We host some larger files used for the test suite separately on the TreeCorr wiki repo
so people don't need to download them with the code when checking out the repo.
Most people don't run the tests after all.
"""
import os
local_file_name = os.path.join('data',file_name)
url = 'https://github.com/rmjarvis/TreeCorr/wiki/' + file_name
if not os.path.isfile(local_file_name):
try:
from urllib.request import urlopen
except ImportError:
from urllib import urlopen
import shutil
print('downloading %s from %s...'%(local_file_name,url))
# urllib.request.urlretrieve(url,local_file_name)
# The above line doesn't work very well with the SSL certificate that github puts on it.
# It works fine in a web browser, but on my laptop I get:
# urllib.error.URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:600)>
# The solution is to open a context that doesn't do ssl verification.
# But that can only be done with urlopen, not urlretrieve. So, here is the solution.
# cf. http://stackoverflow.com/questions/7243750/download-file-from-web-in-python-3
# http://stackoverflow.com/questions/27835619/ssl-certificate-verify-failed-error
try:
import ssl
context = ssl._create_unverified_context()
u = urlopen(url, context=context)
except (AttributeError, TypeError):
# Note: prior to 2.7.9, there is no such function or even the context keyword.
u = urlopen(url)
with open(local_file_name, 'wb') as out:
shutil.copyfileobj(u, out)
u.close()
print('done.')
开发者ID:kilianbreathnach,项目名称:TreeCorr,代码行数:35,代码来源:test_helper.py
示例17: main
def main(self):
"""
dummy for main core method.
"""
url = 'http://%s:%s/orlangur/_authenticate' % (
self.api.config.options.app.orlangur_server, self.api.config.options.app.orlangur_port)
data = {'username': self.api.config.options.app.orlangur_user,
'password': self.api.config.options.app.orlangur_password}
from urllib.error import URLError
try:
r = req.Request(url, parse.urlencode(data).encode('utf8'))
req.urlopen(r).read()
connection = Connection(self.api.config.options.app.orlangur_server,
self.api.config.options.app.orlangur_port)
db = connection.orlangur
self.db = db
except URLError:
QMessageBox.warning(self.app, 'Error',
'Orlangur server seems down')
return
self.compiler = pystache
self.app.async(self.getConfig, self.assignConfig)
开发者ID:averrin,项目名称:demesne,代码行数:26,代码来源:core.py
示例18: nothings
def nothings(input):
response = str(BS(urlopen(url+str(input))).text)
for each_try in range(input+400):
try:
response = str(BS(urlopen(url+str([int(s) for s in response.split() if s.isdigit()][0]))).text);print(str([int(s) for s in response.split() if s.isdigit()][0]))
except:
return("Non-Nothing URL found!", response)
开发者ID:Zenohm,项目名称:Functional-Python-Programs,代码行数:7,代码来源:omnicrawler.py
示例19: urlget
def urlget(n, url):
if n == 1:
with request.urlopen(url) as f:
data = f.read()
for k, v in f.getheaders():
print('%s: %s' % (k, v))
#print('Data:', data.decode('utf-8'))
js = json.loads(data.decode('utf-8'))
print('JSON: ', end='')
pp_json(js)
if n == 2:
with request.urlopen(url) as f:
for k, v in f.getheaders():
print('%s: %s' % (k, v))
s = f.read().decode('GB2312')
print('\n\nData:\n', s)
file_name = r'600518.htm'
with open(file_name, 'w') as ff:
ff.write(s)
if n == 3:
with request.urlopen(url) as f:
for k, v in f.getheaders():
print('%s: %s' % (k, v))
s = f.read()
file_name = r'StockAList.htm'
with open(file_name, 'wb') as ff:
ff.write(s)
开发者ID:chao98,项目名称:Python,代码行数:31,代码来源:try_urllib.py
示例20: getPublicIP
def getPublicIP(v6=True):
if v6:
# try:
text=urlopen("http://ipv6.ip6.me/").read()
if v3:
match=re.search(bytes("\+3>([^<]+)<", 'ascii'), text)
else:
match=re.search("\+3>([^<]+)<", text)
ip=match.group(1)
ip=ip.decode('ascii')
return ip
# except Exception as e:
# print(e)
# ip=urlopen("http://whatismyv6ip.com/myip").read()
# return ip.decode('ascii')
else:
text=urlopen("http://ip4.me/").read()
if v3:
match=re.search(bytes("\+3>([^<]+)<", 'ascii'), text)
else:
match=re.search("\+3>([^<]+)<", text)
# ip=urlopen("http://whatismyv6ip.com/myip").read()
# return ip.decode('ascii')
ip=match.group(1)
ip=ip.decode('ascii')
return ip
开发者ID:antiface,项目名称:CreditCoin,代码行数:26,代码来源:util.py
注:本文中的urllib.request.urlopen函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论