本文整理汇总了Python中swift.common.utils.mkdirs函数的典型用法代码示例。如果您正苦于以下问题:Python mkdirs函数的具体用法?Python mkdirs怎么用?Python mkdirs使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了mkdirs函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_hash_suffix_multi_file_two
def test_hash_suffix_multi_file_two(self):
df = diskfile.DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
suffs = ['.meta', '.data']
if tdiff > 50:
suffs.append('.ts')
for suff in suffs:
f = open(
os.path.join(
df.datadir,
normalize_timestamp(int(time()) - tdiff) + suff),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
hsh_path = os.listdir(whole_path_from)[0]
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
diskfile.hash_suffix(whole_path_from, 99)
# only the meta and data should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
开发者ID:atulshridhar,项目名称:swift,代码行数:26,代码来源:test_diskfile.py
示例2: initialize
def initialize(self):
print "initialize start"
utils.mkdirs(os.path.dirname(self.db_file))
self.conn = sqlite3.connect(self.db_file, check_same_thread=False)
self.conn.executescript(
"""
CREATE TABLE data_crawlers (
dev_path TEXT,
datatype TEXT,
slowdown FLOAT DEFAULT 0.001,
cycle INTEGER DEFAULT 0,
UNIQUE (dev_path, datatype)
);
CREATE TABLE data_crawler_runtime (
dev_path TEXT,
datatype TEXT,
cycle INTEGER,
interval FLOAT,
slowdown FLOAT,
parts INTEGER,
timestamp TEXT,
UNIQUE (dev_path, datatype)
);
"""
)
self.conn.commit()
print "initialize DONE"
开发者ID:shaolinjr,项目名称:SwiftCrawler,代码行数:27,代码来源:crawl.py
示例3: test_delete_partition_with_handoff_delete_fail_in_other_region
def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
ring = self.replicator.get_object_ring(0)
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
process_arg_checker = []
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
if node['region'] != 1:
# the rsync calls for other region to fail
ret_code = 1
else:
ret_code = 0
process_arg_checker.append(
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(part_path, os.F_OK))
开发者ID:gayana06,项目名称:Thesis,代码行数:33,代码来源:test_replicator.py
示例4: test_quarantine_same_file
def test_quarantine_same_file(self):
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time()) + '.data'), 'wb')
setxattr(f.fileno(), diskfile.METADATA_KEY,
pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger())
new_dir = df.quarantine()
quar_dir = os.path.join(self.testdir, 'sda1', 'quarantined',
'objects', os.path.basename(os.path.dirname(
df.data_file)))
self.assert_(os.path.isdir(quar_dir))
self.assertEquals(quar_dir, new_dir)
# have to remake the datadir and file
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time()) + '.data'), 'wb')
setxattr(f.fileno(), diskfile.METADATA_KEY,
pickle.dumps({}, diskfile.PICKLE_PROTOCOL))
df = diskfile.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
FakeLogger(), keep_data_fp=True)
double_uuid_path = df.quarantine()
self.assert_(os.path.isdir(double_uuid_path))
self.assert_('-' in os.path.basename(double_uuid_path))
开发者ID:Awingu,项目名称:swift,代码行数:28,代码来源:test_diskfile.py
示例5: setup_bad_zero_byte
def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
ts_file_path = ""
if with_ts:
name_hash = hash_path("a", "c", "o")
dir_path = os.path.join(self.devices, "sda", storage_directory(DATADIR, "0", name_hash))
ts_file_path = os.path.join(dir_path, "99999.ts")
if not os.path.exists(dir_path):
mkdirs(dir_path)
fp = open(ts_file_path, "w")
fp.close()
etag = md5()
with self.disk_file.mkstemp() as (fd, tmppath):
etag = etag.hexdigest()
metadata = {"ETag": etag, "X-Timestamp": str(normalize_timestamp(time.time())), "Content-Length": 10}
self.disk_file.put(fd, tmppath, metadata)
etag = md5()
etag = etag.hexdigest()
metadata["ETag"] = etag
write_metadata(fd, metadata)
if self.disk_file.data_file:
return self.disk_file.data_file
return ts_file_path
开发者ID:jness,项目名称:python-swift,代码行数:26,代码来源:test_auditor.py
示例6: test_run_once_1
def test_run_once_1(self):
conf = dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1')
replicator = object_replicator.ObjectReplicator(conf)
was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200)
cur_part = '0'
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
policy_idx=1)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects_1, cur_part, data_dir)
process_arg_checker = []
ring = replicator.get_object_ring(1)
nodes = [node for node in
ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
rsync_mods = tuple(['%s::object/sda/objects-1/%s' %
(node['ip'], cur_part) for node in nodes])
for node in nodes:
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mods]))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
object_replicator.http_connect = was_connector
开发者ID:gayana06,项目名称:Thesis,代码行数:32,代码来源:test_replicator.py
示例7: setUp
def setUp(self):
self.orig_hp = utils.HASH_PATH_PREFIX, utils.HASH_PATH_SUFFIX
utils.HASH_PATH_PREFIX = 'info'
utils.HASH_PATH_SUFFIX = 'info'
self.testdir = os.path.join(mkdtemp(), 'tmp_test_cli_info')
utils.mkdirs(self.testdir)
rmtree(self.testdir)
utils.mkdirs(os.path.join(self.testdir, 'sda1'))
utils.mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
utils.mkdirs(os.path.join(self.testdir, 'sdb1'))
utils.mkdirs(os.path.join(self.testdir, 'sdb1', 'tmp'))
self.account_ring_path = os.path.join(self.testdir, 'account.ring.gz')
with closing(GzipFile(self.account_ring_path, 'wb')) as f:
pickle.dump(ring.RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
[{'id': 0, 'zone': 0, 'device': 'sda1',
'ip': '127.0.0.1', 'port': 42},
{'id': 1, 'zone': 1, 'device': 'sdb1',
'ip': '127.0.0.2', 'port': 43}], 30),
f)
self.container_ring_path = os.path.join(self.testdir,
'container.ring.gz')
with closing(GzipFile(self.container_ring_path, 'wb')) as f:
pickle.dump(ring.RingData([[0, 1, 0, 1], [1, 0, 1, 0]],
[{'id': 0, 'zone': 0, 'device': 'sda1',
'ip': '127.0.0.3', 'port': 42},
{'id': 1, 'zone': 1, 'device': 'sdb1',
'ip': '127.0.0.4', 'port': 43}], 30),
f)
开发者ID:HugoKuo,项目名称:swift,代码行数:28,代码来源:test_info.py
示例8: setUp
def setUp(self):
super(TestBaseSsync, self).setUp()
self.device = 'dev'
self.partition = '9'
# sender side setup
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
self.daemon = FakeReplicator(self.tx_testdir)
# rx side setup
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
conf = {
'devices': self.rx_testdir,
'mount_check': 'false',
'replication_one_per_device': 'false',
'log_requests': 'false'}
self.rx_controller = server.ObjectController(conf)
self.ts_iter = (Timestamp(t)
for t in itertools.count(int(time.time())))
self.rx_ip = '127.0.0.1'
sock = eventlet.listen((self.rx_ip, 0))
self.rx_server = eventlet.spawn(
eventlet.wsgi.server, sock, self.rx_controller, utils.NullLogger())
self.rx_port = sock.getsockname()[1]
self.rx_node = {'replication_ip': self.rx_ip,
'replication_port': self.rx_port,
'device': self.device}
开发者ID:BjoernT,项目名称:swift,代码行数:28,代码来源:test_ssync.py
示例9: setup_partition
def setup_partition(self, pool, partition):
path = os.path.join(self.root, pool, self.srvdir, partition)
if self.fs_per_part:
fs = '%s/%s/%s' %(self.topfs, self.srvdir, partition)
zfs_create(pool, fs, path)
else:
mkdirs(path)
开发者ID:vineethrp,项目名称:swift,代码行数:7,代码来源:lfszfs.py
示例10: add_synced_container
def add_synced_container(self, broker):
"""
Adds the container db represented by broker to the list of synced
containers.
:param broker: An instance of ContainerBroker representing the
container to add.
"""
sync_file = self._container_to_synced_container_path(broker.db_file)
stat = None
try:
stat = os.stat(sync_file)
except OSError as oserr:
if oserr.errno != errno.ENOENT:
raise oserr
if stat is not None:
return
sync_path = os.path.dirname(sync_file)
mkdirs(sync_path)
try:
os.symlink(broker.db_file, sync_file)
except OSError as oserr:
if oserr.errno != errno.EEXIST or not os.path.islink(sync_file):
raise oserr
开发者ID:unibg-seclab,项目名称:swift,代码行数:27,代码来源:sync_store.py
示例11: find_and_process
def find_and_process(self):
src_filename = time.strftime(self.filename_format)
working_dir = os.path.join(self.target_dir, ".%-stats_tmp" % self.stats_type)
shutil.rmtree(working_dir, ignore_errors=True)
mkdirs(working_dir)
tmp_filename = os.path.join(working_dir, src_filename)
hasher = hashlib.md5()
try:
with open(tmp_filename, "wb") as statfile:
statfile.write(self.get_header())
for device in os.listdir(self.devices):
if self.mount_check and not check_mount(self.devices, device):
self.logger.error(_("Device %s is not mounted, skipping.") % device)
continue
db_dir = os.path.join(self.devices, device, self.data_dir)
if not os.path.exists(db_dir):
self.logger.debug(_("Path %s does not exist, skipping.") % db_dir)
continue
for root, dirs, files in os.walk(db_dir, topdown=False):
for filename in files:
if filename.endswith(".db"):
db_path = os.path.join(root, filename)
try:
line_data = self.get_data(db_path)
except sqlite3.Error, err:
self.logger.info(_("Error accessing db %s: %s") % (db_path, err))
continue
if line_data:
statfile.write(line_data)
hasher.update(line_data)
src_filename += hasher.hexdigest()
renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
开发者ID:VenkataSeshadri,项目名称:slogging,代码行数:33,代码来源:db_stats_collector.py
示例12: test_run_once_recover_from_failure
def test_run_once_recover_from_failure(self):
replicator = object_replicator.ObjectReplicator(
dict(swift_dir=self.testdir, devices=self.devices, mount_check="false", timeout="300", stats_interval="1")
)
was_connector = object_replicator.http_connect
try:
object_replicator.http_connect = mock_http_connect(200)
# Write some files into '1' and run replicate- they should be moved
# to the other partitoins and then node should get deleted.
cur_part = "1"
df = DiskFile(self.devices, "sda", cur_part, "a", "c", "o", FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir, normalize_timestamp(time.time()) + ".data"), "wb")
f.write("1234567890")
f.close()
ohash = hash_path("a", "c", "o")
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
nodes = [node for node in self.ring.get_part_nodes(int(cur_part)) if node["ip"] not in _ips()]
for node in nodes:
rsync_mod = "%s::object/sda/objects/%s" % (node["ip"], cur_part)
process_arg_checker.append((0, "", ["rsync", whole_path_from, rsync_mod]))
self.assertTrue(os.access(os.path.join(self.objects, "1", data_dir, ohash), os.F_OK))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
for i, result in [("0", True), ("1", False), ("2", True), ("3", True)]:
self.assertEquals(
os.access(os.path.join(self.objects, i, object_replicator.HASH_FILE), os.F_OK), result
)
finally:
object_replicator.http_connect = was_connector
开发者ID:mygoda,项目名称:openstack,代码行数:33,代码来源:test_replicator.py
示例13: test_run_once
def test_run_once(self):
replicator = object_replicator.ObjectReplicator(
dict(swift_dir=self.testdir, devices=self.devices, mount_check="false", timeout="300", stats_interval="1")
)
was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200)
cur_part = "0"
df = DiskFile(self.devices, "sda", cur_part, "a", "c", "o", FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir, normalize_timestamp(time.time()) + ".data"), "wb")
f.write("1234567890")
f.close()
ohash = hash_path("a", "c", "o")
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
nodes = [node for node in self.ring.get_part_nodes(int(cur_part)) if node["ip"] not in _ips()]
for node in nodes:
rsync_mod = "%s::object/sda/objects/%s" % (node["ip"], cur_part)
process_arg_checker.append((0, "", ["rsync", whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
object_replicator.http_connect = was_connector
开发者ID:mygoda,项目名称:openstack,代码行数:25,代码来源:test_replicator.py
示例14: setUp
def setUp(self):
self.testdir = os.path.join(mkdtemp(), "tmp_test_object_auditor")
self.devices = os.path.join(self.testdir, "node")
self.rcache = os.path.join(self.testdir, "object.recon")
self.logger = FakeLogger()
rmtree(self.testdir, ignore_errors=1)
mkdirs(os.path.join(self.devices, "sda"))
os.mkdir(os.path.join(self.devices, "sdb"))
# policy 0
self.objects = os.path.join(self.devices, "sda", get_data_dir(POLICIES[0]))
self.objects_2 = os.path.join(self.devices, "sdb", get_data_dir(POLICIES[0]))
os.mkdir(self.objects)
# policy 1
self.objects_p1 = os.path.join(self.devices, "sda", get_data_dir(POLICIES[1]))
self.objects_2_p1 = os.path.join(self.devices, "sdb", get_data_dir(POLICIES[1]))
os.mkdir(self.objects_p1)
self.parts = self.parts_p1 = {}
for part in ["0", "1", "2", "3"]:
self.parts[part] = os.path.join(self.objects, part)
self.parts_p1[part] = os.path.join(self.objects_p1, part)
os.mkdir(os.path.join(self.objects, part))
os.mkdir(os.path.join(self.objects_p1, part))
self.conf = dict(devices=self.devices, mount_check="false", object_size_stats="10,100,1024,10240")
self.df_mgr = DiskFileManager(self.conf, self.logger)
# diskfiles for policy 0, 1
self.disk_file = self.df_mgr.get_diskfile("sda", "0", "a", "c", "o", policy=POLICIES[0])
self.disk_file_p1 = self.df_mgr.get_diskfile("sda", "0", "a", "c", "o", policy=POLICIES[1])
开发者ID:renanalan,项目名称:swift,代码行数:31,代码来源:test_auditor.py
示例15: setup_bad_zero_byte
def setup_bad_zero_byte(self, with_ts=False):
self.auditor = auditor.ObjectAuditor(self.conf)
self.auditor.log_time = 0
ts_file_path = ''
if with_ts:
name_hash = hash_path('a', 'c', 'o')
dir_path = os.path.join(
self.devices, 'sda',
storage_directory(get_data_dir(0), '0', name_hash))
ts_file_path = os.path.join(dir_path, '99999.ts')
if not os.path.exists(dir_path):
mkdirs(dir_path)
fp = open(ts_file_path, 'w')
write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'})
fp.close()
etag = md5()
with self.disk_file.create() as writer:
etag = etag.hexdigest()
metadata = {
'ETag': etag,
'X-Timestamp': str(normalize_timestamp(time.time())),
'Content-Length': 10,
}
writer.put(metadata)
etag = md5()
etag = etag.hexdigest()
metadata['ETag'] = etag
write_metadata(writer._fd, metadata)
return ts_file_path
开发者ID:AsherBond,项目名称:swift,代码行数:30,代码来源:test_auditor.py
示例16: create
def create(self, size=None):
"""
Context manager to create a file. We create a temporary file first, and
then return a DiskFileWriter object to encapsulate the state.
.. note::
An implementation is not required to perform on-disk
preallocations even if the parameter is specified. But if it does
and it fails, it must raise a `DiskFileNoSpace` exception.
:param size: optional initial size of file to explicitly allocate on
disk
:raises DiskFileNoSpace: if a size is specified and allocation fails
"""
if not exists(self._tmpdir):
mkdirs(self._tmpdir)
fd, tmppath = mkstemp(dir=self._tmpdir)
try:
if size is not None and size > 0:
try:
fallocate(fd, size)
except OSError:
raise DiskFileNoSpace()
yield DiskFileWriter(self._name, self._datadir, fd, tmppath,
self._bytes_per_sync, self._threadpool)
finally:
try:
os.close(fd)
except OSError:
pass
try:
os.unlink(tmppath)
except OSError:
pass
开发者ID:yohsuke,项目名称:swift,代码行数:35,代码来源:diskfile.py
示例17: writer
def writer(self, size=None):
"""
Context manager to write a file. We create a temporary file first, and
then return a DiskWriter object to encapsulate the state.
:param size: optional initial size of file to explicitly allocate on
disk
:raises DiskFileNoSpace: if a size is specified and allocation fails
"""
if not os.path.exists(self.tmpdir):
mkdirs(self.tmpdir)
fd, tmppath = mkstemp(dir=self.tmpdir)
try:
if size is not None and size > 0:
try:
fallocate(fd, size)
except OSError:
raise DiskFileNoSpace()
yield DiskWriter(self, fd, tmppath, self.threadpool)
finally:
try:
os.close(fd)
except OSError:
pass
try:
os.unlink(tmppath)
except OSError:
pass
开发者ID:iBeacons,项目名称:swift,代码行数:28,代码来源:diskfile.py
示例18: setUp
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_object_auditor')
self.devices = os.path.join(self.testdir, 'node')
self.rcache = os.path.join(self.testdir, 'object.recon')
self.logger = FakeLogger()
rmtree(self.testdir, ignore_errors=1)
mkdirs(os.path.join(self.devices, 'sda'))
os.mkdir(os.path.join(self.devices, 'sdb'))
# policy 0
self.objects = os.path.join(self.devices, 'sda', get_data_dir(0))
self.objects_2 = os.path.join(self.devices, 'sdb', get_data_dir(0))
os.mkdir(self.objects)
# policy 1
self.objects_p1 = os.path.join(self.devices, 'sda', get_data_dir(1))
self.objects_2_p1 = os.path.join(self.devices, 'sdb', get_data_dir(1))
os.mkdir(self.objects_p1)
self.parts = self.parts_p1 = {}
for part in ['0', '1', '2', '3']:
self.parts[part] = os.path.join(self.objects, part)
self.parts_p1[part] = os.path.join(self.objects_p1, part)
os.mkdir(os.path.join(self.objects, part))
os.mkdir(os.path.join(self.objects_p1, part))
self.conf = dict(
devices=self.devices,
mount_check='false',
object_size_stats='10,100,1024,10240')
self.df_mgr = DiskFileManager(self.conf, self.logger)
# diskfiles for policy 0, 1
self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o', 0)
self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
'o', 1)
开发者ID:AsherBond,项目名称:swift,代码行数:35,代码来源:test_auditor.py
示例19: test_delete_partition
def test_delete_partition(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
开发者ID:ChenZhengtongnju,项目名称:swift,代码行数:7,代码来源:test_replicator.py
示例20: test_run_once_recover_from_timeout
def test_run_once_recover_from_timeout(self):
replicator = object_replicator.ObjectReplicator(
dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1'))
was_connector = object_replicator.http_connect
was_get_hashes = object_replicator.get_hashes
was_execute = tpool.execute
self.get_hash_count = 0
try:
def fake_get_hashes(*args, **kwargs):
self.get_hash_count += 1
if self.get_hash_count == 3:
# raise timeout on last call to get hashes
raise Timeout()
return 2, {'abc': 'def'}
def fake_exc(tester, *args, **kwargs):
if 'Error syncing partition' in args[0]:
tester.i_failed = True
self.i_failed = False
object_replicator.http_connect = mock_http_connect(200)
object_replicator.get_hashes = fake_get_hashes
replicator.logger.exception = \
lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
# Write some files into '1' and run replicate- they should be moved
# to the other partitions and then node should get deleted.
cur_part = '1'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o',
FakeLogger())
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
nodes = [node for node in
self.ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
self.assertTrue(os.access(os.path.join(self.objects,
'1', data_dir, ohash),
os.F_OK))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
self.assertFalse(self.i_failed)
finally:
object_replicator.http_connect = was_connector
object_replicator.get_hashes = was_get_hashes
tpool.execute = was_execute
开发者ID:ChenZhengtongnju,项目名称:swift,代码行数:60,代码来源:test_replicator.py
注:本文中的swift.common.utils.mkdirs函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论