This article collects typical usage examples of add_header from Python's urllib.request module. Strictly speaking, add_header is a method of urllib.request.Request objects rather than a standalone function of urllib.request; if you are wondering what add_header does, how to call it, or what real-world uses look like, the hand-picked examples below should help.
The 20 code examples that follow are taken from open-source projects and are ordered roughly by popularity.
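Before the examples, here is a minimal sketch of the pattern they all share: build a Request, call add_header once per header, then open it. The URL and client name are placeholders, not taken from any of the projects below.

import urllib.request

request = urllib.request.Request('https://example.com/api')  # placeholder URL
# add_header() sets one header per call; the key is normalized via str.capitalize()
request.add_header('User-Agent', 'my-client/1.0')
request.add_header('Accept', 'application/json')
with urllib.request.urlopen(request, timeout=10) as response:
    body = response.read()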
Example 1: post
def post(build: Build):
    # skip notification unless Slack is enabled and both URLs are configured
    if not (SLACK_NOTIFICATION and SLACK_NOTIFICATION_URL and AUR_PACKAGER_BASE_URL):
        return
    detail_url = AUR_PACKAGER_BASE_URL + str(reverse_lazy('manager:build_detail',
                                                           kwargs={'package_name': build.package.name,
                                                                   'build_number': 1}))
    base = '<{}|{}> {}: <{}|{}>'.format(
        package_url(aur_server_tag=build.package.server, package_name=build.package.name), build.package.name,
        build.version, detail_url, build.status)
    if build.status == Build.SUCCESS:
        emoji = ':+1:'
        sha256s = json.loads(build.sha256)
        artifacts = []
        for artifact in Artifact.objects.filter(package=build.package):
            download_url = AUR_PACKAGER_BASE_URL + str(reverse_lazy('manager:build_download',
                                                                     kwargs={'package_name': artifact.name,
                                                                             'build_number': 1}))
            sha256 = sha256s[artifact.name]
            s = '<{}|:arrow_down: {}> sha256: {}'.format(download_url, artifact.name, sha256)
            artifacts.append(s)
        text = '\n'.join([base] + artifacts)
    else:
        emoji = ':ghost:'
        text = base
    name = '{}: {} {}'.format(build.status, build.package.name, build.version)
    data = {'text': text, 'username': name, 'icon_emoji': emoji}
    request = urllib.request.Request(SLACK_NOTIFICATION_URL)
    request.add_header('Content-type', 'application/json')
    try:
        urllib.request.urlopen(request, json.dumps(data).encode())
    except urllib.error.URLError:
        pass
Author: colajam93, Project: aurpackager, Lines: 34, Source: slack.py
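The webhook pattern above, reduced to a hedged stand-alone sketch; the function name and the webhook URL argument are illustrative, not part of the original project.

import json
import urllib.request

def notify_slack(webhook_url, payload):
    # POST a JSON document; the body passed to urlopen() must be bytes
    request = urllib.request.Request(webhook_url)
    request.add_header('Content-type', 'application/json')
    urllib.request.urlopen(request, json.dumps(payload).encode())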
Example 2: get_blob
def get_blob(self, thread_id, blob_id):
    """Return a file-like object with the contents of the given blob.

    The object is described in detail here:
    https://docs.python.org/2/library/urllib2.html#urllib2.urlopen
    """
    request = urllib.request.Request(
        url=self._url("blob/%s/%s" % (thread_id, blob_id)))
    if self.access_token:
        request.add_header("Authorization", "Bearer " + self.access_token)
    try:
        return urllib.request.urlopen(request,
                                      timeout=self.request_timeout)
    except urllib.request.HTTPError as error:
        try:
            # Extract the developer-friendly error message
            message = json.loads(error.read())["error_description"]
        except Exception:
            raise error
        if (self.retry_rate_limit and error.code == 503 and
                message == "Over Rate Limit"):
            # Retry later.
            reset_time = float(error.headers.get("X-RateLimit-Reset"))
            delay = max(2, reset_time - time.time() + 1)
            logging.warning("Rate Limit, delaying for %d seconds" % delay)
            time.sleep(delay)
            return self.get_blob(thread_id, blob_id)
        else:
            raise QuipError(error.code, message, error)
Author: THADEUSH123, Project: quip-python, Lines: 29, Source: __init__.py
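The Authorization handling above boils down to the following sketch of a conditional Bearer header; the function name and parameters are illustrative.

import urllib.request

def open_with_token(url, access_token=None, timeout=10):
    # attach an OAuth-style Bearer token only when one is available
    request = urllib.request.Request(url)
    if access_token:
        request.add_header("Authorization", "Bearer " + access_token)
    return urllib.request.urlopen(request, timeout=timeout)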
Example 3: __send_xml_str
def __send_xml_str(self, xml_str):
    logger.debug("Sending: %s" % xml_str)
    # urlencode() returns str; the Request body must be bytes on Python 3
    xml_data = urllib.parse.urlencode({'XML': xml_str}).encode('utf-8')
    request = urllib.request.Request(self.door_url(), xml_data)
    # build the HTTP Basic credentials; b64encode() works on bytes and adds no trailing newline
    credentials = ('%s:%s' % (self.door_user, self.door_pass)).encode('utf-8')
    base64string = base64.b64encode(credentials).decode('ascii')
    request.add_header("Authorization", "Basic %s" % base64string)
    context = ssl._create_unverified_context()
    context.set_ciphers('RC4-SHA')
    self.lock.acquire()
    try:
        result = urllib.request.urlopen(request, context=context)
        return_code = result.getcode()
        return_xml = result.read()
        result.close()
    finally:
        self.lock.release()
    logger.debug("Response code: %d" % return_code)
    logger.debug("Response: %s" % return_xml)
    if return_code != 200:
        raise Exception("Did not receive 200 return code")
    error = get_attribute(return_xml, "errorMessage")
    if error:
        raise Exception("Received an error: %s" % error)
    return return_xml
Author: nadineproject, Project: nadine, Lines: 28, Source: hid_control.py
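On current Python 3 the Basic-auth header is usually built with base64.b64encode rather than base64.encodestring; a hedged sketch with illustrative names.

import base64
import urllib.request

def basic_auth_request(url, user, password):
    # b64encode() expects bytes and returns bytes without a trailing newline
    credentials = base64.b64encode('{}:{}'.format(user, password).encode('utf-8')).decode('ascii')
    request = urllib.request.Request(url)
    request.add_header('Authorization', 'Basic %s' % credentials)
    return request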
Example 4: _defaultFetcher
def _defaultFetcher(url):
    """Retrieve data from ``url``. cssutils default implementation of fetch
    URL function.

    Returns ``(encoding, string)`` or ``None``
    """
    try:
        request = urllib.request.Request(url)
        request.add_header('User-agent',
                           'cssutils %s (http://www.cthedot.de/cssutils/)' % VERSION)
        res = urllib.request.urlopen(request)
    except urllib.error.HTTPError as e:
        # http error, e.g. 404, e can be raised
        log.warn('HTTPError opening url=%s: %s %s' %
                 (url, e.code, e.msg), error=e)
    except urllib.error.URLError as e:
        # URLError like mailto: or other IO errors, e can be raised
        log.warn('URLError, %s' % e.reason, error=e)
    except OSError as e:
        # e.g. if file URL and not found
        log.warn(e, error=OSError)
    except ValueError as e:
        # invalid url, e.g. "1"
        log.warn('ValueError, %s' % e.args[0], error=ValueError)
    else:
        if res:
            mimeType, encoding = encutils.getHTTPInfo(res)
            if mimeType != 'text/css':
                log.error('Expected "text/css" mime type for url=%r but found: %r' %
                          (url, mimeType), error=ValueError)
            content = res.read()
            if hasattr(res, 'close'):
                res.close()
            return encoding, content
Author: CudaText-addons, Project: cuda_css_prefixer, Lines: 34, Source: _fetch.py
Example 5: query
def query(resource, mbid, includes=[]):
    """Queries MusicBrainz' web service for *resource* with *mbid* and the given list of includes.

    Returns an LXML ElementTree root node. All namespaces are removed from the result.
    """
    url = '{}/{}/{}'.format(wsURL, resource, mbid)
    if queryCallback:
        queryCallback(url)
    if len(includes) > 0:
        url += '?inc={}'.format('+'.join(includes))
    logging.debug(__name__, 'querying {}'.format(url))
    ans = db.query("SELECT xml FROM {}musicbrainzqueries WHERE url=?".format(db.prefix), url)
    try:
        data = ans.getSingle()
    except db.EmptyResultException:
        try:
            request = urllib.request.Request(url)
            request.add_header('User-Agent',
                               'Maestro/0.4.0 (https://github.com/maestromusic/maestro)')
            with urllib.request.urlopen(request) as response:
                data = response.read()
        except urllib.error.HTTPError as e:
            if e.code == 404:
                raise e
            else:
                raise ConnectionError(e.msg)
        db.query("INSERT INTO {}musicbrainzqueries (url, xml) VALUES (?,?)"
                 .format(db.prefix), url, data)
    root = etree.fromstring(data)
    # remove namespace tags
    for node in root.iter():
        if node.tag.startswith('{'):
            node.tag = node.tag.rsplit('}', 1)[-1]
    return root
Author: maestromusic, Project: maestro, Lines: 34, Source: xmlapi.py
Example 6: glsrequest
def glsrequest(uri, method, data=None):
    '''
    Returns xml node tree as Element instance.

    'uri' may be absolute or relative to _BASEURI.
    'method' in ('GET', 'POST', 'PUT')
    'data' can be a string or Element instance
    '''
    if method not in {'GET', 'POST', 'PUT'}:
        raise GlslibException(MSGUNSUPPORTEDMETHOD % method)
    if not uri.startswith(_BASEURI):
        uri = _BASEURI.rstrip('/') + '/' + uri.lstrip('/')
    request = urllib.request.Request(uri)
    request.add_header("Authorization", "Basic %s" % _AUTHSTR)
    if etree.iselement(data):
        # tostring generates bytestring (as required for data)
        data = etree.tostring(data)
        request.add_header('Content-Type', 'application/xml')
        # Request.add_data() no longer exists on Python 3; assign the data attribute instead
        request.data = data
    request.get_method = lambda: method
    msg = '%s %s\n%s\n%s' % (request.get_method(),
                             request.get_full_url(),
                             request.headers,
                             data.decode('utf-8') if data else '')
    logger.debug(msg)
    try:
        r = urllib.request.urlopen(request)
        return etree.XML(r.read())
    except urllib.error.HTTPError as httperr:
        logger.error(httperr.read())
        raise
    except urllib.error.URLError as urlerr:
        logger.error(request.get_full_url())
        raise
Author: delocalizer, Project: GenoLogics-LIMS-QCMG-API-Python-Tools, Lines: 34, Source: glslib.py
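Since Python 3.3, Request accepts data= and method= directly, so the get_method override and the removed add_data() call can be avoided. A hedged sketch with a placeholder URL and payload:

import urllib.request

payload = b'<sample/>'  # placeholder body
request = urllib.request.Request('https://example.com/api/resource',  # placeholder URL
                                 data=payload, method='PUT')
request.add_header('Content-Type', 'application/xml')
with urllib.request.urlopen(request) as response:
    print(response.status)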
Example 7: get_recent_jobs
def get_recent_jobs(self, n_jobs=10):
    """
    Returns the user's N most recently submitted jobs on the GenePattern server.

    Args: If not specified, n_jobs = 10.

    Returns: An array of GPJob objects.
    """
    # Query the server for the list of jobs
    request = urllib.request.Request(self.url + '/rest/v1/jobs/?pageSize=' +
                                     str(n_jobs) + '&userId=' + str(urllib.parse.quote(self.username)) +
                                     '&orderBy=-dateSubmitted')
    if self.authorization_header() is not None:
        request.add_header('Authorization', self.authorization_header())
    request.add_header('User-Agent', 'GenePatternRest')
    response = urllib.request.urlopen(request)
    response_string = response.read().decode('utf-8')
    response_json = json.loads(response_string)

    # For each job in the JSON Array, build a GPJob object and add to the job list
    job_list = []
    for job_json in response_json['items']:
        job_id = job_json['jobId']
        job = GPJob(self, job_id)
        job.info = job_json
        job.load_info()
        job_list.append(job)
    return job_list
Author: genepattern, Project: genepattern-python, Lines: 30, Source: core.py
Example 8: sendRequest
def sendRequest(self, path, data={}, token=True, post=True, headers={}):
    response = None
    request = None
    if post:
        headers['Content-Type'] = 'application/xml; charset=UTF-8'
    if token:
        headers['Authorization'] = '%s' % self.token
    try:
        if post:
            request = urllib.request.Request(self.apiURL + path, data.encode('utf8'))
        elif len(data) == 0:
            request = urllib.request.Request(self.apiURL + path)
        else:
            print('I have data in sendRequest but i don\'t know what i should do with it :D')
        if request is not None:
            for k, v in headers.items():
                request.add_header(k, v)
            response = urllib.request.urlopen(request)
    except urllib.error.HTTPError as e:
        print('Error while requesting API call: %s (%s)' % (e.msg, e.code))
        print('URL: %s' % (self.apiURL + path))
    except urllib.error.URLError as e:
        print('Error while requesting API call: %s' % (e.reason))
    return response
Author: HonestQiao, Project: SugarSync-Python-Client, Lines: 30, Source: SugarSync.py
Example 9: _request
def _request(self, method, url, get=None, post=None, auth=False):
    if get:
        url = "{}?{}".format(url, urllib.parse.urlencode(get))
    if post:
        post = urllib.parse.urlencode(post).encode('utf-8')
    request = urllib.request.Request(self.url + url, post)
    request.get_method = lambda: method
    if auth:
        request.add_header('Content-Type', 'application/x-www-form-urlencoded')
        request.add_header('Authorization', '{0} {1}'.format(
            self._get_userconfig('token_type').capitalize(),
            self._get_userconfig('access_token'),
        ))
    try:
        response = self.opener.open(request, timeout=10)
        return json.loads(response.read().decode('utf-8'))
    except urllib.request.HTTPError as e:
        if e.code == 400:
            raise utils.APIError("Invalid PIN. It is either probably expired or meant for another application.")
        else:
            raise utils.APIError("Connection error: %s" % e)
    except socket.timeout:
        raise utils.APIError("Connection timed out.")
Author: ToostInc, Project: wmal-python, Lines: 26, Source: libanilist.py
Example 10: retrieve_page
def retrieve_page(dbinfo, url):
    """
    Retrieve a web page, with retries if necessary.
    """
    crawl_delay = CRAWL_DELAY
    html = ''
    attempt = 1
    while True:
        try:
            request = urllib.request.Request(url)
            request.add_header(
                'User-Agent',
                ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:21.0) ' +
                 'Gecko/20100101 Firefox/21.0'))
            request.add_header(
                'Accept',
                ('text/html,application/xhtml+xml,application/xml;' +
                 'q=0.9,*/*;q=0.8'))
            html = urllib.request.urlopen(request).read().decode('utf-8')
            return html
        except:
            if attempt >= RETRY_ATTEMPTS:
                log(dbinfo, 'ERROR',
                    'Error retrieving web page, too many retries: ' + url)
                return None
            else:
                log(dbinfo, 'WARNING',
                    'Problem retrieving web page, retrying: ' + url)
                sleep(crawl_delay)
                crawl_delay = crawl_delay * 2
                attempt += 1
Author: anukat2015, Project: snac, Lines: 31, Source: retrieve_blog_posts_parallel.py
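The two add_header calls above can equally be collapsed into the headers= argument of the Request constructor; a sketch with the same header values and a placeholder URL.

import urllib.request

headers = {
    'User-Agent': ('Mozilla/5.0 (Macintosh; Intel Mac OS X 10.8; rv:21.0) '
                   'Gecko/20100101 Firefox/21.0'),
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
}
request = urllib.request.Request('https://example.com/page', headers=headers)  # placeholder URL
html = urllib.request.urlopen(request).read().decode('utf-8')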
Example 11: make_call
def make_call(api_url, query_args=None):
    # api_url is expected to be the fully constructed URL, with any needed
    # arguments appended.
    # This function will simply make the call, and return the response as
    # an ElementTree object for parsing. If the response cannot be parsed
    # because it is not valid XML, this function assumes an API error and
    # raises an APIException, passing forward the page's contents (which
    # generally give some indication of the error).
    if query_args is not None:
        get_params = urlencode_no_plus.urlencode_no_plus(query_args)
        request = urllib.request.Request(api_url + '%s' % get_params)
    else:
        request = urllib.request.Request(api_url)
    # Added these headers to avoid some weird errors from the host.
    request.add_header('Referer', 'http://thegamesdb.net/')
    request.add_header('User-agent', 'Mozilla/5.0')
    response = urllib.request.urlopen(request)
    page = response.read()
    # Make sure the XML parser doesn't raise a ParseError. If it does,
    # it's probably an API issue, so raise an exception, passing along the
    # response from the API call.
    try:
        xml_response = ET.fromstring(page)
    except ET.ParseError:
        raise APIException(page)
    return xml_response
Author: mabernardo, Project: python-gamesdb, Lines: 29, Source: api.py
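One detail worth knowing when setting headers the way this example does: add_header() normalizes the header name with str.capitalize() before storing it, and the stored headers can be inspected afterwards. A small sketch reusing the header values above (the endpoint path is illustrative):

import urllib.request

request = urllib.request.Request('http://thegamesdb.net/api/GetGamesList.php')  # illustrative endpoint
request.add_header('Referer', 'http://thegamesdb.net/')
request.add_header('User-agent', 'Mozilla/5.0')
# keys are stored after str.capitalize(), so 'USER-AGENT', 'user-agent' and
# 'User-agent' all overwrite the same entry
print(request.header_items())
print(request.get_header('User-agent'))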
Example 12: get_credit
def get_credit(opener):
    url = emuch_url + '/memcp.php?action=getcredit'
    values = {'formhash': '2c8099cd',
              'getmode': '1',  # 2
              'message': '',
              'creditsubmit': b'\xc1\xec\xc8\xa1\xba\xec\xb0\xfc'  # '领取红包'.encode('gbk'), i.e. "claim the red packet"
              }
    data = urllib.parse.urlencode(values)
    data = data.encode('utf-8')
    request = urllib.request.Request(url, data)  # , method='POST')
    request.add_header("Content-Type", "application/x-www-form-urlencoded;charset=utf-8")
    request.add_header('User-Agent', user_agent)
    print_log('try to get credit...')
    r = opener.open(request)
    body = r.read().decode('gbk')
    info = [b'\xb9\xa7\xcf\xb2\xa3\xa1\xc4\xe3\xbb\xf1\xb5\xc3'.decode('gbk'),  # '恭喜!你获得' ("Congratulations! You received ...")
            b'\xbd\xf1\xcc\xec\xb5\xc4\xba\xec\xb0\xfc\xa3\xac\xc4\xfa\xd2\xd1\xbe\xad\xc1\xec\xc8\xa1\xc1\xcb\xa3\xac\xd2\xbb\xcc\xec\xbe\xcd\xd2\xbb\xb4\xce\xbb\xfa\xbb\xe1'.decode('gbk'),  # "you have already claimed today's red packet; only one chance per day"
            '',
            ]
    msgs = ['get credit successfully!', 'can not get twice!', 'undefined error!']
    # out_html(body, "get_credit")
    for i, s in enumerate(info):
        if s in body:
            print_log(msgs[i])
            return i
Author: shouxi, Project: pywebrobot, Lines: 29, Source: emucher.py
Example 13: get_msgbox
def get_msgbox(opener):
    url = emuch_url + '/box.php'
    request = urllib.request.Request(url)
    request.add_header("Content-Type", "application/x-www-form-urlencoded;charset=utf-8")
    request.add_header('User-Agent', user_agent)
    return opener.open(request)
Author: shouxi, Project: pywebrobot, Lines: 7, Source: emucher.py
Example 14: login_emuch
def login_emuch(opener, username, passwd):
    url = emuch_url + '/logging.php?action=login'
    values = {'formhash': 'f6ac2e8a',
              'referer': 'http://emuch.net/bbs/index.php',
              'username': username,
              'password': passwd,
              'cookietime': '31536000',
              'loginsubmit': b'\xbb\xe1\xd4\xb1\xb5\xc7\xc2\xbc'  # '会员登录'.encode('gbk'), i.e. "member login"
              }
    data = urllib.parse.urlencode(values)
    data = data.encode('utf-8')
    request = urllib.request.Request(url, data)  # , method='POST')
    request.add_header("Content-Type", "application/x-www-form-urlencoded;charset=utf-8")
    request.add_header('User-Agent', user_agent)
    r = opener.open(request)
    body = r.read().decode('gbk')
    # out_html(body, 'login')
    es = b'\xca\xe4\xc8\xeb\xb5\xc4\xd5\xca\xba\xc5\xc3\xdc\xc2\xeb\xb4\xed\xce\xf3\xa3\xac\xc7\xeb\xd6\xd8\xca\xd4'.decode('gbk')
    # '输入的帐号密码错误,请重试' ("the account or password entered is wrong, please retry")
    f = es in body
    print_log({0: "logged in successfully!", 1: 'wrong username or password!'}[f])
    return not f
Author: shouxi, Project: pywebrobot, Lines: 25, Source: emucher.py
Example 15: SendRequest
def SendRequest(host, session, requestString):
    data = bytes(json.dumps({"query": requestString}), "ASCII")
    request = urllib.request.Request(host + ":16742", data)
    request.add_header("Cookie", "session=" + session)
    # read() drains the whole response body (HTTPResponse no longer provides readall())
    response = json.loads(urllib.request.urlopen(request).read().decode('ascii'))
    # sys.stderr.write(response + "\n")
    return response
Author: HackerDom, Project: ructf-2013-final, Lines: 7, Source: exploit.py
Example 16: order_complete
def order_complete(request):
    # obtain an access token from the Iamport API
    data = urllib.parse.urlencode({"imp_key": IMP_KEY, "imp_secret": IMP_SECRET})
    data = data.encode('UTF-8')
    f = urllib.request.urlopen('https://api.iamport.kr/users/getToken/', data)
    result = f.read().decode('UTF-8')
    imp_uid = request.POST.get('imp_uid')
    paid_amount = request.POST.get('paid_amount')
    result_json = json.loads(result)
    access_token = result_json['response']['access_token']
    # look the payment up by imp_uid (note that this rebinds the Django `request` argument)
    url = 'https://api.iamport.kr/payments/' + imp_uid
    request = urllib.request.Request(url)
    request.add_header("X-ImpTokenHeader", access_token)
    response = urllib.request.urlopen(request)
    result2 = response.read().decode('UTF-8')
    result2_json = json.loads(result2)
    # read the result
    pay_amount = result2_json['response']['amount']  # arrives as an int
    pay_status = result2_json['response']['status']
    pay_method = result2_json['response']['pay_method']
    if pay_status == 'paid' and str(pay_amount) == paid_amount:
        return HttpResponse('{"check":true}')
    elif pay_status == 'ready' and pay_method == 'vbank':
        return HttpResponse('{"check":true}')
    else:
        return HttpResponse('{"check":false,"pay_status":' + pay_status + '}')
Author: LEESM, Project: beaucrux, Lines: 27, Source: views.py
Example 17: _query
def _query(self, url, request, data=None):
    """
    This internal helper is called by get(), post(), put(), etc. It
    receives a URL and a prepared request object (plus optional body data).
    """
    try:
        # if we have a session identifier, send it back to the server
        if self.session:
            request.add_header("Cookie", self.session)
        # run the request; if data is not None, the request gets a
        # non-empty body containing data
        with urllib.request.urlopen(request, data) as connexion:
            # grab the HTTP headers and the response body, then close
            # the connection
            headers = dict(connexion.info())
            result = connexion.read()
        # if the server sends us a session identifier, store it
        if "Set-Cookie" in headers:
            self.session = headers["Set-Cookie"]
        # apply the post-processing, then return the data; we're done
        return self._post_processing(result, headers)
    except urllib.error.HTTPError as e:
        # We get here when the server returned an HTTP error code (400,
        # 403, 404, etc.). Fetch the response body, since it may contain
        # an explanation, and keep the headers for the post-processing.
        headers = dict(e.headers)
        message = e.read()
        raise ServerError(e.code, self._post_processing(message, headers)) from None
Author: Tikiwinkie, Project: PAC, Lines: 33, Source: client.py
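Where the class above tracks the session cookie by hand via add_header("Cookie", ...), the standard library can also handle cookies automatically. A hedged sketch using http.cookiejar (URL and client name are placeholders):

import http.cookiejar
import urllib.request

jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
request = urllib.request.Request('https://example.com/login')  # placeholder URL
request.add_header('User-Agent', 'my-client/1.0')
with opener.open(request) as response:
    body = response.read()
# any Set-Cookie received above is replayed automatically on later opener.open() calls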
Example 18: download
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
    """
    Download the page at `url` with the given User-Agent and return its HTML text;
    retry a few times when the site answers with a 5xx error code.

    Roughly equivalent to:
    return urllib.request.urlopen(urllib.request.Request(url)).read().decode('utf-8')
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)  # download the page with this user agent
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})  # route requests through the proxy
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        # decode with the detected charset (default utf-8)
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                return download(url, num_retries - 1)
    return html
Author: leroncheung, Project: leroncheung.github.io, Lines: 26, Source: pyscrap.py
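install_opener() as used above changes the module-wide default that urllib.request.urlopen() uses afterwards; if that global side effect is unwanted, the opener can be used directly instead. A hedged sketch (proxy address and URL are placeholders):

import urllib.request

proxy_support = urllib.request.ProxyHandler({'http': 'http://127.0.0.1:8080'})  # placeholder proxy
opener = urllib.request.build_opener(proxy_support)
request = urllib.request.Request('http://example.com/')  # placeholder URL
request.add_header('User-agent', 'wswp')
resp = opener.open(request, timeout=10)  # only this opener is routed through the proxy
html = resp.read().decode(resp.headers.get_content_charset() or 'utf-8')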
Example 19: __call
def __call(self, url=API_URL, params={}, data=None, headers={}):
    """Common method for API call.

    url: API URL
    params: query string parameters
    data: POST data
    headers: additional request headers

    Return: parsed JSON structure or raise GooglError.
    """
    params.update(key=self.key)
    if self.userip is not None:
        params.update(userip=self.userip)
    full_url = "%s?%s" % (url % self.api, urllib.parse.urlencode(params))
    request = urllib.request.Request(full_url, data=bytes(data, encoding="UTF-8"), headers=headers)
    if self.referer is not None:
        request.add_header("Referer", self.referer)
    if self.client_login is not None:
        request.add_header("Authorization", "GoogleLogin auth=%s" % self.client_login)
    try:
        response = urllib.request.urlopen(request)
        return json.loads(str(response.read(), encoding="UTF-8"))
    except urllib.error.HTTPError as e:
        error = json.loads(e.fp.read())
        raise GooglError(error["error"]["code"], error["error"]["message"])
Author: GabrielRF, Project: telegram-urlprobot, Lines: 29, Source: googl.py
Example 20: getWebPage
def getWebPage(url, headers, cookies, postData=None):
    try:
        if postData:
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching ' + url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)
        if postData:
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()
        return r
    except Exception as e:
        print("Error processing webpage: " + str(e))
        return None
Author: aclima93, Project: AutoSteamGifts, Lines: 25, Source: steamgifts.py
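Example 20 only decompresses gzip when the server happens to send it; a client can also advertise the encoding explicitly with add_header. A hedged sketch with a placeholder URL:

import gzip
import urllib.request
from io import BytesIO

request = urllib.request.Request('https://example.com/page')  # placeholder URL
request.add_header('Accept-Encoding', 'gzip')
response = urllib.request.urlopen(request)
raw = response.read()
if response.info().get('Content-Encoding') == 'gzip':
    raw = gzip.GzipFile(fileobj=BytesIO(raw)).read()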
Note: the urllib.request add_header examples in this article were compiled by 纯净天空 from GitHub, MSDocs and other source-code and documentation hosting platforms. The snippets are selected from open-source projects contributed by their respective developers; copyright remains with the original authors, and redistribution or use should follow the corresponding project's license. Please do not republish without permission.