• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python connectionpool.HTTPConnectionPool类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了 Python 中 urllib3.connectionpool.HTTPConnectionPool 的典型用法代码示例。如果您正苦于以下问题:Python HTTPConnectionPool 类的具体用法?Python HTTPConnectionPool 怎么用?Python HTTPConnectionPool 使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了HTTPConnectionPool类的12个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_recovery_when_server_closes_connection

    def test_recovery_when_server_closes_connection(self):
        """Check that the pool recovers from a server-side hang-up.

        The server thread answers two connections in turn and closes each
        socket right after responding.  The client issues two requests with
        ``retries=0``; the second one is only sent once the server has
        signalled that the first socket is closed, so the pool has to cope
        with a pooled connection that was hung up on.
        """
        # Defined before ``server`` so the closure's dependency is visible
        # at the point where the closure is written.
        done_closing = Event()

        def server(listener):
            for i in 0, 1:
                sock = listener.accept()[0]
                read_request(sock)
                body = 'Response %d' % i
                # Sockets carry bytes: encode the response so the test also
                # runs on Python 3 (a no-op for ASCII text on Python 2).
                sock.send(('HTTP/1.1 200 OK\r\n'
                           'Content-Type: text/plain\r\n'
                           'Content-Length: %d\r\n'
                           '\r\n'
                           '%s' % (len(body), body)).encode('utf-8'))
                sock.close()  # simulate a server timing out, closing socket
                done_closing.set()  # let the test know it can proceed

        host, port = start_server(server)
        pool = HTTPConnectionPool(host, port)

        response = pool.request('GET', '/', retries=0)
        self.assertEqual(response.status, 200)
        # Bytes literal: equal to the str literal on Python 2, required on 3.
        self.assertEqual(response.data, b'Response 0')

        done_closing.wait()  # wait until the socket in our pool gets closed

        response = pool.request('GET', '/', retries=0)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b'Response 1')
开发者ID:blep,项目名称:urllib3,代码行数:31,代码来源:test_connectionpool.py


示例2: test_pool_size

    def test_pool_size(self):
        """Verify failed requests still hand the connection back to the pool.

        ``_make_request`` is replaced by a stub that raises a chosen
        exception; after each failing request the pool's queue must again
        hold ``pool_size`` entries.
        """
        pool_size = 1
        pool = HTTPConnectionPool(host='localhost', maxsize=pool_size, block=True)

        def failing_make_request(exc_type):
            # Build a stand-in for ``_make_request`` that always raises.
            def _stub(*args, **kwargs):
                raise exc_type()
            return _stub

        def check(raised, expected):
            pool._make_request = failing_make_request(raised)
            self.assertRaises(expected, pool.request, 'GET', '/')
            self.assertEqual(pool.pool.qsize(), pool_size)

        # Make sure that all of the exceptions return the connection to the pool
        for raised, expected in (
            (Empty, EmptyPoolError),
            (BaseSSLError, SSLError),
            (CertificateError, SSLError),
        ):
            check(raised, expected)

        # The pool should never be empty, and with these two exceptions being raised,
        # a retry will be triggered, but that retry will fail, eventually raising
        # MaxRetryError, not EmptyPoolError
        # See: https://github.com/shazow/urllib3/issues/76
        pool._make_request = failing_make_request(HTTPException)
        self.assertRaises(
            MaxRetryError,
            pool.request,
            'GET',
            '/',
            retries=1,
            pool_timeout=0.01,
        )
        self.assertEqual(pool.pool.qsize(), pool_size)
开发者ID:09Hero,项目名称:urllib3,代码行数:26,代码来源:test_connectionpool.py


示例3: test_retry_when_server_closes_connection_with_no_data

    def test_retry_when_server_closes_connection_with_no_data(self):
        """Check that a retry succeeds after the server drops a connection.

        The server handler accepts three connections and closes each one;
        only the second connection (``i == 1``) is read from and answered.
        The client first requests with ``retries=1`` and expects the
        "Response 1" body, then requests with ``retries=0`` and expects
        ``MaxRetryError``.
        """
        done_closing = Event()

        def socket_handler(listener):
            for i in 0, 1, 2:
                # Single-argument print calls produce the same output on
                # Python 2 and Python 3.
                print("Entering %d" % i)
                sock = listener.accept()[0]
                print("Accepting %d" % i)

                # only interact with client the second time
                if i == 1:
                    buf = b""
                    while not buf.endswith(b"\r\n\r\n"):
                        print("Reading...")
                        # Accumulate: the header terminator may be split
                        # across several recv() calls.
                        buf += sock.recv(65536)

                    print("Sending...")
                    body = "Response %d" % i
                    sock.send(
                        (
                            "HTTP/1.1 200 OK\r\n"
                            "Content-Type: text/plain\r\n"
                            "Content-Length: %d\r\n"
                            "\r\n"
                            "%s" % (len(body), body)
                        ).encode("utf-8")
                    )
                    print("Done.")

                sock.close()  # simulate a server timing out, closing socket
                print("Setting done %d" % i)
                done_closing.set()  # let the test know it can proceed

        self._start_server(socket_handler)

        pool = HTTPConnectionPool(self.host, self.port)

        # Should succeed in the second retry
        import time

        time.sleep(0.1)
        response = pool.request("GET", "/", retries=1)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b"Response 1")

        print("(Client) Waiting...")
        # NOTE(review): the event is set after every connection and never
        # cleared, so this wait may already be satisfied by the first
        # connection's close -- confirm whether that is intended.
        done_closing.wait()  # wait until the socket in our pool gets closed

        # Fail with no retries
        with self.assertRaises(MaxRetryError):
            # This is where a failure should occur for issue #104.
            response = pool.request("GET", "/", retries=0)

        print("(Client) Waiting final...")
        done_closing.wait()  # wait until the socket in our pool gets closed
开发者ID:eranrund,项目名称:urllib3,代码行数:59,代码来源:test_socketlevel.py


示例4: __init__

 def __init__(self, host, port=None, strict=False,
              timeout=None, maxsize=1, block=False, headers=headers):
     """Set up the pool with lower-cased header names and a cookie session.

     ``host`` is passed through ``get_host`` and replaced by element 1 of
     the result; if that raises ``TypeError`` the value is used unchanged.
     Header names are lower-cased before being handed to
     ``HTTPConnectionPool.__init__``.
     """
     try:
         parsed = get_host(host)
         host = parsed[1]
     except TypeError:
         # Already a host-ified host
         pass

     lowered = {}
     for name, value in headers.items():
         lowered[name.lower()] = value

     HTTPConnectionPool.__init__(
         self, host, port, strict, timeout, maxsize, block, lowered
     )
     self.cookie_session = CookieSession()
开发者ID:Riamse,项目名称:urllib3,代码行数:9,代码来源:opener.py


示例5: test_pool_timeouts

    def test_pool_timeouts(self):
        """Check the timeout object a pool carries, by default and when set.

        Without a ``timeout`` argument the pool's read and connect values
        equal ``Timeout.DEFAULT_TIMEOUT``; with ``timeout=3`` both equal 3.
        ``total`` is ``None`` in both cases.
        """
        default_pool = HTTPConnectionPool(host='localhost')
        new_conn = default_pool._new_conn()
        self.assertEqual(new_conn.__class__, HTTPConnection)

        default_timeout = default_pool.timeout
        self.assertEqual(default_timeout.__class__, Timeout)
        self.assertEqual(default_timeout._read, Timeout.DEFAULT_TIMEOUT)
        self.assertEqual(default_timeout._connect, Timeout.DEFAULT_TIMEOUT)
        self.assertEqual(default_timeout.total, None)

        explicit_timeout = HTTPConnectionPool(host='localhost', timeout=3).timeout
        self.assertEqual(explicit_timeout._read, 3)
        self.assertEqual(explicit_timeout._connect, 3)
        self.assertEqual(explicit_timeout.total, None)
开发者ID:09Hero,项目名称:urllib3,代码行数:13,代码来源:test_connectionpool.py


示例6: test_multi_setcookie

    def test_multi_setcookie(self):
        """Check that repeated Set-Cookie headers are merged into one value.

        The server handler reads one request and replies with two
        ``Set-Cookie`` lines; the response headers are expected to equal
        ``{"set-cookie": "foo=1, bar=1"}``.
        """
        def multicookie_response_handler(listener):
            sock = listener.accept()[0]

            buf = b""
            while not buf.endswith(b"\r\n\r\n"):
                buf += sock.recv(65536)

            sock.send(
                b"HTTP/1.1 200 OK\r\n"
                b"Set-Cookie: foo=1\r\n"
                b"Set-Cookie: bar=1\r\n"
                b"\r\n"
            )
            # The response has no Content-Length, so it is delimited by the
            # connection closing; close explicitly instead of relying on
            # garbage collection of the socket.
            sock.close()

        self._start_server(multicookie_response_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        r = pool.request("GET", "/", retries=0)
        # assertEquals is a deprecated alias (removed in Python 3.12).
        self.assertEqual(r.headers, {"set-cookie": "foo=1, bar=1"})
开发者ID:eranrund,项目名称:urllib3,代码行数:14,代码来源:test_socketlevel.py


示例7: urlopen

    def urlopen(self, method, url, body=None, headers=None, retries=3, redirect=True, assert_same_host=True,
                timeout=None, pool_timeout=None, release_conn=None, **response_kw):
        """
        Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
        with custom cross-host redirect logic and only sends the request-uri
        portion of the ``url``.

        The given ``url`` parameter must be absolute, such that an appropriate
        :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.

        Cookie handling: the ``cookie`` entries of ``self.headers`` and of the
        request headers are concatenated, fed through ``self.cookie_session``
        and the extracted value is sent as the request's ``cookie`` header.
        After the response arrives, its headers are fed into the session as
        well and ``headers["cookie"]`` is refreshed.

        A ``headers`` dict supplied by the caller is modified in place.

        When ``redirect`` is truthy and the response reports a redirect
        location, this method calls itself for that location with
        ``retries - 1``; a 303 response switches the method to ``GET``.
        """
        # No headers given: start from a lower-cased copy of the pool headers.
        if headers is None:
            headers = {k.lower(): v for (k, v) in self.headers.items()}
        headers.setdefault("cookie", "")
        # Fill in any pool-level header the request does not already carry.
        for key, val in self.headers.items():
            headers.setdefault(key, val)
        # Now the updated Cookie string will be stored into the HTTP request.
        # The cookie header may contain duplicate entries (e.g. k=a; k=b;)
        # NOTE(review): self.headers["cookie"] is indexed directly; presumably
        # the pool headers always contain a "cookie" key -- confirm.
        headers["cookie"] = self.headers["cookie"] + headers["cookie"]
        # This will be resolved by putting the header in the SimpleCookie
        self.cookie_session.feed(self.headers)
        self.cookie_session.feed(headers)
        headers["cookie"] = self.cookie_session.extract()
        # False is passed in the position where this method's own signature
        # has ``redirect`` -- presumably so the parent never follows redirects
        # itself and the logic below handles them; verify against the parent
        # signature.
        response = HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, False, assert_same_host, timeout, pool_timeout,
            release_conn, **response_kw)
        # Fold the response headers into the session and refresh the cookie
        # header that a follow-up (redirected) request would send.
        self.cookie_session.feed(self.headers)
        self.cookie_session.feed(response.headers)
        self.cookie_session.feed(headers)
        headers["cookie"] = self.cookie_session.extract()
        # Falsy when redirects are disabled or the response has no location.
        redirect_location = redirect and response.get_redirect_location()
        if not redirect_location:
            return response
        if response.status == 303:
            method = "GET"
        # Each followed redirect uses up one retry.
        return self.urlopen(method, redirect_location, body, headers, retries - 1, redirect, assert_same_host, timeout, pool_timeout,
            release_conn, **response_kw)
开发者ID:Riamse,项目名称:urllib3,代码行数:35,代码来源:opener.py


示例8: _test

        def _test(exception):
            """Run one retry scenario where ``_make_request`` raises ``exception``.

            Asserts that the connection stays attached to the response until
            ``release_conn()`` is called, and only then returns to the pool.
            """
            pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)
            pool._make_request = _raise_once_make_request_function(exception)

            # Verify that the request succeeds after two attempts, and that the
            # connection is left on the response object, instead of being
            # released back into the pool.
            urlopen_options = dict(
                retries=1,
                release_conn=False,
                preload_content=False,
                chunked=True,
            )
            response = pool.urlopen('GET', '/', **urlopen_options)

            self.assertEqual(pool.pool.qsize(), 0)
            self.assertEqual(pool.num_connections, 2)
            self.assertTrue(response.connection is not None)

            # Releasing hands the connection back and detaches it from the
            # response.
            response.release_conn()
            self.assertEqual(pool.pool.qsize(), 1)
            self.assertTrue(response.connection is None)
开发者ID:adamchainz,项目名称:urllib3,代码行数:17,代码来源:test_connectionpool.py


示例9: test_recovery_when_server_closes_connection

    def test_recovery_when_server_closes_connection(self):
        """Check that the pool recovers from a server-side hang-up.

        The server handler answers two connections in turn and closes each
        socket right after responding.  The client issues two requests with
        ``retries=0`` and waits for the server's "closed" signal after each
        one, so the second request starts with a pooled connection that the
        server has already hung up on.
        """
        done_closing = Event()

        def socket_handler(listener):
            for i in 0, 1:
                sock = listener.accept()[0]

                buf = b""
                while not buf.endswith(b"\r\n\r\n"):
                    # Accumulate: the header terminator may be split across
                    # several recv() calls.
                    buf += sock.recv(65536)

                body = "Response %d" % i
                sock.send(
                    (
                        "HTTP/1.1 200 OK\r\n"
                        "Content-Type: text/plain\r\n"
                        "Content-Length: %d\r\n"
                        "\r\n"
                        "%s" % (len(body), body)
                    ).encode("utf-8")
                )

                sock.close()  # simulate a server timing out, closing socket
                done_closing.set()  # let the test know it can proceed

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)

        response = pool.request("GET", "/", retries=0)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b"Response 0")

        done_closing.wait()  # wait until the socket in our pool gets closed
        # Re-arm the event so the final wait really waits for the second
        # close.  The handler cannot set it again before the next request,
        # because it is blocked in accept() until then.
        done_closing.clear()

        response = pool.request("GET", "/", retries=0)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b"Response 1")

        done_closing.wait()  # wait until the socket in our pool gets closed
