本文整理汇总了Python中twisted.internet.glib2reactor.install函数的典型用法代码示例。如果您正苦于以下问题:Python install函数的具体用法?Python install怎么用?Python install使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了install函数的9个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: run
def run():
try:
log.init('client.log')
context = daemonise.getContext(
pidfile = _config['client-pid-file'],
signal_map = {
signal.SIGUSR1: 'terminate',
signal.SIGHUP: configuration.reload,
},
)
_log.debug('Entering running context...')
with context:
_log.info('Configuration file: {0}'.format(configuration.filename()))
_log.info('Initializing ConSys client daemon...')
# Install GLib reactor
from twisted.internet import glib2reactor
glib2reactor.install()
from consys.common import app
from consys.client import network, persistent
app.startup()
app.dispatch_loop()
_log.info('Terminating ConSys client daemon...')
except Exception:
_log.exception('Unhandled exception in main thread, exiting')
开发者ID:neerc-linux,项目名称:ConSys,代码行数:26,代码来源:__init__.py
示例2: daemon
def daemon():
from twisted.internet import glib2reactor
glib2reactor.install()
from twisted.internet import reactor
main_app = Application("Audio Failure Monitor") #, uid, gid)
services = IServiceCollection(main_app)
options = DaemonOptions()
options.parseOptions()
application = options.getService()
application.infotub.setServiceParent(services)
application.coretub.setServiceParent(services)
app.startApplication(main_app, False)
reactor.addSystemEventTrigger('before', 'shutdown',
logging.getLogger(__name__).info,
'Stopping AFM')
logging.getLogger(__name__).info("AFM Started")
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
开发者ID:UfSoft,项目名称:afm,代码行数:26,代码来源:service.py
示例3: installDbusReactor
def installDbusReactor():
try:
from twisted.internet import glib2reactor
glib2reactor.install()
return True
except:
try:
from twisted.internet import gtk2reactor
gtk2reactor.install()
return True
except:
return False
开发者ID:braams,项目名称:shtoom,代码行数:12,代码来源:dbus.py
示例4: main
def main(port=4118, parentpid=None):
import os
os.environ['NO_GAIL'] = '1'
os.environ['NO_AT_BRIDGE'] = '1'
import twisted
gtkVersion = None
try:
from gi.repository import Gtk
gtkVersion = Gtk._version
except:
pass
if not gtkVersion or gtkVersion == '2.0':
# As per Ubuntu 11.10, twisted glib2reactor
# works with gtk2, which fails with gtk3
from twisted.internet import glib2reactor
glib2reactor.install()
elif gtkVersion >= '3.0':
try:
# Exist in Ubuntu 12.04, but not on
# Ubuntu 11.10 / Fedora 16
from twisted.internet import gtk3reactor
gtk3reactor.install()
except:
pass
from twisted.internet import reactor
from twisted.web import server, xmlrpc
from xmlrpc_daemon import XMLRPCLdtpd
import twisted.internet
import socket
import pyatspi
import traceback
_ldtp_debug = os.environ.get('LDTP_DEBUG', None)
_ldtp_debug_file = os.environ.get('LDTP_DEBUG_FILE', None)
try:
pyatspi.setCacheLevel(pyatspi.CACHE_PROPERTIES)
r = XMLRPCLdtpd()
xmlrpc.addIntrospection(r)
if parentpid:
reactor.callWhenRunning(SignalParent(parentpid).send_later)
reactor.listenTCP(port, server.Site(r))
reactor.run()
except twisted.internet.error.CannotListenError:
if _ldtp_debug:
print(traceback.format_exc())
except socket.error:
if _ldtp_debug:
print(traceback.format_exc())
if _ldtp_debug_file:
with open(_ldtp_debug_file, "a") as fp:
fp.write(traceback.format_exc())
开发者ID:IsSuEat,项目名称:ldtp2,代码行数:53,代码来源:__init__.py
示例5: __init__
def __init__(self, mcserver):
from twisted.internet import glib2reactor
glib2reactor.install()
from twisted.internet import reactor
from twisted.web import xmlrpc, server
class Test(xmlrpc.XMLRPC):
server = mcserver
def xmlrpc_list(self):
return self.server.list()
t = Test()
config = mcserver.get_config(self)
reactor.listenTCP(config['port'], server.Site(t))
开发者ID:tylerwhall,项目名称:Minecraft-server-wrapper,代码行数:12,代码来源:server.py
示例6: avahiClient
import dbus.glib
except ImportError:
dbus = None
if dbus:
try:
import avahi
except ImportError:
avahi = None
else:
avahi = None
from twisted.internet import defer, threads
from twisted.internet import glib2reactor
import logging
glib2reactor.install()
class avahiClient():
def __init__(self, type):
self._callbacks = {'new-service': [], 'remove-service': [] }
# initialize dbus stuff needed for discovery
self.bus = dbus.SystemBus()
avahi_bus = self.bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER)
self.server = dbus.Interface(avahi_bus, avahi.DBUS_INTERFACE_SERVER)
#stype = '_workstation._tcp'
#stype = '_controlaula._tcp'
开发者ID:Chisco77,项目名称:controlies,代码行数:30,代码来源:avahiClient.py
示例7:
import os
import random
import shutil
import signal
import subprocess
import time
from optparse import OptionParser
import dbus
import dbus.mainloop.glib # this is black magic. DO NOT REMOVE!
from distutils.spawn import find_executable
from twisted.internet import glib2reactor
glib2reactor.install() # NOQA: before any reactor import
from django.conf import settings
from utilities import utils, dev_launcher
from twisted.internet import reactor, defer
from ubuntuone.platform.linux import tools
from magicicada.filesync import services
from magicicada.server.integration.helpers import debug, retryable
# to make dbus work
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
LIB_DIR = os.path.abspath("lib")
# this should be done manually before:
开发者ID:magicicada-bot,项目名称:magicicada-server,代码行数:31,代码来源:run_integtests.py
示例8: ClientProtocol
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import os, sys
if __name__ == '__main__':
from twisted.internet import glib2reactor as reactor
reactor.install()
from twisted.internet import defer, reactor
from twisted.internet.protocol import Protocol, ClientFactory
import time, datetime
from gzip import GzipFile
from StringIO import StringIO
import gobject
import gc
#
from util import debug
DEBUG = 2
class ClientProtocol(Protocol):
def __init__(self):
self._recv_data = ''
self._header = ''
self.content_encoding = ''
self.content_size = 0
def connectionMade(self):
debug(DEBUG+1, '%s connectionMade with: %s', self, self.transport.getPeer())
#self.transport.setTcpKeepAlive(1)
self.factory.connectionMade(self.transport.getPeer().host)
开发者ID:hosle,项目名称:tapas,代码行数:30,代码来源:connection.py
示例9: __init__
#!/usr/bin/python
from twisted.internet import glib2reactor #We need this reactor to let dbus and twisted play nice together.
glib2reactor.install() #Turn glib2reactor into the reactor.
from twisted.internet import reactor #Now import reactor, being glib2reactor.
from twisted.web import http
from twisted.web.static import File
import os
import json
import sys
import jinja2
import dbus
import gobject
from dbus.mainloop.glib import DBusGMainLoop
import daemon
import syslog
class DynamicRouterConfig:
def __init__(self,conffile):
infile=open(conffile,"r")
self.config=json.load(infile)
infile.close()
def getGroupName(self,host):
for clientnet in self.config["devices"]["clients"]:
if clientnet["ip"] == host:
return clientnet["groupname"]
return None
def clientips(self):
for clientnet in self.config["devices"]["clients"]:
yield clientnet["ip"]
def getGatewayList(self,host):
groupname=self.getGroupName(host)
开发者ID:DNPA,项目名称:dynr-web,代码行数:31,代码来源:dynr-web.py
注:本文中的twisted.internet.glib2reactor.install函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论