本文整理汇总了Python中urllib.splitport函数的典型用法代码示例。如果您正苦于以下问题:Python splitport函数的具体用法?Python splitport怎么用?Python splitport使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了splitport函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: parse_address_info
def parse_address_info(cls, server_addr="nats://nats:nats@127.0.0.1:4222"):
    '''\
    parse the metadata nats server uri;

    NOTE(review): the default uri was mangled by e-mail obfuscation in the
    copy this snippet was taken from; ``nats:nats@127.0.0.1`` is a
    reconstruction -- confirm against the upstream project.

    Params:
    =====
    server_addr: nats server address, ``nats://[user:pswd@]host:port``;

    Returns:
    =====
    user: username to login nats server (None when the uri has no auth part);
    pswd: password to login nats server (None when absent);
    host: ip address of nats server;
    port: port of nats server, converted to int

    Raises:
    =====
    NotImplementException: server_addr is not a str or not a nats:// uri;
    TypeError: the uri carries no port (int(None)).
    '''
    if not isinstance(server_addr, str):
        raise NotImplementException
    protocol, remainder = urllib.splittype(server_addr)
    if protocol != "nats":
        raise NotImplementException

    # splithost() strips the leading "//" (and any trailing path), so the
    # host no longer comes back as "//host" when the uri has no credentials.
    netloc, _ = urllib.splithost(remainder)
    if netloc is None:
        # No "//" prefix at all: treat everything after the scheme as netloc.
        netloc = remainder

    auth, hostport = urllib.splituser(netloc)
    if auth is None:
        user = pswd = None
    else:
        user, pswd = urllib.splitpasswd(auth)
    host, port = urllib.splitport(hostport)
    return user, pswd, host, int(port)
开发者ID:PyHUBKyiv,项目名称:python-nats,代码行数:35,代码来源:common.py
示例2: __init__
def __init__(self, uri, transport=None, encoding=None,
             verbose=0, version=None):
    """Parse *uri*, pick a transport and open the underlying socket.

    uri       -- ``http://``, ``https://`` or ``unix:`` endpoint.
    transport -- transport object; when None one is chosen from the scheme.
    encoding  -- stored as-is on the instance.
    verbose   -- stored as-is on the instance.
    version   -- protocol version; falls back to ``config.version``.

    Raises IOError for any other scheme and UnixSocketMissing when a
    ``unix:`` uri is given but USE_UNIX_SOCKETS is false.
    """
    import urllib
    if not version:
        version = config.version
    self.__version = version
    schema, uri = urllib.splittype(uri)
    if schema not in ('http', 'https', 'unix'):
        raise IOError('Unsupported JSON-RPC protocol.')
    if schema == 'unix':
        if not USE_UNIX_SOCKETS:
            # Don't like the "generic" Exception...
            raise UnixSocketMissing("Unix sockets not available.")
        self.__host = uri
        self.__handler = '/'
    else:
        self.__host, self.__handler = urllib.splithost(uri)
        # An empty handler is left untouched. The previous code only
        # evaluated ``self.__handler == '/'`` here (a comparison, not an
        # assignment), which had no effect; that dead statement is removed.
        # NOTE(review): decide whether an empty handler should become '/'.
    if transport is None:
        if schema == 'unix':
            transport = UnixTransport()
        elif schema == 'https':
            transport = SafeTransport()
        else:
            transport = Transport()
    self.__transport = transport
    self.__encoding = encoding
    self.__verbose = verbose
    # splitport() yields the (host, port) pair create_connection() expects;
    # the leftover debug print of that pair has been dropped.
    address = urllib.splitport(self.__host)
    self.__sock = socks.create_connection(address)
    self.__sock.settimeout(0.1)
开发者ID:wwzbwwzb,项目名称:rpcjsonlib,代码行数:34,代码来源:jsonrpc.py
示例3: isKnownServer
def isKnownServer(self, URL):
    """Return True when the host of *URL* matches the host of any source URL.

    Ports are ignored on both sides; only the host part of each network
    location is compared.
    """
    wanted = urllib.splitport(urlparse.urlsplit(URL)[1])[0]
    return any(
        urllib.splitport(urlparse.urlsplit(known)[1])[0] == wanted
        for known in self.__SourceURLs
    )
