• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python compat.tobytes函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中waitress.compat.tobytes函数的典型用法代码示例。如果您正苦于以下问题:Python tobytes函数的具体用法?Python tobytes怎么用?Python tobytes使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了tobytes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_pipelining

    def test_pipelining(self):
        # Build `total` requests up front, push them through the socket
        # with a single send, then read the responses back in order.
        # Every request except the last asks for keep-alive; the last one
        # asks for the connection to be closed.
        template = ("GET / HTTP/1.0\r\n"
                    "Connection: %s\r\n"
                    "Content-Length: %d\r\n"
                    "\r\n"
                    "%s")
        total = 25
        bodies = ["Response #%d\r\n" % seq for seq in range(1, total + 1)]
        requests = []
        for seq, body in enumerate(bodies, 1):
            connection = 'keep-alive' if seq < total else 'close'
            requests.append(tobytes(template % (connection, len(body), body)))
        payload = b''.join(requests)

        self.sock.connect((self.host, self.port))
        self.sock.send(payload)
        reader = self.sock.makefile('rb', 0)
        for body in bodies:
            wanted = tobytes(body)
            status_line = reader.readline()
            version, status, reason = [
                part.strip() for part in status_line.split(None, 2)
            ]
            headers = parse_headers(reader)
            # a content-length of 0 becomes None, i.e. "read until EOF"
            length = int(headers.get('content-length')) or None
            received = reader.read(length)
            self.assertEqual(int(status), 200)
            self.assertEqual(length, len(received))
            # each response body must equal the matching request body
            self.assertEqual(received, wanted)
开发者ID:ankit585,项目名称:thespark,代码行数:29,代码来源:test_functional.py


示例2: test_expect_continue

 def test_expect_continue(self):
     # A request carrying "Expect: 100-continue" (and an explicit
     # "Connection: close") must first receive an interim
     # "HTTP/1.1 100 Continue" status line, followed by the real 200
     # response whose body equals the request body.
     payload = "I have expectations"
     template = (
         "GET / HTTP/1.1\n"
         "Connection: close\n"
         "Content-Length: %d\n"
         "Expect: 100-continue\n"
         "\n"
         "%s"
     )
     request = tobytes(template % (len(payload), payload))
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile('rb', 0)

     # interim response: one status line followed by a blank line
     interim = reader.readline()
     version, status, reason = [f.strip() for f in interim.split(None, 2)]
     self.assertEqual(int(status), 100)
     self.assertEqual(reason, b'Continue')
     self.assertEqual(version, b'HTTP/1.1')
     reader.readline()

     # final response
     final = reader.readline()
     version, status, reason = [f.strip() for f in final.split(None, 2)]
     headers = parse_headers(reader)
     length = int(headers.get('content-length')) or None
     received = reader.read(length)
     self.assertEqual(int(status), 200)
     self.assertEqual(length, len(received))
     self.assertEqual(received, tobytes(payload))
开发者ID:ankit585,项目名称:thespark,代码行数:28,代码来源:test_functional.py


示例3: test_long_body

 def test_long_body(self):
     # Request /long_body twice over one keep-alive connection.  The
     # first response must be a 200 whose body, read using its
     # content-length header, is "abcdefgh"; the second request must
     # also get a 200, i.e. the server did not close the connection.
     request = tobytes(
         "GET /long_body HTTP/1.0\n"
         "Connection: Keep-Alive\n"
         "Content-Length: 0\n"
         "\n"
     )

     def round_trip():
         # one request/response cycle on the already-connected socket
         self.sock.send(request)
         reader = self.sock.makefile('rb', 0)
         status_line = reader.readline()
         version, status, reason = [
             f.strip() for f in status_line.split(None, 2)
         ]
         headers = parse_headers(reader)
         length = int(headers.get('content-length')) or None
         return status, length, reader.read(length)

     self.sock.connect((self.host, self.port))
     status, length, received = round_trip()
     self.assertEqual(int(status), 200)
     self.assertEqual(length, len(received))
     self.assertEqual(received, tobytes('abcdefgh'))
     # remote does not close connection (keepalive header)
     status, length, received = round_trip()
     self.assertEqual(int(status), 200)
开发者ID:ankit585,项目名称:thespark,代码行数:29,代码来源:test_functional.py


示例4: test_short_body

 def test_short_body(self):
     # Request /short_body: the body actually delivered is expected to
     # be exactly one byte shorter than the advertised content-length,
     # and the connection is expected to be closed afterwards.
     request = tobytes(
         "GET /short_body HTTP/1.0\n"
         "Connection: Keep-Alive\n"
         "Content-Length: 0\n"
         "\n"
     )
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile('rb', 0)
     status_line = reader.readline()
     version, status, reason = [
         f.strip() for f in status_line.split(None, 2)
     ]
     headers = parse_headers(reader)
     advertised = int(headers.get('content-length'))
     received = reader.read(advertised)
     self.assertEqual(int(status), 200)
     self.assertNotEqual(advertised, len(received))
     self.assertEqual(len(received), advertised - 1)
     self.assertEqual(received, tobytes('abcdefghi'))
     # the remote side is expected to have closed the connection despite
     # the keep-alive header: the send itself goes through, but reading
     # a reply must raise ConnectionClosed
     self.sock.send(request)
     self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:ankit585,项目名称:thespark,代码行数:25,代码来源:test_functional.py


示例5: test_without_crlf

 def test_without_crlf(self):
     # Send a request whose header lines end in bare "\n" (no CRLF) and
     # expect a 200 response whose body equals the request body.
     payload = "Echo\nthis\r\nplease"
     template = (
         "GET / HTTP/1.0\n"
         "Connection: close\n"
         "Content-Length: %d\n"
         "\n"
         "%s"
     )
     request = tobytes(template % (len(payload), payload))
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.0")
     # bare lookup: its value is unused, it only fails (KeyError on a
     # plain dict) when the content-length header is missing
     headers["content-length"]
     self.assertEqual(len(received), len(payload))
     self.assertEqual(received, tobytes(payload))
开发者ID:Kleptine,项目名称:Craft,代码行数:11,代码来源:test_functional.py


示例6: test_large_body

 def test_large_body(self):
     # 32 repetitions of a 32-character line: a 1024-character body.
     # The response must report content-length 1024 and carry the
     # same body back.
     payload = "This string has 32 characters.\r\n" * 32
     template = (
         "GET / HTTP/1.0\n"
         "Content-Length: %d\n"
         "\n"
         "%s"
     )
     request = tobytes(template % (len(payload), payload))
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.0")
     self.assertEqual(headers.get("content-length"), "1024")
     self.assertEqual(received, tobytes(payload))
开发者ID:Kleptine,项目名称:Craft,代码行数:11,代码来源:test_functional.py


示例7: test_no_content_length

 def test_no_content_length(self):
     # Request /no_content_length and expect a response that carries no
     # content-length header and whose body is "abcdefghi"; afterwards
     # the connection must be closed even though keep-alive was asked for.
     request = tobytes(
         "GET /no_content_length HTTP/1.0\n"
         "Connection: Keep-Alive\n"
         "Content-Length: 0\n"
         "\n"
     )
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     # status line, read and discarded.
     # NOTE(review): read_http() is then called on the remainder of the
     # stream -- confirm it does not expect to see the status line itself.
     reader.readline()
     line, headers, received = read_http(reader)
     self.assertEqual(headers.get("content-length"), None)
     self.assertEqual(received, tobytes("abcdefghi"))
     # remote closed connection (despite keepalive header)
     self.sock.send(request)
     self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:14,代码来源:test_functional.py


示例8: testComplexGET

    def testComplexGET(self):
        data = b"""\
GET /foo/a+%2B%2F%C3%A4%3D%26a%3Aint?d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6 HTTP/8.4
FirstName: mickey
lastname: Mouse
content-length: 10

Hello mickey.
"""
        parser = self.parser
        self.feed(data)
        self.assertEqual(parser.command, 'GET')
        self.assertEqual(parser.version, '8.4')
        self.assertFalse(parser.empty)
        self.assertEqual(parser.headers,
                         {'FIRSTNAME': 'mickey',
                          'LASTNAME': 'Mouse',
                          'CONTENT_LENGTH': '10',
                          })
        # path should be utf-8 encoded
        self.assertEqual(tobytes(parser.path).decode('utf-8'),
                         text_(b'/foo/a++/\xc3\xa4=&a:int', 'utf-8'))
        self.assertEqual(parser.query,
                         'd=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6')
        self.assertEqual(parser.get_body_stream().getvalue(), b'Hello mick')
开发者ID:EyeOfPython,项目名称:japanese,代码行数:25,代码来源:test_parser.py


示例9: test_large_body

 def test_large_body(self):
     # 32 repetitions of a 32-character line: a 1024-character body.
     # The response must report content-length 1024 and carry the
     # same body back.
     payload = 'This string has 32 characters.\r\n' * 32
     template = (
         "GET / HTTP/1.0\n"
         "Content-Length: %d\n"
         "\n"
         "%s"
     )
     request = tobytes(template % (len(payload), payload))
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile('rb', 0)
     line, headers, received = read_http(reader)
     self.assertline(line, '200', 'OK', 'HTTP/1.0')
     self.assertEqual(headers.get('content-length'), '1024')
     self.assertEqual(received, tobytes(payload))
开发者ID:ankit585,项目名称:thespark,代码行数:16,代码来源:test_functional.py


示例10: write

 def write(self, data):
     """Queue *data* on the channel, sending the response header first.

     The header is built and queued on the first call (tracked through
     ``self.wrote_header``), even when *data* is empty.  Non-empty data
     is then queued in one of three forms:

     * chunked response: wrapped as ``<HEX LENGTH>\\r\\n<data>\\r\\n``;
     * known content length: truncated so the running total never
       exceeds ``self.content_length``; the first truncation logs a
       warning;
     * otherwise: passed through unchanged.

     Raises RuntimeError when ``self.complete`` is false.
     """
     if not self.complete:
         raise RuntimeError('start_response was not called before body '
                            'written')
     out = self.channel
     if not self.wrote_header:
         header = self.build_response_header()
         out.write_soon(header)
         self.wrote_header = True
     if not data:
         return

     limit = self.content_length
     if self.chunked_response:
         # use chunked encoding response
         size_line = tobytes(hex(len(data))[2:].upper()) + b'\r\n'
         payload = size_line + (data + b'\r\n')
     elif limit is not None:
         # never emit more than the declared content length
         remaining = limit - self.content_bytes_written
         payload = data[:remaining]
         self.content_bytes_written += len(payload)
         truncated = payload != data
         if truncated and not self.logged_write_excess:
             self.logger.warning(
                 'application-written content exceeded the number of '
                 'bytes specified by Content-Length header (%s)' % limit)
             self.logged_write_excess = True
     else:
         payload = data

     # truncation may leave nothing to send
     if payload:
         out.write_soon(payload)
开发者ID:BenAllums,项目名称:waitress,代码行数:26,代码来源:task.py


示例11: test_request_body_too_large_with_no_cl_http11

 def test_request_body_too_large_with_no_cl_http11(self):
     # An HTTP/1.1 request with no content-length header followed by
     # `self.toobig` bytes of data: a 200 is expected for the request
     # itself, then a 431 for the trailing data, then a closed
     # connection.
     request = tobytes("GET / HTTP/1.1\n\n" + 'a' * self.toobig)
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile('rb')

     def expect_response(code, reason, version):
         # read one response; its content-length header must match the
         # length of the body that was actually read
         line, headers, received = read_http(reader)
         self.assertline(line, code, reason, version)
         declared = int(headers['content-length'])
         self.assertEqual(declared, len(received))

     # server trusts the content-length header (assumed 0)
     expect_response('200', 'OK', 'HTTP/1.1')
     # server assumes pipelined requests due to http/1.1, and the first
     # request was assumed c-l 0 because it had no content-length header,
     # so entire body looks like the header of the subsequent request
     # second response is an error response
     expect_response('431', 'Request Header Fields Too Large', 'HTTP/1.0')
     # connection has been closed
     self.sock.send(request)
     self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:ankit585,项目名称:thespark,代码行数:25,代码来源:test_functional.py


示例12: test_http10_listlentwo

 def test_http10_listlentwo(self):
     # HTTP/1.0 keep-alive request to /list_lentwo: the response is
     # expected to carry no content-length, to announce
     # "connection: close", and to return the request body.
     payload = string.ascii_letters
     head = (
         "GET /list_lentwo HTTP/1.0\n"
         "Connection: Keep-Alive\n"
         "Content-Length: %d\n\n"
     ) % len(payload)
     request = tobytes(head + payload)
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.0")
     self.assertEqual(headers.get("content-length"), None)
     self.assertEqual(headers.get("connection"), "close")
     self.assertEqual(received, tobytes(payload))
     # remote closed connection (despite keepalive header), because
     # lists of length > 1 cannot have their content length divined
     self.sock.send(request)
     self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:17,代码来源:test_functional.py


示例13: test_long_body

 def test_long_body(self):
     # check server doesnt close connection when body is too long
     # for cl header: the first response must advertise 9 bytes and
     # deliver "abcdefghi"; a second request on the same socket must
     # still be answered with a 200.
     request = tobytes(
         "GET /long_body HTTP/1.0\n"
         "Connection: Keep-Alive\n"
         "Content-Length: 0\n"
         "\n"
     )
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     # a content-length of 0 would become None here
     declared = int(headers.get("content-length")) or None
     self.assertEqual(declared, 9)
     self.assertEqual(declared, len(received))
     self.assertEqual(received, tobytes("abcdefghi"))
     # remote does not close connection (keepalive header)
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.0")
开发者ID:Kleptine,项目名称:Craft,代码行数:17,代码来源:test_functional.py


示例14: test_http11_list

 def test_http11_list(self):
     # HTTP/1.1 request to /list: the response must declare a
     # content-length equal to the request body length and return that
     # body; a second request on the same connection must also succeed.
     payload = string.ascii_letters
     head = (
         "GET /list HTTP/1.1\n"
         "Content-Length: %d\n\n"
     ) % len(payload)
     request = tobytes(head + payload)
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.1")
     self.assertEqual(headers["content-length"], str(len(payload)))
     self.assertEqual(received, tobytes(payload))
     # remote keeps connection open because it divined the content length
     # from a length-1 list
     self.sock.send(request)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.1")
开发者ID:Kleptine,项目名称:Craft,代码行数:17,代码来源:test_functional.py


示例15: test_chunking_request_without_content

 def test_chunking_request_without_content(self):
     # A chunked request consisting only of the terminating zero-size
     # chunk must produce a 200 response with an empty body.
     request_head = tobytes(
         "GET / HTTP/1.1\n"
         "Transfer-Encoding: chunked\n\n"
     )
     last_chunk = b"0\r\n\r\n"
     self.sock.connect((self.host, self.port))
     # header and terminating chunk go out as two separate sends
     for part in (request_head, last_chunk):
         self.sock.send(part)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.1")
     self.assertEqual(received, b"")
开发者ID:Kleptine,项目名称:Craft,代码行数:9,代码来源:test_functional.py


示例16: test_short_body

 def test_short_body(self):
     # check to see if server closes connection when body is too short
     # for cl header: the response advertises 9 bytes but only the 8
     # bytes "abcdefgh" are expected to arrive.
     request = tobytes(
         "GET /short_body HTTP/1.0\n"
         "Connection: Keep-Alive\n"
         "Content-Length: 0\n"
         "\n"
     )
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.0")
     # the advertised content-length (9) is one more than what is sent
     declared = int(headers["content-length"])
     self.assertEqual(declared, 9)
     self.assertNotEqual(declared, len(received))
     self.assertEqual(len(received), declared - 1)
     self.assertEqual(received, tobytes("abcdefgh"))
     # remote closed connection (despite keepalive header)
     self.sock.send(request)
     self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:18,代码来源:test_functional.py


示例17: test_http11_listlentwo

 def test_http11_listlentwo(self):
     # HTTP/1.1 request to /list_lentwo: the raw response body is
     # expected to be chunk-encoded as two chunks (the first character
     # of the request body, then the rest) plus the terminating chunk.
     payload = string.ascii_letters
     head = (
         "GET /list_lentwo HTTP/1.1\n"
         "Content-Length: %s\n\n"
     ) % len(payload)
     request = tobytes(head + payload)
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb")
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.1")
     # each chunk is "<uppercase hex size>\r\n<data>\r\n"
     pieces = (payload[0], payload[1:])
     encoded = [
         tobytes("%s\r\n%s\r\n" % (hex(len(piece))[2:].upper(), piece))
         for piece in pieces
     ]
     expected = b"".join(encoded) + b"0\r\n\r\n"
     self.assertEqual(received, expected)
     # connection is always closed at the end of a chunked response
     self.sock.send(request)
     self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:18,代码来源:test_functional.py


示例18: test_send_empty_body

 def test_send_empty_body(self):
     # A request declaring "Content-Length: 0" must get a 200 response
     # that itself reports content-length "0" and has an empty body.
     request = tobytes(
         "GET / HTTP/1.0\n"
         "Content-Length: 0\n\n"
     )
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reader = self.sock.makefile("rb", 0)
     line, headers, received = read_http(reader)
     self.assertline(line, "200", "OK", "HTTP/1.0")
     self.assertEqual(headers.get("content-length"), "0")
     self.assertEqual(received, b"")
开发者ID:Kleptine,项目名称:Craft,代码行数:10,代码来源:test_functional.py


示例19: test_keepalive_http11_explicit

 def test_keepalive_http11_explicit(self):
     # Explicitly set keep-alive: an HTTP/1.1 request carrying
     # "Connection: keep-alive" must get a 200 response whose
     # "connection" header is anything but "close".
     data = "Default: Keep me alive"
     s = tobytes(
         "GET / HTTP/1.1\n"
         "Connection: keep-alive\n"
         "Content-Length: %d\n"
         "\n"
         "%s" % (len(data), data)
     )
     self.sock.connect((self.host, self.port))
     self.sock.send(s)
     response = httplib.HTTPResponse(self.sock)
     response.begin()
     self.assertEqual(int(response.status), 200)
     # assertNotEqual performs the same `!=` comparison as the former
     # assertTrue(a != b), but reports both values when it fails
     self.assertNotEqual(response.getheader("connection"), "close")
开发者ID:Kleptine,项目名称:Craft,代码行数:10,代码来源:test_functional.py


示例20: test_keepalive_http11_connclose

 def test_keepalive_http11_connclose(self):
     # specifying Connection: close explicitly -- the 200 response must
     # echo "close" in its own connection header
     payload = "Don't keep me alive"
     template = (
         "GET / HTTP/1.1\n"
         "Connection: close\n"
         "Content-Length: %d\n"
         "\n"
         "%s"
     )
     request = tobytes(template % (len(payload), payload))
     self.sock.connect((self.host, self.port))
     self.sock.send(request)
     reply = httplib.HTTPResponse(self.sock)
     reply.begin()
     self.assertEqual(int(reply.status), 200)
     self.assertEqual(reply.getheader("connection"), "close")
开发者ID:Kleptine,项目名称:Craft,代码行数:10,代码来源:test_functional.py



注:本文中的waitress.compat.tobytes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python compat.u函数代码示例发布时间:2022-05-26
下一篇:
Python waitress.serve函数代码示例发布时间:2022-05-26
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap