本文整理汇总了Python中waitress.compat.tobytes函数的典型用法代码示例。如果您正苦于以下问题:Python tobytes函数的具体用法?Python tobytes怎么用?Python tobytes使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了tobytes函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_pipelining
def test_pipelining(self):
    # Queue 25 requests into a single send and verify that every response
    # comes back with status 200 and a body equal to the one that was sent.
    template = (
        "GET / HTTP/1.0\r\n"
        "Connection: %s\r\n"
        "Content-Length: %d\r\n"
        "\r\n"
        "%s"
    )
    total = 25
    to_send = b''
    for index in range(1, total + 1):
        payload = "Response #%d\r\n" % index
        # Only the final request asks for the connection to be closed.
        connection = 'keep-alive' if index < total else 'close'
        to_send += tobytes(template % (connection, len(payload), payload))

    self.sock.connect((self.host, self.port))
    self.sock.send(to_send)
    reader = self.sock.makefile('rb', 0)

    for index in range(1, total + 1):
        expected = tobytes("Response #%d\r\n" % index)
        status_line = reader.readline()
        version, status, reason = [
            part.strip() for part in status_line.split(None, 2)
        ]
        headers = parse_headers(reader)
        # A content-length of 0 becomes None, i.e. "read until EOF".
        length = int(headers.get('content-length')) or None
        body = reader.read(length)
        self.assertEqual(int(status), 200)
        self.assertEqual(length, len(body))
        self.assertEqual(body, expected)
开发者ID:ankit585,项目名称:thespark,代码行数:29,代码来源:test_functional.py
示例2: test_expect_continue
def test_expect_continue(self):
    # Send "Expect: 100-continue" together with "Connection: close" and
    # check for an interim 100 response followed by a 200 whose body
    # equals the request body.
    payload = "I have expectations"
    request = tobytes(
        "GET / HTTP/1.1\n"
        "Connection: close\n"
        "Content-Length: %d\n"
        "Expect: 100-continue\n"
        "\n"
        "%s" % (len(payload), payload)
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile('rb', 0)

    # Interim response: status line, then a blank line.
    interim = reader.readline()
    version, status, reason = [
        part.strip() for part in interim.split(None, 2)
    ]
    self.assertEqual(int(status), 100)
    self.assertEqual(reason, b'Continue')
    self.assertEqual(version, b'HTTP/1.1')
    reader.readline()

    # Final response.
    final = reader.readline()
    version, status, reason = [
        part.strip() for part in final.split(None, 2)
    ]
    headers = parse_headers(reader)
    length = int(headers.get('content-length')) or None
    body = reader.read(length)
    self.assertEqual(int(status), 200)
    self.assertEqual(length, len(body))
    self.assertEqual(body, tobytes(payload))
开发者ID:ankit585,项目名称:thespark,代码行数:28,代码来源:test_functional.py
示例3: test_long_body
def test_long_body(self):
    # Request /long_body twice over one keep-alive connection. The first
    # response must be a 200 whose body matches its content-length; the
    # second request must still be answered with a 200, showing that the
    # server left the connection open.
    request = tobytes(
        "GET /long_body HTTP/1.0\n"
        "Connection: Keep-Alive\n"
        "Content-Length: 0\n"
        "\n"
    )
    self.sock.connect((self.host, self.port))

    # First exchange.
    self.sock.send(request)
    reader = self.sock.makefile('rb', 0)
    status_line = reader.readline()
    version, status, reason = [
        part.strip() for part in status_line.split(None, 2)
    ]
    headers = parse_headers(reader)
    content_length = int(headers.get('content-length')) or None
    body = reader.read(content_length)
    self.assertEqual(int(status), 200)
    self.assertEqual(content_length, len(body))
    self.assertEqual(body, tobytes('abcdefgh'))

    # Second exchange on the same socket (keep-alive was honoured).
    self.sock.send(request)
    reader = self.sock.makefile('rb', 0)
    status_line = reader.readline()
    version, status, reason = [
        part.strip() for part in status_line.split(None, 2)
    ]
    headers = parse_headers(reader)
    content_length = int(headers.get('content-length')) or None
    body = reader.read(content_length)
    self.assertEqual(int(status), 200)
开发者ID:ankit585,项目名称:thespark,代码行数:29,代码来源:test_functional.py
示例4: test_short_body
def test_short_body(self):
    # Request /short_body and check that the body received is one byte
    # shorter than the advertised content-length, and that a follow-up
    # read on the same connection raises ConnectionClosed.
    request = tobytes(
        "GET /short_body HTTP/1.0\n"
        "Connection: Keep-Alive\n"
        "Content-Length: 0\n"
        "\n"
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile('rb', 0)
    status_line = reader.readline()
    version, status, reason = [
        part.strip() for part in status_line.split(None, 2)
    ]
    headers = parse_headers(reader)
    advertised = int(headers.get('content-length'))
    body = reader.read(advertised)
    self.assertEqual(int(status), 200)
    self.assertNotEqual(advertised, len(body))
    self.assertEqual(len(body), advertised - 1)
    self.assertEqual(body, tobytes('abcdefghi'))
    # The remote side closed the connection despite the keep-alive header;
    # the send itself still goes through, only the subsequent read fails.
    self.sock.send(request)
    self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:ankit585,项目名称:thespark,代码行数:25,代码来源:test_functional.py
示例5: test_without_crlf
def test_without_crlf(self):
    # Request headers end with bare LF; the body mixes LF and CRLF and is
    # expected to come back unchanged.
    payload = "Echo\nthis\r\nplease"
    request = tobytes(
        "GET / HTTP/1.0\n"
        "Connection: close\n"
        "Content-Length: %d\n"
        "\n"
        "%s" % (len(payload), payload)
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.0")
    # Bare lookup kept on purpose: presumably fails with KeyError when the
    # response carries no content-length header - confirm against read_http.
    headers["content-length"]
    self.assertEqual(len(body), len(payload))
    self.assertEqual(body, tobytes(payload))
开发者ID:Kleptine,项目名称:Craft,代码行数:11,代码来源:test_functional.py
示例6: test_large_body
def test_large_body(self):
    # Send a 1024-character body (32 lines of 32 characters) and expect
    # it back verbatim with a matching content-length header.
    payload = "This string has 32 characters.\r\n" * 32
    request = tobytes(
        "GET / HTTP/1.0\n"
        "Content-Length: %d\n"
        "\n"
        "%s" % (len(payload), payload)
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.0")
    self.assertEqual(headers.get("content-length"), "1024")
    self.assertEqual(body, tobytes(payload))
开发者ID:Kleptine,项目名称:Craft,代码行数:11,代码来源:test_functional.py
示例7: test_no_content_length
def test_no_content_length(self):
    # Request /no_content_length and check that the response has no
    # content-length header, carries the expected body, and that the
    # connection is closed afterwards.
    request = tobytes(
        "GET /no_content_length HTTP/1.0\n"
        "Connection: Keep-Alive\n"
        "Content-Length: 0\n"
        "\n"
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    # One line (the status line) is consumed before read_http is called.
    # NOTE(review): read_http presumably reads a status line of its own, so
    # it would start on the first header here - confirm this is intended.
    reader.readline()
    status_line, headers, body = read_http(reader)
    self.assertEqual(headers.get("content-length"), None)
    self.assertEqual(body, tobytes("abcdefghi"))
    # Remote closed the connection despite the keep-alive header.
    self.sock.send(request)
    self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:14,代码来源:test_functional.py
示例8: testComplexGET
def testComplexGET(self):
    # Feed the parser a GET request with a percent-encoded path and query,
    # three headers and a body longer than the declared content-length,
    # then check every parsed component.
    #
    # The blank line after "content-length: 10" is required: it is the
    # separator between the header section and the body. Without it,
    # "Hello mickey." is part of the header block and no body is parsed.
    data = b"""\
GET /foo/a+%2B%2F%C3%A4%3D%26a%3Aint?d=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6 HTTP/8.4
FirstName: mickey
lastname: Mouse
content-length: 10

Hello mickey.
"""
    parser = self.parser
    self.feed(data)
    self.assertEqual(parser.command, 'GET')
    self.assertEqual(parser.version, '8.4')
    self.assertFalse(parser.empty)
    self.assertEqual(parser.headers,
                     {'FIRSTNAME': 'mickey',
                      'LASTNAME': 'Mouse',
                      'CONTENT_LENGTH': '10',
                      })
    # path should be utf-8 encoded
    self.assertEqual(tobytes(parser.path).decode('utf-8'),
                     text_(b'/foo/a++/\xc3\xa4=&a:int', 'utf-8'))
    self.assertEqual(parser.query,
                     'd=b+%2B%2F%3D%26b%3Aint&c+%2B%2F%3D%26c%3Aint=6')
    # Only the first 10 bytes (the declared content-length) form the body.
    self.assertEqual(parser.get_body_stream().getvalue(), b'Hello mick')
开发者ID:EyeOfPython,项目名称:japanese,代码行数:25,代码来源:test_parser.py
示例9: test_large_body
def test_large_body(self):
    # Send a 1024-character body (32 lines of 32 characters) and expect
    # it back verbatim with a matching content-length header.
    payload = 'This string has 32 characters.\r\n' * 32
    request_text = (
        "GET / HTTP/1.0\n"
        "Content-Length: %d\n"
        "\n"
        "%s"
    ) % (len(payload), payload)
    request = tobytes(request_text)
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile('rb', 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, '200', 'OK', 'HTTP/1.0')
    self.assertEqual(headers.get('content-length'), '1024')
    self.assertEqual(body, tobytes(payload))
开发者ID:ankit585,项目名称:thespark,代码行数:16,代码来源:test_functional.py
示例10: write
def write(self, data):
    """Queue response body ``data`` on the channel.

    The response header is built and queued first if it has not been
    written yet. With chunked responses, ``data`` is wrapped as one chunk
    (upper-case hex length, CRLF, data, CRLF). Otherwise, when a content
    length is set, ``data`` is truncated so that the running total never
    exceeds it, and a warning is logged the first time truncation occurs.

    Raises:
        RuntimeError: if ``self.complete`` is false, i.e. start_response
            has not been called.
    """
    if not self.complete:
        raise RuntimeError('start_response was not called before body '
                           'written')

    out = self.channel
    if not self.wrote_header:
        out.write_soon(self.build_response_header())
        self.wrote_header = True

    # Nothing further to do for empty data; the header has been handled.
    if not data:
        return

    limit = self.content_length
    payload = data
    if self.chunked_response:
        # Chunked transfer encoding: size line, then the chunk itself.
        size_line = tobytes(hex(len(data))[2:].upper()) + b'\r\n'
        payload = size_line + (data + b'\r\n')
    elif limit is not None:
        # Keep only as many bytes as the declared length still allows.
        payload = data[:limit - self.content_bytes_written]
        self.content_bytes_written += len(payload)
        truncated = payload != data
        if truncated and not self.logged_write_excess:
            self.logger.warning(
                'application-written content exceeded the number of '
                'bytes specified by Content-Length header (%s)' % limit)
            self.logged_write_excess = True

    if payload:
        out.write_soon(payload)
开发者ID:BenAllums,项目名称:waitress,代码行数:26,代码来源:task.py
示例11: test_request_body_too_large_with_no_cl_http11
def test_request_body_too_large_with_no_cl_http11(self):
    # Send an HTTP/1.1 request with no content-length header followed by
    # self.toobig bytes. Expect a 200 for the header-only request, then a
    # 431 for the trailing bytes, then a closed connection.
    request = tobytes("GET / HTTP/1.1\n\n" + 'a' * self.toobig)
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile('rb')

    # The server assumes content-length 0 for the first request.
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, '200', 'OK', 'HTTP/1.1')
    declared = int(headers['content-length'])
    self.assertEqual(declared, len(body))

    # Because HTTP/1.1 allows pipelining and the first request was taken
    # to have an empty body, the oversized payload is read as the header
    # of a second request, which produces an error response.
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, '431', 'Request Header Fields Too Large',
                    'HTTP/1.0')
    declared = int(headers['content-length'])
    self.assertEqual(declared, len(body))

    # The connection has been closed by now.
    self.sock.send(request)
    self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:ankit585,项目名称:thespark,代码行数:25,代码来源:test_functional.py
示例12: test_http10_listlentwo
def test_http10_listlentwo(self):
    # HTTP/1.0 request to /list_lentwo: the response must have no
    # content-length, carry "connection: close", return the request body,
    # and leave the connection closed.
    payload = string.ascii_letters
    head = (
        "GET /list_lentwo HTTP/1.0\n"
        "Connection: Keep-Alive\n"
        "Content-Length: %d\n\n"
    ) % len(payload)
    request = tobytes(head + payload)
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.0")
    self.assertEqual(headers.get("content-length"), None)
    self.assertEqual(headers.get("connection"), "close")
    self.assertEqual(body, tobytes(payload))
    # Remote closed the connection despite the keep-alive header, because
    # the content length of a list longer than one item cannot be divined.
    self.sock.send(request)
    self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:17,代码来源:test_functional.py
示例13: test_long_body
def test_long_body(self):
    # Request /long_body twice over one keep-alive connection: the first
    # response must declare and deliver 9 bytes, and the second request
    # must still be answered, i.e. the server kept the connection open.
    request = tobytes(
        "GET /long_body HTTP/1.0\n"
        "Connection: Keep-Alive\n"
        "Content-Length: 0\n"
        "\n"
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    declared = int(headers.get("content-length")) or None
    self.assertEqual(declared, 9)
    self.assertEqual(declared, len(body))
    self.assertEqual(body, tobytes("abcdefghi"))
    # Remote does not close the connection (keep-alive header honoured).
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.0")
开发者ID:Kleptine,项目名称:Craft,代码行数:17,代码来源:test_functional.py
示例14: test_http11_list
def test_http11_list(self):
    # HTTP/1.1 request to /list: the response must declare a
    # content-length equal to the body size, return the request body, and
    # keep the connection usable for a second request.
    payload = string.ascii_letters
    head = (
        "GET /list HTTP/1.1\n"
        "Content-Length: %d\n\n"
    ) % len(payload)
    request = tobytes(head + payload)
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.1")
    self.assertEqual(headers["content-length"], str(len(payload)))
    self.assertEqual(body, tobytes(payload))
    # Remote keeps the connection open because it divined the content
    # length from a length-1 list.
    self.sock.send(request)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.1")
开发者ID:Kleptine,项目名称:Craft,代码行数:17,代码来源:test_functional.py
示例15: test_chunking_request_without_content
def test_chunking_request_without_content(self):
    # Send a chunked request consisting only of the terminating zero-size
    # chunk and expect a 200 response with an empty body.
    request_head = tobytes(
        "GET / HTTP/1.1\n"
        "Transfer-Encoding: chunked\n\n"
    )
    last_chunk = b"0\r\n\r\n"
    self.sock.connect((self.host, self.port))
    self.sock.send(request_head)
    self.sock.send(last_chunk)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.1")
    self.assertEqual(body, b"")
开发者ID:Kleptine,项目名称:Craft,代码行数:9,代码来源:test_functional.py
示例16: test_short_body
def test_short_body(self):
    # Request /short_body: the response declares 9 bytes but delivers
    # only 8, after which the connection must be closed.
    request = tobytes(
        "GET /short_body HTTP/1.0\n"
        "Connection: Keep-Alive\n"
        "Content-Length: 0\n"
        "\n"
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    # The server trusts the application's content-length header.
    self.assertline(status_line, "200", "OK", "HTTP/1.0")
    declared = int(headers["content-length"])
    self.assertEqual(declared, 9)
    self.assertNotEqual(declared, len(body))
    self.assertEqual(len(body), declared - 1)
    self.assertEqual(body, tobytes("abcdefgh"))
    # Remote closed the connection despite the keep-alive header.
    self.sock.send(request)
    self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:18,代码来源:test_functional.py
示例17: test_http11_listlentwo
def test_http11_listlentwo(self):
    # HTTP/1.1 request to /list_lentwo: the response body must be the
    # request body split into two chunks (first character, then the rest)
    # in chunked encoding, and the connection must be closed afterwards.
    payload = string.ascii_letters
    head = (
        "GET /list_lentwo HTTP/1.1\n"
        "Content-Length: %s\n\n"
    ) % len(payload)
    request = tobytes(head + payload)
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb")
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.1")

    # Build the expected wire format: "<HEX SIZE>\r\n<data>\r\n" per
    # chunk, followed by the terminating zero-size chunk.
    pieces = (payload[0], payload[1:])
    expected = b"".join(
        tobytes("%s\r\n%s\r\n" % (hex(len(piece))[2:].upper(), piece))
        for piece in pieces
    ) + b"0\r\n\r\n"
    self.assertEqual(body, expected)

    # Connection is always closed at the end of a chunked response.
    self.sock.send(request)
    self.assertRaises(ConnectionClosed, read_http, reader)
开发者ID:Kleptine,项目名称:Craft,代码行数:18,代码来源:test_functional.py
示例18: test_send_empty_body
def test_send_empty_body(self):
    # A request with "Content-Length: 0" must yield a 200 response whose
    # content-length is "0" and whose body is empty.
    request = tobytes(
        "GET / HTTP/1.0\n"
        "Content-Length: 0\n\n"
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reader = self.sock.makefile("rb", 0)
    status_line, headers, body = read_http(reader)
    self.assertline(status_line, "200", "OK", "HTTP/1.0")
    self.assertEqual(headers.get("content-length"), "0")
    self.assertEqual(body, b"")
开发者ID:Kleptine,项目名称:Craft,代码行数:10,代码来源:test_functional.py
示例19: test_keepalive_http11_explicit
def test_keepalive_http11_explicit(self):
    # HTTP/1.1 request that explicitly sets "Connection: keep-alive"; the
    # response must be a 200 that does not answer "connection: close".
    payload = "Default: Keep me alive"
    request = tobytes(
        "GET / HTTP/1.1\n"
        "Connection: keep-alive\n"
        "Content-Length: %d\n"
        "\n"
        "%s" % (len(payload), payload)
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reply = httplib.HTTPResponse(self.sock)
    reply.begin()
    self.assertEqual(int(reply.status), 200)
    self.assertTrue(reply.getheader("connection") != "close")
开发者ID:Kleptine,项目名称:Craft,代码行数:10,代码来源:test_functional.py
示例20: test_keepalive_http11_connclose
def test_keepalive_http11_connclose(self):
    # HTTP/1.1 request that explicitly sets "Connection: close"; the
    # response must be a 200 that answers "connection: close".
    payload = "Don't keep me alive"
    request = tobytes(
        "GET / HTTP/1.1\n"
        "Connection: close\n"
        "Content-Length: %d\n"
        "\n"
        "%s" % (len(payload), payload)
    )
    self.sock.connect((self.host, self.port))
    self.sock.send(request)
    reply = httplib.HTTPResponse(self.sock)
    reply.begin()
    self.assertEqual(int(reply.status), 200)
    self.assertEqual(reply.getheader("connection"), "close")
开发者ID:Kleptine,项目名称:Craft,代码行数:10,代码来源:test_functional.py
注:本文中的waitress.compat.tobytes函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论