本文整理汇总了Python中urllib.addinfourl函数的典型用法代码示例。如果您正苦于以下问题:Python addinfourl函数的具体用法?Python addinfourl怎么用?Python addinfourl使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了addinfourl函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: testscheme_open
def testscheme_open(self, req):
try:
selector = req.get_selector()
if selector == u'/ws_newcompass.asmx?WSDL':
return urllib.addinfourl(
pkg_resources.resource_stream(__name__, 'tests/testdata/wsdl.xml'),
httplib.HTTPMessage(open('/dev/null')),
req.get_full_url(),
200
)
elif selector == u'/ws_newcompass.asmx':
soapResponse = urlparse.urlparse(req.get_header('Soapaction')).path.strip('"').split('/')[-1] + '.xml'
return urllib.addinfourl(
pkg_resources.resource_stream(__name__, 'tests/testdata/' + soapResponse),
httplib.HTTPMessage(open('/dev/null')),
req.get_full_url(),
200
)
elif selector == u'/biomuta.tsv':
return urllib2.addinfourl(
pkg_resources.resource_stream(__name__, 'tests/testdata/Biomuta.tsv'),
httplib.HTTPMessage(open('/dev/null')),
req.get_full_url(),
200
)
else:
raise urllib2.URLError('Not found')
except Exception:
raise urllib2.URLError('Not found')
开发者ID:MCLConsortium,项目名称:jpl.mcl.protocol,代码行数:29,代码来源:testing.py
示例2: fake_urlopen
def fake_urlopen(self, url):
"""Fake urlopen using test client"""
if 'example' in url:
response = cStringIO.StringIO('')
return addinfourl(response, {'X-Pingback': '/xmlrpc.php'}, url)
elif 'localhost' in url:
response = cStringIO.StringIO('<link rel="pingback" href="/xmlrpc/">')
return addinfourl(response, {}, url)
开发者ID:FrankBie,项目名称:django-blog-zinnia,代码行数:8,代码来源:ping.py
示例3: fake_urlopen
def fake_urlopen(self, url):
"""Fake urlopen using test client"""
if 'example' in url:
response = cStringIO.StringIO('')
return addinfourl(response, {'X-Pingback': '/xmlrpc.php'}, url)
else:
response = cStringIO.StringIO(self.client.get(url).content)
return addinfourl(response, {}, url)
开发者ID:wedontplayfair,项目名称:django-blog-zinnia,代码行数:8,代码来源:tests.py
示例4: fake_urlopen
def fake_urlopen(self, url):
"""Fake urlopen using test client"""
if 'example' in url:
response = cStringIO.StringIO('')
return addinfourl(response, {'X-Pingback': '/xmlrpc.php',
'Content-Type': 'text/html'}, url)
elif 'localhost' in url:
response = cStringIO.StringIO(
'<link rel="pingback" href="/xmlrpc/">')
return addinfourl(response, {'Content-Type': 'text/xhtml'}, url)
elif 'google' in url:
response = cStringIO.StringIO('PNG CONTENT')
return addinfourl(response, {'content-type': 'image/png'}, url)
elif 'error' in url:
raise URLError('Invalid ressource')
开发者ID:anujag,项目名称:django-gstudio,代码行数:15,代码来源:ping.py
示例5: do_open
def do_open(self, http_class, req):
host = req.get_host()
if not host:
raise URLError('no host given')
h = http_class(host, timeout=req.timeout)
h.set_debuglevel(self._debuglevel)
headers = dict(req.unredirected_hdrs)
headers.update(dict(((k, v) for k, v in req.headers.items() if k not in headers)))
headers['Connection'] = 'close'
headers = dict(((name.title(), val) for name, val in headers.items()))
if req._tunnel_host:
tunnel_headers = {}
proxy_auth_hdr = 'Proxy-Authorization'
if proxy_auth_hdr in headers:
tunnel_headers[proxy_auth_hdr] = headers[proxy_auth_hdr]
del headers[proxy_auth_hdr]
h.set_tunnel(req._tunnel_host, headers=tunnel_headers)
try:
h.request(req.get_method(), req.get_selector(), req.data, headers)
try:
r = h.getresponse(buffering=True)
except TypeError:
r = h.getresponse()
except socket.error as err:
raise URLError(err)
fp = socket._fileobject(RecvWrapper(r), close=True)
resp = addinfourl(fp, r.msg, req.get_full_url())
resp.code = r.status
resp.msg = r.reason
return resp
开发者ID:connoryang,项目名称:dec-eve-serenity,代码行数:32,代码来源:urllib2.py
示例6: exec_open
def exec_open (self, req):
path = req.get_selector()
args = path.split("?", 1)
if len(args) == 1: args.append('')
#print "args ", args
# Prepare CGI-like environment
os.putenv ('GATEWAY_INTERFACE', 'CGI/1.1')
os.putenv ('HTTP_ACCEPT_ENCODING', req.headers.get ('Accept-encoding'))
os.putenv ('HTTP_USER_AGENT', 'DBS-CGI-Direct-call')
os.putenv ('REQUEST_METHOD', 'POST')
os.putenv ('CONTENT_LENGTH', str(req.headers.get ('Content-length')))
os.putenv ('CONTENT_TYPE', req.headers.get ('Content-type'))
os.putenv ('QUERY_STRING', args[1])
os.putenv ('REQUEST_URI', path)
os.putenv ('SCRIPT_NAME', args[0])
os.putenv ('SERVER_NAME', 'localhost')
os.putenv ('SERVER_PORT', str(80))
os.putenv ('SERVER_PROTOCOL', 'HTTP/1.1')
os.putenv ('SERVER_SOFTWARE', 'Builtin')
# Open subprocess and write form data
r, w = os.popen2(args[0])
r.write (req.get_data())
r.close ()
# Read back headers, then leave the body to be read
msg = httplib.HTTPMessage (w, 0)
msg.fp = None
return urllib.addinfourl (w, msg, path)
开发者ID:bbockelm,项目名称:DBS,代码行数:29,代码来源:dbsCgiApi.py
示例7: http_error_302
def http_error_302(self, req, fp, code, msg, headers):
infourl = urllib.addinfourl(fp, headers, req.get_full_url())
infourl.status = code
infourl.code = code
logging.debug('NoRedirectHandler got redirect to + ' + headers['Location'])
self.got_redirect = True
return infourl
开发者ID:johnboiles,项目名称:hotspot_autologin,代码行数:7,代码来源:hotspot_autologin.py
示例8: test_returns_response_when_successful_response
def test_returns_response_when_successful_response(self, urlopen):
resp = addinfourl(StringIO(u"mock_content"), "mock headers", url="http://www.example.com/", code="200")
urlopen.return_value = resp
api_stub = ClientStub()
response = api_stub.do_something()
self.assertEqual(resp, response)
开发者ID:madisona,项目名称:apyclient,代码行数:7,代码来源:test_apyclient.py
示例9: decode
def decode (page):
"""Gunzip or deflate a compressed page."""
log.debug(LOG_CHECK, "page info %d %s", page.code, str(page.info()))
encoding = page.info().get("Content-Encoding")
if encoding in ('gzip', 'x-gzip', 'deflate'):
# cannot seek in socket descriptors, so must get content now
content = page.read()
try:
if encoding == 'deflate':
fp = StringIO(zlib.decompress(content))
else:
fp = gzip.GzipFile('', 'rb', 9, StringIO(content))
except zlib.error as msg:
log.debug(LOG_CHECK, "uncompressing had error "
"%s, assuming non-compressed content", str(msg))
fp = StringIO(content)
# remove content-encoding header
headers = httplib.HTTPMessage(StringIO(""))
ceheader = re.compile(r"(?i)content-encoding:")
for h in page.info().keys():
if not ceheader.match(h):
headers[h] = page.info()[h]
newpage = urllib.addinfourl(fp, headers, page.geturl())
newpage.code = page.code
newpage.msg = page.msg
return newpage
return page
开发者ID:Zearin,项目名称:linkchecker,代码行数:27,代码来源:httputil.py
示例10: open_local_file
def open_local_file(self, req):
import mimetypes
import mimetools
host = req.get_host()
file = req.get_selector()
localfile = urllib.url2pathname(file)
stats = os.stat(localfile)
size = stats[stat.ST_SIZE]
modified = rfc822.formatdate(stats[stat.ST_MTIME])
mtype = mimetypes.guess_type(file)[0]
if host:
host, port = urllib.splitport(host)
if port or socket.gethostbyname(host) not in self.get_names():
raise urllib2.URLError("file not on local host")
fo = open(localfile, "rb")
brange = req.headers.get("Range", None)
brange = range_header_to_tuple(brange)
assert brange != ()
if brange:
(fb, lb) = brange
if lb == "":
lb = size
if fb < 0 or fb > size or lb > size:
raise RangeError("Requested Range Not Satisfiable")
size = lb - fb
fo = RangeableFileObject(fo, (fb, lb))
headers = mimetools.Message(
StringIO(
"Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n" % (mtype or "text/plain", size, modified)
)
)
return urllib.addinfourl(fo, headers, "file:" + file)
开发者ID:Lukc,项目名称:ospace-lukc,代码行数:33,代码来源:byterange.py
示例11: open_local_file
def open_local_file(self, req):
import email.utils
import mimetypes
host = req.get_host()
filename = req.get_selector()
localfile = url2pathname(filename)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(filename)[0]
headers = mimetools.Message(
StringIO(
"Content-type: %s\nContent-length: %d\nLast-modified: %s\n"
% (mtype or "text/plain", size, modified)
)
)
if host:
host, port = splitport(host)
if not host or (not port and _safe_gethostbyname(host) in self.get_names()):
if host:
origurl = "file://" + host + filename
else:
origurl = "file://" + filename
return addinfourl(open(localfile, "rb"), headers, origurl)
except OSError, msg:
# urllib2 users shouldn't expect OSErrors coming from urlopen()
raise URLError(msg)
开发者ID:JoJoBond,项目名称:The-Powder-Toy,代码行数:29,代码来源:urllib2.py
示例12: open_local_file
def open_local_file(self, req):
try:
import email.utils as emailutils
except ImportError:
# python 2.4
import email.Utils as emailutils
import mimetypes
host = req.get_host()
file = req.get_selector()
localfile = url2pathname(file)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = emailutils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(file)[0]
headers = mimetools.Message(StringIO(
'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified)))
if host:
host, port = splitport(host)
if not host or \
(not port and socket.gethostbyname(host) in self.get_names()):
return addinfourl(open(localfile, 'rb'),
headers, 'file:'+file)
except OSError, msg:
# urllib2 users shouldn't expect OSErrors coming from urlopen()
raise URLError(msg)
开发者ID:ufal,项目名称:lindat-aai-shibbie,代码行数:27,代码来源:_urllib2_fork.py
示例13: open
def open(self, fullurl, data=None, method=None):
"""Use URLopener().open(file) instead of open(file, 'r')."""
fullurl = unwrap(toBytes(fullurl))
# percent encode url, fixing lame server errors for e.g, like space
# within url paths.
fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
if self.tempcache and fullurl in self.tempcache:
filename, headers = self.tempcache[fullurl]
fp = open(filename, 'rb')
return addinfourl(fp, headers, fullurl)
urltype, url = splittype(fullurl)
if not urltype:
urltype = 'file'
if urltype in self.proxies:
proxy = self.proxies[urltype]
urltype, proxyhost = splittype(proxy)
host, selector = splithost(proxyhost)
url = (host, fullurl) # Signal special case to open_*()
else:
proxy = None
name = 'open_' + urltype
self.type = urltype
name = name.replace('-', '_')
if not hasattr(self, name):
if proxy:
return self.open_unknown_proxy(proxy, fullurl, data)
else:
return self.open_unknown(fullurl, data)
try:
return getattr(self, name)(url, data, method)
except socket.error, msg:
raise IOError, ('socket error', msg), sys.exc_info()[2]
开发者ID:croot,项目名称:buster-bot,代码行数:32,代码来源:opener.py
示例14: decode
def decode (page):
"gunzip or deflate a compressed page"
#print page.info().headers
encoding = page.info().get("Content-Encoding")
if encoding in ('gzip', 'x-gzip', 'deflate'):
from cStringIO import StringIO
# cannot seek in socket descriptors, so must get content now
content = page.read()
if encoding == 'deflate':
import zlib
fp = StringIO(zlib.decompress(content))
else:
import gzip
fp = gzip.GzipFile('', 'rb', 9, StringIO(content))
# remove content-encoding header
headers = httplib.HTTPMessage(StringIO(""))
ceheader = re.compile(r"(?i)content-encoding:")
for h in page.info().keys():
if not ceheader.match(h):
headers[h] = page.info()[h]
newpage = urllib.addinfourl(fp, headers, page.geturl())
# Propagate code, msg through
if hasattr(page, 'code'):
newpage.code = page.code
if hasattr(page, 'msg'):
newpage.msg = page.msg
return newpage
return page
开发者ID:ideamonk,项目名称:apt-offline,代码行数:28,代码来源:AptOffline_urlutils.py
示例15: do_open
def do_open(self, http_class, req):
host = req.get_host()
if not host:
raise URLError('no host given')
try:
h = http_class(host) # will parse host:port
if req.has_data():
data = req.get_data()
h.putrequest('POST', req.get_selector())
if not req.headers.has_key('Content-type'):
h.putheader('Content-type',
'application/x-www-form-urlencoded')
if not req.headers.has_key('Content-length'):
h.putheader('Content-length', '%d' % len(data))
else:
h.putrequest('GET', req.get_selector())
except socket.error(err):
raise URLError(err)
h.putheader('Host', host)
for args in self.parent.addheaders:
h.putheader(*args)
for k, v in req.headers.items():
h.putheader(k, v)
h.endheaders()
if req.has_data():
h.send(data)
code, msg, hdrs = h.getreply()
fp = h.getfile()
if code == 200:
return addinfourl(fp, hdrs, req.get_full_url())
else:
return self.parent.error('http', req, fp, code, msg, hdrs)
开发者ID:rallen71366,项目名称:jawfish,代码行数:35,代码来源:urllib2.py
示例16: open_local_file
def open_local_file(self, req):
import mimetypes
import email
host = req.get_host()
file = req.get_selector()
localfile = urllib.url2pathname(file)
stats = os.stat(localfile)
size = stats[stat.ST_SIZE]
modified = email.Utils.formatdate(stats[stat.ST_MTIME])
mtype = mimetypes.guess_type(file)[0]
if host:
host, port = urllib.splitport(host)
if port or socket.gethostbyname(host) not in self.get_names():
raise urllib2.URLError('file not on local host')
fo = open(localfile,'rb')
brange = req.headers.get('Range', None)
brange = range_header_to_tuple(brange)
assert brange != ()
if brange:
(fb, lb) = brange
if lb == '':
lb = size
if fb < 0 or fb > size or lb > size:
raise RangeError('Requested Range Not Satisfiable')
size = (lb - fb)
fo = RangeableFileObject(fo, (fb, lb))
headers = email.message_from_string(
'Content-Type: %s\nContent-Length: %d\nLast-Modified: %s\n' %
(mtype or 'text/plain', size, modified))
return urllib.addinfourl(fo, headers, 'file:'+file)
开发者ID:MezzLabs,项目名称:mercurial,代码行数:30,代码来源:byterange.py
示例17: text_to_twill
def text_to_twill(self, text):
"""
Wrap text to work with Twill.
"""
headers_msg = 'Content: text-plain; encoding=utf-8\n'
headers_msg = StringIO(headers_msg)
headers = httplib.HTTPMessage(headers_msg)
status_code = 200
url = 'text://'
io_response = StringIO(text)
urllib_response = addinfourl(io_response,
headers,
url,
status_code)
urllib_response._headers = headers
urllib_response._url = url
urllib_response.msg = u'OK'
urllib_response.seek = urllib_response.fp.seek
self.get_browser()._browser._factory.set_response(urllib_response)
self.get_browser().result = ResultWrapper(status_code, url, text)
self._apply_xhtml()
开发者ID:bloodpet,项目名称:tddspry,代码行数:25,代码来源:cases.py
示例18: _make_response
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
开发者ID:yehuaqing,项目名称:pweb,代码行数:7,代码来源:browser.py
示例19: http_error_302
def http_error_302(self, req, fp, code, msg, headers):
import urllib
infourl = urllib.addinfourl(fp, headers, headers["Location"])
infourl.status = code
infourl.code = code
return infourl
开发者ID:zorg1977,项目名称:plugin.video.quasar,代码行数:7,代码来源:navigation.py
示例20: response_to_twill
def response_to_twill(self, response):
"""
Wrap Django response to work with Twill.
"""
path = response.request.get('PATH_INFO')
url = path and SITE + path.lstrip('/') or path
headers_msg = '\n'.join('%s: %s' % (k, v) for k, v in response.items())
headers_msg = StringIO(headers_msg)
headers = httplib.HTTPMessage(headers_msg)
io_response = StringIO(response.content)
urllib_response = addinfourl(io_response,
headers,
url,
response.status_code)
urllib_response._headers = headers
urllib_response._url = url
urllib_response.msg = u'OK'
urllib_response.seek = urllib_response.fp.seek
self.get_browser()._browser._set_response(urllib_response, False)
self.get_browser().result = ResultWrapper(response.status_code,
url,
response.content)
self._apply_xhtml()
开发者ID:bloodpet,项目名称:tddspry,代码行数:27,代码来源:cases.py
注:本文中的urllib.addinfourl函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论