本文整理汇总了Python中swift.obj.diskfile.write_metadata函数的典型用法代码示例。如果您正苦于以下问题：Python write_metadata函数的具体用法？Python write_metadata怎么用？Python write_metadata使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了write_metadata函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_object_run_fast_track_non_zero
def test_object_run_fast_track_non_zero(self):
    """Check which audit pass flags a non-empty object with a wrong ETag.

    A 1024-byte object is stored with matching metadata, then the stored
    ETag is overwritten with the hash of different content.  The test
    asserts that an audit run with ``zero_byte_fps`` set leaves no
    quarantine directory, while a run without it creates one.
    """
    self.auditor = auditor.ObjectAuditor(self.conf)
    self.auditor.log_time = 0
    payload = "0" * 1024
    good_hash = md5()
    with self.disk_file.create() as writer:
        writer.write(payload)
        good_hash.update(payload)
        metadata = {
            "ETag": good_hash.hexdigest(),
            "X-Timestamp": str(normalize_timestamp(time.time())),
            "Content-Length": str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        # Replace the stored ETag with the hash of content that differs
        # from what was written, so metadata and data disagree.
        bad_hash = md5()
        bad_hash.update("1" + "0" * 1023)
        metadata["ETag"] = bad_hash.hexdigest()
        write_metadata(writer._fd, metadata)
    quarantine_path = os.path.join(self.devices, "sda", "quarantined", "objects")
    # First pass: zero_byte_fps is set; nothing is expected in quarantine.
    self.auditor.run_audit(mode="once", zero_byte_fps=50)
    self.assertFalse(os.path.isdir(quarantine_path))
    # Second pass: no zero_byte_fps; the quarantine directory must appear.
    self.auditor.run_audit(mode="once")
    self.assertTrue(os.path.isdir(quarantine_path))
开发者ID:nlevinki,项目名称:swift,代码行数:29,代码来源:test_auditor.py
示例2: _get_disk_file
def _get_disk_file(
    self,
    invalid_type=None,
    obj_name="o",
    fsize=1024,
    csize=8,
    mark_deleted=False,
    ts=None,
    iter_hook=None,
    mount_check=False,
    extra_metadata=None,
):
    """Write a test object and return a DiskFile opened on it.

    The object body is ``fsize`` "0" characters.  ``invalid_type`` selects
    an optional corruption applied after the object is written:
    "ETag" stores a hash of different content, "Content-Length" stores
    ``fsize - 1``, and "Zero-Byte" replaces the data file with an empty one.

    :param invalid_type: None, "ETag", "Content-Length" or "Zero-Byte"
    :param obj_name: object name passed to DiskFile
    :param fsize: number of bytes written as the object body
    :param csize: passed to DiskFile as ``disk_chunk_size``
    :param mark_deleted: if true, also write tombstone metadata
    :param ts: timestamp to use; a fresh normalized one is made if falsy
    :param iter_hook: passed through to DiskFile
    :param mount_check: passed through to DiskFile
    :param extra_metadata: optional dict merged into the object metadata
    :returns: the DiskFile opened with ``keep_data_fp=True``
    """
    df = diskfile.DiskFile(self.testdir, "sda1", "0", "a", "c", obj_name, FakeLogger())
    payload = "0" * fsize
    digest = md5()
    timestamp = ts if ts else str(normalize_timestamp(time()))
    with df.writer() as writer:
        writer.write(payload)
        digest.update(payload)
        metadata = {
            "ETag": digest.hexdigest(),
            "X-Timestamp": timestamp,
            "Content-Length": str(os.fstat(writer.fd).st_size),
        }
        metadata.update(extra_metadata or {})
        writer.put(metadata)
        if invalid_type == "ETag":
            # Store the hash of content that differs in its first byte.
            wrong_digest = md5()
            wrong_digest.update("1" + "0" * (fsize - 1))
            metadata["ETag"] = wrong_digest.hexdigest()
            diskfile.write_metadata(writer.fd, metadata)
        if invalid_type == "Content-Length":
            # Store a length one byte short of what was written.
            metadata["Content-Length"] = fsize - 1
            diskfile.write_metadata(writer.fd, metadata)
    if mark_deleted:
        df.put_metadata({"X-Timestamp": timestamp, "deleted": True}, tombstone=True)
    # Re-open the object with the read-side options the caller asked for.
    df = diskfile.DiskFile(
        self.testdir,
        "sda1",
        "0",
        "a",
        "c",
        obj_name,
        FakeLogger(),
        keep_data_fp=True,
        disk_chunk_size=csize,
        iter_hook=iter_hook,
        mount_check=mount_check,
    )
    if invalid_type == "Zero-Byte":
        # Swap the data file for an empty one at the same path.
        os.remove(df.data_file)
        open(df.data_file, "w").close()
    df.unit_test_len = fsize
    return df
开发者ID:zhouyuan,项目名称:swift,代码行数:60,代码来源:test_diskfile.py
示例3: run_quarantine_range_etag
def run_quarantine_range_etag(self):
    """Range GETs succeed on an object with a bad ETag; a full GET then 404s.

    The object body is "RANGE".  After the stored ETag is replaced with
    "badetag", three ranged reads must still return the expected slices,
    and a subsequent un-ranged direct GET must fail with HTTP 404.
    """
    container = "container-range-%s" % uuid4()
    obj = "object-range-%s" % uuid4()
    onode, opart, data_file = self._setup_data_file(container, obj, "RANGE")
    # Corrupt only the stored ETag; the object data itself is untouched.
    metadata = read_metadata(data_file)
    metadata["ETag"] = "badetag"
    write_metadata(data_file, metadata)
    base_headers = {"X-Backend-Storage-Policy-Index": self.policy.idx}
    for header, result in [
        ({"Range": "bytes=0-2"}, "RAN"),
        ({"Range": "bytes=1-11"}, "ANGE"),
        ({"Range": "bytes=0-11"}, "RANGE"),
    ]:
        req_headers = base_headers.copy()
        req_headers.update(header)
        odata = direct_client.direct_get_object(onode, opart, self.account, container, obj, headers=req_headers)[-1]
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(odata, result)
    try:
        direct_client.direct_get_object(
            onode, opart, self.account, container, obj, headers={"X-Backend-Storage-Policy-Index": self.policy.idx}
        )
        # Reaching this line means the GET unexpectedly succeeded; a plain
        # Exception is not caught by the handler below, so the test fails.
        raise Exception("Did not quarantine object")
    except ClientException as err:
        self.assertEqual(err.http_status, 404)
开发者ID:heemanshu,项目名称:swift_juno,代码行数:26,代码来源:test_object_failures.py
示例4: run_quarantine
def run_quarantine(self):
    """A bad stored ETag lets one full GET through, then the next one 404s.

    The data as served by the backend is captured first, the ETag in the
    object's metadata is replaced with 'badetag', and the test checks that
    the next direct GET still returns the same data while the GET after
    that fails with HTTP 404.
    """
    def policy_headers():
        # Build a fresh header dict for every request.
        return {'X-Backend-Storage-Policy-Index': self.policy.idx}

    container = 'container-%s' % uuid4()
    obj = 'object-%s' % uuid4()
    onode, opart, data_file = self._setup_data_file(
        container, obj, 'VERIFY')
    # Remember what the backend actually stores: it need not be the literal
    # 'VERIFY' (for example when the proxy has crypto enabled).
    response = direct_client.direct_get_object(
        onode, opart, self.account, container, obj,
        headers=policy_headers())
    backend_data = response[-1]
    metadata = read_metadata(data_file)
    metadata['ETag'] = 'badetag'
    write_metadata(data_file, metadata)
    response = direct_client.direct_get_object(
        onode, opart, self.account, container, obj,
        headers=policy_headers())
    self.assertEqual(response[-1], backend_data)
    try:
        direct_client.direct_get_object(
            onode, opart, self.account, container, obj,
            headers=policy_headers())
        # Not caught by the handler below, so a successful GET fails the test.
        raise Exception("Did not quarantine object")
    except ClientException as err:
        self.assertEqual(err.http_status, 404)
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:26,代码来源:test_object_failures.py
示例5: run_quarantine_range_etag
def run_quarantine_range_etag(self):
    """Range GETs succeed on an object with a bad ETag; a full GET then 404s.

    Expected range results are sliced from the data the backend served
    before the ETag was corrupted, rather than from the literal payload.
    """
    def policy_headers():
        # Build a fresh header dict for every request.
        return {'X-Backend-Storage-Policy-Index': self.policy.idx}

    container = 'container-range-%s' % uuid4()
    obj = 'object-range-%s' % uuid4()
    onode, opart, data_file = self._setup_data_file(
        container, obj, 'RANGE')
    # Remember what the backend actually stores: it need not be the literal
    # 'RANGE' (for example when the proxy has crypto enabled).
    backend_data = direct_client.direct_get_object(
        onode, opart, self.account, container, obj,
        headers=policy_headers())[-1]
    metadata = read_metadata(data_file)
    metadata['ETag'] = 'badetag'
    write_metadata(data_file, metadata)
    base_headers = policy_headers()
    range_cases = [
        ({'Range': 'bytes=0-2'}, backend_data[0:3]),
        ({'Range': 'bytes=1-11'}, backend_data[1:]),
        ({'Range': 'bytes=0-11'}, backend_data),
    ]
    for header, expected in range_cases:
        req_headers = dict(base_headers, **header)
        response = direct_client.direct_get_object(
            onode, opart, self.account, container, obj,
            headers=req_headers)
        self.assertEqual(response[-1], expected)
    try:
        direct_client.direct_get_object(
            onode, opart, self.account, container, obj,
            headers=policy_headers())
        # Not caught by the handler below, so a successful GET fails the test.
        raise Exception("Did not quarantine object")
    except ClientException as err:
        self.assertEqual(err.http_status, 404)
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:32,代码来源:test_object_failures.py
示例6: test_object_run_fast_track_non_zero
def test_object_run_fast_track_non_zero(self):
    """An audit with zero_byte_fps must not quarantine; one without must.

    The object holds 1024 bytes, but its stored ETag is rewritten to the
    hash of different content before the two audit passes run.
    """
    self.auditor = auditor.ObjectAuditor(self.conf)
    self.auditor.log_time = 0
    body = '0' * 1024
    hasher = md5()
    with self.disk_file.create() as writer:
        writer.write(body)
        hasher.update(body)
        metadata = {
            'ETag': hasher.hexdigest(),
            'X-Timestamp': str(normalize_timestamp(time.time())),
            'Content-Length': str(os.fstat(writer._fd).st_size),
        }
        writer.put(metadata)
        # Overwrite the stored ETag with the hash of other content.
        hasher = md5()
        hasher.update('1' + '0' * 1023)
        metadata['ETag'] = hasher.hexdigest()
        write_metadata(writer._fd, metadata)
    quarantine_path = os.path.join(
        self.devices, 'sda', 'quarantined', 'objects')
    audit_kwargs = {'mode': 'once', 'zero_byte_fps': 50}
    self.auditor.run_audit(**audit_kwargs)
    self.assertFalse(os.path.isdir(quarantine_path))
    # Same audit again, this time without the zero_byte_fps option.
    audit_kwargs.pop('zero_byte_fps')
    self.auditor.run_audit(**audit_kwargs)
    self.assertTrue(os.path.isdir(quarantine_path))
开发者ID:AsherBond,项目名称:swift,代码行数:30,代码来源:test_auditor.py
示例7: run_quarantine_range_etag
def run_quarantine_range_etag(self):
    """Range GETs succeed on an object with a bad ETag; a full GET then 404s.

    The object body is 'RANGE'; the expected slices are spelled out
    literally for each requested byte range.
    """
    container = 'container-range-%s' % uuid4()
    obj = 'object-range-%s' % uuid4()
    onode, opart, data_file = self._setup_data_file(
        container, obj, 'RANGE')
    # Replace only the stored ETag so that it no longer matches the data.
    metadata = read_metadata(data_file)
    metadata['ETag'] = 'badetag'
    write_metadata(data_file, metadata)
    base_headers = {'X-Backend-Storage-Policy-Index': self.policy.idx}
    range_cases = [
        ({'Range': 'bytes=0-2'}, 'RAN'),
        ({'Range': 'bytes=1-11'}, 'ANGE'),
        ({'Range': 'bytes=0-11'}, 'RANGE'),
    ]
    for header, expected in range_cases:
        req_headers = dict(base_headers, **header)
        response = direct_client.direct_get_object(
            onode, opart, self.account, container, obj,
            headers=req_headers)
        self.assertEqual(response[-1], expected)
    try:
        direct_client.direct_get_object(
            onode, opart, self.account, container, obj,
            headers={'X-Backend-Storage-Policy-Index': self.policy.idx})
        # Not caught by the handler below, so a successful GET fails the test.
        raise Exception("Did not quarantine object")
    except ClientException as err:
        self.assertEqual(err.http_status, 404)
开发者ID:harrisonfeng,项目名称:swift,代码行数:27,代码来源:test_object_failures.py
示例8: setup_bad_zero_byte
def setup_bad_zero_byte(self, with_ts=False):
    """Create an empty object whose metadata claims a length of 10 bytes.

    Nothing is written to the object body, yet the stored Content-Length
    is 10.  Also (re)creates ``self.auditor``.

    :param with_ts: if true, additionally create a '99999.ts' file with
                    metadata in the object's storage directory
    :returns: path of the '.ts' file, or '' when ``with_ts`` is false
    """
    self.auditor = auditor.ObjectAuditor(self.conf)
    self.auditor.log_time = 0
    ts_file_path = ''
    if with_ts:
        name_hash = hash_path('a', 'c', 'o')
        dir_path = os.path.join(
            self.devices, 'sda',
            storage_directory(get_data_dir(0), '0', name_hash))
        ts_file_path = os.path.join(dir_path, '99999.ts')
        if not os.path.exists(dir_path):
            mkdirs(dir_path)
        # Context manager so the handle is closed even if write_metadata
        # raises (the original leaked it in that case).
        with open(ts_file_path, 'w') as fp:
            write_metadata(fp, {'X-Timestamp': '99999', 'name': '/a/c/o'})
    etag = md5()
    with self.disk_file.create() as writer:
        # No data is written, so this is the md5 of empty input.
        etag = etag.hexdigest()
        metadata = {
            'ETag': etag,
            'X-Timestamp': str(normalize_timestamp(time.time())),
            'Content-Length': 10,
        }
        writer.put(metadata)
        # Recomputed over empty input again, so the ETag value is unchanged;
        # the metadata is then written once more directly to the fd.
        etag = md5()
        etag = etag.hexdigest()
        metadata['ETag'] = etag
        write_metadata(writer._fd, metadata)
    return ts_file_path
开发者ID:AsherBond,项目名称:swift,代码行数:30,代码来源:test_auditor.py
示例9: setUp
def setUp(self):
    """Create a '.data' file in the test dir carrying minimal metadata."""
    super(TestPrintObj, self).setUp()
    self.datafile = os.path.join(self.testdir, '1402017432.46642.data')
    metadata = {
        'name': '/AUTH_admin/c/obj',
        'Content-Type': 'application/octet-stream',
    }
    # The file body stays empty; only metadata is attached to it.
    with open(self.datafile, 'wb') as data_fp:
        write_metadata(data_fp, metadata)
开发者ID:aureliengoulon,项目名称:swift,代码行数:8,代码来源:test_info.py
示例10: test_object_audit_will_not_swallow_errors_in_tests
def test_object_audit_will_not_swallow_errors_in_tests(self):
    """object_audit() must let an unexpected NameError propagate."""
    datadir = self.disk_file._datadir
    timestamp = str(normalize_timestamp(time.time()))
    path = os.path.join(datadir, timestamp + ".data")
    mkdirs(datadir)
    with open(path, "w") as data_fp:
        write_metadata(data_fp, {"name": "/a/c/o"})
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger, self.rcache, self.devices)

    def explode(*args):
        # Stand-in for the patched method: always fails.
        raise NameError("tpyo")

    patcher = mock.patch.object(DiskFileManager, "get_diskfile_from_audit_location", explode)
    with patcher:
        self.assertRaises(
            NameError,
            auditor_worker.object_audit,
            AuditLocation(os.path.dirname(path), "sda", "0"),
        )
开发者ID:nlevinki,项目名称:swift,代码行数:13,代码来源:test_auditor.py
示例11: test_invalid_etag
def test_invalid_etag(self):
    """print_obj() must report when the stored ETag mismatches the file hash."""
    metadata = {
        'name': '/AUTH_admin/c/obj',
        'Content-Type': 'application/octet-stream',
        'ETag': 'badetag',
        'Content-Length': 0,
    }
    with open(self.datafile, 'wb') as data_fp:
        write_metadata(data_fp, metadata)
    # Capture everything print_obj writes to stdout.
    captured = StringIO()
    with mock.patch('sys.stdout', captured):
        print_obj(self.datafile)
    expected_message = 'ETag: badetag doesn\'t match file hash'
    self.assertTrue(expected_message in captured.getvalue())
开发者ID:aureliengoulon,项目名称:swift,代码行数:13,代码来源:test_info.py
示例12: _get_disk_file
def _get_disk_file(self, invalid_type=None, obj_name='o',
                   fsize=1024, csize=8, mark_deleted=False, ts=None,
                   iter_hook=None, mount_check=False,
                   extra_metadata=None):
    """Write a test object and return a DiskFile opened on it.

    The object body is ``fsize`` '0' characters.  ``invalid_type`` may be
    'ETag' (store a hash of different content), 'Content-Length' (store
    ``fsize - 1``) or 'Zero-Byte' (replace the data file with an empty
    one); any other value leaves the object consistent.

    :param invalid_type: kind of corruption to apply, or None
    :param obj_name: object name passed to DiskFile
    :param fsize: number of bytes written as the object body
    :param csize: passed to DiskFile as ``disk_chunk_size``
    :param mark_deleted: if true, also write tombstone metadata
    :param ts: timestamp to use; a fresh normalized one is made if falsy
    :param iter_hook: passed through to DiskFile
    :param mount_check: passed through to DiskFile
    :param extra_metadata: optional dict merged into the object metadata
    :returns: the DiskFile opened with ``keep_data_fp=True``
    """
    location = (self.testdir, 'sda1', '0', 'a', 'c', obj_name)
    df = diskfile.DiskFile(*(location + (FakeLogger(),)))
    body = '0' * fsize
    hasher = md5()
    if ts:
        timestamp = ts
    else:
        timestamp = str(normalize_timestamp(time()))
    with df.create() as writer:
        writer.write(body)
        hasher.update(body)
        metadata = {
            'ETag': hasher.hexdigest(),
            'X-Timestamp': timestamp,
            'Content-Length': str(os.fstat(writer.fd).st_size),
        }
        metadata.update(extra_metadata or {})
        writer.put(metadata)
        if invalid_type == 'ETag':
            # Hash of content that differs from the body in its first byte.
            hasher = md5()
            hasher.update('1' + '0' * (fsize - 1))
            metadata['ETag'] = hasher.hexdigest()
            diskfile.write_metadata(writer.fd, metadata)
        if invalid_type == 'Content-Length':
            # One byte short of what was actually written.
            metadata['Content-Length'] = fsize - 1
            diskfile.write_metadata(writer.fd, metadata)
    if mark_deleted:
        tombstone_metadata = {'X-Timestamp': timestamp, 'deleted': True}
        df.put_metadata(tombstone_metadata, tombstone=True)
    # Re-open the object with the read-side options the caller asked for.
    df = diskfile.DiskFile(*(location + (FakeLogger(),)),
                           keep_data_fp=True, disk_chunk_size=csize,
                           iter_hook=iter_hook, mount_check=mount_check)
    if invalid_type == 'Zero-Byte':
        # Swap the data file for an empty one at the same path.
        os.remove(df.data_file)
        with open(df.data_file, 'w'):
            pass
    df.unit_test_len = fsize
    return df
开发者ID:atulshridhar,项目名称:swift,代码行数:51,代码来源:test_diskfile.py
示例13: test_failsafe_object_audit_will_swallow_errors_in_tests
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
    """failsafe_object_audit() must count an unexpected error, not raise it.

    DiskFile is patched with a callable that raises NameError; the audit
    call must return normally and leave ``errors`` at 1.
    """
    timestamp = str(normalize_timestamp(time.time()))
    path = os.path.join(self.disk_file.datadir, timestamp + '.data')
    mkdirs(self.disk_file.datadir)
    with open(path, 'w') as f:
        write_metadata(f, {'name': '/a/c/o'})
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger)

    def blowup(*args):
        # Stand-in for DiskFile: always fails.
        raise NameError('tpyo')

    with mock.patch('swift.obj.diskfile.DiskFile',
                    blowup):
        auditor_worker.failsafe_object_audit(path, 'sda', '0')
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(auditor_worker.errors, 1)
开发者ID:Dieterbe,项目名称:swift,代码行数:14,代码来源:test_auditor.py
示例14: test_failsafe_object_audit_will_swallow_errors_in_tests
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
    """failsafe_object_audit() must count an unexpected error, not raise it.

    DiskFile is patched with a callable that raises NameError; auditing an
    AuditLocation for the object's directory must return normally and
    leave ``errors`` at 1.
    """
    timestamp = str(normalize_timestamp(time.time()))
    path = os.path.join(self.disk_file._datadir, timestamp + ".data")
    mkdirs(self.disk_file._datadir)
    with open(path, "w") as f:
        write_metadata(f, {"name": "/a/c/o"})
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger, self.rcache, self.devices)

    def blowup(*args):
        # Stand-in for DiskFile: always fails.
        raise NameError("tpyo")

    with mock.patch("swift.obj.diskfile.DiskFile", blowup):
        auditor_worker.failsafe_object_audit(AuditLocation(os.path.dirname(path), "sda", "0"))
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(auditor_worker.errors, 1)
开发者ID:nlevinki,项目名称:swift,代码行数:14,代码来源:test_auditor.py
示例15: setup_bad_zero_byte
def setup_bad_zero_byte(self, timestamp=None):
    """Create an empty object whose metadata claims a length of 10 bytes.

    Also (re)creates ``self.auditor``.

    :param timestamp: Timestamp used for X-Timestamp; defaults to the
                      current time
    """
    if timestamp is None:
        timestamp = Timestamp(time.time())
    self.auditor = auditor.ObjectAuditor(self.conf)
    self.auditor.log_time = 0
    with self.disk_file.create() as writer:
        # Nothing is written to the body, so the ETag is the md5 of empty
        # input, yet Content-Length is recorded as 10.
        metadata = {
            "ETag": md5().hexdigest(),
            "X-Timestamp": timestamp.internal,
            "Content-Length": 10,
        }
        writer.put(metadata)
        # The ETag is recomputed over empty input (same value) and the
        # metadata is written once more directly to the fd.
        metadata["ETag"] = md5().hexdigest()
        write_metadata(writer._fd, metadata)
开发者ID:renanalan,项目名称:swift,代码行数:14,代码来源:test_auditor.py
示例16: test_object_audit_will_not_swallow_errors_in_tests
def test_object_audit_will_not_swallow_errors_in_tests(self):
    """object_audit() must let an unexpected NameError propagate."""
    datadir = self.disk_file._datadir
    timestamp = str(normalize_timestamp(time.time()))
    path = os.path.join(datadir, timestamp + '.data')
    mkdirs(datadir)
    with open(path, 'w') as data_fp:
        write_metadata(data_fp, {'name': '/a/c/o'})
    auditor_worker = auditor.AuditorWorker(self.conf, self.logger)

    def explode(*args):
        # Stand-in for the patched method: always fails.
        raise NameError('tpyo')

    patcher = mock.patch.object(
        DiskFileManager, 'get_diskfile_from_audit_location', explode)
    with patcher:
        self.assertRaises(
            NameError, auditor_worker.object_audit,
            AuditLocation(os.path.dirname(path), 'sda', '0'))
开发者ID:674009287,项目名称:swift,代码行数:14,代码来源:test_auditor.py
示例17: run_quarantine_zero_byte_head
def run_quarantine_zero_byte_head(self):
    """A HEAD on an object whose data file was emptied must return 404.

    The data file is removed and recreated empty with the original
    metadata re-attached, then a direct HEAD is expected to fail.
    """
    container = 'container-zbyte-%s' % uuid4()
    obj = 'object-zbyte-%s' % uuid4()
    onode, opart, data_file = self._setup_data_file(container, obj, 'DATA')
    metadata = read_metadata(data_file)
    # Replace the data file with an empty one carrying the same metadata.
    unlink(data_file)
    with open(data_file, 'w') as fpointer:
        write_metadata(fpointer, metadata)
    try:
        direct_client.direct_head_object(onode, opart, self.account,
                                         container, obj, conn_timeout=1,
                                         response_timeout=1)
        # Not caught by the handler below, so a successful HEAD fails the test.
        raise Exception("Did not quarantine object")
    except client.ClientException as err:
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(err.http_status, 404)
开发者ID:BlueSkyChina,项目名称:swift,代码行数:16,代码来源:test_object_failures.py
示例18: test_with_tombstone_and_data
def test_with_tombstone_and_data(self):
    """Audit must remove both a tombstone and a bad data file.

    rsync replication could leave a tombstone and a data file in the same
    object dir; the test verifies that after one audit pass the object
    directory no longer exists.
    """
    ts_iter = make_timestamp_iter()
    # next() builtin instead of the Python 2-only .next() method.
    ts_tomb = next(ts_iter)
    ts_data = next(ts_iter)
    self.setup_bad_zero_byte(timestamp=ts_data)
    tomb_file_path = os.path.join(self.disk_file._datadir, "%s.ts" % ts_tomb.internal)
    with open(tomb_file_path, "wb") as fd:
        write_metadata(fd, {"X-Timestamp": ts_tomb.internal})
    # Precondition: exactly the data file and the tombstone are present.
    files = os.listdir(self.disk_file._datadir)
    self.assertEqual(2, len(files))
    self.assertTrue(os.path.basename(tomb_file_path) in files, files)
    kwargs = {"mode": "once"}
    self.auditor.run_audit(**kwargs)
    self.assertFalse(os.path.exists(self.disk_file._datadir))
开发者ID:renanalan,项目名称:swift,代码行数:16,代码来源:test_auditor.py
示例19: test_failsafe_object_audit_will_swallow_errors_in_tests
def test_failsafe_object_audit_will_swallow_errors_in_tests(self):
    """failsafe_object_audit() must count an unexpected error, not raise it.

    ``DiskFileManager.diskfile_cls`` is patched with a callable that raises
    NameError; the audit call must return normally and leave ``errors``
    at 1.
    """
    datadir = self.disk_file._datadir
    timestamp = str(normalize_timestamp(time.time()))
    path = os.path.join(datadir, timestamp + '.data')
    mkdirs(datadir)
    with open(path, 'w') as data_fp:
        write_metadata(data_fp, {'name': '/a/c/o'})
    auditor_worker = auditor.AuditorWorker(
        self.conf, self.logger, self.rcache, self.devices)

    def explode(*args):
        # Stand-in for the patched class: always fails.
        raise NameError('tpyo')

    patcher = mock.patch(
        'swift.obj.diskfile.DiskFileManager.diskfile_cls', explode)
    with patcher:
        location = AuditLocation(os.path.dirname(path), 'sda', '0',
                                 policy=POLICIES.legacy)
        auditor_worker.failsafe_object_audit(location)
    self.assertEqual(auditor_worker.errors, 1)
开发者ID:HanJX,项目名称:swift,代码行数:17,代码来源:test_auditor.py
示例20: run_quarantine
def run_quarantine(self):
    """A bad stored ETag lets one full GET through, then the next one 404s.

    The object body is "VERIFY".  After its stored ETag is replaced with
    "badetag", the first direct GET must still return the body and the
    following GET must fail with HTTP 404.
    """
    container = "container-%s" % uuid4()
    obj = "object-%s" % uuid4()
    onode, opart, data_file = self._setup_data_file(container, obj, "VERIFY")
    with open(data_file) as fpointer:
        metadata = read_metadata(fpointer)
    metadata["ETag"] = "badetag"
    # Replace only the stored ETag; the object data itself is untouched.
    with open(data_file) as fpointer:
        write_metadata(fpointer, metadata)
    odata = direct_client.direct_get_object(onode, opart, self.account, container, obj)[-1]
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(odata, "VERIFY")
    try:
        direct_client.direct_get_object(onode, opart, self.account, container, obj)
        # Not caught by the handler below, so a successful GET fails the test.
        raise Exception("Did not quarantine object")
    except client.ClientException as err:
        self.assertEqual(err.http_status, 404)
开发者ID:Dieterbe,项目名称:swift,代码行数:17,代码来源:test_object_failures.py
注：本文中的swift.obj.diskfile.write_metadata函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论