本文整理汇总了Python中tornado.websocket.websocket_connect函数的典型用法代码示例。如果您正苦于以下问题:Python websocket_connect函数的具体用法?Python websocket_connect怎么用?Python websocket_connect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了websocket_connect函数的4个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_status_dual_ws
def test_status_dual_ws(self):
_mock_device = self._mock_device
class EventThread(threading.Thread):
def run(self):
for x in xrange(5):
stat = x % 2 and 'ready' or 'unavailable'
_mock_device._set_device('rtl', stat)
time.sleep(.2)
url = self.get_url('/status').replace('http', 'ws')
conn1 = yield websocket.websocket_connect(url,
io_loop=self.io_loop)
conn2 = yield websocket.websocket_connect(url,
io_loop=self.io_loop)
self.io_loop.add_callback(EventThread().start)
for x in xrange(5):
stat = x % 2 and 'ready' or 'unavailable'
message = yield conn1.read_message()
logging.debug("Connection 1 message #%s: %s", x, message)
data = json.loads(message)
self.assertEquals(stat, data['body']['status'])
message = yield conn2.read_message()
logging.debug("Connection 2 message #%s: %s", x, message)
data = json.loads(message)
self.assertEquals(stat, data['body']['status'])
conn1.close()
conn2.close()
# FIXME: Ensure that close is being called on websocket so that is tested
# tried with stop(), but unsure if that does anything
self.stop()
开发者ID:RedhawkSDR,项目名称:rtl-demo-app,代码行数:32,代码来源:test_server.py
示例2: start
def start(self):
websocket_connect(
self.url,
# self.ioloop, # no longer takes this arg?
callback=self.on_connected,
on_message_callback=self.on_message)
self.ioloop.start()
开发者ID:cottrell,项目名称:notebooks,代码行数:7,代码来源:do.py
示例3: _connect
def _connect(self,
conn_type,
session,
force_close,
force_release,
pool):
future = self._future_class()
request = self._connector(self._url)
if self._timeout:
future_conn = with_timeout(timeout, websocket_connect(request))
else:
future_conn = websocket_connect(request)
def get_conn(f):
try:
conn = f.result()
except socket.error:
future.set_exception(
RuntimeError("Could not connect to server."))
except socket.gaierror:
future.set_exception(
RuntimeError("Could not connect to server."))
except HTTPError as e:
future.set_exception(e)
except Exception as e:
future.set_exception(e)
else:
resp = Response(conn, self._future_class, self._loop)
gc = conn_type(resp, self._future_class, self._timeout,
self._username, self._password, self._loop,
force_close, pool, force_release, session)
future.set_result(gc)
future_conn.add_done_callback(get_conn)
return future
开发者ID:davebshow,项目名称:gremlinclient,代码行数:34,代码来源:client.py
示例4: test_websocket_callbacks
def test_websocket_callbacks(self):
websocket_connect("ws://localhost:%d/echo" % self.get_http_port(), io_loop=self.io_loop, callback=self.stop)
ws = self.wait().result()
ws.write_message("hello")
ws.read_message(self.stop)
response = self.wait().result()
self.assertEqual(response, "hello")
开发者ID:jiangsong,项目名称:tornado_pubsub,代码行数:7,代码来源:websocket_test.py
注:本文中的tornado.websocket.websocket_connect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论