本文整理汇总了Python中swift.common.utils.storage_directory函数的典型用法代码示例。如果您正苦于以下问题:Python storage_directory函数的具体用法?Python storage_directory怎么用?Python storage_directory使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了storage_directory函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_DELETE
def test_DELETE(self):
""" Test swift.object_server.ObjectController.DELETE """
req = Request.blank('/sda1/p/a/c',
environ={'REQUEST_METHOD': 'DELETE'})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 400)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 400)
# self.assertRaises(KeyError, self.object_controller.DELETE, req)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': timestamp})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 404)
sleep(.00001)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={
'X-Timestamp': timestamp,
'Content-Type': 'application/octet-stream',
'Content-Length': '4',
})
req.body = 'test'
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
timestamp = normalize_timestamp(float(timestamp) - 1)
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': timestamp})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 204)
objfile = os.path.join(self.testdir, 'sda1',
storage_directory(object_server.DATADIR, 'p',
hash_path('a', 'c', 'o')),
timestamp + '.ts')
self.assert_(os.path.isfile(objfile))
sleep(.00001)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'X-Timestamp': timestamp})
resp = self.object_controller.DELETE(req)
self.assertEquals(resp.status_int, 204)
objfile = os.path.join(self.testdir, 'sda1',
storage_directory(object_server.DATADIR, 'p',
hash_path('a', 'c', 'o')),
timestamp + '.ts')
self.assert_(os.path.isfile(objfile))
开发者ID:edwardt,项目名称:swift,代码行数:56,代码来源:test_server.py
示例2: print_ring_locations
def print_ring_locations(ring, datadir, account, container=None):
"""
print out ring locations of specified type
:param ring: ring instance
:param datadir: high level directory to store account/container/objects
:param account: account name
:param container: container name
"""
if ring is None or datadir is None or account is None:
raise ValueError('None type')
storage_type = 'account'
if container:
storage_type = 'container'
try:
part, nodes = ring.get_nodes(account, container, None)
except (ValueError, AttributeError):
raise ValueError('Ring error')
else:
path_hash = hash_path(account, container, None)
print '\nRing locations:'
for node in nodes:
print (' %s:%s - /srv/node/%s/%s/%s.db' %
(node['ip'], node['port'], node['device'],
storage_directory(datadir, part, path_hash),
path_hash))
print '\nnote: /srv/node is used as default value of `devices`, the ' \
'real value is set in the %s config file on each storage node.' % \
storage_type
开发者ID:HoO-Group,项目名称:swift,代码行数:29,代码来源:info.py
示例3: test_PUT_overwrite
def test_PUT_overwrite(self):
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': normalize_timestamp(time()),
'Content-Length': '6',
'Content-Type': 'application/octet-stream'})
req.body = 'VERIFY'
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
sleep(.00001)
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'Content-Encoding': 'gzip'})
req.body = 'VERIFY TWO'
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(self.testdir, 'sda1',
storage_directory(object_server.DATADIR, 'p',
hash_path('a', 'c', 'o')),
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY TWO')
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '10',
'ETag': 'b381a4c5dab1eaa1eb9711fa647cd039',
'Content-Type': 'text/plain',
'name': '/a/c/o',
'Content-Encoding': 'gzip'})
开发者ID:edwardt,项目名称:swift,代码行数:31,代码来源:test_server.py
示例4: test_PUT_user_metadata
def test_PUT_user_metadata(self):
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Timestamp': timestamp,
'Content-Type': 'text/plain',
'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
'X-Object-Meta-1': 'One',
'X-Object-Meta-Two': 'Two'})
req.body = 'VERIFY THREE'
resp = self.object_controller.PUT(req)
self.assertEquals(resp.status_int, 201)
objfile = os.path.join(self.testdir, 'sda1',
storage_directory(object_server.DATADIR, 'p',
hash_path('a', 'c', 'o')),
timestamp + '.data')
self.assert_(os.path.isfile(objfile))
self.assertEquals(open(objfile).read(), 'VERIFY THREE')
self.assertEquals(pickle.loads(getxattr(objfile,
object_server.METADATA_KEY)),
{'X-Timestamp': timestamp,
'Content-Length': '12',
'ETag': 'b114ab7b90d9ccac4bd5d99cc7ebb568',
'Content-Type': 'text/plain',
'name': '/a/c/o',
'X-Object-Meta-1': 'One',
'X-Object-Meta-Two': 'Two'})
开发者ID:edwardt,项目名称:swift,代码行数:26,代码来源:test_server.py
示例5: setup_bad_zero_byte
def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
ts_file_path = ""
if with_ts:
name_hash = hash_path("a", "c", "o")
dir_path = os.path.join(self.devices, "sda", storage_directory(DATADIR, "0", name_hash))
ts_file_path = os.path.join(dir_path, "99999.ts")
if not os.path.exists(dir_path):
mkdirs(dir_path)
fp = open(ts_file_path, "w")
fp.close()
etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath):
etag = etag.hexdigest()
metadata = {"ETag": etag, "X-Timestamp": str(normalize_timestamp(time.time())), "Content-Length": 10}
self.disk_file.put(fd, tmppath, metadata)
etag = md5()
etag = etag.hexdigest()
metadata["ETag"] = etag
write_metadata(fd, metadata)
if self.disk_file.data_file:
return self.disk_file.data_file
return ts_file_path
开发者ID:jness,项目名称:python-swift,代码行数:26,代码来源:test_auditor.py
示例6: __init__
def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
uid=DEFAULT_UID, gid=DEFAULT_GID):
self.disk_chunk_size = disk_chunk_size
#Don't support obj_name ending/begining with '/', like /a, a/, /a/b/ etc
obj = obj.strip('/')
if '/' in obj:
self.obj_path, self.obj = obj.rsplit('/', 1)
else:
self.obj_path = ''
self.obj = obj
if self.obj_path:
self.name = '/'.join((container, self.obj_path))
else:
self.name = container
#Absolute path for obj directory.
self.datadir = os.path.join(path, device,
storage_directory(DATADIR, partition, self.name))
self.device_path = os.path.join(path, device)
self.container_path = os.path.join(path, device, container)
self.tmpdir = os.path.join(path, device, 'tmp')
self.logger = logger
self.metadata = {}
self.meta_file = None
self.data_file = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
self.is_dir = False
self.is_valid = True
self.uid = int(uid)
self.gid = int(gid)
if not os.path.exists(self.datadir + '/' + self.obj):
return
self.data_file = os.path.join(self.datadir, self.obj)
self.metadata = read_metadata(self.datadir + '/' + self.obj)
if not self.metadata:
create_object_metadata(self.datadir + '/' + self.obj)
self.metadata = read_metadata(self.datadir + '/' + self.obj)
if not validate_object(self.metadata):
self.logger.error('Metadata validation failed %s %s' % \
(self.data_file, self.metadata))
self.metadata = {}
self.is_valid = False
self.data_file = None
return
if os.path.isdir(self.datadir + '/' + self.obj):
self.is_dir = True
else:
self.fp = do_open(self.data_file, 'rb')
if not keep_data_fp:
self.close(verify_file=False)
开发者ID:Gaurav-Gangalwar,项目名称:UFO,代码行数:60,代码来源:server.py
示例7: __init__
def __init__(self, path, device, partition, account, container, obj,
logger, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024),
iter_hook=None, threadpool=None, obj_dir='objects',
mount_check=False):
if mount_check and not check_mount(path, device):
raise DiskFileDeviceUnavailable()
self.disk_chunk_size = disk_chunk_size
self.bytes_per_sync = bytes_per_sync
self.iter_hook = iter_hook
self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj)
self.datadir = join(
path, device, storage_directory(obj_dir, partition, name_hash))
self.device_path = join(path, device)
self.tmpdir = join(path, device, 'tmp')
self.logger = logger
self._metadata = None
self.data_file = None
self._data_file_size = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.suppress_file_closing = False
self._verify_close = False
self.threadpool = threadpool or ThreadPool(nthreads=0)
# FIXME(clayg): this attribute is set after open and affects the
# behavior of the class (i.e. public interface)
self.keep_cache = False
开发者ID:dionysus1016,项目名称:swift,代码行数:32,代码来源:diskfile.py
示例8: get_reconciler_broker
def get_reconciler_broker(self, timestamp):
"""
Get a local instance of the reconciler container broker that is
appropriate to enqueue the given timestamp.
:param timestamp: the timestamp of the row to be enqueued
:returns: a local reconciler broker
"""
container = get_reconciler_container_name(timestamp)
if self.reconciler_containers and \
container in self.reconciler_containers:
return self.reconciler_containers[container][1]
account = MISPLACED_OBJECTS_ACCOUNT
part = self.ring.get_part(account, container)
node = self.find_local_handoff_for_part(part)
if not node:
raise DeviceUnavailable(
'No mounted devices found suitable to Handoff reconciler '
'container %s in partition %s' % (container, part))
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, node['device'], db_dir, hsh + '.db')
broker = ContainerBroker(db_path, account=account, container=container)
if not os.path.exists(broker.db_file):
try:
broker.initialize(timestamp, 0)
except DatabaseAlreadyExists:
pass
if self.reconciler_containers is not None:
self.reconciler_containers[container] = part, broker, node['id']
return broker
开发者ID:chenzhongtao,项目名称:swift,代码行数:32,代码来源:replicator.py
示例9: setup_bad_zero_byte
def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
ts_file_path = ''
if with_ts:
name_hash = hash_path('a', 'c', 'o')
dir_path = os.path.join(self.devices, 'sda',
storage_directory(DATADIR, '0', name_hash))
ts_file_path = os.path.join(dir_path, '99999.ts')
if not os.path.exists(dir_path):
mkdirs(dir_path)
fp = open(ts_file_path, 'w')
fp.close()
etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath):
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10,
}
self.disk_file.put(fd, tmppath, metadata)
etag = md5()
etag = etag.hexdigest()
metadata['ETag'] = etag
write_metadata(fd, metadata)
if self.disk_file.data_file:
return self.disk_file.data_file
return ts_file_path
开发者ID:BillTheBest,项目名称:swift,代码行数:31,代码来源:test_auditor.py
示例10: _get_account_broker
def _get_account_broker(self, drive, part, account, **kwargs):
hsh = hash_path(account)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, drive, db_dir, hsh + ".db")
kwargs.setdefault("account", account)
kwargs.setdefault("logger", self.logger)
return AccountBroker(db_path, **kwargs)
开发者ID:steveruckdashel,项目名称:swift,代码行数:7,代码来源:server.py
示例11: print_ring_locations
def print_ring_locations(ring, datadir, account, container=None):
"""
print out ring locations of specified type
:param ring: ring instance
:param datadir: high level directory to store account/container/objects
:param acount: account name
:param container: container name
"""
if ring is None or datadir is None or account is None:
raise ValueError("None type")
storage_type = "account"
if container:
storage_type = "container"
try:
part, nodes = ring.get_nodes(account, container, None)
except (ValueError, AttributeError):
raise ValueError("Ring error")
else:
path_hash = hash_path(account, container, None)
print "\nRing locations:"
for node in nodes:
print (
" %s:%s - /srv/node/%s/%s/%s.db"
% (node["ip"], node["port"], node["device"], storage_directory(datadir, part, path_hash), path_hash)
)
print "\nnote: /srv/node is used as default value of `devices`, the " "real value is set in the %s config file on each storage node." % storage_type
开发者ID:happyhehe,项目名称:swift,代码行数:27,代码来源:info.py
示例12: setup_bad_zero_byte
def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
ts_file_path = ''
if with_ts:
name_hash = hash_path('a', 'c', 'o')
dir_path = os.path.join(
self.devices, 'sda',
storage_directory(get_data_dir(0), '0', name_hash))
ts_file_path = os.path.join(dir_path, '99999.ts')
if not os.path.exists(dir_path):
mkdirs(dir_path)
fp = open(ts_file_path, 'w')
write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'})
fp.close()
etag = md5()
with self.disk_file.create() as writer:
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10,
}
writer.put(metadata)
etag = md5()
etag = etag.hexdigest()
metadata['ETag'] = etag
write_metadata(writer._fd, metadata)
return ts_file_path
开发者ID:AsherBond,项目名称:swift,代码行数:30,代码来源:test_auditor.py
示例13: tmp_dir
def tmp_dir(self, pool, partition, name_hash):
if self.fs_per_obj:
return os.path.join(self.root, pool,
storage_directory(self.srvdir, partition, name_hash), 'tmp')
elif self.fs_per_part:
return os.path.join(self.root, pool, self.srvdir, partition, 'tmp')
else:
return os.path.join(self.root, pool, self.srvdir, 'tmp')
开发者ID:vineethrp,项目名称:swift,代码行数:8,代码来源:lfszfs.py
示例14: get_account_broker
def get_account_broker(drive, part, account):
DATADIR = "accounts"
root = os.getcwd()
hsh = hash_path(account)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(root, drive, db_dir, hsh + ".db")
print root, hsh, db_dir, db_path
return AccountBroker(db_path, account=account, logger=None)
开发者ID:github-lmp,项目名称:zft,代码行数:8,代码来源:db_broker_test.py
示例15: _get_account_broker
def _get_account_broker(self, drive, part, account):
if self.fs_object:
return DiskAccount(self.root, account, self.fs_object)
hsh = hash_path(account)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, drive, db_dir, hsh + '.db')
return AccountBroker(db_path, account=account, logger=self.logger)
开发者ID:mja054,项目名称:swift_plugin,代码行数:8,代码来源:server.py
示例16: __init__
def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
threadpool=None):
self.disk_chunk_size = disk_chunk_size
self.bytes_per_sync = bytes_per_sync
self.iter_hook = iter_hook
self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj)
self.datadir = os.path.join(
path, device, storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, 'tmp')
self.logger = logger
self.metadata = {}
self.meta_file = None
self.data_file = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
self.suppress_file_closing = False
self.threadpool = threadpool or ThreadPool(nthreads=0)
if not os.path.exists(self.datadir):
return
files = sorted(os.listdir(self.datadir), reverse=True)
for afile in files:
if afile.endswith('.ts'):
self.data_file = self.meta_file = None
self.metadata = {'deleted': True}
return
if afile.endswith('.meta') and not self.meta_file:
self.meta_file = os.path.join(self.datadir, afile)
if afile.endswith('.data') and not self.data_file:
self.data_file = os.path.join(self.datadir, afile)
break
if not self.data_file:
return
self.fp = open(self.data_file, 'rb')
self.metadata = read_metadata(self.fp)
if not keep_data_fp:
self.close(verify_file=False)
if self.meta_file:
with open(self.meta_file) as mfp:
for key in self.metadata.keys():
if key.lower() not in DISALLOWED_HEADERS:
del self.metadata[key]
self.metadata.update(read_metadata(mfp))
if 'name' in self.metadata:
if self.metadata['name'] != self.name:
self.logger.error(_('Client path %(client)s does not match '
'path stored in object metadata %(meta)s'),
{'client': self.name,
'meta': self.metadata['name']})
raise DiskFileCollision('Client path does not match path '
'stored in object metadata')
开发者ID:SinSiXX,项目名称:swift,代码行数:58,代码来源:server.py
示例17: __init__
def __init__(
self,
path,
device,
partition,
account,
container,
obj,
logger,
keep_data_fp=False,
disk_chunk_size=65536,
iter_hook=None,
):
self.disk_chunk_size = disk_chunk_size
self.iter_hook = iter_hook
self.name = "/" + "/".join((account, container, obj))
name_hash = hash_path(account, container, obj)
self.datadir = os.path.join(path, device, storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, "tmp")
self.tmppath = None
self.logger = logger
self.metadata = {}
self.meta_file = None
self.data_file = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
self.suppress_file_closing = False
if not os.path.exists(self.datadir):
return
files = sorted(os.listdir(self.datadir), reverse=True)
for file in files:
if file.endswith(".ts"):
self.data_file = self.meta_file = None
self.metadata = {"deleted": True}
return
if file.endswith(".meta") and not self.meta_file:
self.meta_file = os.path.join(self.datadir, file)
if file.endswith(".data") and not self.data_file:
self.data_file = os.path.join(self.datadir, file)
break
if not self.data_file:
return
self.fp = open(self.data_file, "rb")
self.metadata = read_metadata(self.fp)
if not keep_data_fp:
self.close(verify_file=False)
if self.meta_file:
with open(self.meta_file) as mfp:
for key in self.metadata.keys():
if key.lower() not in DISALLOWED_HEADERS:
del self.metadata[key]
self.metadata.update(read_metadata(mfp))
开发者ID:ChristopherMacGown,项目名称:swift,代码行数:57,代码来源:server.py
示例18: __init__
def __init__(self, path, device, partition, account, container, obj,
logger, keep_data_fp=False, disk_chunk_size=65536,
origin_disk_chunk_size=65536, iter_hook=None,
encryption_context=None, crypto_driver=None):
self.disk_chunk_size = disk_chunk_size
self.origin_disk_chunk_size = origin_disk_chunk_size
self.iter_hook = iter_hook
self.name = '/' + '/'.join((account, container, obj))
name_hash = hash_path(account, container, obj)
self.datadir = os.path.join(
path, device, storage_directory(DATADIR, partition, name_hash))
self.device_path = os.path.join(path, device)
self.tmpdir = os.path.join(path, device, 'tmp')
self.tmppath = None
self.logger = logger
self.metadata = {}
self.meta_file = None
self.data_file = None
self.fp = None
self.iter_etag = None
self.started_at_0 = False
self.read_to_eof = False
self.quarantined_dir = None
self.keep_cache = False
self.suppress_file_closing = False
self.encryption_context = encryption_context
self.crypto_driver = crypto_driver
if not os.path.exists(self.datadir):
return
files = sorted(os.listdir(self.datadir), reverse=True)
for file in files:
if file.endswith('.ts'):
self.data_file = self.meta_file = None
self.metadata = {'deleted': True}
return
if file.endswith('.meta') and not self.meta_file:
self.meta_file = os.path.join(self.datadir, file)
if file.endswith('.data') and not self.data_file:
self.data_file = os.path.join(self.datadir, file)
break
if not self.data_file:
return
self.fp = open(self.data_file, 'rb')
self.metadata = read_metadata(self.fp)
if not keep_data_fp:
self.close(verify_file=False)
if self.meta_file:
with open(self.meta_file) as mfp:
for key in self.metadata.keys():
if key.lower() not in DISALLOWED_HEADERS:
del self.metadata[key]
self.metadata.update(read_metadata(mfp))
if self.crypto_driver and not self.encryption_context:
key_id = self.metadata.get('X-Object-Meta-Key-Id')
self.encryption_context = \
self.crypto_driver.encryption_context(key_id)
开发者ID:Manasa7,项目名称:swift-encrypt,代码行数:56,代码来源:server.py
示例19: delete_handoff_objs
def delete_handoff_objs(self, job, delete_objs):
for object_hash in delete_objs:
object_path = storage_directory(job['obj_path'], job['partition'],
object_hash)
tpool.execute(shutil.rmtree, object_path, ignore_errors=True)
suffix_dir = dirname(object_path)
try:
os.rmdir(suffix_dir)
except OSError as e:
if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
self.logger.exception(
"Unexpected error trying to cleanup suffix dir:%r",
suffix_dir)
开发者ID:danieleguttadoro,项目名称:ovencswiftserver_onthefly,代码行数:13,代码来源:replicator.py
示例20: _get_container_broker
def _get_container_broker(self, drive, part, account, container):
"""
Get a DB broker for the container.
:param drive: drive that holds the container
:param part: partition the container is in
:param account: account name
:param container: container name
:returns: ContainerBroker object
"""
hsh = hash_path(account, container)
db_dir = storage_directory(DATADIR, part, hsh)
db_path = os.path.join(self.root, drive, db_dir, hsh + ".db")
return ContainerBroker(db_path, account=account, container=container, logger=self.logger)
开发者ID:nicoleLiu,项目名称:swift,代码行数:14,代码来源:server.py
注:本文中的swift.common.utils.storage_directory函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论