开发者ID:BackupTheBerlios,项目名称:pplt-svn,代码行数:8,代码来源:DataBase.py
示例4: dnslookup
def dnslookup(url):
    """Replaces a hostname by its IP in an url.

    Uses gethostbyname to do a DNS lookup, so the nscd cache is used.

    If gevent has patched the standard library, makes sure it uses the
    original version because gevent uses its own mechanism based on
    the async libevent's evdns_resolve_ipv4, which does not use
    glibc's resolver.

    Any ``user[:password]@`` prefix and ``:port`` suffix of the network
    location are preserved around the resolved address.
    """
    try:
        from gevent.socket import _socket
        gethostbyname = _socket.gethostbyname
    except ImportError:
        import socket
        gethostbyname = socket.gethostbyname

    # parsing
    parsed_url = urlparse.urlparse(url)
    host, port = urllib.splitport(parsed_url.netloc)
    user, host = urllib.splituser(host)

    # resolving the host
    host = gethostbyname(host)

    # recomposing
    if port is not None:
        host = '%s:%s' % (host, port)
    if user is not None:
        # The scraped copy had this format string mangled by e-mail
        # obfuscation into an invalid '%[' directive; restored to user@host.
        host = '%s@%s' % (user, host)
    parts = [parsed_url[0]] + [host] + list(parsed_url[2:])
    return urlparse.urlunparse(parts)
开发者ID:ncalexan,项目名称:mozservices,代码行数:34,代码来源:util.py
示例5: _get_real_authority
def _get_real_authority(self):
    """
    Return the authority specification of the originally requested URL.

    The return value is a string of the form <host>:<port>.

    Raises ValueError when the selector has no scheme, or when it has no
    explicit port and the scheme is missing from ``self._ports``.
    """
    url = self._proxy_request.get_selector()

    proto, rest = urllib.splittype(url)
    if proto is None:
        raise ValueError("unknown URL type: %s" % url)

    # Get the host and port specification
    host, rest = urllib.splithost(rest)
    host, port = urllib.splitport(host)

    # If port is not defined, then try to get it from the protocol.
    if port is None:
        try:
            port = self._ports[proto]
        except KeyError:
            raise ValueError("unknown protocol for: %s" % url)

    # splitport() returns the port as a string, which '%d' rejects with a
    # TypeError; normalise to int so explicit ports format correctly.
    return '%s:%d' % (host, int(port))
开发者ID:pombredanne,项目名称:ensetuptools,代码行数:26,代码来源:connect_HTTP_handler.py
示例6: request
def request(self, method, path, headers=None, data=None, etag=None, etagnot=None, timeout=None):
    """Make HTTP request. Return HTTPResponse instance.

    Will never raise urllib2.HTTPError, but may raise other exceptions, such
    as urllib2.URLError or httplib.HTTPException

    A leading "/" is removed from *path* before it is appended to
    ``self.base_url``. *etag* / *etagnot* become quoted ``If-Match`` /
    ``If-None-Match`` headers ('*' is sent unquoted); they are written into
    *headers*, so a caller-supplied dict is modified in place.
    """
    if path.startswith('/'):
        path = path[1:]
    if headers is None:
        headers = {}
    if etag is not None:
        # XXX use quoteString instead?
        headers['If-Match'] = '*' if etag == '*' else '"%s"' % etag
    if etagnot is not None:
        headers['If-None-Match'] = '*' if etagnot == '*' else '"%s"' % etagnot

    req = HTTPRequest(self.base_url + path, method=method, headers=headers, data=data)
    hostname = urllib.splitport(req.get_host())[0]
    HostCache.lookup(hostname)
    try:
        response = self.opener.open(req, timeout=timeout)
        if isinstance(response, urllib2.HTTPError):
            return HTTPResponse.from_HTTPError(response)
        if isinstance(response, urllib2.addinfourl):
            return HTTPResponse.from_addinfourl(response)
        raise RuntimeError('urllib2.open returned %r' % response)
    except urllib2.HTTPError as e:
        # Workaround for bug in urllib2 which doesn't reset the retry count
        # when a negative, but different that 401 or 407, response is
        # received. -Luci
        if e.code not in (401, 407):
            digest_handlers = (urllib2.HTTPDigestAuthHandler,
                               urllib2.ProxyDigestAuthHandler)
            for handler in self.opener.handlers:
                if isinstance(handler, digest_handlers):
                    handler.reset_retry_count()
        return HTTPResponse.from_HTTPError(e)
开发者ID:AGProjects,项目名称:python-xcaplib,代码行数:34,代码来源:httpclient.py
示例7: __init__
def __init__(self, ec2_url, ec2_region, ec2_access_key, ec2_secret_key,
             vpc=None, storage_path=None, request_floating_ip=False):
    """Store the EC2 credentials and split *ec2_url* into its parts.

    The URL is broken into host, path and optional port (kept as int when
    present); ``_secure`` is True only for the ``https`` scheme.
    *storage_path* is accepted but not used here. Connection-related
    attributes start out empty.
    """
    self._url = ec2_url
    self._region_name = ec2_region
    self._access_key = ec2_access_key
    self._secret_key = ec2_secret_key
    self._vpc = vpc
    self.request_floating_ip = request_floating_ip

    # read all parameters from url
    scheme, remainder = urllib.splittype(ec2_url)
    self._host, self._ec2path = urllib.splithost(remainder)
    self._ec2host, raw_port = urllib.splitport(self._host)
    self._ec2port = int(raw_port) if raw_port else raw_port
    self._secure = (scheme == "https")

    # will be initialized upon first connect
    self._ec2_connection = None
    self._vpc_connection = None
    self._vpc_id = None
    self._region = None

    self._instances = {}
    self._cached_instances = []
    self._images = None
开发者ID:HPCNow,项目名称:elasticluster,代码行数:32,代码来源:ec2_boto.py
示例8: open_local_file
def open_local_file(self, req):
    """Open the local file named by *req* and wrap it with MIME headers.

    Returns an ``addinfourl`` when the request has no host, or when it has
    a port-less host whose address is among ``self.get_names()``.
    An OSError from ``os.stat`` is re-raised as URLError.

    NOTE(review): for any other host this visible body falls off the end
    and returns None -- confirm whether a trailing raise was cut off.
    """
    try:
        import email.utils as emailutils
    except ImportError:
        # python 2.4
        import email.Utils as emailutils
    import mimetypes

    host = req.get_host()
    selector = req.get_selector()
    localfile = url2pathname(selector)
    try:
        stats = os.stat(localfile)
        size = stats.st_size
        modified = emailutils.formatdate(stats.st_mtime, usegmt=True)
        mtype = mimetypes.guess_type(selector)[0]
        header_text = (
            'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
            (mtype or 'text/plain', size, modified))
        headers = mimetools.Message(StringIO(header_text))
        if host:
            host, port = splitport(host)
        # ``port`` is only bound when a host was given; the ``not host``
        # test short-circuits before it is read otherwise.
        is_local = not host or (
            not port and socket.gethostbyname(host) in self.get_names())
        if is_local:
            return addinfourl(open(localfile, 'rb'),
                              headers, 'file:' + selector)
    except OSError as msg:
        # urllib2 users shouldn't expect OSErrors coming from urlopen()
        raise URLError(msg)
开发者ID:ufal,项目名称:lindat-aai-shibbie,代码行数:27,代码来源:_urllib2_fork.py
示例9: __init__
def __init__(self, url):
    """ Initialize the downloader with the specified url string """
    # FIXME: support HTTPS
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)

    # Peel optional "user:password@" credentials off the network location.
    auth, hostport = urllib.splituser(netloc)
    self.host, parsed_port = urllib.splitport(hostport)
    self.port = parsed_port if parsed_port else 80

    if auth:
        self.username, self.password = urllib.splitpasswd(auth)
    else:
        self.username = self.password = None

    # The stored url is rebuilt from the credential-free host[:port].
    self.url = urlparse.urlunparse(
        (scheme, hostport, path, params, query, fragment))

    self.nzbFilename = os.path.basename(path)
    temp_name = tempFilename(self.TEMP_FILENAME_PREFIX) + '.nzb'
    self.tempFilename = os.path.join(Hellanzb.TEMP_DIR, temp_name)

    # The HTTPDownloader
    self.downloader = None
    # The NZB category (e.g. 'Apps')
    self.nzbCategory = None
    # Whether or not the NZB file data is gzipped
    self.isGzipped = False
开发者ID:Bootz,项目名称:hellanzb,代码行数:25,代码来源:NZBDownloader.py
示例10: request
def request(self, method, url, body=None, headers={}):
    """Record the real target host/port of *url*, then send the request.

    Request is called before connect, so the url can be interpreted here to
    get the real host/port to be used to make the CONNECT request to the
    proxy. Only the part of the url after the host is passed on to
    ``httplib.HTTPConnection.request``.

    Raises ValueError when the url has no scheme, or has no explicit port
    and the scheme is missing from ``self._ports``.
    """
    scheme, remainder = urllib.splittype(url)
    if scheme is None:
        raise ValueError("unknown URL type: %s" % url)

    # Get host, then try to get port
    netloc, remainder = urllib.splithost(remainder)
    hostname, port = urllib.splitport(netloc)

    # If port is not defined try to get from proto
    if port is None:
        try:
            port = self._ports[scheme]
        except KeyError:
            raise ValueError("unknown protocol for: %s" % url)

    self._real_host = hostname
    self._real_port = int(port)
    httplib.HTTPConnection.request(self, method, remainder, body, headers)
开发者ID:admintecriti,项目名称:sqlmap,代码行数:25,代码来源:proxy.py
示例11: _get_host_from_uri
def _get_host_from_uri (self, uri):
    """Extract the printer host from a device *uri*.

    Returns a ``(host, dnssdhost)`` pair:
    - ``hp``/``hpfax`` URIs under ``/net/`` give the host from an ``ip=``
      query, or the DNS-SD name from a ``zc=`` query;
    - ``dnssd``/``mdns`` URIs give ``(None, None)``;
    - any other scheme gives the host part of its network location.
    ``(None, None)`` is returned whenever nothing usable is found.
    """
    scheme, rest = urllib.splittype(uri)
    hostport = None
    dnssdhost = None

    if scheme in ('hp', 'hpfax'):
        if not rest.startswith("/net/"):
            return None, None
        rest, ipparam = urllib.splitquery(rest[5:])
        if ipparam is not None and ipparam.startswith("ip="):
            hostport = ipparam[3:]
        elif ipparam is not None and ipparam.startswith("zc="):
            dnssdhost = ipparam[3:]
        else:
            return None, None
    elif scheme in ('dnssd', 'mdns'):
        # The URIs of the CUPS "dnssd" backend do not contain the host
        # name of the printer
        return None, None
    else:
        hostport, rest = urllib.splithost(rest)
        if hostport is None:
            return None, None

    host = None
    if hostport:
        host = urllib.splitport(hostport)[0]
    return host, dnssdhost
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:29,代码来源:PhysicalDevice.py
示例12: with_port
def with_port(self, port):
    """Return this URL with *port* set on its host.

    When *port* is the entry for this scheme in SCHEME_PORT_MAP, the
    result of ``without_port()`` is returned instead.
    """
    scheme = self.scheme
    if scheme in SCHEME_PORT_MAP and SCHEME_PORT_MAP[scheme] == port:
        return self.without_port()
    bare_host = urllib.splitport(self.host)[0]
    return self.with_host(bare_host + ':' + str(port))
开发者ID:davidmoss,项目名称:urlobject,代码行数:7,代码来源:urlobject.py
示例13: getChild
def getChild (self, path, request, ) :
    """Build the reverse-proxy resource for *request*.

    The target is read from the config section named "<scheme>:<host>"
    (option "to"); a missing section yields BadRequestErrorPage. Unless
    ``self._without_x_forwarded_for`` is set, forwarding headers are
    written into ``request.received_headers`` first.
    """
    _host_orig = urllib.splitport(request.getHeader("host"))[0]
    _key = "%s:%s" % ("https" if request.isSecure() else "http", _host_orig)
    try:
        _to = self._config.get(_key, "to")
    except ConfigParser.NoSectionError:
        return BadRequestErrorPage()

    _target = urlparse.urlsplit(_to)
    _default_port = 443 if _target.scheme == "https" else 80
    _host, _port = urllib.splitnport(_target.netloc, _default_port)

    if not self._without_x_forwarded_for:
        _headers = request.getAllHeaders()
        if "x-forwarded-for" in _headers:
            # Extend the existing chain, with commas matched by
            # RE_REMOVE_COMMA removed from the inherited value.
            _prior = self.RE_REMOVE_COMMA.sub(
                "", _headers.get('x-forwarded-for')).strip()
            _xf = "%s, " % _prior
        else:
            _xf = ""
        _x_forwarded_for = _xf + request.client.host
        _x_forwarded_proto = "https" if request.isSecure() else "http"
        request.received_headers['x-forwarded-for'] = _x_forwarded_for
        request.received_headers['x-forwarded-proto'] = _x_forwarded_proto
        # NOTE(review): the flattened source does not show whether this
        # assignment sat inside or after the ``if``; kept with its
        # neighbours here -- confirm against upstream.
        request.received_headers['host-original'] = _host_orig

    request.content.seek(0, 0)
    return ReverseProxyResource(
        _host,
        _port,
        "/" + path if path else "/",
        reactor=FakeReactor(self._timeout),
    )
开发者ID:spikeekips,项目名称:pyhttpproxy,代码行数:31,代码来源:tap.py
示例14: request
def request(self, method, url, body=None, headers={}):
    """Record protocol, real host and real port of *url*, then send it.

    ``self.protocol`` is set from the url scheme. ``self._real_host`` is
    the host without port, and ``self._real_port`` is always an int: the
    explicit port when the url has one, otherwise 80 for http and 443 for
    https. The full *url* is then handed to
    ``httplib.HTTPConnection.request``.

    Raises ValueError when the url has no scheme, or has no explicit port
    and the scheme is neither http nor https.
    """
    # Dissect the url to determine the protocol. Store it in the instance variable.
    self.protocol, after_protocol = urllib.splittype(url)
    if self.protocol is None:
        raise ValueError("Unknown protocol type in " + url)

    # host is something like www.example.com or www.example.com:8888 and
    # resource something like /example.html
    host, resource = urllib.splithost(after_protocol)
    host, port = urllib.splitport(host)

    # It is possible that port is not defined. In that case we go by the protocol
    if port is None:
        default_ports = {'http': 80, 'https': 443}
        if self.protocol not in default_ports:
            raise ValueError("Unknown port for protocol " + str(self.protocol))
        port = default_ports[self.protocol]

    # splitport() returns a string while the defaults are ints; normalise so
    # _real_port has one type regardless of where it came from.
    self._real_port = int(port)
    self._real_host = host
    httplib.HTTPConnection.request(self, method, url, body, headers)
开发者ID:2008chny,项目名称:OAuthTool,代码行数:33,代码来源:ConnectHTTPHandler.py
示例15: open_url
def open_url(method, url):
    """Request *url* with *method*, following redirects for up to 3 tries.

    Returns the response object of the first reply with status 200, or
    None when three attempts did not produce one. Replies with status
    301/302/303/307 switch to the URL in their ``location`` header for the
    next attempt. Request errors are deliberately ignored (best effort) and
    simply use up an attempt.

    Only ``http`` gets a plain connection; every other scheme is opened
    with HTTPS. The default port is 443 for https and 80 otherwise.
    """
    redirectcount = 0
    while redirectcount < 3:
        (scheme, rest) = urllib.splittype(url)
        (host, path) = urllib.splithost(rest)
        (host, port) = urllib.splitport(host)
        if port is None:
            port = 443 if scheme == "https" else 80

        conn = None
        try:
            if scheme == "http":
                conn = httplib.HTTPConnection("%s:%s" % (host, port))
            else:
                conn = httplib.HTTPSConnection("%s:%s" % (host, port))
            conn.request(method, path)
            response = conn.getresponse()
            if response.status == 200:
                # The caller reads from this response, so its connection
                # must stay open.
                return response
            if response.status in (301, 302, 303, 307):
                location = response.getheader("location")
                if location is not None:
                    url = location
        except Exception:
            # Best effort: a failed attempt just counts against the limit.
            # (Exception, not a bare except, so Ctrl-C still propagates.)
            pass

        # This hop produced nothing to return; release its connection.
        if conn is not None:
            conn.close()
        redirectcount = redirectcount + 1
    return None
开发者ID:klaernie,项目名称:rss2maildir,代码行数:30,代码来源:rss2maildir.py
示例16: test_node
def test_node(request):
    """Runs an SSH call on a node.

    The node comes from ``request.matchdict['name']`` in the form
    ``[user[:password]@]host[:port]``; the port defaults to 22.

    Returns a success message, or the error text when the host cannot be
    resolved or the connection times out. Raises Forbidden when the
    request is not authenticated.
    """
    if authenticated_userid(request) is None:
        raise Forbidden()

    # trying an ssh connection
    connection = paramiko.client.SSHClient()
    connection.load_system_host_keys()
    connection.set_missing_host_key_policy(paramiko.WarningPolicy())

    name = request.matchdict['name']
    host, port = urllib.splitport(name)
    # splitport() returns the port as a string; connect() is given an int.
    port = 22 if port is None else int(port)

    username, host = urllib.splituser(host)
    credentials = {}
    if username is not None:
        if ':' in username:
            username, password = username.split(':', 1)
            credentials = {"username": username, "password": password}
        else:
            credentials = {"username": username}

    try:
        connection.connect(host, port=port, timeout=5, **credentials)
        return 'Connection to %r : OK' % name
    except (socket.gaierror, socket.timeout) as error:
        return str(error)
    finally:
        # This is only a probe: always release the client.
        connection.close()
开发者ID:whitmo,项目名称:marteau,代码行数:29,代码来源:views.py
示例17: single_request
def single_request(self, host, handler, request_body, verbose=0):
    """Send *request_body* as one SCGI request and return the parsed reply.

    When *host* is non-empty it is treated as ``host:port`` and reached
    over TCP (IPv4); otherwise *handler* is used as the path of a Unix
    domain socket. The reply is handed to ``self.parse_response``. The
    socket is always closed before returning.
    """
    # Add SCGI headers to the request. An ordered sequence is used instead
    # of a dict because SCGI requires CONTENT_LENGTH to be the first header
    # and dict iteration order is not guaranteed on older interpreters.
    headers = (
        ('CONTENT_LENGTH', str(len(request_body))),
        ('SCGI', '1'),
    )
    header = '\x00'.join('%s\x00%s' % item for item in headers) + '\x00'
    # Netstring framing: "<length>:<headers>," followed by the body.
    header = '%d:%s' % (len(header), header)
    request_body = '%s,%s' % (header, request_body)

    sock = None
    try:
        if host:
            host, port = urllib.splitport(host)
            addrinfo = socket.getaddrinfo(host, port, socket.AF_INET,
                                          socket.SOCK_STREAM)
            sock = socket.socket(*addrinfo[0][:3])
            sock.connect(addrinfo[0][4])
        else:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(handler)

        self.verbose = verbose

        # sendall() keeps writing until the whole request is out; send()
        # may stop after a partial write.
        sock.sendall(request_body)
        return self.parse_response(sock.makefile())
    finally:
        if sock:
            sock.close()
开发者ID:paullill,项目名称:scripts,代码行数:27,代码来源:rstat.py
示例18: open_local_file
def open_local_file(self, req):
    """Open the local file named by *req*, honouring a ``Range`` header.

    Returns ``urllib.addinfourl`` wrapping the file (or a
    RangeableFileObject limited to the requested range) together with
    Content-Type / Content-Length / Last-Modified headers.

    Raises urllib2.URLError when the request names a host with a port, or
    a host whose address is not in ``self.get_names()``, and RangeError
    when the requested range lies outside the file.
    """
    import mimetypes
    import email
    host = req.get_host()
    file = req.get_selector()
    localfile = urllib.url2pathname(file)
    stats = os.stat(localfile)
    size = stats[stat.ST_SIZE]
    modified = email.Utils.formatdate(stats[stat.ST_MTIME])
    mtype = mimetypes.guess_type(file)[0]
    if host:
        host, port = urllib.splitport(host)
        if port or socket.gethostbyname(host) not in self.get_names():
            raise urllib2.URLError('file not on local host')
    fo = open(localfile, 'rb')
    try:
        brange = req.headers.get('Range', None)
        brange = range_header_to_tuple(brange)
        assert brange != ()
        if brange:
            (fb, lb) = brange
            # An open-ended range runs to the end of the file.
            if lb == '':
                lb = size
            if fb < 0 or fb > size or lb > size:
                raise RangeError('Requested Range Not Satisfiable')
            size = (lb - fb)
            fo = RangeableFileObject(fo, (fb, lb))
        headers = email.message_from_string(
            'Content-Type: %s\nContent-Length: %d\nLast-Modified: %s\n' %
            (mtype or 'text/plain', size, modified))
    except:
        # Do not leak the open file when range handling fails; the original
        # exception is re-raised unchanged.
        fo.close()
        raise
    return urllib.addinfourl(fo, headers, 'file:' + file)
开发者ID:MezzLabs,项目名称:mercurial,代码行数:30,代码来源:byterange.py
示例19: __send
def __send(self, scgireq):
    """Send the raw SCGI request *scgireq* and return the raw reply.

    ``self.url`` selects the transport: with a network location the
    request goes over TCP (IPv4) to that host/port, otherwise the url path
    is used as a Unix domain socket. The reply is read until the peer
    closes the connection. The socket is closed even when an error occurs.
    """
    scheme, netloc, path, query, frag = urlparse.urlsplit(self.url)
    host, port = urllib.splitport(netloc)

    if netloc:
        addrinfo = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
        assert len(addrinfo) == 1, "There's more than one? %r" % addrinfo
        sock = socket.socket(*addrinfo[0][:3])
        target = addrinfo[0][4]
    else:
        # if no host then assume unix domain socket
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        target = path

    try:
        sock.connect(target)
        # sendall() retries until the whole request is written; send() may
        # stop after a partial write.
        sock.sendall(scgireq)

        # Collect chunks and join once instead of growing a string.
        chunks = []
        while True:
            recvdata = sock.recv(1024)
            if not recvdata:
                break
            chunks.append(recvdata)
        return "".join(chunks)
    finally:
        sock.close()
开发者ID:wertel,项目名称:pyrt,代码行数:26,代码来源:xmlrpc2scgi.py
示例20: request
def request(self, method, url, body=None, headers={}):
    """
    Make CONNECT request to proxy.

    Records the real host and port of *url* on the instance, moves any
    ``Proxy-Authorization`` header into ``self._proxy_authorization`` (None
    when absent) and sends the request with the remaining headers.

    Raises ValueError when the url has no scheme, or has no explicit port
    and the scheme is missing from ``self._ports``.
    """
    proto, rest = urllib.splittype(url)
    if proto is None:
        raise ValueError("unknown URL type: %s" % url)

    # Get hostname.
    host = urllib.splithost(rest)[0]
    # Get port of one
    host, port = urllib.splitport(host)
    # When no port use hardcoded.
    if port is None:
        try:
            port = self._ports[proto]
        except KeyError:
            raise ValueError("unknown protocol for: %s" % url)

    # Remember.
    self._real_host = host
    self._real_port = port

    # Remember auth if there. Work on a copy so the caller's dict (and the
    # shared default) is never modified.
    headers = dict(headers)
    self._proxy_authorization = headers.pop("Proxy-Authorization", None)

    httplib.HTTPConnection.request(self, method, url, body, headers)
开发者ID:andreashe,项目名称:mygitsite,代码行数:34,代码来源:proxysupport.py
注:本文中的urllib.splitport函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论