• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python log.addObserver函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中twisted.python.log.addObserver函数的典型用法代码示例。如果您正苦于以下问题:Python addObserver函数的具体用法?Python addObserver怎么用?Python addObserver使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了addObserver函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

    def main(self):
        """Parse arguments and run the script's main function via ``react``."""
        observer = None
        try:
            if not self.log_directory.exists():
                self.log_directory.makedirs()
            log_path = self.log_directory.child(
                b"%s-%d.log" % (os.path.basename(self.sys_module.argv[0]),
                                os.getpid()))
            log_file = log_path.open("a")
            observer = FileLogObserver(log_file).emit
            addObserver(observer)
            msg("Arguments: %s" % (self.sys_module.argv,))
        except (OSError, IOError):
            pass

        options = self._parse_options(self.sys_module.argv[1:])
        # XXX: We shouldn't be using this private _reactor API. See
        # https://twistedmatrix.com/trac/ticket/6200 and
        # https://twistedmatrix.com/trac/ticket/7527
        self._react(self.script.main, (options,), _reactor=self._reactor)

        # Not strictly necessary, but nice cleanup for tests:
        if observer is not None:
            removeObserver(observer)
            log_file.close()
开发者ID:alex-docker,项目名称:flocker,代码行数:26,代码来源:script.py


示例2: test_default_item_completed

    def test_default_item_completed(self):
        item = dict(name='name')
        assert self.pipe.item_completed([], item, self.info) is item

        # Check that failures are logged by default
        fail = Failure(Exception())
        results = [(True, 1), (False, fail)]

        events = []
        txlog.addObserver(events.append)
        new_item = self.pipe.item_completed(results, item, self.info)
        txlog.removeObserver(events.append)
        self.flushLoggedErrors()

        assert new_item is item
        assert len(events) == 1
        assert events[0]['logLevel'] == log.ERROR
        assert events[0]['failure'] is fail

        # disable failure logging and check again
        self.pipe.LOG_FAILED_RESULTS = False
        events = []
        txlog.addObserver(events.append)
        new_item = self.pipe.item_completed(results, item, self.info)
        txlog.removeObserver(events.append)
        self.flushLoggedErrors()
        assert new_item is item
        assert len(events) == 0
开发者ID:1012,项目名称:scrapy,代码行数:28,代码来源:test_pipeline_media.py


示例3: test_connectionLostLogMsg

    def test_connectionLostLogMsg(self):
        """
        When a connection is lost, an informative message should be logged
        (see L{getExpectedConnectionLostLogMsg}): an address identifying
        the port and the fact that it was closed.
        """

        loggedMessages = []
        def logConnectionLostMsg(eventDict):
            loggedMessages.append(log.textFromEventDict(eventDict))

        reactor = self.buildReactor()
        p = self.getListeningPort(reactor)
        expectedMessage = self.getExpectedConnectionLostLogMsg(p)
        log.addObserver(logConnectionLostMsg)

        def stopReactor(ignored):
            log.removeObserver(logConnectionLostMsg)
            reactor.stop()

        def doStopListening():
            log.addObserver(logConnectionLostMsg)
            maybeDeferred(p.stopListening).addCallback(stopReactor)

        reactor.callWhenRunning(doStopListening)
        reactor.run()

        self.assertIn(expectedMessage, loggedMessages)
开发者ID:antong,项目名称:twisted,代码行数:28,代码来源:test_tcp.py


示例4: __init__

    def __init__(self, **kwargs):
        '''
        Initialize the C{Scheduler} instance.
        '''

        # Handle command line options
        from optparse import OptionParser
        parser = OptionParser(usage="%prog")
        parser.add_option('-v', '--verbose', action='store_true',
                dest='verbose', default=False, help='Enable verbose mode')

        (options, args) = parser.parse_args()

        # Don't accept any positional arguments
        if args:
            parser.error('Positional parameters are not supported: %s' % ' '.join(args))
            sys.exit(1)

        # Logging
        # TODO: log in database table perhaps? See ticket:6.
        log_fp = open(os.path.join(ssscrape.config.LOG_DIR, 'ssscrape-scheduler.log'), 'a')
        log.addObserver(log.FileLogObserver(log_fp).emit)

        if options.verbose:
            log.addObserver(log.FileLogObserver(sys.stdout).emit)


        # We cannot recover from SQL errors.
        ssscrape.database.add_error_callback(self.stop)
开发者ID:jijkoun,项目名称:ssscrape,代码行数:29,代码来源:scheduler.py


示例5: test_newPluginsOnReadOnlyPath

    def test_newPluginsOnReadOnlyPath(self):
        """
        Verify that a failure to write the dropin.cache file on a read-only
        path will not affect the list of plugins returned.

        Note: this test should pass on both Linux and Windows, but may not
        provide useful coverage on Windows due to the different meaning of
        "read-only directory".
        """
        self.unlockSystem()
        self.sysplug.child('newstuff.py').setContent(pluginFileContents('one'))
        self.lockSystem()

        # Take the developer path out, so that the system plugins are actually
        # examined.
        sys.path.remove(self.devPath.path)

        # Start observing log events to see the warning
        events = []
        addObserver(events.append)
        self.addCleanup(removeObserver, events.append)

        self.assertIn('one', self.getAllPlugins())

        # Make sure something was logged about the cache.
        expected = "Unable to write to plugin cache %s: error number %d" % (
            self.syscache.path, errno.EPERM)
        for event in events:
            if expected in textFromEventDict(event):
                break
        else:
            self.fail(
                "Did not observe unwriteable cache warning in log "
                "events: %r" % (events,))
开发者ID:Alberto-Beralix,项目名称:Beralix,代码行数:34,代码来源:test_plugin.py


示例6: test_send_catch_log

    def test_send_catch_log(self):
        test_signal = object()
        handlers_called = set()

        def log_received(event):
            handlers_called.add(log_received)
            assert "error_handler" in event['message'][0]
            assert event['logLevel'] == log.ERROR

        txlog.addObserver(log_received)
        dispatcher.connect(self.error_handler, signal=test_signal)
        dispatcher.connect(self.ok_handler, signal=test_signal)
        result = yield defer.maybeDeferred(self._get_result, test_signal, arg='test',
                                           handlers_called=handlers_called)

        assert self.error_handler in handlers_called
        assert self.ok_handler in handlers_called
        assert log_received in handlers_called
        self.assertEqual(result[0][0], self.error_handler)
        self.assert_(isinstance(result[0][1], Failure))
        self.assertEqual(result[1], (self.ok_handler, "OK"))

        txlog.removeObserver(log_received)
        self.flushLoggedErrors()
        dispatcher.disconnect(self.error_handler, signal=test_signal)
        dispatcher.disconnect(self.ok_handler, signal=test_signal)
开发者ID:jtwaleson,项目名称:scrapy,代码行数:26,代码来源:test_utils_signal.py


示例7: test_malformedHeaderCGI

    def test_malformedHeaderCGI(self):
        """
        Check for the error message in the duplicated header
        """
        cgiFilename = self.writeCGI(BROKEN_HEADER_CGI)

        portnum = self.startServer(cgiFilename)
        url = "http://localhost:%d/cgi" % (portnum,)
        url = url.encode("ascii")
        agent = client.Agent(reactor)
        d = agent.request(b"GET", url)
        d.addCallback(discardBody)
        loggedMessages = []

        def addMessage(eventDict):
            loggedMessages.append(log.textFromEventDict(eventDict))

        log.addObserver(addMessage)
        self.addCleanup(log.removeObserver, addMessage)

        def checkResponse(ignored):
            self.assertIn("ignoring malformed CGI header: " + repr(b'XYZ'),
                          loggedMessages)

        d.addCallback(checkResponse)
        return d
开发者ID:JohnDoes95,项目名称:project_parser,代码行数:26,代码来源:test_cgi.py


示例8: test_route_connection_refused_error

    def test_route_connection_refused_error(self):
        self.agent_mock.request.side_effect = MockAssist(
            [self._raise_connection_refused_error])
        router_data = dict(node_id="http://somewhere", uaid=dummy_uaid)
        self.router_mock.clear_node.return_value = None
        log.addObserver(self._mockObserver)
        d = self.router.route_notification(self.notif, router_data)

        def verify_retry(fail):
            exc = fail.value
            ok_(exc, RouterException)
            eq_(exc.status_code, 503)
            eq_(len(self.router_mock.clear_node.mock_calls), 1)
            self.router_mock.clear_node.reset_mock()
            self.flushLoggedErrors()

        def verify_deliver(fail):
            ok_(self._contains_err('ConnectionRefusedError'))
            exc = fail.value
            ok_(exc, RouterException)
            eq_(exc.status_code, 503)
            eq_(len(self.router_mock.clear_node.mock_calls), 1)
            self.router_mock.clear_node.reset_mock()
            d = self.router.route_notification(self.notif, router_data)
            d.addBoth(verify_retry)
            return d
        d.addBoth(verify_deliver)
        return d
开发者ID:rmoorman,项目名称:autopush,代码行数:28,代码来源:test_router.py


示例9: observe

def observe(sentry_dsn):
    ''' Decorator adding twisted raven client and blocking client if reactor
    is not yet running.
    :param str sentry_dsn: URL of sentry API
    '''

    # create blocking client:
    raven_client = raven.base.Client(sentry_dsn)

    # add twisted logObserver with twisted raven client:
    observer = get_observer(sentry_dsn)
    if observer:
        log.addObserver(observer)

    import pytest; pytest.set_trace()
    def decorator(function):

        @wraps(function)
        def wrapper(*args, **kwargs):
            ''' Calls original function, catches any exception, sends it
            to sentry and re-raises it again. '''
            try:
                return function(*args, **kwargs)
            except:
                raven_client.captureException(sys.exc_info())
                raise  # re-raise caught exception
        return wrapper

    return decorator
开发者ID:pwilczynskiclearcode,项目名称:raven-python,代码行数:29,代码来源:__init__.py


示例10: main

def main():
    factory = VatsimClientFactory()
    log.startLogging(sys.stdout)
    #log.addObserver(log.FileLogObserver(open("trace.log",'w')))
    addObserver(FileLogObserver(open("trace.log",'w')).emit)
    reactor.connectTCP('USA-E.vatsim.net',6809, factory)
    reactor.run()
开发者ID:scottjab,项目名称:pyvatsim,代码行数:7,代码来源:pyvatsim.py


示例11: start_filelog

def start_filelog(filename=None):
    if filename is None:
        filename = "%s.proxy_log.txt" % datetime.now(
        ).strftime("%Y.%m.%d_%H.%M.%S")
    f = open(filename, "w")
    log.addObserver(MinecraftLogObserver(f).emit)
    msg("Started logging to file %s" % filename)
开发者ID:FrederickGeek8,项目名称:TwistedBot,代码行数:7,代码来源:logbot.py


示例12: run

	def run(self):
		retn = False
		try:
			for user in self._authBlacklist:
				self._userdb.blacklistUser(user)

			site = self._site()

			if not site:
				return False

			# start listen for incoming request
			self.__tcpPort = reactor.listenTCP(self._port,
			                                   site,
	 		                                   self._connections,
			                                   self._listenAddress)

			# setup signal handler
			self.__sighup = False
			signal.signal(signal.SIGHUP, self._sighupHandler)
		        task.LoopingCall(self._reloadTask).start(60, False)

			# start processing
			Logging.info("start listening")
			log.addObserver(logSC3)

			reactor.run()
			retn = True
		except Exception, e:
			Logging.error(str(e))
开发者ID:marcelobianchi,项目名称:seiscomp3,代码行数:30,代码来源:fdsnws.py


示例13: __init__

    def __init__(self):
        class logObserver:
            def __init__(self, con):
                self.con = con

            def emit(self, eventDict):
                edm = eventDict['message']
                if not edm:
                    if eventDict['isError'] and 'failure' in eventDict:
                        text = ((eventDict.get('why') or 'Unhandled Error')
                                + '\n' + eventDict['failure'].getTraceback())
                    elif 'format' in eventDict:
                        text = eventDict['format'] % eventDict
                    else:
                        text = str(eventDict)
                else:
                    text = ' '.join(map(reflect.safe_str, edm))
                
                self.con.addLine(text)


        stdscr = curses.initscr() # initialize curses
        self.screen = Screen(stdscr)   # create Screen object
        log.addObserver(logObserver(self.screen).emit)
        stdscr.refresh()
        reactor.addReader(self.screen) # add screen object as a reader to the reactor
        
        task.LoopingCall(self.screen.updateDisplay).start(.25)
开发者ID:jlew,项目名称:GrooveBot,代码行数:28,代码来源:ConsoleController.py


示例14: main

def main():
    config = ConfigParser.ConfigParser()
    
    config.read(["/etc/awsdns.ini", os.path.abspath("./awsdns.ini"), os.path.expanduser("~/awsnds.ini")])
    
    # TODO: what happens if we use more than one DNS server?
    resolver = EC2Resolver(
        config,
        servers=[(config.get('awsdns', 'dns_server'), 53)]
    )
    
    f = server.DNSServerFactory(clients=[resolver])
    p = dns.DNSDatagramProtocol(f)
    
    reactor.listenUDP(53, p)
    reactor.listenTCP(53, f)
    
    try:
        loglevel = util.logging_constant(config.get('awsdns', 'loglevel'))
    except ConfigParser.NoOptionError:
        loglevel = logging.INFO
    
    try:
        logfile = config.get('awsdns', 'logfile')
        fh = open("awsdns.log", "a")
    except ConfigParser.NoOptionError:
        fh = sys.stdout
    
    observer = LevelFileLogObserver(fh, loglevel)
    
    log.addObserver(observer)
    
    reactor.run()
开发者ID:jjmojojjmojo,项目名称:awsdns,代码行数:33,代码来源:__init__.py


示例15: test_wsgiErrors

    def test_wsgiErrors(self):
        """
        The C{'wsgi.errors'} key of the C{environ} C{dict} passed to the
        application is a file-like object (as defined in the U{Input and Errors
        Streams<http://www.python.org/dev/peps/pep-0333/#input-and-error-streams>}
        section of PEP 333) which converts bytes written to it into events for
        the logging system.
        """
        events = []
        addObserver(events.append)
        self.addCleanup(removeObserver, events.append)

        errors = self.render('GET', '1.1', [], [''])
        def cbErrors((environ, startApplication)):
            errors = environ['wsgi.errors']
            errors.write('some message\n')
            errors.writelines(['another\nmessage\n'])
            errors.flush()
            self.assertEqual(events[0]['message'], ('some message\n',))
            self.assertEqual(events[0]['system'], 'wsgi')
            self.assertTrue(events[0]['isError'])
            self.assertEqual(events[1]['message'], ('another\nmessage\n',))
            self.assertEqual(events[1]['system'], 'wsgi')
            self.assertTrue(events[1]['isError'])
            self.assertEqual(len(events), 2)
        errors.addCallback(cbErrors)
        return errors
开发者ID:AnthonyNystrom,项目名称:YoGoMee,代码行数:27,代码来源:test_wsgi.py


示例16: setUp

 def setUp(self):
     self.catcher = []
     observer = self.catcher.append
     log.addObserver(observer)
     self.addCleanup(log.removeObserver, observer)
     self.db = MagicMock()
     self.ws = MagicMock()
开发者ID:M0Rf30,项目名称:OpenBazaar-Server,代码行数:7,代码来源:test_listeners.py


示例17: assertLogMessage

def assertLogMessage(testCase, expectedMessages, callable, *args, **kwargs):
    """
    Assert that the callable logs the expected messages when called.

    XXX: Put this somewhere where it can be re-used elsewhere. See #6677.

    @param testCase: The test case controlling the test which triggers the
        logged messages and on which assertions will be called.
    @type testCase: L{unittest.SynchronousTestCase}

    @param expectedMessages: A L{list} of the expected log messages
    @type expectedMessages: L{list}

    @param callable: The function which is expected to produce the
        C{expectedMessages} when called.
    @type callable: L{callable}

    @param args: Positional arguments to be passed to C{callable}.
    @type args: L{list}

    @param kwargs: Keyword arguments to be passed to C{callable}.
    @type kwargs: L{dict}
    """
    loggedMessages = []
    log.addObserver(loggedMessages.append)
    testCase.addCleanup(log.removeObserver, loggedMessages.append)

    callable(*args, **kwargs)

    testCase.assertEqual(
        [m['message'][0] for m in loggedMessages],
        expectedMessages)
开发者ID:alfonsjose,项目名称:international-orders-app,代码行数:32,代码来源:test_server.py


示例18: test_deferred

    def test_deferred(self):
        stdout = StringIO()
        log.addObserver(LevelFileLogObserver(stdout, logging.INFO))
        self.call_count = 0

        def test_result(d, count):
            long_calculation(count)
            self.call_count += 1


        @twisted_timeit
        def defer_calc(count_iteration, count_run):
            d = defer.Deferred()
            for i in xrange(count_iteration):
                d.addCallback(test_result, count_run)
            return d

        d = defer_calc(self.count_iteration, self.count_run)
        d.callback(None)

        self.assertEqual(self.call_count, self.count_iteration)

        result_log = stdout.getvalue()
        self.assertEqual('Time for deferred' in result_log, True)
        print '\n', self.sep_line_two, '\nLog from test_deferred:\n', \
            self.sep_line_one, '\n', result_log, '\n', self.sep_line_two, '\n'
开发者ID:viatoriche,项目名称:progress_tests,代码行数:26,代码来源:test.py


示例19: stopOnError

def stopOnError(case, reactor, publisher=None):
    """
    Stop the reactor as soon as any error is logged on the given publisher.

    This is beneficial for tests which will wait for a L{Deferred} to fire
    before completing (by passing or failing).  Certain implementation bugs may
    prevent the L{Deferred} from firing with any result at all (consider a
    protocol's {dataReceived} method that raises an exception: this exception
    is logged but it won't ever cause a L{Deferred} to fire).  In that case the
    test would have to complete by timing out which is a much less desirable
    outcome than completing as soon as the unexpected error is encountered.

    @param case: A L{SynchronousTestCase} to use to clean up the necessary log
        observer when the test is over.
    @param reactor: The reactor to stop.
    @param publisher: A L{LogPublisher} to watch for errors.  If C{None}, the
        global log publisher will be watched.
    """
    if publisher is None:
        from twisted.python import log as publisher
    running = [None]
    def stopIfError(event):
        if running and event.get('isError'):
            running.pop()
            reactor.stop()
    publisher.addObserver(stopIfError)
    case.addCleanup(publisher.removeObserver, stopIfError)
开发者ID:12019,项目名称:OpenWrt_Luci_Lua,代码行数:27,代码来源:reactormixins.py


示例20: main

def main(port, database=None, debug=False, stdout=True):
    if stdout:
        log.startLogging(sys.stdout)
    if debug:
        log.addObserver(shutdownOnError)
    reactor.listenTCP(port, ServerFactory(database))
    reactor.run()
开发者ID:pydsigner,项目名称:Lila,代码行数:7,代码来源:server.py



注:本文中的twisted.python.log.addObserver函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python log.callWithLogger函数代码示例发布时间:2022-05-27
下一篇:
Python lockfile.FilesystemLock类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap