本文整理汇总了Python中twisted.internet.abstract.isIPv6Address函数的典型用法代码示例。如果您正苦于以下问题：Python isIPv6Address函数的具体用法？Python isIPv6Address怎么用？Python isIPv6Address使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了isIPv6Address函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, host, port, bindAddress, connector, reactor=None):
    """
    Record the destination, create the socket, optionally bind it, and hand
    the outcome to C{self._finishInit}.

    C{host} is classified first: an IPv4 literal or an IPv6 literal needs no
    resolution (the IPv6 case also switches the address family and address
    type and rewrites C{self.addr} through C{_resolveIPv6}); anything else is
    flagged as requiring resolution.

    @param host: IPv4 literal, IPv6 literal, or some other host string.
    @param port: Port stored next to C{host} in C{self.addr}.
    @param bindAddress: C{(host, port)} tuple to bind the new socket to, or
        L{None} to skip binding.
    @param connector: Stored as C{self.connector}.
    @param reactor: Forwarded unchanged to C{self._finishInit}.
    """
    # BaseClient.__init__ is invoked later
    self.connector = connector
    self.addr = (host, port)

    nextStep = self.resolveAddress
    failure = None
    sock = None

    if abstract.isIPAddress(host):
        self._requiresResolution = False
    elif abstract.isIPv6Address(host):
        self._requiresResolution = False
        self.addr = _resolveIPv6(host, port)
        self.addressFamily = socket.AF_INET6
        self._addressType = address.IPv6Address
    else:
        self._requiresResolution = True

    try:
        sock = self.createInternetSocket()
    except socket.error as se:
        # Socket creation failed: remember why and skip the follow-up step.
        failure = error.ConnectBindError(se.args[0], se.args[1])
        nextStep = None

    if nextStep and bindAddress is not None:
        try:
            localAddress = (
                _resolveIPv6(*bindAddress)
                if abstract.isIPv6Address(bindAddress[0])
                else bindAddress
            )
            sock.bind(localAddress)
        except socket.error as se:
            failure = error.ConnectBindError(se.args[0], se.args[1])
            nextStep = None

    self._finishInit(nextStep, sock, failure, reactor)
开发者ID:dansgithubuser,项目名称:constructicon,代码行数:34,代码来源:tcp.py
示例2: test_scopeID
def test_scopeID(self):
    """
    A valid IPv6 address literal stays valid when a C{"%"} and an arbitrary
    scope identifier are appended to it.
    """
    scopedLiterals = ("fe80::1%eth0", "fe80::2%1", "fe80::3%en2")
    for literal in scopedLiterals:
        self.assertTrue(isIPv6Address(literal))
开发者ID:0004c,项目名称:VTK,代码行数:8,代码来源:test_abstract.py
示例3: test_invalidWithScopeID
def test_invalidWithScopeID(self):
    """
    Appending a scope identifier does not rescue an IPv6 address literal
    that is invalid without it.
    """
    bogusLiterals = ("%eth0", ":%eth0", "hello%eth0")
    for literal in bogusLiterals:
        self.assertFalse(isIPv6Address(literal))
开发者ID:0004c,项目名称:VTK,代码行数:8,代码来源:test_abstract.py
示例4: write
def write(self, datagram, addr=None):
    """
    Write a datagram.

    @type datagram: L{bytes}
    @param datagram: The datagram to be sent.

    @type addr: L{tuple} containing L{str} as first element and L{int} as
        second element, or L{None}
    @param addr: A tuple of (I{stringified IPv4 or IPv6 address},
        I{integer port number}); can be L{None} in connected mode.

    @return: The result of the underlying C{send}/C{sendto} call, or L{None}
        when the send failed with C{ECONNREFUSED}.

    @raise error.MessageLengthError: If the send fails with C{EMSGSIZE}.
    @raise error.InvalidAddressError: In unconnected mode, if C{addr[0]} is
        not an IP literal (or C{"<broadcast>"}), or if it does not match
        this port's address family.
    """
    if self._connectedAddr:
        assert addr in (None, self._connectedAddr)
        try:
            return self.socket.send(datagram)
        except socket.error as se:
            no = se.args[0]
            if no == EINTR:
                # Interrupted before anything was sent: try again.
                return self.write(datagram)
            elif no == EMSGSIZE:
                raise error.MessageLengthError("message too long")
            elif no == ECONNREFUSED:
                self.protocol.connectionRefused()
            else:
                raise
    else:
        # Identity comparison is the correct way to test for None.
        assert addr is not None
        host = addr[0]
        # Classify the destination once instead of on every check below.
        isV4 = abstract.isIPAddress(host)
        isV6 = abstract.isIPv6Address(host)
        isBroadcast = host == "<broadcast>"
        if not (isV4 or isV6 or isBroadcast):
            raise error.InvalidAddressError(
                host,
                "write() only accepts IP addresses, not hostnames")
        if (isV4 or isBroadcast) and self.addressFamily == socket.AF_INET6:
            raise error.InvalidAddressError(
                host,
                "IPv6 port write() called with IPv4 or broadcast address")
        if isV6 and self.addressFamily == socket.AF_INET:
            raise error.InvalidAddressError(
                host, "IPv4 port write() called with IPv6 address")
        try:
            return self.socket.sendto(datagram, addr)
        except socket.error as se:
            no = se.args[0]
            if no == EINTR:
                return self.write(datagram, addr)
            elif no == EMSGSIZE:
                raise error.MessageLengthError("message too long")
            elif no == ECONNREFUSED:
                # in non-connected UDP ECONNREFUSED is platform dependent, I
                # think and the info is not necessarily useful. Nevertheless
                # maybe we should call connectionRefused? XXX
                return
            else:
                raise
开发者ID:alfonsjose,项目名称:international-orders-app,代码行数:58,代码来源:udp.py
示例5: write
def write(self, datagram, addr=None):
    """
    Write a datagram.

    @param datagram: The datagram to be sent.
    @param addr: should be a tuple (ip, port), can be None in connected
        mode.

    @return: The result of the underlying C{send}/C{sendto} call, or L{None}
        when the send failed with one of the "connection refused" style
        error codes handled below.

    @raise error.MessageLengthError: If the send fails with
        C{errno.WSAEMSGSIZE}.
    @raise error.InvalidAddressError: In unconnected mode, if C{addr[0]} is
        not an IP literal (or C{"<broadcast>"}), or if it does not match
        this port's address family.
    """
    if self._connectedAddr:
        assert addr in (None, self._connectedAddr)
        try:
            return self.socket.send(datagram)
        except socket.error as se:
            no = se.args[0]
            if no == errno.WSAEINTR:
                # Interrupted before anything was sent: try again.
                return self.write(datagram)
            elif no == errno.WSAEMSGSIZE:
                raise error.MessageLengthError("message too long")
            elif no in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
                        ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
                self.protocol.connectionRefused()
            else:
                raise
    else:
        # Identity comparison is the correct way to test for None.
        assert addr is not None
        host = addr[0]
        # Classify the destination once instead of on every check below.
        isV4 = isIPAddress(host)
        isV6 = isIPv6Address(host)
        if not (isV4 or isV6 or host == "<broadcast>"):
            raise error.InvalidAddressError(
                host,
                "write() only accepts IP addresses, not hostnames")
        if isV4 and self.addressFamily == socket.AF_INET6:
            raise error.InvalidAddressError(
                host, "IPv6 port write() called with IPv4 address")
        if isV6 and self.addressFamily == socket.AF_INET:
            raise error.InvalidAddressError(
                host, "IPv4 port write() called with IPv6 address")
        try:
            return self.socket.sendto(datagram, addr)
        except socket.error as se:
            no = se.args[0]
            if no == errno.WSAEINTR:
                return self.write(datagram, addr)
            elif no == errno.WSAEMSGSIZE:
                raise error.MessageLengthError("message too long")
            elif no in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
                        ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
                # in non-connected UDP ECONNREFUSED is platform dependent,
                # I think and the info is not necessarily useful.
                # Nevertheless maybe we should call connectionRefused? XXX
                return
            else:
                raise
开发者ID:BarnetteME1,项目名称:indeed_scraper,代码行数:51,代码来源:udp.py
示例6: apply
def apply(self, actionType, user, param, settingUser, uid, adding, *params, **kw):
    """
    Install or remove the "cloak" host override on C{user}.

    When C{adding} is false the override is reset.  Otherwise a cloak is
    derived from C{user.realHost}: an IPv6 or IPv4 literal is cloaked with
    the matching IP cloak, a dotted name with C{applyHostCloak}, and a name
    without any dot falls back to cloaking C{user.ip}.

    The dot-less fallback used to C{return} the computed cloak instead of
    passing it to C{user.changeHost}, so such users were never cloaked;
    every branch now ends in C{changeHost}.

    @param user: Object providing C{realHost}, C{ip}, C{changeHost} and
        C{resetHost}.
    @param adding: True to apply the cloak, false to remove it.

    The remaining parameters are accepted but not used here.
    """
    if not adding:
        user.resetHost("cloak")
        return

    userHost = user.realHost
    if isIPv6Address(userHost):
        cloak = self.applyIPv6Cloak(userHost)
    elif isIPAddress(userHost):
        cloak = self.applyIPv4Cloak(userHost)
    elif "." in userHost:
        cloak = self.applyHostCloak(userHost, user.ip)
    elif isIPv6Address(user.ip):
        # Host has no dot to split on: cloak the IP address instead.
        cloak = self.applyIPv6Cloak(user.ip)
    else:
        cloak = self.applyIPv4Cloak(user.ip)
    user.changeHost("cloak", cloak)
开发者ID:Heufneutje,项目名称:txircd,代码行数:17,代码来源:cloaking.py
示例7: __init__
def __init__(self, port, factory, backlog=50, interface='', reactor=None):
    """
    Store the listening parameters and switch to IPv6 when C{interface} is
    an IPv6 address literal.

    @param port: Stored as C{self.port}.
    @param factory: Stored as C{self.factory}.
    @param backlog: Stored as C{self.backlog}.
    @param interface: Stored as C{self.interface}; also decides whether the
        address family and address type are overridden.
    @param reactor: Stored as C{self.reactor}.
    """
    self.port, self.factory, self.backlog = port, factory, backlog
    self.interface = interface
    self.reactor = reactor
    if not isIPv6Address(interface):
        # Keep whatever address family / address type is already in effect.
        return
    self.addressFamily = socket.AF_INET6
    self._addressType = address.IPv6Address
开发者ID:GunioRobot,项目名称:twisted,代码行数:9,代码来源:tcp.py
示例8: _setAddressFamily
def _setAddressFamily(self):
    """
    Pick the socket address family that matches C{self.interface}.

    An empty interface leaves C{self.addressFamily} untouched.

    @raise error.InvalidAddressError: If the interface is non-empty and is
        neither an IPv4 nor an IPv6 address literal.
    """
    interface = self.interface
    if abstract.isIPv6Address(interface):
        self.addressFamily = socket.AF_INET6
        return
    if abstract.isIPAddress(interface):
        self.addressFamily = socket.AF_INET
        return
    if interface:
        raise error.InvalidAddressError(interface, "not an IPv4 or IPv6 address.")
开发者ID:ssilverek,项目名称:kodb,代码行数:10,代码来源:udp.py
示例9: connect
def connect(self, host, port):
    """
    'Connect' to remote server.

    @param host: IPv4 or IPv6 address literal of the peer.
    @param port: Port of the peer.

    @raise RuntimeError: If a connected address has already been recorded.
    @raise error.InvalidAddressError: If C{host} is not an IP literal.
    """
    if self._connectedAddr:
        raise RuntimeError("already connected, reconnecting is not currently supported")
    isLiteral = abstract.isIPAddress(host) or abstract.isIPv6Address(host)
    if not isLiteral:
        raise error.InvalidAddressError(host, "not an IPv4 or IPv6 address.")
    peer = (host, port)
    self._connectedAddr = peer
    self.socket.connect(peer)
开发者ID:ssilverek,项目名称:kodb,代码行数:10,代码来源:udp.py
示例10: __init__
def __init__(self, hostname, ctx):
    """
    Remember C{ctx}, work out the byte form of C{hostname}, and register
    the info callback on the context.

    IP address literals are ASCII-encoded and SNI is disabled for them; any
    other hostname goes through C{_idnaBytes} and SNI is enabled.

    @param hostname: Hostname or IP address literal.
    @param ctx: Context object; must provide C{set_info_callback}.
    """
    self._ctx = ctx

    isLiteral = isIPAddress(hostname) or isIPv6Address(hostname)
    if isLiteral:
        hostnameBytes = hostname.encode('ascii')
    else:
        hostnameBytes = _idnaBytes(hostname)
    self._hostnameBytes = hostnameBytes
    self._sendSNI = not isLiteral

    infoCallback = _tolerateErrors(self._identityVerifyingInfoCallback)
    ctx.set_info_callback(infoCallback)
开发者ID:matrix-org,项目名称:synapse,代码行数:11,代码来源:context_factory.py
示例11: listenTCP
def listenTCP(self, port, factory, backlog=50, interface=''):
    """
    Fake L{reactor.listenTCP}: log the call in C{self.tcpServers} and
    return an L{IListeningPort}.

    The returned port carries an IPv6 address when C{interface} is an IPv6
    literal; otherwise it carries the IPv4 wildcard address.
    """
    self.tcpServers.append((port, factory, backlog, interface))
    if isIPv6Address(interface):
        return _FakePort(IPv6Address('TCP', interface, port))
    return _FakePort(IPv4Address('TCP', '0.0.0.0', port))
开发者ID:AmirKhooj,项目名称:VTK,代码行数:11,代码来源:proto_helpers.py
示例12: connect
def connect(self, protocolFactory):
    """
    Implement L{IStreamClientEndpoint.connect} to connect via TCP, once the
    hostname resolution is done.

    An IPv6 address literal is connected to directly; any other host goes
    through C{self._nameResolution} first and the host of the first result
    is used.
    """
    if isIPv6Address(self._host):
        return self._resolvedHostConnect(self._host, protocolFactory)

    def firstResolvedHost(results):
        # Extract the host field from the first resolution result.
        return results[0][self._GAI_ADDRESS][self._GAI_ADDRESS_HOST]

    resolving = self._nameResolution(self._host)
    resolving.addCallback(firstResolvedHost)
    resolving.addCallback(self._resolvedHostConnect, protocolFactory)
    return resolving
开发者ID:pombredanne,项目名称:toppatch,代码行数:12,代码来源:endpoints.py
示例13: connectTCP
def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
    """
    Fake L{reactor.connectTCP}: log the call in C{self.tcpClients}, notify
    the factory, and return an L{IConnector}.

    The connector's address is an IPv6 address when C{host} is an IPv6
    literal and an IPv4 address otherwise.
    """
    self.tcpClients.append((host, port, factory, timeout, bindAddress))
    addressClass = IPv6Address if isIPv6Address(host) else IPv4Address
    conn = _FakeConnector(addressClass('TCP', host, port))
    factory.startedConnecting(conn)
    return conn
开发者ID:AmirKhooj,项目名称:VTK,代码行数:12,代码来源:proto_helpers.py
示例14: applyHostCloak
def applyHostCloak(self, host, ip):
    """
    Build a cloaked hostname for C{host}.

    Everything in front of the last (up to) three dot-separated segments is
    replaced by the configured prefix plus the first eight hex digits of a
    salted SHA-256 digest; the trailing segments are kept verbatim.  If the
    result is longer than the configured hostname length, the IP cloak for
    C{ip} is returned instead.

    @param host: Hostname to cloak.
    @param ip: IP address used for the fallback cloak.
    @return: The cloaked host string.
    """
    config = self.ircd.config
    # Length of the leading part that gets hashed (all but the last segments).
    index = len(host.rsplit(".", 3)[0])
    hidden, visible = host[:index], host[index:]

    prefix = config.get("cloaking_prefix", "txircd")
    digest = sha256(config.get("cloaking_salt", "") + hidden).hexdigest()[:8]
    hostmask = "{}-{}{}".format(prefix, digest, visible)

    if len(hostmask) <= config.get("hostname_length", 64):
        return hostmask
    # This is very rare since we only leave up to 3 segments uncloaked, but
    # make sure the end result isn't too long.
    if isIPv6Address(ip):
        return self.applyIPv6Cloak(ip)
    return self.applyIPv4Cloak(ip)
开发者ID:Heufneutje,项目名称:txircd,代码行数:13,代码来源:cloaking.py
示例15: connect
def connect(self, host, port):
    """
    'Connect' to remote server.

    @param host: IPv4 or IPv6 address literal of the peer.
    @param port: Port of the peer.

    @raise RuntimeError: If a connected address has already been recorded.
    @raise error.InvalidAddressError: If C{host} is not an IP literal.
    """
    if self._connectedAddr:
        raise RuntimeError(
            "already connected, reconnecting is not currently supported "
            "(talk to itamar if you want this)")
    isLiteral = isIPAddress(host) or isIPv6Address(host)
    if not isLiteral:
        raise error.InvalidAddressError(
            host, 'not an IPv4 or IPv6 address.')
    peer = (host, port)
    self._connectedAddr = peer
    self.socket.connect(peer)
开发者ID:Architektor,项目名称:PySnip,代码行数:13,代码来源:udp.py
示例16: _setRealAddress
def _setRealAddress(self, addr):
    """
    Record the real IP address for this client and start the connection.

    When C{addr} is an IPv6 literal the address family and address type are
    switched to IPv6 first, so the socket is created with the right family.

    @param addr: IP address to pair with the port from C{self.addr}.
    """
    usesIPv6 = abstract.isIPv6Address(addr)
    if usesIPv6:
        self.addressFamily = socket.AF_INET6
        self._addressType = address.IPv6Address
    remotePort = self.addr[1]
    self.realAddress = (addr, remotePort)
    # create the socket and wait finish init after that
    self.initConnection()
开发者ID:c0ns0le,项目名称:zenoss-4,代码行数:13,代码来源:tcp.py
示例17: connectionMade
def connectionMade(self):
    """
    Send an HTTP C{CONNECT} request for the factory's target to the
    transport.

    An IPv6 target address is wrapped in square brackets.  When the
    configured proxy account name is non-empty, a C{Proxy-Authorization:
    Basic} header built from the account name and password is included.
    """
    twunnel.logger.log(3, "trace: HTTPSTunnelOutputProtocol.connectionMade")

    factory = self.factory
    host = str(factory.address)
    if isIPv6Address(factory.address):
        host = "[" + host + "]"

    # Collect the request pieces and join once instead of repeated "+".
    parts = ["CONNECT ", host, ":", str(factory.port), " HTTP/1.1\r\n"]

    account = factory.configuration["PROXY_SERVER"]["ACCOUNT"]
    if account["NAME"] != "":
        credentials = base64.standard_b64encode(account["NAME"] + ":" + account["PASSWORD"])
        parts.append("Proxy-Authorization: Basic " + credentials + "\r\n")

    parts.append("\r\n")
    self.transport.write("".join(parts))