开发者ID:eranrund,项目名称:urllib3,代码行数:43,代码来源:test_socketlevel.py


示例10: test_pool_edgecases

    def test_pool_edgecases(self):
        """Exercise a non-blocking pool of size one with two live connections.

        Two connections are taken from the pool and both are put back; the
        test then checks which objects come out again and that the pool has
        counted three connections in total.
        """
        pool = HTTPConnectionPool(host='localhost', maxsize=1, block=False)

        first_conn = pool._get_conn()
        # New because block=False
        second_conn = pool._get_conn()

        pool._put_conn(first_conn)
        # Should be discarded
        pool._put_conn(second_conn)

        reused = pool._get_conn()
        self.assertEqual(first_conn, reused)

        fresh = pool._get_conn()
        self.assertNotEqual(second_conn, fresh)

        self.assertEqual(pool.num_connections, 3)
开发者ID:09Hero,项目名称:urllib3,代码行数:13,代码来源:test_connectionpool.py


示例11: test_max_connections

    def test_max_connections(self):
        """Check that a blocking pool of size one raises ``EmptyPoolError``.

        One connection is taken and not put back; a second ``_get_conn`` and
        a ``request`` must both raise ``EmptyPoolError``, and the pool must
        report exactly one connection.
        """
        pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)

        # Take the pool's only connection; it is not put back in this test.
        pool._get_conn(timeout=0.01)

        try:
            pool._get_conn(timeout=0.01)
        except EmptyPoolError:
            pass
        else:
            self.fail("Managed to get a connection without EmptyPoolError")

        try:
            pool.request('GET', '/', pool_timeout=0.01)
        except EmptyPoolError:
            pass
        else:
            self.fail("Managed to get a connection without EmptyPoolError")

        self.assertEqual(pool.num_connections, 1)
开发者ID:09Hero,项目名称:urllib3,代码行数:18,代码来源:test_connectionpool.py


示例12: test_same_host_no_port

    def test_same_host_no_port(self):
        """Check ``is_same_host`` for pools constructed without a port.

        Each case is a ``(pool host, url)`` pair.  The "same" tables must
        make ``is_same_host`` return a truthy value; the "not same" tables
        must make it return a falsy value in both directions.
        """
        # This test was introduced in #801 to deal with the fact that urllib3
        # never initializes ConnectionPool objects with port=None.
        same_host_http = [
            ('google.com', '/'),
            ('google.com', 'http://google.com/'),
            ('google.com', 'http://google.com'),
            ('google.com', 'http://google.com/abra/cadabra'),
            # Test comparison using default ports
            ('google.com', 'http://google.com:80/abracadabra'),
        ]
        same_host_https = [
            ('google.com', '/'),
            ('google.com', 'https://google.com/'),
            ('google.com', 'https://google.com'),
            ('google.com', 'https://google.com/abra/cadabra'),
            # Test comparison using default ports
            ('google.com', 'https://google.com:443/abracadabra'),
        ]

        matching = (
            (HTTPConnectionPool, same_host_http),
            (HTTPSConnectionPool, same_host_https),
        )
        for pool_cls, cases in matching:
            for host, url in cases:
                pool = pool_cls(host)
                self.assertTrue(pool.is_same_host(url), "%s =? %s" % (host, url))

        not_same_host_http = [
            ('google.com', 'https://google.com/'),
            ('yahoo.com', 'http://google.com/'),
            ('google.com', 'https://google.net/'),
        ]
        not_same_host_https = [
            ('google.com', 'http://google.com/'),
            ('yahoo.com', 'https://google.com/'),
            ('google.com', 'https://google.net/'),
        ]

        mismatching = (
            (HTTPConnectionPool, not_same_host_http),
            (HTTPSConnectionPool, not_same_host_https),
        )
        for pool_cls, cases in mismatching:
            for left, right in cases:
                # Compare in both directions: each side in turn is used as
                # the pool's host argument.
                forward = pool_cls(left)
                self.assertFalse(forward.is_same_host(right), "%s =? %s" % (left, right))
                backward = pool_cls(right)
                self.assertFalse(backward.is_same_host(left), "%s =? %s" % (right, left))
开发者ID:adamchainz,项目名称:urllib3,代码行数:48,代码来源:test_connectionpool.py



注:本文中的urllib3.connectionpool.HTTPConnectionPool类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python fields.RequestField类代码示例发布时间:2022-05-27
下一篇:
Python connectionpool.connection_from_url函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap