本文整理汇总了Python中tornado.util.b函数的典型用法代码示例。如果您正苦于以下问题:Python b函数的具体用法?Python b怎么用?Python b使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了b函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_two_requests
def test_two_requests(self):
    # Send the same GET twice over one stream, reading a response after
    # each write, then close.
    self.connect()
    request_bytes = b("GET / HTTP/1.1\r\n\r\n")
    for _ in range(2):
        self.stream.write(request_bytes)
        self.read_response()
    self.close()
开发者ID:nickwong,项目名称:tornado,代码行数:7,代码来源:httpserver_test.py
示例2: __call__
def __call__(self, request):
    """Run the wrapped WSGI application for ``request`` and write its reply.

    The application is called with the environ produced by
    ``WSGIContainer.environ(request)``.  Its status, headers and body are
    serialized into one HTTP/1.1 response, written to ``request``, and the
    request is finished and logged.

    Raises:
        Exception: if the application never called ``start_response``.
    """
    data = {}
    response = []

    def start_response(status, response_headers, exc_info=None):
        data["status"] = status
        data["headers"] = response_headers
        # The returned callable appends to the same list the iterable's
        # output is later added to.
        return response.append

    app_response = self.wsgi_application(
        WSGIContainer.environ(request), start_response)
    try:
        response.extend(app_response)
        body = b("").join(response)
    finally:
        # close() must run even if consuming the iterable raised; outside a
        # finally block an application error would skip it.
        if hasattr(app_response, "close"):
            app_response.close()
    if not data:
        raise Exception("WSGI app did not call start_response")

    status_code = int(data["status"].split()[0])
    headers = data["headers"]
    header_set = set(k.lower() for (k, v) in headers)
    body = escape.utf8(body)
    # Only add defaults for headers the application did not provide.
    if "content-length" not in header_set:
        headers.append(("Content-Length", str(len(body))))
    if "content-type" not in header_set:
        headers.append(("Content-Type", "text/html; charset=UTF-8"))
    if "server" not in header_set:
        headers.append(("Server", "TornadoServer/%s" % tornado.version))

    parts = [escape.utf8("HTTP/1.1 " + data["status"] + "\r\n")]
    for key, value in headers:
        parts.append(escape.utf8(key) + b(": ") + escape.utf8(value) + b("\r\n"))
    parts.append(b("\r\n"))
    parts.append(body)
    request.write(b("").join(parts))
    request.finish()
    self._log(status_code, request)
开发者ID:BantouTelecom,项目名称:alertbirds-community-edition,代码行数:34,代码来源:wsgi.py
示例3: test_streaming_callback
def test_streaming_callback(self):
    # read_bytes() with a streaming callback: the pieces go to the streaming
    # callback and the final callback is expected to receive nothing.
    server, client = self.make_iostream_pair()
    try:
        received = []
        finished = []

        def on_chunk(data):
            received.append(data)
            self.stop()

        def on_done(data):
            self.assertFalse(data)
            finished.append(True)
            self.stop()

        server.read_bytes(6, callback=on_done, streaming_callback=on_chunk)
        client.write(b("1234"))
        self.wait(condition=lambda: received)
        client.write(b("5678"))
        self.wait(condition=lambda: finished)
        self.assertEqual(received, [b("1234"), b("56")])
        # the rest of the last chunk is still in the buffer
        server.read_bytes(2, callback=self.stop)
        remainder = self.wait()
        self.assertEqual(remainder, b("78"))
    finally:
        server.close()
        client.close()
开发者ID:CNCBASHER,项目名称:tornado,代码行数:29,代码来源:iostream_test.py
示例4: test_escaping
def test_escaping(self):
    # A bare opening delimiter is a parse error ...
    for unterminated in ("{{", "{%"):
        self.assertRaises(ParseError, Template, unterminated)

    # ... while a delimiter followed by "!" is emitted literally.
    literal_cases = [
        ("{{!", b("{{")),
        ("{%!", b("{%")),
        ("{{ 'expr' }} {{!jquery expr}}", b("expr {{jquery expr}}")),
    ]
    for source, expected in literal_cases:
        self.assertEqual(Template(source).generate(), expected)
开发者ID:NickNeedsAName,项目名称:tornado,代码行数:7,代码来源:template_test.py
示例5: _on_headers
def _on_headers(self, data):
    """Parse the status line and headers, then start reading the body.

    ``data`` is the raw header block.  It is decoded as latin1 and split at
    the first newline into the status line and the remaining header text.
    Depending on the status code, request method and headers, the body is
    skipped, read as chunked, read by ``Content-Length``, or read until the
    stream closes.

    Raises:
        ValueError: if the status line cannot be parsed, if duplicated
            ``Content-Length`` values disagree, or if a 1xx/204 response
            announces a body.
    """
    data = native_str(data.decode("latin1"))
    first_line, line_sep, header_data = data.partition("\n")
    match = re.match("HTTP/1.[01] ([0-9]+) ([^\r]*)", first_line)
    if match is None:
        # The status line is remote input; an ``assert`` here would vanish
        # under ``python -O``.  Use the same exception type as the other
        # validation failures in this method.
        raise ValueError("Malformed HTTP status line: %r" % first_line)
    code = int(match.group(1))
    if 100 <= code < 200:
        # Informational response: read the next header block with this
        # same handler and leave the current state untouched.
        self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)
        return
    self.code = code
    self.reason = match.group(2)
    self.headers = HTTPHeaders.parse(header_data)

    if "Content-Length" in self.headers:
        if "," in self.headers["Content-Length"]:
            # Proxies sometimes cause Content-Length headers to get
            # duplicated. If all the values are identical then we can
            # use them but if they differ it's an error.
            pieces = re.split(r',\s*', self.headers["Content-Length"])
            if any(i != pieces[0] for i in pieces):
                raise ValueError("Multiple unequal Content-Lengths: %r" %
                                 self.headers["Content-Length"])
            self.headers["Content-Length"] = pieces[0]
        content_length = int(self.headers["Content-Length"])
    else:
        content_length = None

    if self.request.header_callback is not None:
        # re-attach the newline we split on earlier
        self.request.header_callback(first_line + line_sep)
        for k, v in self.headers.get_all():
            self.request.header_callback("%s: %s\r\n" % (k, v))
        self.request.header_callback('\r\n')

    if self.request.method == "HEAD" or self.code == 304:
        # HEAD requests and 304 responses never have content, even
        # though they may have content-length headers
        self._on_body(b(""))
        return
    if 100 <= self.code < 200 or self.code == 204:
        # These response codes never have bodies
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
        if ("Transfer-Encoding" in self.headers or
                content_length not in (None, 0)):
            raise ValueError("Response with code %d should not have body" %
                             self.code)
        self._on_body(b(""))
        return

    if (self.request.use_gzip and
            self.headers.get("Content-Encoding") == "gzip"):
        self._decompressor = GzipDecompressor()
    if self.headers.get("Transfer-Encoding") == "chunked":
        self.chunks = []
        self.stream.read_until(b("\r\n"), self._on_chunk_length)
    elif content_length is not None:
        self.stream.read_bytes(content_length, self._on_body)
    else:
        self.stream.read_until_close(self._on_body)
开发者ID:CNCBASHER,项目名称:tornado,代码行数:60,代码来源:simple_httpclient.py
示例6: test_cookie_tampering_future_timestamp
def test_cookie_tampering_future_timestamp(self):
    handler = CookieTestRequestHandler()
    # this string base64-encodes to '12345678'
    handler.set_secure_cookie('foo', binascii.a2b_hex(b('d76df8e7aefc')))
    signed_value = handler._cookies['foo']
    parsed = re.match(b(r'12345678\|([0-9]+)\|([0-9a-f]+)'), signed_value)
    self.assertTrue(parsed)
    timestamp = parsed.group(1)
    sig = parsed.group(2)
    secret = handler.application.settings["cookie_secret"]
    self.assertEqual(
        _create_signature(secret, 'foo', '12345678', timestamp),
        sig)
    # shifting digits from payload to timestamp doesn't alter signature
    # (this is not desirable behavior, just confirming that that's how it
    # works)
    shifted_timestamp = b('5678') + timestamp
    self.assertEqual(
        _create_signature(secret, 'foo', '1234', shifted_timestamp),
        sig)
    # tamper with the cookie
    tampered = '1234|5678%s|%s' % (
        to_basestring(timestamp), to_basestring(sig))
    handler._cookies['foo'] = utf8(tampered)
    # it gets rejected
    with ExpectLog(gen_log, "Cookie timestamp in future"):
        self.assertTrue(handler.get_secure_cookie('foo') is None)
开发者ID:CNCBASHER,项目名称:tornado,代码行数:26,代码来源:web_test.py
示例7: test_unextended_block
def test_unextended_block(self):
    # Render the same value through the "escaped" and the "unescaped" block
    # templates and compare each output with its expected bytes.
    loader = DictLoader(self.templates)
    name = "<script>"
    # NOTE(review): the escaped expectation was identical to the unescaped
    # one, which looks like HTML entities lost when this snippet was copied.
    # Assumes escaped_block.html applies HTML escaping -- confirm against
    # self.templates.
    self.assertEqual(loader.load("escaped_block.html").generate(name=name),
                     b("base: &lt;script&gt;"))
    self.assertEqual(loader.load("unescaped_block.html").generate(name=name),
                     b("base: <script>"))
开发者ID:NickNeedsAName,项目名称:tornado,代码行数:7,代码来源:template_test.py
示例8: test_close_buffered_data
def test_close_buffered_data(self):
    # Similar to the previous test, but with data stored in the OS's
    # socket buffers instead of the IOStream's read buffer. Out-of-band
    # close notifications must be delayed until all data has been
    # drained into the IOStream buffer. (epoll used to use out-of-band
    # close events with EPOLLRDHUP, but no longer)
    #
    # This depends on the read_chunk_size being smaller than the
    # OS socket buffer, so make it small.
    chunk_size = 256
    server, client = self.make_iostream_pair(read_chunk_size=chunk_size)
    try:
        server.write(b("A") * 512)
        client.read_bytes(chunk_size, self.stop)
        first_half = self.wait()
        self.assertEqual(b("A") * 256, first_half)
        server.close()
        # Allow the close to propagate to the client side of the
        # connection. Using add_callback instead of add_timeout
        # doesn't seem to work, even with multiple iterations
        deadline = self.io_loop.time() + 0.01
        self.io_loop.add_timeout(deadline, self.stop)
        self.wait()
        client.read_bytes(chunk_size, self.stop)
        second_half = self.wait()
        self.assertEqual(b("A") * 256, second_half)
    finally:
        server.close()
        client.close()
开发者ID:CNCBASHER,项目名称:tornado,代码行数:27,代码来源:iostream_test.py
示例9: test_delayed_close_callback
def test_delayed_close_callback(self):
    # The scenario: Server closes the connection while there is a pending
    # read that can be served out of buffered data. The client does not
    # run the close_callback as soon as it detects the close, but rather
    # defers it until after the buffered read has finished.
    server, client = self.make_iostream_pair()
    try:
        client.set_close_callback(self.stop)
        server.write(b("12"))
        received = []

        def on_second_byte(data):
            received.append(data)

        def on_first_byte(data):
            received.append(data)
            # Queue the second read before the server side goes away.
            client.read_bytes(1, on_second_byte)
            server.close()

        client.read_bytes(1, on_first_byte)
        self.wait()  # stopped by close_callback
        self.assertEqual(received, [b("1"), b("2")])
    finally:
        server.close()
        client.close()
开发者ID:stillzhl,项目名称:tornado,代码行数:25,代码来源:iostream_test.py
示例10: test_write_while_connecting
def test_write_while_connecting(self):
    stream = self._make_client_iostream()
    # One-element lists so the nested callbacks can flip the flags.
    connected = [False]
    written = [False]

    def on_connect():
        connected[0] = True
        self.stop()

    def on_write():
        written[0] = True
        self.stop()

    address = ("localhost", self.get_http_port())
    stream.connect(address, callback=on_connect)
    # unlike the previous tests, try to write before the connection
    # is complete.
    stream.write(b("GET / HTTP/1.0\r\nConnection: close\r\n\r\n"),
                 callback=on_write)
    self.assertTrue(not connected[0])
    # by the time the write has flushed, the connection callback has
    # also run
    try:
        self.wait(lambda: connected[0] and written[0])
    finally:
        logging.debug((connected, written))

    stream.read_until_close(self.stop)
    reply = self.wait()
    self.assertTrue(reply.endswith(b("Hello")))

    stream.close()
开发者ID:CNCBASHER,项目名称:tornado,代码行数:31,代码来源:iostream_test.py
示例11: test_large_read_until
def test_large_read_until(self):
    # Performance test: read_until used to have a quadratic component
    # so a read_until of 4MB would take 8 seconds; now it takes 0.25
    # seconds.
    server, client = self.make_iostream_pair()
    try:
        try:
            # This test fails on pypy with ssl. I think it's because
            # pypy's gc moves objects, breaking the
            # "frozen write buffer" assumption.
            if (isinstance(server, SSLIOStream) and
                    platform.python_implementation() == 'PyPy'):
                raise unittest.SkipTest(
                    "pypy gc causes problems with openssl")
        except AttributeError:
            # python 2.5 didn't have platform.python_implementation,
            # but there was no pypy for 2.5
            pass
        NUM_KB = 4096
        # range() instead of xrange(): xrange is undefined on Python 3
        # unless the file is run through 2to3, and a 4096-item range is
        # cheap on Python 2 as well.
        for i in range(NUM_KB):
            client.write(b("A") * 1024)
        client.write(b("\r\n"))
        server.read_until(b("\r\n"), self.stop)
        data = self.wait()
        self.assertEqual(len(data), NUM_KB * 1024 + 2)
    finally:
        server.close()
        client.close()
开发者ID:CNCBASHER,项目名称:tornado,代码行数:28,代码来源:iostream_test.py
示例12: test_get_argument
def test_get_argument(self):
    # Each URL is fetched in turn and its body compared with the expected
    # bytes: a given value, an empty value, and no value at all.
    cases = [
        ("/get_argument?foo=bar", b("bar")),
        ("/get_argument?foo=", b("")),
        ("/get_argument", b("default")),
    ]
    for url, expected_body in cases:
        response = self.fetch(url)
        self.assertEqual(response.body, expected_body)
开发者ID:CNCBASHER,项目名称:tornado,代码行数:7,代码来源:web_test.py
示例13: test_multipart_form
def test_multipart_form(self):
    # Encodings here are tricky: Headers are latin1, bodies can be
    # anything (we use utf8 by default).
    request_lines = [
        b("POST /multipart HTTP/1.0"),
        b("Content-Type: multipart/form-data; boundary=1234567890"),
        b("X-Header-encoding-test: \xe9"),
    ]
    body_lines = [
        b("Content-Disposition: form-data; name=argument"),
        b(""),
        u"\u00e1".encode("utf-8"),
        b("--1234567890"),
        u'Content-Disposition: form-data; name="files"; filename="\u00f3"'.encode("utf8"),
        b(""),
        u"\u00fa".encode("utf-8"),
        b("--1234567890--"),
        b(""),
    ]
    response = self.raw_fetch(request_lines, b("\r\n").join(body_lines))
    data = json_decode(response.body)
    # Checked in a fixed order so the first failure reported is stable.
    expectations = [
        ("header", u"\u00e9"),
        ("argument", u"\u00e1"),
        ("filename", u"\u00f3"),
        ("filebody", u"\u00fa"),
    ]
    for field, expected in expectations:
        self.assertEqual(expected, data[field])
开发者ID:nickwong,项目名称:tornado,代码行数:28,代码来源:httpserver_test.py
示例14: _on_headers
def _on_headers(self, data):
    """Parse the status line and headers, then start reading the body.

    ``data`` is the raw header block.  It is decoded as latin1 and split at
    the first newline into the status line and the remaining header text.
    A 100 response sends the request body (if any) and waits for the next
    header block.  Otherwise the body is skipped, read as chunked, read by
    ``Content-Length``, or read until the stream closes.

    Raises:
        ValueError: if the status line cannot be parsed, if duplicated
            ``Content-Length`` values disagree, or if a 1xx/204/304
            response announces a body.
    """
    data = native_str(data.decode("latin1"))
    first_line, _, header_data = data.partition("\n")
    match = re.match("HTTP/1.[01] ([0-9]+)", first_line)
    if match is None:
        # The status line is remote input; an ``assert`` here would vanish
        # under ``python -O``.  Use the exception type this method already
        # raises for invalid Content-Length headers.
        raise ValueError("Malformed HTTP status line: %r" % first_line)
    self.code = int(match.group(1))
    self.headers = HTTPHeaders.parse(header_data)

    if self.code == 100:
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html#sec8.2.3
        # support HTTP/1.1 100 Continue
        if self.request.body is not None:
            self.stream.write(self.request.body)
        self.stream.read_until_regex(b("\r?\n\r?\n"), self._on_headers)
        return

    if "Content-Length" in self.headers:
        if "," in self.headers["Content-Length"]:
            # Proxies sometimes cause Content-Length headers to get
            # duplicated. If all the values are identical then we can
            # use them but if they differ it's an error.
            pieces = re.split(r',\s*', self.headers["Content-Length"])
            if any(i != pieces[0] for i in pieces):
                raise ValueError("Multiple unequal Content-Lengths: %r" %
                                 self.headers["Content-Length"])
            self.headers["Content-Length"] = pieces[0]
        content_length = int(self.headers["Content-Length"])
    else:
        content_length = None

    if self.request.header_callback is not None:
        for k, v in self.headers.get_all():
            self.request.header_callback("%s: %s\r\n" % (k, v))

    if self.request.method == "HEAD":
        # HEAD requests never have content, even though they may have
        # content-length headers
        self._on_body(b(""))
        return
    if 100 <= self.code < 200 or self.code in (204, 304):
        # These response codes never have bodies
        # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
        # Raised explicitly rather than asserted: the headers come from
        # the server and asserts are removed under ``python -O``.
        if ("Transfer-Encoding" in self.headers or
                content_length not in (None, 0)):
            raise ValueError("Response with code %d should not have body" %
                             self.code)
        self._on_body(b(""))
        return

    if (self.request.use_gzip and
            self.headers.get("Content-Encoding") == "gzip"):
        # Magic parameter makes zlib module understand gzip header
        # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
        self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
    if self.headers.get("Transfer-Encoding") == "chunked":
        self.chunks = []
        self.stream.read_until(b("\r\n"), self._on_chunk_length)
    elif content_length is not None:
        self.stream.read_bytes(content_length, self._on_body)
    else:
        self.stream.read_until_close(self._on_body)
开发者ID:yyuu,项目名称:tornado,代码行数:59,代码来源:simple_httpclient.py
示例15: test_default
def test_default(self):
    # For each URL, check the response code and that the matching error
    # text appears somewhere in the body.
    cases = [
        ("/default", 500, b("500: Internal Server Error")),
        ("/default?status=503", 503, b("503: Service Unavailable")),
    ]
    for url, expected_code, expected_text in cases:
        response = self.fetch(url)
        self.assertEqual(response.code, expected_code)
        self.assertTrue(expected_text in response.body)
开发者ID:mathphreak,项目名称:curtains,代码行数:8,代码来源:web_test.py
示例16: test_write_error
def test_write_error(self):
    # For each URL, check the response code and the exact body.
    cases = [
        ("/write_error", 500, b("Exception: ZeroDivisionError")),
        ("/write_error?status=503", 503, b("Status: 503")),
    ]
    for url, expected_code, expected_body in cases:
        response = self.fetch(url)
        self.assertEqual(response.code, expected_code)
        self.assertEqual(expected_body, response.body)
开发者ID:mathphreak,项目名称:curtains,代码行数:8,代码来源:web_test.py
示例17: write_message
def write_message(self, message, binary=False):
    """Write ``message`` to the stream between a 0x00 and a 0xff byte.

    Unicode messages are encoded as UTF-8 first.  ``binary=True`` is
    rejected with ``ValueError``.
    """
    if binary:
        raise ValueError("Binary messages not supported by this version of websockets")
    payload = message.encode("utf-8") if isinstance(message, unicode) else message
    assert isinstance(payload, bytes_type)
    frame = b("\x00") + payload + b("\xff")
    self.stream.write(frame)
开发者ID:stillzhl,项目名称:tornado,代码行数:8,代码来源:websocket.py
示例18: read_headers
def read_headers(self):
    """Read the status line and header block and return the parsed headers.

    Asserts that the status line starts with ``self.http_version`` followed
    by `` 200``, then parses the latin1-decoded header block.
    """
    self.stream.read_until(b("\r\n"), self.stop)
    status_line = self.wait()
    expected_prefix = self.http_version + b(" 200")
    self.assertTrue(status_line.startswith(expected_prefix), status_line)
    self.stream.read_until(b("\r\n\r\n"), self.stop)
    raw_headers = self.wait()
    return HTTPHeaders.parse(raw_headers.decode("latin1"))
开发者ID:nickwong,项目名称:tornado,代码行数:8,代码来源:httpserver_test.py
示例19: test_hello_world
def test_hello_world(self):
    # Plain fetch: status, content type and body are all checked.
    plain = self.fetch("/hello")
    self.assertEqual(plain.code, 200)
    self.assertEqual(plain.headers["Content-Type"], "text/plain")
    self.assertEqual(plain.body, b("Hello world!"))

    # With a name argument only the body is checked.
    named = self.fetch("/hello?name=Ben")
    self.assertEqual(named.body, b("Hello Ben!"))
开发者ID:nasi,项目名称:tornado,代码行数:8,代码来源:httpclient_test.py
示例20: test_chunked
def test_chunked(self):
    # Without a streaming callback the whole body is on the response.
    whole = self.fetch("/chunk")
    self.assertEqual(whole.body, b("asdfqwer"))

    # With one, the pieces go to the callback and the body is empty.
    pieces = []
    streamed = self.fetch("/chunk", streaming_callback=pieces.append)
    self.assertEqual(pieces, [b("asdf"), b("qwer")])
    self.assertFalse(streamed.body)
开发者ID:niedbalski,项目名称:tornado,代码行数:8,代码来源:httpclient_test.py
注:本文中的tornado.util.b函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论