本文整理汇总了Python中test.support.find_unused_port函数的典型用法代码示例。如果您正苦于以下问题:Python find_unused_port函数的具体用法?Python find_unused_port怎么用?Python find_unused_port使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了find_unused_port函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_start_serving_dual_stack
def test_start_serving_dual_stack(self):
f_proto = futures.Future()
def connection_handler(transport):
f_proto.set_result(MyProto(transport))
port = find_unused_port()
f = self.loop.start_serving(connection_handler, host=None, port=port)
socks = self.loop.run_until_complete(f)
client = socket.socket()
client.connect(('127.0.0.1', port))
client.send(b'xxx')
proto = self.loop.run_until_complete(f_proto)
proto.transport.close()
client.close()
f_proto = futures.Future()
client = socket.socket(socket.AF_INET6)
client.connect(('::1', port))
client.send(b'xxx')
proto = self.loop.run_until_complete(f_proto)
proto.transport.close()
client.close()
for s in socks:
self.loop.stop_serving(s)
开发者ID:sah,项目名称:tulip,代码行数:26,代码来源:events_test.py
示例2: prepare_socksendfile
def prepare_socksendfile(self):
proto = MyProto(self.loop)
port = support.find_unused_port()
srv_sock = self.make_socket(cleanup=False)
srv_sock.bind((support.HOST, port))
server = self.run_loop(self.loop.create_server(
lambda: proto, sock=srv_sock))
self.reduce_receive_buffer_size(srv_sock)
sock = self.make_socket()
self.run_loop(self.loop.sock_connect(sock, ('127.0.0.1', port)))
self.reduce_send_buffer_size(sock)
def cleanup():
if proto.transport is not None:
# can be None if the task was cancelled before
# connection_made callback
proto.transport.close()
self.run_loop(proto.wait_closed())
server.close()
self.run_loop(server.wait_closed())
self.addCleanup(cleanup)
return sock, proto
开发者ID:adrian17,项目名称:cpython,代码行数:26,代码来源:test_sendfile.py
示例3: __init__
def __init__(self, certfile):
self.flag = None
self.active = False
self.port = support.find_unused_port()
self.server = self.EchoServer(self.port, certfile)
threading.Thread.__init__(self)
self.daemon = True
开发者ID:Kanma,项目名称:Athena-Dependencies-Python,代码行数:7,代码来源:test_ssl.py
示例4: test_source_address
def test_source_address(self):
self.client.quit()
port = support.find_unused_port()
self.client.connect(self.server.host, self.server.port,
source_address=(HOST, port))
self.assertEqual(self.client.sock.getsockname()[1], port)
self.client.quit()
开发者ID:pombredanne,项目名称:cpython,代码行数:7,代码来源:test_ftplib.py
示例5: test_create_connection_local_addr
def test_create_connection_local_addr(self):
with test_utils.run_test_server(self.loop) as httpd:
port = find_unused_port()
f = self.loop.create_connection(
*httpd.address, local_addr=(httpd.address[0], port))
tr = self.loop.run_until_complete(f)
pr = MyProto(tr, create_future=True)
expected = pr.transport.get_extra_info('socket').getsockname()[1]
self.assertEqual(port, expected)
tr.close()
开发者ID:sah,项目名称:tulip,代码行数:10,代码来源:events_test.py
示例6: test_source_address_passive_connection
def test_source_address_passive_connection(self):
port = support.find_unused_port()
self.client.source_address = (HOST, port)
try:
with self.client.transfercmd('list') as sock:
self.assertEqual(sock.getsockname()[1], port)
except IOError as e:
if e.errno == errno.EADDRINUSE:
self.skipTest("couldn't bind to port %d" % port)
raise
开发者ID:alfonsodiecko,项目名称:PYTHON_DIST,代码行数:10,代码来源:test_ftplib.py
示例7: test_source_address
def test_source_address(self):
self.client.quit()
port = support.find_unused_port()
try:
self.client.connect(self.server.host, self.server.port, source_address=(HOST, port))
self.assertEqual(self.client.sock.getsockname()[1], port)
self.client.quit()
except OSError as e:
if e.errno == errno.EADDRINUSE:
self.skipTest("couldn't bind to port %d" % port)
raise
开发者ID:collinanderson,项目名称:ensurepip,代码行数:11,代码来源:test_ftplib.py
示例8: setUp
def setUp(self):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
self.port = support.find_unused_port()
self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
self.serv_evt.wait()
self.serv_evt.clear()
开发者ID:LinkedModernismProject,项目名称:web_code,代码行数:11,代码来源:test_smtplib.py
示例9: test_create_connection_local_addr
def test_create_connection_local_addr(self):
from test.support import find_unused_port
loop = get_event_loop()
with run_test_server(loop, EchoServerProtocol) as server:
yield server.start_serving()
host = server.address[0]
port = find_unused_port()
tr, pr = yield loop.create_connection(SimpleProtocol,
*server.address,
local_addr=(host, port))
expected = pr.transport.get_extra_info('socket').getsockname()[1]
self.assertEqual(port, expected)
tr.close()
开发者ID:elimisteve,项目名称:pulsar,代码行数:13,代码来源:eventloop.py
示例10: testSourceAddress
def testSourceAddress(self):
# connect
src_port = support.find_unused_port()
try:
smtp = smtplib.SMTP(self.host, self.port, local_hostname='localhost',
timeout=3, source_address=(self.host, src_port))
self.assertEqual(smtp.source_address, (self.host, src_port))
self.assertEqual(smtp.local_hostname, 'localhost')
smtp.quit()
except OSError as e:
if e.errno == errno.EADDRINUSE:
self.skipTest("couldn't bind to source port %d" % src_port)
raise
开发者ID:CCNITSilchar,项目名称:cpython,代码行数:13,代码来源:test_smtplib.py
示例11: testSourceAddress
def testSourceAddress(self):
# connect
port = support.find_unused_port()
try:
smtp = smtplib.SMTP(
HOST, self.port, local_hostname="localhost", timeout=3, source_address=("127.0.0.1", port)
)
self.assertEqual(smtp.source_address, ("127.0.0.1", port))
self.assertEqual(smtp.local_hostname, "localhost")
smtp.quit()
except IOError as e:
if e.errno == errno.EADDRINUSE:
self.skipTest("couldn't bind to port %d" % port)
raise
开发者ID:pykomke,项目名称:Kurz_Python_KE,代码行数:14,代码来源:test_smtplib.py
示例12: test_create_server_sock
def test_create_server_sock(self):
port = find_unused_port()
sock = create_server_sock((None, port))
with contextlib.closing(sock):
self.assertEqual(sock.getsockname()[1], port)
self.assertEqual(sock.type, socket.SOCK_STREAM)
if has_dual_stack():
self.assertEqual(sock.family, socket.AF_INET6)
else:
self.assertEqual(sock.family, socket.AF_INET)
self.echo_server(sock)
cl = socket.create_connection(('localhost', port), timeout=2)
with contextlib.closing(cl):
cl.sendall(b'foo')
self.assertEqual(cl.recv(1024), b'foo')
开发者ID:jacob-carrier,项目名称:code,代码行数:15,代码来源:recipe-578504.py
示例13: prepare_sendfile
def prepare_sendfile(self, *, is_ssl=False, close_after=0):
port = support.find_unused_port()
srv_proto = MySendfileProto(loop=self.loop,
close_after=close_after)
if is_ssl:
if not ssl:
self.skipTest("No ssl module")
srv_ctx = test_utils.simple_server_sslcontext()
cli_ctx = test_utils.simple_client_sslcontext()
else:
srv_ctx = None
cli_ctx = None
srv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv_sock.bind((support.HOST, port))
server = self.run_loop(self.loop.create_server(
lambda: srv_proto, sock=srv_sock, ssl=srv_ctx))
self.reduce_receive_buffer_size(srv_sock)
if is_ssl:
server_hostname = support.HOST
else:
server_hostname = None
cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli_sock.connect((support.HOST, port))
cli_proto = MySendfileProto(loop=self.loop)
tr, pr = self.run_loop(self.loop.create_connection(
lambda: cli_proto, sock=cli_sock,
ssl=cli_ctx, server_hostname=server_hostname))
self.reduce_send_buffer_size(cli_sock, transport=tr)
def cleanup():
srv_proto.transport.close()
cli_proto.transport.close()
self.run_loop(srv_proto.done)
self.run_loop(cli_proto.done)
server.close()
self.run_loop(server.wait_closed())
self.addCleanup(cleanup)
return srv_proto, cli_proto
开发者ID:adrian17,项目名称:cpython,代码行数:42,代码来源:test_sendfile.py
示例14: test_start_serving_dual_stack
def test_start_serving_dual_stack(self):
f_proto = futures.Future(loop=self.loop)
class TestMyProto(MyProto):
def connection_made(self, transport):
super().connection_made(transport)
f_proto.set_result(self)
try_count = 0
while True:
try:
port = find_unused_port()
f = self.loop.start_serving(TestMyProto, host=None, port=port)
socks = self.loop.run_until_complete(f)
except OSError as ex:
if ex.errno == errno.EADDRINUSE:
try_count += 1
self.assertGreaterEqual(5, try_count)
continue
else:
raise
else:
break
client = socket.socket()
client.connect(('127.0.0.1', port))
client.send(b'xxx')
proto = self.loop.run_until_complete(f_proto)
proto.transport.close()
client.close()
f_proto = futures.Future(loop=self.loop)
client = socket.socket(socket.AF_INET6)
client.connect(('::1', port))
client.send(b'xxx')
proto = self.loop.run_until_complete(f_proto)
proto.transport.close()
client.close()
for s in socks:
self.loop.stop_serving(s)
开发者ID:johnnoone,项目名称:pyzmqtulip,代码行数:40,代码来源:events_test.py
示例15: test_mlistener
def test_mlistener(self):
port = find_unused_port()
# v4
sock = MultipleSocketsListener(
[('127.0.0.1', port), ('::1', port)])
with contextlib.closing(sock):
self.echo_server(sock)
port = sock.getsockname()[1]
cl = socket.create_connection(("127.0.0.1", port), timeout=2)
with contextlib.closing(cl):
cl.sendall(b'foo')
self.assertEqual(cl.recv(1024), b'foo')
# v6
sock = MultipleSocketsListener(
[('127.0.0.1', port), ('::1', port)])
with contextlib.closing(sock):
self.echo_server(sock)
port = sock.getsockname()[1]
cl = socket.create_connection(("::1", port), timeout=2)
with contextlib.closing(cl):
cl.sendall(b'foo')
self.assertEqual(cl.recv(1024), b'foo')
开发者ID:jacob-carrier,项目名称:code,代码行数:22,代码来源:recipe-578504.py
示例16: prepare
def prepare(self):
sock = self.make_socket()
proto = self.MyProto(self.loop)
port = support.find_unused_port()
srv_sock = self.make_socket(cleanup=False)
srv_sock.bind(('127.0.0.1', port))
server = self.run_loop(self.loop.create_server(
lambda: proto, sock=srv_sock))
self.run_loop(self.loop.sock_connect(sock, srv_sock.getsockname()))
def cleanup():
if proto.transport is not None:
# can be None if the task was cancelled before
# connection_made callback
proto.transport.close()
self.run_loop(proto.wait_closed())
server.close()
self.run_loop(server.wait_closed())
self.addCleanup(cleanup)
return sock, proto
开发者ID:FFMG,项目名称:myoddweb.piger,代码行数:23,代码来源:test_proactor_events.py
示例17: testRudeShutdown
def testRudeShutdown(self):
listener_ready = threading.Event()
listener_gone = threading.Event()
port = support.find_unused_port()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
# Then it rudely closes the socket, and sets Event `listener_gone`
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
s.bind((HOST, port))
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect((HOST, port))
listener_gone.wait()
try:
ssl_sock = ssl.wrap_socket(s)
except IOError:
pass
else:
raise support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
开发者ID:Kanma,项目名称:Athena-Dependencies-Python,代码行数:36,代码来源:test_ssl.py
示例18: setUp
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.port = support.bind_port(self.serv)
self.source_port = support.find_unused_port()
self.serv.listen(5)
self.conn = None
开发者ID:chidea,项目名称:GoPythonDLLWrapper,代码行数:6,代码来源:test_httplib.py
示例19: test_source_address_passive_connection
def test_source_address_passive_connection(self):
port = support.find_unused_port()
self.client.source_address = (HOST, port)
with self.client.transfercmd('list') as sock:
self.assertEqual(sock.getsockname()[1], port)
开发者ID:pombredanne,项目名称:cpython,代码行数:5,代码来源:test_ftplib.py
示例20: test_find_unused_port
def test_find_unused_port(self):
port = support.find_unused_port()
s = socket.socket()
s.bind((support.HOST, port))
s.close()
开发者ID:MYSHLIFE,项目名称:cpython-1,代码行数:5,代码来源:test_support.py
注:本文中的test.support.find_unused_port函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论