本文整理汇总了Python中urllib3.connectionpool.HTTPConnectionPool类的典型用法代码示例。如果您正苦于以下问题：Python HTTPConnectionPool类的具体用法？Python HTTPConnectionPool怎么用？Python HTTPConnectionPool使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了HTTPConnectionPool类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_recovery_when_server_closes_connection
def test_recovery_when_server_closes_connection(self):
    """Check that the pool recovers after the server hangs up on a pooled connection.

    The handler answers exactly one request per accepted socket and then
    closes that socket, so the connection kept by the pool after the first
    request is already closed when the second request is made.  Both
    requests are issued with ``retries=0`` and must still return a 200
    response carrying the expected body.
    """
    # Shared with the handler; set each time it has closed a socket.
    done_closing = Event()

    def server(listener):
        for i in 0, 1:
            sock = listener.accept()[0]
            read_request(sock)
            body = 'Response %d' % i
            # Sockets carry bytes: encode the whole message so the handler
            # works on Python 3 as well as Python 2.
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(body), body)).encode('utf-8'))
            sock.close()  # simulate a server timing out, closing socket
            done_closing.set()  # let the test know it can proceed

    host, port = start_server(server)
    pool = HTTPConnectionPool(host, port)

    response = pool.request('GET', '/', retries=0)
    self.assertEqual(response.status, 200)
    # Compare against bytes literals; on Python 2 these equal the str form.
    self.assertEqual(response.data, b'Response 0')

    done_closing.wait()  # wait until the socket in our pool gets closed

    response = pool.request('GET', '/', retries=0)
    self.assertEqual(response.status, 200)
    self.assertEqual(response.data, b'Response 1')
开发者ID:blep,项目名称:urllib3,代码行数:31,代码来源:test_connectionpool.py
示例2: test_pool_size
def test_pool_size(self):
    """Verify that a failing request always puts its connection back in the pool.

    ``_make_request`` is replaced with stubs that raise a given exception;
    after each failed request the pool's queue must be back at full size.
    """
    expected_size = 1
    pool = HTTPConnectionPool(host='localhost', maxsize=expected_size, block=True)

    def _always_raising(exc_type):
        # Build a ``_make_request`` stand-in that raises ``exc_type()``
        # regardless of the arguments it is called with.
        def _stub(*args, **kwargs):
            raise exc_type()
        return _stub

    # Make sure that all of the exceptions return the connection to the pool
    translations = (
        (Empty, EmptyPoolError),
        (BaseSSLError, SSLError),
        (CertificateError, SSLError),
    )
    for raised, surfaced in translations:
        pool._make_request = _always_raising(raised)
        self.assertRaises(surfaced, pool.request, 'GET', '/')
        self.assertEqual(pool.pool.qsize(), expected_size)

    # The pool should never be empty, and with these two exceptions being raised,
    # a retry will be triggered, but that retry will fail, eventually raising
    # MaxRetryError, not EmptyPoolError
    # See: https://github.com/shazow/urllib3/issues/76
    pool._make_request = _always_raising(HTTPException)
    self.assertRaises(MaxRetryError, pool.request,
                      'GET', '/', retries=1, pool_timeout=0.01)
    self.assertEqual(pool.pool.qsize(), expected_size)
开发者ID:09Hero,项目名称:urllib3,代码行数:26,代码来源:test_connectionpool.py
示例3: test_retry_when_server_closes_connection_with_no_data
def test_retry_when_server_closes_connection_with_no_data(self):
    """Test that the retry mechanism works when the server drops the connection prematurely.

    The handler accepts three connections but only reads the request and
    answers on the second one; the first and third are closed without any
    data being exchanged.  The first client request (``retries=1``) is
    expected to end with a 200 response whose body is ``Response 1``; the
    second client request (``retries=0``) is expected to raise
    ``MaxRetryError``.
    """
    done_closing = Event()

    def socket_handler(listener):
        for i in 0, 1, 2:
            print("Entering %d" % i)
            sock = listener.accept()[0]
            print("Accepting %d" % i)
            # only interact with client the second time
            if i == 1:
                buf = b""
                while not buf.endswith(b"\r\n\r\n"):
                    print("Reading...")
                    # Accumulate: the header terminator may be split
                    # across two reads, which plain assignment would miss.
                    buf += sock.recv(65536)
                print("Sending...")
                body = "Response %d" % i
                sock.send(
                    (
                        "HTTP/1.1 200 OK\r\n"
                        "Content-Type: text/plain\r\n"
                        "Content-Length: %d\r\n"
                        "\r\n"
                        "%s" % (len(body), body)
                    ).encode("utf-8")
                )
                print("Done.")
            sock.close()  # simulate a server timing out, closing socket
            print("Setting done %d" % i)
            done_closing.set()  # let the test know it can proceed

    self._start_server(socket_handler)
    pool = HTTPConnectionPool(self.host, self.port)

    # Should succeed in the second retry
    import time
    time.sleep(0.1)
    response = pool.request("GET", "/", retries=1)
    self.assertEqual(response.status, 200)
    self.assertEqual(response.data, b"Response 1")

    print("(Client) Waiting...")
    done_closing.wait()  # wait until the socket in our pool gets closed

    # Fail with no retries
    with self.assertRaises(MaxRetryError):
        # This is where a failure should occur for issue #104.
        pool.request("GET", "/", retries=0)

    print("(Client) Waiting final...")
    done_closing.wait()  # wait until the socket in our pool gets closed
开发者ID:eranrund,项目名称:urllib3,代码行数:59,代码来源:test_socketlevel.py
示例4: __init__
def __init__(self, host, port=None, strict=False,
             timeout=None, maxsize=1, block=False, headers=headers):
    """Initialise the pool with lower-cased header names and a new cookie session.

    ``host`` is first run through ``get_host`` and element ``[1]`` of the
    result is used; if that raises ``TypeError`` the value is used as
    given.  All remaining arguments are forwarded positionally to
    ``HTTPConnectionPool.__init__``.
    """
    try:
        host = get_host(host)[1]
    except TypeError:
        # Already a host-ified host
        pass

    # Header names are stored lower-cased; values are kept untouched.
    lowered = dict((name.lower(), value) for (name, value) in headers.items())

    HTTPConnectionPool.__init__(self, host, port, strict, timeout, maxsize, block, lowered)
    self.cookie_session = CookieSession()
开发者ID:Riamse,项目名称:urllib3,代码行数:9,代码来源:opener.py
示例5: test_pool_timeouts
def test_pool_timeouts(self):
    """Check the timeout object a pool carries, with and without an explicit timeout."""
    # Without a timeout argument both phases use the library default.
    default_pool = HTTPConnectionPool(host='localhost')
    new_conn = default_pool._new_conn()
    self.assertEqual(new_conn.__class__, HTTPConnection)
    self.assertEqual(default_pool.timeout.__class__, Timeout)
    for phase in ('_read', '_connect'):
        self.assertEqual(getattr(default_pool.timeout, phase), Timeout.DEFAULT_TIMEOUT)
    self.assertEqual(default_pool.timeout.total, None)

    # A plain number is applied to both phases; ``total`` remains unset.
    timed_pool = HTTPConnectionPool(host='localhost', timeout=3)
    for phase in ('_read', '_connect'):
        self.assertEqual(getattr(timed_pool.timeout, phase), 3)
    self.assertEqual(timed_pool.timeout.total, None)
开发者ID:09Hero,项目名称:urllib3,代码行数:13,代码来源:test_connectionpool.py
示例6: test_multi_setcookie
def test_multi_setcookie(self):
    """Check that repeated ``Set-Cookie`` response headers are merged into one value.

    The handler replies with two ``Set-Cookie`` lines and no body; the
    parsed response headers are expected to hold a single ``set-cookie``
    entry with both values joined by ``", "``.
    """
    def multicookie_response_handler(listener):
        sock = listener.accept()[0]

        # Read until the blank line that ends the request headers.
        buf = b""
        while not buf.endswith(b"\r\n\r\n"):
            buf += sock.recv(65536)

        sock.send(b"HTTP/1.1 200 OK\r\n"
                  b"Set-Cookie: foo=1\r\n"
                  b"Set-Cookie: bar=1\r\n"
                  b"\r\n")
        # The response has no Content-Length, so close explicitly instead
        # of relying on garbage collection to release the socket.
        sock.close()

    self._start_server(multicookie_response_handler)
    pool = HTTPConnectionPool(self.host, self.port)
    r = pool.request("GET", "/", retries=0)
    # ``assertEquals`` is a deprecated alias removed in Python 3.12.
    self.assertEqual(r.headers, {"set-cookie": "foo=1, bar=1"})
开发者ID:eranrund,项目名称:urllib3,代码行数:14,代码来源:test_socketlevel.py
示例7: urlopen
def urlopen(self, method, url, body=None, headers=None, retries=3, redirect=True, assert_same_host=True,
            timeout=None, pool_timeout=None, release_conn=None, **response_kw):
    """
    Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
    with custom cross-host redirect logic and only sends the request-uri
    portion of the ``url``.

    The given ``url`` parameter must be absolute, such that an appropriate
    :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.

    The cookie header is rebuilt from ``self.cookie_session`` before the
    request and again after the response.  A ``headers`` mapping passed by
    the caller is updated in place.  Redirects are followed by calling this
    method again with ``retries - 1``; a 303 response switches the method
    to ``GET``.
    """
    if headers is None:
        headers = dict((name.lower(), value) for (name, value) in self.headers.items())

    # Fill in every pool-level header the request does not already carry.
    headers.setdefault("cookie", "")
    for name, value in self.headers.items():
        headers.setdefault(name, value)

    # Now the updated Cookie string will be stored into the HTTP request.
    # The cookie header may contain duplicate entries (e.g. k=a; k=b;)
    # NOTE(review): the subscript below needs a "cookie" entry in
    # self.headers -- confirm the pool is always created with one.
    headers["cookie"] = self.headers["cookie"] + headers["cookie"]

    # This will be resolved by putting the header in the SimpleCookie
    for source in (self.headers, headers):
        self.cookie_session.feed(source)
    headers["cookie"] = self.cookie_session.extract()

    # Redirects are handled below, so the parent is always called with
    # redirect=False.
    response = HTTPConnectionPool.urlopen(
        self, method, url, body, headers, retries, False, assert_same_host,
        timeout, pool_timeout, release_conn, **response_kw)

    for source in (self.headers, response.headers, headers):
        self.cookie_session.feed(source)
    headers["cookie"] = self.cookie_session.extract()

    if not redirect:
        return response
    target = response.get_redirect_location()
    if not target:
        return response

    next_method = "GET" if response.status == 303 else method
    return self.urlopen(
        next_method, target, body, headers, retries - 1, redirect,
        assert_same_host, timeout, pool_timeout, release_conn, **response_kw)
开发者ID:Riamse,项目名称:urllib3,代码行数:35,代码来源:opener.py
示例8: _test
def _test(exception):
    # Run one request whose stubbed ``_make_request`` comes from
    # ``_raise_once_make_request_function(exception)`` and check where the
    # connection ends up.
    blocking_pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)

    # Verify that the request succeeds after two attempts, and that the
    # connection is left on the response object, instead of being
    # released back into the pool.
    blocking_pool._make_request = _raise_once_make_request_function(exception)
    resp = blocking_pool.urlopen(
        'GET', '/',
        retries=1,
        release_conn=False,
        preload_content=False,
        chunked=True,
    )
    self.assertEqual(blocking_pool.pool.qsize(), 0)
    self.assertEqual(blocking_pool.num_connections, 2)
    self.assertTrue(resp.connection is not None)

    # Releasing hands the connection back and detaches it from the response.
    resp.release_conn()
    self.assertEqual(blocking_pool.pool.qsize(), 1)
    self.assertTrue(resp.connection is None)
开发者ID:adamchainz,项目名称:urllib3,代码行数:17,代码来源:test_connectionpool.py
示例9: test_recovery_when_server_closes_connection
def test_recovery_when_server_closes_connection(self):
    """Check that the pool recovers after the server hangs up on a pooled connection.

    The handler serves one request per accepted socket and then closes it,
    so the connection kept by the pool after the first request is already
    closed when the second request is made.  Both requests use
    ``retries=0`` and must still return a 200 response with the expected
    body.
    """
    done_closing = Event()

    def socket_handler(listener):
        for i in 0, 1:
            sock = listener.accept()[0]

            buf = b""
            while not buf.endswith(b"\r\n\r\n"):
                # Accumulate: the header terminator may be split across
                # two reads, which plain assignment would miss.
                buf += sock.recv(65536)

            body = "Response %d" % i
            sock.send(
                (
                    "HTTP/1.1 200 OK\r\n"
                    "Content-Type: text/plain\r\n"
                    "Content-Length: %d\r\n"
                    "\r\n"
                    "%s" % (len(body), body)
                ).encode("utf-8")
            )
            sock.close()  # simulate a server timing out, closing socket
            done_closing.set()  # let the test know it can proceed

    self._start_server(socket_handler)
    pool = HTTPConnectionPool(self.host, self.port)

    response = pool.request("GET", "/", retries=0)
    self.assertEqual(response.status, 200)
    self.assertEqual(response.data, b"Response 0")

    done_closing.wait()  # wait until the socket in our pool gets closed

    response = pool.request("GET", "/", retries=0)
    self.assertEqual(response.status, 200)
    self.assertEqual(response.data, b"Response 1")

    done_closing.wait()  # wait until the socket in our pool gets closed
开发者ID:eranrund,项目名称:urllib3,代码行数:43,代码来源:test_socketlevel.py
示例10: test_pool_edgecases
def test_pool_edgecases(self):
    """Exercise a non-blocking pool of size one with more connections than it can hold."""
    pool = HTTPConnectionPool(host='localhost', maxsize=1, block=False)

    first = pool._get_conn()
    second = pool._get_conn()  # New because block=False

    pool._put_conn(first)
    pool._put_conn(second)  # Should be discarded

    # The stored connection comes back first; the next one is expected to
    # differ from the connection that was discarded.
    reused = pool._get_conn()
    self.assertEqual(first, reused)
    replacement = pool._get_conn()
    self.assertNotEqual(second, replacement)

    self.assertEqual(pool.num_connections, 3)
开发者ID:09Hero,项目名称:urllib3,代码行数:13,代码来源:test_connectionpool.py
示例11: test_max_connections
def test_max_connections(self):
    """Check that a blocking pool of size one raises ``EmptyPoolError`` once exhausted.

    After one connection has been taken and not returned, both a direct
    ``_get_conn`` and a full ``request`` must raise ``EmptyPoolError``
    within their short timeouts, and only one connection may have been
    created.
    """
    pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)

    # Take a connection and deliberately never put it back.
    pool._get_conn(timeout=0.01)

    # assertRaises replaces the hand-written try / self.fail / except.
    self.assertRaises(EmptyPoolError, pool._get_conn, timeout=0.01)
    self.assertRaises(EmptyPoolError, pool.request,
                      'GET', '/', pool_timeout=0.01)

    self.assertEqual(pool.num_connections, 1)
开发者ID:09Hero,项目名称:urllib3,代码行数:18,代码来源:test_connectionpool.py
示例12: test_same_host_no_port
def test_same_host_no_port(self):
    """Check ``is_same_host`` for pools that were created without an explicit port."""
    # This test was introduced in #801 to deal with the fact that urllib3
    # never initializes ConnectionPool objects with port=None.
    same_host_http = [
        ('google.com', '/'),
        ('google.com', 'http://google.com/'),
        ('google.com', 'http://google.com'),
        ('google.com', 'http://google.com/abra/cadabra'),
        # Test comparison using default ports
        ('google.com', 'http://google.com:80/abracadabra'),
    ]
    same_host_https = [
        ('google.com', '/'),
        ('google.com', 'https://google.com/'),
        ('google.com', 'https://google.com'),
        ('google.com', 'https://google.com/abra/cadabra'),
        # Test comparison using default ports
        ('google.com', 'https://google.com:443/abracadabra'),
    ]
    matching = (
        (HTTPConnectionPool, same_host_http),
        (HTTPSConnectionPool, same_host_https),
    )
    for pool_cls, cases in matching:
        for host, url in cases:
            pool = pool_cls(host)
            self.assertTrue(pool.is_same_host(url), "%s =? %s" % (host, url))

    not_same_host_http = [
        ('google.com', 'https://google.com/'),
        ('yahoo.com', 'http://google.com/'),
        ('google.com', 'https://google.net/'),
    ]
    not_same_host_https = [
        ('google.com', 'http://google.com/'),
        ('yahoo.com', 'https://google.com/'),
        ('google.com', 'https://google.net/'),
    ]
    differing = (
        (HTTPConnectionPool, not_same_host_http),
        (HTTPSConnectionPool, not_same_host_https),
    )
    for pool_cls, cases in differing:
        for host, url in cases:
            # A mismatch has to be reported in both directions.
            pool = pool_cls(host)
            self.assertFalse(pool.is_same_host(url), "%s =? %s" % (host, url))
            pool = pool_cls(url)
            self.assertFalse(pool.is_same_host(host), "%s =? %s" % (url, host))
开发者ID:adamchainz,项目名称:urllib3,代码行数:48,代码来源:test_connectionpool.py
注：本文中的urllib3.connectionpool.HTTPConnectionPool类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论