本文整理汇总了Python中twisted.application.reactors.installReactor函数的典型用法代码示例。如果您正苦于以下问题:Python installReactor函数的具体用法?Python installReactor怎么用?Python installReactor使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了installReactor函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: runApp
def runApp(config):
loadConfigurations(config, settings)
if not sys.modules.has_key('twisted.internet.reactor'):
from twisted.application.reactors import installReactor
installReactor(settings.reactor)
ApplicationRunner(config).run()
开发者ID:oscar810429,项目名称:pndfs,代码行数:7,代码来源:run.py
示例2: opt_reactor
def opt_reactor(self, shortName):
"""
Which reactor to use (see --help-reactors for a list of possibilities)
"""
# Actually actually actually install the reactor right at this very
# moment, before any other code (for example, a sub-command plugin)
# runs and accidentally imports and installs the default reactor.
#
# This could probably be improved somehow.
installReactor(shortName)
开发者ID:andrewbird,项目名称:vodafone-mobile-connect,代码行数:10,代码来源:app.py
示例3: install_optimal_reactor
def install_optimal_reactor():
"""
Try to install the optimal Twisted reactor for platform.
"""
import sys
if 'bsd' in sys.platform or sys.platform.startswith('darwin'):
try:
v = sys.version_info
if v[0] == 1 or (v[0] == 2 and v[1] < 6) or (v[0] == 2 and v[1] == 6 and v[2] < 5):
raise Exception("Python version too old (%s)" % sys.version)
from twisted.internet import kqreactor
kqreactor.install()
except Exception as e:
print("""
WARNING: Running on BSD or Darwin, but cannot use kqueue Twisted reactor.
=> %s
To use the kqueue Twisted reactor, you will need:
1. Python >= 2.6.5 or PyPy > 1.8
2. Twisted > 12.0
Note the use of >= and >.
Will let Twisted choose a default reactor (potential performance degradation).
""" % str(e))
pass
if sys.platform in ['win32']:
try:
from twisted.application.reactors import installReactor
installReactor("iocp")
except Exception as e:
print("""
WARNING: Running on Windows, but cannot use IOCP Twisted reactor.
=> %s
Will let Twisted choose a default reactor (potential performance degradation).
""" % str(e))
if sys.platform.startswith('linux'):
try:
from twisted.internet import epollreactor
epollreactor.install()
except Exception as e:
print("""
WARNING: Running on Linux, but cannot use Epoll Twisted reactor.
=> %s
Will let Twisted choose a default reactor (potential performance degradation).
""" % str(e))
开发者ID:luxorv,项目名称:Chat-GBH,代码行数:55,代码来源:choosereactor.py
示例4: install_reactor
def install_reactor(explicit_reactor=None, verbose=False, log=None, require_optimal_reactor=True):
"""
Install Twisted reactor.
:param explicit_reactor: If provided, install this reactor. Else, install
the optimal reactor.
:type explicit_reactor: obj
:param verbose: If ``True``, log (at level "info") the reactor that is
in place afterwards.
:type verbose: bool
:param log: Explicit logging to this txaio logger object.
:type log: obj
:param require_optimal_reactor: If ``True`` and the desired reactor could not be installed,
raise ``ReactorAlreadyInstalledError``, else fallback to another reactor.
:type require_optimal_reactor: bool
:returns: The Twisted reactor in place (`twisted.internet.reactor`).
"""
if not log:
log = txaio.make_logger()
if explicit_reactor:
# install explicitly given reactor
#
from twisted.application.reactors import installReactor
if verbose:
log.info('Trying to install explicitly specified Twisted reactor "{reactor}" ..', reactor=explicit_reactor)
try:
installReactor(explicit_reactor)
except:
log.failure('Could not install Twisted reactor {reactor}\n{log_failure.value}',
reactor=explicit_reactor)
sys.exit(1)
else:
# automatically choose optimal reactor
#
if verbose:
log.info('Automatically choosing optimal Twisted reactor ..')
install_optimal_reactor(require_optimal_reactor)
# now the reactor is installed, import it
from twisted.internet import reactor
txaio.config.loop = reactor
if verbose:
from twisted.python.reflect import qual
log.info('Running on Twisted reactor {reactor}', reactor=qual(reactor.__class__))
return reactor
开发者ID:crossbario,项目名称:autobahn-python,代码行数:52,代码来源:choosereactor.py
示例5: main
def main():
"""
Try to install the reactor named C{qt}. Expect it to not work. Print
diagnostics to stdout if something goes wrong, print nothing otherwise.
"""
sys.meta_path.insert(0, QTNotImporter())
try:
reactors.installReactor('qt')
except reactors.NoSuchReactor, e:
if e.args != ('qt',):
print 'Wrong arguments to NoSuchReactor:', e.args
else:
# Do nothing to indicate success.
pass
开发者ID:Almad,项目名称:twisted,代码行数:14,代码来源:app_qtstub.py
示例6: test_installReactor
def test_installReactor(self):
"""
Test that the L{reactors.installReactor} function correctly installs
the specified reactor.
"""
installed = []
def install():
installed.append(True)
name = 'fakereactortest'
package = __name__
description = 'description'
self.pluginResults = [FakeReactor(install, name, package, description)]
reactors.installReactor(name)
self.assertEqual(installed, [True])
开发者ID:Berimor66,项目名称:mythbox,代码行数:14,代码来源:test_application.py
示例7: install_reactor
def install_reactor(_reactor = None, verbose = False):
"""
Install Twisted reactor. This is used from CLI.
"""
import sys
if _reactor:
## install explicitly given reactor
##
from twisted.application.reactors import installReactor
print "Trying to install explicitly specified Twisted reactor '%s'" % _reactor
try:
installReactor(_reactor)
except Exception, e:
print "Could not install Twisted reactor %s%s" % (_reactor, ' ["%s"]' % e if verbose else '')
sys.exit(1)
开发者ID:ajjoshi,项目名称:AutobahnPython,代码行数:16,代码来源:choosereactor.py
示例8: test_installReactorMultiplePlugins
def test_installReactorMultiplePlugins(self):
"""
Test that the L{reactors.installReactor} function correctly installs
the specified reactor when there are multiple reactor plugins.
"""
installed = []
def install():
installed.append(True)
name = 'fakereactortest'
package = __name__
description = 'description'
fakeReactor = FakeReactor(install, name, package, description)
otherReactor = FakeReactor(lambda: None,
"otherreactor", package, description)
self.pluginResults = [otherReactor, fakeReactor]
reactors.installReactor(name)
self.assertEqual(installed, [True])
开发者ID:marcelpetersen,项目名称:localnews,代码行数:17,代码来源:test_application.py
示例9: install_reactor
def install_reactor(explicit_reactor=None, verbose=False):
"""
Install Twisted reactor.
:param explicit_reactor: If provided, install this reactor. Else, install
the optimal reactor.
:type explicit_reactor: obj
:param verbose: If ``True``, print what happens.
:type verbose: bool
"""
import sys
import txaio
txaio.use_twisted() # just to be sure...
log = make_logger()
if explicit_reactor:
# install explicitly given reactor
##
from twisted.application.reactors import installReactor
log.info("Trying to install explicitly specified Twisted reactor '{reactor}'", reactor=explicit_reactor)
try:
installReactor(explicit_reactor)
except:
log.failure("Could not install Twisted reactor {reactor}\n{log_failure.value}",
reactor=explicit_reactor)
sys.exit(1)
else:
# automatically choose optimal reactor
##
log.debug("Automatically choosing optimal Twisted reactor")
install_optimal_reactor(verbose)
# now the reactor is installed, import it
from twisted.internet import reactor
txaio.config.loop = reactor
if verbose:
from twisted.python.reflect import qual
log.debug("Running Twisted reactor {reactor}", reactor=qual(reactor.__class__))
return reactor
开发者ID:honeyflyfish,项目名称:autobahn-python,代码行数:42,代码来源:choosereactor.py
示例10: opt_reactor
def opt_reactor(self, shortName):
"""
Which reactor to use (see --help-reactors for a list of possibilities)
"""
# Actually actually actually install the reactor right at this very
# moment, before any other code (for example, a sub-command plugin)
# runs and accidentally imports and installs the default reactor.
#
# This could probably be improved somehow.
try:
installReactor(shortName)
except NoSuchReactor:
msg = ("The specified reactor does not exist: '%s'.\n"
"See the list of available reactors with "
"--help-reactors" % (shortName,))
raise usage.UsageError(msg)
except Exception, e:
msg = ("The specified reactor cannot be used, failed with error: "
"%s.\nSee the list of available reactors with "
"--help-reactors" % (e,))
raise usage.UsageError(msg)
开发者ID:antong,项目名称:twisted,代码行数:21,代码来源:app.py
示例11: install_reactor
def install_reactor(explicitReactor=None, verbose=False):
"""
Install Twisted reactor.
:param explicitReactor: If provided, install this reactor. Else, install optimal reactor.
:type explicitReactor: obj
:param verbose: If ``True``, print what happens.
:type verbose: bool
"""
import sys
import txaio
txaio.use_twisted() # just to be sure...
if explicitReactor:
# install explicitly given reactor
##
from twisted.application.reactors import installReactor
print("Trying to install explicitly specified Twisted reactor '%s'" % explicitReactor)
try:
installReactor(explicitReactor)
except Exception as e:
print("Could not install Twisted reactor %s%s" % (explicitReactor, ' ["%s"]' % e if verbose else ''))
sys.exit(1)
else:
# automatically choose optimal reactor
##
if verbose:
print("Automatically choosing optimal Twisted reactor")
install_optimal_reactor(verbose)
# now the reactor is installed, import it
from twisted.internet import reactor
txaio.config.loop = reactor
if verbose:
from twisted.python.reflect import qual
print("Running Twisted reactor %s" % qual(reactor.__class__))
return reactor
开发者ID:Avnerus,项目名称:AutobahnPython,代码行数:39,代码来源:choosereactor.py
示例12: test_installReactorReturnsReactor
def test_installReactorReturnsReactor(self):
"""
Test that the L{reactors.installReactor} function correctly returns
the installed reactor.
"""
reactor = object()
def install():
from twisted import internet
self.patch(internet, 'reactor', reactor)
name = 'fakereactortest'
package = __name__
description = 'description'
self.pluginResults = [FakeReactor(install, name, package, description)]
installed = reactors.installReactor(name)
self.assertIdentical(installed, reactor)
开发者ID:marcelpetersen,项目名称:localnews,代码行数:15,代码来源:test_application.py
示例13: testReactor
import sys
from twisted.application import reactors
reactors.installReactor('qt4')
from twisted.internet import reactor, task
from twisted.python import log
log.startLogging(sys.stdout)
def testReactor():
print 'tick...'
def doit():
task.LoopingCall(testReactor).start(1.0)
reactor.callLater(15.0,reactor.stop)
reactor.callWhenRunning(doit)
log.msg('calling reactor.run()')
reactor.run()
log.msg('fell off the bottom?...')
#sys.exit(app.exec_())
开发者ID:D3f0,项目名称:txscada,代码行数:22,代码来源:trivialscript.py
示例14: Copyright
__author__ = 'ght'
from twisted.application import reactors
reactors.installReactor('kqueue')
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test running processes.
"""
import gzip
import os
import sys
import signal
import StringIO
import errno
import gc
import stat
import operator
try:
import fcntl
except ImportError:
fcntl = process = None
else:
from twisted.internet import process
开发者ID:brianbirke,项目名称:qtreactor,代码行数:29,代码来源:FDtest.py
示例15: ConfigParser
config = ConfigParser(args.configfile)
except ConfigurationError as e:
parser.error(e)
# emulate twisted configuration:
twisted_config = DefaultDict()
twisted_config['nodaemon'] = not (args.daemon or config.daemon or False)
twisted_config['no_save'] = True
twisted_config['originalname'] = 'openvpn2dns'
twisted_config['prefix'] = 'openvpn2dns'
twisted_config['rundir'] = '.'
if (args.drop or config.drop) is True:
twisted_config['uid'] = args.user or config.user
if not twisted_config['uid']:
parser.error('Need user (and group) information to drop privileges')
twisted_config['gid'] = args.group or config.group
if not twisted_config['gid']:
parser.error('Need group information to drop privileges')
if (args.log or config.log) == 'syslog':
twisted_config['syslog'] = True
elif args.log or config.log:
twisted_config['log'] = args.log or config.log
twisted_config['pidfile'] = args.pidfile or config.pidfile
if args.reactor or config.reactor:
installReactor(args.reactor or config.reactor)
# run appliation:
OpenVpn2DnsApplication(config, twisted_config).run()
开发者ID:algby,项目名称:openvpn2dns,代码行数:30,代码来源:launch.py
示例16: installReactor
import sys, random, time, pprint
if "bsd" in sys.platform or sys.platform.startswith("darwin"):
from twisted.internet import kqreactor
kqreactor.install()
elif sys.platform in ["win32"]:
from twisted.application.reactors import installReactor
installReactor("iocp")
elif sys.platform.startswith("linux"):
from twisted.internet import epollreactor
epollreactor.install()
from twisted.internet import reactor
from twisted.python import log
from twisted.internet.defer import Deferred, DeferredList, gatherResults, returnValue, inlineCallbacks
from twisted.internet.threads import deferToThread
def newid():
return "".join(
[random.choice("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_") for i in xrange(16)]
)
if sys.platform.startswith("win"):
rtime = time.clock
开发者ID:luodaobin,项目名称:scratchbox,代码行数:31,代码来源:test.py
示例17: Copyright
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for lots of functionality provided by L{twisted.internet}.
"""
import os
import sys
import time
from twisted.application import reactors
reactors.installReactor("qt4")
from twisted.python.compat import _PY3
from twisted.trial import unittest
from twisted.internet import reactor, protocol, error, abstract, defer
from twisted.internet import interfaces, base
try:
from twisted.internet import ssl
except ImportError:
ssl = None
if ssl and not ssl.supported:
ssl = None
from twisted.internet.defer import Deferred, maybeDeferred
from twisted.python import runtime
开发者ID:yucheng1992,项目名称:Heuristic-problem-solving,代码行数:30,代码来源:test_internet.py
示例18: main
def main(args=None, locals_=None, banner=None):
translations.init()
# TODO: maybe support displays other than raw_display?
config, options, exec_args = bpargs.parse(args, (
'Urwid options', None, [
Option('--twisted', '-T', action='store_true',
help=_('Run twisted reactor.')),
Option('--reactor', '-r',
help=_('Select specific reactor (see --help-reactors). '
'Implies --twisted.')),
Option('--help-reactors', action='store_true',
help=_('List available reactors for -r.')),
Option('--plugin', '-p',
help=_('twistd plugin to run (use twistd for a list). '
'Use "--" to pass further options to the plugin.')),
Option('--server', '-s', type='int',
help=_('Port to run an eval server on (forces Twisted).')),
]))
if options.help_reactors:
try:
from twisted.application import reactors
# Stolen from twisted.application.app (twistd).
for r in reactors.getReactorTypes():
print(' %-4s\t%s' % (r.shortName, r.description))
except ImportError:
sys.stderr.write('No reactors are available. Please install '
'twisted for reactor support.\n')
return
palette = [
(name, COLORMAP[color.lower()], 'default',
'bold' if color.isupper() else 'default')
for name, color in iteritems(config.color_scheme)]
palette.extend([
('bold ' + name, color + ',bold', background, monochrome)
for name, color, background, monochrome in palette])
if options.server or options.plugin:
options.twisted = True
if options.reactor:
try:
from twisted.application import reactors
except ImportError:
sys.stderr.write('No reactors are available. Please install '
'twisted for reactor support.\n')
return
try:
# XXX why does this not just return the reactor it installed?
reactor = reactors.installReactor(options.reactor)
if reactor is None:
from twisted.internet import reactor
except reactors.NoSuchReactor:
sys.stderr.write('Reactor %s does not exist\n' % (
options.reactor,))
return
event_loop = TwistedEventLoop(reactor)
elif options.twisted:
try:
from twisted.internet import reactor
except ImportError:
sys.stderr.write('No reactors are available. Please install '
'twisted for reactor support.\n')
return
event_loop = TwistedEventLoop(reactor)
else:
# None, not urwid.SelectEventLoop(), to work with
# screens that do not support external event loops.
event_loop = None
# TODO: there is also a glib event loop. Do we want that one?
# __main__ construction from bpython.cli
if locals_ is None:
main_mod = sys.modules['__main__'] = ModuleType('__main__')
locals_ = main_mod.__dict__
if options.plugin:
try:
from twisted import plugin
from twisted.application import service
except ImportError:
sys.stderr.write('No twisted plugins are available. Please install '
'twisted for twisted plugin support.\n')
return
for plug in plugin.getPlugins(service.IServiceMaker):
if plug.tapname == options.plugin:
break
else:
sys.stderr.write('Plugin %s does not exist\n' % (options.plugin,))
return
plugopts = plug.options()
plugopts.parseOptions(exec_args)
serv = plug.makeService(plugopts)
locals_['service'] = serv
reactor.callWhenRunning(serv.startService)
exec_args = []
interpreter = repl.Interpreter(locals_, locale.getpreferredencoding())
#.........这里部分代码省略.........
开发者ID:daronwolff,项目名称:CHART_IN_REAL_TIME_socketio_python,代码行数:101,代码来源:urwid.py
示例19: install
def install(self):
installReactor(self.options.reactor)
import twisted.internet.reactor
return twisted.internet.reactor
开发者ID:eswald,项目名称:parlance,代码行数:4,代码来源:reactor.py
注:本文中的twisted.application.reactors.installReactor函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论