• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python pep.native_str函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 pulsar.utils.pep.native_str 函数的典型用法代码示例。如果您正苦于以下问题:Python native_str 函数的具体用法?Python native_str 怎么用?Python native_str 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



下文共展示了 native_str 函数的 12 个代码示例,这些示例默认按受欢迎程度排序。您可以为喜欢或者觉得有用的代码点赞,您的评价将有助于我们的系统推荐出更好的 Python 代码示例。

示例1: parse_info

def parse_info(response):
    """Parse a ``key:value``-per-line text response into a dictionary.

    ``response`` is first converted with ``native_str``. Empty lines and
    lines starting with ``#`` are skipped. Every other line is split on
    its first ``:``; a line without a ``:`` raises ``ValueError``.

    A value that contains both ``,`` and ``=`` is expanded into a nested
    dictionary of ``name=value`` items. Any other value is converted to
    ``float`` (when it contains a ``.``) or ``int``, falling back to the
    raw string when the conversion fails.
    """
    text = native_str(response)

    def get_value(raw):
        # Composite values look like ``a=1,b=2`` and become a sub-dict.
        if ',' in raw and '=' in raw:
            nested = {}
            for pair in raw.split(','):
                name, inner = pair.rsplit('=', 1)
                nested[name] = get_value(inner)
            return nested
        # Scalar value: try a numeric conversion, keep the text otherwise.
        convert = float if '.' in raw else int
        try:
            return convert(raw)
        except ValueError:
            return raw

    parsed = {}
    for row in text.splitlines():
        if not row or row.startswith('#'):
            continue
        field, raw = row.split(':', 1)
        parsed[field] = get_value(raw)
    return parsed
开发者ID:axisofentropy,项目名称:pulsar,代码行数:25,代码来源:client.py


示例2: write_err

 def write_err(self, msg='', stream=None):
     '''Write ``msg`` followed by a newline into ``stream``.

     A falsy ``stream`` means ``sys.stderr`` is used instead. A non-empty
     ``msg`` is converted with ``native_str`` before being written; a
     falsy ``msg`` produces just the newline.
     '''
     out = stream if stream else sys.stderr
     if not msg:
         out.write('\n')
         return
     out.write(native_str(msg))
     out.write('\n')
开发者ID:pombredanne,项目名称:lux,代码行数:7,代码来源:extension.py


示例3: minify

 def minify(self, options, data):
     '''Send ``data`` to the cssminifier.com service and return the result.

     The size of ``data`` is reported through ``self.write`` first. On a
     200 response the content is returned after conversion with
     ``native_str``; for any other status ``raise_for_status`` is called
     on the response. ``options`` is not used here.
     '''
     size = convert_bytes(len(data))
     self.write('Minimise %s css file via http://cssminifier.com' % size)
     client = HttpClient(loop=new_event_loop())
     payload = {'input': data}
     response = client.post('http://cssminifier.com/raw', data=payload)
     if response.status_code != 200:
         response.raise_for_status()
         return None
     return native_str(response.get_content())
开发者ID:pombredanne,项目名称:lux,代码行数:10,代码来源:style.py


示例4: _encode_url

 def _encode_url(self, data):
     query = self.query
     if data:
         data = native_str(data)
         if isinstance(data, str):
             data = parse_qsl(data)
         else:
             data = mapping_iterator(data)
         query = parse_qsl(query)
         query.extend(data)
         query = urlencode(query)
     self.query = query
开发者ID:successtest9,项目名称:pulsar,代码行数:12,代码来源:__init__.py


示例5: _encode_url

 def _encode_url(self, body):
     query = self.query
     if body:
         body = native_str(body)
         if isinstance(body, str):
             body = parse_qsl(body)
         else:
             body = mapping_iterator(body)
         query = parse_qsl(query)
         query.extend(body)
         self.data = query
         query = urlencode(query)
     self.query = query
开发者ID:Ghost-script,项目名称:dyno-chat,代码行数:13,代码来源:__init__.py


示例6: model_iterator

    def model_iterator(self, application, include_related=True, exclude=None):
        '''A generator of :class:`.Model` classes found in *application*.

        :parameter application: A python dotted path or an iterable over
            python dotted-paths where models are defined. A module object
            is accepted as well.
        :parameter include_related: forwarded to ``models_from_model``.
        :parameter exclude: forwarded to ``models_from_model``; a new
            empty set is used when ``None``.

        Only models defined in these paths are considered: a model is
        yielded when its ``_meta.app_label`` equals the label of the
        application, and each model is yielded once per application.
        '''
        if exclude is None:
            exclude = set()
        application = native_str(application)
        is_mod = ismodule(application)

        if not is_mod and not isinstance(application, str):
            # An iterable of applications: recurse into every entry.
            for app in application:
                nested = self.model_iterator(
                    app, include_related=include_related, exclude=exclude)
                for m in nested:
                    yield m
            return

        if is_mod:
            mod = application
            application = application.__name__
        else:
            try:
                mod = import_module(application)
            except ImportError:
                # the module is not there
                mod = None
        if not mod:
            return

        # Default label is the last component of the dotted path.
        label = application.rsplit('.', 1)[-1]
        try:
            mod_models = import_module('.models', application)
        except ImportError:
            # No ``models`` submodule: look for models in the module itself.
            mod_models = mod
        label = getattr(mod_models, 'app_label', label)

        seen = set()
        for name in dir(mod_models):
            candidate = getattr(mod_models, name)
            found = self.models_from_model(
                candidate, include_related=include_related, exclude=exclude)
            for model in found:
                if model._meta.app_label != label or model in seen:
                    continue
                seen.add(model)
                yield model
开发者ID:Ghost-script,项目名称:dyno-chat,代码行数:43,代码来源:mapper.py


示例7: websocket_key

 def websocket_key(self):
     '''Return the websocket key of this object, creating it on first use.

     The key is the base64 encoding of 16 bytes from ``os.urandom``,
     converted with ``native_str`` using ``DEFAULT_CHARSET``. It is
     stored on ``self._websocket_key`` and reused afterwards.
     '''
     if hasattr(self, '_websocket_key'):
         return self._websocket_key
     random_bytes = os.urandom(16)
     self._websocket_key = native_str(b64encode(random_bytes),
                                      DEFAULT_CHARSET)
     return self._websocket_key
开发者ID:successtest9,项目名称:pulsar,代码行数:5,代码来源:__init__.py


示例8: wsgi_environ

def wsgi_environ(stream, address, client_address, headers,
                 server_software=None, https=False, extra=None):
    '''Build and return the WSGI ``environ`` dictionary for a request.

    :param stream: object providing ``protocol()``, ``parser`` and
        ``headers``; it is also stored as ``wsgi.input``.
    :param address: passed to ``format_address`` to obtain the host when
        the request carries none and the protocol is ``HTTP/1.0``.
    :param client_address: used for ``REMOTE_ADDR``/``REMOTE_PORT`` unless
        an ``x-forwarded-for`` header is present.
    :param headers: container which receives every request header whose
        lower-cased name is in ``HOP_HEADERS``.
    :param server_software: value for ``SERVER_SOFTWARE``; falls back to
        ``pulsar.SERVER_SOFTWARE`` when falsy.
    :param https: when true, and the request URI has no scheme, the url
        scheme defaults to ``https`` instead of ``http``.
    :param extra: optional mapping merged into the result last.
    :return: the ``environ`` dictionary.
    '''
    protocol = stream.protocol()
    parser = stream.parser
    request_headers = stream.headers
    raw_uri = parser.get_url()
    request_uri = urlparse(raw_uri)
    #
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.2
    # If Request-URI is an absoluteURI, the host is part of the Request-URI.
    # Any Host header field value in the request MUST be ignored
    if request_uri.scheme:
        url_scheme = request_uri.scheme
        host = request_uri.netloc
    else:
        url_scheme = 'https' if https else 'http'
        host = None
    #
    environ = {"wsgi.input": stream,
               "wsgi.errors": sys.stderr,
               "wsgi.version": (1, 0),
               "wsgi.run_once": False,
               "wsgi.multithread": False,
               "wsgi.multiprocess": False,
               "SERVER_SOFTWARE": server_software or pulsar.SERVER_SOFTWARE,
               "REQUEST_METHOD": native_str(parser.get_method()),
               "QUERY_STRING": parser.get_query_string(),
               "RAW_URI": raw_uri,
               "SERVER_PROTOCOL": protocol,
               "CONTENT_TYPE": ''}
    forward = client_address
    # The process environment provides the default; a ``script_name``
    # request header overrides it below.
    script_name = os.environ.get("SCRIPT_NAME", "")
    for header, value in request_headers:
        header = header.lower()
        # Independent of the elif chain below: these headers are also
        # copied into the ``headers`` container supplied by the caller.
        if header in HOP_HEADERS:
            headers[header] = value
        if header == 'x-forwarded-for':
            forward = value
        elif header == "x-forwarded-protocol" and value == "ssl":
            url_scheme = "https"
        elif header == "x-forwarded-ssl" and value == "on":
            url_scheme = "https"
        elif header == "host" and not host:
            # Only used when the request URI did not already carry a host
            host = value
        elif header == "script_name":
            script_name = value
        elif header == "content-type":
            # Stored without the HTTP_ prefix; ``continue`` skips the
            # generic HTTP_* entry below.
            environ['CONTENT_TYPE'] = value
            continue
        elif header == "content-length":
            environ['CONTENT_LENGTH'] = value
            continue
        # Every other header, including the ones handled above without a
        # ``continue``, is exposed as HTTP_<NAME>.
        key = 'HTTP_' + header.upper().replace('-', '_')
        environ[key] = value
    environ['wsgi.url_scheme'] = url_scheme
    if url_scheme == 'https':
        environ['HTTPS'] = 'on'
    if is_string(forward):
        # we only took the last one
        # http://en.wikipedia.org/wiki/X-Forwarded-For
        if forward.find(",") >= 0:
            forward = forward.rsplit(",", 1)[1].strip()
        remote = forward.split(":")
        # No port in the address: default it to '80'
        if len(remote) < 2:
            remote.append('80')
    else:
        # presumably a (host, port) sequence; it is indexed with [0] and
        # [1] below -- TODO confirm against callers
        remote = forward
    environ['REMOTE_ADDR'] = remote[0]
    environ['REMOTE_PORT'] = str(remote[1])
    if not host and protocol == 'HTTP/1.0':
        host = format_address(address)
    if host:
        # The result is indexed as [0] (name) and [1] (port)
        host = host_and_port_default(url_scheme, host)
        environ['SERVER_NAME'] = socket.getfqdn(host[0])
        environ['SERVER_PORT'] = host[1]
    path_info = parser.get_path()
    if path_info is not None:
        if script_name:
            # NOTE(review): if path_info is a str, this raises IndexError
            # when script_name does not occur in it, and it matches
            # script_name anywhere in the path, not only as a prefix --
            # confirm this is intended.
            path_info = path_info.split(script_name, 1)[1]
        environ['PATH_INFO'] = unquote(path_info)
    environ['SCRIPT_NAME'] = script_name
    if extra:
        environ.update(extra)
    return environ
开发者ID:Ghost-script,项目名称:dyno-chat,代码行数:84,代码来源:server.py


示例9: challenge_response

 def challenge_response(self, key):
     '''Return the base64-encoded SHA-1 digest of ``key + WEBSOCKET_GUID``.

     The concatenation is converted with ``to_bytes`` before hashing and
     the encoded digest is converted with ``native_str``.
     '''
     payload = to_bytes(key + WEBSOCKET_GUID)
     digest = hashlib.sha1(payload).digest()
     encoded = base64.b64encode(digest)
     return native_str(encoded)
开发者ID:artemmus,项目名称:pulsar,代码行数:3,代码来源:websocket.py


示例10: test_native_str

 def test_native_str(self):
     # A native string must come back as the very same object, so the
     # identities of the input and the result are compared.
     original = "ciao"
     converted = native_str(original)
     self.assertEqual(id(original), id(converted))
开发者ID:huobao36,项目名称:pulsar,代码行数:4,代码来源:tools.py


示例11: wsgi_environ

def wsgi_environ(
    stream, parser, request_headers, address, client_address, headers, server_software=None, https=False, extra=None
):
    """Build the WSGI Environment dictionary

    :param stream: a wsgi stream object, stored as ``wsgi.input``
    :param parser: pulsar HTTP parser
    :param request_headers: headers of request, iterated as
        ``(name, value)`` pairs
    :param address: server address, passed to ``format_address`` when the
        request carries no host and the protocol is ``HTTP/1.0``
    :param client_address: client address, used for ``REMOTE_ADDR`` and
        ``REMOTE_PORT`` unless an ``x-forwarded-for`` header is present
    :param headers: container for response headers; it receives every
        request header whose lower-cased name is in ``HOP_HEADERS``
    :param server_software: value for ``SERVER_SOFTWARE``; falls back to
        ``pulsar.SERVER_SOFTWARE`` when falsy
    :param https: when true, and the request URI has no scheme, the url
        scheme defaults to ``https`` instead of ``http``
    :param extra: optional mapping merged into the result last
    :return: the ``environ`` dictionary
    """
    protocol = http_protocol(parser)
    raw_uri = parser.get_url()
    request_uri = urlparse(raw_uri)
    #
    # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.2
    # If Request-URI is an absoluteURI, the host is part of the Request-URI.
    # Any Host header field value in the request MUST be ignored
    if request_uri.scheme:
        url_scheme = request_uri.scheme
        host = request_uri.netloc
    else:
        url_scheme = "https" if https else "http"
        host = None
    #
    environ = {
        "wsgi.input": stream,
        "wsgi.errors": sys.stderr,
        "wsgi.version": (1, 0),
        "wsgi.run_once": False,
        "wsgi.multithread": False,
        "wsgi.multiprocess": False,
        "SERVER_SOFTWARE": server_software or pulsar.SERVER_SOFTWARE,
        "REQUEST_METHOD": native_str(parser.get_method()),
        "QUERY_STRING": parser.get_query_string(),
        "RAW_URI": raw_uri,
        "SERVER_PROTOCOL": protocol,
        "CONTENT_TYPE": "",
    }
    forward = client_address
    # The process environment provides the default; a ``script_name``
    # request header overrides it below.
    script_name = os.environ.get("SCRIPT_NAME", "")
    for header, value in request_headers:
        header = header.lower()
        # Independent of the elif chain below: these headers are also
        # copied into the ``headers`` container supplied by the caller.
        if header in HOP_HEADERS:
            headers[header] = value
        if header == "x-forwarded-for":
            forward = value
        elif header == "x-forwarded-protocol" and value == "ssl":
            url_scheme = "https"
        elif header == "x-forwarded-ssl" and value == "on":
            url_scheme = "https"
        elif header == "host" and not host:
            # Only used when the request URI did not already carry a host
            host = value
        elif header == "script_name":
            script_name = value
        elif header == "content-type":
            # Stored without the HTTP_ prefix; ``continue`` skips the
            # generic HTTP_* entry below.
            environ["CONTENT_TYPE"] = value
            continue
        elif header == "content-length":
            environ["CONTENT_LENGTH"] = value
            continue
        # Every other header, including the ones handled above without a
        # ``continue``, is exposed as HTTP_<NAME>.
        key = "HTTP_" + header.upper().replace("-", "_")
        environ[key] = value
    environ["wsgi.url_scheme"] = url_scheme
    if url_scheme == "https":
        environ["HTTPS"] = "on"
    if isinstance(forward, str):
        # we only took the last one
        # http://en.wikipedia.org/wiki/X-Forwarded-For
        if forward.find(",") >= 0:
            forward = forward.rsplit(",", 1)[1].strip()
        remote = forward.split(":")
        # No port in the address: default it to "80"
        if len(remote) < 2:
            remote.append("80")
    else:
        # presumably a (host, port) sequence; it is indexed with [0] and
        # [1] below -- TODO confirm against callers
        remote = forward
    environ["REMOTE_ADDR"] = remote[0]
    environ["REMOTE_PORT"] = str(remote[1])
    if not host and protocol == "HTTP/1.0":
        host = format_address(address)
    if host:
        # The result is indexed as [0] (name) and [1] (port); a falsy
        # name is reported as "0.0.0.0" instead of being resolved.
        h = host_and_port_default(url_scheme, host)
        environ["SERVER_NAME"] = socket.getfqdn(h[0]) if h[0] else "0.0.0.0"
        environ["SERVER_PORT"] = h[1]
    path_info = request_uri.path
    if path_info is not None:
        if script_name:
            # NOTE(review): this raises IndexError when script_name does
            # not occur in path_info, and it matches script_name anywhere
            # in the path, not only as a prefix -- confirm this is
            # intended.
            path_info = path_info.split(script_name, 1)[1]
        environ["PATH_INFO"] = unquote(path_info)
    environ["SCRIPT_NAME"] = script_name
    if extra:
        environ.update(extra)
    return environ
开发者ID:luffyhwl,项目名称:pulsar,代码行数:94,代码来源:server.py


示例12: write_err

 def write_err(self, stream=''):
     '''Write ``stream`` followed by a newline to the :attr:`stderr`.

     The target is the first truthy value among ``self.stderr``,
     ``self.stdout`` and ``sys.stderr``. A non-empty ``stream`` is
     converted with ``native_str`` before being written; a falsy one
     produces just the newline.
     '''
     out = self.stderr
     if not out:
         out = self.stdout
     if not out:
         out = sys.stderr
     if not stream:
         out.write('\n')
         return
     out.write(native_str(stream))
     out.write('\n')
开发者ID:pombredanne,项目名称:lux,代码行数:6,代码来源:__init__.py



注:本文中的 pulsar.utils.pep.native_str 函数示例由纯净天空整理自 GitHub/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python pep.range函数代码示例发布时间:2022-05-25
下一篇:
Python pep.iteritems函数代码示例发布时间:2022-05-25
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap