本文整理汇总了Python中urllib2.Request类的典型用法代码示例。如果您正苦于以下问题:Python Request类的具体用法?Python Request怎么用?Python Request使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Request类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: modified_run
def modified_run(self):
import sys
try:
try:
from urllib2 import HTTPHandler, build_opener
from urllib2 import urlopen, Request
from urllib import urlencode
except ImportError:
from urllib.request import HTTPHandler, build_opener
from urllib.request import urlopen, Request
from urllib.parse import urlencode
os_ver = platform.system()
py_ver = "_".join(str(x) for x in sys.version_info)
now_ver = __version__.replace(".", "_")
code = "os:{0},py:{1},now:{2}".format(os_ver, py_ver, now_ver)
action = command_subclass.action
cid = getnode()
payload = {"v": "1", "tid": "UA-61791314-1", "cid": str(cid), "t": "event", "ec": action, "ea": code}
url = "http://www.google-analytics.com/collect"
data = urlencode(payload).encode("utf-8")
request = Request(url, data=data)
request.get_method = lambda: "POST"
connection = urlopen(request)
except:
pass
orig_run(self)
开发者ID:hugobowne,项目名称:noworkflow,代码行数:30,代码来源:setup.py
示例2: hit_endpoint
def hit_endpoint(self, url, data_dict={}, http_method='GET'):
"""
A reusable method that actually performs the request to the specified Atlas API endpoint.
"""
if self.verbose == 'true':
print "HIT_ENDPOINT"
data_dict.update({ "access_token" : self.access_token })
if self.verbose == 'true':
print " Added access_token to data_dict (inside hit_endpoint)"
if self.verbose == 'true':
print " Constructing request URL"
request = Request(url, urllib.urlencode(data_dict))
if self.verbose == 'true':
print " Setting request http_method: %s" % http_method
request.get_method = lambda: http_method
try:
if self.verbose == 'true':
print " Opening Request URL: %s?%s" % (request.get_full_url(),request.get_data())
response = urlopen(request)
except URLError, e:
raise SystemExit(e)
开发者ID:bkyoung,项目名称:sysadmin-utils,代码行数:26,代码来源:atlas-uploader.py
示例3: report_now
def report_now(self, registry=None, timestamp=None):
if self.autocreate_database and not self._did_create_database:
self._create_database()
timestamp = timestamp or int(round(self.clock.time()))
metrics = (registry or self.registry).dump_metrics()
post_data = []
for key, metric_values in metrics.items():
if not self.prefix:
table = key
else:
table = "%s.%s" % (self.prefix, key)
values = ",".join(["%s=%s" % (k, v if type(v) is not str \
else '"{}"'.format(v))
for (k, v) in metric_values.items()])
line = "%s %s %s" % (table, values, timestamp)
post_data.append(line)
post_data = "\n".join(post_data)
path = "/write?db=%s&precision=s" % self.database
url = "%s://%s:%s%s" % (self.protocol, self.server, self.port, path)
request = Request(url, post_data.encode("utf-8"))
if self.username:
auth = _encode_username(self.username, self.password)
request.add_header("Authorization", "Basic %s" % auth.decode('utf-8'))
try:
response = urlopen(request)
response.read()
except URLError as err:
LOG.warning("Cannot write to %s: %s",
self.server, err.reason)
开发者ID:hajs,项目名称:pyformance,代码行数:29,代码来源:influx.py
示例4: test_http_doubleslash
def test_http_doubleslash(self):
# Checks that the presence of an unnecessary double slash in a url doesn't break anything
# Previously, a double slash directly after the host could cause incorrect parsing of the url
h = urllib2.AbstractHTTPHandler()
o = h.parent = MockOpener()
data = ""
ds_urls = [
"http://example.com/foo/bar/baz.html",
"http://example.com//foo/bar/baz.html",
"http://example.com/foo//bar/baz.html",
"http://example.com/foo/bar//baz.html",
]
for ds_url in ds_urls:
ds_req = Request(ds_url, data)
# Check whether host is determined correctly if there is no proxy
np_ds_req = h.do_request_(ds_req)
self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")
# Check whether host is determined correctly if there is a proxy
ds_req.set_proxy("someproxy:3128",None)
p_ds_req = h.do_request_(ds_req)
self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
开发者ID:005,项目名称:gevent,代码行数:25,代码来源:test_urllib2.py
示例5: _send_webhook_msg
def _send_webhook_msg(self, ip, port, payload_str, url_path='',
content_len=-1, content_type='application/json',
get_method=None):
headers = {
'content-type': content_type,
}
if not payload_str:
content_len = None
payload = None
else:
payload = bytes(payload_str, encoding='utf-8')
if content_len == -1:
content_len = len(payload)
if content_len is not None:
headers['content-length'] = str(content_len)
url = 'http://{ip}:{port}/{path}'.format(ip=ip, port=port,
path=url_path)
req = Request(url, data=payload, headers=headers)
if get_method is not None:
req.get_method = get_method
return urlopen(req)
开发者ID:MikeVister,项目名称:python-telegram-bot,代码行数:29,代码来源:test_updater.py
示例6: test_compare_triples
def test_compare_triples():
for mime, fext in MIME_TYPES.items():
dump_path = path.join(DUMP_DIR, path.basename(mime))
for url in URLs:
if six.PY2:
fname = '%s.%s' % (path.basename(urlparse.urlparse(url).path), fext)
else:
fname = '%s.%s' % (path.basename(urlparse(url).path), fext)
fname = path.join(dump_path, fname)
req = Request(url)
req.add_header('Accept', mime)
res = urlopen(req)
g_fdp.parse(data=res.read(), format=mime)
g_dump.parse(fname, format=mime)
both, first, second = graph_diff(g_fdp, g_dump)
n_first = len(first)
# n_second = len(second)
# n_both = len(both)
assert_equals(
n_first, 0, '{} triple(s) different from reference:\n\n{}===\n{}\n'.format(
n_first, first.serialize(format='turtle'), second.serialize(format='turtle')))
开发者ID:NLeSC,项目名称:ODEX-FAIRDataPoint,代码行数:27,代码来源:test_fdp.py
示例7: get
def get(self, bugid):
"""Get an ElementTree representation of a bug.
@param bugid: bug id
@type bugid: int
@rtype: ElementTree
"""
if not self.authenticated and not self.skip_auth:
self.auth()
qparams = config.params['show'].copy()
qparams['id'] = bugid
req_params = urlencode(qparams, True)
req_url = urljoin(self.base, config.urls['show'])
req_url += '?' + req_params
req = Request(req_url, None, config.headers)
if self.httpuser and self.httppassword:
base64string = base64.encodestring('%s:%s' % (self.httpuser, self.httppassword))[:-1]
req.add_header("Authorization", "Basic %s" % base64string)
resp = self.opener.open(req)
fd = StringIO(resp.read())
# workaround for ill-defined XML templates in bugzilla 2.20.2
parser = ForcedEncodingXMLTreeBuilder(encoding = 'utf-8')
etree = ElementTree.parse(fd, parser)
bug = etree.find('.//bug')
if bug and bug.attrib.has_key('error'):
return None
else:
return etree
开发者ID:PabloCastellano,项目名称:pybugz,代码行数:32,代码来源:bugzilla.py
示例8: fetch
def fetch(url, auth=None):
"""Fetch URL, optional with HTTP Basic Authentication."""
if not (url.startswith('http://') or url.startswith('https://')):
try:
with io.open(url, 'r', encoding='utf-8', errors='replace') as fp:
return u''.join(fp.readlines())
except OSError as e:
raise AcrylamidException(e.args[0])
req = Request(url)
if auth:
req.add_header('Authorization', 'Basic ' + b64encode(auth))
try:
r = urlopen(req)
except HTTPError as e:
raise AcrylamidException(e.msg)
if r.getcode() == 401:
user = input('Username: ')
passwd = getpass.getpass()
fetch(url, user + ':' + passwd)
elif r.getcode() == 200:
try:
enc = re.search('charset=(.+);?', r.headers.get('Content-Type', '')).group(1)
except AttributeError:
enc = 'utf-8'
return u'' + r.read().decode(enc)
raise AcrylamidException('invalid status code %i, aborting.' % r.getcode())
开发者ID:DebVortex,项目名称:acrylamid,代码行数:31,代码来源:imprt.py
示例9: __init__
def __init__(self, url, method="GET", headers=None, data=None,
downloadTo=None, closeConnection=False, proxy=None,
redirectedFrom=None, unredirectedHeaders=None, **kw):
"""
"""
headers = headers or dict()
urllib2_Request.__init__(
self, str(url), data=data, headers=headers,
origin_req_host=kw.get("origin_req_host", redirectedFrom),
unverifiable=kw.get("unverifiable", False),
)
Message.__init__(self, url, method, self.headers)
self.host = self._url.host
self.port = self._url.port
self.setProxy(proxy)
assert isinstance(self.headers, util.InsensitiveDict)
unredirectedHeaders = unredirectedHeaders or dict()
self.unredirectedHeaders = util.InsensitiveDict(unredirectedHeaders)
self.closeConnection = closeConnection is True
self.downloadTo = downloadTo
self.redirectedTo = None
self.redirectedFrom = tuple()
self.response = defer.Deferred()
开发者ID:alexstaytuned,项目名称:tx-pendrell,代码行数:26,代码来源:messages.py
示例10: makeRequest
def makeRequest(self, method, path, params=None):
if not params:
params = {}
params['key'] = self.api_key
params['token'] = self.oauth_token
url = self.base_url + path
data = None
if method == 'GET':
url += '?' + urlencode(params)
elif method in ['DELETE', 'POST', 'PUT']:
data = urlencode(params).encode('utf-8')
request = Request(url)
if method in ['DELETE', 'PUT']:
request.get_method = lambda: method
try:
if data:
response = urlopen(request, data=data)
else:
response = urlopen(request)
except HTTPError as e:
print(e)
print(e.read())
result = None
else:
result = json.loads(response.read().decode('utf-8'))
return result
开发者ID:bdoms,项目名称:trello,代码行数:31,代码来源:__init__.py
示例11: auth
def auth(self):
"""Authenticate a session.
"""
if self.try_auth():
return
# prompt for username if we were not supplied with it
if not self.user:
self.log('No username given.')
self.user = self.get_input('Username: ')
# prompt for password if we were not supplied with it
if not self.password:
self.log('No password given.')
self.password = getpass.getpass()
# perform login
qparams = config.params['auth'].copy()
qparams['Bugzilla_login'] = self.user
qparams['Bugzilla_password'] = self.password
req_url = urljoin(self.base, config.urls['auth'])
req = Request(req_url, urlencode(qparams), config.headers)
if self.httpuser and self.httppassword:
base64string = base64.encodestring('%s:%s' % (self.httpuser, self.httppassword))[:-1]
req.add_header("Authorization", "Basic %s" % base64string)
resp = self.opener.open(req)
if resp.info().has_key('Set-Cookie'):
self.authenticated = True
if not self.forget:
self.cookiejar.save()
os.chmod(self.cookiejar.filename, 0700)
return True
else:
raise RuntimeError("Failed to login")
开发者ID:dhirajkhatiwada1,项目名称:uludag,代码行数:35,代码来源:bugzilla.py
示例12: try_auth
def try_auth(self):
"""Check whether the user is already authenticated."""
if self.authenticated:
return True
# try seeing if we really need to request login
if not self.forget:
try:
self.cookiejar.load()
except IOError:
pass
req_url = urljoin(self.base, config.urls['auth'])
req_url += '?GoAheadAndLogIn=1'
req = Request(req_url, None, config.headers)
if self.httpuser and self.httppassword:
base64string = base64.encodestring('%s:%s' % (self.httpuser, self.httppassword))[:-1]
req.add_header("Authorization", "Basic %s" % base64string)
resp = self.opener.open(req)
re_request_login = re.compile(r'<title>.*Log in to Bugzilla</title>')
if not re_request_login.search(resp.read()):
self.log('Already logged in.')
self.authenticated = True
return True
return False
开发者ID:dhirajkhatiwada1,项目名称:uludag,代码行数:25,代码来源:bugzilla.py
示例13: download_checked
def download_checked(cls, url, target_file, expected_digest):
if os.path.exists(target_file):
# file already exists, see if we can use it.
if PBFile.md5_digest(target_file) == expected_digest:
# local file is ok
return
else:
os.unlink(target_file)
user_agent = ("pbunder/%s " % (cls.my_version) +
"(http://github.com/zeha/pbundler/issues)")
try:
req = Request(url)
req.add_header("User-Agent", user_agent)
req.add_header("Accept", "*/*")
with file(target_file, 'wb') as f:
sock = urlopen(req)
try:
f.write(sock.read())
finally:
sock.close()
except Exception as ex:
raise PBundlerException("Downloading %s failed (%s)" % (url, ex))
local_digest = PBFile.md5_digest(target_file)
if local_digest != expected_digest:
os.unlink(target_file)
msg = ("Downloading %s failed (MD5 Digest %s did not match expected %s)" %
(url, local_digest, expected_digest))
raise PBundlerException(msg)
else:
# local file is ok
return
开发者ID:cirnatdan,项目名称:pbundler,代码行数:35,代码来源:util.py
示例14: _download_as_child
def _download_as_child(url, if_modified_since):
from httplib import HTTPException
from urllib2 import urlopen, Request, HTTPError, URLError
try:
#print "Child downloading", url
if url.startswith('http:') or url.startswith('https:') or url.startswith('ftp:'):
req = Request(url)
if url.startswith('http:') and if_modified_since:
req.add_header('If-Modified-Since', if_modified_since)
src = urlopen(req)
else:
raise Exception(_('Unsupported URL protocol in: %s') % url)
try:
sock = src.fp._sock
except AttributeError:
sock = src.fp.fp._sock # Python 2.5 on FreeBSD
while True:
data = sock.recv(256)
if not data: break
os.write(1, data)
sys.exit(RESULT_OK)
except (HTTPError, URLError, HTTPException) as ex:
if isinstance(ex, HTTPError) and ex.code == 304: # Not modified
sys.exit(RESULT_NOT_MODIFIED)
print >>sys.stderr, "Error downloading '" + url + "': " + (str(ex) or str(ex.__class__.__name__))
sys.exit(RESULT_FAILED)
开发者ID:gvsurenderreddy,项目名称:zeroinstall,代码行数:28,代码来源:_download_child.py
示例15: parseFeed
def parseFeed(cls, url, lastModified=None, etag=None):
'''
Return tuple of feed object, last-modified, etag.
'''
req = Request(normalize_url(url))
if lastModified:
req.add_header('if-modified-since', lastModified)
if etag:
req.add_header('if-none-match', etag)
resp = None
try:
resp = urlopen(req, None, 10)
except HTTPError as error:
# HTTP 304 not modifed raise an exception
resp = error
# url of local file returns empty code
if resp.code and resp.code != 200:
return None
feedDoc = etree.parse(resp)
feedType = None
for ft in cls._feedTypes:
if ft.accept(feedDoc):
feedType = ft
break
if not feedType:
raise ValueError('Cannot handle ' + feedDoc.getroot().tag)
return (feedType(url, feedDoc),
resp.headers.get('last-modified'),
resp.headers.get('etag'))
开发者ID:goncha,项目名称:feed2mobi,代码行数:31,代码来源:feed.py
示例16: openURL
def openURL(url_base, data, method='Get', cookies=None, username=None, password=None):
''' function to open urls - wrapper around urllib2.urlopen but with additional checks for OGC service exceptions and url formatting, also handles cookies and simple user password authentication'''
url_base.strip()
lastchar = url_base[-1]
if lastchar not in ['?', '&']:
if url_base.find('?') == -1:
url_base = url_base + '?'
else:
url_base = url_base + '&'
if username and password:
# Provide login information in order to use the WMS server
# Create an OpenerDirector with support for Basic HTTP
# Authentication...
passman = HTTPPasswordMgrWithDefaultRealm()
passman.add_password(None, url_base, username, password)
auth_handler = HTTPBasicAuthHandler(passman)
opener = urllib2.build_opener(auth_handler)
openit = opener.open
else:
openit = urlopen
try:
if method == 'Post':
req = Request(url_base, data)
else:
req=Request(url_base + data)
if cookies is not None:
req.add_header('Cookie', cookies)
u = openit(req)
except HTTPError, e: #Some servers may set the http header to 400 if returning an OGC service exception or 401 if unauthorised.
if e.code in [400, 401]:
raise ServiceException, e.read()
else:
raise e
开发者ID:sabman,项目名称:OWSLib,代码行数:35,代码来源:util.py
示例17: get_page_urls
def get_page_urls(url, user_agent=None):
req = Request(url)
if user_agent:
req.add_header('User-Agent', user_agent)
response = urlopen(req)
urls = REGEX_URLS.findall(str(response.read()))
return set(url[0] for url in urls)
开发者ID:afraca,项目名称:cryptophp,代码行数:7,代码来源:check_url.py
示例18: howmuch
def howmuch(loc_param, date_param, apt, pyung):
res=''
data=[]
start = time.time()
request = Request(url+'&LAWD_CD='+loc_param+'&DEAL_YMD='+date_param)
request.get_method = lambda: 'GET'
try:
res_body = urlopen(request).read()
except:
return '',[], traceback.format_exc().splitlines()[-1]
end = time.time()
soup = BeautifulSoup(res_body, 'html.parser')
items = soup.findAll('item')
for item in items:
item = item.text.encode('utf-8')
item = re.sub('<.*?>', '|', item)
parsed = item.split('|')
try:
#res = parsed[3]+' '+parsed[4]+', '+parsed[7]+'m², '+parsed[9]+'F, '+parsed[1].strip()+'만원\n'
if parsed[7]==pyung and parsed[4].find(apt)>-1:
res+='<tr><td>'+parsed[2]+'/'+parsed[5]+'/'+parsed[6]+'</td><td>'+parsed[3]+' '+parsed[4]+'</td><td>'+parsed[7]+'</td><td>'+parsed[9]+'</td><td>'+parsed[1]+'</td></tr>'
data.append( (parsed[2],parsed[5],parsed[6], parsed[1]) )
except IndexError:
continue
return res, data, None, int((end-start)*1000)
开发者ID:alotofthings,项目名称:realestate-bot,代码行数:28,代码来源:r.py
示例19: namedcmd
def namedcmd(self, cmd):
"""Run command stored in Bugzilla by name.
@return: Result from the stored command.
@rtype: list of dicts
"""
if not self.authenticated and not self.skip_auth:
self.auth()
qparams = config.params['namedcmd'].copy()
# Is there a better way of getting a command with a space in its name
# to be encoded as foo%20bar instead of foo+bar or foo%2520bar?
qparams['namedcmd'] = quote(cmd)
req_params = urlencode(qparams, True)
req_params = req_params.replace('%25','%')
req_url = urljoin(self.base, config.urls['list'])
req_url += '?' + req_params
req = Request(req_url, None, config.headers)
if self.user and self.hpassword:
base64string = base64.encodestring('%s:%s' % (self.user, self.hpassword))[:-1]
req.add_header("Authorization", "Basic %s" % base64string)
resp = self.opener.open(req)
return self.extractResults(resp)
开发者ID:PabloCastellano,项目名称:pybugz,代码行数:26,代码来源:bugzilla.py
示例20: _uploadBatch
def _uploadBatch(self, batch):
"""
Uploads a batch to the server
"""
url = "%s/ImportXML" % self._url
self._logger.debug('getting a batch')
tstart = time.time()
# get a batch
self._logger.info('Generating metadata')
data = self._agent._getMetadata(batch, logger=self._logger)
self._logger.info('Metadata ready ')
postData = {
'xml': data
}
tgen = time.time() - tstart
req = Request(url)
# remove line break
cred = base64.encodestring(
'%s:%s' % (self._username, self._password)).strip()
req.add_header("Authorization", "Basic %s" % cred)
try:
result = urlopen(req, data=urlencode(postData))
except HTTPError, e:
self._logger.exception("Status %s: \n %s" % (e.code, e.read()))
raise Exception('upload failed')
开发者ID:aninhalacerda,项目名称:indico,代码行数:34,代码来源:agent.py
注:本文中的urllib2.Request类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论