本文整理汇总了Python中swiftclient.client.head_object函数的典型用法代码示例。如果您正苦于以下问题:Python head_object函数的具体用法?Python head_object怎么用?Python head_object使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了head_object函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: handleCopy
def handleCopy(self, destPath, depthInfinity):
dst = '/'.join(destPath.split('/')[2:])
try:
client.head_object(self.storage_url,
self.auth_token,
self.container,
dst,
http_conn=self.http_connection)
except client.ClientException:
pass
headers = {'X-Copy-From': self.path}
try:
client.put_object(self.storage_url,
self.auth_token,
self.container,
dst,
headers=headers,
http_conn=self.http_connection)
if self.environ.get("HTTP_OVERWRITE", '') != "T":
raise DAVError(HTTP_CREATED)
return True
except client.ClientException:
return False
开发者ID:mikalv,项目名称:swiftdav,代码行数:25,代码来源:swiftdav.py
示例2: get_status
def get_status(request, container, object_name):
"""Download an object from Swift
:param container: container of swift where object is stored
:param object_name: ID of object
:return: object bytestream
"""
try:
if not request.session.get('storage_url') and not request.session.get('auth_token'):
return Response('Please Contact your administrator', status=status.HTTP_401_UNAUTHORIZED)
storage_url, auth_token = _get_auth_data(request.session)
obj = {object_name: {}}
# TODO: Currently using swift as first option, switch to in-memory
headers = client.head_object(storage_url, auth_token, container, object_name)
vid_format = 'mp4'
id_header = 'x-object-meta-' + vid_format + '-id'
if id_header in headers:
video_headers = client.head_object(storage_url, auth_token, container, headers[id_header])
obj[object_name][vid_format + 'status'] = video_headers['x-object-meta-status'] if 'x-object-meta-status' in video_headers else 0
else:
print "DOESN'T EXIST"
# print video_keys
obj[object_name]['status'] = 'DONE'
return Response(obj, status=status.HTTP_200_OK)
return Response(obj)
# if object_name in video_keys:
# keys = video_keys[object_name]
# obj = {object_name: keys}
# print obj
# return Response(obj)
except swift_exception.ClientException as e:
print e
return Response('Please Contact Your Administrator', status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except Exception as e:
print e
return Response(e.message, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
开发者ID:InBetween,项目名称:AssetManager,代码行数:35,代码来源:views.py
示例3: getMember
def getMember(self, objectname):
"""Get member for this ObjectCollection.
Checks if requested name is a subdir (see above)
"""
if self.prefix and self.prefix not in objectname:
objectname = self.prefix + objectname
if self.is_subdir(objectname):
return ObjectCollection(self.container, self.environ,
prefix=objectname)
if self.environ.get('REQUEST_METHOD') in ['PUT']:
return ObjectResource(self.container, objectname,
self.environ, self.objects)
try:
client.head_object(self.storage_url,
self.auth_token,
self.container,
objectname,
http_conn=self.http_connection)
return ObjectResource(self.container, objectname,
self.environ, self.objects)
except client.ClientException:
pass
return None
开发者ID:cschwede,项目名称:swiftdav,代码行数:25,代码来源:swiftdav.py
示例4: createCollection
def createCollection(self, name):
"""Create a pseudo-folder."""
if self.path:
tmp = self.path.split('/')
name = '/'.join(tmp[2:]) + '/' + name
name = name.strip('/')
try:
client.head_object(self.storage_url,
self.auth_token,
self.container,
name,
http_conn=self.http_connection)
raise dav_error.DAVError(dav_error.HTTP_METHOD_NOT_ALLOWED)
except client.ClientException:
pass
try:
client.head_object(self.storage_url,
self.auth_token,
self.container,
name + '/',
http_conn=self.http_connection)
raise dav_error.DAVError(dav_error.HTTP_METHOD_NOT_ALLOWED)
except client.ClientException:
pass
client.put_object(self.storage_url,
self.auth_token,
self.container,
sanitize(name).rstrip('/') + '/',
content_type='application/directory',
http_conn=self.http_connection)
开发者ID:cschwede,项目名称:swiftdav,代码行数:32,代码来源:swiftdav.py
示例5: exists
def exists(self, name):
"""
Returns True if a file referenced by the given name already exists in
the storage system, or False if the name is available for a new file.
"""
try:
(host, port, path, is_ssl) = self.connection.connection_args
storage_url = 'https://%s:%d/%s' % (host, port, path)
head_object(storage_url, self.connection.token, self.container_name,
name)
return True
except ClientException:
return False
开发者ID:drewtempelmeyer,项目名称:django-cumulus,代码行数:13,代码来源:storage.py
示例6: test_async_update_after_PUT
def test_async_update_after_PUT(self):
cpart, cnodes = self.container_ring.get_nodes(self.account, 'c1')
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
# put an object while one container server is stopped so that we force
# an async update to it
kill_server((cnodes[0]['ip'], cnodes[0]['port']), self.ipport2server)
content = u'stuff'
client.put_object(self.url, self.token, 'c1', 'o1', contents=content)
meta = client.head_object(self.url, self.token, 'c1', 'o1')
# re-start the container server and assert that it does not yet know
# about the object
start_server((cnodes[0]['ip'], cnodes[0]['port']), self.ipport2server)
self.assertFalse(direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1])
# Run the object-updaters to be sure updates are done
Manager(['object-updater']).once()
# check the re-started container server has update with override values
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
开发者ID:ISCAS-VDI,项目名称:swift-base,代码行数:27,代码来源:test_object_async_update.py
示例7: get_all
def get_all(request, container):
""" Returns List of all Objects present in specified container
:param container: Name of Swift Container
:return: meta data and id's of all the objects
"""
try:
response_dict = dict()
storage_url, auth_token = _get_auth_data(request.session)
data_container = client.get_container(storage_url, auth_token, container)
for obj in data_container[1]:
meta_object = client.head_object(storage_url, auth_token, container, obj['name'])
if not meta_object['x-object-meta-deleted']:
if meta_object['x-object-meta-type'] in ['thumbnail', 'original-thumbnail']:
if meta_object['x-object-meta-format'] not in response_dict:
form = meta_object['x-object-meta-format']
response_dict[form] = []
if meta_object['x-object-meta-format'] in response_dict:
new_obj = {'thumbnail_id': obj['name'],
'name': meta_object['x-object-meta-name'],
'type': meta_object['x-object-meta-type'],
'resolution': meta_object['x-object-meta-resolution']}
if meta_object['x-object-meta-format'] is 'thumbnail':
new_obj['original_id'] = meta_object['x-object-meta-original']
response_dict[form].append(new_obj)
return Response(response_dict)
except swift_exception.ClientException as e:
print e
return Response('Please contact your admininstrator', status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except Exception as e:
print e
return Response('Please contact your admininstrator', status=status.HTTP_500_INTERNAL_SERVER_ERROR)
开发者ID:InBetween,项目名称:AssetManager,代码行数:31,代码来源:views.py
示例8: _check_node
def _check_node(self, node, part, etag, headers_post):
# get fragment archive etag
fragment_archive_etag = self.direct_get(node, part)
# remove the .durable from the selected node
part_dir = self.storage_dir('object', node, part=part)
for dirs, subdirs, files in os.walk(part_dir):
for fname in files:
if fname.endswith('.durable'):
durable = os.path.join(dirs, fname)
os.remove(durable)
break
try:
os.remove(os.path.join(part_dir, 'hashes.pkl'))
except OSError as e:
if e.errno != errno.ENOENT:
raise
# fire up reconstructor to propogate the .durable
self.reconstructor.once()
# fragment is still exactly as it was before!
self.assertEqual(fragment_archive_etag,
self.direct_get(node, part))
# check meta
meta = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
for key in headers_post:
self.assertTrue(key in meta)
self.assertEqual(meta[key], headers_post[key])
开发者ID:2015-ucsc-hp,项目名称:swift,代码行数:32,代码来源:test_reconstructor_durable.py
示例9: _test_sync
def _test_sync(self, object_post_as_copy):
source_container, dest_container = self._setup_synced_containers()
# upload to source
object_name = 'object-%s' % uuid.uuid4()
put_headers = {'X-Object-Meta-Test': 'put_value'}
client.put_object(self.url, self.token, source_container, object_name,
'test-body', headers=put_headers)
# cycle container-sync
Manager(['container-sync']).once()
resp_headers, body = client.get_object(self.url, self.token,
dest_container, object_name)
self.assertEqual(body, 'test-body')
self.assertIn('x-object-meta-test', resp_headers)
self.assertEqual('put_value', resp_headers['x-object-meta-test'])
# update metadata with a POST, using an internal client so we can
# vary the object_post_as_copy setting - first use post-as-copy
post_headers = {'Content-Type': 'image/jpeg',
'X-Object-Meta-Test': 'post_value'}
int_client = self.make_internal_client(
object_post_as_copy=object_post_as_copy)
int_client.set_object_metadata(self.account, source_container,
object_name, post_headers)
# sanity checks...
resp_headers = client.head_object(
self.url, self.token, source_container, object_name)
self.assertIn('x-object-meta-test', resp_headers)
self.assertEqual('post_value', resp_headers['x-object-meta-test'])
self.assertEqual('image/jpeg', resp_headers['content-type'])
# cycle container-sync
Manager(['container-sync']).once()
# verify that metadata changes were sync'd
resp_headers, body = client.get_object(self.url, self.token,
dest_container, object_name)
self.assertEqual(body, 'test-body')
self.assertIn('x-object-meta-test', resp_headers)
self.assertEqual('post_value', resp_headers['x-object-meta-test'])
self.assertEqual('image/jpeg', resp_headers['content-type'])
# delete the object
client.delete_object(
self.url, self.token, source_container, object_name)
with self.assertRaises(ClientException) as cm:
client.get_object(
self.url, self.token, source_container, object_name)
self.assertEqual(404, cm.exception.http_status) # sanity check
# cycle container-sync
Manager(['container-sync']).once()
# verify delete has been sync'd
with self.assertRaises(ClientException) as cm:
client.get_object(
self.url, self.token, dest_container, object_name)
self.assertEqual(404, cm.exception.http_status) # sanity check
开发者ID:Ahiknsr,项目名称:swift,代码行数:60,代码来源:test_container_sync.py
示例10: _check_node
def _check_node(self, node, part, etag, headers_post):
# get fragment archive etag
fragment_archive_etag = self.direct_get(node, part)
# remove data from the selected node
part_dir = self.storage_dir('object', node, part=part)
shutil.rmtree(part_dir, True)
# this node can't servce the data any more
try:
self.direct_get(node, part)
except direct_client.DirectClientException as err:
self.assertEqual(err.http_status, 404)
else:
self.fail('Node data on %r was not fully destoryed!' %
(node,))
# make sure we can still GET the object and its correct, the
# proxy is doing decode on remaining fragments to get the obj
self.assertEqual(etag, self.proxy_get())
# fire up reconstructor
self.reconstructor.once()
# fragment is rebuilt exactly as it was before!
self.assertEqual(fragment_archive_etag,
self.direct_get(node, part))
# check meta
meta = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
for key in headers_post:
self.assertTrue(key in meta)
self.assertEqual(meta[key], headers_post[key])
开发者ID:2015-ucsc-hp,项目名称:swift,代码行数:35,代码来源:test_reconstructor_rebuild.py
示例11: get_headers
def get_headers(self):
"""Execute HEAD object request.
Since this info is used in different methods (see below),
do it once and then use this info.
"""
if self.headers is None:
data = self.objects.get(self.objectname)
if data:
self.headers = {'content-length': data.get('bytes'),
'etag': data.get('hash'),
'last_modified': data.get('last_modified'),
}
else:
try:
self.headers = client.head_object(
self.storage_url,
self.auth_token,
self.container,
self.objectname,
http_conn=self.http_connection)
except client.ClientException:
self.headers = {}
pass
开发者ID:cschwede,项目名称:swiftdav,代码行数:25,代码来源:swiftdav.py
示例12: read_temp_url
def read_temp_url(url, token, container, object):
head = swift_client.head_object(url=url, token=token, container=container, name=object)
print head
try:
print head['x-object-meta-tempurl']
return head['x-object-meta-tempurl'], head['x-object-meta-tempurlexp']
except KeyError:
return False, False
开发者ID:MrHanachoo,项目名称:OSB,代码行数:8,代码来源:api.py
示例13: test_put_ctype_replicated_when_subsequent_post
def test_put_ctype_replicated_when_subsequent_post(self):
# primary half handoff half
# ------------ ------------
# t0.data: ctype = foo
# t1.data: ctype = bar
# t2.meta:
#
# ...run replicator and expect...
#
# t1.data: ctype = bar
# t2.meta:
self.brain.put_container()
# incomplete write
self.brain.stop_handoff_half()
self.container_brain.stop_handoff_half()
self._put_object(headers={'Content-Type': 'foo'})
self.brain.start_handoff_half()
self.container_brain.start_handoff_half()
# handoff write
self.brain.stop_primary_half()
self.container_brain.stop_primary_half()
self._put_object(headers={'Content-Type': 'bar'})
self.brain.start_primary_half()
self.container_brain.start_primary_half()
# metadata update with newest data unavailable
self.brain.stop_handoff_half()
self.container_brain.stop_handoff_half()
self._post_object(headers={'X-Object-Meta-Color': 'Blue'})
self.brain.start_handoff_half()
self.container_brain.start_handoff_half()
self.get_to_final_state()
# check object metadata
metadata = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
# check container listing metadata
container_metadata, objs = client.get_container(self.url, self.token,
self.container_name)
for obj in objs:
if obj['name'] == self.object_name:
break
else:
self.fail('obj not found in container listing')
expected = 'bar'
self.assertEqual(obj['content_type'], expected)
self.assertEqual(metadata['x-object-meta-color'], 'Blue')
self._assert_object_metadata_matches_listing(obj, metadata)
self._assert_consistent_container_dbs()
self._assert_consistent_object_metadata()
self._assert_consistent_suffix_hashes()
开发者ID:clayg,项目名称:swift,代码行数:57,代码来源:test_object_metadata_replication.py
示例14: _set_headers
def _set_headers(storage_url, auth_token, container, object_name, deleted, request):
try:
try:
header = client.head_object(storage_url, auth_token, container, object_name)
except swift_exception.ClientException as e:
storage_url, auth_token = _reauthorize(request)
header = client.head_object(storage_url, auth_token, container, object_name)
new_header = {'X-Object-Meta-Deleted': deleted,
'X-Object-Meta-Format': header['x-object-meta-format'],
'X-Object-Meta-Resolution': header['x-object-meta-resolution'],
'X-Object-Meta-Name': header['x-object-meta-name'],
'X-Object-Meta-Type': header['x-object-meta-type']
}
copy_of = None
if (header['x-object-meta-format'] in video_types) and (header['x-object-meta-type'] == 'original'):
new_header['X-Object-Meta-Mp4-Id'] = header['x-object-meta-mp4-id']
if 'x-object-meta-status' in header:
new_header['X-Object-Meta-Status'] = header['x-object-meta-status']
if header['x-object-meta-type'] == 'thumbnail':
new_header['X-Object-Meta-Original'] = header['x-object-meta-original']
copy_of = header['x-object-meta-original']
if header['x-object-meta-type'] == 'original':
new_header['X-Object-Meta-Thumb'] = header['x-object-meta-thumb']
if container == 'Video':
copy_of = [header['x-object-meta-thumb'], header['x-object-meta-mp4-id']]
else:
copy_of = header['x-object-meta-thumb']
if 'x-object-meta-preview-id' in header:
new_header['X-Object-Meta-Preview-Id'] = header['x-object-meta-preview-id']
copy_of = [header['x-object-meta-thumb'], header['x-object-meta-preview-id']]
try:
client.post_object(storage_url, auth_token, container, object_name, headers=new_header)
except swift_exception.ClientException as e:
storage_url, auth_token = _reauthorize(request)
client.post_object(storage_url, auth_token, container, object_name, headers=new_header)
return copy_of
except swift_exception.ClientException as e:
raise e
except Exception as e:
raise e
开发者ID:InBetween,项目名称:AssetManager,代码行数:43,代码来源:views.py
示例15: test_post_ctype_replicated_when_previous_incomplete_puts
def test_post_ctype_replicated_when_previous_incomplete_puts(self):
# primary half handoff half
# ------------ ------------
# t0.data: ctype = foo
# t1.data: ctype = bar
# t2.meta: ctype = baz
#
# ...run replicator and expect...
#
# t1.data:
# t2.meta: ctype = baz
self.brain.put_container()
# incomplete write to primary half
self.brain.stop_handoff_half()
self.container_brain.stop_handoff_half()
self._put_object(headers={'Content-Type': 'foo'})
self.brain.start_handoff_half()
self.container_brain.start_handoff_half()
# handoff write
self.brain.stop_primary_half()
self.container_brain.stop_primary_half()
self._put_object(headers={'Content-Type': 'bar'})
self.brain.start_primary_half()
self.container_brain.start_primary_half()
# content-type update to primary half
self.brain.stop_handoff_half()
self.container_brain.stop_handoff_half()
self._post_object(headers={'Content-Type': 'baz'})
self.brain.start_handoff_half()
self.container_brain.start_handoff_half()
self.get_to_final_state()
# check object metadata
metadata = client.head_object(self.url, self.token,
self.container_name,
self.object_name)
# check container listing metadata
container_metadata, objs = client.get_container(self.url, self.token,
self.container_name)
for obj in objs:
if obj['name'] == self.object_name:
break
expected = 'baz'
self.assertEqual(obj['content_type'], expected)
self._assert_object_metadata_matches_listing(obj, metadata)
self._assert_consistent_container_dbs()
self._assert_consistent_object_metadata()
self._assert_consistent_suffix_hashes()
开发者ID:clayg,项目名称:swift,代码行数:54,代码来源:test_object_metadata_replication.py
示例16: test_async_updates_after_PUT_and_POST
def test_async_updates_after_PUT_and_POST(self):
# verify correct update values when PUT update and POST updates are
# missed but then async updates are sent
cpart, cnodes = self.container_ring.get_nodes(self.account, 'c1')
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
# PUT and POST to object while one container server is stopped so that
# we force async updates to it
kill_server((cnodes[0]['ip'], cnodes[0]['port']), self.ipport2server)
content = u'stuff'
client.put_object(self.url, self.token, 'c1', 'o1', contents=content,
content_type='test/ctype')
meta = client.head_object(self.url, self.token, 'c1', 'o1')
# use internal client for POST so we can force fast-post mode
int_client = self.make_internal_client(object_post_as_copy=False)
int_client.set_object_metadata(
self.account, 'c1', 'o1', {'X-Object-Meta-Fruit': 'Tomato'})
self.assertEqual(
'Tomato',
int_client.get_object_metadata(self.account, 'c1', 'o1')
['x-object-meta-fruit']) # sanity
# re-start the container server and assert that it does not yet know
# about the object
start_server((cnodes[0]['ip'], cnodes[0]['port']), self.ipport2server)
self.assertFalse(direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1])
# Run the object-updaters to send the async pendings
Manager(['object-updater']).once()
# check the re-started container server got same update as others.
# we cannot assert the actual etag value because it may be encrypted
listing_etags = set()
for cnode in cnodes:
listing = direct_client.direct_get_container(
cnode, cpart, self.account, 'c1')[1]
self.assertEqual(1, len(listing))
self.assertEqual(len(content), listing[0]['bytes'])
self.assertEqual('test/ctype', listing[0]['content_type'])
listing_etags.add(listing[0]['hash'])
self.assertEqual(1, len(listing_etags))
# check that listing meta returned to client is consistent with object
# meta returned to client
hdrs, listing = client.get_container(self.url, self.token, 'c1')
self.assertEqual(1, len(listing))
self.assertEqual('o1', listing[0]['name'])
self.assertEqual(len(content), listing[0]['bytes'])
self.assertEqual(meta['etag'], listing[0]['hash'])
self.assertEqual('test/ctype', listing[0]['content_type'])
开发者ID:SmartInfrastructures,项目名称:swift,代码行数:54,代码来源:test_object_async_update.py
示例17: invoke_storlet_on_copy_dest
def invoke_storlet_on_copy_dest(self):
# No COPY in swiftclient. Using urllib instead...
url = '%s/%s/%s' % (self.url, 'myobjects', self.storlet_file)
headers = {'X-Auth-Token': self.token,
'X-Run-Storlet': self.storlet_name,
'Destination': 'myobjects/gen_thumb_on_copy_.jpg'}
req = urllib2.Request(url, headers=headers)
req.get_method = lambda: 'COPY'
conn = urllib2.urlopen(req, timeout=10)
status = conn.getcode()
self.assertTrue(status in [201, 202])
headers = c.head_object(self.url, self.token,
'myobjects', 'gen_thumb_on_copy_.jpg')
self.assertEqual(headers['content-length'], '49032')
开发者ID:Nicolepcx,项目名称:storlets,代码行数:15,代码来源:test_thumbnail_storlet.py
示例18: invoke_storlet_on_put
def invoke_storlet_on_put(self):
headers = {'X-Run-Storlet': self.storlet_name}
resp = dict()
source_file = '%s/%s' % (self.path_to_bundle, self.storlet_file)
with open(source_file, 'r') as f:
c.put_object(self.url, self.token,
'myobjects', 'gen_thumb_on_put.jpg', f,
headers=headers,
response_dict=resp)
status = resp.get('status')
self.assertTrue(status in [201, 202])
headers = c.head_object(self.url, self.token,
'myobjects', 'gen_thumb_on_put.jpg')
self.assertEqual(headers['content-length'], '49032')
开发者ID:Nicolepcx,项目名称:storlets,代码行数:16,代码来源:test_thumbnail_storlet.py
示例19: _get_video_headers
def _get_video_headers(storage_url, auth_token, container, object_name):
try:
header = client.head_object(storage_url, auth_token, container, object_name)
new_header = {'X-Object-Meta-Deleted': header['x-object-meta-deleted'],
'X-Object-Meta-Format': header['x-object-meta-format'],
'X-Object-Meta-Resolution': header['x-object-meta-resolution'],
'X-Object-Meta-Name': header['x-object-meta-name'],
'X-Object-Meta-Type': header['x-object-meta-type']
}
# if 'x-delete-after' in header:
# new_header['X-Delete-After'] = header['x-delete-after']
return new_header
except swift_exception.ClientException as e:
raise e
except Exception as e:
raise e
开发者ID:InBetween,项目名称:AssetManager,代码行数:16,代码来源:views.py
示例20: test_update_during_POST_only
def test_update_during_POST_only(self):
# verify correct update values when PUT update is missed but then a
# POST update succeeds *before* the PUT async pending update is sent
cpart, cnodes = self.container_ring.get_nodes(self.account, 'c1')
client.put_container(self.url, self.token, 'c1',
headers={'X-Storage-Policy':
self.policy.name})
# put an object while one container server is stopped so that we force
# an async update to it
kill_server((cnodes[0]['ip'], cnodes[0]['port']), self.ipport2server)
content = u'stuff'
client.put_object(self.url, self.token, 'c1', 'o1', contents=content)
meta = client.head_object(self.url, self.token, 'c1', 'o1')
# re-start the container server and assert that it does not yet know
# about the object
start_server((cnodes[0]['ip'], cnodes[0]['port']), self.ipport2server)
self.assertFalse(direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1])
# use internal client for POST so we can force fast-post mode
int_client = self.make_internal_client(object_post_as_copy=False)
int_client.set_object_metadata(
self.account, 'c1', 'o1', {'X-Object-Meta-Fruit': 'Tomato'})
self.assertEqual(
'Tomato',
int_client.get_object_metadata(self.account, 'c1', 'o1')
['x-object-meta-fruit']) # sanity
# check the re-started container server has update with override values
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
# Run the object-updaters to send the async pending from the PUT
Manager(['object-updater']).once()
# check container listing metadata is still correct
obj = direct_client.direct_get_container(
cnodes[0], cpart, self.account, 'c1')[1][0]
self.assertEqual(meta['etag'], obj['hash'])
self.assertEqual(len(content), obj['bytes'])
开发者ID:ISCAS-VDI,项目名称:swift-base,代码行数:44,代码来源:test_object_async_update.py
注:本文中的swiftclient.client.head_object函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论