• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python utils.get_logger函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中swift.common.utils.get_logger函数的典型用法代码示例。如果您正苦于以下问题：Python get_logger函数的具体用法？Python get_logger怎么用？Python get_logger使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了get_logger函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: __init__

 def __init__(self, app, conf):
     """Set up the filter from its paste.deploy configuration.

     :param app: the next WSGI application/filter in the pipeline
     :param conf: the filter configuration dict
     """
     #: The next WSGI application/filter in the paste.deploy pipeline.
     self.app = app
     #: The filter configuration dict.
     self.conf = conf
     #: The seconds to cache the x-container-meta-web-* headers.
     self.cache_timeout = int(conf.get('cache_timeout', 300))
     #: Logger for this filter.
     self.logger = get_logger(conf, log_route='staticweb')
     # Each access-log option prefers its ``access_``-prefixed key and
     # falls back to the plain key; falsy values are left out entirely.
     candidates = (
         (option, conf.get('access_' + option, conf.get(option, None)))
         for option in ('log_facility', 'log_name', 'log_level'))
     access_log_conf = dict(
         (option, setting) for option, setting in candidates if setting)
     #: Web access logger for this filter.
     self.access_logger = get_logger(access_log_conf,
                                     log_route='staticweb-access')
     #: Indicates whether full HTTP headers should be logged or not.
     #: Only the exact string 'True' turns this on.
     self.log_headers = conf.get('log_headers') == 'True'
     # Results from the last call to self._start_response.
     self._response_status = None
     self._response_headers = None
     self._response_exc_info = None
     # Results from the last call to self._get_container_info.
     self._index = None
     self._error = None
     self._listings = None
     self._listings_css = None
开发者ID:OpenStack-Kha,项目名称:swift,代码行数:25,代码来源:staticweb.py


示例2: test_get_logger

 def test_get_logger(self):
     """Check which log levels get_logger lets through to a handler.

     A StreamHandler writing to an in-memory buffer is attached to the
     stdlib 'server' logger; every assertion inspects that buffer.
     """
     sio = StringIO()
     server_logger = logging.getLogger('server')
     handler = logging.StreamHandler(sio)
     server_logger.addHandler(handler)
     try:
         logger = utils.get_logger(None, 'server', log_route='server')
         logger.warning('test1')
         self.assertEqual(sio.getvalue(), 'test1\n')
         logger.debug('test2')
         self.assertEqual(sio.getvalue(), 'test1\n')
         logger = utils.get_logger({'log_level': 'DEBUG'}, 'server',
                                   log_route='server')
         logger.debug('test3')
         self.assertEqual(sio.getvalue(), 'test1\ntest3\n')
         # Doesn't really test that the log facility is truly being used
         # all the way to syslog; but exercises the code.
         logger = utils.get_logger({'log_facility': 'LOG_LOCAL3'},
                                   'server', log_route='server')
         logger.warning('test4')
         self.assertEqual(sio.getvalue(),
                          'test1\ntest3\ntest4\n')
         # make sure debug doesn't log by default
         logger.debug('test5')
         self.assertEqual(sio.getvalue(),
                          'test1\ntest3\ntest4\n')
         # make sure notice lvl logs by default
         logger.notice('test6')
         self.assertEqual(sio.getvalue(),
                          'test1\ntest3\ntest4\ntest6\n')
     finally:
         # Detach the capture handler so it cannot leak into other tests
         # that share the process-wide 'server' logger.
         server_logger.removeHandler(handler)
开发者ID:AsylumCorp,项目名称:swift,代码行数:28,代码来源:test_utils.py


示例3: get_socket

def get_socket(conf):
    """Create the listening socket described by conf.

    Binding is retried while the address is in use, until ``bind_timeout``
    seconds have elapsed.

    :param conf: Configuration dict to read settings from

    :returns: a socket object as returned from socket.listen or
              ssl.wrap_socket if conf specifies cert_file
    :raises ConfigFilePortError: if ``bind_port`` is missing or not an int
    :raises ConfigFileError: if ``keep_idle`` is not an int strictly
                             between 0 and 2 ** 15 - 1
    :raises Exception: if no socket could be bound before the timeout
    """
    try:
        port = int(conf['bind_port'])
    except (ValueError, KeyError, TypeError):
        raise ConfigFilePortError()
    host = conf.get('bind_ip', '0.0.0.0')
    address = (host, port)

    # Use the family of the first IPv4/IPv6 result the resolver reports.
    usable_families = []
    for info in socket.getaddrinfo(host, port, socket.AF_UNSPEC,
                                   socket.SOCK_STREAM):
        if info[0] in (socket.AF_INET, socket.AF_INET6):
            usable_families.append(info[0])
    family = usable_families[0]

    timeout = int(conf.get('bind_timeout', 30))
    deadline = time.time() + timeout

    try:
        keepidle = int(conf.get('keep_idle', 600))
        if not (0 < keepidle < 2 ** 15 - 1):
            raise ValueError()
    except (ValueError, KeyError, TypeError):
        raise ConfigFileError()

    sock = None
    ssl_enabled = False
    while not sock and time.time() < deadline:
        try:
            sock = listen(address, backlog=int(conf.get('backlog', 4096)),
                          family=family)
            if 'cert_file' in conf:
                ssl_enabled = True
                sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
                                       keyfile=conf['key_file'])
        except socket.error as err:
            if err.args[0] == errno.EADDRINUSE:
                # Address still busy: back off briefly and try again.
                sleep(0.1)
            else:
                raise
    if not sock:
        details = {'addr': host, 'port': port, 'timeout': timeout}
        raise Exception(_('Could not bind to %(addr)s:%(port)s '
                          'after trying for %(timeout)s seconds') % details)

    # in my experience, sockets can hang around forever without keepalive
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    if hasattr(socket, 'TCP_KEEPIDLE'):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, keepidle)

    if ssl_enabled:
        ssl_warning_message = _('WARNING: SSL should only be enabled for '
                                'testing purposes. Use external SSL '
                                'termination for a production deployment.')
        get_logger(conf).warning(ssl_warning_message)
        print(ssl_warning_message)
    return sock
开发者ID:mahak,项目名称:swift,代码行数:57,代码来源:wsgi.py


示例4: test_get_logger_console

 def test_get_logger_console(self):
     """Check the ``console`` handler attribute kept on get_logger."""
     reload(utils)  # reset get_logger attrs
     utils.get_logger(None)
     self.assertFalse(hasattr(utils.get_logger, 'console'))
     utils.get_logger(None, log_to_console=True)
     self.assertTrue(hasattr(utils.get_logger, 'console'))
     self.assertIsInstance(utils.get_logger.console,
                           logging.StreamHandler)
     # make sure you can't have two console handlers: a repeat request
     # must leave a different handler object in place
     old_handler = utils.get_logger.console
     logger = utils.get_logger(None, log_to_console=True)
     self.assertNotEqual(utils.get_logger.console, old_handler)
     logger.logger.removeHandler(utils.get_logger.console)
开发者ID:edwardt,项目名称:swift,代码行数:13,代码来源:test_utils.py


示例5: test_capture_stdio

    def test_capture_stdio(self):
        """Exercise utils.capture_stdio against mocked ``sys`` and ``os``.

        ``utils.sys`` and ``utils.os`` are swapped for mocks for the
        duration of the test and always restored afterwards.
        """
        # stubs
        logger = utils.get_logger(None, 'dummy')

        # mock utils system modules
        _orig_sys = utils.sys
        _orig_os = utils.os
        try:
            utils.sys = MockSys()
            utils.os = MockOs()

            # basic test
            utils.capture_stdio(logger)
            self.assertIsNotNone(utils.sys.excepthook)
            self.assertEqual(utils.os.closed_fds, utils.sys.stdio_fds)
            self.assertIsInstance(utils.sys.stdout, utils.LoggerFileObject)
            self.assertIsInstance(utils.sys.stderr, utils.LoggerFileObject)

            # reset; test same args, but exc when trying to close stdio
            utils.os = MockOs(raise_funcs=('dup2',))
            utils.sys = MockSys()

            # test unable to close stdio
            utils.capture_stdio(logger)
            self.assertIsNotNone(utils.sys.excepthook)
            self.assertEqual(utils.os.closed_fds, [])
            self.assertIsInstance(utils.sys.stdout, utils.LoggerFileObject)
            self.assertIsInstance(utils.sys.stderr, utils.LoggerFileObject)

            # reset; test some other args
            utils.os = MockOs()
            utils.sys = MockSys()
            logger = utils.get_logger(None, log_to_console=True)

            # test console log
            utils.capture_stdio(logger, capture_stdout=False,
                                capture_stderr=False)
            self.assertIsNotNone(utils.sys.excepthook)
            # when logging to console, stderr remains open
            self.assertEqual(utils.os.closed_fds, utils.sys.stdio_fds[:2])
            reset_loggers()

            # stdio not captured
            self.assertNotIsInstance(utils.sys.stdout,
                                     utils.LoggerFileObject)
            self.assertNotIsInstance(utils.sys.stderr,
                                     utils.LoggerFileObject)
            reset_loggers()
        finally:
            utils.sys = _orig_sys
            utils.os = _orig_os
开发者ID:AsylumCorp,项目名称:swift,代码行数:51,代码来源:test_utils.py


示例6: get_socket

def get_socket(conf):
    """Bind socket to bind ip:port in conf

    The bind is retried while the address is in use, for up to
    ``bind_timeout`` seconds.

    :param conf: Configuration dict to read settings from

    :returns: a socket object as returned from socket.listen or
              ssl.wrap_socket if conf specifies cert_file
    :raises ConfigFilePortError: if ``bind_port`` is missing or not an int
    :raises Exception: if no socket could be bound before the timeout
    """
    try:
        port = int(conf['bind_port'])
    except (ValueError, KeyError, TypeError):
        raise ConfigFilePortError()
    ip = conf.get('bind_ip', '0.0.0.0')
    bind_addr = (ip, port)

    # Keep only IPv4/IPv6 results and take the family of the first one.
    families = []
    for entry in socket.getaddrinfo(ip, port, socket.AF_UNSPEC,
                                    socket.SOCK_STREAM):
        if entry[0] in (socket.AF_INET, socket.AF_INET6):
            families.append(entry[0])
    address_family = families[0]

    bind_timeout = int(conf.get('bind_timeout', 30))
    give_up_at = time.time() + bind_timeout

    sock = None
    using_ssl = False
    while not sock and time.time() < give_up_at:
        try:
            sock = listen(bind_addr,
                          backlog=int(conf.get('backlog', 4096)),
                          family=address_family)
            if 'cert_file' in conf:
                using_ssl = True
                sock = ssl.wrap_socket(sock,
                                       certfile=conf['cert_file'],
                                       keyfile=conf['key_file'])
        except socket.error as err:
            if err.args[0] == errno.EADDRINUSE:
                # Port still taken: wait a moment before the next attempt.
                sleep(0.1)
            else:
                raise
    if not sock:
        message = _('Could not bind to %s:%s after trying for %s seconds')
        raise Exception(message % (ip, port, bind_timeout))

    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # in my experience, sockets can hang around forever without keepalive
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    if hasattr(socket, 'TCP_KEEPIDLE'):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)

    if using_ssl:
        ssl_warning_message = _(
            'WARNING: SSL should only be enabled for testing purposes. '
            'Use external SSL termination for a production deployment.')
        get_logger(conf).warning(ssl_warning_message)
        print(ssl_warning_message)
    return sock
开发者ID:vandanashah,项目名称:swift,代码行数:51,代码来源:wsgi.py


示例7: __init__

 def __init__(self, app, conf):
     """Configure the middleware from its filter configuration.

     :param app: the wrapped WSGI application
     :param conf: configuration dict for this filter
     :raises ValueError: if ``whitelist_url`` or ``invite_url`` is unset
     """
     self.app = app
     self.conf = conf
     self.logger = get_logger(conf, log_route='lite-swauth')
     self.profile_path = 'profile'

     self.super_admin_key = conf.get('super_admin_key')
     if not self.super_admin_key:
         msg = ('No super_admin_key set in conf file; '
                'Swauth administration features will be disabled.')
         # Best effort only: a failing logger must not block start-up.
         try:
             self.logger.warn(msg)
         except Exception:
             pass

     # Normalise the prefix so it is non-empty and slash-delimited.
     prefix = conf.get('auth_prefix', '/auth/') or '/auth/'
     if prefix[0] != '/':
         prefix = '/' + prefix
     if prefix[-1] != '/':
         prefix = prefix + '/'
     self.auth_prefix = prefix
     self.version = 'v2'

     # url for whitelist objects
     # Example: /v1/liteauth/whitelist
     whitelist_url = conf.get('whitelist_url', '')
     self.whitelist_url = whitelist_url.lower().rstrip('/')
     if not self.whitelist_url:
         raise ValueError('whitelist_url not set in config file')

     # url for invite objects
     # Example: /v1/liteauth/invites
     invite_url = conf.get('invite_url', '')
     self.invite_url = invite_url.lower().rstrip('/')
     if not self.invite_url:
         raise ValueError('invite_url not set in config file')
开发者ID:mgeisler,项目名称:liteauth,代码行数:31,代码来源:swauth_manager.py


示例8: __init__

 def __init__(self, conf):
     """
     Creates a new WSGI application for the Swift Object Server. An
     example configuration is given at
     <source-dir>/etc/object-server.conf-sample or
     /etc/swift/object-server.conf-sample.

     :param conf: configuration dict; every option read here has a default
     """
     self.logger = get_logger(conf, log_route='object-server')
     self.devices = conf.get('devices', '/srv/node/')
     truthy = ('true', 't', '1', 'on', 'yes', 'y')
     self.mount_check = conf.get('mount_check', 'true').lower() in truthy
     self.node_timeout = int(conf.get('node_timeout', 3))
     self.conn_timeout = float(conf.get('conn_timeout', 0.5))
     self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
     self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
     # Only the first character decides: anything starting with t/T is on.
     self.log_requests = conf.get('log_requests', 't')[:1].lower() == 't'
     self.max_upload_time = int(conf.get('max_upload_time', 86400))
     self.slow = int(conf.get('slow', 0))
     # Configured in MiB, stored in bytes.
     self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
     default_allowed_headers = '''
         content-disposition,
         content-encoding,
         x-delete-at,
         x-object-manifest,
     '''
     # Comma-separated list -> set of lower-cased names, skipping blanks
     # and anything listed in DISALLOWED_HEADERS.
     configured = conf.get('allowed_headers', default_allowed_headers)
     allowed = set()
     for entry in configured.split(','):
         header = entry.strip().lower()
         if header and header not in DISALLOWED_HEADERS:
             allowed.add(header)
     self.allowed_headers = allowed
     account_prefix = conf.get('auto_create_account_prefix') or '.'
     self.expiring_objects_account = account_prefix + 'expiring_objects'
     self.expiring_objects_container_divisor = int(
         conf.get('expiring_objects_container_divisor') or 86400)
开发者ID:Nupta,项目名称:swift,代码行数:34,代码来源:server.py


示例9: __init__

 def __init__(self, app, conf):
     """Configure the OAuth login middleware.

     :param app: the wrapped WSGI application
     :param conf: configuration dict for this filter
     :raises ValueError: if ``auth_endpoint`` is unset, has no network
                         location, or does not use the https scheme
     """
     self.app = app
     self.conf = conf
     self.logger = get_logger(conf, log_route='liteauth')
     provider_name = conf.get('oauth_provider', 'google_oauth')
     self.provider = load_oauth_provider(provider_name)

     self.auth_endpoint = conf.get('auth_endpoint', '')
     if not self.auth_endpoint:
         raise ValueError('auth_endpoint not set in config file')
     if isinstance(self.auth_endpoint, unicode):
         self.auth_endpoint = self.auth_endpoint.encode('utf-8')

     endpoint = urlparse(self.auth_endpoint)
     if not endpoint.netloc:
         raise ValueError('auth_endpoint is invalid in config file')
     self.auth_domain = endpoint.netloc
     self.login_path = endpoint.path
     self.scheme = endpoint.scheme
     if self.scheme != 'https':
         raise ValueError('auth_endpoint must have https:// scheme')

     # by default service_domain can be extracted from the endpoint
     # in case where auth domain is different from service domain
     # you need to set up the service domain separately
     # Example:
     # auth_endpoint = https://auth.example.com/login
     # service_domain = https://www.example.com
     derived_domain = '%s://%s' % (self.scheme, self.auth_domain)
     self.service_domain = conf.get('service_domain', derived_domain)
     self.storage_driver = None
     self.oauth_login_timeout = 3600
开发者ID:pkit,项目名称:liteauth,代码行数:30,代码来源:oauth_login.py


示例10: __init__

 def __init__(self, conf):
     """Read the server settings and build its replicator RPC helper.

     :param conf: configuration dict
     """
     self.logger = get_logger(conf)
     self.root = conf.get('devices', '/srv/node')
     mount_check = conf.get('mount_check', 'true').lower()
     self.mount_check = mount_check in (
         'true', 't', '1', 'on', 'yes', 'y')
     self.replicator_rpc = ReplicatorRpc(
         self.root, DATADIR, AccountBroker, self.mount_check)
开发者ID:edwardt,项目名称:swift,代码行数:7,代码来源:server.py


