• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python selectreactor.SelectReactor类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.internet.selectreactor.SelectReactor的典型用法代码示例。如果您正苦于以下问题:Python SelectReactor类的具体用法?Python SelectReactor怎么用?Python SelectReactor使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了SelectReactor类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _start_run

    def _start_run(self, config, app, stdout_expected, stderr_expected, end_on):
        """
        Write the node config and application file, run ``crossbar start`` on
        a private reactor, then check the captured output.

        :param config: JSON-serialisable object written to ``self.config`` as
            UTF-8 encoded JSON.
        :param app: Text written to ``myapp.py`` under ``self.code_location``.
        :param stdout_expected: Strings that must each occur in the captured
            stdout.
        :param stderr_expected: Strings that must each occur in the captured
            stderr.
        :param end_on: Handed to ``make_lc`` unchanged (presumably the
            callback that decides when the run ends -- confirm against
            ``make_lc``).
        """
        with open(self.config, "wb") as config_file:
            encoded = json.dumps(config, ensure_ascii=False).encode("utf8")
            config_file.write(encoded)

        with open(self.code_location + "/myapp.py", "w") as app_file:
            app_file.write(app)

        reactor = SelectReactor()
        make_lc(self, reactor, end_on)

        # Safety net: stop the reactor after the timeout in case it hard-locks.
        reactor.callLater(self._subprocess_timeout, reactor.stop)

        start_args = [
            "start",
            "--cbdir={}".format(self.cbdir),
            "--logformat=syslogd",
        ]
        cli.run("crossbar", start_args, reactor=reactor)

        # Both streams are read once, before any check runs.
        checks = (
            (stdout_expected, self.stdout.getvalue()),
            (stderr_expected, self.stderr.getvalue()),
        )
        for expected, captured in checks:
            for needle in expected:
                if needle not in captured:
                    self.fail(u"Error: '{}' not in:\n{}".format(needle, captured))
开发者ID:oberstet,项目名称:crossbar,代码行数:26,代码来源:test_run.py


示例2: test_unix_already_listening

    def test_unix_already_listening(self):
        """
        A config with type = "unix" will create an endpoint for a UNIX socket
        at the given path, and delete it if required.

        The path is pre-populated with an empty file before the endpoint is
        created, and the test passes once the listening factory shows up
        among the reactor's readers.
        """
        socket_path = FilePath("/tmp").child(uuid4().hex).path
        self.addCleanup(os.remove, socket_path)

        # Occupy the path up front so that something is already there.
        FilePath(socket_path).setContent(b"")

        reactor = SelectReactor()
        endpoint_config = {"type": "unix", "path": socket_path}
        endpoint = create_listening_endpoint_from_config(
            endpoint_config, self.cbdir, reactor, self.log)
        self.assertTrue(isinstance(endpoint, UNIXServerEndpoint))

        factory = Factory.forProtocol(Echo)
        endpoint.listen(factory)

        # Readers without a ``factory`` attribute contribute ``None``.
        reader_factories = []
        for reader in reactor.getReaders():
            reader_factories.append(getattr(reader, "factory", None))
        self.assertIn(factory, reader_factories)
开发者ID:abhimanyu-siwach,项目名称:crossbar,代码行数:27,代码来源:test_endpoint.py


示例3: _start_run

    def _start_run(self, config, app, stdout_expected, stderr_expected,
                   end_on):
        """
        Write the config and application file, run ``crossbar start`` on a
        fresh reactor, and assert on the captured output.

        :param config: JSON-serialisable object written to ``self.config``.
        :param app: Text written to ``myapp.py`` under ``self.code_location``.
        :param stdout_expected: Strings asserted to be in the captured stdout.
        :param stderr_expected: Strings asserted to be in the captured stderr.
        :param end_on: Forwarded to ``make_lc`` (presumably the stop
            condition callback -- confirm against ``make_lc``).
        """
        with open(self.config, "wb") as config_file:
            payload = json.dumps(config, ensure_ascii=False).encode('utf8')
            config_file.write(payload)

        app_path = self.code_location + "/myapp.py"
        with open(app_path, "w") as app_file:
            app_file.write(app)

        reactor = SelectReactor()
        make_lc(self, reactor, end_on)

        # Guard against a hard-lock: force a stop once the timeout elapses.
        reactor.callLater(self._subprocess_timeout, reactor.stop)

        start_args = [
            "start",
            "--cbdir={}".format(self.cbdir),
            "--logformat=syslogd",
        ]
        cli.run("crossbar", start_args, reactor=reactor)

        for expected in stdout_expected:
            self.assertIn(expected, self.stdout.getvalue())

        for expected in stderr_expected:
            self.assertIn(expected, self.stderr.getvalue())
开发者ID:snowattitudes,项目名称:crossbar,代码行数:27,代码来源:test_run.py


示例4: test_hello

    def test_hello(self):
        """
        Initialise the ``hello:python`` template, start the resulting node
        and expect the application to report a publish to ``oncounter``.
        """
        def _check(lc, reactor):
            # Handed to make_lc below; stops the run as soon as the expected
            # line has been captured on stdout.
            if "published to 'oncounter'" in self.stdout.getvalue():
                lc.stop()
                try:
                    reactor.stop()
                except Exception:
                    # Best effort: the reactor may already be stopping or
                    # stopped, which is fine for this test.
                    pass

        appdir = self.mktemp()
        # os.path.join returns a plain string, not a FilePath.
        cbdir = os.path.join(appdir, ".crossbar")

        reactor = SelectReactor()
        cli.run("crossbar", ["init", "--appdir={}".format(appdir), "--template=hello:python"], reactor=reactor)

        self.assertIn("Application template initialized", self.stdout.getvalue())

        # A second, fresh reactor is used for the actual start.
        reactor = SelectReactor()
        make_lc(self, reactor, _check)

        # In case it hard-locks
        reactor.callLater(self._subprocess_timeout, reactor.stop)

        # cbdir is a str, so it is formatted directly (it has no ``.path``).
        cli.run("crossbar", ["start", "--cbdir={}".format(cbdir), "--logformat=syslogd"], reactor=reactor)

        stdout_expected = ["published to 'oncounter'"]

        for i in stdout_expected:
            self.assertIn(i, self.stdout.getvalue())
开发者ID:oberstet,项目名称:crossbar,代码行数:29,代码来源:test_run.py


示例5: test_uninvited_pubrel

def test_uninvited_pubrel(host, port):
    """
    Replay a recorded exchange containing a PubREL for packet identifier
    1234 with no preceding publish, and report the outcome.

    The recorded frames are: Connect, ConnACK, PubREL, PubCOMP, Disconnect,
    followed by a connection loss.

    :param host: Host passed to the TCP4 client endpoint.
    :param port: Port passed to the TCP4 client endpoint.
    :return: A ``Result`` named ``"uninvited_pubrel"`` built from the
        factory's ``success``, ``reason`` and ``client_transcript``.
    """
    # NOTE(review): send=True presumably marks frames the replay client
    # transmits and send=False frames it expects back -- confirm in Frame.
    def frame_out(data):
        return Frame(send=True, data=data)

    def frame_in(data):
        return Frame(send=False, data=data)

    record = [
        frame_out(Connect(client_id=u"test_pubrel",
                          flags=ConnectFlags(clean_session=True))),
        frame_in(ConnACK(session_present=False, return_code=0)),
        frame_out(PubREL(packet_identifier=1234)),
        frame_in(PubCOMP(packet_identifier=1234)),
        frame_out(Disconnect()),
        ConnectionLoss(),
    ]

    reactor = SelectReactor()
    factory = ReplayClientFactory(reactor, record)
    endpoint = TCP4ClientEndpoint(reactor, host, port)
    endpoint.connect(factory)
    reactor.run()

    return Result("uninvited_pubrel", factory.success, factory.reason,
                  factory.client_transcript)
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:28,代码来源:interop_tests.py


示例6: _preenDescriptors

 def _preenDescriptors(self):
     """
     Drop descriptors for which ``select`` raises from the read and write
     maps, then run the stock ``SelectReactor._preenDescriptors``.
     """
     for fdMap in (self.readFdMap, self.writeFdMap):
         # Iterate over a snapshot of the keys: entries are popped inside
         # the loop, and mutating a dict while iterating its live keys()
         # view raises RuntimeError on Python 3.
         for fd in list(fdMap.keys()):
             try:
                 select(fd + 1, [fd], [fd], [fd], 0)
             except Exception:
                 # The probe failed for this descriptor; forget it.
                 fdMap.pop(fd, None)
     SelectReactor._preenDescriptors(self)
开发者ID:aidenlai,项目名称:ZenPacks.zenoss.PySamba,代码行数:9,代码来源:reactor.py


示例7: doIteration

    def doIteration(self, t):
        """
        Run one reactor iteration, then call ``self.loopCall()``.

        Pending calls are run and the timeout is recomputed before
        delegating to ``SelectReactor.doIteration``, so the wait passed down
        is never longer than the time until the next scheduled call.

        :param t: Requested wait for this iteration.
        """
        # Do the same as in mainLoop() first.
        self.runUntilCurrent()
        t2 = self.timeout()

        if t2 is not None:
            # ``self.running and t2`` evaluates to ``self.running`` itself
            # when that is falsy, and to ``t2`` otherwise.
            # NOTE(review): if ``t`` can be None here, ``min`` raises
            # TypeError on Python 3 -- confirm what callers pass.
            t = min(t, self.running and t2)

        SelectReactor.doIteration(self, t)
        # Hook invoked after every iteration.
        self.loopCall()
开发者ID:MaxFangX,项目名称:viff,代码行数:10,代码来源:reactor.py


示例8: __init__

        def __init__(self):
            """
            Initialise the underlying ``SelectReactor`` and reset the
            pause-related bookkeeping attributes.
            """
            SelectReactor.__init__(self)
            self._mainLoopGen = None
            self._release_requested = False
            self._return_value = None
            self.paused = False

            # Older versions of twisted have no ``_started`` attribute; in
            # that case expose it as a read-only alias for ``running``.
            if not hasattr(self, '_started'):
                PausingReactor._started = property(
                    lambda reactor: reactor.running)
开发者ID:Anaerin,项目名称:Flexget,代码行数:10,代码来源:plugin_deluge.py


示例9: stop

        def stop(self):
            """Stops the reactor."""
            SelectReactor.stop(self)

            # A stop requested while paused only completes once the reactor
            # is resumed, so resume it here.
            if self.paused:
                self.run()

            # Re-register the shutdown triggers so that the PausingReactor
            # can be safely restarted after a stop.
            for trigger in (self.crash, self.disconnectAll):
                self.addSystemEventTrigger('during', 'shutdown', trigger)
开发者ID:Anaerin,项目名称:Flexget,代码行数:10,代码来源:plugin_deluge.py


示例10: __init__

 def __init__(self, tempo=130, meters=(), reactor=None):
     """
     Set up tempo, tick and meter state, then initialise the
     ``SelectReactor`` base.

     :param tempo: Tempo used to derive the tick interval (presumably beats
         per minute -- confirm with callers).
     :param meters: Meters to use; when empty, a single ``Meter(4, 4, 1)``
         is used instead.
     :param reactor: Reactor stored on ``self.reactor``; when falsy, the
         global ``twisted.internet.reactor`` is imported and used.
     """
     self.tempo = tempo
     self.ticks = 0
     # self.setTempo(tempo)
     # 60.0 / tempo split into 24 equal ticks.
     self._tick_interval = (60.0 / tempo) * (1.0 / 24)
     self._meter_schedule = {}
     self.meters = meters if meters else [Meter(4, 4, 1)]
     if not reactor:
         # Imported lazily so the global reactor is only touched on demand.
         from twisted.internet import reactor
     self.reactor = reactor
     SelectReactor.__init__(self)
开发者ID:djfroofy,项目名称:txbeatlounge,代码行数:13,代码来源:scheduler2.py


示例11: _preenDescriptors

 def _preenDescriptors(self):
     """
     Remove descriptors for which ``select`` raises from both the read and
     the write map, then run the stock ``SelectReactor._preenDescriptors``.
     """
     # Each entry must be the map itself, so that a failing descriptor is
     # removed from the map and not from a throwaway key collection.
     for fdMap in (self.readFdMap, self.writeFdMap):
         # Snapshot the keys: the map is mutated inside the loop.
         for fd in list(fdMap.keys()):
             try:
                 select(fd + 1, [fd], [fd], [fd], 0)
             except Exception:
                 # The probe failed; drop the descriptor if still present.
                 fdMap.pop(fd, None)
     SelectReactor._preenDescriptors(self)
开发者ID:Marvin-Lee,项目名称:libwmiclient,代码行数:14,代码来源:reactor.py


示例12: test_qos2_send_wrong_confirm

def test_qos2_send_wrong_confirm(host, port):
    """
    Replay a recorded QoS 2 exchange on topic ``foo`` in which a PubACK is
    sent for packet identifier 1, and report the outcome.

    :param host: Host passed to the TCP4 client endpoint.
    :param port: Port passed to the TCP4 client endpoint.
    :return: A ``Result`` named ``"qos2_wrong_confirm"`` built from the
        factory's ``success``, ``reason`` and ``client_transcript``.
    """
    # NOTE(review): send=True presumably marks frames the replay client
    # transmits and send=False frames it expects back -- confirm in Frame.
    def frame_out(data):
        return Frame(send=True, data=data)

    def frame_in(data):
        return Frame(send=False, data=data)

    record = [
        frame_out(Connect(client_id=u"test_wrong_confirm_qos2",
                          flags=ConnectFlags(clean_session=True))),
        frame_in(ConnACK(session_present=False, return_code=0)),
        frame_out(Subscribe(
            packet_identifier=1234,
            topic_requests=[SubscriptionTopicRequest(u"foo", 2)])),
        frame_in(SubACK(packet_identifier=1234, return_codes=[2])),
        frame_out(Publish(duplicate=False, qos_level=2, topic_name=u"foo",
                          payload=b"abc", retain=False,
                          packet_identifier=12)),
        # Three packets recorded in a single frame.
        frame_in([
            PubREC(packet_identifier=12),
            Publish(duplicate=False, qos_level=2, topic_name=u"foo",
                    payload=b"abc", retain=False, packet_identifier=1),
            PubCOMP(packet_identifier=12),
        ]),
        frame_out(PubREL(packet_identifier=12)),
        frame_out(PubACK(packet_identifier=1)),
        frame_in(b""),
        frame_out(Disconnect()),
        ConnectionLoss(),
    ]

    reactor = SelectReactor()
    factory = ReplayClientFactory(reactor, record)
    endpoint = TCP4ClientEndpoint(reactor, host, port)
    endpoint.connect(factory)
    reactor.run()

    return Result("qos2_wrong_confirm", factory.success, factory.reason,
                  factory.client_transcript)
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:49,代码来源:interop_tests.py


示例13: test_start

    def test_start(self):
        """
        A basic start, that doesn't actually enter the reactor.
        """
        with open(self.config, "w") as config_file:
            config_file.write("""{"controller": {}}""")

        reactor = SelectReactor()
        # Replace run() with a stub so the event loop is never entered.
        reactor.run = lambda: False

        start_args = [
            "start",
            "--cbdir={}".format(self.cbdir),
            "--logformat=syslogd",
        ]
        cli.run("crossbar", start_args, reactor=reactor)

        captured = self.stdout.getvalue()
        self.assertIn("Entering reactor event loop", captured)
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:16,代码来源:test_cli.py


示例14: __init__

	def __init__(self, evManager):
		"""
		Register with the event manager, create and install a
		``SelectReactor``, and prepare the looping call that fires ticks.

		:param evManager: Event manager this controller registers itself with.
		"""
		self.state = ReactorSpinController.STATE_STOPPED
		self.evManager = evManager
		evManager.RegisterListener(self)

		# Keep a handle on the reactor before installing it.
		reactor = SelectReactor()
		self.reactor = reactor
		installReactor(reactor)

		self.loopingCall = LoopingCall(self.FireTick)
开发者ID:ClashTeak,项目名称:writing_games_tutorial,代码行数:7,代码来源:client.py


示例15: supplicant

def supplicant():
    """Run a reactor and provide access to the supplicant driver

    A ``SelectReactor`` is run in a background thread, the driver is
    connected and the result is yielded. An ``Exception`` raised into the
    generator at the ``yield`` is reported as ``FAIL``; otherwise ``OK`` is
    printed. The reactor is shut down and its thread joined in every case,
    including when the generator is closed or interrupted.
    """
    reactor = SelectReactor()
    t = threading.Thread(target=reactor.run, kwargs={'installSignalHandlers': 0})
    t.start()
    time.sleep(0.1)  # let reactor start
    driver = WpaSupplicantDriver(reactor)
    supplicant = driver.connect()
    try:
        yield supplicant
    except Exception as e:
        print('FAIL - {}'.format(e))
    else:
        print('OK')
    finally:
        # Always tear down, so that a non-Exception (GeneratorExit,
        # KeyboardInterrupt) cannot leave the reactor thread running.
        reactor.disconnectAll()
        reactor.sigTerm()
        t.join()
开发者ID:devmapal,项目名称:python-wpa-supplicant,代码行数:17,代码来源:cli.py


示例16: __init__

 def __init__(self):
     """
     Create and install a ``SelectReactor``, open a TCP connection to
     ``localhost:8000`` using the module-level ``factory``, start the
     reactor running and prepare the pygame test harness.
     """
     self.keepGoing = True

     reactor = SelectReactor()
     self.reactor = reactor
     installReactor(reactor)

     # The returned connector is not needed afterwards.
     reactor.connectTCP('localhost', 8000, factory)
     reactor.startRunning()

     self.futureCall = None
     self.futureCallTimeout = None
     pygame_test.prepare()
开发者ID:ClashTeak,项目名称:writing_games_tutorial,代码行数:9,代码来源:multi_controller_test.py


示例17: doIteration

    def doIteration(self, delay):
        """
        Perform a single iteration of the reactor.

        ``self.callTimeouts(delay)`` runs first (presumably to service this
        reactor's own pending timeouts -- confirm against ``callTimeouts``),
        after which the actual descriptor watching is delegated back to the
        twisted ``SelectReactor``.

        :param delay: Wait passed to both ``callTimeouts`` and the base
            class iteration.
        :return: Whatever ``SelectReactor.doIteration`` returns.
        """
        self.callTimeouts(delay)
        outcome = SelectReactor.doIteration(self, delay)
        return outcome
开发者ID:aidenlai,项目名称:ZenPacks.zenoss.PySamba,代码行数:9,代码来源:reactor.py


示例18: setUp

 def setUp(self):
     """
     Initialise the mocks, start the task runner, run a ``SelectReactor``
     in a background thread and connect the supplicant driver.
     """
     mocks.init()

     taskrunner = ThreadedTaskRunner()
     self._taskrunner = taskrunner
     taskrunner.start()

     reactor = SelectReactor()
     self._reactor = reactor
     self._driver = WpaSupplicantDriver(reactor)

     # The reactor runs off the main thread, without signal handlers.
     reactor_thread = threading.Thread(
         target=reactor.run, kwargs={'installSignalHandlers': 0})
     self._reactor_thread = reactor_thread
     reactor_thread.start()

     # Short pause before connecting, giving the reactor thread time to start.
     time.sleep(0.1)
     self._supplicant = self._driver.connect()
开发者ID:iRaffnix,项目名称:python-wpa-supplicant,代码行数:11,代码来源:test_core.py


示例19: test_fileLogging

    def test_fileLogging(self):
        """
        Running `crossbar start --logtofile` will log to cbdir/node.log.

        Nothing is expected on stdout or stderr in that mode.
        """
        with open(self.config, "w") as config_file:
            config_file.write("""{"controller": {}}""")

        reactor = SelectReactor()
        # Replace run() with a stub so the event loop is never entered.
        reactor.run = lambda: None

        start_args = [
            "start",
            "--cbdir={}".format(self.cbdir),
            "--logtofile",
        ]
        cli.run("crossbar", start_args, reactor=reactor)

        log_path = os.path.join(self.cbdir, "node.log")
        with open(log_path, "r") as log_file:
            logFile = log_file.read()

        self.assertIn("Entering reactor event loop", logFile)
        self.assertEqual("", self.stderr.getvalue())
        self.assertEqual("", self.stdout.getvalue())
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:20,代码来源:test_cli.py


示例20: test_quirks_mode_connect

def test_quirks_mode_connect(host, port):
    """
    Replay a recorded exchange that opens with a raw, hand-written connect
    packet given as bytes, and report the outcome.

    :param host: Host passed to the TCP4 client endpoint.
    :param port: Port passed to the TCP4 client endpoint.
    :return: A ``Result`` named ``"connect_quirks"`` built from the
        factory's ``success``, ``reason`` and ``client_transcript``.
    """
    # NOTE(review): send=True presumably marks frames the replay client
    # transmits and send=False frames it expects back -- confirm in Frame.
    def frame_out(data):
        return Frame(send=True, data=data)

    def frame_in(data):
        return Frame(send=False, data=data)

    record = [
        frame_out(
            b"\x10\x15\x00\x04MQTT\x04\x02\x00x\x00\x07testqrk\x00\x00"),
        frame_in(ConnACK(session_present=False, return_code=0)),
        frame_out(Disconnect()),
        ConnectionLoss(),
    ]

    reactor = SelectReactor()
    factory = ReplayClientFactory(reactor, record)
    endpoint = TCP4ClientEndpoint(reactor, host, port)
    endpoint.connect(factory)
    reactor.run()

    return Result("connect_quirks", factory.success, factory.reason,
                  factory.client_transcript)
开发者ID:NinjaMSP,项目名称:crossbar,代码行数:21,代码来源:interop_tests.py



注:本文中的twisted.internet.selectreactor.SelectReactor类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python ssl.optionsForClientTLS函数代码示例发布时间:2022-05-27
下一篇:
Python reactor.suggestThreadPoolSize函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap