• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python urllib.splitport函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中urllib.splitport函数的典型用法代码示例。如果您正苦于以下问题:Python splitport函数的具体用法?Python splitport怎么用?Python splitport使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了splitport函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: parse_address_info

    def parse_address_info(cls, server_addr="nats://nats:[email protected]:4222"):
        '''\
        Parse a NATS server URI into credentials and endpoint.

        Params:
        =====
        server_addr: nats server address, of the form
            nats://[user[:password]@]host:port;

        Returns:
        =====
        user: username to login nats server, None when the URI has no
            credentials;
        pswd: password to login nats server, None when absent;
        host: host part of the URI, without the leading "//";
        port: port of nats server, as an int

        Raises:
        =====
        NotImplementException: server_addr is not a str, or its scheme is
            not "nats";
        TypeError: the URI carries no numeric port (int(None)).
        '''
        # NOTE(review): the default value above looks mangled (e-mail
        # obfuscation artefact) -- confirm the intended default URI.
        if not isinstance(server_addr, str):
            raise NotImplementException

        protocol, after_split = urllib.splittype(server_addr)
        if protocol != "nats":
            raise NotImplementException

        # Drop the leading "//" up front so the host never keeps it,
        # whether or not credentials are present.
        netloc, _ = urllib.splithost(after_split)
        if netloc is None:
            # No "//" prefix at all: treat the remainder as the authority.
            netloc = after_split

        auth, hostport = urllib.splituser(netloc)
        if auth is None:
            user = pswd = None
        else:
            user, pswd = urllib.splitpasswd(auth)

        host, port = urllib.splitport(hostport)
        return user, pswd, host, int(port)
开发者ID:PyHUBKyiv,项目名称:python-nats,代码行数:35,代码来源:common.py


示例2: __init__

 def __init__(self, uri, transport=None, encoding=None, 
              verbose=0, version=None):
     """Set up the proxy for a JSON-RPC endpoint and open its socket.

     uri       -- endpoint URI; the scheme must be http, https or unix.
     transport -- transport object to use; when None one is chosen from
                  the scheme (UnixTransport, SafeTransport or Transport).
     encoding  -- stored as given.
     verbose   -- stored as given.
     version   -- protocol version; falls back to config.version when
                  falsy.

     Raises IOError for an unsupported scheme and UnixSocketMissing when
     a unix URI is used while USE_UNIX_SOCKETS is false.
     """
     import urllib
     if not version:
         version = config.version
     self.__version = version
     schema, uri = urllib.splittype(uri)
     if schema not in ('http', 'https', 'unix'):
         raise IOError('Unsupported JSON-RPC protocol.')
     if schema == 'unix':
         if not USE_UNIX_SOCKETS:
             # Don't like the "generic" Exception...
             raise UnixSocketMissing("Unix sockets not available.")
         self.__host = uri
         self.__handler = '/'
     else:
         self.__host, self.__handler = urllib.splithost(uri)
         if not self.__handler:
             # The original line was a comparison (==) with no effect;
             # default an empty path to the root handler.
             self.__handler = '/'
     if transport is None:
         if schema == 'unix':
             transport = UnixTransport()
         elif schema == 'https':
             transport = SafeTransport()
         else:
             transport = Transport()
     self.__transport = transport
     self.__encoding = encoding
     self.__verbose = verbose
     # splitport() returns (host, port); the port stays a string, or None
     # when the URI has no explicit port.
     self.__sock = socks.create_connection(urllib.splitport(self.__host))
     self.__sock.settimeout(0.1)
开发者ID:wwzbwwzb,项目名称:rpcjsonlib,代码行数:34,代码来源:jsonrpc.py


示例3: isKnownServer

 def isKnownServer(self, URL):
     """Tell whether URL points at the host of one of the known sources.

     Only the host part of each network location is compared; any port
     is stripped from both sides before the comparison.
     """
     wanted_host = urllib.splitport(urlparse.urlsplit(URL)[1])[0]
     return any(
         urllib.splitport(urlparse.urlsplit(known)[1])[0] == wanted_host
         for known in self.__SourceURLs
     )
开发者ID:BackupTheBerlios,项目名称:pplt-svn,代码行数:8,代码来源:DataBase.py


示例4: dnslookup

def dnslookup(url):
    """Replaces a hostname by its IP in an url.

    Uses gethostbyname to do a DNS lookup, so the nscd cache is used.

    If gevent has patched the standard library, makes sure it uses the
    original version because gevent uses its own mechanism based on
    the async libevent's evdns_resolve_ipv4, which does not use
    glibc's resolver.

    Any ``user[:password]@`` prefix and ``:port`` suffix found in the
    network location are put back around the resolved address.
    """
    try:
        from gevent.socket import _socket
        gethostbyname = _socket.gethostbyname
    except ImportError:
        import socket
        gethostbyname = socket.gethostbyname

    # parsing
    parsed_url = urlparse.urlparse(url)
    host, port = urllib.splitport(parsed_url.netloc)
    user, host = urllib.splituser(host)

    # resolving the host
    host = gethostbyname(host)

    # recomposing
    if port is not None:
        host = '%s:%s' % (host, port)

    if user is not None:
        # The format string here was corrupted and raised ValueError;
        # user info and host are joined with '@'.
        host = '%s@%s' % (user, host)

    parts = [parsed_url[0]] + [host] + list(parsed_url[2:])
    return urlparse.urlunparse(parts)
开发者ID:ncalexan,项目名称:mozservices,代码行数:34,代码来源:util.py


示例5: _get_real_authority

    def _get_real_authority(self):
        """
        Return the authority specification of the originally requested URL.

        The return value is a string of the form <host>:<port>.

        Raises ValueError when the URL has no scheme, or when it carries
        no explicit port and the scheme has no entry in self._ports.

        """

        url = self._proxy_request.get_selector()

        proto, rest = urllib.splittype(url)
        if proto is None:
            raise ValueError("unknown URL type: %s" % url)

        # Get the host and port specification
        host, rest = urllib.splithost(rest)
        host, port = urllib.splitport(host)

        # If port is not defined, then try to get it from the protocol.
        if port is None:
            try:
                port = self._ports[proto]
            except KeyError:
                raise ValueError("unknown protocol for: %s" % url)

        # splitport() yields the port as a string of digits, which '%d'
        # rejects with TypeError; normalise to int before formatting.
        return '%s:%d' % (host, int(port))
开发者ID:pombredanne,项目名称:ensetuptools,代码行数:26,代码来源:connect_HTTP_handler.py


示例6: request

    def request(self, method, path, headers=None, data=None, etag=None, etagnot=None, timeout=None):
        """Make HTTP request. Return HTTPResponse instance.

        Will never raise urllib2.HTTPError, but may raise other exceptions, such
        as urllib2.URLError or httplib.HTTPException

        path is appended to self.base_url after dropping one leading '/'.
        etag / etagnot, when given, set the If-Match / If-None-Match
        headers; the value '*' is sent unquoted.
        """
        if path.startswith('/'):
            path = path[1:]
        if headers is None:
            headers = {}
        if etag is not None:
            # XXX use quoteString instead?
            headers['If-Match'] = '*' if etag == '*' else '"%s"' % etag
        if etagnot is not None:
            headers['If-None-Match'] = '*' if etagnot == '*' else '"%s"' % etagnot
        req = HTTPRequest(self.base_url + path, method=method, headers=headers, data=data)
        host, port = urllib.splitport(req.get_host())
        HostCache.lookup(host)
        try:
            response = self.opener.open(req, timeout=timeout)
            if isinstance(response, urllib2.HTTPError):
                return HTTPResponse.from_HTTPError(response)
            elif isinstance(response, urllib2.addinfourl):
                return HTTPResponse.from_addinfourl(response)
            else:
                raise RuntimeError('urllib2.open returned %r' % response)
        except urllib2.HTTPError as e:
            # Workaround for bug in urllib2 which doesn't reset the retry count
            # when a negative, but different that 401 or 407, response is
            # received. -Luci
            if e.code not in (401, 407):
                digest_handlers = (urllib2.HTTPDigestAuthHandler, urllib2.ProxyDigestAuthHandler)
                for handler in self.opener.handlers:
                    if isinstance(handler, digest_handlers):
                        handler.reset_retry_count()
            return HTTPResponse.from_HTTPError(e)
开发者ID:AGProjects,项目名称:python-xcaplib,代码行数:34,代码来源:httpclient.py


示例7: __init__

    def __init__(self, ec2_url, ec2_region, ec2_access_key, ec2_secret_key,
                 vpc=None, storage_path=None, request_floating_ip=False):
        """Record the EC2 settings and split the endpoint URL into parts.

        The URL is broken down into host, path and (optional) port, and
        the connection is flagged as secure when the scheme is https.
        storage_path is accepted but not used inside this method.
        """
        # read all parameters from url
        scheme, remainder = urllib.splittype(ec2_url)
        self._host, self._ec2path = urllib.splithost(remainder)
        self._ec2host, raw_port = urllib.splitport(self._host)
        # keep a missing port as-is, otherwise store it as an int
        self._ec2port = int(raw_port) if raw_port else raw_port
        self._secure = (scheme == "https")

        self._url = ec2_url
        self._region_name = ec2_region
        self._access_key = ec2_access_key
        self._secret_key = ec2_secret_key
        self._vpc = vpc
        self.request_floating_ip = request_floating_ip

        # will be initialized upon first connect
        self._ec2_connection = None
        self._vpc_connection = None
        self._vpc_id = None
        self._region = None

        self._instances = {}
        self._cached_instances = []
        self._images = None
开发者ID:HPCNow,项目名称:elasticluster,代码行数:32,代码来源:ec2_boto.py


示例8: open_local_file

 def open_local_file(self, req):
     try:
         import email.utils as emailutils
     except ImportError:
         # python 2.4
         import email.Utils as emailutils
     import mimetypes
     host = req.get_host()
     file = req.get_selector()
     localfile = url2pathname(file)
     try:
         stats = os.stat(localfile)
         size = stats.st_size
         modified = emailutils.formatdate(stats.st_mtime, usegmt=True)
         mtype = mimetypes.guess_type(file)[0]
         headers = mimetools.Message(StringIO(
             'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
             (mtype or 'text/plain', size, modified)))
         if host:
             host, port = splitport(host)
         if not host or \
             (not port and socket.gethostbyname(host) in self.get_names()):
             return addinfourl(open(localfile, 'rb'),
                               headers, 'file:'+file)
     except OSError, msg:
         # urllib2 users shouldn't expect OSErrors coming from urlopen()
         raise URLError(msg)
开发者ID:ufal,项目名称:lindat-aai-shibbie,代码行数:27,代码来源:_urllib2_fork.py


示例9: __init__

    def __init__(self, url):
        """ Initialize the downloader with the specified url string

        Credentials embedded in the url are split off into self.username
        and self.password, and self.url is rebuilt without them.
        self.port is always an int and defaults to 80.
        """
        # FIXME: support HTTPS
        scheme, host, path, params, query, fragment = urlparse.urlparse(url)

        auth, host = urllib.splituser(host)
        self.host, port = urllib.splitport(host)
        # splitport() gives a string (or None); store one consistent type
        # instead of str for explicit ports and int for the default.
        self.port = int(port) if port else 80

        self.username = self.password = None
        if auth:
            self.username, self.password = urllib.splitpasswd(auth)

        self.url = urlparse.urlunparse((scheme, host, path, params, query, fragment))

        self.nzbFilename = os.path.basename(path)
        self.tempFilename = os.path.join(Hellanzb.TEMP_DIR,
                                         tempFilename(self.TEMP_FILENAME_PREFIX) + '.nzb')
        # The HTTPDownloader
        self.downloader = None
        # The NZB category (e.g. 'Apps')
        self.nzbCategory = None
        # Whether or not the NZB file data is gzipped
        self.isGzipped = False
开发者ID:Bootz,项目名称:hellanzb,代码行数:25,代码来源:NZBDownloader.py


示例10: request

    def request(self, method, url, body=None, headers={}):
        """Record the real target host/port, then issue the request.

        The full url is parsed so that self._real_host and
        self._real_port (an int) are available for the CONNECT request
        made to the proxy; the request itself is sent for the part of
        the url that follows the host.

        Raises ValueError when the url has no scheme, or when it has no
        explicit port and the scheme is missing from self._ports.
        """
        # Request is called before connect, so can interpret url and get
        # real host/port to be used to make CONNECT request to proxy
        proto, rest = urllib.splittype(url)

        if proto is None:
            raise ValueError("unknown URL type: %s" % url)

        # Get host
        host, rest = urllib.splithost(rest)

        # Try to get port
        host, port = urllib.splitport(host)

        # If port is not defined try to get from proto
        if port is None:
            try:
                port = self._ports[proto]
            except KeyError:
                raise ValueError("unknown protocol for: %s" % url)

        self._real_host = host
        self._real_port = int(port)

        httplib.HTTPConnection.request(self, method, rest, body, headers)
开发者ID:admintecriti,项目名称:sqlmap,代码行数:25,代码来源:proxy.py


示例11: _get_host_from_uri

    def _get_host_from_uri (self, uri):
        """Extract the host and DNS-SD host name from a device URI.

        Returns a (host, dnssdhost) pair.  For hp/hpfax URIs the values
        come from the "ip=" or "zc=" query parameter of a "/net/" path;
        dnssd/mdns URIs yield (None, None); for any other scheme the host
        is taken from the network location with the port removed.
        """
        hostport = None
        dnssdhost = None
        (scheme, rest) = urllib.splittype (uri)
        if scheme in ('hp', 'hpfax'):
            if not rest.startswith ("/net/"):
                return None, None
            ipparam = urllib.splitquery (rest[5:])[1]
            if ipparam is not None and ipparam.startswith ("ip="):
                hostport = ipparam[3:]
            elif ipparam is not None and ipparam.startswith ("zc="):
                dnssdhost = ipparam[3:]
            else:
                return None, None
        elif scheme in ('dnssd', 'mdns'):
            # The URIs of the CUPS "dnssd" backend do not contain the host
            # name of the printer
            return None, None
        else:
            hostport = urllib.splithost (rest)[0]
            if hostport is None:
                return None, None

        # An empty or missing host:port leaves the host as None.
        host = urllib.splitport (hostport)[0] if hostport else None
        return host, dnssdhost
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:29,代码来源:PhysicalDevice.py


示例12: with_port

 def with_port(self, port):
     """Return this URL with its port set to port.

     When port is the one registered for the URL's scheme in
     SCHEME_PORT_MAP, the result of without_port() is returned instead.
     """
     is_scheme_default = (self.scheme in SCHEME_PORT_MAP
                          and SCHEME_PORT_MAP[self.scheme] == port)
     if is_scheme_default:
         return self.without_port()

     bare_host = urllib.splitport(self.host)[0]
     return self.with_host(':'.join((bare_host, str(port))))
开发者ID:davidmoss,项目名称:urlobject,代码行数:7,代码来源:urlobject.py


示例13: getChild

    def getChild (self, path, request, ) :
        """Build the reverse-proxy resource for the requested host.

        The target is looked up in the configuration under the section
        "<scheme>:<host>" (option "to"); a missing section yields a
        BadRequestErrorPage.  Unless disabled, x-forwarded-for and
        x-forwarded-proto are written into the request headers, and the
        original host is always recorded as "host-original".
        """
        original_host = urllib.splitport(request.getHeader("host"), )[0]
        section = "%s:%s" % ("https" if request.isSecure() else "http", original_host, )

        try :
            target = self._config.get(section, "to", )
        except ConfigParser.NoSectionError :
            return BadRequestErrorPage()

        target_parts = urlparse.urlsplit(target, )
        default_port = 443 if target_parts.scheme == "https" else 80
        (target_host, target_port, ) = urllib.splitnport(target_parts.netloc, default_port, )

        if not self._without_x_forwarded_for :
            incoming = request.getAllHeaders()
            if "x-forwarded-for" in incoming :
                # keep the existing chain, with commas removed by the regex
                cleaned = self.RE_REMOVE_COMMA.sub(
                        "", incoming.get('x-forwarded-for')).strip()
                chain_prefix = "%s, " % cleaned
            else :
                chain_prefix = ""

            forwarded_for = chain_prefix + request.client.host
            forwarded_proto = "https" if request.isSecure() else "http"
            request.received_headers['x-forwarded-for'] = forwarded_for
            request.received_headers['x-forwarded-proto'] = forwarded_proto

        request.received_headers['host-original'] = original_host
        request.content.seek(0, 0)
        proxied_path = "/" + path if path else "/"
        return ReverseProxyResource(
                target_host,
                target_port,
                proxied_path,
                reactor=FakeReactor(self._timeout, ),
            )
开发者ID:spikeekips,项目名称:pyhttpproxy,代码行数:31,代码来源:tap.py


示例14: request

  def request(self, method, url, body=None, headers={}):
    """Record protocol, real host and real port of url, then send the request.

    Sets self.protocol, self._real_host and self._real_port (always an
    int) before delegating to httplib.HTTPConnection.request with the
    unchanged url.

    Raises ValueError when the url has no protocol, or when it has no
    explicit port and the protocol is neither http nor https.
    """

    # Dissect the url to determine the protocol. Store it in the instance variable.
    self.protocol, stuffAfterProtocol = urllib.splittype(url)

    # Check to make sure we got some kind of protocol
    if self.protocol is None:
      raise ValueError("Unknown protocol type in " + url)

    # Parse out the host from the URL resource. host should be something like www.example.com or www.example.com:8888
    # and resourceString should be something like /example.html
    host, resourceString = urllib.splithost(stuffAfterProtocol)

    # Parse out the port from the host
    host, port = urllib.splitport(host)

    # It is possible that port is not defined. In that case we go by the protocol
    if port is None:
      # List of common protocol to port mappings
      protocolToPortMapping = {'http' : 80, 'https' : 443}

      # Check if the protocol is in the list
      if self.protocol not in protocolToPortMapping:
        raise ValueError("Unknown port for protocol " + str(self.protocol))
      port = protocolToPortMapping[self.protocol]

    # splitport() returns a string; keep the same int type as the defaults.
    self._real_port = int(port)
    self._real_host = host

    httplib.HTTPConnection.request(self, method, url, body, headers)
开发者ID:2008chny,项目名称:OAuthTool,代码行数:33,代码来源:ConnectHTTPHandler.py


示例15: open_url

def open_url(method, url):
    """Request url with the given HTTP method, following redirects.

    At most three attempts are made.  A 301/302/303/307 response switches
    to the URL from its Location header for the next attempt; any other
    non-200 response, or an error while talking to the server, simply
    uses up an attempt.

    Returns the response object of the first 200 answer (its connection
    is left open so the body can be read), or None when all attempts are
    used up.
    """
    redirectcount = 0
    while redirectcount < 3:
        (scheme, rest) = urllib.splittype(url)
        (host, path) = urllib.splithost(rest)
        (host, port) = urllib.splitport(host)
        if port is None:
            port = 443 if scheme == "https" else 80
        conn = None
        hand_over = False
        try:
            # anything that is not plain http goes through HTTPS
            if scheme == "http":
                conn = httplib.HTTPConnection("%s:%s" % (host, port))
            else:
                conn = httplib.HTTPSConnection("%s:%s" % (host, port))
            conn.request(method, path)
            response = conn.getresponse()
            if response.status in (301, 302, 303, 307):
                for name, value in response.getheaders():
                    if name.lower() == "location":
                        url = value
            elif response.status == 200:
                hand_over = True
                return response
        except Exception:
            # Best effort: a failed attempt only counts against the limit.
            # (Unlike a bare except, this lets KeyboardInterrupt through.)
            pass
        finally:
            # Close every connection that is not handed to the caller.
            if conn is not None and not hand_over:
                conn.close()
        redirectcount = redirectcount + 1
    return None
开发者ID:klaernie,项目名称:rss2maildir,代码行数:30,代码来源:rss2maildir.py


示例16: test_node

def test_node(request):
    """Runs an SSH call on a node.

    The node comes from the ``name`` entry of the request's matchdict,
    in the form ``[user[:password]@]host[:port]``; the port defaults
    to 22.

    Returns a confirmation string when the connection succeeds, or the
    text of the error for a resolution failure or timeout.  Raises
    Forbidden when the request is not authenticated.
    """
    if authenticated_userid(request) is None:
        raise Forbidden()
    # trying an ssh connection
    connection = paramiko.client.SSHClient()
    connection.load_system_host_keys()
    connection.set_missing_host_key_policy(paramiko.WarningPolicy())
    name = request.matchdict['name']

    host, port = urllib.splitport(name)
    # splitport() returns the port as a string; connect() gets an int.
    port = 22 if port is None else int(port)

    username, host = urllib.splituser(host)
    credentials = {}

    if username is not None:
        if ':' in username:
            username, password = username.split(':', 1)
            credentials = {"username": username, "password": password}
        else:
            credentials = {"username": username}

    try:
        connection.connect(host, port=port, timeout=5, **credentials)
        return 'Connection to %r : OK' % name
    except (socket.gaierror, socket.timeout) as error:
        return str(error)
    finally:
        # This is only a probe, so always release the SSH client.
        connection.close()
开发者ID:whitmo,项目名称:marteau,代码行数:29,代码来源:views.py


示例17: single_request

 def single_request(self, host, handler, request_body, verbose=0):
     """Send one SCGI request and return the parsed response.

     host         -- "host:port" of a TCP endpoint; when empty, handler
                     is used as the path of a unix domain socket.
     handler      -- unix socket path (only used when host is empty).
     request_body -- payload; it is prefixed with the SCGI header
                     netstring before being sent.
     verbose      -- stored on self.verbose.

     Returns whatever self.parse_response() produces for the reply.
     The socket is always closed before returning.
     """
     # Add SCGI headers to the request.  SCGI requires CONTENT_LENGTH to
     # be the first header, so keep them in an ordered sequence instead
     # of a dict whose iteration order is not guaranteed.
     headers = (('CONTENT_LENGTH', str(len(request_body))), ('SCGI', '1'))
     header = '\x00'.join('%s\x00%s' % item for item in headers) + '\x00'
     header = '%d:%s' % (len(header), header)
     request_body = '%s,%s' % (header, request_body)

     sock = None

     try:
         if host:
             host, port = urllib.splitport(host)
             addrinfo = socket.getaddrinfo(host, port, socket.AF_INET,
                                           socket.SOCK_STREAM)
             sock = socket.socket(*addrinfo[0][:3])
             sock.connect(addrinfo[0][4])
         else:
             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
             sock.connect(handler)

         self.verbose = verbose

         # send() may write only part of the buffer; sendall() loops
         # until everything is written.
         sock.sendall(request_body)
         return self.parse_response(sock.makefile())
     finally:
         if sock:
             sock.close()
开发者ID:paullill,项目名称:scripts,代码行数:27,代码来源:rstat.py


示例18: open_local_file

 def open_local_file(self, req):
     """Open the local file named by a file: request, honouring Range.

     Returns an addinfourl carrying Content-Type, Content-Length and
     Last-Modified headers.  When the request has a Range header the file
     object is wrapped in a RangeableFileObject and Content-Length is the
     length of the requested range.

     Raises urllib2.URLError when the request names a non-local host and
     RangeError when the requested range does not fit the file.
     """
     import mimetypes
     import email
     host = req.get_host()
     selector = req.get_selector()
     localfile = urllib.url2pathname(selector)
     stats = os.stat(localfile)
     size = stats[stat.ST_SIZE]
     modified = email.Utils.formatdate(stats[stat.ST_MTIME])
     mtype = mimetypes.guess_type(selector)[0]
     if host:
         host, port = urllib.splitport(host)
         if port or socket.gethostbyname(host) not in self.get_names():
             raise urllib2.URLError('file not on local host')

     # Validate the range before opening the file, so that a rejected
     # range does not leave an open file handle behind.
     brange = range_header_to_tuple(req.headers.get('Range', None))
     assert brange != ()
     if brange:
         (fb, lb) = brange
         if lb == '':
             # open-ended range: read up to the end of the file
             lb = size
         if fb < 0 or fb > size or lb > size:
             raise RangeError('Requested Range Not Satisfiable')
         size = (lb - fb)

     fo = open(localfile,'rb')
     if brange:
         fo = RangeableFileObject(fo, (fb, lb))
     headers = email.message_from_string(
         'Content-Type: %s\nContent-Length: %d\nLast-Modified: %s\n' %
         (mtype or 'text/plain', size, modified))
     return urllib.addinfourl(fo, headers, 'file:'+selector)
开发者ID:MezzLabs,项目名称:mercurial,代码行数:30,代码来源:byterange.py


示例19: __send

    def __send(self, scgireq):
        """Send a raw SCGI request to self.url and return the raw reply.

        A URL with a network location is contacted over TCP (IPv4); a URL
        without one is treated as the path of a unix domain socket.  The
        reply is read until the peer closes the connection.  The socket
        is closed even when connecting, sending or receiving fails.
        """
        scheme, netloc, path, query, frag = urlparse.urlsplit(self.url)
        host, port = urllib.splitport(netloc)

        if netloc:
            # The resolver may return several candidates; use the first
            # one instead of asserting that exactly one exists.
            addrinfo = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
            sock = socket.socket(*addrinfo[0][:3])
            target = addrinfo[0][4]
        else:
            # if no host then assume unix domain socket
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            target = path

        chunks = []
        try:
            sock.connect(target)
            # send() may write only part of the request; sendall() loops
            # until everything is written.
            sock.sendall(scgireq)
            while True:
                recvdata = sock.recv(1024)
                if not recvdata:
                    break
                chunks.append(recvdata)
        finally:
            sock.close()
        return "".join(chunks)
开发者ID:wertel,项目名称:pyrt,代码行数:26,代码来源:xmlrpc2scgi.py


示例20: request

 def request(self, method, url, body=None, headers={}):
     """
     Make CONNECT request to proxy.

     Records the real target in self._real_host / self._real_port and
     moves a "Proxy-Authorization" header, when present, into
     self._proxy_authorization (None otherwise) so that it is not passed
     on with the request.  The caller's headers mapping is left untouched.

     Raises ValueError when the url has no scheme, or when it has no
     explicit port and the scheme is missing from self._ports.
     """

     proto, rest = urllib.splittype(url)
     if proto is None:
         raise ValueError("unknown URL type: %s" % url)

     # Get hostname.
     host = urllib.splithost(rest)[0]

     # Get port of one
     host, port = urllib.splitport(host)

     # When no port use hardcoded.
     if port is None:
         try:
             port = self._ports[proto]
         except KeyError:
             raise ValueError("unknown protocol for: %s" % url)

     # Remember.
     self._real_host = host
     self._real_port = port

     # Remember auth if there.  Work on a copy so that neither the
     # caller's dict nor the shared default argument is modified.
     headers = dict(headers)
     self._proxy_authorization = headers.pop("Proxy-Authorization", None)

     httplib.HTTPConnection.request(self, method, url, body, headers)
开发者ID:andreashe,项目名称:mygitsite,代码行数:34,代码来源:proxysupport.py



注:本文中的urllib.splitport函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python urllib.splitquery函数代码示例发布时间:2022-05-27
下一篇:
Python urllib.splitpasswd函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap