本文整理汇总了Python中swift.common.utils.get_logger函数的典型用法代码示例。如果您正苦于以下问题:Python get_logger函数的具体用法?Python get_logger怎么用?Python get_logger使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_logger函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, app, conf):
#: The next WSGI application/filter in the paste.deploy pipeline.
self.app = app
#: The filter configuration dict.
self.conf = conf
#: The seconds to cache the x-container-meta-web-* headers.,
self.cache_timeout = int(conf.get('cache_timeout', 300))
#: Logger for this filter.
self.logger = get_logger(conf, log_route='staticweb')
access_log_conf = {}
for key in ('log_facility', 'log_name', 'log_level'):
value = conf.get('access_' + key, conf.get(key, None))
if value:
access_log_conf[key] = value
#: Web access logger for this filter.
self.access_logger = get_logger(access_log_conf,
log_route='staticweb-access')
#: Indicates whether full HTTP headers should be logged or not.
self.log_headers = conf.get('log_headers') == 'True'
# Results from the last call to self._start_response.
self._response_status = None
self._response_headers = None
self._response_exc_info = None
# Results from the last call to self._get_container_info.
self._index = self._error = self._listings = self._listings_css = None
开发者ID:OpenStack-Kha,项目名称:swift,代码行数:25,代码来源:staticweb.py
示例2: test_get_logger
def test_get_logger(self):
sio = StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server', log_route='server')
logger.warn('test1')
self.assertEquals(sio.getvalue(), 'test1\n')
logger.debug('test2')
self.assertEquals(sio.getvalue(), 'test1\n')
logger = utils.get_logger({'log_level': 'DEBUG'}, 'server',
log_route='server')
logger.debug('test3')
self.assertEquals(sio.getvalue(), 'test1\ntest3\n')
# Doesn't really test that the log facility is truly being used all the
# way to syslog; but exercises the code.
logger = utils.get_logger({'log_facility': 'LOG_LOCAL3'}, 'server',
log_route='server')
logger.warn('test4')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\n')
# make sure debug doesn't log by default
logger.debug('test5')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\n')
# make sure notice lvl logs by default
logger.notice('test6')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\ntest6\n')
开发者ID:AsylumCorp,项目名称:swift,代码行数:28,代码来源:test_utils.py
示例3: get_socket
def get_socket(conf):
"""Bind socket to bind ip:port in conf
:param conf: Configuration dict to read settings from
:returns: a socket object as returned from socket.listen or
ssl.wrap_socket if conf specifies cert_file
"""
try:
bind_port = int(conf['bind_port'])
except (ValueError, KeyError, TypeError):
raise ConfigFilePortError()
bind_addr = (conf.get('bind_ip', '0.0.0.0'), bind_port)
address_family = [addr[0] for addr in socket.getaddrinfo(
bind_addr[0], bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM)
if addr[0] in (socket.AF_INET, socket.AF_INET6)][0]
sock = None
bind_timeout = int(conf.get('bind_timeout', 30))
retry_until = time.time() + bind_timeout
warn_ssl = False
try:
keepidle = int(conf.get('keep_idle', 600))
if keepidle <= 0 or keepidle >= 2 ** 15 - 1:
raise ValueError()
except (ValueError, KeyError, TypeError):
raise ConfigFileError()
while not sock and time.time() < retry_until:
try:
sock = listen(bind_addr, backlog=int(conf.get('backlog', 4096)),
family=address_family)
if 'cert_file' in conf:
warn_ssl = True
sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
keyfile=conf['key_file'])
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
sleep(0.1)
if not sock:
raise Exception(_('Could not bind to %(addr)s:%(port)s '
'after trying for %(timeout)s seconds') % {
'addr': bind_addr[0], 'port': bind_addr[1],
'timeout': bind_timeout})
# in my experience, sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, keepidle)
if warn_ssl:
ssl_warning_message = _('WARNING: SSL should only be enabled for '
'testing purposes. Use external SSL '
'termination for a production deployment.')
get_logger(conf).warning(ssl_warning_message)
print(ssl_warning_message)
return sock
开发者ID:mahak,项目名称:swift,代码行数:57,代码来源:wsgi.py
示例4: test_get_logger_console
def test_get_logger_console(self):
reload(utils) # reset get_logger attrs
logger = utils.get_logger(None)
self.assertFalse(hasattr(utils.get_logger, 'console'))
logger = utils.get_logger(None, log_to_console=True)
self.assert_(hasattr(utils.get_logger, 'console'))
self.assert_(isinstance(utils.get_logger.console,
logging.StreamHandler))
# make sure you can't have two console handlers
old_handler = utils.get_logger.console
logger = utils.get_logger(None, log_to_console=True)
self.assertNotEquals(utils.get_logger.console, old_handler)
logger.logger.removeHandler(utils.get_logger.console)
开发者ID:edwardt,项目名称:swift,代码行数:13,代码来源:test_utils.py
示例5: test_capture_stdio
def test_capture_stdio(self):
# stubs
logger = utils.get_logger(None, 'dummy')
# mock utils system modules
_orig_sys = utils.sys
_orig_os = utils.os
try:
utils.sys = MockSys()
utils.os = MockOs()
# basic test
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, utils.sys.stdio_fds)
self.assert_(isinstance(utils.sys.stdout, utils.LoggerFileObject))
self.assert_(isinstance(utils.sys.stderr, utils.LoggerFileObject))
# reset; test same args, but exc when trying to close stdio
utils.os = MockOs(raise_funcs=('dup2',))
utils.sys = MockSys()
# test unable to close stdio
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [])
self.assert_(isinstance(utils.sys.stdout, utils.LoggerFileObject))
self.assert_(isinstance(utils.sys.stderr, utils.LoggerFileObject))
# reset; test some other args
utils.os = MockOs()
utils.sys = MockSys()
logger = utils.get_logger(None, log_to_console=True)
# test console log
utils.capture_stdio(logger, capture_stdout=False,
capture_stderr=False)
self.assert_(utils.sys.excepthook is not None)
# when logging to console, stderr remains open
self.assertEquals(utils.os.closed_fds, utils.sys.stdio_fds[:2])
reset_loggers()
# stdio not captured
self.assertFalse(isinstance(utils.sys.stdout,
utils.LoggerFileObject))
self.assertFalse(isinstance(utils.sys.stderr,
utils.LoggerFileObject))
reset_loggers()
finally:
utils.sys = _orig_sys
utils.os = _orig_os
开发者ID:AsylumCorp,项目名称:swift,代码行数:51,代码来源:test_utils.py
示例6: get_socket
def get_socket(conf):
"""Bind socket to bind ip:port in conf
:param conf: Configuration dict to read settings from
:returns : a socket object as returned from socket.listen or
ssl.wrap_socket if conf specifies cert_file
"""
try:
bind_port = int(conf["bind_port"])
except (ValueError, KeyError, TypeError):
raise ConfigFilePortError()
bind_addr = (conf.get("bind_ip", "0.0.0.0"), bind_port)
address_family = [
addr[0]
for addr in socket.getaddrinfo(bind_addr[0], bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM)
if addr[0] in (socket.AF_INET, socket.AF_INET6)
][0]
sock = None
bind_timeout = int(conf.get("bind_timeout", 30))
retry_until = time.time() + bind_timeout
warn_ssl = False
while not sock and time.time() < retry_until:
try:
sock = listen(bind_addr, backlog=int(conf.get("backlog", 4096)), family=address_family)
if "cert_file" in conf:
warn_ssl = True
sock = ssl.wrap_socket(sock, certfile=conf["cert_file"], keyfile=conf["key_file"])
except socket.error as err:
if err.args[0] != errno.EADDRINUSE:
raise
sleep(0.1)
if not sock:
raise Exception(
_("Could not bind to %s:%s " "after trying for %s seconds") % (bind_addr[0], bind_addr[1], bind_timeout)
)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# in my experience, sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if hasattr(socket, "TCP_KEEPIDLE"):
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)
if warn_ssl:
ssl_warning_message = _(
"WARNING: SSL should only be enabled for "
"testing purposes. Use external SSL "
"termination for a production deployment."
)
get_logger(conf).warning(ssl_warning_message)
print(ssl_warning_message)
return sock
开发者ID:vandanashah,项目名称:swift,代码行数:51,代码来源:wsgi.py
示例7: __init__
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.logger = get_logger(conf, log_route='lite-swauth')
self.profile_path = 'profile'
self.super_admin_key = conf.get('super_admin_key')
if not self.super_admin_key:
msg = 'No super_admin_key set in conf file; ' \
'Swauth administration features will be disabled.'
try:
self.logger.warn(msg)
except Exception:
pass
self.auth_prefix = conf.get('auth_prefix', '/auth/')
if not self.auth_prefix:
self.auth_prefix = '/auth/'
if self.auth_prefix[0] != '/':
self.auth_prefix = '/' + self.auth_prefix
if self.auth_prefix[-1] != '/':
self.auth_prefix += '/'
self.version = 'v2'
# url for whitelist objects
# Example: /v1/liteauth/whitelist
self.whitelist_url = conf.get('whitelist_url', '').lower().rstrip('/')
if not self.whitelist_url:
raise ValueError('whitelist_url not set in config file')
# url for invite objects
# Example: /v1/liteauth/invites
self.invite_url = conf.get('invite_url', '').lower().rstrip('/')
if not self.invite_url:
raise ValueError('invite_url not set in config file')
开发者ID:mgeisler,项目名称:liteauth,代码行数:31,代码来源:swauth_manager.py
示例8: __init__
def __init__(self, conf):
"""
Creates a new WSGI application for the Swift Object Server. An
example configuration is given at
<source-dir>/etc/object-server.conf-sample or
/etc/swift/object-server.conf-sample.
"""
self.logger = get_logger(conf, log_route='object-server')
self.devices = conf.get('devices', '/srv/node/')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.disk_chunk_size = int(conf.get('disk_chunk_size', 65536))
self.network_chunk_size = int(conf.get('network_chunk_size', 65536))
self.log_requests = conf.get('log_requests', 't')[:1].lower() == 't'
self.max_upload_time = int(conf.get('max_upload_time', 86400))
self.slow = int(conf.get('slow', 0))
self.bytes_per_sync = int(conf.get('mb_per_sync', 512)) * 1024 * 1024
default_allowed_headers = '''
content-disposition,
content-encoding,
x-delete-at,
x-object-manifest,
'''
self.allowed_headers = set(i.strip().lower() for i in
conf.get('allowed_headers',
default_allowed_headers).split(',') if i.strip() and
i.strip().lower() not in DISALLOWED_HEADERS)
self.expiring_objects_account = \
(conf.get('auto_create_account_prefix') or '.') + \
'expiring_objects'
self.expiring_objects_container_divisor = \
int(conf.get('expiring_objects_container_divisor') or 86400)
开发者ID:Nupta,项目名称:swift,代码行数:34,代码来源:server.py
示例9: __init__
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.logger = get_logger(conf, log_route='liteauth')
self.provider = load_oauth_provider(
conf.get('oauth_provider', 'google_oauth'))
self.auth_endpoint = conf.get('auth_endpoint', '')
if not self.auth_endpoint:
raise ValueError('auth_endpoint not set in config file')
if isinstance(self.auth_endpoint, unicode):
self.auth_endpoint = self.auth_endpoint.encode('utf-8')
parsed_path = urlparse(self.auth_endpoint)
if not parsed_path.netloc:
raise ValueError('auth_endpoint is invalid in config file')
self.auth_domain = parsed_path.netloc
self.login_path = parsed_path.path
self.scheme = parsed_path.scheme
if self.scheme != 'https':
raise ValueError('auth_endpoint must have https:// scheme')
# by default service_domain can be extracted from the endpoint
# in case where auth domain is different from service domain
# you need to set up the service domain separately
# Example:
# auth_endpoint = https://auth.example.com/login
# service_domain = https://www.example.com
self.service_domain = conf.get('service_domain',
'%s://%s'
% (self.scheme, self.auth_domain))
self.storage_driver = None
self.oauth_login_timeout = 3600
开发者ID:pkit,项目名称:liteauth,代码行数:30,代码来源:oauth_login.py
示例10: __init__
def __init__(self, conf):
self.logger = get_logger(conf)
self.root = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.replicator_rpc = \
ReplicatorRpc(self.root, DATADIR, AccountBroker, self.mount_check)
开发者ID:edwardt,项目名称:swift,代码行数:7,代码来源:server.py
示例11: __init__
def __init__(self, app, conf):
self.app = app
self.logger = get_logger(conf)
# TODO Check pipeline
self.verb_acl = {}
verb_acl = conf.get('verb_acl', None)
if verb_acl is None:
raise ValueError(
'verb_acl: missing "verb_acl" configuration entry')
for acl in verb_acl.split(';'):
if not acl:
continue
if acl.count(':') != 1:
raise ValueError('verb_acl: bad format: "%s"' % acl)
methods, blocks = acl.split(':', 1)
for method in methods.split(','):
if not method:
raise ValueError(
'verb_acl: bad format: missing method: "%s"' % acl)
for block in blocks.split(','):
if not block:
raise ValueError(
'verb_acl: bad format: empty address block: "%s"' %
acl)
self.verb_acl.setdefault(method.upper(), []).append(block)
self.logger.debug("Verb ACL: " + str(self.verb_acl))
开发者ID:jfsmig,项目名称:oio-swift,代码行数:28,代码来源:verb_acl.py
示例12: test_run_daemon
def test_run_daemon(self):
sample_conf = "[my-daemon]\nuser = %s\n" % getuser()
with tmpfile(sample_conf) as conf_file:
with mock.patch.dict('os.environ', {'TZ': ''}):
with mock.patch('time.tzset') as mock_tzset:
daemon.run_daemon(MyDaemon, conf_file)
self.assertTrue(MyDaemon.forever_called)
self.assertEqual(os.environ['TZ'], 'UTC+0')
self.assertEqual(mock_tzset.mock_calls, [mock.call()])
daemon.run_daemon(MyDaemon, conf_file, once=True)
self.assertEqual(MyDaemon.once_called, True)
# test raise in daemon code
with mock.patch.object(MyDaemon, 'run_once', MyDaemon.run_raise):
self.assertRaises(OSError, daemon.run_daemon, MyDaemon,
conf_file, once=True)
# test user quit
sio = StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server', log_route='server')
with mock.patch.object(MyDaemon, 'run_forever', MyDaemon.run_quit):
daemon.run_daemon(MyDaemon, conf_file, logger=logger)
self.assertTrue('user quit' in sio.getvalue().lower())
# test missing section
sample_conf = "[default]\nuser = %s\n" % getuser()
with tmpfile(sample_conf) as conf_file:
self.assertRaisesRegexp(SystemExit,
'Unable to find my-daemon '
'config section in.*',
daemon.run_daemon, MyDaemon,
conf_file, once=True)
开发者ID:clayg,项目名称:swift,代码行数:34,代码来源:test_daemon.py
示例13: __init__
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, log_route='container-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.interval = int(conf.get('interval', 300))
self.account_ring = None
self.concurrency = int(conf.get('concurrency', 4))
self.slowdown = float(conf.get('slowdown', 0.01))
self.node_timeout = float(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.no_changes = 0
self.successes = 0
self.failures = 0
self.account_suppressions = {}
self.account_suppression_time = \
float(conf.get('account_suppression_time', 60))
self.new_account_suppressions = None
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "container.recon")
self.user_agent = 'container-updater %s' % os.getpid()
开发者ID:ISCAS-VDI,项目名称:swift-base,代码行数:25,代码来源:updater.py
示例14: __init__
def __init__(self, conf, logger=None):
super(ContainerController, self).__init__(conf)
self.logger = logger or get_logger(conf, log_route='container-server')
self.log_requests = config_true_value(conf.get('log_requests', 'true'))
self.root = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.node_timeout = int(conf.get('node_timeout', 3))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
#: ContainerSyncCluster instance for validating sync-to values.
self.realms_conf = ContainerSyncRealms(
os.path.join(
conf.get('swift_dir', '/etc/swift'),
'container-sync-realms.conf'),
self.logger)
#: The list of hosts we're allowed to send syncs to. This can be
#: overridden by data in self.realms_conf
self.allowed_sync_hosts = [
h.strip()
for h in conf.get('allowed_sync_hosts', '127.0.0.1').split(',')
if h.strip()]
self.replicator_rpc = ContainerReplicatorRpc(
self.root, DATADIR, ContainerBroker, self.mount_check,
logger=self.logger)
self.auto_create_account_prefix = \
conf.get('auto_create_account_prefix') or '.'
if config_true_value(conf.get('allow_versions', 'f')):
self.save_headers.append('x-versions-location')
swift.common.db.DB_PREALLOCATION = \
config_true_value(conf.get('db_preallocation', 'f'))
开发者ID:steveruckdashel,项目名称:swift,代码行数:29,代码来源:server.py
示例15: test_delegate_methods_with_metric_prefix
def test_delegate_methods_with_metric_prefix(self):
self.logger = utils.get_logger({
'log_statsd_host': 'localhost',
'log_statsd_port': str(self.port),
'log_statsd_metric_prefix': 'alpha.beta',
}, 'pfx')
self.assertStat('alpha.beta.pfx.some.counter:1|c',
self.logger.increment, 'some.counter')
self.assertStat('alpha.beta.pfx.some.counter:-1|c',
self.logger.decrement, 'some.counter')
self.assertStat('alpha.beta.pfx.some.operation:4760.0|ms',
self.logger.timing, 'some.operation', 4.76 * 1000)
self.assertStatMatches(
'alpha\.beta\.pfx\.another\.op:\d+\.\d+\|ms',
self.logger.timing_since, 'another.op', time.time())
self.assertStat('alpha.beta.pfx.another.counter:3|c',
self.logger.update_stats, 'another.counter', 3)
self.logger.set_statsd_prefix('')
self.assertStat('alpha.beta.some.counter:1|c|@0.9912',
self.logger.increment, 'some.counter',
sample_rate=0.9912)
self.assertStat('alpha.beta.some.counter:-1|c|@0.9912',
self.logger.decrement, 'some.counter', 0.9912)
self.assertStat('alpha.beta.some.operation:4900.0|ms|@0.9912',
self.logger.timing, 'some.operation', 4.9 * 1000,
sample_rate=0.9912)
self.assertStatMatches('alpha\.beta\.another\.op:\d+\.\d+\|ms|@0.9912',
self.logger.timing_since, 'another.op',
time.time(), sample_rate=0.9912)
self.assertStat('alpha.beta.another.counter:3|c|@0.9912',
self.logger.update_stats, 'another.counter', 3,
sample_rate=0.9912)
开发者ID:AsylumCorp,项目名称:swift,代码行数:33,代码来源:test_utils.py
示例16: __init__
def __init__(self, app, conf):
"""
This function is called when Swift Proxy inits.
"""
self.app = app
self.conf = conf
self.logger = get_logger(conf, log_route='customauth')
self.log_headers = config_true_value(conf.get('log_headers', 'f'))
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip()
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
self.reseller_prefix += '_'
self.logger.set_statsd_prefix('customauth.%s' % (
self.reseller_prefix if self.reseller_prefix else 'NONE',))
self.auth_prefix = conf.get('auth_prefix', '/auth/')
#Organization
self.organization_id = conf.get('organization_id', '57b69c457792482c8d817c4945c6c8a8')
#Keystone
self.keystone_auth_endpoint = conf.get('keystone_auth_endpoint', 'http://cloud.lab.fiware.org:4730/v2.0/tokens')
self.keystone_tenant_endpoint = conf.get('keystone_tenant_endpoint', 'http://cloud.lab.fiware.org:4730/v2.0/tenants')
if not self.auth_prefix or not self.auth_prefix.strip('/'):
self.logger.warning('Rewriting invalid auth prefix "%s" to '
'"/auth/" (Non-empty auth prefix path '
'is required)' % self.auth_prefix)
self.auth_prefix = '/auth/'
if self.auth_prefix[0] != '/':
self.auth_prefix = '/' + self.auth_prefix
if self.auth_prefix[-1] != '/':
self.auth_prefix += '/'
self.token_life = int(conf.get('token_life', 86400))
self.allow_overrides = config_true_value(
conf.get('allow_overrides', 't'))
self.storage_url_scheme = conf.get('storage_url_scheme', 'default')
self.logger.info('CustomAuth v1.3 loaded successfully')
开发者ID:FINESCE,项目名称:HybridCloudDataManagement,代码行数:35,代码来源:customauth.py
示例17: __init__
def __init__(self, app, conf):
self.app = app
swift_dir = conf.get('swift_dir', '/etc/swift')
self.account_ring_path = os.path.join(swift_dir, 'account.ring.gz')
self.container_ring_path = os.path.join(swift_dir, 'container.ring.gz')
self.object_ring_path = os.path.join(swift_dir, 'object.ring.gz')
self.logger = get_logger(conf, log_route='lxc-swift')
开发者ID:zfeldstein,项目名称:swift-lxc,代码行数:7,代码来源:swift_lxc_proxy.py
示例18: __init__
def __init__(self, conf):
"""
:param conf: configuration object obtained from ConfigParser
:param logger: logging object
"""
self.conf = conf
self.logger = get_logger(conf, log_route='object-replicator')
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.vm_test_mode = conf.get(
'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.port = int(conf.get('bind_port', 6000))
self.concurrency = int(conf.get('concurrency', 1))
self.stats_interval = int(conf.get('stats_interval', '300'))
self.object_ring = Ring(self.swift_dir, ring_name='object')
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.next_check = time.time() + self.ring_check_interval
self.reclaim_age = int(conf.get('reclaim_age', 86400 * 7))
self.partition_times = []
self.run_pause = int(conf.get('run_pause', 30))
self.rsync_timeout = int(conf.get('rsync_timeout', 900))
self.rsync_io_timeout = conf.get('rsync_io_timeout', '30')
self.http_timeout = int(conf.get('http_timeout', 60))
self.lockup_timeout = int(conf.get('lockup_timeout', 1800))
self.recon_enable = conf.get(
'recon_enable', 'no').lower() in TRUE_VALUES
self.recon_cache_path = conf.get(
'recon_cache_path', '/var/cache/swift')
self.recon_object = os.path.join(self.recon_cache_path, "object.recon")
开发者ID:bn-emailops,项目名称:swift,代码行数:31,代码来源:replicator.py
示例19: filter_factory
def filter_factory(global_conf, **local_conf):
"""Returns the WSGI filter for use with paste.deploy."""
conf = global_conf.copy()
conf.update(local_conf)
defaults = {
'methods': 'GET HEAD PUT POST DELETE',
'incoming_remove_headers': DEFAULT_INCOMING_REMOVE_HEADERS,
'incoming_allow_headers': DEFAULT_INCOMING_ALLOW_HEADERS,
'outgoing_remove_headers': DEFAULT_OUTGOING_REMOVE_HEADERS,
'outgoing_allow_headers': DEFAULT_OUTGOING_ALLOW_HEADERS,
'allowed_digests': DEFAULT_ALLOWED_DIGESTS,
}
info_conf = {k: conf.get(k, v).split() for k, v in defaults.items()}
allowed_digests = set(digest.lower()
for digest in info_conf['allowed_digests'])
not_supported = allowed_digests - SUPPORTED_DIGESTS
if not_supported:
logger = get_logger(conf, log_route='tempurl')
logger.warning('The following digest algorithms are configured but '
'not supported: %s', ', '.join(not_supported))
allowed_digests -= not_supported
if not allowed_digests:
raise ValueError('No valid digest algorithms are configured '
'for tempurls')
info_conf['allowed_digests'] = sorted(allowed_digests)
register_swift_info('tempurl', **info_conf)
conf.update(info_conf)
return lambda app: TempURL(app, conf)
开发者ID:chenzhongtao,项目名称:swift,代码行数:32,代码来源:tempurl.py
示例20: __init__
def __init__(self, conf, logger=None):
self.conf = conf
self.logger = logger or get_logger(conf, log_route='object-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.swift_dir = conf.get('swift_dir', '/etc/swift')
self.interval = int(conf.get('interval', 300))
self.container_ring = None
self.concurrency = int(conf.get('concurrency', 1))
if 'slowdown' in conf:
self.logger.warning(
'The slowdown option is deprecated in favor of '
'objects_per_second. This option may be ignored in a '
'future release.')
objects_per_second = 1 / (
float(conf.get('slowdown', '0.01')) + 0.01)
else:
objects_per_second = 50
self.objects_running_time = 0
self.max_objects_per_second = \
float(conf.get('objects_per_second',
objects_per_second))
self.node_timeout = float(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.report_interval = float(conf.get('report_interval', 300))
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
self.stats = SweepStats()
开发者ID:chenzhongtao,项目名称:swift,代码行数:29,代码来源:updater.py
注:本文中的swift.common.utils.get_logger函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论