本文整理汇总了Python中twisted.internet.abstract.isIPAddress函数的典型用法代码示例。如果您正苦于以下问题:Python isIPAddress函数的具体用法?Python isIPAddress怎么用?Python isIPAddress使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了isIPAddress函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_decimalDotted
def test_decimalDotted(self):
"""
L{isIPAddress} should return C{True} for any decimal dotted
representation of an IPv4 address.
"""
self.assertTrue(isIPAddress('0.1.2.3'))
self.assertTrue(isIPAddress('252.253.254.255'))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:7,代码来源:test_abstract.py
示例2: test_invalidLetters
def test_invalidLetters(self):
"""
L{isIPAddress} should return C{False} for any non-decimal dotted
representation including letters.
"""
self.assertFalse(isIPAddress('a.2.3.4'))
self.assertFalse(isIPAddress('1.b.3.4'))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:7,代码来源:test_abstract.py
示例3: test_nonASCII
def test_nonASCII(self):
"""
All IP addresses must be encodable as ASCII; non-ASCII should result in
a L{False} result.
"""
self.assertFalse(isIPAddress(b'\xff.notascii'))
self.assertFalse(isIPAddress(u'\u4321.notascii'))
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:7,代码来源:test_abstract.py
示例4: test_unicodeAndBytes
def test_unicodeAndBytes(self):
"""
L{isIPAddress} evaluates ASCII-encoded bytes as well as text.
"""
self.assertFalse(isIPAddress(b'256.0.0.0'))
self.assertFalse(isIPAddress(u'256.0.0.0'))
self.assertTrue(isIPAddress(b'252.253.254.255'))
self.assertTrue(isIPAddress(u'252.253.254.255'))
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:8,代码来源:test_abstract.py
示例5: test_invalidNegative
def test_invalidNegative(self):
"""
L{isIPAddress} should return C{False} for negative decimal values.
"""
self.assertFalse(isIPAddress('-1'))
self.assertFalse(isIPAddress('1.-2'))
self.assertFalse(isIPAddress('1.2.-3'))
self.assertFalse(isIPAddress('1.2.-3.4'))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:8,代码来源:test_abstract.py
示例6: write
def write(self, datagram, addr=None):
"""
Write a datagram.
@type datagram: L{bytes}
@param datagram: The datagram to be sent.
@type addr: L{tuple} containing L{str} as first element and L{int} as
second element, or L{None}
@param addr: A tuple of (I{stringified IPv4 or IPv6 address},
I{integer port number}); can be L{None} in connected mode.
"""
if self._connectedAddr:
assert addr in (None, self._connectedAddr)
try:
return self.socket.send(datagram)
except socket.error as se:
no = se.args[0]
if no == EINTR:
return self.write(datagram)
elif no == EMSGSIZE:
raise error.MessageLengthError("message too long")
elif no == ECONNREFUSED:
self.protocol.connectionRefused()
else:
raise
else:
assert addr != None
if (not abstract.isIPAddress(addr[0])
and not abstract.isIPv6Address(addr[0])
and addr[0] != "<broadcast>"):
raise error.InvalidAddressError(
addr[0],
"write() only accepts IP addresses, not hostnames")
if ((abstract.isIPAddress(addr[0]) or addr[0] == "<broadcast>")
and self.addressFamily == socket.AF_INET6):
raise error.InvalidAddressError(
addr[0],
"IPv6 port write() called with IPv4 or broadcast address")
if (abstract.isIPv6Address(addr[0])
and self.addressFamily == socket.AF_INET):
raise error.InvalidAddressError(
addr[0], "IPv4 port write() called with IPv6 address")
try:
return self.socket.sendto(datagram, addr)
except socket.error as se:
no = se.args[0]
if no == EINTR:
return self.write(datagram, addr)
elif no == EMSGSIZE:
raise error.MessageLengthError("message too long")
elif no == ECONNREFUSED:
# in non-connected UDP ECONNREFUSED is platform dependent, I
# think and the info is not necessarily useful. Nevertheless
# maybe we should call connectionRefused? XXX
return
else:
raise
开发者ID:alfonsjose,项目名称:international-orders-app,代码行数:58,代码来源:udp.py
示例7: test_shortDecimalDotted
def test_shortDecimalDotted(self):
"""
L{isIPAddress} should return C{False} for a dotted decimal
representation with fewer or more than four octets.
"""
self.assertFalse(isIPAddress('0'))
self.assertFalse(isIPAddress('0.1'))
self.assertFalse(isIPAddress('0.1.2'))
self.assertFalse(isIPAddress('0.1.2.3.4'))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:9,代码来源:test_abstract.py
示例8: test_invalidPunctuation
def test_invalidPunctuation(self):
"""
L{isIPAddress} should return C{False} for a string containing
strange punctuation.
"""
self.assertFalse(isIPAddress(','))
self.assertFalse(isIPAddress('1,2'))
self.assertFalse(isIPAddress('1,2,3'))
self.assertFalse(isIPAddress('1.,.3,4'))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:9,代码来源:test_abstract.py
示例9: test_invalidPositive
def test_invalidPositive(self):
"""
L{isIPAddress} should return C{False} for a string containing
positive decimal values greater than 255.
"""
self.assertFalse(isIPAddress('256.0.0.0'))
self.assertFalse(isIPAddress('0.256.0.0'))
self.assertFalse(isIPAddress('0.0.256.0'))
self.assertFalse(isIPAddress('0.0.0.256'))
self.assertFalse(isIPAddress('256.256.256.256'))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:10,代码来源:test_abstract.py
示例10: write
def write(self, datagram, addr=None):
"""
Write a datagram.
@param addr: should be a tuple (ip, port), can be None in connected
mode.
"""
if self._connectedAddr:
assert addr in (None, self._connectedAddr)
try:
return self.socket.send(datagram)
except socket.error as se:
no = se.args[0]
if no == errno.WSAEINTR:
return self.write(datagram)
elif no == errno.WSAEMSGSIZE:
raise error.MessageLengthError("message too long")
elif no in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
self.protocol.connectionRefused()
else:
raise
else:
assert addr != None
if (not isIPAddress(addr[0]) and not isIPv6Address(addr[0])
and addr[0] != "<broadcast>"):
raise error.InvalidAddressError(
addr[0],
"write() only accepts IP addresses, not hostnames")
if isIPAddress(addr[0]) and self.addressFamily == socket.AF_INET6:
raise error.InvalidAddressError(
addr[0], "IPv6 port write() called with IPv4 address")
if isIPv6Address(addr[0]) and self.addressFamily == socket.AF_INET:
raise error.InvalidAddressError(
addr[0], "IPv4 port write() called with IPv6 address")
try:
return self.socket.sendto(datagram, addr)
except socket.error as se:
no = se.args[0]
if no == errno.WSAEINTR:
return self.write(datagram, addr)
elif no == errno.WSAEMSGSIZE:
raise error.MessageLengthError("message too long")
elif no in (errno.WSAECONNREFUSED, errno.WSAECONNRESET,
ERROR_CONNECTION_REFUSED, ERROR_PORT_UNREACHABLE):
# in non-connected UDP ECONNREFUSED is platform dependent,
# I think and the info is not necessarily useful.
# Nevertheless maybe we should call connectionRefused? XXX
return
else:
raise
开发者ID:BarnetteME1,项目名称:indeed_scraper,代码行数:51,代码来源:udp.py
示例11: render_GET
def render_GET(self, request):
"""Renders the response to peers"""
args = request.args
peer_id = args.get('peer_id', [None])[0]
if peer_id is None:
return self._encode_failure('Peer ID not given')
if len(peer_id) != 20:
return self._encode_failure('Invalid peer id')
# TODO: eliminate ip option
ip_address = args.get('ip', [request.getClientIP()])[0]
if not isIPAddress(ip_address):
return self._encode_failure('Invalid IP Address')
port = args.get('port', [None])[0]
if port is None:
return self._encode_failure('Port number not given')
try:
port = int(port)
except ValueError:
return self._encode_failure('Invalid port number')
self._peer_manager.update_peer(peer_id, ip_address, port)
return self._encode_peers()
开发者ID:ceronman,项目名称:pixtream,代码行数:32,代码来源:resource.py
示例12: get_value
def get_value(self):
val = self.entry.get_text()
if val == "":
return val
if not isIPAddress(val):
raise Exception("Must be a valid IPv4 address.")
return val
开发者ID:clawplach,项目名称:BitBlinder,代码行数:7,代码来源:Entries.py
示例13: __init__
def __init__(self, host, port, connect_callback=None,
disconnect_callback=None):
"""Avoid using this initializer directly; Instead, use the create()
static method, otherwise the messages won't be really delivered.
If you still need to use this directly and want to resolve the host
yourself, remember to call host_resolved() as soon as it's resolved.
@param host: The StatsD server host.
@param port: The StatsD server port.
@param connect_callback: The callback to invoke on connection.
@param disconnect_callback: The callback to invoke on disconnection.
"""
from twisted.internet import reactor
self.reactor = reactor
self.host = host
self.port = port
self.connect_callback = connect_callback
self.disconnect_callback = disconnect_callback
self.data_queue = DataQueue()
self.transport = None
self.transport_gateway = None
if abstract.isIPAddress(host):
self.host_resolved(host)
开发者ID:Weasyl,项目名称:txstatsd,代码行数:28,代码来源:protocol.py
示例14: will_exit_to
def will_exit_to(self, host, port, protocol="TCP"):
"""Figure out whether this relay will allow any exit traffic to host:port.
If host is None, return True if there is any possible host that exit traffic
to that port would be allowed (and vice versa). Behavior when host AND port
are None is undefined (raises exception)"""
if not self.desc:
return False
#protocol is allowed to be DHT or TCP
if protocol == "DHT":
return self.desc.allowsDHT
if protocol != "TCP":
raise ValueError("Bad value for protocol: %s" % (protocol))
#for the vacuous case that we dont actually care
if not host and not port:
return True
#make sure that this host is an int:
intHost = None
if host and type(host) != types.IntType and isIPAddress(str(host)):
intHost = struct.unpack(">I", socket.inet_aton(host))[0]
port = int(port)
#if we're filtering based on both host and port:
if intHost and port:
#check the descriptor's exit policy:
return self.desc.will_exit_to(intHost, port)
#if we're just filtering based on host:
elif intHost:
return self.will_exit_to_host(intHost)
#if we're just filtering based on port:
elif port:
return self.will_exit_to_port(port)
#should be unreachable...
return False
开发者ID:clawplach,项目名称:BitBlinder,代码行数:32,代码来源:Relay.py
示例15: _send_remote_peer_request
def _send_remote_peer_request(self, infohash, callback):
#make sure we have a circuit to send it out on:
if self.circ and self.circ.is_done():
self.circ = None
if not self.circ:
self.circ = self.app.find_or_build_best_circuit(force=True, protocol="DHT")
if self.circ == None:
log_msg("Could not build circuit for DHT remote peer request", 0, "dht")
return
#generate the message: (version, infohash, peerList)
msg = ""
#header:
msg += Basic.write_byte(Node.VERSION)
#infohash:
msg += infohash
#peers:
for host, port in self.knownNodes:
#is this an IP address?
if isIPAddress(host):
msg += Basic.write_byte(0)
msg += struct.pack("!4sH", socket.inet_aton(host), port)
#otherwise, it's a URL that has to be resolved remotely
else:
msg += Basic.write_byte(1)
msg += Basic.write_lenstr(host)
msg += Basic.write_short(port)
self.circ.send_dht_request(msg, self.make_callback_wrapper(callback))
开发者ID:clawplach,项目名称:BitBlinder,代码行数:27,代码来源:Proxy.py
示例16: __init__
def __init__(self, host, port, bindAddress, connector, reactor=None):
# BaseClient.__init__ is invoked later
self.connector = connector
self.addr = (host, port)
whenDone = self.resolveAddress
err = None
skt = None
if abstract.isIPAddress(host):
self._requiresResolution = False
elif abstract.isIPv6Address(host):
self._requiresResolution = False
self.addr = _resolveIPv6(host, port)
self.addressFamily = socket.AF_INET6
self._addressType = address.IPv6Address
else:
self._requiresResolution = True
try:
skt = self.createInternetSocket()
except socket.error as se:
err = error.ConnectBindError(se.args[0], se.args[1])
whenDone = None
if whenDone and bindAddress is not None:
try:
if abstract.isIPv6Address(bindAddress[0]):
bindinfo = _resolveIPv6(*bindAddress)
else:
bindinfo = bindAddress
skt.bind(bindinfo)
except socket.error as se:
err = error.ConnectBindError(se.args[0], se.args[1])
whenDone = None
self._finishInit(whenDone, skt, err, reactor)
开发者ID:dansgithubuser,项目名称:constructicon,代码行数:34,代码来源:tcp.py
示例17: unpickle
def unpickle(obj, fileName):
"""main unpickle function"""
config = configobj.ConfigObj(fileName, encoding="utf-8")
# generate the config spec:
for name in config.keys():
# dont load settings that no longer exist
if not obj.ranges.has_key(name):
continue
range = obj.ranges[name]
defaultVal = obj.defaults[name]
loadedVal = config[name]
try:
if type(range) == types.TupleType and type(range[0]) != types.StringType:
if type(range[0]) == types.FloatType:
loadedVal = float(loadedVal)
assert loadedVal <= range[1], "%s not in range %s" % (loadedVal, range)
assert loadedVal >= range[0], "%s not in range %s" % (loadedVal, range)
elif type(range[0]) == types.IntType:
loadedVal = int(loadedVal)
assert loadedVal <= range[1], "%s not in range %s" % (loadedVal, range)
assert loadedVal >= range[0], "%s not in range %s" % (loadedVal, range)
else:
raise Exception("Weird type for range: %s" % (range))
elif type(range) == types.TupleType and type(range[0]) == types.StringType:
assert loadedVal in range, "%s not in range %s" % (loadedVal, range)
elif type(defaultVal) == types.BooleanType:
if loadedVal.lower() in ("1", "true", "t", "y", "yes", "on"):
loadedVal = True
else:
loadedVal = False
# default to text entry for special types that need validation
elif type(range) == types.StringType:
if range == "folder":
assert os.path.isdir(loadedVal), "%s is not a valid directory" % (loadedVal)
elif range == "ip":
if loadedVal:
assert isIPAddress(loadedVal), "%s is not a valid ip address" % (loadedVal)
elif range in ("GB", "KBps"):
loadedVal = int(loadedVal)
elif range == "password":
pass
elif range == "scheduler":
pass
elif range == "anonymity level":
loadedVal = int(loadedVal)
assert loadedVal in (1, 2, 3), "anonymity value not in range (1,3)"
else:
regex = re.compile(range)
assert regex.match(loadedVal), "settings string (%s) does not match expression (%s)" % (
loadedVal,
range,
)
else:
raise Exception("Unknown range/value combination (%s, %s)" % (range, defaultVal))
except Exception, e:
log_msg("Bad option value (%s) for option %s" % (e, name), 0)
GUIController.get().show_msgbox("Bad option value (%s) for option %s" % (e, name))
setattr(obj, name, loadedVal)
开发者ID:kans,项目名称:BitBlinder,代码行数:59,代码来源:SettingsFile.py
示例18: handle_listening_connect
def handle_listening_connect(self, host, port):
self.state = "connecting"
if isIPAddress(host):
return defer.maybeDeferred(self._connectDone, host, port)
else:
d = self.reactor.resolve(host)
d.addCallback(self._connectDone, port)
return d
开发者ID:pwarren,项目名称:AGDeviceControl,代码行数:8,代码来源:udp.py
示例19: connect
def connect(self, host, port):
"""'Connect' to remote server."""
if self._connectedAddr:
raise RuntimeError, "already connected, reconnecting is not currently supported (talk to itamar if you want this)"
if not abstract.isIPAddress(host):
raise ValueError, "please pass only IP addresses, not domain names"
self._connectedAddr = (host, port)
self.socket.connect((host, port))
开发者ID:P13RR3,项目名称:FrostCore,代码行数:8,代码来源:udp.py
示例20: resolve
def resolve(self, name, timeout = (1, 3, 11, 45)):
"""Return a Deferred that will resolve a hostname.
"""
if not name:
# XXX - This is *less than* '::', and will screw up IPv6 servers
return defer.succeed('0.0.0.0')
if abstract.isIPAddress(name):
return defer.succeed(name)
return self.resolver.getHostByName(name, timeout)
开发者ID:AnthonyNystrom,项目名称:YoGoMee,代码行数:9,代码来源:base.py
注:本文中的twisted.internet.abstract.isIPAddress函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论