本文整理汇总了 Python 中 twisted.python.log.addObserver 函数的典型用法代码示例。如果您正苦于以下问题:Python addObserver 函数的具体用法?Python addObserver 怎么用?Python addObserver 使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 addObserver 函数的 20 个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: main
def main(self):
    """Parse arguments and run the script's main function via ``react``.

    Before running, a per-process log file is opened inside
    ``self.log_directory`` and registered as a log observer.  This is
    best-effort: if the directory or file cannot be created the script
    still runs, just without the file observer.
    """
    observer = None
    log_file = None
    try:
        if not self.log_directory.exists():
            self.log_directory.makedirs()
        log_path = self.log_directory.child(
            b"%s-%d.log" % (os.path.basename(self.sys_module.argv[0]),
                            os.getpid()))
        log_file = log_path.open("a")
        observer = FileLogObserver(log_file).emit
        addObserver(observer)
        msg("Arguments: %s" % (self.sys_module.argv,))
    except (OSError, IOError):
        # File logging is optional; carry on without it.
        pass

    try:
        options = self._parse_options(self.sys_module.argv[1:])
        # XXX: We shouldn't be using this private _reactor API. See
        # https://twistedmatrix.com/trac/ticket/6200 and
        # https://twistedmatrix.com/trac/ticket/7527
        self._react(self.script.main, (options,), _reactor=self._reactor)
    finally:
        # Not strictly necessary, but nice cleanup for tests.  Done in a
        # ``finally`` so it also happens when option parsing or ``react``
        # raises (for example ``SystemExit``).
        if observer is not None:
            removeObserver(observer)
        if log_file is not None:
            log_file.close()
开发者ID:alex-docker,项目名称:flocker,代码行数:26,代码来源:script.py
示例2: test_default_item_completed
def test_default_item_completed(self):
    """``item_completed`` returns the same item, and logs failed results
    only while ``LOG_FAILED_RESULTS`` is enabled."""
    item = dict(name='name')
    assert self.pipe.item_completed([], item, self.info) is item

    failure = Failure(Exception())
    results = [(True, 1), (False, failure)]

    def complete_while_observing():
        # Run item_completed with a temporary observer and hand back both
        # the returned item and every log event that was emitted.
        captured = []
        txlog.addObserver(captured.append)
        returned = self.pipe.item_completed(results, item, self.info)
        txlog.removeObserver(captured.append)
        self.flushLoggedErrors()
        return returned, captured

    # Check that failures are logged by default
    new_item, events = complete_while_observing()
    assert new_item is item
    assert len(events) == 1
    assert events[0]['logLevel'] == log.ERROR
    assert events[0]['failure'] is failure

    # disable failure logging and check again
    self.pipe.LOG_FAILED_RESULTS = False
    new_item, events = complete_while_observing()
    assert new_item is item
    assert len(events) == 0
开发者ID:1012,项目名称:scrapy,代码行数:28,代码来源:test_pipeline_media.py
示例3: test_connectionLostLogMsg
def test_connectionLostLogMsg(self):
    """
    When a connection is lost, an informative message should be logged
    (see L{getExpectedConnectionLostLogMsg}): an address identifying
    the port and the fact that it was closed.
    """
    loggedMessages = []

    def logConnectionLostMsg(eventDict):
        loggedMessages.append(log.textFromEventDict(eventDict))

    reactor = self.buildReactor()
    p = self.getListeningPort(reactor)
    expectedMessage = self.getExpectedConnectionLostLogMsg(p)

    def stopReactor(ignored):
        log.removeObserver(logConnectionLostMsg)
        reactor.stop()

    def doStopListening():
        # The observer is registered exactly once, right before the port
        # is closed, so the single removeObserver call in stopReactor
        # fully unregisters it and no message is recorded twice.
        log.addObserver(logConnectionLostMsg)
        maybeDeferred(p.stopListening).addCallback(stopReactor)

    reactor.callWhenRunning(doStopListening)
    reactor.run()

    self.assertIn(expectedMessage, loggedMessages)
开发者ID:antong,项目名称:twisted,代码行数:28,代码来源:test_tcp.py
示例4: __init__
def __init__(self, **kwargs):
    '''
    Initialize the C{Scheduler} instance.

    Parses the command line (only C{-v}/C{--verbose} is accepted), sets up
    logging to C{ssscrape-scheduler.log} inside C{ssscrape.config.LOG_DIR}
    (and to stdout in verbose mode) and registers C{self.stop} as the
    database error callback.
    '''
    # Handle command line options
    from optparse import OptionParser
    parser = OptionParser(usage="%prog")
    parser.add_option('-v', '--verbose', action='store_true',
                      dest='verbose', default=False, help='Enable verbose mode')
    (options, args) = parser.parse_args()

    # Don't accept any positional arguments
    if args:
        # parser.error() prints the usage message and exits the process
        # itself, so no explicit sys.exit() is needed after it.
        parser.error('Positional parameters are not supported: %s' % ' '.join(args))

    # Logging
    # TODO: log in database table perhaps? See ticket:6.
    # The log file is left open on purpose: the observer writes to it for
    # as long as the observer stays registered.
    log_fp = open(os.path.join(ssscrape.config.LOG_DIR, 'ssscrape-scheduler.log'), 'a')
    log.addObserver(log.FileLogObserver(log_fp).emit)
    if options.verbose:
        log.addObserver(log.FileLogObserver(sys.stdout).emit)

    # We cannot recover from SQL errors.
    ssscrape.database.add_error_callback(self.stop)
开发者ID:jijkoun,项目名称:ssscrape,代码行数:29,代码来源:scheduler.py
示例5: test_newPluginsOnReadOnlyPath
def test_newPluginsOnReadOnlyPath(self):
    """
    A failure to write the dropin.cache file on a read-only path does not
    change the list of plugins returned, and a warning about the cache is
    logged.

    Note: this test should pass on both Linux and Windows, but may not
    provide useful coverage on Windows due to the different meaning of
    "read-only directory".
    """
    self.unlockSystem()
    newPlugin = self.sysplug.child('newstuff.py')
    newPlugin.setContent(pluginFileContents('one'))
    self.lockSystem()

    # Take the developer path out, so that the system plugins are actually
    # examined.
    sys.path.remove(self.devPath.path)

    # Start observing log events to see the warning
    events = []
    addObserver(events.append)
    self.addCleanup(removeObserver, events.append)

    self.assertIn('one', self.getAllPlugins())

    # Make sure something was logged about the cache.
    expected = "Unable to write to plugin cache %s: error number %d" % (
        self.syscache.path, errno.EPERM)
    warned = any(expected in textFromEventDict(event) for event in events)
    if not warned:
        self.fail(
            "Did not observe unwriteable cache warning in log "
            "events: %r" % (events,))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:34,代码来源:test_plugin.py
示例6: test_send_catch_log
def test_send_catch_log(self):
    """Both handlers are called, the failing handler's error is logged, and
    the result pairs each handler with its outcome."""
    test_signal = object()
    handlers_called = set()

    def log_received(event):
        handlers_called.add(log_received)
        assert "error_handler" in event['message'][0]
        assert event['logLevel'] == log.ERROR

    txlog.addObserver(log_received)
    for handler in (self.error_handler, self.ok_handler):
        dispatcher.connect(handler, signal=test_signal)

    result = yield defer.maybeDeferred(
        self._get_result, test_signal, arg='test',
        handlers_called=handlers_called)

    for expected in (self.error_handler, self.ok_handler, log_received):
        assert expected in handlers_called

    error_entry = result[0]
    self.assertEqual(error_entry[0], self.error_handler)
    self.assertTrue(isinstance(error_entry[1], Failure))
    self.assertEqual(result[1], (self.ok_handler, "OK"))

    txlog.removeObserver(log_received)
    self.flushLoggedErrors()
    for handler in (self.error_handler, self.ok_handler):
        dispatcher.disconnect(handler, signal=test_signal)
开发者ID:jtwaleson,项目名称:scrapy,代码行数:26,代码来源:test_utils_signal.py
示例7: test_malformedHeaderCGI
def test_malformedHeaderCGI(self):
    """
    Requesting the C{BROKEN_HEADER_CGI} script logs an
    "ignoring malformed CGI header" message for the offending header.
    """
    scriptPath = self.writeCGI(BROKEN_HEADER_CGI)
    portnum = self.startServer(scriptPath)
    url = ("http://localhost:%d/cgi" % (portnum,)).encode("ascii")

    d = client.Agent(reactor).request(b"GET", url)
    d.addCallback(discardBody)

    loggedMessages = []

    def addMessage(eventDict):
        # Keep the rendered text of every log event for the final check.
        loggedMessages.append(log.textFromEventDict(eventDict))

    log.addObserver(addMessage)
    self.addCleanup(log.removeObserver, addMessage)

    def checkResponse(ignored):
        expected = "ignoring malformed CGI header: " + repr(b'XYZ')
        self.assertIn(expected, loggedMessages)

    d.addCallback(checkResponse)
    return d
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:26,代码来源:test_cgi.py
示例8: test_route_connection_refused_error
def test_route_connection_refused_error(self):
    """Routing fails with a 503 C{RouterException} and clears the node when
    the connection is refused, on the first delivery and again on a retry."""
    self.agent_mock.request.side_effect = MockAssist(
        [self._raise_connection_refused_error])
    router_data = dict(node_id="http://somewhere", uaid=dummy_uaid)
    self.router_mock.clear_node.return_value = None
    log.addObserver(self._mockObserver)
    d = self.router.route_notification(self.notif, router_data)

    def verify_retry(fail):
        exc = fail.value
        # ok_(exc, RouterException) would treat the class as the failure
        # message and only test truthiness; check the type explicitly.
        ok_(isinstance(exc, RouterException))
        eq_(exc.status_code, 503)
        eq_(len(self.router_mock.clear_node.mock_calls), 1)
        self.router_mock.clear_node.reset_mock()
        self.flushLoggedErrors()

    def verify_deliver(fail):
        ok_(self._contains_err('ConnectionRefusedError'))
        exc = fail.value
        ok_(isinstance(exc, RouterException))
        eq_(exc.status_code, 503)
        eq_(len(self.router_mock.clear_node.mock_calls), 1)
        self.router_mock.clear_node.reset_mock()
        # Route a second time and chain its verification onto this one.
        d = self.router.route_notification(self.notif, router_data)
        d.addBoth(verify_retry)
        return d

    d.addBoth(verify_deliver)
    return d
开发者ID:rmoorman,项目名称:autopush,代码行数:28,代码来源:test_router.py
示例9: observe
def observe(sentry_dsn):
    ''' Decorator adding twisted raven client and blocking client if reactor
    is not yet running.

    :param str sentry_dsn: URL of sentry API
    :returns: a decorator; the wrapped function reports any exception it
        raises to sentry through the blocking client and then re-raises it.
    '''
    # create blocking client:
    raven_client = raven.base.Client(sentry_dsn)

    # add twisted logObserver with twisted raven client:
    observer = get_observer(sentry_dsn)
    if observer:
        log.addObserver(observer)

    def decorator(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            ''' Calls original function, catches any exception, sends it
            to sentry and re-raises it again. '''
            try:
                return function(*args, **kwargs)
            except:
                # Deliberately bare: every exception is reported and then
                # re-raised unchanged, nothing is swallowed.
                raven_client.captureException(sys.exc_info())
                raise  # re-raise caught exception
        return wrapper

    return decorator
开发者ID:pwilczynskiclearcode,项目名称:raven-python,代码行数:29,代码来源:__init__.py
示例10: main
def main():
    """Connect to the VATSIM server and run the reactor.

    Log events go to stdout and are also written to ``trace.log``, which is
    truncated on start and closed again once the reactor stops.
    """
    factory = VatsimClientFactory()
    log.startLogging(sys.stdout)
    with open("trace.log", 'w') as trace_file:
        observer = FileLogObserver(trace_file).emit
        addObserver(observer)
        try:
            reactor.connectTCP('USA-E.vatsim.net', 6809, factory)
            reactor.run()
        finally:
            # Unregister before the file is closed so that no later log
            # event is written to a closed file.
            log.removeObserver(observer)
开发者ID:scottjab,项目名称:pyvatsim,代码行数:7,代码来源:pyvatsim.py
示例11: start_filelog
def start_filelog(filename=None):
    """Start logging to ``filename`` through a ``MinecraftLogObserver``.

    When no name is given, a timestamped ``<date>_<time>.proxy_log.txt``
    name is generated.  The file is opened for writing (truncating any
    existing content) and stays open for the observer.
    """
    if filename is None:
        timestamp = datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
        filename = "%s.proxy_log.txt" % timestamp
    log_file = open(filename, "w")
    file_observer = MinecraftLogObserver(log_file)
    log.addObserver(file_observer.emit)
    msg("Started logging to file %s" % filename)
开发者ID:FrederickGeek8,项目名称:TwistedBot,代码行数:7,代码来源:logbot.py
示例12: run
def run(self):
    """Start the web service and block in the reactor loop.

    Blacklists the configured users, builds the site, starts listening for
    TCP connections, installs the SIGHUP handler and the periodic reload
    task, then runs the reactor.

    Returns True once the reactor has run and stopped without an exception,
    False if the site could not be created or an exception was logged.
    """
    retn = False
    try:
        for user in self._authBlacklist:
            self._userdb.blacklistUser(user)

        site = self._site()
        if not site:
            return False

        # start listen for incoming request
        self.__tcpPort = reactor.listenTCP(self._port,
                                           site,
                                           self._connections,
                                           self._listenAddress)

        # setup signal handler
        self.__sighup = False
        signal.signal(signal.SIGHUP, self._sighupHandler)
        task.LoopingCall(self._reloadTask).start(60, False)

        # start processing
        Logging.info("start listening")
        log.addObserver(logSC3)

        reactor.run()
        retn = True
    except Exception as e:
        Logging.error(str(e))

    # Report the outcome; without this the success path returned None.
    return retn
开发者ID:marcelobianchi,项目名称:seiscomp3,代码行数:30,代码来源:fdsnws.py
示例13: __init__
def __init__(self):
    """Set up the curses screen, route log events to it and register it
    with the reactor."""

    class logObserver:
        """Log observer that renders each event as one line on a console."""

        def __init__(self, con):
            self.con = con

        def emit(self, eventDict):
            parts = eventDict['message']
            if parts:
                # Ordinary message: join its parts with spaces.
                text = ' '.join(map(reflect.safe_str, parts))
            elif eventDict['isError'] and 'failure' in eventDict:
                reason = eventDict.get('why') or 'Unhandled Error'
                text = reason + '\n' + eventDict['failure'].getTraceback()
            elif 'format' in eventDict:
                text = eventDict['format'] % eventDict
            else:
                text = str(eventDict)
            self.con.addLine(text)

    window = curses.initscr()  # initialize curses
    self.screen = Screen(window)  # create Screen object
    console_observer = logObserver(self.screen)
    log.addObserver(console_observer.emit)
    window.refresh()
    # add screen object as a reader to the reactor
    reactor.addReader(self.screen)
    refresher = task.LoopingCall(self.screen.updateDisplay)
    refresher.start(.25)
开发者ID:jlew,项目名称:GrooveBot,代码行数:28,代码来源:ConsoleController.py
示例14: main
def main():
    """Run the EC2-backed DNS server on port 53 (UDP and TCP).

    Configuration is read from ``/etc/awsdns.ini``, ``./awsdns.ini`` and
    ``~/awsdns.ini`` (later files override earlier ones).  The ``awsdns``
    section provides ``dns_server`` and, optionally, ``loglevel`` and
    ``logfile``; without ``logfile`` log output goes to stdout.
    """
    config = ConfigParser.ConfigParser()
    config.read([
        "/etc/awsdns.ini",
        os.path.abspath("./awsdns.ini"),
        # Misspelled name that earlier versions looked for; still read so
        # existing setups keep working.
        os.path.expanduser("~/awsnds.ini"),
        os.path.expanduser("~/awsdns.ini"),
    ])

    # TODO: what happens if we use more than one DNS server?
    resolver = EC2Resolver(
        config,
        servers=[(config.get('awsdns', 'dns_server'), 53)]
    )

    f = server.DNSServerFactory(clients=[resolver])
    p = dns.DNSDatagramProtocol(f)

    reactor.listenUDP(53, p)
    reactor.listenTCP(53, f)

    try:
        loglevel = util.logging_constant(config.get('awsdns', 'loglevel'))
    except ConfigParser.NoOptionError:
        loglevel = logging.INFO

    try:
        logfile = config.get('awsdns', 'logfile')
    except ConfigParser.NoOptionError:
        fh = sys.stdout
    else:
        # Open the configured path rather than a hard-coded file name.
        fh = open(logfile, "a")

    observer = LevelFileLogObserver(fh, loglevel)
    log.addObserver(observer)

    reactor.run()
开发者ID:jjmojojjmojo,项目名称:awsdns,代码行数:33,代码来源:__init__.py
示例15: test_wsgiErrors
def test_wsgiErrors(self):
    """
    The C{'wsgi.errors'} key of the C{environ} C{dict} passed to the
    application is a file-like object (as defined in the U{Input and Errors
    Streams<http://www.python.org/dev/peps/pep-0333/#input-and-error-streams>}
    section of PEP 333): each C{write} or C{writelines} call on it produces
    one error event in the logging system under the C{'wsgi'} system.
    """
    events = []
    addObserver(events.append)
    self.addCleanup(removeObserver, events.append)

    rendered = self.render('GET', '1.1', [], [''])

    def cbErrors(result):
        environ, startApplication = result
        stream = environ['wsgi.errors']
        stream.write('some message\n')
        stream.writelines(['another\nmessage\n'])
        stream.flush()

        expectedMessages = [('some message\n',), ('another\nmessage\n',)]
        for index, message in enumerate(expectedMessages):
            event = events[index]
            self.assertEqual(event['message'], message)
            self.assertEqual(event['system'], 'wsgi')
            self.assertTrue(event['isError'])
        self.assertEqual(len(events), 2)

    rendered.addCallback(cbErrors)
    return rendered
开发者ID:AnthonyNystrom,项目名称:YoGoMee,代码行数:27,代码来源:test_wsgi.py
示例16: setUp
def setUp(self):
    """Collect log events in ``self.catcher`` for the duration of the test
    and create mock ``db`` and ``ws`` objects."""
    self.db, self.ws = MagicMock(), MagicMock()
    self.catcher = []
    record = self.catcher.append
    log.addObserver(record)
    # Unregister the observer again when the test finishes.
    self.addCleanup(log.removeObserver, record)
开发者ID:M0Rf30,项目名称:OpenBazaar-Server,代码行数:7,代码来源:test_listeners.py
示例17: assertLogMessage
def assertLogMessage(testCase, expectedMessages, callable, *args, **kwargs):
    """
    Call C{callable} and assert that the first element of each log event's
    message, in order, equals C{expectedMessages}.

    XXX: Put this somewhere where it can be re-used elsewhere. See #6677.

    @param testCase: The test case controlling the test which triggers the
        logged messages and on which assertions will be called.
    @type testCase: L{unittest.SynchronousTestCase}

    @param expectedMessages: A L{list} of the expected log messages
    @type expectedMessages: L{list}

    @param callable: The function which is expected to produce the
        C{expectedMessages} when called.
    @type callable: L{callable}

    @param args: Positional arguments to be passed to C{callable}.
    @type args: L{list}

    @param kwargs: Keyword arguments to be passed to C{callable}.
    @type kwargs: L{dict}
    """
    events = []
    observer = events.append
    log.addObserver(observer)
    # The observer is removed when the test case runs its cleanups.
    testCase.addCleanup(log.removeObserver, observer)

    callable(*args, **kwargs)

    firstParts = [event['message'][0] for event in events]
    testCase.assertEqual(firstParts, expectedMessages)
开发者ID:alfonsjose,项目名称:international-orders-app,代码行数:32,代码来源:test_server.py
示例18: test_deferred
def test_deferred(self):
stdout = StringIO()
log.addObserver(LevelFileLogObserver(stdout, logging.INFO))
self.call_count = 0
def test_result(d, count):
long_calculation(count)
self.call_count += 1
@twisted_timeit
def defer_calc(count_iteration, count_run):
d = defer.Deferred()
for i in xrange(count_iteration):
d.addCallback(test_result, count_run)
return d
d = defer_calc(self.count_iteration, self.count_run)
d.callback(None)
self.assertEqual(self.call_count, self.count_iteration)
result_log = stdout.getvalue()
self.assertEqual('Time for deferred' in result_log, True)
print '\n', self.sep_line_two, '\nLog from test_deferred:\n', \
self.sep_line_one, '\n', result_log, '\n', self.sep_line_two, '\n'
开发者ID:viatoriche,项目名称:progress_tests,代码行数:26,代码来源:test.py
示例19: stopOnError
def stopOnError(case, reactor, publisher=None):
    """
    Stop the reactor the first time an error event is seen on the given
    publisher.

    This helps tests which wait for a L{Deferred} to fire before completing.
    Some implementation bugs keep the L{Deferred} from ever firing (for
    example a protocol's C{dataReceived} raising an exception, which is
    logged but fires nothing); without this helper such a test could only
    finish by timing out.

    @param case: A L{SynchronousTestCase} to use to clean up the necessary log
        observer when the test is over.

    @param reactor: The reactor to stop.

    @param publisher: A L{LogPublisher} to watch for errors.  If C{None}, the
        global log publisher will be watched.
    """
    if publisher is None:
        from twisted.python import log as publisher

    # One-element list used as a mutable "not yet stopped" marker.
    pending = [None]

    def stopIfError(event):
        if not pending:
            return
        if not event.get('isError'):
            return
        pending.pop()
        reactor.stop()

    publisher.addObserver(stopIfError)
    case.addCleanup(publisher.removeObserver, stopIfError)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:27,代码来源:reactormixins.py
示例20: main
def main(port, database=None, debug=False, stdout=True):
    """Serve ``ServerFactory(database)`` over TCP on ``port`` and run the
    reactor.

    With ``stdout`` set, logging to ``sys.stdout`` is started; with
    ``debug`` set, ``shutdownOnError`` is added as a log observer.
    """
    if stdout:
        log.startLogging(sys.stdout)
    if debug:
        log.addObserver(shutdownOnError)

    factory = ServerFactory(database)
    reactor.listenTCP(port, factory)
    reactor.run()
开发者ID:pydsigner,项目名称:Lila,代码行数:7,代码来源:server.py
注:本文中的 twisted.python.log.addObserver 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的 License;未经允许,请勿转载。
请发表评论