示例11: __init__

    def __init__(self, app, conf):
        """Parse the ``verb_acl`` option into a method -> blocks mapping.

        The option is a ';'-separated list of ``METHODS:BLOCKS`` rules,
        where both sides are ','-separated lists. Method names are stored
        upper-cased in ``self.verb_acl``.

        :param app: the wrapped WSGI application
        :param conf: configuration dict for this filter
        :raises ValueError: if ``verb_acl`` is missing or malformed
        """
        self.app = app
        self.logger = get_logger(conf)

        # TODO Check pipeline

        self.verb_acl = {}
        raw_rules = conf.get('verb_acl', None)
        if raw_rules is None:
            raise ValueError(
                'verb_acl: missing "verb_acl" configuration entry')
        for rule in raw_rules.split(';'):
            # Empty segments (e.g. a trailing ';') are simply skipped.
            if not rule:
                continue
            if rule.count(':') != 1:
                raise ValueError('verb_acl: bad format: "%s"' % rule)
            verbs, _colon, addresses = rule.partition(':')
            address_blocks = addresses.split(',')
            for verb in verbs.split(','):
                if not verb:
                    raise ValueError(
                        'verb_acl: bad format: missing method: "%s"' % rule)
                for address_block in address_blocks:
                    if not address_block:
                        raise ValueError(
                            'verb_acl: bad format: empty address block: "%s"' %
                            rule)
                    allowed = self.verb_acl.setdefault(verb.upper(), [])
                    allowed.append(address_block)
        self.logger.debug("Verb ACL: " + str(self.verb_acl))
开发者ID:jfsmig,项目名称:oio-swift,代码行数:28,代码来源:verb_acl.py


示例12: test_run_daemon

    def test_run_daemon(self):
        """Run daemon.run_daemon through its normal and failure paths."""
        daemon_conf = "[my-daemon]\nuser = %s\n" % getuser()
        with tmpfile(daemon_conf) as conf_file:
            # forever mode, with TZ handling observed through mocks
            with mock.patch.dict('os.environ', {'TZ': ''}), \
                    mock.patch('time.tzset') as mock_tzset:
                daemon.run_daemon(MyDaemon, conf_file)
                self.assertTrue(MyDaemon.forever_called)
                self.assertEqual(os.environ['TZ'], 'UTC+0')
                self.assertEqual(mock_tzset.mock_calls, [mock.call()])

            # once mode
            daemon.run_daemon(MyDaemon, conf_file, once=True)
            self.assertEqual(MyDaemon.once_called, True)

            # test raise in daemon code
            with mock.patch.object(MyDaemon, 'run_once', MyDaemon.run_raise):
                self.assertRaises(OSError, daemon.run_daemon, MyDaemon,
                                  conf_file, once=True)

            # test user quit
            captured = StringIO()
            logging.getLogger('server').addHandler(
                logging.StreamHandler(captured))
            quit_logger = utils.get_logger(None, 'server',
                                           log_route='server')
            with mock.patch.object(MyDaemon, 'run_forever', MyDaemon.run_quit):
                daemon.run_daemon(MyDaemon, conf_file, logger=quit_logger)
            self.assertIn('user quit', captured.getvalue().lower())

            # test missing section
            sectionless_conf = "[default]\nuser = %s\n" % getuser()
            with tmpfile(sectionless_conf) as conf_file:
                self.assertRaisesRegexp(SystemExit,
                                        'Unable to find my-daemon '
                                        'config section in.*',
                                        daemon.run_daemon, MyDaemon,
                                        conf_file, once=True)
开发者ID:clayg,项目名称:swift,代码行数:34,代码来源:test_daemon.py


示例13: __init__

 def __init__(self, conf):
     """Read the container-updater settings out of conf.

     Also sets the module-level ``swift.common.db.DB_PREALLOCATION`` flag
     from the ``db_preallocation`` option.

     :param conf: configuration dict; every option read here has a default
     """
     self.conf = conf
     self.logger = get_logger(conf, log_route='container-updater')

     # Locations and scheduling.
     self.devices = conf.get('devices', '/srv/node')
     self.mount_check = config_true_value(conf.get('mount_check', 'true'))
     self.swift_dir = conf.get('swift_dir', '/etc/swift')
     self.interval = int(conf.get('interval', 300))
     self.account_ring = None

     # Throughput and timeouts.
     self.concurrency = int(conf.get('concurrency', 4))
     self.slowdown = float(conf.get('slowdown', 0.01))
     self.node_timeout = float(conf.get('node_timeout', 3))
     self.conn_timeout = float(conf.get('conn_timeout', 0.5))

     # Counters, all starting at zero.
     self.no_changes = 0
     self.successes = 0
     self.failures = 0

     self.account_suppressions = {}
     suppression_time = conf.get('account_suppression_time', 60)
     self.account_suppression_time = float(suppression_time)
     self.new_account_suppressions = None

     preallocate = conf.get('db_preallocation', 'f')
     swift.common.db.DB_PREALLOCATION = config_true_value(preallocate)

     self.recon_cache_path = conf.get(
         'recon_cache_path', '/var/cache/swift')
     self.rcache = os.path.join(self.recon_cache_path, "container.recon")
     self.user_agent = 'container-updater %s' % os.getpid()
开发者ID:ISCAS-VDI,项目名称:swift-base,代码行数:25,代码来源:updater.py


示例14: __init__

 def __init__(self, conf, logger=None):
     """Configure the container server controller.

     Also sets the module-level ``swift.common.db.DB_PREALLOCATION`` flag
     from the ``db_preallocation`` option.

     :param conf: configuration dict
     :param logger: logger to use; when falsy one is created from conf
     """
     super(ContainerController, self).__init__(conf)
     self.logger = logger or get_logger(conf, log_route='container-server')
     self.log_requests = config_true_value(conf.get('log_requests', 'true'))
     self.root = conf.get('devices', '/srv/node')
     self.mount_check = config_true_value(conf.get('mount_check', 'true'))
     self.node_timeout = int(conf.get('node_timeout', 3))
     self.conn_timeout = float(conf.get('conn_timeout', 0.5))

     #: ContainerSyncCluster instance for validating sync-to values.
     realms_conf_path = os.path.join(
         conf.get('swift_dir', '/etc/swift'), 'container-sync-realms.conf')
     self.realms_conf = ContainerSyncRealms(realms_conf_path, self.logger)

     #: The list of hosts we're allowed to send syncs to. This can be
     #: overridden by data in self.realms_conf
     sync_hosts = []
     for host in conf.get('allowed_sync_hosts', '127.0.0.1').split(','):
         host = host.strip()
         if host:
             sync_hosts.append(host)
     self.allowed_sync_hosts = sync_hosts

     self.replicator_rpc = ContainerReplicatorRpc(
         self.root, DATADIR, ContainerBroker, self.mount_check,
         logger=self.logger)
     self.auto_create_account_prefix = (
         conf.get('auto_create_account_prefix') or '.')
     if config_true_value(conf.get('allow_versions', 'f')):
         self.save_headers.append('x-versions-location')
     preallocate = conf.get('db_preallocation', 'f')
     swift.common.db.DB_PREALLOCATION = config_true_value(preallocate)
开发者ID:steveruckdashel,项目名称:swift,代码行数:29,代码来源:server.py


示例15: test_delegate_methods_with_metric_prefix

    def test_delegate_methods_with_metric_prefix(self):
        """Check emitted stats when ``log_statsd_metric_prefix`` is set.

        Runs once with the logger's own 'pfx' prefix in place and once
        after that prefix is cleared, the second time with a sample rate.
        """
        self.logger = utils.get_logger({
            'log_statsd_host': 'localhost',
            'log_statsd_port': str(self.port),
            'log_statsd_metric_prefix': 'alpha.beta',
        }, 'pfx')
        self.assertStat('alpha.beta.pfx.some.counter:1|c',
                        self.logger.increment, 'some.counter')
        self.assertStat('alpha.beta.pfx.some.counter:-1|c',
                        self.logger.decrement, 'some.counter')
        self.assertStat('alpha.beta.pfx.some.operation:4760.0|ms',
                        self.logger.timing, 'some.operation', 4.76 * 1000)
        # Raw strings keep the regex escapes out of Python's own
        # string-escape processing.
        self.assertStatMatches(
            r'alpha\.beta\.pfx\.another\.op:\d+\.\d+\|ms',
            self.logger.timing_since, 'another.op', time.time())
        self.assertStat('alpha.beta.pfx.another.counter:3|c',
                        self.logger.update_stats, 'another.counter', 3)

        self.logger.set_statsd_prefix('')
        self.assertStat('alpha.beta.some.counter:1|c|@0.9912',
                        self.logger.increment, 'some.counter',
                        sample_rate=0.9912)
        self.assertStat('alpha.beta.some.counter:-1|c|@0.9912',
                        self.logger.decrement, 'some.counter', 0.9912)
        self.assertStat('alpha.beta.some.operation:4900.0|ms|@0.9912',
                        self.logger.timing, 'some.operation', 4.9 * 1000,
                        sample_rate=0.9912)
        # The '|' before '@' must be escaped; left bare it is a regex
        # alternation and the pattern would accept a lone '@0.9912'.
        self.assertStatMatches(
            r'alpha\.beta\.another\.op:\d+\.\d+\|ms\|@0\.9912',
            self.logger.timing_since, 'another.op',
            time.time(), sample_rate=0.9912)
        self.assertStat('alpha.beta.another.counter:3|c|@0.9912',
                        self.logger.update_stats, 'another.counter', 3,
                        sample_rate=0.9912)
开发者ID:AsylumCorp,项目名称:swift,代码行数:33,代码来源:test_utils.py


示例16: __init__

    def __init__(self, app, conf):
        """
        This function is called when Swift Proxy inits.

        :param app: the wrapped WSGI application
        :param conf: configuration dict for this filter
        """
        self.app = app
        self.conf = conf
        self.logger = get_logger(conf, log_route='customauth')
        self.log_headers = config_true_value(conf.get('log_headers', 'f'))

        # A non-empty reseller prefix always ends with an underscore.
        self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip()
        if self.reseller_prefix and self.reseller_prefix[-1] != '_':
            self.reseller_prefix += '_'
        statsd_suffix = self.reseller_prefix or 'NONE'
        self.logger.set_statsd_prefix('customauth.%s' % (statsd_suffix,))

        auth_prefix = conf.get('auth_prefix', '/auth/')

        # Organization
        self.organization_id = conf.get(
            'organization_id', '57b69c457792482c8d817c4945c6c8a8')

        # Keystone
        self.keystone_auth_endpoint = conf.get(
            'keystone_auth_endpoint',
            'http://cloud.lab.fiware.org:4730/v2.0/tokens')
        self.keystone_tenant_endpoint = conf.get(
            'keystone_tenant_endpoint',
            'http://cloud.lab.fiware.org:4730/v2.0/tenants')

        # An empty or slash-only prefix is replaced, then the result is
        # forced to start and end with a slash.
        if not auth_prefix or not auth_prefix.strip('/'):
            self.logger.warning('Rewriting invalid auth prefix "%s" to '
                                '"/auth/" (Non-empty auth prefix path '
                                'is required)' % auth_prefix)
            auth_prefix = '/auth/'
        if auth_prefix[0] != '/':
            auth_prefix = '/' + auth_prefix
        if auth_prefix[-1] != '/':
            auth_prefix = auth_prefix + '/'
        self.auth_prefix = auth_prefix

        self.token_life = int(conf.get('token_life', 86400))
        self.allow_overrides = config_true_value(
            conf.get('allow_overrides', 't'))
        self.storage_url_scheme = conf.get('storage_url_scheme', 'default')
        self.logger.info('CustomAuth v1.3 loaded successfully')
开发者ID:FINESCE,项目名称:HybridCloudDataManagement,代码行数:35,代码来源:customauth.py


示例17: __init__

 def __init__(self, app, conf):
     """Record the ring file locations and create the logger.

     :param app: the wrapped WSGI application
     :param conf: configuration dict; ``swift_dir`` defaults to /etc/swift
     """
     self.app = app
     ring_dir = conf.get('swift_dir', '/etc/swift')
     # Only the paths are stored here; no ring file is opened.
     self.account_ring_path = os.path.join(ring_dir, 'account.ring.gz')
     self.container_ring_path = os.path.join(
         ring_dir, 'container.ring.gz')
     self.object_ring_path = os.path.join(ring_dir, 'object.ring.gz')
     self.logger = get_logger(conf, log_route='lxc-swift')
开发者ID:zfeldstein,项目名称:swift-lxc,代码行数:7,代码来源:swift_lxc_proxy.py


示例18: __init__

 def __init__(self, conf):
     """
     :param conf: configuration object obtained from ConfigParser
     """
     self.conf = conf
     self.logger = get_logger(conf, log_route='object-replicator')
     self.devices_dir = conf.get('devices', '/srv/node')
     mount_check = conf.get('mount_check', 'true').lower()
     self.mount_check = mount_check in (
         'true', 't', '1', 'on', 'yes', 'y')
     vm_test_mode = conf.get('vm_test_mode', 'no').lower()
     self.vm_test_mode = vm_test_mode in ('yes', 'true', 'on', '1')
     self.swift_dir = conf.get('swift_dir', '/etc/swift')
     self.port = int(conf.get('bind_port', 6000))
     self.concurrency = int(conf.get('concurrency', 1))
     self.stats_interval = int(conf.get('stats_interval', '300'))
     self.object_ring = Ring(self.swift_dir, ring_name='object')
     self.ring_check_interval = int(conf.get('ring_check_interval', 15))
     # First ring check is due one full interval from now.
     self.next_check = time.time() + self.ring_check_interval
     self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
     self.partition_times = []
     self.run_pause = int(conf.get('run_pause', 30))
     self.rsync_timeout = int(conf.get('rsync_timeout', 900))
     # Unlike the other timeouts this one is stored unconverted.
     self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
     self.http_timeout = int(conf.get('http_timeout', 60))
     self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
     recon_enable = conf.get('recon_enable', 'no').lower()
     self.recon_enable = recon_enable in TRUE_VALUES
     self.recon_cache_path = conf.get(
         'recon_cache_path', '/var/cache/swift')
     self.recon_object = os.path.join(
         self.recon_cache_path, "object.recon")
开发者ID:bn-emailops,项目名称:swift,代码行数:31,代码来源:replicator.py


示例19: filter_factory

def filter_factory(global_conf, **local_conf):
    """Returns the WSGI filter for use with paste.deploy.

    The merged configuration is registered via ``register_swift_info``
    under 'tempurl'. Configured digests that are not in
    SUPPORTED_DIGESTS are logged and dropped.

    :param global_conf: base configuration; copied, not modified
    :param local_conf: overrides applied on top of ``global_conf``
    :returns: a callable taking the WSGI app and returning a TempURL
    :raises ValueError: if no supported digest algorithm remains
    """
    conf = global_conf.copy()
    conf.update(local_conf)

    option_defaults = (
        ('methods', 'GET HEAD PUT POST DELETE'),
        ('incoming_remove_headers', DEFAULT_INCOMING_REMOVE_HEADERS),
        ('incoming_allow_headers', DEFAULT_INCOMING_ALLOW_HEADERS),
        ('outgoing_remove_headers', DEFAULT_OUTGOING_REMOVE_HEADERS),
        ('outgoing_allow_headers', DEFAULT_OUTGOING_ALLOW_HEADERS),
        ('allowed_digests', DEFAULT_ALLOWED_DIGESTS),
    )
    # Every option is a whitespace-separated list in the config.
    info_conf = {}
    for option, default in option_defaults:
        info_conf[option] = conf.get(option, default).split()

    allowed_digests = {name.lower() for name in info_conf['allowed_digests']}
    not_supported = allowed_digests - SUPPORTED_DIGESTS
    if not_supported:
        logger = get_logger(conf, log_route='tempurl')
        logger.warning('The following digest algorithms are configured but '
                       'not supported: %s', ', '.join(not_supported))
        allowed_digests -= not_supported
    if not allowed_digests:
        raise ValueError('No valid digest algorithms are configured '
                         'for tempurls')
    info_conf['allowed_digests'] = sorted(allowed_digests)

    register_swift_info('tempurl', **info_conf)
    conf.update(info_conf)

    def tempurl_filter(app):
        return TempURL(app, conf)

    return tempurl_filter
开发者ID:chenzhongtao,项目名称:swift,代码行数:32,代码来源:tempurl.py


示例20: __init__

 def __init__(self, conf, logger=None):
     """Read the object-updater settings out of conf.

     :param conf: configuration dict; every option read here has a default
     :param logger: logger to use; when falsy one is created from conf
     """
     self.conf = conf
     self.logger = logger or get_logger(conf, log_route='object-updater')
     self.devices = conf.get('devices', '/srv/node')
     self.mount_check = config_true_value(conf.get('mount_check', 'true'))
     self.swift_dir = conf.get('swift_dir', '/etc/swift')
     self.interval = int(conf.get('interval', 300))
     self.container_ring = None
     self.concurrency = int(conf.get('concurrency', 1))

     # A legacy 'slowdown' value only provides the fallback rate; an
     # explicit 'objects_per_second' option still wins below.
     default_rate = 50
     if 'slowdown' in conf:
         self.logger.warning(
             'The slowdown option is deprecated in favor of '
             'objects_per_second. This option may be ignored in a '
             'future release.')
         legacy_delay = float(conf.get('slowdown', '0.01'))
         default_rate = 1 / (legacy_delay + 0.01)
     self.objects_running_time = 0
     self.max_objects_per_second = float(
         conf.get('objects_per_second', default_rate))

     self.node_timeout = float(conf.get('node_timeout', 10))
     self.conn_timeout = float(conf.get('conn_timeout', 0.5))
     self.report_interval = float(conf.get('report_interval', 300))
     self.recon_cache_path = conf.get(
         'recon_cache_path', '/var/cache/swift')
     self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
     self.stats = SweepStats()
开发者ID:chenzhongtao,项目名称:swift,代码行数:29,代码来源:updater.py



注:本文中的swift.common.utils.get_logger函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python utils.get_param函数代码示例发布时间:2022-05-27
下一篇:
Python utils.get_log_line函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap