• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python reactors.installReactor函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.application.reactors.installReactor函数的典型用法代码示例。如果您正苦于以下问题:Python installReactor函数的具体用法?Python installReactor怎么用?Python installReactor使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了installReactor函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: runApp

def runApp(config):
	loadConfigurations(config, settings)
	
	if not sys.modules.has_key('twisted.internet.reactor'):
		from twisted.application.reactors import installReactor
		installReactor(settings.reactor)
	ApplicationRunner(config).run()
开发者ID:oscar810429,项目名称:pndfs,代码行数:7,代码来源:run.py


示例2: opt_reactor

 def opt_reactor(self, shortName):
     """
     Which reactor to use (see --help-reactors for a list of possibilities)
     """
     # Actually actually actually install the reactor right at this very
     # moment, before any other code (for example, a sub-command plugin)
     # runs and accidentally imports and installs the default reactor.
     #
     # This could probably be improved somehow.
     installReactor(shortName)
开发者ID:andrewbird,项目名称:vodafone-mobile-connect,代码行数:10,代码来源:app.py


示例3: install_optimal_reactor

def install_optimal_reactor():
   """
   Try to install the optimal Twisted reactor for platform.
   """
   import sys

   if 'bsd' in sys.platform or sys.platform.startswith('darwin'):
      try:
         v = sys.version_info
         if v[0] == 1 or (v[0] == 2 and v[1] < 6) or (v[0] == 2 and v[1] == 6 and v[2] < 5):
            raise Exception("Python version too old (%s)" % sys.version)
         from twisted.internet import kqreactor
         kqreactor.install()
      except Exception as e:
         print("""
   WARNING: Running on BSD or Darwin, but cannot use kqueue Twisted reactor.

    => %s

   To use the kqueue Twisted reactor, you will need:

     1. Python >= 2.6.5 or PyPy > 1.8
     2. Twisted > 12.0

   Note the use of >= and >.

   Will let Twisted choose a default reactor (potential performance degradation).
   """ % str(e))
         pass

   if sys.platform in ['win32']:
      try:
         from twisted.application.reactors import installReactor
         installReactor("iocp")
      except Exception as e:
         print("""
   WARNING: Running on Windows, but cannot use IOCP Twisted reactor.

    => %s

   Will let Twisted choose a default reactor (potential performance degradation).
   """ % str(e))

   if sys.platform.startswith('linux'):
      try:
         from twisted.internet import epollreactor
         epollreactor.install()
      except Exception as e:
         print("""
   WARNING: Running on Linux, but cannot use Epoll Twisted reactor.

    => %s

   Will let Twisted choose a default reactor (potential performance degradation).
   """ % str(e))
开发者ID:luxorv,项目名称:Chat-GBH,代码行数:55,代码来源:choosereactor.py


示例4: install_reactor

def install_reactor(explicit_reactor=None, verbose=False, log=None, require_optimal_reactor=True):
    """
    Install Twisted reactor.

    :param explicit_reactor: If provided, install this reactor. Else, install
        the optimal reactor.
    :type explicit_reactor: obj

    :param verbose: If ``True``, log (at level "info") the reactor that is
        in place afterwards.
    :type verbose: bool

    :param log: Explicit logging to this txaio logger object.
    :type log: obj

    :param require_optimal_reactor: If ``True`` and the desired reactor could not be installed,
        raise ``ReactorAlreadyInstalledError``, else fallback to another reactor.
    :type require_optimal_reactor: bool

    :returns: The Twisted reactor in place (`twisted.internet.reactor`).
    """
    if not log:
        log = txaio.make_logger()

    if explicit_reactor:
        # install explicitly given reactor
        #
        from twisted.application.reactors import installReactor
        if verbose:
            log.info('Trying to install explicitly specified Twisted reactor "{reactor}" ..', reactor=explicit_reactor)
        try:
            installReactor(explicit_reactor)
        except:
            log.failure('Could not install Twisted reactor {reactor}\n{log_failure.value}',
                        reactor=explicit_reactor)
            sys.exit(1)
    else:
        # automatically choose optimal reactor
        #
        if verbose:
            log.info('Automatically choosing optimal Twisted reactor ..')
        install_optimal_reactor(require_optimal_reactor)

    # now the reactor is installed, import it
    from twisted.internet import reactor
    txaio.config.loop = reactor

    if verbose:
        from twisted.python.reflect import qual
        log.info('Running on Twisted reactor {reactor}', reactor=qual(reactor.__class__))

    return reactor
开发者ID:crossbario,项目名称:autobahn-python,代码行数:52,代码来源:choosereactor.py


示例5: main

def main():
    """
    Try to install the reactor named C{qt}.  Expect it to not work.  Print
    diagnostics to stdout if something goes wrong, print nothing otherwise.
    """
    sys.meta_path.insert(0, QTNotImporter())
    try:
        reactors.installReactor('qt')
    except reactors.NoSuchReactor, e:
        if e.args != ('qt',):
            print 'Wrong arguments to NoSuchReactor:', e.args
        else:
            # Do nothing to indicate success.
            pass
开发者ID:Almad,项目名称:twisted,代码行数:14,代码来源:app_qtstub.py


示例6: test_installReactor

 def test_installReactor(self):
     """
     Test that the L{reactors.installReactor} function correctly installs
     the specified reactor.
     """
     installed = []
     def install():
         installed.append(True)
     name = 'fakereactortest'
     package = __name__
     description = 'description'
     self.pluginResults = [FakeReactor(install, name, package, description)]
     reactors.installReactor(name)
     self.assertEqual(installed, [True])
开发者ID:Berimor66,项目名称:mythbox,代码行数:14,代码来源:test_application.py


示例7: install_reactor

def install_reactor(_reactor = None, verbose = False):
   """
   Install Twisted reactor. This is used from CLI.
   """
   import sys

   if _reactor:
      ## install explicitly given reactor
      ##
      from twisted.application.reactors import installReactor
      print "Trying to install explicitly specified Twisted reactor '%s'" % _reactor
      try:
         installReactor(_reactor)
      except Exception, e:
         print "Could not install Twisted reactor %s%s" % (_reactor, ' ["%s"]' % e if verbose else '')
         sys.exit(1)
开发者ID:ajjoshi,项目名称:AutobahnPython,代码行数:16,代码来源:choosereactor.py


示例8: test_installReactorMultiplePlugins

 def test_installReactorMultiplePlugins(self):
     """
     Test that the L{reactors.installReactor} function correctly installs
     the specified reactor when there are multiple reactor plugins.
     """
     installed = []
     def install():
         installed.append(True)
     name = 'fakereactortest'
     package = __name__
     description = 'description'
     fakeReactor = FakeReactor(install, name, package, description)
     otherReactor = FakeReactor(lambda: None,
                                "otherreactor", package, description)
     self.pluginResults = [otherReactor, fakeReactor]
     reactors.installReactor(name)
     self.assertEqual(installed, [True])
开发者ID:marcelpetersen,项目名称:localnews,代码行数:17,代码来源:test_application.py


示例9: install_reactor

def install_reactor(explicit_reactor=None, verbose=False):
    """
    Install Twisted reactor.

    :param explicit_reactor: If provided, install this reactor. Else, install
        the optimal reactor.
    :type explicit_reactor: obj
    :param verbose: If ``True``, print what happens.
    :type verbose: bool
    """
    import sys
    import txaio
    txaio.use_twisted()  # just to be sure...

    log = make_logger()

    if explicit_reactor:
        # install explicitly given reactor
        ##
        from twisted.application.reactors import installReactor
        log.info("Trying to install explicitly specified Twisted reactor '{reactor}'", reactor=explicit_reactor)
        try:
            installReactor(explicit_reactor)
        except:
            log.failure("Could not install Twisted reactor {reactor}\n{log_failure.value}",
                        reactor=explicit_reactor)
            sys.exit(1)
    else:
        # automatically choose optimal reactor
        ##
        log.debug("Automatically choosing optimal Twisted reactor")
        install_optimal_reactor(verbose)

    # now the reactor is installed, import it
    from twisted.internet import reactor
    txaio.config.loop = reactor

    if verbose:
        from twisted.python.reflect import qual
        log.debug("Running Twisted reactor {reactor}", reactor=qual(reactor.__class__))

    return reactor
开发者ID:honeyflyfish,项目名称:autobahn-python,代码行数:42,代码来源:choosereactor.py


示例10: opt_reactor

 def opt_reactor(self, shortName):
     """
     Which reactor to use (see --help-reactors for a list of possibilities)
     """
     # Actually actually actually install the reactor right at this very
     # moment, before any other code (for example, a sub-command plugin)
     # runs and accidentally imports and installs the default reactor.
     #
     # This could probably be improved somehow.
     try:
         installReactor(shortName)
     except NoSuchReactor:
         msg = ("The specified reactor does not exist: '%s'.\n"
                "See the list of available reactors with "
                "--help-reactors" % (shortName,))
         raise usage.UsageError(msg)
     except Exception, e:
         msg = ("The specified reactor cannot be used, failed with error: "
                "%s.\nSee the list of available reactors with "
                "--help-reactors" % (e,))
         raise usage.UsageError(msg)
开发者ID:antong,项目名称:twisted,代码行数:21,代码来源:app.py


示例11: install_reactor

def install_reactor(explicitReactor=None, verbose=False):
    """
    Install Twisted reactor.

    :param explicitReactor: If provided, install this reactor. Else, install optimal reactor.
    :type explicitReactor: obj
    :param verbose: If ``True``, print what happens.
    :type verbose: bool
    """
    import sys
    import txaio
    txaio.use_twisted()  # just to be sure...

    if explicitReactor:
        # install explicitly given reactor
        ##
        from twisted.application.reactors import installReactor
        print("Trying to install explicitly specified Twisted reactor '%s'" % explicitReactor)
        try:
            installReactor(explicitReactor)
        except Exception as e:
            print("Could not install Twisted reactor %s%s" % (explicitReactor, ' ["%s"]' % e if verbose else ''))
            sys.exit(1)
    else:
        # automatically choose optimal reactor
        ##
        if verbose:
            print("Automatically choosing optimal Twisted reactor")
        install_optimal_reactor(verbose)

    # now the reactor is installed, import it
    from twisted.internet import reactor
    txaio.config.loop = reactor

    if verbose:
        from twisted.python.reflect import qual
        print("Running Twisted reactor %s" % qual(reactor.__class__))

    return reactor
开发者ID:Avnerus,项目名称:AutobahnPython,代码行数:39,代码来源:choosereactor.py


示例12: test_installReactorReturnsReactor

 def test_installReactorReturnsReactor(self):
     """
     Test that the L{reactors.installReactor} function correctly returns
     the installed reactor.
     """
     reactor = object()
     def install():
         from twisted import internet
         self.patch(internet, 'reactor', reactor)
     name = 'fakereactortest'
     package = __name__
     description = 'description'
     self.pluginResults = [FakeReactor(install, name, package, description)]
     installed = reactors.installReactor(name)
     self.assertIdentical(installed, reactor)
开发者ID:marcelpetersen,项目名称:localnews,代码行数:15,代码来源:test_application.py


示例13: testReactor

import sys

from twisted.application import reactors
reactors.installReactor('qt4')

from twisted.internet import reactor, task
from twisted.python import log
log.startLogging(sys.stdout)
 
def testReactor():
    print 'tick...'

def doit():
    task.LoopingCall(testReactor).start(1.0)
    reactor.callLater(15.0,reactor.stop)
    
reactor.callWhenRunning(doit)
log.msg('calling reactor.run()')
reactor.run()
log.msg('fell off the bottom?...')

#sys.exit(app.exec_())

开发者ID:D3f0,项目名称:txscada,代码行数:22,代码来源:trivialscript.py


示例14: Copyright

__author__ = 'ght'


from twisted.application import reactors

reactors.installReactor('kqueue')

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Test running processes.
"""

import gzip
import os
import sys
import signal
import StringIO
import errno
import gc
import stat
import operator
try:
    import fcntl
except ImportError:
    fcntl = process = None
else:
    from twisted.internet import process

开发者ID:brianbirke,项目名称:qtreactor,代码行数:29,代码来源:FDtest.py


示例15: ConfigParser

    config = ConfigParser(args.configfile)
except ConfigurationError as e:
    parser.error(e)

# emulate twisted configuration:
twisted_config = DefaultDict()
twisted_config['nodaemon'] = not (args.daemon or config.daemon or False)
twisted_config['no_save'] = True
twisted_config['originalname'] = 'openvpn2dns'
twisted_config['prefix'] = 'openvpn2dns'
twisted_config['rundir'] = '.'
if (args.drop or config.drop) is True:
    twisted_config['uid'] = args.user or config.user
    if not twisted_config['uid']:
        parser.error('Need user (and group) information to drop privileges')
    twisted_config['gid'] = args.group or config.group
    if not twisted_config['gid']:
        parser.error('Need group information to drop privileges')
if (args.log or config.log) == 'syslog':
    twisted_config['syslog'] = True
elif args.log or config.log:
    twisted_config['log'] = args.log or config.log
twisted_config['pidfile'] = args.pidfile or config.pidfile


if args.reactor or config.reactor:
    installReactor(args.reactor or config.reactor)

# run appliation:
OpenVpn2DnsApplication(config, twisted_config).run()
开发者ID:algby,项目名称:openvpn2dns,代码行数:30,代码来源:launch.py


示例16: installReactor

import sys, random, time, pprint

if "bsd" in sys.platform or sys.platform.startswith("darwin"):
    from twisted.internet import kqreactor

    kqreactor.install()
elif sys.platform in ["win32"]:
    from twisted.application.reactors import installReactor

    installReactor("iocp")
elif sys.platform.startswith("linux"):
    from twisted.internet import epollreactor

    epollreactor.install()

from twisted.internet import reactor

from twisted.python import log
from twisted.internet.defer import Deferred, DeferredList, gatherResults, returnValue, inlineCallbacks

from twisted.internet.threads import deferToThread


def newid():
    return "".join(
        [random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_") for i in xrange(16)]
    )


if sys.platform.startswith("win"):
    rtime = time.clock
开发者ID:luodaobin,项目名称:scratchbox,代码行数:31,代码来源:test.py


示例17: Copyright

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for lots of functionality provided by L{twisted.internet}.
"""


import os
import sys
import time

from twisted.application import reactors

reactors.installReactor("qt4")

from twisted.python.compat import _PY3
from twisted.trial import unittest
from twisted.internet import reactor, protocol, error, abstract, defer
from twisted.internet import interfaces, base

try:
    from twisted.internet import ssl
except ImportError:
    ssl = None
if ssl and not ssl.supported:
    ssl = None

from twisted.internet.defer import Deferred, maybeDeferred
from twisted.python import runtime
开发者ID:yucheng1992,项目名称:Heuristic-problem-solving,代码行数:30,代码来源:test_internet.py


示例18: main

def main(args=None, locals_=None, banner=None):
    translations.init()

    # TODO: maybe support displays other than raw_display?
    config, options, exec_args = bpargs.parse(args, (
            'Urwid options', None, [
                Option('--twisted', '-T', action='store_true',
                       help=_('Run twisted reactor.')),
                Option('--reactor', '-r',
                       help=_('Select specific reactor (see --help-reactors). '
                       'Implies --twisted.')),
                Option('--help-reactors', action='store_true',
                       help=_('List available reactors for -r.')),
                Option('--plugin', '-p',
                       help=_('twistd plugin to run (use twistd for a list). '
                       'Use "--" to pass further options to the plugin.')),
                Option('--server', '-s', type='int',
                       help=_('Port to run an eval server on (forces Twisted).')),
                ]))

    if options.help_reactors:
        try:
            from twisted.application import reactors
            # Stolen from twisted.application.app (twistd).
            for r in reactors.getReactorTypes():
                print('    %-4s\t%s' % (r.shortName, r.description))
        except ImportError:
            sys.stderr.write('No reactors are available. Please install '
                'twisted for reactor support.\n')
        return

    palette = [
        (name, COLORMAP[color.lower()], 'default',
         'bold' if color.isupper() else 'default')
        for name, color in iteritems(config.color_scheme)]
    palette.extend([
            ('bold ' + name, color + ',bold', background, monochrome)
            for name, color, background, monochrome in palette])

    if options.server or options.plugin:
        options.twisted = True

    if options.reactor:
        try:
            from twisted.application import reactors
        except ImportError:
            sys.stderr.write('No reactors are available. Please install '
                'twisted for reactor support.\n')
            return
        try:
            # XXX why does this not just return the reactor it installed?
            reactor = reactors.installReactor(options.reactor)
            if reactor is None:
                from twisted.internet import reactor
        except reactors.NoSuchReactor:
            sys.stderr.write('Reactor %s does not exist\n' % (
                    options.reactor,))
            return
        event_loop = TwistedEventLoop(reactor)
    elif options.twisted:
        try:
            from twisted.internet import reactor
        except ImportError:
            sys.stderr.write('No reactors are available. Please install '
                'twisted for reactor support.\n')
            return
        event_loop = TwistedEventLoop(reactor)
    else:
        # None, not urwid.SelectEventLoop(), to work with
        # screens that do not support external event loops.
        event_loop = None
    # TODO: there is also a glib event loop. Do we want that one?

    # __main__ construction from bpython.cli
    if locals_ is None:
        main_mod = sys.modules['__main__'] = ModuleType('__main__')
        locals_ = main_mod.__dict__

    if options.plugin:
        try:
            from twisted import plugin
            from twisted.application import service
        except ImportError:
            sys.stderr.write('No twisted plugins are available. Please install '
                'twisted for twisted plugin support.\n')
            return

        for plug in plugin.getPlugins(service.IServiceMaker):
            if plug.tapname == options.plugin:
                break
        else:
            sys.stderr.write('Plugin %s does not exist\n' % (options.plugin,))
            return
        plugopts = plug.options()
        plugopts.parseOptions(exec_args)
        serv = plug.makeService(plugopts)
        locals_['service'] = serv
        reactor.callWhenRunning(serv.startService)
        exec_args = []
    interpreter = repl.Interpreter(locals_, locale.getpreferredencoding())
#.........这里部分代码省略.........
开发者ID:daronwolff,项目名称:CHART_IN_REAL_TIME_socketio_python,代码行数:101,代码来源:urwid.py


示例19: install

 def install(self):
     installReactor(self.options.reactor)
     import twisted.internet.reactor
     return twisted.internet.reactor
开发者ID:eswald,项目名称:parlance,代码行数:4,代码来源:reactor.py



注:本文中的twisted.application.reactors.installReactor函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python service.loadApplication函数代码示例发布时间:2022-05-27
下一篇:
Python internet.TimerService类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap