本文整理汇总了Python中swift.common.utils.readconf函数的典型用法代码示例。如果您正苦于以下问题：Python readconf函数的具体用法？Python readconf怎么用？Python readconf使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了readconf函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: has_expirer_section
def has_expirer_section(conf_path):
    """
    Report whether the config at ``conf_path`` has an object-expirer section.

    :param conf_path: config location, handed unchanged to ``readconf``
    :returns: False when ``readconf`` raises ValueError while loading the
              "object-expirer" section, True otherwise
    """
    try:
        readconf(conf_path, section_name="object-expirer")
    except ValueError:
        return False
    # readconf returned normally, so the section could be loaded
    return True
开发者ID:mahak,项目名称:swift,代码行数:7,代码来源:manager.py
示例2: test_readconf
def test_readconf(self):
    """
    Check utils.readconf against both a file path and a file-like object.

    For each kind of input the test verifies: reading all sections, reading
    a single section (``log_name`` falls back to the section name),
    an explicit ``log_name`` inside the section, a ``log_name`` keyword
    override, and extra ``defaults``.  It then checks that an unknown
    section and a missing file both raise SystemExit.
    """
    conf = """[section1]
foo = bar
[section2]
log_name = yarr"""
    conf_path = "/tmp/test"
    # setup a real file; text mode accepts the str payload on both
    # Python 2 and Python 3
    with open(conf_path, "w") as f:
        f.write(conf)

    def make_filename():
        return conf_path

    # setup a file stream
    def make_fp():
        return StringIO(conf)

    try:
        for conf_object_maker in (make_filename, make_fp):
            result = utils.readconf(conf_object_maker())
            expected = {"log_name": None,
                        "section1": {"foo": "bar"},
                        "section2": {"log_name": "yarr"}}
            self.assertEqual(result, expected)
            result = utils.readconf(conf_object_maker(), "section1")
            expected = {"log_name": "section1", "foo": "bar"}
            self.assertEqual(result, expected)
            result = utils.readconf(
                conf_object_maker(), "section2").get("log_name")
            expected = "yarr"
            self.assertEqual(result, expected)
            result = utils.readconf(
                conf_object_maker(), "section1",
                log_name="foo").get("log_name")
            expected = "foo"
            self.assertEqual(result, expected)
            result = utils.readconf(
                conf_object_maker(), "section1", defaults={"bar": "baz"})
            expected = {"log_name": "section1", "foo": "bar", "bar": "baz"}
            self.assertEqual(result, expected)
        self.assertRaises(SystemExit, utils.readconf, conf_path, "section3")
    finally:
        # always remove the scratch file, even when an assertion failed
        os.unlink(conf_path)
    # the file is gone now, so reading it must fail
    self.assertRaises(SystemExit, utils.readconf, conf_path)
开发者ID:colecrawford,项目名称:swift,代码行数:31,代码来源:test_utils.py
示例3: test_readconf
def test_readconf(self):
    """
    Check utils.readconf against a config file on disk.

    Verifies reading all sections, reading a single section (``log_name``
    falls back to the section name), an explicit ``log_name`` inside the
    section, a ``log_name`` keyword override, and extra ``defaults``.
    """
    conf = '''[section1]
foo = bar
[section2]
log_name = yarr'''
    conf_path = '/tmp/test'
    # the context manager closes the handle even if the write fails; text
    # mode accepts the str payload on both Python 2 and Python 3
    with open(conf_path, 'w') as f:
        f.write(conf)
    try:
        result = utils.readconf(conf_path)
        expected = {'log_name': None,
                    'section1': {'foo': 'bar'},
                    'section2': {'log_name': 'yarr'}}
        self.assertEqual(result, expected)
        result = utils.readconf(conf_path, 'section1')
        expected = {'log_name': 'section1', 'foo': 'bar'}
        self.assertEqual(result, expected)
        result = utils.readconf(conf_path, 'section2').get('log_name')
        expected = 'yarr'
        self.assertEqual(result, expected)
        result = utils.readconf(conf_path, 'section1',
                                log_name='foo').get('log_name')
        expected = 'foo'
        self.assertEqual(result, expected)
        result = utils.readconf(conf_path, 'section1',
                                defaults={'bar': 'baz'})
        expected = {'log_name': 'section1', 'foo': 'bar', 'bar': 'baz'}
        self.assertEqual(result, expected)
    finally:
        # always remove the scratch file, even when an assertion failed
        os.unlink(conf_path)
开发者ID:edwardt,项目名称:swift,代码行数:29,代码来源:test_utils.py
示例4: test_rsync_tempfile_timeout_auto_option
def test_rsync_tempfile_timeout_auto_option(self):
    """
    Check how AuditorWorker resolves ``rsync_tempfile_timeout``.

    Covers, in order: no option set, an explicit numeric value, ``auto``
    with no object-replicator section, ``auto`` with an empty
    object-replicator section, and ``auto`` with an object-replicator
    section that sets ``rsync_timeout``.
    """
    config_path = os.path.join(self.testdir, 'objserver.conf')

    def make_worker(conf):
        # every worker in this test shares the same logger/rcache/devices
        return auditor.AuditorWorker(conf, self.logger,
                                     self.rcache, self.devices)

    def worker_from_config(stub_config):
        # the Daemon loader will hand the object-auditor config to the
        # auditor who will build the workers from it
        with open(config_path, 'w') as f:
            f.write(textwrap.dedent(stub_config))
        return make_worker(readconf(config_path, 'object-auditor'))

    # if we don't have access to the replicator config section we'll use
    # our default
    self.assertEqual(make_worker(self.conf).rsync_tempfile_timeout, 86400)

    # if the rsync_tempfile_timeout option is set explicitly we use that
    self.conf['rsync_tempfile_timeout'] = '1800'
    self.assertEqual(make_worker(self.conf).rsync_tempfile_timeout, 1800)

    # if we have a real config we can be a little smarter; but if there is
    # no object-replicator section we still have to fall back to default
    # because we can't parse the config for that section!
    worker = worker_from_config("""
        [object-auditor]
        rsync_tempfile_timeout = auto
        """)
    self.assertEqual(worker.rsync_tempfile_timeout, 86400)

    # if the object-replicator section will parse but does not override
    # the default rsync_timeout we assume the default rsync_timeout value
    # and add 15mins
    worker = worker_from_config("""
        [object-replicator]
        [object-auditor]
        rsync_tempfile_timeout = auto
        """)
    self.assertEqual(worker.rsync_tempfile_timeout,
                     replicator.DEFAULT_RSYNC_TIMEOUT + 900)

    # if there is an object-replicator section with a rsync_timeout
    # configured we'll use that value (3600) + 900
    worker = worker_from_config("""
        [DEFAULT]
        reclaim_age = 1209600
        [object-replicator]
        rsync_timeout = 3600
        [object-auditor]
        rsync_tempfile_timeout = auto
        """)
    self.assertEqual(worker.rsync_tempfile_timeout, 3600 + 900)
开发者ID:Ahiknsr,项目名称:swift,代码行数:58,代码来源:test_auditor.py
示例5: get_config
def get_config():
    """
    Attempt to get a functional config dictionary.

    Reads the ``app:dispatcher`` section of ``test/dispatcher_test.conf``.
    If the file has no section headers, its contents are re-parsed under a
    synthetic ``[func_test]`` header instead.

    :returns: the config dict, or an empty dict when reading the config
              ends in SystemExit (a notice is written to stderr)
    """
    config_file = 'test/dispatcher_test.conf'
    config = {}
    try:
        try:
            config = readconf(config_file, 'app:dispatcher')
        except MissingSectionHeaderError:
            # header-less file: prepend a section header and parse again;
            # the context manager closes the handle deterministically
            with open(config_file) as raw_conf:
                config_fp = StringIO('[func_test]\n' + raw_conf.read())
            config = readconf(config_fp, 'func_test')
    except SystemExit:
        # sys.stderr.write works on both Python 2 and Python 3
        sys.stderr.write('UNABLE TO READ FUNCTIONAL TESTS CONFIG FILE\n')
    return config
开发者ID:AsherBond,项目名称:colony,代码行数:15,代码来源:__init__.py
示例6: _get_objects_dir
def _get_objects_dir(self, onode):
    """
    Return the on-disk directory of the device described by ``onode``.

    :param onode: node dict providing at least "device" and "port"
    :returns: "<devices>/<device>", where <devices> comes from the
              [app:object-server] section of the node's server config
    """
    device = onode["device"]
    # Floor division keeps node_id an int on Python 3 as well; true
    # division would produce a float lookup key.
    # NOTE(review): presumably object servers listen on 6010, 6020, ... --
    # confirm against the cluster layout.
    node_id = (onode["port"] - 6000) // 10
    obj_server_conf = readconf(self.configs["object-server"][node_id])
    devices = obj_server_conf["app:object-server"]["devices"]
    obj_dir = "%s/%s" % (devices, device)
    return obj_dir
开发者ID:portante,项目名称:swift,代码行数:7,代码来源:test_empty_device_handoff.py
示例7: setUp
def setUp(self):
    """
    Run the parent setUp, then size ``self.object_puts`` from the
    container-replicator configs.

    ``object_puts`` is one more than the largest ``per_diff`` found across
    all container-replicator configs (``per_diff`` defaults to '1000').
    """
    super(TestDbRsyncReplicator, self).setUp()
    replicator_confs = []
    for conf_path in self.configs['container-replicator'].values():
        replicator_confs.append(
            utils.readconf(conf_path, 'container-replicator'))
    largest_per_diff = max(
        int(replicator_conf.get('per_diff', '1000'))
        for replicator_conf in replicator_confs)
    # Do more than per_diff object PUTs, to force rsync instead of usync
    self.object_puts = 1 + largest_per_diff
开发者ID:jgmerritt,项目名称:swift,代码行数:7,代码来源:test_db_replicator.py
示例8: _get_objects_dir
def _get_objects_dir(self, onode):
    """
    Return the on-disk directory of the device described by ``onode``.

    :param onode: node dict providing at least "device", "ip" and "port"
    :returns: "<devices>/<device>", where <devices> comes from the
              [app:object-server] section of the node's server config
    """
    device = onode["device"]
    node_address = (onode["ip"], onode["port"])
    # only the second item returned by get_server_number is needed here
    _server_type, node_id = get_server_number(
        node_address, self.ipport2server)
    conf_path = self.configs["object-server"][node_id]
    devices = readconf(conf_path)["app:object-server"]["devices"]
    return "%s/%s" % (devices, device)
开发者ID:iloveyou416068,项目名称:swift-1,代码行数:7,代码来源:test_empty_device_handoff.py
示例9: _get_objects_dir
def _get_objects_dir(self, onode):
    """
    Return the on-disk directory of the device described by ``onode``.

    :param onode: node dict providing at least 'device' and 'port'
    :returns: '<devices>/<device>', where <devices> comes from the
              [app:object-server] section of the node's server config
    """
    device = onode['device']
    # Floor division keeps node_id an int on Python 3 as well; true
    # division would feed a float into the config path template.
    # NOTE(review): presumably object servers listen on 6010, 6020, ... --
    # confirm against the cluster layout.
    node_id = (onode['port'] - 6000) // 10
    # self.configs['object'] is a format string taking the node id
    obj_server_conf = readconf(self.configs['object'] % node_id)
    devices = obj_server_conf['app:object-server']['devices']
    obj_dir = '%s/%s' % (devices, device)
    return obj_dir
开发者ID:BlueSkyChina,项目名称:swift,代码行数:7,代码来源:test_empty_device_handoff.py
示例10: __init__
def __init__(self, app, conf):
    """
    Store the wrapped app and load the encryption root secret.

    When ``keymaster_config_path`` is set, the secret is read from the
    ``keymaster`` section of that file, and ``conf`` itself must not also
    carry ``encryption_root_secret``.

    :param app: the application being wrapped
    :param conf: config dict for this middleware
    :raises ValueError: if both sources are configured, or if the secret
                        is not a base64 encoding of at least 32 raw bytes
    """
    self.app = app
    keymaster_config_path = conf.get('keymaster_config_path')
    if keymaster_config_path:
        if 'encryption_root_secret' in conf:
            raise ValueError('keymaster_config_path is set, but there '
                             'are other config options specified!')
        conf = readconf(keymaster_config_path, 'keymaster')
    self.root_secret = conf.get('encryption_root_secret')
    try:
        if not isinstance(self.root_secret, six.string_types):
            raise ValueError
        # b64decode will silently discard bad characters, but we should
        # treat them as an error
        permitted_chars = string.digits + string.ascii_letters + '/+\r\n'
        for char in self.root_secret.strip('\r\n='):
            if char not in permitted_chars:
                raise ValueError
        self.root_secret = base64.b64decode(self.root_secret)
        if len(self.root_secret) < 32:
            raise ValueError
    except (TypeError, ValueError):
        # name whichever file the secret was expected to come from
        secret_source = keymaster_config_path or 'proxy-server.conf'
        raise ValueError(
            'encryption_root_secret option in %s must be a base64 '
            'encoding of at least 32 raw bytes' % (secret_source))
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:26,代码来源:keymaster.py
示例11: load_server_conf
def load_server_conf(conf, sections):
    """
    Merge selected sections of the server config file into ``conf``.

    The file is the one named by ``conf['__file__']``; nothing happens when
    that key is missing or falsy.

    :param conf: dict updated in place
    :param sections: iterable of section names to merge, in order
    """
    server_conf_file = conf.get('__file__', None)
    if not server_conf_file:
        return
    server_conf = readconf(server_conf_file)
    for sect in sections:
        section_values = server_conf.get(sect, None)
        # absent or empty sections contribute nothing
        if section_values:
            conf.update(section_values)
开发者ID:Prosunjit,项目名称:zerocloud,代码行数:7,代码来源:common.py
示例12: setUp
def setUp(self):
    """
    Reset the cluster, load the rings, start the main servers and
    authenticate two accounts.

    On any failure (including BaseException subclasses) every server is
    killed on a best-effort basis before the original error is re-raised.
    """
    resetswift()
    try:
        self.ipport2server = {}
        self.configs = defaultdict(dict)
        # every get_ring call receives the same two keyword arguments
        shared_kwargs = {'ipport2server': self.ipport2server,
                         'config_paths': self.configs}
        self.account_ring = get_ring(
            'account',
            self.acct_cont_required_replicas,
            self.acct_cont_required_devices,
            **shared_kwargs)
        self.container_ring = get_ring(
            'container',
            self.acct_cont_required_replicas,
            self.acct_cont_required_devices,
            **shared_kwargs)
        self.policy = get_policy(**self.policy_requirements)
        self.object_ring = get_ring(
            self.policy.ring_name,
            self.obj_required_replicas,
            self.obj_required_devices,
            server='object',
            **shared_kwargs)

        def servers_per_port_of(conf_path):
            # integer servers_per_port setting of one object-replicator
            replicator_conf = readconf(
                conf_path, section_name='object-replicator')
            return int(replicator_conf.get('servers_per_port', '0'))

        self.servers_per_port = any(
            servers_per_port_of(conf_path)
            for conf_path in self.configs['object-replicator'].values())

        Manager(['main']).start(wait=False)
        for ipport in self.ipport2server:
            check_server(ipport, self.ipport2server)

        proxy_ipport = ('127.0.0.1', 8080)
        self.ipport2server[proxy_ipport] = 'proxy'
        self.url, self.token, self.account = check_server(
            proxy_ipport, self.ipport2server)
        self.account_1 = {'url': self.url,
                          'token': self.token,
                          'account': self.account}

        second_url, second_token = get_auth(
            'http://%s:%d/auth/v1.0' % proxy_ipport,
            'test2:tester2', 'testing2')
        self.account_2 = {'url': second_url,
                          'token': second_token,
                          'account': second_url.split('/')[-1]}
        head_account(second_url, second_token)  # sanity check

        self.replicators = Manager(
            ['account-replicator', 'container-replicator',
             'object-replicator'])
        self.updaters = Manager(['container-updater', 'object-updater'])
    except BaseException:
        # re-raise from inside try/finally so the original exception is
        # the one that propagates, with the cleanup running on the way out
        try:
            raise
        finally:
            try:
                Manager(['all']).kill()
            except Exception:
                pass
开发者ID:prashanthpai,项目名称:swift,代码行数:60,代码来源:common.py
示例13: device_dir
def device_dir(self, server, node):
    """
    Return the path of ``node``'s device directory.

    The ``devices`` root is read from the "<type>-replicator" section of
    the config registered for the server that owns the node's ip/port.

    :param server: not used by this method
    :param node: node dict providing 'ip', 'port' and 'device'
    :returns: os.path.join(<devices root>, node['device'])
    """
    node_address = (node['ip'], node['port'])
    server_type, config_number = get_server_number(
        node_address, self.ipport2server)
    repl_server = '%s-replicator' % server_type
    conf_path = self.configs[repl_server][config_number]
    replicator_conf = readconf(conf_path, section_name=repl_server)
    return os.path.join(replicator_conf['devices'], node['device'])
开发者ID:Ahiknsr,项目名称:swift,代码行数:7,代码来源:common.py
示例14: _get_root_secret
def _get_root_secret(self, conf):
    """
    Load and validate the keymaster's ``encryption_root_secret``.

    The option must be a base64 encoding of at least 32 bytes.  It lives
    either in ``conf`` (the keymaster section of proxy-server.conf) or in
    the ``keymaster`` section of the external file named by
    ``self.keymaster_config_path`` -- never both.

    :param conf: the keymaster config section from proxy-server.conf
    :type conf: dict
    :return: the decoded encryption root secret
    :raises ValueError: if the secret is configured in both places, or is
                        not valid base64 of at least 32 raw bytes
    """
    if self.keymaster_config_path:
        keymaster_opts = ['encryption_root_secret']
        if any(opt in conf for opt in keymaster_opts):
            clashing_opts = set(keymaster_opts).intersection(conf)
            raise ValueError('keymaster_config_path is set, but there '
                             'are other config options specified: %s' %
                             ", ".join(clashing_opts))
        # the external file replaces the proxy-server.conf section
        conf = readconf(self.keymaster_config_path, 'keymaster')
    encoded_secret = conf.get('encryption_root_secret')
    try:
        decoded_secret = strict_b64decode(encoded_secret,
                                          allow_line_breaks=True)
        if len(decoded_secret) >= 32:
            return decoded_secret
        # too short: report it through the same error path as bad base64
        raise ValueError
    except ValueError:
        raise ValueError(
            'encryption_root_secret option in %s must be a base64 '
            'encoding of at least 32 raw bytes' % (
                self.keymaster_config_path or 'proxy-server.conf'))
开发者ID:chenzhongtao,项目名称:swift,代码行数:34,代码来源:keymaster.py
示例15: get_lfs
def get_lfs(conf, ring, datadir, default_port, logger):
    """
    Returns LFS for current node

    The storage class is ``LFS<FS>`` from module ``swift_lfs.fs.<fs>``,
    where ``fs`` is ``conf['fs']`` (default 'xfs').  When ``conf`` names
    its source file under '__file__' and also contains a key equal to
    ``fs``, the ``[<fs>]`` section of that file is layered over a copy of
    ``conf`` before the class is instantiated.

    :param conf: server configuration
    :param ring: server ring file
    :param datadir: server data directory
    :param default_port: default server port
    :param logger: server logger
    :returns : LFS storage class
    :raises SwiftConfigurationError: if fs is invalid
    """
    fs = conf.get('fs', 'xfs')
    try:
        module_name = 'swift_lfs.fs.%s' % fs
        cls_name = 'LFS%s' % fs.upper()
        # a non-empty fromlist makes __import__ return the leaf module
        module = __import__(module_name, fromlist=[cls_name])
        cls = getattr(module, cls_name)
        if '__file__' in conf and fs in conf:
            fs_conf = readconf(conf['__file__'], fs)
            conf = dict(conf, **fs_conf)
        return cls(conf, ring, datadir, default_port, logger)
    # "except X as e" is valid on Python 2.6+ and Python 3; the old
    # "except X, e" form is a syntax error on Python 3
    except ImportError as e:
        raise SwiftConfigurationError(
            _('Cannot load LFS. Invalid FS: %s. %s') % (fs, e))
开发者ID:Nexenta,项目名称:lfs,代码行数:25,代码来源:__init__.py
示例16: get_config
def get_config():
    """
    Attempt to get a functional config dictionary.

    Reads the ``func_test`` section of the file named by the
    SWIFT_TEST_CONFIG_FILE environment variable (default
    '/etc/gluster-object/func_test.conf').  If the file has no section
    headers, its contents are re-parsed under a synthetic ``[func_test]``
    header instead.

    :returns: the config dict, or an empty dict when reading the config
              ends in SystemExit (a notice is written to stderr)
    """
    config_file = os.environ.get('SWIFT_TEST_CONFIG_FILE',
                                 '/etc/gluster-object/func_test.conf')
    config = {}
    try:
        try:
            config = readconf(config_file, 'func_test')
        except MissingSectionHeaderError:
            # header-less file: prepend a section header and parse again;
            # the context manager closes the handle deterministically
            with open(config_file) as raw_conf:
                config_fp = StringIO('[func_test]\n' + raw_conf.read())
            config = readconf(config_fp, 'func_test')
    except SystemExit:
        # sys.stderr.write works on both Python 2 and Python 3
        sys.stderr.write('UNABLE TO READ FUNCTIONAL TESTS CONFIG FILE\n')
    return config
开发者ID:Gaurav-Gangalwar,项目名称:UFO,代码行数:16,代码来源:__init__.py
示例17: get_config
def get_config(section_name=None, defaults=None):
    """
    Attempt to get a test config dictionary.

    The file is named by the SWIFT_TEST_CONFIG_FILE environment variable
    (default '/etc/swift/test.conf').

    :param section_name: the section to read (all sections if not defined)
    :param defaults: an optional dictionary namespace of defaults
    :returns: what ``readconf`` returns on success; otherwise a dict
              holding only ``defaults`` (empty when none were given)
    """
    # NOTE(review): on a successful read the defaults are NOT merged into
    # the result -- they only survive when readconf fails.  Confirm this
    # is intended.
    fallback = {}
    if defaults is not None:
        fallback.update(defaults)
    config_file = os.environ.get('SWIFT_TEST_CONFIG_FILE',
                                 '/etc/swift/test.conf')
    try:
        return readconf(config_file, section_name)
    except IOError:
        if not os.path.exists(config_file):
            print('Unable to read test config %s - file not found'
                  % config_file, file=sys.stderr)
        elif not os.access(config_file, os.R_OK):
            print('Unable to read test config %s - permission denied'
                  % config_file, file=sys.stderr)
    except ValueError as e:
        print(e)
    return fallback
开发者ID:bebule,项目名称:swift,代码行数:25,代码来源:__init__.py
示例18: run_daemon
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
    """
    Loads settings from conf, then instantiates daemon "klass" and runs the
    daemon with the specified once kwarg. The section_name will be derived
    from the daemon "klass" if not provided (e.g. ObjectReplicator =>
    object-replicator).

    Recognised ``kwargs``: ``log_name`` (forwarded to readconf),
    ``logger`` (used instead of building one) and ``verbose`` (console
    logging, only consumed when no ``logger`` is given).  Whatever remains
    in ``kwargs`` is passed on to the daemon's ``run`` method.

    :param klass: Class to instantiate, subclass of common.daemon.Daemon
    :param conf_file: Path to configuration file
    :param section_name: Section name from conf file to load config from
    :param once: Passed to daemon run method
    """
    # very often the config section_name is based on the class name
    # the None singleton will be passed through to readconf as is
    # Compare by value: "is ''" depends on string interning and raises a
    # SyntaxWarning on Python 3.8+.  None still does not match.
    if section_name == '':
        section_name = sub(r'([a-z])([A-Z])', r'\1-\2',
                           klass.__name__).lower()
    conf = utils.readconf(conf_file, section_name,
                          log_name=kwargs.get('log_name'))
    # once on command line (i.e. daemonize=false) will over-ride config
    once = once or \
        conf.get('daemonize', 'true').lower() not in utils.TRUE_VALUES
    # pre-configure logger
    if 'logger' in kwargs:
        logger = kwargs.pop('logger')
    else:
        logger = utils.get_logger(conf, conf.get('log_name', section_name),
                                  log_to_console=kwargs.pop('verbose', False),
                                  log_route=section_name)
    try:
        klass(conf).run(once=once, **kwargs)
    except KeyboardInterrupt:
        logger.info('User quit')
    logger.info('Exited')
开发者ID:BillTheBest,项目名称:swift,代码行数:35,代码来源:daemon.py
示例19: collect_tasks
def collect_tasks(self):
    """
    Build the task table from the sections of ``self.conf``.

    A section is a task when its value is a dict containing "data_type".
    For each task the configured class is imported and instantiated with
    the section's own config (read from its ``task_conf`` file), and the
    section dict gains two entries: "func" (the bound task method) and
    "cycles" (int, default 1).  Sections that fail to load are reported
    and skipped.

    :returns: ``{data_type: {level: [section names]}}`` for the data types
              account/container/object and the levels
              partition/suffix/hsh/item
    """
    tasks = {"account": {}, "container": {}, "object": {}}
    for data_type in tasks:
        for level in ("partition", "suffix", "hsh", "item"):
            tasks[data_type][level] = []
    for section in self.conf:
        try:
            section_conf = self.conf[section]
            if not isinstance(section_conf, dict):
                continue
            # a dict containing "data_type" is necessarily non-empty, so
            # no separate truthiness test is needed
            if "data_type" in section_conf:
                datatype = section_conf["data_type"]
                level = section_conf["level"]
                task_module = section_conf["task_module"]
                task_class = section_conf["task_class"]
                task_conf = section_conf["task_conf"]
                task_method = section_conf.get(
                    "task_method", "process_%s" % level)
                # __import__ returns the top-level package; walk down the
                # dotted path to reach the leaf module
                _module = __import__(task_module)
                for part in task_module.split(".")[1:]:
                    _module = getattr(_module, part)
                _class = getattr(_module, task_class)
                conf = utils.readconf(task_conf, section)
                _object = _class(conf)
                section_conf["func"] = getattr(_object, task_method)
                section_conf["cycles"] = int(section_conf.get("cycles", 1))
                tasks[datatype][level].append(section)
        # SystemExit is still caught so that a task whose setup calls
        # sys.exit() is skipped as before; KeyboardInterrupt now
        # propagates instead of being swallowed.
        except (Exception, SystemExit):
            # single parenthesised argument: prints the same text on
            # Python 2 and Python 3
            print("Failed to parse config file section %s - %s"
                  % (section, sys.exc_info()))
            traceback.print_exc()
    return tasks
开发者ID:shaolinjr,项目名称:SwiftCrawler,代码行数:30,代码来源:crawl.py
示例20: test_locked_container_dbs
def test_locked_container_dbs(self):
    """
    Check container deletes while some of the container DBs are locked.

    Three scenarios run concurrently in a GreenPool: one locked DB (the
    delete is expected to succeed) and two or three locked DBs (the delete
    is expected to fail with a 503).  Everything must finish within the
    proxy's ``node_timeout`` plus five seconds.
    """

    def run_test(num_locks, catch_503):
        container = 'container-%s' % uuid4()
        client.put_container(self.url, self.token, container)
        db_files = self._get_container_db_files(container)
        # the list keeps every connection referenced for the rest of the
        # scenario
        held_conns = []
        for index in range(num_locks):
            conn = connect(db_files[index])
            conn.execute('begin exclusive transaction')
            held_conns.append(conn)
        if not catch_503:
            client.delete_container(self.url, self.token, container)
            return
        try:
            client.delete_container(self.url, self.token, container)
        except client.ClientException as err:
            self.assertEqual(err.http_status, 503)
        else:
            self.fail("Expected ClientException but didn't get it")

    proxy_conf = readconf(self.configs['proxy-server'],
                          section_name='app:proxy-server')
    node_timeout = int(proxy_conf.get('node_timeout', 10))
    scenarios = ((1, False), (2, True), (3, True))
    pool = GreenPool()
    try:
        with Timeout(node_timeout + 5):
            for num_locks, catch_503 in scenarios:
                pool.spawn(run_test, num_locks, catch_503)
            pool.waitall()
    except Timeout as err:
        raise Exception(
            "The server did not return a 503 on container db locks, "
            "it just hangs: %s" % err)
开发者ID:mahak,项目名称:swift,代码行数:35,代码来源:test_container_failures.py
注：本文中的swift.common.utils.readconf函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论