本文整理汇总了Python中twisted.logger.globalLogPublisher.addObserver函数的典型用法代码示例。如果您正苦于以下问题:Python addObserver函数的具体用法?Python addObserver怎么用?Python addObserver使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了addObserver函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: init
def init(outFile):
level = levels[config.LOG_LEVEL]
predicate = LogLevelFilterPredicate(defaultLogLevel=level)
observer = FilteringLogObserver(textFileLogObserver(outFile=outFile), [predicate])
observer._encoding = "utf-8"
globalLogPublisher.addObserver(observer)
log.info("Start logging with {l}", l=level)
开发者ID:melnikk,项目名称:worker,代码行数:7,代码来源:logs.py
示例2: test_verbose_logging
def test_verbose_logging(self):
"""
If verbose logging is turned on, the full request and response is
logged.
"""
self.patch(helpers, 'get_site', partial(get_site, logging=True))
logged_events = []
addObserver(logged_events.append)
self.addCleanup(removeObserver, logged_events.append)
response, url = self.make_request_to_site()
self.assertEqual(2, len(logged_events))
self.assertTrue(all([not event['isError'] for event in logged_events]))
messages = [get_log_message(event) for event in logged_events]
request_match = re.compile(
"^Received request: GET (?P<url>.+)\n"
"Headers: (?P<headers>\{.+\})\n\s*$"
).match(messages[0])
self.assertNotEqual(None, request_match)
self.assertEqual(url, request_match.group('url'))
headers = json.loads(request_match.group('headers'))
self.assertEqual(['two'], headers.get('One'))
response_match = re.compile(
"^Responding with 200 for: GET (?P<url>.+)\n"
"Headers: (?P<headers>\{.+\})\n"
"\nresponse\!\n\s*$"
).match(messages[1])
self.assertNotEqual(None, response_match)
self.assertEqual(url, response_match.group('url'))
headers = json.loads(response_match.group('headers'))
self.assertEqual(['application/json'], headers.get('Content-Type'))
开发者ID:derwolfe,项目名称:mimic,代码行数:34,代码来源:test_resource.py
示例3: start_upload_server
def start_upload_server():
import argparse
from twisted.internet import reactor
from twisted.logger import Logger, globalLogPublisher, STDLibLogObserver
from twisted.web.server import Site
from twisted.web.resource import Resource
from cheesepi.server.upload import UploadHandler
# Argument parsing
parser = argparse.ArgumentParser()
parser.add_argument('--port', type=int, default=18090,
help='Port to listen on')
args = parser.parse_args()
init_logging()
# Make twisted logging write to pythons logging module
globalLogPublisher.addObserver(STDLibLogObserver(name="cheesepi.server.upload"))
# Use twisted logger when in twisted
log = Logger()
root = Resource()
root.putChild("upload", UploadHandler())
upload_server = Site(root)
reactor.listenTCP(args.port, upload_server)
log.info("Starting upload server on port %d..." % args.port)
reactor.run()
开发者ID:haf,项目名称:cheesepi,代码行数:31,代码来源:utils.py
示例4: start_control_server
def start_control_server():
import argparse
from twisted.internet import reactor
from twisted.logger import Logger, globalLogPublisher, STDLibLogObserver
from cheesepi.server.control import (CheeseRPCServerFactory,
CheeseRPCServer)
from cheesepi.server.storage.mongo import MongoDAO
# Argument parsing
parser = argparse.ArgumentParser()
parser.add_argument('--port', type=int, default=18080,
help='Port to listen on')
args = parser.parse_args()
init_logging()
# Make twisted logging write to pythons logging module
globalLogPublisher.addObserver(STDLibLogObserver(name="cheesepi.server.control"))
# Use twisted logger when in twisted
log = Logger()
# Logging
#log = Logger()
#globalLogPublisher.addObserver(PrintingObserver())
#dao = MongoDAO()
dao = MongoDAO('localhost', 27017)
control_server = CheeseRPCServer(dao).getStreamFactory(CheeseRPCServerFactory)
reactor.listenTCP(args.port, control_server)
log.info("Starting control server on port %d..." % args.port)
reactor.run()
开发者ID:haf,项目名称:cheesepi,代码行数:35,代码来源:utils.py
示例5: startService
def startService(self):
self.stdlib_cleanup = stdlib_logging_to_eliot_configuration(getLogger())
self.twisted_observer = TwistedLoggerToEliotObserver()
globalLogPublisher.addObserver(self.twisted_observer)
for dest in self.destinations:
add_destination(dest)
开发者ID:LeastAuthority,项目名称:leastauthority.com,代码行数:7,代码来源:eliot_destination.py
示例6: noiseControl
def noiseControl(options):
# terminal noise/info logic
# allows the specification of the log file location
if not options["loud"]:
log_path = options["log"]
globalLogPublisher.addObserver(hendrixObserver(log_path))
return None
开发者ID:pombredanne,项目名称:hendrix,代码行数:7,代码来源:ux.py
示例7: __init__
def __init__(self, reactor, config_filename):
self._network = None
self._proc = None
self._reactor = reactor
self._config_filename = config_filename
self.connections = dict()
with open(config_filename) as f:
self.config = json.load(f)
f = open(self.core_config["log_file"], "a")
globalLogPublisher.addObserver(textFileLogObserver(f))
self.api = ApiProxy(self._reactor)
self.server_factory = pb.PBServerFactory(self.api)
开发者ID:nanonyme,项目名称:nanobot,代码行数:12,代码来源:nanobot.py
示例8: init_logging
def init_logging(log_level):
"""
Initialise the logging by adding an observer to the global log publisher.
:param str log_level: The minimum log level to log messages for.
"""
log_level_filter = LogLevelFilterPredicate(
LogLevel.levelWithName(log_level))
log_level_filter.setLogLevelForNamespace(
'twisted.web.client._HTTP11ClientFactory', LogLevel.warn)
log_observer = FilteringLogObserver(
textFileLogObserver(sys.stdout), [log_level_filter])
globalLogPublisher.addObserver(log_observer)
开发者ID:praekeltfoundation,项目名称:marathon-acme,代码行数:13,代码来源:cli.py
示例9: getLogger
def getLogger(level):
loglevel = getattr(LogLevel, level)
filter_ = LogLevelFilterPredicate(defaultLogLevel=loglevel)
if loglevel > LogLevel.debug:
filter_.setLogLevelForNamespace('stdout', LogLevel.warn)
observer = FilteringLogObserver(stdoutFileLogObserver(), [filter_])
# observer = FilteringLogObserver(globalLogPublisher, [filter])
# log = Logger()
# globalLogBeginner.beginLoggingTo([observer])
globalLogPublisher.addObserver(observer)
return lambda event: None
开发者ID:bverdu,项目名称:onDemand,代码行数:13,代码来源:logger.py
示例10: startService
def startService(self):
super(SpreadFlowService, self).startService()
if self.options['confpath']:
confpath = self.options['confpath']
else:
confpath = os.path.join(os.getcwd(), 'spreadflow.conf')
stream = config_eval(confpath)
pipeline = list()
pipeline.append(AliasResolverPass())
pipeline.append(PortsValidatorPass())
if self.options['multiprocess']:
pipeline.append(PartitionExpanderPass())
pipeline.append(PartitionBoundsPass())
if self.options['partition']:
pipeline.append(PartitionWorkerPass())
partition = self.options['partition']
stream.append(AddTokenOp(PartitionSelectToken(partition)))
else:
pipeline.append(PartitionControllersPass())
pipeline.append(ComponentsPurgePass())
pipeline.append(EventHandlersPass())
for compiler_step in pipeline:
stream = compiler_step(stream)
self._eventdispatcher = EventDispatcher()
if self.options['oneshot']:
self._eventdispatcher.add_listener(JobEvent, 0, self._oneshot_job_event_handler)
connection_parser = ConnectionParser()
stream = connection_parser.extract(stream)
self._scheduler = Scheduler(connection_parser.get_portmap(), self._eventdispatcher)
event_handler_parser = EventHandlerParser()
stream = event_handler_parser.extract(stream)
for event_type, priority, callback in event_handler_parser.get_handlers():
self._eventdispatcher.add_listener(event_type, priority, callback)
if self.options['queuestatus']:
statuslog = SpreadFlowQueuestatusLogger(self.options['queuestatus'])
statuslog.watch(1, self._scheduler)
globalLogPublisher.addObserver(statuslog.logstatus)
self._scheduler.run().addBoth(self._stop)
开发者ID:spreadflow,项目名称:spreadflow-core,代码行数:50,代码来源:service.py
示例11: test_doStartLoggingStatement
def test_doStartLoggingStatement(self):
"""
L{Factory.doStart} logs that it is starting a factory, followed by
the L{repr} of the L{Factory} instance that is being started.
"""
events = []
globalLogPublisher.addObserver(events.append)
self.addCleanup(
lambda: globalLogPublisher.removeObserver(events.append))
f = Factory()
f.doStart()
self.assertIs(events[0]['factory'], f)
self.assertEqual(events[0]['log_level'], LogLevel.info)
self.assertEqual(events[0]['log_format'],
'Starting factory {factory!r}')
开发者ID:Architektor,项目名称:PySnip,代码行数:17,代码来源:test_protocol.py
示例12: redirect_to_twisted
def redirect_to_twisted(self):
"""
Redirect Eliot logs to Twisted.
@return: L{list} of L{dict} - the log messages written to Twisted will
eventually be appended to this list.
"""
written = []
def got_event(event):
if event.get("log_namespace") == "eliot":
written.append((event["log_level"].name, event["eliot"]))
globalLogPublisher.addObserver(got_event)
self.addCleanup(globalLogPublisher.removeObserver, got_event)
destination = TwistedDestination()
addDestination(destination)
self.addCleanup(removeDestination, destination)
return written
开发者ID:ClusterHQ,项目名称:eliot,代码行数:19,代码来源:test_twisted.py
示例13: importFailureObserver
optParameters = []
optFlags = [
['help', 'h', 'Display this help and exit.']
]
@provider(ILogObserver)
def importFailureObserver(event):
if 'failure' in event and event['failure'].type is ImportError:
log.err("ERROR: %s. Please run `pip install -U -r requirements.txt` "
"from Cowrie's install directory and virtualenv to install "
"the new dependency" % event['failure'].value.message)
globalLogPublisher.addObserver(importFailureObserver)
@implementer(IServiceMaker, IPlugin)
class CowrieServiceMaker(object):
tapname = "cowrie"
description = "She sells sea shells by the sea shore."
options = Options
output_plugins = None
def makeService(self, options):
"""
Construct a TCPServer from a factory defined in Cowrie.
"""
if options["help"] is True:
开发者ID:micheloosterhof,项目名称:cowrie,代码行数:31,代码来源:cowrie_plugin.py
示例14: startService
def startService(self):
self.stdlib_cleanup = _stdlib_logging_to_eliot_configuration(getLogger())
self.twisted_observer = _TwistedLoggerToEliotObserver()
globalLogPublisher.addObserver(self.twisted_observer)
add_destinations(*self.destinations)
return Service.startService(self)
开发者ID:tahoe-lafs,项目名称:tahoe-lafs,代码行数:6,代码来源:eliotutil.py
示例15: run
def run():
"""
Entry point into (native) worker processes. This wires up stuff such that
a worker instance is talking WAMP-over-stdio to the node controller.
"""
import os
import sys
import platform
import signal
# Ignore SIGINT so we get consistent behavior on control-C versus
# sending SIGINT to the controller process. When the controller is
# shutting down, it sends TERM to all its children but ctrl-C
# handling will send a SIGINT to all the processes in the group
# (so then the controller sends a TERM but the child already or
# will very shortly get a SIGINT as well). Twisted installs signal
# handlers, but not for SIGINT if there's already a custom one
# present.
def ignore(sig, frame):
log.debug("Ignoring SIGINT in worker.")
signal.signal(signal.SIGINT, ignore)
# create the top-level parser
#
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--reactor',
default=None,
choices=['select', 'poll', 'epoll', 'kqueue', 'iocp'],
help='Explicit Twisted reactor selection (optional).')
parser.add_argument('--loglevel',
default="info",
choices=['none', 'error', 'warn', 'info', 'debug', 'trace'],
help='Initial log level.')
parser.add_argument('-c',
'--cbdir',
type=str,
help="Crossbar.io node directory (required).")
parser.add_argument('-n',
'--node',
type=str,
help='Crossbar.io node ID (required).')
parser.add_argument('-w',
'--worker',
type=str,
help='Crossbar.io worker ID (required).')
parser.add_argument('-r',
'--realm',
type=str,
help='Crossbar.io node (management) realm (required).')
parser.add_argument('-t',
'--type',
choices=['router', 'container'],
help='Worker type (required).')
parser.add_argument('--title',
type=str,
default=None,
help='Worker process title to set (optional).')
options = parser.parse_args()
# make sure logging to something else than stdio is setup _first_
#
from crossbar._logging import make_JSON_observer, cb_logging_aware, _stderr
from crossbar._logging import make_logger, start_logging, set_global_log_level
from twisted.logger import globalLogPublisher as log_publisher
# Set the global log level
set_global_log_level(options.loglevel)
log = make_logger()
# Print a magic phrase that tells the capturing logger that it supports
# Crossbar's rich logging
print(cb_logging_aware, file=_stderr)
_stderr.flush()
flo = make_JSON_observer(_stderr)
log_publisher.addObserver(flo)
start_logging()
try:
import setproctitle
except ImportError:
log.debug("Could not set worker process title (setproctitle not installed)")
else:
# set process title if requested to
#
if options.title:
setproctitle.setproctitle(options.title)
else:
#.........这里部分代码省略.........
开发者ID:gluegl,项目名称:crossbar,代码行数:101,代码来源:process.py
示例16: __enter__
def __enter__(self):
set_global_log_level(self.desired_level)
globalLogPublisher.addObserver(self.logs.append)
return self
开发者ID:RonaldChristopher,项目名称:crossbar,代码行数:4,代码来源:_logging.py
示例17:
import io
from twisted.logger import jsonFileLogObserver, globalLogPublisher
from ad_hoc import AdHoc
globalLogPublisher.addObserver(jsonFileLogObserver(io.open("log.json", "a")))
AdHoc(3, 4).logMessage()
开发者ID:vmarkovtsev,项目名称:twisted,代码行数:7,代码来源:ad_hoc_save.py
示例18: apply_logging
#.........这里部分代码省略.........
_logger = loggers.get(record.name, None)
if _logger is None:
_logger = loggers[record.name] = Logger(record.name)
t = self.format(record)
if six.PY2 and isinstance(t, str):
# Libraries outside of our control can throw exceptions with
# a "str with known encoding", which can cause issues down
# the logging pipeline. Her
t = t.decode('utf8', errors='replace')
_logger.emit(LOGLEVEL_TWISTED_MAP[record.levelno], log_text=t)
if self.logger_dest is not None:
from twisted.python.logfile import DailyLogFile
class DailyLogWithLeadingZero(DailyLogFile):
def suffix(self, tupledate):
# this closely imitates the same function from parent class
try:
return '-'.join(("%02d" % i for i in tupledate))
except:
# try taking a float unixtime
return '-'.join(("%02d" % i for i in
self.toDate(tupledate)))
self.logger_dest = abspath(self.logger_dest)
if access(dirname(self.logger_dest), os.R_OK | os.W_OK):
log_dest = DailyLogWithLeadingZero \
.fromFullPath(self.logger_dest)
else:
Logger().warn("%r is not accessible. We need rwx on it to "
"rotate logs." % dirname(self.logger_dest))
log_dest = open(self.logger_dest, 'w+')
formatter = logging.Formatter(self.LOGGING_PROD_FORMAT)
else:
formatter = logging.Formatter(self.LOGGING_DEVEL_FORMAT)
log_dest = sys.stdout
try:
import colorama
colorama.init()
logger.debug("colorama loaded.")
except Exception as e:
logger.debug("coloarama not loaded: %r" % e)
def record_as_string(record):
if 'log_failure' in record:
failure = record['log_failure']
try:
s = pformat(vars(failure.value))
except TypeError:
# vars() argument must have __dict__ attribute
s = repr(failure.value)
return "%s: %s" % (failure.type, s)
if 'log_text' in record:
return record['log_text'] + "\n"
if 'log_format' in record:
level = record.get('log_level', LogLevel.debug)
level = LOGLEVEL_MAP_ABB[TWISTED_LOGLEVEL_MAP[level]]
text = record['log_format'].format(**record) + "\n"
ns = record.get('log_namespace', "???")
lineno = 0
record = logging.LogRecord('?', level, ns, lineno, text,
None, None)
record.l = level
record.module = ns.split('.')[-2]
return formatter.format(record)
if 'log_io' in record:
return record['log_io'] + "\n"
if 'message' in record:
return record['message'] + "\n"
return pformat(record)
observer = FileLogObserver(log_dest, record_as_string)
globalLogPublisher.addObserver(observer)
self._clear_other_observers(globalLogPublisher, observer)
handler = TwistedHandler()
handler.setFormatter(formatter)
logging.getLogger().addHandler(handler)
self.pre_logging_apply()
for l in self._loggers or []:
l.apply()
return self
开发者ID:cemrecan,项目名称:neurons,代码行数:101,代码来源:config.py
示例19: Logger
from __future__ import print_function, absolute_import
from twisted.internet import reactor
from twisted.logger import Logger, globalLogPublisher
from cheeselib.logger import PrintingObserver
from cheeselib.server.rpc import CheeseRPCServerFactory, CheeseRPCServer
from cheeselib.server.storage.mongo import MongoDAO
SERVER_PORT = 18080
log = Logger()
globalLogPublisher.addObserver(PrintingObserver())
dao = MongoDAO()
rpc_server = CheeseRPCServer(dao).getStreamFactory(CheeseRPCServerFactory)
reactor.listenTCP(SERVER_PORT, rpc_server)
log.info("Starting server on port %d..." % SERVER_PORT)
reactor.run()
开发者ID:haf,项目名称:cheesepi,代码行数:22,代码来源:test_server.py
示例20: import
from .defaults import DEFAULT_LOG_FILE
from twisted.logger import (
ILogObserver, jsonFileLogObserver, FilteringLogObserver,
LogLevelFilterPredicate, LogLevel, globalLogPublisher
)
from zope.interface import provider
import io
@provider(ILogObserver)
def hendrixObserver(path=DEFAULT_LOG_FILE, log_level=LogLevel.warn):
json_observer = jsonFileLogObserver(
io.open(path, 'a')
)
return FilteringLogObserver(
json_observer,
[LogLevelFilterPredicate(log_level), ]
)
globalLogPublisher.addObserver(hendrixObserver(log_level=LogLevel.debug))
开发者ID:SlashRoot,项目名称:hendrix,代码行数:22,代码来源:logger.py
注:本文中的twisted.logger.globalLogPublisher.addObserver函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论