• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python websocket.websocket_connect函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中tornado.websocket.websocket_connect函数的典型用法代码示例。如果您正苦于以下问题:Python websocket_connect函数的具体用法?Python websocket_connect怎么用?Python websocket_connect使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了websocket_connect函数的4个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_status_dual_ws

    def test_status_dual_ws(self):

        _mock_device = self._mock_device
        class EventThread(threading.Thread):
            def run(self):
                for x in xrange(5):
                    stat = x % 2 and 'ready' or 'unavailable'
                    _mock_device._set_device('rtl', stat)
                    time.sleep(.2)
  
        url = self.get_url('/status').replace('http', 'ws')
        conn1 = yield websocket.websocket_connect(url,
                                                  io_loop=self.io_loop) 
        conn2 = yield websocket.websocket_connect(url,
                                                  io_loop=self.io_loop) 
        self.io_loop.add_callback(EventThread().start)
        for x in xrange(5):
            stat = x % 2 and 'ready' or 'unavailable'
            message = yield conn1.read_message()
            logging.debug("Connection 1 message #%s: %s", x, message)
            data = json.loads(message)
            self.assertEquals(stat, data['body']['status'])

            message = yield conn2.read_message()
            logging.debug("Connection 2 message #%s: %s", x, message)
            data = json.loads(message)
            self.assertEquals(stat, data['body']['status'])
        conn1.close()
        conn2.close()
        # FIXME: Ensure that close is being called on websocket so that is tested
        # tried with stop(), but unsure if that does anything
        self.stop()
开发者ID:RedhawkSDR,项目名称:rtl-demo-app,代码行数:32,代码来源:test_server.py


示例2: start

 def start(self):
     websocket_connect(
         self.url,
     #     self.ioloop, # no longer takes this arg?
         callback=self.on_connected,
         on_message_callback=self.on_message)
     self.ioloop.start()
开发者ID:cottrell,项目名称:notebooks,代码行数:7,代码来源:do.py


示例3: _connect

    def _connect(self,
                 conn_type,
                 session,
                 force_close,
                 force_release,
                 pool):
        future = self._future_class()
        request = self._connector(self._url)
        if self._timeout:
            future_conn = with_timeout(timeout, websocket_connect(request))
        else:
            future_conn = websocket_connect(request)

        def get_conn(f):
            try:
                conn = f.result()
            except socket.error:
                future.set_exception(
                    RuntimeError("Could not connect to server."))
            except socket.gaierror:
                future.set_exception(
                    RuntimeError("Could not connect to server."))
            except HTTPError as e:
                future.set_exception(e)
            except Exception as e:
                future.set_exception(e)
            else:
                resp = Response(conn, self._future_class, self._loop)
                gc = conn_type(resp, self._future_class, self._timeout,
                               self._username, self._password, self._loop,
                               force_close, pool, force_release, session)
                future.set_result(gc)
        future_conn.add_done_callback(get_conn)
        return future
开发者ID:davebshow,项目名称:gremlinclient,代码行数:34,代码来源:client.py


示例4: test_websocket_callbacks

 def test_websocket_callbacks(self):
     websocket_connect("ws://localhost:%d/echo" % self.get_http_port(), io_loop=self.io_loop, callback=self.stop)
     ws = self.wait().result()
     ws.write_message("hello")
     ws.read_message(self.stop)
     response = self.wait().result()
     self.assertEqual(response, "hello")
开发者ID:jiangsong,项目名称:tornado_pubsub,代码行数:7,代码来源:websocket_test.py



注:本文中的tornado.websocket.websocket_connect函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python wsgi.WSGIContainer类代码示例发布时间:2022-05-27
下一篇:
Python web.StaticFileHandler类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap