本文整理汇总了Python中urllib.request.Request类的典型用法代码示例。如果您正苦于以下问题：Python Request类的具体用法？Python Request怎么用？Python Request使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Request类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: shorten
def shorten(url):
    """
    Shorten the given URL using goo.gl.

    When ``settings.GOOGLE_URL_SHORTENER_API_KEY`` is set, it is appended to
    the API URL as the ``key`` query parameter. For details see
    https://developers.google.com/url-shortener/v1/getting_started#auth

    Returns the ``id`` field of the API's JSON reply. On any failure the
    error is logged and the original ``url`` is returned unchanged.
    """
    api_key = getattr(settings, 'GOOGLE_URL_SHORTENER_API_KEY', None)
    try:
        api_url = API_URL
        if api_key:
            api_url = '{}?key={}'.format(api_url, api_key)
        payload = json.dumps({'longUrl': url}).encode('utf-8')
        request = Request(api_url, payload)
        request.add_header('Content-Type', 'application/json')
        # Close the HTTP response deterministically instead of leaking it.
        with urlopen(request) as response:
            json_string = response.read().decode("utf-8")
        return json.loads(json_string)['id']
    except Exception:
        # Deliberately best-effort: a failed shortening must never break the
        # caller, so log the traceback and fall back to the long URL.
        logger = logging.getLogger('kawaz.core.utils')
        logger.exception("Failed to shorten `%s`", url)
        return url
开发者ID:fordream,项目名称:Kawaz3rd,代码行数:25,代码来源:shortenurl.py
示例2: parase_fq_factor
def parase_fq_factor(code):
    """
    Fetch the price-adjustment factor table for ``code`` as a DataFrame.

    The reply from ``ct.HIST_FQ_FACTOR_URL`` is not valid JSON, so it is
    patched up with a series of textual substitutions before being parsed.

    Returns a DataFrame indexed by ``date`` (sorted descending, duplicates
    dropped) with a float ``fqprice`` column. On any error the exception is
    printed and ``None`` is returned.
    """
    symbol = _code_to_symbol(code)
    try:
        request = Request(ct.HIST_FQ_FACTOR_URL%(symbol))
        request.add_header("User-Agent", ct.USER_AGENT)
        response = urlopen(request, timeout=20)
        try:
            text = response.read()
        finally:
            # Release the connection even if reading fails.
            response.close()
        # Drop the first and last byte wrapping the payload.
        text = text[1:-1]
        text = text.decode('utf-8') if ct.PY3 else text
        # Rewrite the payload into JSON; the order of the substitutions
        # matters because later ones act on the output of earlier ones.
        text = text.replace('{_', '{"')
        text = text.replace('total', '"total"')
        text = text.replace('data', '"data"')
        text = text.replace(':"', '":"')
        text = text.replace('",_', '","')
        text = text.replace('_', '-')
        text = json.loads(text)
        df = pd.DataFrame({'date':list(text['data'].keys()), 'fqprice':list(text['data'].values())})
        df['date'] = df['date'].map(_fun_except) # for null case
        # ``np.object`` / ``np.str`` were plain aliases of the builtins and
        # have been removed from NumPy; the builtins behave identically.
        if df['date'].dtypes == object:
            df['date'] = df['date'].astype(str)
        df = df.drop_duplicates('date')
        # ``DataFrame.sort`` was removed from pandas; ``sort_values`` is its
        # replacement for sorting by a column.
        df = df.sort_values('date', ascending=False)
        df = df.set_index("date")
        df['fqprice'] = df['fqprice'].astype(float)
        return df
    except Exception as e:
        print(e)
        return None
开发者ID:hezhenke,项目名称:AshareBackTest,代码行数:26,代码来源:fq.py
示例3: charger
def charger(chemin):
    """
    Download ``chemin``, gunzip the payload and return it as text.

    The payload is always treated as gzip data. Progress and timing
    information is printed to stdout. The decompressed bytes are decoded
    with the default codec, silently dropping undecodable bytes.
    """
    request = Request(chemin)
    request.add_header('Accept-encoding', 'gzip')
    print('Connexion...')
    # The context manager closes the response once the body has been read.
    with urlopen(request) as response:
        sys.stdout.write('Chargement... ')
        sys.stdout.flush()
        debut = time.time()
        compressed = response.read()
        duree = time.time() - debut
    print ('terminé en %.3f secondes, %d octets lus' % (duree, len(compressed)))
    sys.stdout.write('Decompression... ')
    sys.stdout.flush()
    debut = time.time()
    data = gzip.GzipFile(fileobj=BytesIO(compressed)).read()
    duree = time.time() - debut
    print('terminé en %.3f secondes, %d octets décompressés' % (duree, len(data)))
    texte = data.decode(errors='ignore')
    return texte
开发者ID:ax13090,项目名称:Bittorrent-IP-Filter-Generator,代码行数:26,代码来源:charger_listes.py
示例4: send_request
def send_request(
    self, url, payload="", content_type="application/json", method="GET", raw=False, timeout=30, silent=False
):
    """
    Send an HTTP request and return the response body.

    :param url: URL to open.
    :param payload: request body, sent UTF-8 encoded.
    :param content_type: value of the ``Content-Type`` request header.
    :param method: HTTP verb to use regardless of whether a body is present.
    :param raw: when true return the body as text, otherwise parse it as JSON.
    :param timeout: timeout in seconds passed to the opener.
    :param silent: when true suppress the error messages.
    :return: parsed JSON (or the text when ``raw``), or ``None`` when the
        request times out, fails with a ``URLError`` or the body cannot be
        decoded/parsed.
    """
    is_py3 = sys.version_info >= (3,)
    try:
        opener = build_opener(HTTPHandler)
        body = bytes(payload, "UTF-8") if is_py3 else bytes(payload)
        request = Request(url, data=body)
        request.add_header("Content-Type", content_type)
        # Force the verb: urllib would otherwise pick POST because data is set.
        request.get_method = lambda: method
        response = opener.open(request, timeout=timeout)
        try:
            # Collect the raw bytes first and decode once, so a multi-byte
            # character can never be split across two reads.
            received = b"".join(iter(response.read, b""))
        finally:
            response.close()
        buf = str(received, "UTF-8") if is_py3 else received
        return json.loads(buf) if not raw else buf
    except socket.timeout:
        if not silent:
            print_color("Error: timed out while trying to communicate with %s:%d" % (self.host, self.port))
    except URLError as e:
        if not silent:
            print_color("Error: %s while attempting to communicate with %s:%d" % (e.reason, self.host, self.port))
    except ValueError as e:
        if not silent:
            print_color("Error: %s while trying to process result from Riak" % e)
    return None
开发者ID:jasonfagan,项目名称:riaktool,代码行数:27,代码来源:riak_tool.py
示例5: test_http_doubleslash
def test_http_doubleslash(self):
    """Host detection must survive redundant double slashes in the URL."""
    # Checks the presence of any unnecessary double slash in url does not
    # break anything. Previously, a double slash directly after the host
    # could cause incorrect parsing.
    h = urllib.request.AbstractHTTPHandler()
    o = h.parent = MockOpener()
    # Request bodies must be bytes on Python 3; a str body makes
    # do_request_() raise TypeError before the Host header is examined.
    data = b""
    ds_urls = [
        "http://example.com/foo/bar/baz.html",
        "http://example.com//foo/bar/baz.html",
        "http://example.com/foo//bar/baz.html",
        "http://example.com/foo/bar//baz.html"
    ]
    for ds_url in ds_urls:
        ds_req = Request(ds_url, data)
        # Check whether host is determined correctly if there is no proxy
        np_ds_req = h.do_request_(ds_req)
        self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com")
        # Check whether host is determined correctly if there is a proxy
        ds_req.set_proxy("someproxy:3128", None)
        p_ds_req = h.do_request_(ds_req)
        self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")
开发者ID:pogigroo,项目名称:py3k-__format__,代码行数:26,代码来源:test_urllib2.py
示例6: cypher
def cypher(query, **args):
    """
    Run ``query`` against the Cypher HTTP endpoint on localhost:7474.

    Keyword arguments are sent as the query parameters. On success the
    returned columns and rows are printed with ``print_table``. An HTTP 400
    reply is parsed and reported through ``print_error`` (whose result is
    returned); any other HTTP error is printed.
    """
    # urllib requires the POST body to be bytes, not str.
    payload = json.dumps({"query": query, "params": args}).encode('utf-8')
    req = Request(
        url="http://localhost:7474/db/data/cypher",
        data=payload)
    req.add_header('Accept', 'application/json')
    req.add_header('Content-Type', 'application/json')
    try:
        resp = urlopen(req)
    except HTTPError as err:
        if err.code == 400:
            details = json.loads(err.read().decode('utf-8'))
            return print_error('', query, details)
        print(err)
        return
    try:
        body = resp.read()
    finally:
        resp.close()
    # Decode explicitly: json.loads() only accepts bytes on newer Pythons.
    result = json.loads(body.decode('utf-8'))
    columns = result['columns']
    rows = result['data']
    print_table(columns, rows)
开发者ID:jRiest,项目名称:sublime-cypher,代码行数:31,代码来源:Cypher.py
示例7: login
def login(self, loc=None):
    """
    Log in through the configured login form and return ``(opener, response)``.

    The login page is fetched once to obtain the ``csrftoken`` cookie, then
    the credentials are posted together with that token. ``loc`` is sent as
    the ``next`` field and defaults to the ``BASE_LOCATION`` setting.

    Raises ``IOError`` when no CSRF cookie is set, or when the response to
    the post is still the login URL.
    """
    target = self.get_setting('BASE_LOCATION') if loc is None else loc
    login_url = self.get_setting('BASE_URL') + self.get_setting('LOGIN_LOCATION')

    cookie_handler = HTTPCookieProcessor()
    opener = build_opener(cookie_handler)
    # First hit only serves to populate the cookie jar.
    opener.open(login_url)
    for cookie in cookie_handler.cookiejar:
        if cookie.name == 'csrftoken':
            token = cookie.value
            break
    else:
        raise IOError("No csrf cookie found")

    form = {
        'username': self.get_setting('LOGIN_USERNAME'),
        'password': self.get_setting('LOGIN_PASSWORD'),
        'next': target,
        'csrfmiddlewaretoken': token,
    }
    req = Request(login_url, urlparse.urlencode(form).encode('utf-8'))
    req.add_header('Referer', login_url)
    response = opener.open(req)
    # Ending up on the login page again means the credentials were rejected.
    if response.geturl() == login_url:
        raise IOError("Authentication refused")
    return opener, response
开发者ID:team294,项目名称:signinapp,代码行数:26,代码来源:backend_roster.py
示例8: download
def download(url, target):
    '''
    download(string, string) -> boolean

    Download the content at ``url`` and save it to ``target``.

    Keyword arguments:
    url -- url of content (e.g: http://../hello.png)
    target -- filesystem location to save the content to

    Return True when the content was fetched and written, False when the
    target already exists or fetching/writing raised an exception.
    '''
    if path.exists(target):
        print("Error retrieving image: file target '%s' already exists." % target)
        return False
    opener = build_opener()
    req = Request(url)
    req.add_header('User-Agent', 'Mozilla/5.0')
    try:
        # Read the whole body before touching the filesystem so a failed
        # transfer does not leave an empty target behind.
        with opener.open(req) as fp:
            content = fp.read()
        with open(target, "wb") as fo:
            fo.write(content)
        return True
    except Exception as exc:
        # Not a bare ``except``: KeyboardInterrupt/SystemExit must propagate.
        print("Error fetching content: ", type(exc))
        return False
开发者ID:veiset,项目名称:pyImgSearch,代码行数:32,代码来源:downloader.py
示例9: download
def download(self, request):
    """
    Post ``request`` to ``self.url`` and return the reply as a text buffer.

    ``request`` is an ElementTree element; it is serialized, prefixed with
    ``self.ofxheader`` and sent with OFX content headers. The decoded reply
    is stored on ``self.response`` and returned as a ``StringIO`` positioned
    at the start. An ``HTTPError`` has its headers printed and is re-raised.
    """
    mimetype = 'application/x-ofx'
    headers = {'Content-type': mimetype, 'Accept': '*/*, %s' % mimetype}
    # ElementTree.tostring() yields bytes; decode to join with the header,
    # then encode again because urllib wants a bytes body.
    body = self.ofxheader + ET.tostring(request).decode()
    http_request = Request(self.url, body.encode(), headers)
    try:
        with contextlib.closing(urlopen(http_request)) as response:
            reply_text = response.read().decode()
    except HTTPError as err:
        # FIXME
        print(err.info())
        raise
    # The raw response object cannot seek, so hand out an in-memory copy
    # that supports tell() and seek().
    source = StringIO(reply_text)
    self.response = source
    return source
开发者ID:P-Laf,项目名称:ofxtools,代码行数:25,代码来源:Client.py
示例10: authenticate
def authenticate(self, request):
    """
    Authenticate a request through its ``X-User-Path``/``X-User-Token`` headers.

    The credentials are forwarded to the Adhocracy ``gods`` group resource;
    the user counts as a god when their URL is listed in that group.

    Returns ``None`` when neither header is present, otherwise a tuple
    ``(AdhocracyUser, None)``.

    Raises ``AuthenticationFailed`` when only one header is present, when
    Adhocracy answers with an HTTP error or a non-200 status, or when the
    reply is not JSON.
    """
    # Imported locally so the block does not depend on a module-level import.
    from urllib.error import HTTPError

    adhocracy_base_url = settings.PC_SERVICES["references"]["adhocracy_api_base_url"]
    user_path = request.META.get("HTTP_X_USER_PATH")
    user_token = request.META.get("HTTP_X_USER_TOKEN")
    if user_path is None and user_token is None:
        return None
    if user_path is None or user_token is None:
        raise exceptions.AuthenticationFailed("No `X-User-Path` and `X-User-Token` header provided.")
    user_url = urljoin(adhocracy_base_url, user_path)

    gods_request = Request("%s/principals/groups/gods" % adhocracy_base_url)
    gods_request.add_header("X-User-Path", user_path)
    gods_request.add_header("X-User-Token", user_token)
    try:
        response = urlopen(gods_request)
    except HTTPError:
        # urlopen raises for error statuses instead of returning them, so
        # the status check below alone would never see rejected credentials.
        raise exceptions.AuthenticationFailed("Adhocracy authentification failed due invalid credentials.")
    try:
        if response.status != 200:
            raise exceptions.AuthenticationFailed("Adhocracy authentification failed due invalid credentials.")
        content_type, params = parse_header(response.getheader("content-type", ""))
        if content_type != "application/json":
            # The original built this exception but never raised it.
            raise exceptions.AuthenticationFailed("Adhocracy authentification failed due wrong response.")
        # Fall back to UTF-8 when the server does not announce a charset.
        encoding = params.get("charset", "utf-8").lower()
        resource_as_string = response.read().decode(encoding)
    finally:
        response.close()
    gods_group_resource = json.loads(resource_as_string)
    gods = gods_group_resource["data"]["adhocracy_core.sheets.principal.IGroup"]["users"]
    is_god = user_url in gods
    return AdhocracyUser(user_path, is_god), None
开发者ID:almey,项目名称:policycompass-services,代码行数:34,代码来源:auth.py
示例11: getURLInfo
def getURLInfo(self, url=None):
    '''
    @see: IURLInfoService.getURLInfo

    Open the URL and describe it. For anything that is not 'text/html' only
    a title is derived from the URL itself (last path segment, or the host
    when there is no path); HTML content is fed to HTMLInfoExtractor.

    @raise InputError: when the URL is empty, cannot be opened or is invalid.
    '''
    if not url: raise InputError('Invalid URL %s' % url)
    assert isinstance(url, str), 'Invalid URL %s' % url
    url = unquote(url)
    try:
        with urlopen(url) as conn:
            urlInfo = URLInfo()
            urlInfo.URL = url
            urlInfo.Date = datetime.now()
            # Header lookup through get() is case-insensitive, unlike a
            # manual comparison against the literal 'Content-Type'.
            rawType = conn.info().get('Content-Type')
            contentType = rawType.split(';')[0].strip().lower() if rawType else None
            if contentType != 'text/html':
                req = Request(url)
                # get_selector()/get_host() were removed in Python 3.4; the
                # attributes carry the same values.
                selector = req.selector.strip('/')
                if selector:
                    urlInfo.Title = selector.split('/')[-1]
                else:
                    urlInfo.Title = req.host
                return urlInfo
            urlInfo.ContentType = contentType
            extr = HTMLInfoExtractor(urlInfo)
            # Best effort: keep whatever was extracted before a parse or
            # decode problem.
            try: extr.feed(conn.read().decode())
            except (AssertionError, HTMLParseError, UnicodeDecodeError): pass
            return extr.urlInfo
    except (URLError, ValueError): raise InputError('Invalid URL %s' % url)
开发者ID:Halfnhav4,项目名称:Superdesk,代码行数:31,代码来源:url_info.py
示例12: http_request
def http_request(self, request):
    """Add the cookie jar's cookies to a request, keeping existing ones.

    The jar is asked to set cookies on a header-less copy of the request, so
    that a ``Cookie`` header already present on the real request does not
    stop the jar from adding its own. The result is then merged in.

    @param request: request to process
    @type request: Request
    @return: the same request object
    @rtype: Request
    """
    cookie_header = "Cookie"
    probe = Request_(request.get_full_url(), request.data, {},
                     request.origin_req_host,
                     request.unverifiable)
    self.cookiejar.add_cookie_header(probe)

    jar_cookies = probe.get_header(cookie_header)
    if not jar_cookies:
        # The jar has nothing for this URL; leave the request untouched.
        return request

    if request.has_header(cookie_header):
        # Existing cookies go first, the jar's cookies are appended.
        combined = '; '.join((request.get_header(cookie_header), jar_cookies))
    else:
        combined = jar_cookies
    request.add_unredirected_header(cookie_header, combined)
    return request
开发者ID:cedadev,项目名称:ndg_httpsclient,代码行数:25,代码来源:utils.py
示例13: upload
def upload(recipe, result, server, key=None):
    '''
    Upload a build result to ``server`` as JSON.

    The target URL is ``<server>/build/<recipe name>/<branch>/<platform>``.
    ``branch`` is removed from ``result`` (default ``'unknown'``) before the
    remaining dict is sent. ``key``, when given, is sent as the
    ``Authorization`` header.

    Returns True on success, False (after logging) on HTTP or URL errors.
    '''
    branch = result.pop('branch', 'unknown')
    # FIXME: use urljoin
    system = '{} {}'.format(sys.platform, platform.machine())
    request = Request('{}/build/{}/{}/{}'.format(
        server, quote(recipe['name']), quote(branch), quote(system)))
    request.add_header('Content-Type', 'application/json')
    if key is not None:
        request.add_header('Authorization', key)
    try:
        # The reply body is not needed; open it in a ``with`` block so the
        # connection is released instead of leaked.
        with urlopen(request, json.dumps(result).encode('UTF-8')):
            pass
    except HTTPError as exc:
        logging.error("The server couldn't fulfill the request.")
        logging.error('Error code: %s', exc.code)
        if exc.code == 400:
            logging.error("Client is broken, wrong syntax given to server")
        elif exc.code == 401:
            logging.error("Wrong key provided for project.")
        logging.error("%s", exc.read())
        return False
    except URLError as exc:
        logging.error('Failed to reach a server.')
        logging.error('Reason: %s', exc.reason)
        return False
    return True
开发者ID:Cloudef,项目名称:buildhck,代码行数:30,代码来源:buildhck.py
示例14: _add_logo
def _add_logo(self, show, audio):
# APIC part taken from http://mamu.backmeister.name/praxis-tipps/pythonmutagen-audiodateien-mit-bildern-versehen/
url = show.station.logo_url
if url is not None:
request = Request(url)
request.get_method = lambda: 'HEAD'
try:
response = urlopen(request)
logo_type = response.info().gettype()
if logo_type in ['image/jpeg', 'image/png']:
img_data = urlopen(url).read()
img = APIC(
encoding=3, # 3 is for utf-8
mime=logo_type,
type=3, # 3 is for the cover image
desc=u'Station logo',
data=img_data
)
audio.add(img)
except (HTTPError, URLError) as e:
message = "Error during capturing %s - %s" % (url, e)
self.log.error(message)
except Exception as e:
raise e
开发者ID:pimpmypixel,项目名称:capturadio,代码行数:25,代码来源:__init__.py
示例15: data
def data(self):
    """
    Return the content behind ``self.url``, fetching it at most once.

    When ``self.env.cache`` is available, the ETag / Last-Modified values of
    a previous fetch are sent as conditional headers; a 304 reply is served
    from the cached contents. After a successful fetch both the validators
    and the contents are written back to the cache.
    """
    if not hasattr(self, '_data'):
        request = URLRequest(self.url)

        # Look in the cache for etag / last modified headers to use
        # TODO: "expires" header could be supported
        if self.env and self.env.cache:
            headers = self.env.cache.get(
                ('url', 'headers', self.url))
            if headers:
                etag, lmod = headers
                if etag: request.add_header('If-None-Match', etag)
                if lmod: request.add_header('If-Modified-Since', lmod)

        # Make a request
        try:
            response = urlopen(request)
        except HTTPError as e:
            if e.code != 304:
                raise
            # Use the cached version of the url
            self._data = self.env.cache.get(('url', 'contents', self.url))
        else:
            with contextlib.closing(response):
                self._data = response.read()

                # Cache the info from this request
                if self.env and self.env.cache:
                    # ``get`` exists on both the Python 2 and Python 3
                    # header objects; ``getheader`` only on Python 2.
                    self.env.cache.set(
                        ('url', 'headers', self.url),
                        (response.headers.get("ETag"),
                         response.headers.get("Last-Modified")))
                    self.env.cache.set(('url', 'contents', self.url), self._data)
    return self._data
开发者ID:thebbgroup,项目名称:webassets,代码行数:34,代码来源:merge.py
示例16: test_can_post_data
def test_can_post_data(self):
    """The SAML assertion must be usable as POST data (bytes, not str)."""
    # Trailing slash required: the path is appended to cas_base verbatim,
    # and without it the host would read "example.comcas".
    cas_base = 'https://example.com/'
    url = 'https://example.com/abc'
    headers = {
        'soapaction': 'http://www.oasis-open.org/committees/security',
        'cache-control': 'no-cache',
        'pragma': 'no-cache',
        'accept': 'text/xml',
        'connection': 'keep-alive',
        'content-type': 'text/xml'
    }
    params = {'TARGET': url}
    uri = '{}cas/samlValidate?{}'.format(cas_base, urlencode(params))
    request = Request(uri, '', headers)
    request.data = get_saml_assertion('ticket')
    try:
        urlopen(request)
    except URLError:
        # As long as this isn't a TypeError, and the url request
        # was actually made, then we can assert that
        # get_saml_assertion() is good. This is to prevent an
        # issue introduced since Python 3:
        #
        # POST data should be bytes or an iterable of bytes. It
        # cannot be of type str.
        #
        pass
开发者ID:ccnmtl,项目名称:djangowind,代码行数:27,代码来源:test_auth.py
示例17: download
def download(url, tor=False):
    """
    Fetch ``url`` and return the response body as bytes.

    With ``tor=True`` the ``socket`` module is patched so connections go
    through a SOCKS5 proxy on 127.0.0.1:9050. The patch is process-wide and
    is not undone afterwards.

    Returns ``None`` when the download fails (the error is reported through
    ``print_error``) or when ``tor`` is requested without the socks module.
    """
    # Imported locally so the block does not depend on a module-level import.
    import errno

    def create_connection(address, timeout=None, source_address=None):
        sock = socks.socksocket()
        sock.connect(address)
        return sock

    if tor:
        if not HAVE_SOCKS:
            print_error("Missing dependency, install socks (`pip install SocksiPy`)")
            return None

        socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, '127.0.0.1', 9050)
        socket.socket = socks.socksocket
        socket.create_connection = create_connection

    try:
        req = Request(url)
        req.add_header('User-agent', 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0)')
        res = urlopen(req)
        try:
            data = res.read()
        finally:
            res.close()
    except HTTPError as e:
        print_error(e)
    except URLError as e:
        # ``reason`` may be a plain string, which has no ``errno``; the
        # symbolic constant also covers platforms where the code is not 111.
        if tor and getattr(e.reason, 'errno', None) == errno.ECONNREFUSED:
            print_error("Connection refused, maybe Tor is not running?")
        else:
            print_error(e)
    except Exception as e:
        print_error("Failed download: {0}".format(e))
    else:
        return data
    return None
开发者ID:killbug2004,项目名称:CIRTKit,代码行数:32,代码来源:network.py
示例18: __init__
def __init__(self, name) :
    """
    Open the URL and keep the response, its headers and the resolved location.

    The location is the Content-Location header resolved against the final
    URL when the server sends one, otherwise ``name`` itself.

    @param name: URL to be opened
    """
    try :
        # The fragment ID is stripped before the request is made. This is
        # necessary, per the HTTP spec
        target = name.split('#')[0]
        req = Request(url=target)
        req.add_header('Accept', 'text/html, application/xhtml+xml')

        self.data = urlopen(req)
        self.headers = self.data.info()

        has_location = URIOpener.CONTENT_LOCATION in self.headers
        self.location = (
            urlparse.urljoin(self.data.geturl(), self.headers[URIOpener.CONTENT_LOCATION])
            if has_location else name
        )
    except urllib_HTTPError as e :
        # Translate urllib's HTTP error into the package's own type, using
        # the standard long description of the status code.
        from pyMicrodata import HTTPError
        msg = BaseHTTPRequestHandler.responses[e.code]
        raise HTTPError('%s' % msg[1], e.code)
    except Exception as e :
        from pyMicrodata import MicrodataError
        raise MicrodataError('%s' % e)
开发者ID:yarikoptic,项目名称:rdflib,代码行数:28,代码来源:utils.py
示例19: load
def load(self, location, data=None, headers=None):
    """
    Fetch ``location`` and return the parsed page.

    :param location: URL to request; a falsy value raises ``LoginError``.
    :param data: optional request body passed to ``Request``.
    :param headers: optional extra request headers; they override the
        defaults built here.
    :return: ``self.parse()`` of the page body, or ``None`` when the request
        fails (the error is reported through ``self.echo``).
    :raises LoginError: when ``location`` is empty or ``self.check_login``
        rejects the returned page.
    """
    if not location:
        raise LoginError()
    # Remember the URL without its scheme and host part.
    self.last_url = re.sub(r"https?:\/\/[^/]+", r"", location)
    heads = {"Accept-Encoding": "gzip, deflate",
             "User-Agent": self.core_cfg.get("User-Agent", "OTRS_US/0.0")}
    if "Cookies" in self.runt_cfg:
        heads["Cookie"] = self.runt_cfg["Cookies"]
    # ``None`` instead of a shared mutable ``{}`` default.
    if headers:
        heads.update(headers)
    r = Request(location, data, headers=heads)
    try:
        pg = urlopen(r, timeout=60)
    except HTTPError as err:
        self.echo("HTTP Error:", err.getcode())
        return
    except Exception as err:
        self.echo(repr(err))
        return
    try:
        pd = pg.read()
        # NOTE(review): "deflate" is advertised above but only gzip replies
        # are decompressed here - confirm what ``decompress`` supports.
        if pg.getheader("Content-Encoding") == "gzip":
            pd = decompress(pd)
        self.dump_data(pg, pd)
    finally:
        # The body has been read completely; release the connection.
        pg.close()
    if not self.check_login(pd.decode(errors="ignore")):
        raise LoginError(r.get_full_url())
    return self.parse(pd)
开发者ID:Lysovenko,项目名称:OTRS_US,代码行数:25,代码来源:pgload.py
示例20: get_page_urls
def get_page_urls(url, user_agent=None):
    """
    Return the set of URLs that ``REGEX_URLS`` finds in the page at ``url``.

    ``user_agent``, when given, is sent as the ``User-Agent`` header. The
    first element of every regex match is kept, with surrounding quote
    characters stripped.
    """
    req = Request(url)
    if user_agent:
        req.add_header('User-Agent', user_agent)
    with urlopen(req) as response:
        # Decode the body; str() on bytes would scan the ``b'...'`` repr
        # with its escape sequences instead of the actual page text.
        page = response.read().decode('utf-8', errors='replace')
    return {match[0].strip('"\'') for match in REGEX_URLS.findall(page)}
开发者ID:angrytomatoe,项目名称:cryptophp,代码行数:7,代码来源:check_url.py
注：本文中的urllib.request.Request类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论