开发者ID:cvhau,项目名称:twunnel,代码行数:18,代码来源:proxy_server__https.py
示例18: preprocess
def preprocess(self, request):
    """
    Decorate C{request} with the derived attributes used downstream.

    Sets C{headers}, C{hostname}, C{port}, C{tid}, C{client_ip},
    C{client_proto}, C{client_using_tor}, C{client_ua} and C{language} on
    the request object.

    @param request: The incoming request; mutated in place.
    """
    request.headers = request.getAllHeaders()

    # Twisted annoyingly different between Py2/Py3
    # which requires us to handle this specially in each
    # case.
    if sys.version_info[0] == 2:
        request.hostname = request.getRequestHostname().decode('utf-8')
    else:
        request.hostname = request.getRequestHostname()

    # Drop anything after the first colon.
    request.hostname = request.hostname.split(b':')[0]
    request.port = request.getHost().port

    if (request.hostname == b'localhost' or
            isIPAddress(request.hostname) or
            isIPv6Address(request.hostname)):
        # localhost and bare IP addresses map to tenant 1.
        request.tid = 1
    else:
        request.tid = State.tenant_hostname_id_map.get(request.hostname, 1)

    # A forwarded-for header marks the request as https; without it the
    # transport-level client IP is used and the request is marked as http.
    request.client_ip = request.headers.get(b'gl-forwarded-for')
    request.client_proto = b'https'
    if request.client_ip is None:
        request.client_ip = request.getClientIP()
        request.client_proto = b'http'

    request.client_using_tor = request.client_ip in State.tor_exit_set or \
        request.port == 8083

    if isinstance(request.client_ip, binary_type):
        request.client_ip = request.client_ip.decode('utf-8')

    # Use a bytes key like every other header lookup in this method; a text
    # key never matches bytes keys on Python 3, which disabled this check.
    if b'x-tor2web' in request.headers:
        request.client_using_tor = False

    request.client_ua = request.headers.get(b'user-agent', u'')

    request.language = text_type(self.detect_language(request))
    if b'multilang' in request.args:
        request.language = None
开发者ID:chojar,项目名称:GlobaLeaks,代码行数:42,代码来源:api.py
示例19: processDataState2
def processDataState2(self):
    """
    Build and send the SOCKS5 connect request for the factory's target,
    then advance C{self.dataState} to 3.

    The request starts with the bytes C{05 01 00}, followed by an address
    type byte and the address (C{0x01} + 4 packed bytes for an IPv4 literal,
    C{0x04} + 16 packed bytes for an IPv6 literal, otherwise C{0x03} + a
    one-byte length + the name itself), followed by the port as a
    big-endian unsigned 16-bit integer.

    @return: C{True}.
    """
    twunnel.logger.log(3, "trace: SOCKS5TunnelOutputProtocol.processDataState2")

    address = self.factory.address
    request = struct.pack("!BBB", 0x05, 0x01, 0x00)

    if isIPAddress(address):
        # inet_pton already yields network-order bytes; no need to unpack
        # them into integers and pack them again.
        request += struct.pack("!B", 0x01) + socket.inet_pton(socket.AF_INET, address)
    elif isIPv6Address(address):
        request += struct.pack("!B", 0x04) + socket.inet_pton(socket.AF_INET6, address)
    else:
        # NOTE(review): the length is packed as a single unsigned byte, so
        # struct raises for names longer than 255 -- same as before.
        addressLength = len(address)
        request += struct.pack("!BB%ds" % addressLength, 0x03, addressLength, address)

    request += struct.pack("!H", self.factory.port)

    self.transport.write(request)
    self.dataState = 3
    return True
开发者ID:cvhau,项目名称:twunnel,代码行数:41,代码来源:proxy_server__socks5.py
示例20: _query
def _query(self, *args):
    """
    Obtain a protocol from L{_connectedProtocol}, issue a query on it with
    C{*args}, and make sure its transport stops listening once the query
    has finished, whether it succeeded or failed.

    The protocol is requested with C{interface='::'} when the first element
    of the first positional argument is an IPv6 address literal.

    @param *args: Positional arguments to be passed to
        L{DNSDatagramProtocol.query}.

    @return: A L{Deferred} which will be called back with the result of the
        query.
    """
    protocolOptions = {'interface': '::'} if isIPv6Address(args[0][0]) else {}
    protocol = self._connectedProtocol(**protocolOptions)
    pending = protocol.query(*args)

    def stopAndPassThrough(outcome):
        # Runs for both results and failures; the outcome is not altered.
        protocol.transport.stopListening()
        return outcome

    pending.addBoth(stopAndPassThrough)
    return pending
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:22,代码来源:client.py
注：本文中的twisted.internet.abstract.isIPv6Address函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论