本文整理汇总了Python中utils.utf8函数的典型用法代码示例。如果您正苦于以下问题:Python utf8函数的具体用法?Python utf8怎么用?Python utf8使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了utf8函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _extract_html_
def _extract_html_(rule, doc, url_list, result):
    """Apply one extraction rule to ``doc``, collecting values and URLs.

    Nothing is extracted unless ``rule`` has an ``'xpath'`` entry.

    * With ``'children'``: every matched node is processed recursively by
      each child rule into a fresh dict. A non-empty dict is appended to
      the list ``result[rule['key']]`` when the rule has a ``'key'``;
      otherwise it is merged into ``result``.
    * Without ``'children'``: every match is stripped, appended to
      ``url_list`` as an absolute URL when ``rule['type'] == 'url'``, and
      stored as ``result[rule['key']]`` when the rule has a ``'key'``
      (a later match overwrites an earlier one).

    Args:
        rule: mapping describing the extraction; the keys read here are
            ``xpath``, ``children``, ``key`` and ``type``.
        doc: node exposing ``getroottree()`` and ``xpath()`` -- presumably
            an lxml element; TODO confirm.
        url_list: list extended in place with the extracted URLs.
        result: dict filled in place with the extracted values.
    """
    tree = doc.getroottree()
    if 'xpath' not in rule:
        return
    node_list = doc.xpath(rule['xpath'])

    if 'children' in rule:
        for node in node_list:
            item = {}
            for child in rule['children']:
                _extract_html_(child, node, url_list, item)
            if not item:
                continue
            if 'key' in rule:
                result.setdefault(rule['key'], []).append(item)
            else:
                result.update(item)
        return

    for node in node_list:
        # Leaf rules call .strip(), so the xpath is expected to yield strings.
        node = node.strip()
        if 'type' in rule and rule['type'] == 'url':
            # NOTE(review): `url` is neither a parameter nor assigned in
            # this function; it must come from an enclosing/module scope.
            url_list.append(
                utils.utf8(
                    consume_absolute_url(
                        url,
                        consume_urlencode(node, tree.docinfo.encoding)
                    )
                )
            )
        if 'key' in rule:
            result[rule['key']] = utils.utf8(node)
开发者ID:chungongyu,项目名称:xfcrawl,代码行数:34,代码来源:extractor.py
示例2: decrypt_data
def decrypt_data():
    """Decrypt the AES-protected URI, headers and body of ``request`` in place.

    ``self``, ``client``, ``request`` and ``logger`` are free variables here:
    they are resolved outside this function, which takes no arguments and
    returns nothing.
    """
    cipher = AESCipher(client.secret_key)

    uri_token = self.handler.request.headers.get('X-Api-Encrypted-Uri')
    if uri_token:
        request.uri = cipher.decrypt(utf8(uri_token))
        logger.debug('decrypted uri %s' % request.uri)
        # The URI changed, so the values derived from it are rebuilt.
        request.path, _sep, request.query = request.uri.partition('?')
        request.arguments = parse_qs_bytes(request.query, keep_blank_values=True)
        request.query_arguments = copy.deepcopy(request.arguments)

    headers_token = self.handler.request.headers.get('X-Api-Encrypted-Headers')
    if headers_token:
        plain_headers = cipher.decrypt(utf8(headers_token))
        decoded = dict(json_decode(plain_headers))
        for name, value in iteritems(decoded):
            # Store everything as text_type: mixing str and unicode values
            # led to 422 errors.
            request.headers[text_type(name)] = text_type(value)

    if request.body and len(request.body) > 0:
        logger.debug('解密 body')
        logger.debug(request.body)
        request.body = cipher.decrypt(utf8(request.body))
        # The body changed, so it has to be parsed again.
        request._parse_body()
开发者ID:nicky1108,项目名称:api-gateway,代码行数:30,代码来源:encrypt.py
示例3: _response_string_to_sign
def _response_string_to_sign(self, response_headers, request, response_body):
    """
    Build the canonical StringToSign for a response.

    The result is the newline-joined byte string of, in order: the
    upper-cased request method, ``self.client.raw_uri``, the canonical form
    of the headers selected for signing, and the response body. Only this
    byte string is returned.
    """
    selected = self._response_headers_to_sign(response_headers)
    header_block = self._canonical_headers(selected)
    fields = [
        utf8(request.method.upper()),
        utf8(self.client.raw_uri),
        utf8(header_block),
        utf8(response_body),
    ]
    return b'\n'.join(fields)
开发者ID:eaglewei,项目名称:api-gateway,代码行数:13,代码来源:auth.py
示例4: response_string_to_sign
def response_string_to_sign(self, response):
    """
    Build the canonical StringToSign for ``response``.

    The result is the newline-joined byte string of, in order: the
    upper-cased method and the URI of ``self.request_data``, the canonical
    form of the response headers selected for signing, and
    ``response.content``. Only this byte string is returned.
    """
    selected = self.response_headers_to_sign(response.headers)
    header_block = self.canonical_headers(selected)
    fields = [
        utf8(self.request_data.method.upper()),
        utf8(self.request_data.uri),
        utf8(header_block),
        utf8(response.content),
    ]
    return b'\n'.join(fields)
开发者ID:eaglewei,项目名称:api-gateway,代码行数:13,代码来源:api_client.py
示例5: h_line
def h_line(self, i):
    """Render the parts of one line node into a single string.

    ``i[THING]`` is iterated part by part:

    * a plain string (``unicode`` or ``str``) is emitted via ``utf8``;
    * a part whose ``WHAT`` field is ``'itpl'`` is evaluated with
      ``self.h`` on its ``NAME`` field, then either passed through
      ``self.filter`` (when its ``FILTER`` field is truthy) or converted
      with ``utf8``, with ``None`` rendered as the empty string;
    * anything else raises ``WTF``.

    Returns:
        The concatenation of all rendered parts.

    Raises:
        WTF: for a part that is neither a string nor an ``'itpl'`` node.
    """
    pieces = []
    for part in i[THING]:
        if isinstance(part, (unicode, str)):
            pieces.append(utf8(part))
        elif part[WHAT] == 'itpl':
            value = self.h(part[NAME])
            if part[FILTER]:
                value = self.filter(value)
            elif value is None:
                value = ""
            else:
                # `or ""` keeps the original fallback for a falsy utf8() result.
                value = utf8(value) or ""
            pieces.append(value)
        else:
            # Call form instead of the Python-2-only `raise WTF, part`
            # statement; it parses on both Python 2 and Python 3.
            raise WTF(part)
    return ''.join(pieces)
开发者ID:1ngmar,项目名称:ianki,代码行数:15,代码来源:template.py
示例6: signature_request
def signature_request(self):
    """Return the signature for the current request.

    The string to sign is converted with ``utf8``, hashed with SHA-1, and
    the hex digest is handed to ``self.sign_string``.
    """
    # Note for anyone adding logging here: string_to_sign must be decoded to
    # unicode first, otherwise logging it raises an exception.
    payload = utf8(self.string_to_sign())
    digest = sha1(payload).hexdigest()
    return self.sign_string(digest)
开发者ID:nicky1108,项目名称:api-gateway,代码行数:7,代码来源:api_client.py
示例7: auth_request
def auth_request(self, request):
    """Validate the timestamp and signature headers of ``request``.

    The ``X-Api-Timestamp`` header must be an integer within
    ``settings.SIGNATURE_EXPIRE_SECONDS`` of the current time, and the
    ``X-Api-Signature`` header must equal the signature computed locally
    from the request. The signature header is deleted from
    ``request.headers`` before the string to sign is built.

    Raises:
        AuthRequestException: the timestamp header is missing or not an
            integer, the timestamp is outside the allowed window, or the
            signature is missing or does not match.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    try:
        timestamp = int(request.headers.get('X-Api-Timestamp'))
    except (TypeError, ValueError):
        # TypeError: header absent (int(None)); ValueError: not an integer.
        raise AuthRequestException('Invalid X-Api-Timestamp Header')

    now_ts = int(time.time())
    if abs(timestamp - now_ts) > settings.SIGNATURE_EXPIRE_SECONDS:
        logger.debug('Expired signature, timestamp: %s' % timestamp)
        raise AuthRequestException('Expired Signature')

    signature = request.headers.get('X-Api-Signature')
    if not signature:
        logger.debug('No Signature Provided')
        raise AuthRequestException('No Signature Provided')
    del request.headers['X-Api-Signature']

    string_to_sign = self._request_string_to_sign(request)
    # Note for anyone adding logging here: string_to_sign must be decoded to
    # unicode first, otherwise logging it raises an exception.
    hash_value = sha256(utf8(string_to_sign)).hexdigest()
    real_signature = self.sign_string(hash_value)

    # Constant-time comparison so the check does not leak, through timing,
    # how many leading characters of a forged signature were right. Both
    # sides go through utf8() because compare_digest needs matching types.
    if not hmac.compare_digest(utf8(signature), utf8(real_signature)):
        logger.debug('Signature not match: %s, %s' % (signature, real_signature))
        raise AuthRequestException('Invalid Signature')
开发者ID:eaglewei,项目名称:api-gateway,代码行数:26,代码来源:auth.py
示例8: check_response
def check_response(self, response):
    """Check the timestamp and signature headers of ``response``.

    The ``X-Api-Timestamp`` header must be an integer within
    ``self.signature_expire_seconds`` of the current time, and the
    ``X-Api-Signature`` header must equal the signature computed locally
    from the response. The signature header is deleted from
    ``response.headers`` before the string to sign is built.

    Returns:
        True when the response passes every check, False otherwise. The
        reason for a failure is only written to the debug log.
    """
    logger.debug(response.headers)
    try:
        timestamp = int(response.headers.get('X-Api-Timestamp'))
    except (TypeError, ValueError):
        # TypeError: header absent (int(None)); ValueError: not an integer.
        logger.debug('Invalid X-Api-Timestamp Header')
        return False

    now_ts = int(time.time())
    if abs(timestamp - now_ts) > self.signature_expire_seconds:
        logger.debug('Expired signature, timestamp: %s' % timestamp)
        logger.debug('Expired Signature')
        return False

    signature = response.headers.get('X-Api-Signature')
    if not signature:
        logger.debug('No signature provide')
        return False
    del response.headers['X-Api-Signature']

    string_to_sign = self.response_string_to_sign(response)
    logger.debug(string_to_sign)
    # Note for anyone adding formatted logging here: string_to_sign must be
    # decoded to unicode first, otherwise logging it raises an exception.
    hash_value = sha256(utf8(string_to_sign)).hexdigest()
    real_signature = self.sign_string(hash_value)
    if signature != real_signature:
        logger.debug('Signature not match: %s, %s' % (signature, real_signature))
        return False
    return True
开发者ID:eaglewei,项目名称:api-gateway,代码行数:32,代码来源:api_client.py
示例9: post
def post(self, uri, data=None, json=None, params=None, headers=None, **kwargs):
    """Send a signed (and optionally AES-encrypted) POST request.

    The request is prepared, encrypted when ``self.encrypt_type`` is
    ``'aes'``, given the auth headers plus an ``X-Api-Signature`` header,
    and sent with ``requests.post``; extra keyword arguments are forwarded
    to that call. Unless the status code equals
    ``GATEWAY_ERROR_STATUS_CODE`` the response signature is checked, and a
    failed check is only logged. A response whose ``x-api-encrypt-type``
    header is ``'aes'`` has its content decrypted in place.

    Returns:
        The response object from ``requests.post``.
    """
    url = self.prepare_request(
        'POST', uri, params=params, data=data, json=json, headers=headers)
    if self.encrypt_type == 'aes':
        url = self.encrypt_data()

    self.request_data.headers.update(self.get_auth_headers())
    logger.debug(self.request_data.headers)
    self.request_data.headers['X-Api-Signature'] = self.signature_request()

    r = requests.post(
        url,
        headers=self.request_data.headers,
        data=utf8(self.request_data.body),
        **kwargs
    )
    logger.debug(url)
    logger.debug(self.request_data.headers)

    if r.status_code != GATEWAY_ERROR_STATUS_CODE:
        if not self.check_response(r):
            logger.debug('返回结果签名不正确')

    if r.headers.get('x-api-encrypt-type', 'raw') == 'aes':
        r._content = self.decrypt_data(r.content)
    return r
开发者ID:eaglewei,项目名称:api-gateway,代码行数:26,代码来源:api_client.py
示例10: render
def render(template, params):
    """Render ``template`` with ``params`` and return the result via ``utf8``.

    Templates are loaded from the ``_mobi`` directory under ``ROOT``.
    Autoescaping is off and the ``xmldatetime`` filter is registered.
    """
    template_dirs = [os.path.join(ROOT, '_mobi')]
    env = Environment(
        loader=FileSystemLoader(template_dirs),
        autoescape=False,
    )
    env.filters.update({'xmldatetime': xmldatetime})
    rendered = env.get_template(template).render(params)
    return utf8(rendered)
开发者ID:daqing15,项目名称:ebook,代码行数:8,代码来源:mobi.py
示例11: header
def header(hdr, value, unique=False):
    """
    Adds the header `hdr: value` with the response.

    If `unique` is True and a header with that name (compared
    case-insensitively) already exists, it doesn't add a new one.
    Raises ValueError when `hdr` or `value` contains a CR or LF.
    """
    hdr, value = utf8(hdr), utf8(value)

    # protection against HTTP response splitting attack
    for text in (hdr, value):
        if '\n' in text or '\r' in text:
            raise ValueError('invalid characters in header')

    if unique is True:
        if any(h.lower() == hdr.lower() for h, _v in ctx.headers):
            return

    ctx.headers.append((hdr, value))
开发者ID:stucchio,项目名称:DJ-Pirate,代码行数:17,代码来源:webapi.py
示例12: signature_response
def signature_response(self, response_header, request, response_body):
    """Return the signature for a response.

    The response's string to sign is converted with ``utf8``, hashed with
    SHA-256, and the hex digest is handed to ``self.sign_string``.
    """
    # Note for anyone adding logging here: string_to_sign must be decoded to
    # unicode first, otherwise logging it raises an exception.
    payload = utf8(
        self._response_string_to_sign(response_header, request, response_body))
    digest = sha256(payload).hexdigest()
    return self.sign_string(digest)
开发者ID:eaglewei,项目名称:api-gateway,代码行数:9,代码来源:auth.py
示例13: instance_url
def instance_url(self, uid=None):
    """Return the URL of this instance: the class URL plus the quoted uid.

    The instance's own ``'uid'`` entry wins over the ``uid`` argument, which
    is only used when the instance has no such entry.

    Raises:
        ValueError: when neither the argument nor the instance provides a
            truthy uid.
    """
    if not (uid or self.get('uid')):
        message = (
            'Could not determine which URL to request: %s instance '
            'has invalid ID: %r' % (type(self).__name__, uid))
        raise ValueError(message, 'id')

    resolved = utils.utf8(self.get('uid', uid))
    quoted = urllib.quote_plus(resolved)
    return "%s%s/" % (self._class_url(), quoted)
开发者ID:benny0924,项目名称:sense-python-client,代码行数:9,代码来源:resources.py
示例14: encrypt_data
def encrypt_data(body):
    """AES-encrypt ``body`` and make it the handler's pending output.

    ``self``, ``client`` and ``logger`` are free variables resolved outside
    this function. The handler's write buffer is cleared, the encrypted
    body is written, and the ``X-Api-Encrypt-Type`` header is set to
    ``aes``.
    """
    # The request used AES encryption, so the returned data is encrypted too.
    logger.debug('使用 AES 加密 body')
    cipher = AESCipher(client.secret_key)
    encrypted = cipher.encrypt(utf8(body))

    # Replace whatever was buffered with the encrypted payload.
    self.handler.clear_write_buffer()
    self.handler.write(encrypted)
    self.handler.set_header('X-Api-Encrypt-Type', 'aes')
开发者ID:nicky1108,项目名称:api-gateway,代码行数:9,代码来源:encrypt.py
示例15: urlencode
def urlencode(query):
    """
    Same as urllib.urlencode, but supports unicode strings.

    Every value of `query` is passed through `utils.utf8` before encoding.

        >>> urlencode({'text':'foo bar'})
        'text=foo+bar'
    """
    encoded = {}
    for key, value in query.items():
        encoded[key] = utils.utf8(value)
    return urllib.urlencode(encoded)
开发者ID:oopos,项目名称:opspy,代码行数:9,代码来源:http.py
示例16: consume_filter
def consume_filter(options, **parms):
    """Return the links from ``parms['data']['links']`` that should be dispatched.

    When ``consume_isvalid(**parms)`` is falsy an empty list is returned.
    Otherwise each link is passed through ``utils.utf8`` and stripped, then
    filtered by ``_filter_js`` and ``_filter_state`` in that order.

    Args:
        options: mapping; ``options['redis']`` is the redis client used by
            the filters, and ``options.get('urlstate.expire')`` overrides
            the default expiry of 24 hours for the per-URL state.
        **parms: message fields; ``parms['data']['links']`` is read.

    Returns:
        A list of links that survived all active filters.
    """
    def _filter_js(options, links):
        """Drop ``javascript:`` pseudo-links."""
        return [url for url in links if not url.startswith('javascript:')]

    # NOTE(review): _filter_site is defined but not part of the filter chain
    # at the bottom of this function -- confirm whether that is intentional.
    def _filter_site(options, links):
        """Keep the links whose site is a member of redis set ``sites_allowed``."""
        def _get_tld(url):
            # Best effort: a URL whose TLD cannot be determined gets None.
            # `except Exception` (not a bare except) lets KeyboardInterrupt
            # and SystemExit propagate.
            try:
                return utils.utf8(tld.get_tld(url))
            except Exception:
                return None

        redisdb = options['redis']
        # Real lists, not map()/filter() results: the pairs are iterated
        # twice, which breaks with one-shot iterators.
        tagged = [(url, _get_tld(url)) for url in links]
        sites = set(site for _url, site in tagged if site is not None)
        site_states = redisdb.smembers('sites_allowed') & sites
        return [url for url, site in tagged if site in site_states]

    def _filter_state(options, links):
        """Keep the links with no stored state and record a dispatch time for them."""
        urls = []
        redisdb = options['redis']
        now = time.time()
        url_states = redisdb.mget(links) if len(links) > 0 else []
        with redisdb.pipeline() as pipe:
            for i, url in enumerate(links):
                if url_states[i] is None:
                    urls.append(url)
                    # NOTE(review): the argument order is (name, value, time);
                    # verify it matches the redis client class in use.
                    pipe.setex(url, json.dumps({'dispatch_time': now}), options.get('urlstate.expire', 24*3600))
            pipe.execute()
        return urls

    if not consume_isvalid(**parms):
        return []

    # A list (not map()) because _filter_state calls len() on it, and
    # str.strip() instead of the Python-2-only string.strip().
    links = [utils.utf8(x).strip() for x in parms['data']['links']]
    for _filter in [_filter_js, _filter_state]:
        links = list(_filter(options, links))
    return links
开发者ID:chungongyu,项目名称:xfcrawl,代码行数:56,代码来源:dispatcher.py
示例17: encrypt_data
def encrypt_data(self):
    """AES-encrypt the pending request and return the URL to send it to.

    The current headers (serialized with ``json_util.dumps``) and the
    current URI are encrypted and become the only request headers besides
    ``Content-Type``. The URI is replaced by a root URI carrying a
    timestamp and a random nonce, and a non-empty body is encrypted too.

    Returns:
        ``self.api_server`` (stripped) joined with the replacement URI.
    """
    cipher = AESCipher(self.secret_key)
    serialized_headers = json_util.dumps(self.request_data.headers)

    # Encrypt the headers and the url.
    encrypted_headers = cipher.encrypt(utf8(serialized_headers))
    encrypted_uri = cipher.encrypt(utf8(self.request_data.uri))
    self.request_data.headers = {
        'Content-Type': 'application/octet-stream',
        'X-Api-Encrypted-Headers': encrypted_headers,
        'X-Api-Encrypted-Uri': encrypted_uri
    }

    # Set a new url.
    stamp = int(time.time())
    nonce = text_type(random.random())
    self.request_data.uri = '/?_t=%d&_nonce=%s' % (stamp, nonce)
    url = self.api_server.strip() + self.request_data.uri

    if self.request_data.body is not None and len(self.request_data.body) > 0:
        self.request_data.body = cipher.encrypt(utf8(self.request_data.body))
        logger.debug(self.request_data.body)
    return url
开发者ID:eaglewei,项目名称:api-gateway,代码行数:19,代码来源:api_client.py
示例18: wsgifunc
def wsgifunc(self, *middleware, **kw):
    """Returns a WSGI-compatible function for this application.

    Each callable in `middleware` is applied, in the order given, to the
    WSGI function, and the outermost wrapper is returned.

    Keyword options:
        cleanup_threadlocal: when true (the default), the current thread's
            `_d` attribute is deleted after each request, if present.
    """
    def peep(iterator):
        """Peeps into an iterator by doing an iteration
        and returns an equivalent iterator.
        """
        # wsgi requires the headers first
        # so we need to do an iteration
        # and save the result for later
        try:
            firstchunk = iterator.next()
        except StopIteration:
            firstchunk = ""
        return itertools.chain([firstchunk], iterator)

    def is_generator(x):
        return x and hasattr(x, "next")

    def wsgi(env, start_resp):
        self.load(env)
        try:
            # allow uppercase methods only
            if web.ctx.method.upper() != web.ctx.method:
                raise web.nomethod()
            result = self.handle_with_processors()
        except web.HTTPError as e:
            # `as` form instead of the Python-2-only `except X, e` syntax.
            result = e.data

        if is_generator(result):
            result = peep(result)
        else:
            result = [utils.utf8(result)]

        status, headers = web.ctx.status, web.ctx.headers
        start_resp(status, headers)

        # Since the CherryPy Webserver uses thread pool, the thread-local
        # state is never cleared. This interferes with the other requests.
        # Clearing the thread-local storage to avoid that.
        # See utils.ThreadedDict for details.
        import threading
        t = threading.currentThread()
        if kw.get("cleanup_threadlocal", True) and hasattr(t, "_d"):
            del t._d

        return result

    # Without these lines the function returned None and ignored the
    # `middleware` arguments, contradicting the contract stated above.
    for m in middleware:
        wsgi = m(wsgi)
    return wsgi
开发者ID:mzkmzk,项目名称:jit,代码行数:52,代码来源:application.py
示例19: serialize
def serialize(self, request, content_type, default_serializers=None):
    """Serializes the wrapped object.

    Builds a webob.Response carrying this object's status code, its stored
    headers (each value stringified and passed through utils.utf8) and the
    given content type. The body is set only when the wrapped object is
    not None.

    A serializer already set on the instance is used as is; otherwise the
    one returned by get_serializer() is instantiated.
    """
    serializer = self.serializer
    if not serializer:
        _mtype, serializer_cls = self.get_serializer(content_type,
                                                     default_serializers)
        serializer = serializer_cls()

    response = webob.Response()
    response.status_int = self.code
    for name, raw in self._headers.items():
        response.headers[name] = utils.utf8(str(raw))
    response.headers['Content-Type'] = utils.utf8(content_type)

    if self.obj is not None:
        response.body = serializer.serialize(self.obj)
    return response
开发者ID:back1860,项目名称:cloudemulator,代码行数:23,代码来源:wsgi.py
示例20: set_cookie
def set_cookie(self, name, value, domain=None, expires=None, path="/",
               expires_days=None):
    """Sets the given cookie name/value with the given options.

    `expires_days` is only used when `expires` is not given; the expiry is
    then that many days after the current UTC time. Raises ValueError when
    the name or value contains a character in the range \\x00-\\x20.
    """
    name = utils.utf8(name)
    value = utils.utf8(value)
    if re.search(r"[\x00-\x20]", name + value):
        # Don't let us accidentally inject bad stuff
        raise ValueError("Invalid cookie %r: %r" % (name, value))

    parts = ['%s=%s' % (name, value)]
    if domain:
        parts.append('domain=%s' % domain)

    if expires_days is not None and not expires:
        lifetime = datetime.timedelta(days=expires_days)
        expires = datetime.datetime.utcnow() + lifetime
    if expires:
        seconds = calendar.timegm(expires.utctimetuple())
        expires = email.utils.formatdate(
            seconds, localtime=False, usegmt=True)
        parts.append('expires=%s' % expires)

    if path:
        parts.append('path=%s' % path)

    self.response.headers.add_header('Set-Cookie', '; '.join(parts))
开发者ID:mccutchen,项目名称:tweetlocker,代码行数:23,代码来源:handlers.py
注:本文中的utils.utf8函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论