本文整理汇总了Python中urllib.splitnport函数的典型用法代码示例。如果您正苦于以下问题：Python splitnport函数的具体用法？Python splitnport怎么用？Python splitnport使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了splitnport函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: processRequest
def processRequest(self, method=None, url=None, data="", headers=None):
    """Send a single HTTP or HTTPS request for ``url`` and return the response.

    Proxy settings are re-read from ``desktop.Config()`` on every call and
    stored on ``self.proxy_host`` / ``self.proxy_port``. The global socket
    default timeout is set to ``self.http_timeout`` as a side effect.

    Args:
        method: HTTP verb handed to the connection's ``request``.
        url: Absolute ``http://`` or ``https://`` URL.
        data: Request body.
        headers: Optional mapping of request headers; ``None`` means none.

    Returns:
        Whatever the connection's ``getresponse()`` returns.

    Raises:
        AssertionError: if the URL scheme is neither HTTP nor HTTPS
            (including a URL without any scheme).
    """
    # Build a fresh dict per call instead of sharing one mutable default.
    if headers is None:
        headers = {}

    conf = desktop.Config()
    if not conf['proxy']:
        self.proxy_host = None
        self.proxy_port = None
    else:
        self.proxy_host = conf['proxy']['proxy']
        self.proxy_port = conf['proxy']['proxy_port']
    socket.setdefaulttimeout(self.http_timeout)

    protocol, resource = urllib.splittype(url)
    hostport, path = urllib.splithost(resource)
    # splittype() yields None when the URL has no scheme; normalise it so
    # that case reaches the "unhandled protocol" error below.
    scheme = (protocol or "").lower()
    use_proxy = self.proxy_host is not None and self.proxy_port is not None

    if scheme == "http":
        host, port = urllib.splitnport(hostport, 80)
        if use_proxy:
            connexion = HTTPConnection(self.proxy_host, self.proxy_port,
                                       timeout=self.http_timeout)
            # When talking to the proxy, the full URL is sent as the
            # request target instead of just the path.
            path = url
        else:
            connexion = HTTPConnection(host, port, timeout=self.http_timeout)
    elif scheme == "https":
        host, port = urllib.splitnport(hostport, 443)
        connexion = HTTPSConnection(host, port)
        if use_proxy:
            connexion.http_proxy = [self.proxy_host, self.proxy_port]
    else:
        # Raised explicitly (same exception type as the former ``assert``)
        # so the check is not stripped when Python runs with -O.
        raise AssertionError("Unhandled Protocol, please use HTTP or HTTPS")

    connexion.connect()
    connexion.request(method, path, body=data, headers=headers)
    return connexion.getresponse()
开发者ID:delaballe,项目名称:KaraCos-Desktop,代码行数:35,代码来源:http.py
示例2: __init__
def __init__(self, hub, url):
    """Open a connection to ``url`` through ``hub`` and wire up the pipes.

    ``url`` must carry neither a query string nor a fragment. For an
    ``https`` URL the underlying connection is wrapped with SSL and switched
    to non-blocking mode. Request and response pipelines are created with
    queues of size 10 and attached to ``self.writer`` / ``self.reader``.
    """
    self.hub = hub

    parts = urlparse.urlsplit(url)
    assert parts.query == ''
    assert parts.fragment == ''

    is_https = parts.scheme == 'https'
    if is_https:
        fallback_port = 443
    else:
        fallback_port = 80
    host, port = urllib.splitnport(parts.netloc, fallback_port)

    self.socket = self.hub.tcp.connect(host=host, port=port)

    # TODO: this shouldn't block on the SSL handshake
    if is_https:
        # TODO: what a mess
        plain_conn = self.socket.sender.fd.conn
        secure_conn = ssl.wrap_socket(plain_conn)
        secure_conn.setblocking(0)
        self.socket.sender.fd.conn = secure_conn

    self.socket.recver.sep = '\r\n'

    self.agent = 'vanilla/%s' % vanilla.meta.__version__
    self.default_headers = {
        'Accept': '*/*',
        'User-Agent': self.agent,
        'Host': parts.netloc,
    }

    # Outgoing requests: router -> bounded queue -> writer consumer.
    request_router = self.hub.router()
    self.requests = request_router.pipe(self.hub.queue(10))
    self.requests.pipe(self.hub.consumer(self.writer))

    # Incoming responses: router -> bounded queue -> reader consumer.
    response_router = self.hub.router()
    self.responses = response_router.pipe(self.hub.queue(10))
    self.responses.pipe(self.hub.consumer(self.reader))
开发者ID:nikolayvoronchikhin,项目名称:vanilla,代码行数:34,代码来源:http.py
示例3: request
def request(self, method, url, body=None, headers=None):
    """
    Send an HTTP or HTTPS request, as appropriate; and parse the request
    URL in case it's a proxied request.

    For a proxied connection the real target host and port are recorded on
    ``self._real_host`` / ``self._real_port``; when the URL carries no
    usable port, the scheme's default from ``self._ports`` is used.

    Raises ValueError if a proxied URL uses a scheme other than HTTP/HTTPS.
    """
    protocol = self.scheme
    headers = headers or {}

    if self.is_proxy:
        # if it's a proxied connection, a full URL is passed in, so parse it
        protocol, remainder = urllib.splittype(url)
        if protocol not in (HTTP_SCHEME, HTTPS_SCHEME):
            raise ValueError('Unsupported URL protocol: %s' % url)

        host, remainder = urllib.splithost(remainder)
        host, port = urllib.splitnport(host)
        # splitnport() reports "no port given" as -1, which is truthy, and
        # "non-numeric port" as None. A plain ``port or default`` therefore
        # never applied the default in the common no-port case.
        if port is None or port <= 0:
            port = self._ports[protocol]

        self._real_host = host
        self._real_port = port

    if protocol == HTTP_SCHEME:
        httplib.HTTPConnection.request(self, method, url, body, headers)
    else:  # HTTPS_SCHEME
        httplib.HTTPSConnection.request(self, method, url, body, headers)
开发者ID:msurovcak,项目名称:pulp,代码行数:27,代码来源:urllib2_utils.py
示例4: _parse_url
def _parse_url(url):
    """ Returns scheme, host, port, path.

    ``port`` is None when the URL has no port, a non-numeric port, or a
    port that is not a positive number.
    """
    scheme, netloc, path, _params, _query, _frag = rhnLib.parseUrl(url)
    host, port = urllib.splitnport(netloc)
    # splitnport() gives -1 for "no port" and None for a non-numeric port.
    # Test for None explicitly instead of relying on None comparing as
    # smaller than any integer.
    if port is not None and port <= 0:
        port = None
    return scheme, host, port, path
开发者ID:NehaRawat,项目名称:spacewalk,代码行数:7,代码来源:rhnShared.py
示例5: main
def main():
    """Command-line entry point: parse options and run one DAV command.

    Options:
        -s          set the ``shared`` flag passed to the command
        -d DEPTH    depth, one of ``0``, ``1`` or ``infinity`` (default)

    Positional arguments: COMMAND URL [EXTRA]. The command name is looked up
    in ``commands`` and called with a ``davlib.DAV`` connection to the URL's
    host (port 80 unless the URL specifies one).
    """
    optlist, args = getopt.getopt(sys.argv[1:], 'sd:')

    shared = 0
    depth = 'infinity'
    for opt, value in optlist:
        if opt == '-d':
            if value not in ('0', '1', 'infinity'):
                # Single-argument parenthesised form prints the same text
                # whether print is a statement or a function.
                print('ERROR: "%s" is not a valid depth' % value)
                sys.exit(1)
            depth = value
        elif opt == '-s':
            shared = 1

    # NOTE(review): usage() is presumably expected to terminate the program;
    # confirm, since execution otherwise continues with bad arguments.
    if not 2 <= len(args) <= 3:
        usage()

    command = args[0]
    if command not in commands:
        usage()

    url = args[1]
    extra = args[2:]

    # Only the network location and path of the URL are needed.
    parsed = urlparse.urlparse(url)
    netloc, path = parsed[1], parsed[2]
    host, port = urllib.splitnport(netloc, 80)

    conn = davlib.DAV(host, port)
    commands[command](conn, path, shared, depth, extra)
开发者ID:AzerTyQsdF,项目名称:osx,代码行数:30,代码来源:lock.py
示例6: getChild
def getChild (self, path, request, ) :
    """Resolve the proxy target for ``request`` and return a proxy resource.

    The target is looked up in ``self._config`` under the section
    ``"<http|https>:<host header without port>"``, option ``"to"``. If that
    section does not exist a ``BadRequestErrorPage`` is returned. Otherwise
    a ``ReverseProxyResource`` pointing at the target host/port is returned,
    using ``"/" + path`` (or ``"/"`` for an empty path).
    """
    # Host header with any ":port" suffix removed.
    _host_orig = urllib.splitport(request.getHeader("host"), )[0]
    _key = "%s:%s" % ("https" if request.isSecure() else "http", _host_orig, )
    try :
        _to = self._config.get(_key, "to", )
    except ConfigParser.NoSectionError :
        return BadRequestErrorPage()
    _p = urlparse.urlsplit(_to, )
    # Default port follows the target URL's scheme.
    (_host, _port, ) = urllib.splitnport(_p.netloc, 443 if _p.scheme == "https" else 80, )
    # NOTE(review): the extent of this ``if`` body was reconstructed from
    # unindented source; confirm which header assignments belong inside it.
    if not self._without_x_forwarded_for :
        _headers = request.getAllHeaders()
        # Existing X-Forwarded-For value (with RE_REMOVE_COMMA matches
        # removed and stripped) followed by ", ", or "" when absent.
        _xf = ("%s, " % self.RE_REMOVE_COMMA.sub(
            "", _headers.get('x-forwarded-for')).strip()
        ) if "x-forwarded-for" in _headers else ""
        _x_forwarded_for = _xf + request.client.host
        _x_forwarded_proto = "https" if request.isSecure() else "http"
        request.received_headers['x-forwarded-for'] = _x_forwarded_for
        request.received_headers['x-forwarded-proto'] = _x_forwarded_proto
        request.received_headers['host-original'] = _host_orig
    # Rewind the request body before handing the request on.
    request.content.seek(0, 0)
    return ReverseProxyResource(
        _host,
        _port,
        "/" + path if path else "/",
        reactor=FakeReactor(self._timeout, ),
    )
开发者ID:spikeekips,项目名称:pyhttpproxy,代码行数:31,代码来源:tap.py
示例7: __init__
def __init__(self, url, _client=None):
    """Set up a transport for a ``git://host[:port]/path`` URL.

    Stores the host, port (``git.protocol.TCP_GIT_PORT`` when the URL has
    none) and path on the instance, plus the optional ``_client``.
    """
    Transport.__init__(self, url)
    (scheme, netloc, loc, _, _) = urlparse.urlsplit(url)
    assert scheme == "git"
    if netloc:
        # urlsplit() already separated "host[:port]" from the path. Feeding
        # that path to splithost() would yield a None host and make
        # splitnport() fail.
        hostport, self._path = netloc, loc
    else:
        # Some urlparse versions leave "//host[:port]/path" in the path
        # component for schemes they do not know; split it here.
        hostport, self._path = urllib.splithost(loc)
    (self._host, self._port) = urllib.splitnport(hostport, git.protocol.TCP_GIT_PORT)
    self._client = _client
开发者ID:harsh-a1,项目名称:repeater-testing,代码行数:7,代码来源:remote.py
示例8: run
def run(self, netloc='localhost:8000', reload=True, log=True):
    """Run the CherryPy server.

    ``netloc`` is ``host[:port]`` (port 80 when omitted); the host name is
    resolved to an address before being handed to CherryPy. ``reload``
    toggles the engine's autoreloader and ``log`` toggles screen logging.
    Blocks until the CherryPy engine stops.
    """
    from django.conf import settings
    from django.core.handlers.wsgi import WSGIHandler
    from paste.translogger import TransLogger

    hostname, port = urllib.splitnport(netloc, 80)
    address = socket.gethostbyname(hostname)

    server_options = {
        'server.socket_host': address,
        'server.socket_port': port,
        'log.screen': log,
        'engine.autoreload_on': reload,
    }
    cherrypy.config.update(server_options)

    # Static content: media files, static files, then the favicon.
    self.cfg_assets(settings.MEDIA_URL, settings.MEDIA_ROOT)
    self.cfg_assets(settings.STATIC_URL, settings.STATIC_ROOT)
    self.cfg_favicon(settings.STATIC_ROOT)

    # Django WSGI app wrapped with access logging.
    wsgi_app = TransLogger(
        WSGIHandler(),
        logger_name='cherrypy.access',
        setup_console_handler=False,
    )
    if self.domains:
        wsgi_app = cherrypy.wsgi.VirtualHost(wsgi_app, self.domains)

    cherrypy.tree.graft(wsgi_app)
    cherrypy.engine.start()
    cherrypy.engine.block()
开发者ID:bussiere,项目名称:PirateBoxMessageBoard,代码行数:25,代码来源:cherypyd.py
示例9: _parse_url
def _parse_url(self, url):
    """ Returns scheme, host, port, path.

    ``port`` is None when the URL has no port, a non-numeric port, or a
    port that is not a positive number.
    """
    scheme, netloc, path, _params, _query, _frag = rhnLib.parseUrl(url)
    host, port = urllib.splitnport(netloc)
    # splitnport() gives -1 for "no port" and None for a non-numeric port.
    # Test for None explicitly instead of relying on None comparing as
    # smaller than any integer.
    if port is not None and port <= 0:
        port = None
    return scheme, host, port, path
开发者ID:pombredanne,项目名称:spacewalk-1,代码行数:7,代码来源:rhnShared.py
示例10: _connect_ssh
def _connect_ssh(self, host):
    """Open an SSH tunnel running ``svnserve -t`` on the given host.

    ``host`` may be ``[user[:password]@]host[:port]``; the port defaults
    to 22. The tunnel is stored on ``self._tunnel``.

    Returns:
        A ``(recv, send)`` tuple of the tunnel's bound methods.
    """
    (user, host) = urllib.splituser(host)
    if user is not None:
        # The urllib helper is named splitpasswd; "splitpassword" does not
        # exist and raised AttributeError whenever a user was supplied.
        (user, password) = urllib.splitpasswd(user)
    else:
        password = None
    (host, port) = urllib.splitnport(host, 22)
    self._tunnel = get_ssh_vendor().connect_ssh(
        user, password, host, port, ["svnserve", "-t"])
    return (self._tunnel.recv, self._tunnel.send)
开发者ID:lygstate,项目名称:subvertpy,代码行数:9,代码来源:ra_svn.py
示例11: makeService
def makeService(self, options):
    """
    Constructs a lobber storage node service.

    Builds the lobber and transmission clients and the torrent downloader
    from ``options``, schedules the periodic jobs below, optionally starts
    a dropbox watcher and a tracker reverse proxy, and returns the STOMP
    TCP client service.
    """
    lobber = LobberClient(options['lobberUrl'],
                          options['lobberKey'],
                          options['torrentDir'].rstrip(os.sep),
                          options['announceUrl'])
    transmission = TransmissionClient(options['transmissionRpc'],
                                      options['transmissionDownloadsDir'])
    torrentDownloader = TorrentDownloader(lobber,transmission,
                                          options.destinations,
                                          options['trackerProxyTrackerUrl'],
                                          options['trackerProxyListenOn'])
    stompService = internet.TCPClient(options['stomp_host'],
                                      options['stomp_port'],
                                      torrentDownloader)
    # One looping call per configured URL, started with interval 30 and
    # ``now=True``.
    self.getter = {}
    for url in options.urls:
        log.msg("Pulling RSS/Torrent from "+url)
        self.getter[url] = task.LoopingCall(torrentDownloader.url_handler.load_url,url)
        self.getter[url].start(30,True)
    transmissionSweeper = TransmissionSweeper(lobber, transmission,
                                              remove_limit=options['removeLimit'])
    self.sweeper = task.LoopingCall(transmissionSweeper.clean_done)
    # The sweeper's first start is delayed by half of its 30 interval.
    reactor.callLater(30/2, self.sweeper.start, 30, True)
    # Optional dropbox watcher, run with interval 5.
    if options['dropbox']:
        dropboxWatcher = DropboxWatcher(lobber,transmission,
                                        options['dropbox'],
                                        register=options['register'],
                                        acl=options['acl'],
                                        move=not options['keepData'])
        self.dropbox = task.LoopingCall(dropboxWatcher.watch_dropbox)
        self.dropbox.start(5,True)
    # Optional reverse proxy in front of the tracker; the tracker port
    # defaults to 443 when the URL does not name one.
    if options['trackerProxyTrackerUrl']:
        netloc, path = urlparse(options['trackerProxyTrackerUrl'])[1:3]
        tracker_host, tracker_port = splitnport(netloc, 443)
        proxy = server.Site(
            ReverseProxyTLSResource(
                tracker_host,
                tracker_port,
                '',
                #path_rewrite=[['[^\?]+', path]],
                tls=True, # FIXME: Base on urlparse()[0].
                headers={'X_LOBBER_KEY': options['lobberKey']}))
        # ``trackerProxyListenOn`` is split on ":" into host and port.
        bindto = options['trackerProxyListenOn'].split(':')
        bindto_host = bindto[0]
        bindto_port = int(bindto[1])
        reactor.listenTCP(bindto_port, proxy, interface=bindto_host)
    return stompService
开发者ID:SUNET,项目名称:lobber-storagenode,代码行数:56,代码来源:lobberstoragenode_plugin.py
示例12: get_host
def get_host(self):
    """Return the request host, filling in ``self.port`` when it is unset.

    After delegating to ``_Request.get_host``, a missing port is derived
    from the host string, falling back to 80 for ``http`` and 443 for
    ``https``.

    Raises:
        ValueError: if the port is unset and the request type is neither
            ``http`` nor ``https``.
    """
    _Request.get_host(self)

    # Port already known: nothing to derive.
    if self.port is not None:
        return self.host

    fallback_ports = {'http': 80, 'https': 443}
    kind = self.get_type()
    if kind not in fallback_ports:
        raise ValueError('Unsupported type.', kind)

    self.host, self.port = splitnport(self.host, fallback_ports[kind])
    return self.host
开发者ID:mcruse,项目名称:monotone,代码行数:11,代码来源:request.py
示例13: remotefile_from_str
def remotefile_from_str(s):
    """Build a ``File`` (attached to a ``Server``) from a URL-like string.

    ``s`` has the form ``protocol://host[:port]/dir/name[?query]``. The
    port defaults to 21 and the query part, if any, is discarded.

    Raises:
        ValueError: if ``s`` contains no ``"://"`` separator.
    """
    import urllib

    # Split on the first "://" only, so a later occurrence (for example
    # inside the query) no longer breaks the two-value unpacking.
    _protocol, rest = s.split("://", 1)
    host, rest = urllib.splithost("//" + rest)
    host, port = urllib.splitnport(host, defport=21)
    path, _query = urllib.splitquery(rest)

    server = Server(hostname=host, port=port)
    # Local names avoid shadowing the ``file`` and ``dir`` builtins.
    directory, filename = os.path.split(path)
    return File(filename=filename, dir=directory, type="f", server=server)
开发者ID:tumist,项目名称:flashftp,代码行数:11,代码来源:ftp.py
示例14: fetch
def fetch(self, command, **params):
    """
    Build an API request for ``command`` and POST it to the environment URL.

    Keyword ``params`` are added to the request body; a dict value is
    flattened into ``key[subkey]`` entries. Depending on the environment's
    ``auth_type`` the request is either signed with the API key or carries
    LDAP credentials.

    Raises ScalrAPIError when the HTTP request fails or no connection can
    be made.

    @return object
    NOTE(review): no return statement is visible in this block; the
    response handling presumably follows elsewhere - confirm.
    """
    # Perform HTTP request
    request_body = {}
    request_body["Action"] = command
    request_body["Version"] = self.environment.api_version
    request_body['AuthVersion'] = '3'
    # The literal string 'None' is treated the same as a missing env id.
    if self.environment.env_id and self.environment.env_id != 'None':
        request_body['EnvID'] = self.environment.env_id
    #request_body['SysDebug'] = '1'
    if {} != params :
        for key, value in params.items():
            if isinstance(value, dict):
                # Flatten one level: {"a": {"b": 1}} -> "a[b]" = 1
                for k,v in value.items():
                    request_body['%s[%s]'%(key,k)] = v
            else:
                request_body[key] = value
    if not self.environment.auth_type or self.environment.auth_type == 'password':
        # Key-based auth: sign the body and attach key id, timestamp and
        # signature.
        request_body["KeyID"] = self.environment.key_id
        signature, timestamp = sign_http_request_v3(request_body, self.environment.key_id, self.environment.key)
        request_body["TimeStamp"] = timestamp
        request_body["Signature"] = signature
    elif self.environment.auth_type == 'ldap':
        request_body["Login"] = self.environment.ldap_login
        request_body["AuthType"] = self.environment.auth_type
        # Password source order: configured value, then the
        # SCALR_API_LDAP_PASSWORD environment variable, then a prompt.
        password =self.environment.ldap_password or os.environ.get('SCALR_API_LDAP_PASSWORD')
        if not password:
            password = getpass.getpass('Password: ')
        request_body["Password"] = password
    post_data = urlencode(request_body)
    self._logger.debug('POST URL: \n%s' % self.environment.url)
    self._logger.debug('POST DATA: \n%s' % post_data)
    response = None
    try:
        req = Request(self.environment.url, post_data, {})
        response = urlopen(req)
    except URLError, e:
        if isinstance(e, HTTPError):
            # The server answered with an error status; include its body
            # in the message when one is available.
            resp_body = e.read() if e.fp is not None else ""
            raise ScalrAPIError("Request failed. %s. URL: %s. Service message: %s" % (e, self.environment.url, resp_body))
        else:
            # Any other URLError: report the host and port (443 when the
            # request has none) that could not be reached.
            host, port = splitnport(req.host, req.port or 443)
            raise ScalrAPIError("Cannot connect to Scalr on %s:%s. %s"
                                % (host, port, str(e)))
开发者ID:AnyBucket,项目名称:pecha,代码行数:53,代码来源:client.py
示例15: savefilename
def savefilename(self, url):
    """Map ``url`` to a relative local file path of the form host/path.

    The host is lower-cased and stripped of user info and port. Leading
    slashes are removed from the URL path, and an empty path or one ending
    in "/" gets "index.html" appended. "/" separators are converted to
    ``os.sep``.
    """
    _type, rest = urllib.splittype(url)
    host, path = urllib.splithost(rest)
    # String methods replace the deprecated ``string`` module functions and
    # the manual character-by-character strip loop.
    path = path.lstrip("/")
    _user, host = urllib.splituser(host)
    host, _port = urllib.splitnport(host)
    host = host.lower()
    if not path or path[-1] == "/":
        path = path + "index.html"
    if os.sep != "/":
        path = os.sep.join(path.split("/"))
    return os.path.join(host, path)
开发者ID:asottile,项目名称:ancient-pythons,代码行数:13,代码来源:websucker.py
示例16: __init__
def __init__(self, token, uri_prefix=DEFAULT_URI_PREFIX, log=Log()):
    """Sets up access to a service that provides the Google Meter API.

    The URI prefix is split into scheme, host, port and path; when it names
    no port, 80 is used for http and 443 for https. Any other scheme raises
    KeyError.

    Args:
      token: AuthSub token to use for all requests
      uri_prefix: URI prefix under which feeds are located
      log: Log object to which messages will be logged
    """
    self.token = token

    parts = urlparse.urlparse(uri_prefix)
    self.scheme = parts[0]
    hostport = parts[1]
    self.path = parts[2]

    well_known_ports = {'http': 80, 'https': 443}
    fallback_port = well_known_ports[self.scheme]
    self.host, self.port = urllib.splitnport(hostport, fallback_port)

    self.log = log
开发者ID:johnblackmore,项目名称:cc128-php-extractor,代码行数:13,代码来源:google_meter.py
示例17: savefilename
def savefilename(self, url):
    """Translate ``url`` into the relative path the page is saved under.

    The result is ``<lower-cased host>/<url path>``: user info and port are
    dropped from the host, leading slashes are dropped from the path, and
    "index.html" is appended when the path is empty or ends in "/". URL
    separators become ``os.sep``.
    """
    _scheme, remainder = urllib.splittype(url)
    netloc, url_path = urllib.splithost(remainder)
    relative = url_path.lstrip("/")

    _userinfo, hostport = urllib.splituser(netloc)
    hostname, _port = urllib.splitnport(hostport)
    hostname = hostname.lower()

    # Directory-style URLs are stored as their index page.
    if not relative or relative.endswith("/"):
        relative += "index.html"

    # Convert URL separators to the platform's separator where they differ.
    if os.sep != "/":
        relative = relative.replace("/", os.sep)

    return os.path.join(hostname, relative)
开发者ID:dr4ke616,项目名称:custom_python,代码行数:13,代码来源:websucker.py
示例18: __init__
def __init__(self, host, transport, close_callable=None):
    """Create the dispatcher, initialise its state and connect to ``host``.

    Args:
        host: ``host[:port]`` string; the port defaults to 80.
        transport: object whose ``getparser`` attribute is stored on the
            instance.
        close_callable: optional value stored on ``self.close_callable``.
    """
    asyncore.dispatcher.__init__(self)
    self.create_socket(socket.AF_INET, socket.SOCK_STREAM)

    # Collaborators supplied by the caller.
    self.host = host
    self.getparser = transport.getparser
    self.close_callable = close_callable

    # Per-request state and the receive/transmit buffers.
    self.deferred = None
    self.waiting_on_response = False
    self.rx_buf = self.tx_buf = ""

    # Connect to host and POST request
    address = urllib.splitnport(host, 80)
    self.connect(address)
开发者ID:mgenti,项目名称:apy,代码行数:13,代码来源:AsyncXMLRPCTransport.py
示例19: _connect
def _connect(self, host):
    """Connect ``self._socket`` to ``host``, trying each resolved address.

    ``host`` is ``host[:port]``; the port defaults to ``SVN_PORT``. Every
    address returned by ``getaddrinfo`` is tried in order until one
    connects. If none does, ``self._socket`` is left as None.
    """
    (host, port) = urllib.splitnport(host, SVN_PORT)
    sockaddrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                   socket.SOCK_STREAM, 0, 0)
    self._socket = None
    for (family, socktype, proto, canonname, sockaddr) in sockaddrs:
        try:
            self._socket = socket.socket(family, socktype, proto)
            self._socket.connect(sockaddr)
        # ``as`` form instead of the Python-2-only ``except X, name``.
        except socket.error as err:
            # Discard the failed socket and move on to the next address.
            if self._socket is not None:
                self._socket.close()
            self._socket = None
            continue
        # Connected: stop trying further addresses.
        break
开发者ID:lygstate,项目名称:subvertpy,代码行数:15,代码来源:ra_svn.py
示例20: openHotlineURL
def openHotlineURL(url, nickname=None):
    """Start a Hotline client connection described by a ``hotline://`` URL.

    The URL has the form ``hotline://[user[:password]@]host[:port]/path``,
    with no query or fragment. The port defaults to 5500 and the URL path
    becomes the client's ``cwd``.

    Args:
        url: The ``hotline://`` URL to open.
        nickname: Optional nickname passed to the client when truthy.

    Returns:
        Whatever ``ClientCreator.connectTCP`` returns for the connection.
    """
    # Register the scheme once; appending on every call made the shared
    # ``uses_netloc`` list grow without bound.
    if 'hotline' not in urlparse.uses_netloc:
        urlparse.uses_netloc.append('hotline')

    schema, host, path, query, fragment = urlparse.urlsplit(url)
    assert schema == 'hotline'
    assert query == ''
    assert fragment == ''

    kws = {}
    if nickname:
        kws['nickname'] = nickname
    if '@' in host:
        userstr, host = urllib.splituser(host)
        # splitpasswd() is the helper for "user[:password]". splittype()
        # treated the user name as a URL scheme: it lower-cased the name
        # and, with no password, returned the name as the password.
        kws['username'], kws['password'] = urllib.splitpasswd(userstr)
    kws['cwd'] = path

    cc = ClientCreator(reactor, HotlineClient, **kws)
    return cc.connectTCP(*urllib.splitnport(host, 5500))
开发者ID:myers,项目名称:myers.github.io,代码行数:16,代码来源:hotline.py
注：本文中的urllib.splitnport函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论