本文整理汇总了Python中tempest.common.utils.data_utils.arbitrary_string函数的典型用法代码示例。如果您正苦于以下问题:Python arbitrary_string函数的具体用法?Python arbitrary_string怎么用?Python arbitrary_string使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了arbitrary_string函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_copy_object_2d_way
def test_copy_object_2d_way(self):
# create source object
src_object_name = data_utils.rand_name(name='SrcObject')
src_data = data_utils.arbitrary_string(size=len(src_object_name) * 2,
base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name,
src_object_name, src_data)
# create destination object
dst_object_name = data_utils.rand_name(name='DstObject')
dst_data = data_utils.arbitrary_string(size=len(dst_object_name) * 3,
base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name,
dst_object_name, dst_data)
# copy source object to destination
resp, _ = self.object_client.copy_object_2d_way(self.container_name,
src_object_name,
dst_object_name)
self.assertEqual(resp['status'], '201')
self.assertHeaders(resp, 'Object', 'COPY')
self.assertIn('last-modified', resp)
self.assertIn('x-copied-from', resp)
self.assertIn('x-copied-from-last-modified', resp)
self.assertNotEqual(len(resp['last-modified']), 0)
self.assertEqual(
resp['x-copied-from'],
self.container_name + "/" + src_object_name)
self.assertNotEqual(len(resp['x-copied-from-last-modified']), 0)
# check data
resp, body = self.object_client.get_object(self.container_name,
dst_object_name)
self.assertEqual(body, src_data)
开发者ID:NetApp,项目名称:tempest,代码行数:33,代码来源:test_object_services.py
示例2: test_copy_object_2d_way
def test_copy_object_2d_way(self):
# create source object
src_object_name = data_utils.rand_name(name='SrcObject')
src_data = data_utils.arbitrary_string(size=len(src_object_name) * 2,
base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name,
src_object_name, src_data)
# create destination object
dst_object_name = data_utils.rand_name(name='DstObject')
dst_data = data_utils.arbitrary_string(size=len(dst_object_name) * 3,
base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name,
dst_object_name, dst_data)
# copy source object to destination
resp, _ = self.object_client.copy_object_2d_way(self.container_name,
src_object_name,
dst_object_name)
self.assertHeaders(resp, 'Object', 'COPY')
#Bug 1417469
#self.assertEqual(
# resp['x-copied-from'],
# self.container_name + "/" + src_object_name)
# check data
self._check_copied_obj(dst_object_name, src_data)
开发者ID:varunarya10,项目名称:tempest,代码行数:25,代码来源:test_object_services.py
示例3: test_arbitrary_string
def test_arbitrary_string(self):
actual = data_utils.arbitrary_string()
self.assertEqual(actual, "test")
actual = data_utils.arbitrary_string(size=30, base_text="abc")
self.assertEqual(actual, "abc" * (30 / len("abc")))
actual = data_utils.arbitrary_string(size=5, base_text="deadbeaf")
self.assertEqual(actual, "deadb")
开发者ID:AminaMseddi,项目名称:tempest,代码行数:7,代码来源:test_data_utils.py
示例4: test_copy_object_in_same_container
def test_copy_object_in_same_container(self):
# create source object
src_object_name = data_utils.rand_name(name='SrcObject')
src_data = data_utils.arbitrary_string(size=len(src_object_name) * 2,
base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name,
src_object_name,
src_data)
# create destination object
dst_object_name = data_utils.rand_name(name='DstObject')
dst_data = data_utils.arbitrary_string(size=len(dst_object_name) * 3,
base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name,
dst_object_name,
dst_data)
# copy source object to destination
resp, _ = self.object_client.copy_object_in_same_container(
self.container_name, src_object_name, dst_object_name)
self.assertEqual(resp['status'], '201')
self.assertHeaders(resp, 'Object', 'PUT')
# check data
resp, body = self.object_client.get_object(self.container_name,
dst_object_name)
self.assertEqual(body, src_data)
开发者ID:NetApp,项目名称:tempest,代码行数:25,代码来源:test_object_services.py
示例5: test_copy_object_2d_way
def test_copy_object_2d_way(self):
# Copy storage object
# Create source Object
src_object_name = rand_name(name='SrcObject')
src_data = arbitrary_string(size=len(src_object_name) * 2,
base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name,
src_object_name, src_data)
# Create destination Object
dst_object_name = rand_name(name='DstObject')
dst_data = arbitrary_string(size=len(dst_object_name) * 3,
base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name,
dst_object_name, dst_data)
# Copy source object to destination
resp, _ = self.object_client.copy_object_2d_way(self.container_name,
src_object_name,
dst_object_name)
self.assertEqual(resp['status'], '201')
# Check data
resp, body = self.object_client.get_object(self.container_name,
dst_object_name)
self.assertEqual(body, src_data)
开发者ID:TimurNurlygayanov,项目名称:tempest,代码行数:27,代码来源:test_object_services.py
示例6: test_create_object
def test_create_object(self):
# create object
object_name = rand_name(name="TestObject")
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name, object_name, data)
# create another object
object_name = rand_name(name="TestObject")
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name, object_name, data)
self.assertEqual(resp["status"], "201")
开发者ID:john5223,项目名称:tempest,代码行数:10,代码来源:test_object_services.py
示例7: generate_message_body
def generate_message_body(cls, repeat=1):
'''Wrapper utility that sets the metadata of a queue.'''
message_ttl = data_utils.rand_int_id(start=60,
end=CONF.queuing.max_message_ttl)
key = data_utils.arbitrary_string(size=20, base_text='QueuingKey')
value = data_utils.arbitrary_string(size=20, base_text='QueuingValue')
message_body = {key: value}
rbody = ([{'body': message_body, 'ttl': message_ttl}] * repeat)
return rbody
开发者ID:armando-migliaccio,项目名称:tempest-1,代码行数:11,代码来源:base.py
示例8: test_create_object
def test_create_object(self):
# create object
object_name = data_utils.rand_name(name='TestObject')
data = data_utils.arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
# create another object
object_name = data_utils.rand_name(name='TestObject')
data = data_utils.arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
self.assertEqual(resp['status'], '201')
self.assertHeaders(resp, 'Object', 'PUT')
开发者ID:NetApp,项目名称:tempest,代码行数:13,代码来源:test_object_services.py
示例9: test_copy_object_2d_way
def test_copy_object_2d_way(self):
# create source object
src_object_name = rand_name(name="SrcObject")
src_data = arbitrary_string(size=len(src_object_name) * 2, base_text=src_object_name)
resp, _ = self.object_client.create_object(self.container_name, src_object_name, src_data)
# create destination object
dst_object_name = rand_name(name="DstObject")
dst_data = arbitrary_string(size=len(dst_object_name) * 3, base_text=dst_object_name)
resp, _ = self.object_client.create_object(self.container_name, dst_object_name, dst_data)
# copy source object to destination
resp, _ = self.object_client.copy_object_2d_way(self.container_name, src_object_name, dst_object_name)
self.assertEqual(resp["status"], "201")
# check data
resp, body = self.object_client.get_object(self.container_name, dst_object_name)
self.assertEqual(body, src_data)
开发者ID:john5223,项目名称:tempest,代码行数:15,代码来源:test_object_services.py
示例10: test_create_object
def test_create_object(self):
# Create storage object, test response
#Create Object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
#Create another Object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
self.assertEqual(resp['status'], '201')
开发者ID:TimurNurlygayanov,项目名称:tempest,代码行数:15,代码来源:test_object_services.py
示例11: test_list_container_contents_json
def test_list_container_contents_json(self):
# add metadata to an object
# create a container
container_name = rand_name(name='TestContainer')
resp, _ = self.container_client.create_container(container_name)
self.containers.append(container_name)
# create object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(container_name,
object_name, data)
# set object metadata
meta_key = rand_name(name='Meta-Test-')
meta_value = rand_name(name='MetaValue-')
orig_metadata = {meta_key: meta_value}
resp, _ = self.object_client.update_object_metadata(container_name,
object_name,
orig_metadata)
# get container contents list
params = {'format': 'json'}
resp, object_list = \
self.container_client.\
list_container_contents(container_name, params=params)
self.assertEqual(resp['status'], '200')
self.assertIsNotNone(object_list)
object_names = [obj['name'] for obj in object_list]
self.assertIn(object_name, object_names)
开发者ID:davidtao1983,项目名称:tempest,代码行数:29,代码来源:test_container_services.py
示例12: test_web_error
def test_web_error(self):
headers = {'web-listings': 'true',
'web-error': self.object_name}
self.container_client.update_container_metadata(
self.container_name, metadata=headers)
# Create object to return when requested object not found
object_name_404 = "404" + self.object_name
object_data_404 = data_utils.arbitrary_string()
self.object_client.create_object(self.container_name,
object_name_404,
object_data_404)
# Do not set auth in HTTP headers for next request
self.custom_object_client.auth_provider.set_alt_auth_data(
request_part='headers',
auth_data=None
)
# Request non-existing object
resp, body = self.custom_object_client.get_object(self.container_name,
"notexisting")
self.assertEqual(resp['status'], '404')
self.assertEqual(body, object_data_404)
开发者ID:cameron-r,项目名称:tempest-configuration,代码行数:25,代码来源:test_container_staticweb.py
示例13: test_get_object_after_expiry_time
def test_get_object_after_expiry_time(self):
# GET object after expiry time
#TODO(harika-vakadi): Similar test case has to be created for
# "X-Delete-At", after this test case works.
#Create Object
object_name = rand_name(name='TestObject')
data = arbitrary_string()
resp, _ = self.object_client.create_object(self.container_name,
object_name, data)
#Update object metadata with expiry time of 3 seconds
metadata = {'X-Delete-After': '3'}
resp, _ = \
self.object_client.update_object_metadata(self.container_name,
object_name, metadata,
metadata_prefix='')
resp, _ = \
self.object_client.list_object_metadata(self.container_name,
object_name)
self.assertEqual(resp['status'], '200')
self.assertIn('x-delete-at', resp)
resp, body = self.object_client.get_object(self.container_name,
object_name)
self.assertEqual(resp['status'], '200')
# Check data
self.assertEqual(body, data)
# Sleep for over 5 seconds, so that object is expired
sleep(5)
# Verification of raised exception after object gets expired
self.assertRaises(exceptions.NotFound, self.object_client.get_object,
self.container_name, object_name)
开发者ID:dani4571,项目名称:tempest,代码行数:35,代码来源:test_object_expiry.py
示例14: setUpClass
def setUpClass(cls):
super(ObjectTempUrlTest, cls).setUpClass()
# skip this test if TempUrl isn't enabled in the conf file.
if not cls.tempurl_available:
skip_msg = ("%s skipped as TempUrl middleware not available"
% cls.__name__)
raise cls.skipException(skip_msg)
# create a container
cls.container_name = data_utils.rand_name(name='TestContainer')
cls.container_client.create_container(cls.container_name)
cls.containers = [cls.container_name]
# update account metadata
cls.key = 'Meta'
cls.metadatas = []
cls.metadata = {'Temp-URL-Key': cls.key}
cls.metadatas.append(cls.metadata)
cls.account_client.create_account_metadata(metadata=cls.metadata)
# create an object
cls.object_name = data_utils.rand_name(name='ObjectTemp')
cls.content = data_utils.arbitrary_string(size=len(cls.object_name),
base_text=cls.object_name)
cls.object_client.create_object(cls.container_name,
cls.object_name, cls.content)
开发者ID:ironbits,项目名称:tempest,代码行数:27,代码来源:test_object_temp_url.py
示例15: test_put_object_using_temp_url
def test_put_object_using_temp_url(self):
new_data = data_utils.arbitrary_string(
size=len(self.object_name),
base_text=data_utils.rand_name(name="random"))
expires = self._get_expiry_date()
url = self._get_temp_url(self.container_name,
self.object_name, "PUT",
expires, self.key)
# trying to put random data in the object using temp url
resp, body = self.object_client.put(url, new_data, None)
self.assertHeaders(resp, 'Object', 'PUT')
# Testing a HEAD on this Temp URL
resp, body = self.object_client.head(url)
self.assertHeaders(resp, 'Object', 'HEAD')
# Validate that the content of the object has been modified
url = self._get_temp_url(self.container_name,
self.object_name, "GET",
expires, self.key)
_, body = self.object_client.get(url)
self.assertEqual(body, new_data)
开发者ID:CiscoSystems,项目名称:tempest,代码行数:25,代码来源:test_object_temp_url.py
示例16: _create_manifest
def _create_manifest(self):
# Create a manifest file for SLO uploading
object_name = data_utils.rand_name(name='TestObject')
object_name_base_1 = object_name + '_01'
object_name_base_2 = object_name + '_02'
data_size = MIN_SEGMENT_SIZE
self.content = data_utils.arbitrary_string(data_size)
self._create_object(self.container_name,
object_name_base_1,
self.content)
self._create_object(self.container_name,
object_name_base_2,
self.content)
path_object_1 = '/%s/%s' % (self.container_name,
object_name_base_1)
path_object_2 = '/%s/%s' % (self.container_name,
object_name_base_2)
data_manifest = [{'path': path_object_1,
'etag': hashlib.md5(self.content).hexdigest(),
'size_bytes': data_size},
{'path': path_object_2,
'etag': hashlib.md5(self.content).hexdigest(),
'size_bytes': data_size}]
return json.dumps(data_manifest)
开发者ID:Hybrid-Cloud,项目名称:hybrid-tempest,代码行数:26,代码来源:test_object_slo.py
示例17: test_put_object_using_temp_url
def test_put_object_using_temp_url(self):
# make sure the metadata has been set
new_data = data_utils.arbitrary_string(
size=len(self.object_name),
base_text=data_utils.rand_name(name="random"))
expires = self._get_expiry_date()
url = self._get_temp_url(self.container_name,
self.object_name, "PUT",
expires, self.key)
# trying to put random data in the object using temp url
resp, body = self.object_client.put_object_using_temp_url(
url, new_data)
self.assertIn(int(resp['status']), HTTP_SUCCESS)
# Testing a HEAD on this Temp URL
resp, body = self.object_client.head(url)
self.assertIn(int(resp['status']), HTTP_SUCCESS)
# Validate that the content of the object has been modified
url = self._get_temp_url(self.container_name,
self.object_name, "GET",
expires, self.key)
_, body = self.object_client.get_object_using_temp_url(url)
self.assertEqual(body, new_data)
开发者ID:bowzoo,项目名称:tempest-1,代码行数:28,代码来源:test_object_temp_url.py
示例18: test_copy_object_across_containers
def test_copy_object_across_containers(self):
# create a container to use as asource container
src_container_name = data_utils.rand_name(name="TestSourceContainer")
self.container_client.create_container(src_container_name)
self.containers.append(src_container_name)
# create a container to use as a destination container
dst_container_name = data_utils.rand_name(name="TestDestinationContainer")
self.container_client.create_container(dst_container_name)
self.containers.append(dst_container_name)
# create object in source container
object_name = data_utils.rand_name(name="Object")
data = data_utils.arbitrary_string(size=len(object_name) * 2, base_text=object_name)
resp, _ = self.object_client.create_object(src_container_name, object_name, data)
# set object metadata
meta_key = data_utils.rand_name(name="test-")
meta_value = data_utils.rand_name(name="MetaValue-")
orig_metadata = {meta_key: meta_value}
resp, _ = self.object_client.update_object_metadata(src_container_name, object_name, orig_metadata)
self.assertIn(int(resp["status"]), HTTP_SUCCESS)
self.assertHeaders(resp, "Object", "POST")
# copy object from source container to destination container
resp, _ = self.object_client.copy_object_across_containers(
src_container_name, object_name, dst_container_name, object_name
)
self.assertEqual(resp["status"], "201")
self.assertHeaders(resp, "Object", "PUT")
# check if object is present in destination container
resp, body = self.object_client.get_object(dst_container_name, object_name)
self.assertEqual(body, data)
actual_meta_key = "x-object-meta-" + meta_key
self.assertIn(actual_meta_key, resp)
self.assertEqual(resp[actual_meta_key], meta_value)
开发者ID:Grindizer,项目名称:tempest,代码行数:34,代码来源:test_object_services.py
示例19: test_access_public_container_object_without_using_creds
def test_access_public_container_object_without_using_creds(self):
# make container public-readable and access an object in it object
# anonymously, without using credentials
# update container metadata to make it publicly readable
cont_headers = {"X-Container-Read": ".r:*,.rlistings"}
resp_meta, body = self.container_client.update_container_metadata(
self.container_name, metadata=cont_headers, metadata_prefix=""
)
self.assertIn(int(resp_meta["status"]), HTTP_SUCCESS)
self.assertHeaders(resp_meta, "Container", "POST")
# create object
object_name = data_utils.rand_name(name="Object")
data = data_utils.arbitrary_string(size=len(object_name), base_text=object_name)
resp, _ = self.object_client.create_object(self.container_name, object_name, data)
self.assertEqual(resp["status"], "201")
self.assertHeaders(resp, "Object", "PUT")
# list container metadata
resp_meta, _ = self.container_client.list_container_metadata(self.container_name)
self.assertIn(int(resp_meta["status"]), HTTP_SUCCESS)
self.assertHeaders(resp_meta, "Container", "HEAD")
self.assertIn("x-container-read", resp_meta)
self.assertEqual(resp_meta["x-container-read"], ".r:*,.rlistings")
# trying to get object with empty headers as it is public readable
resp, body = self.custom_object_client.get_object(self.container_name, object_name, metadata={})
self.assertHeaders(resp, "Object", "GET")
self.assertEqual(body, data)
开发者ID:Grindizer,项目名称:tempest,代码行数:32,代码来源:test_object_services.py
示例20: test_update_object_metadata_with_create_and_remove_metadata
def test_update_object_metadata_with_create_and_remove_metadata(self):
# creation and deletion of metadata with one request
object_name = data_utils.rand_name(name='TestObject')
data = data_utils.arbitrary_string()
create_metadata = {'X-Object-Meta-test-meta1': 'Meta1'}
self.object_client.create_object(self.container_name,
object_name,
data,
metadata=create_metadata)
update_metadata = {'X-Object-Meta-test-meta2': 'Meta2',
'X-Remove-Object-Meta-test-meta1': 'Meta1'}
resp, _ = self.object_client.update_object_metadata(
self.container_name,
object_name,
update_metadata,
metadata_prefix='')
self.assertHeaders(resp, 'Object', 'POST')
resp, _ = self.object_client.list_object_metadata(
self.container_name,
object_name)
self.assertNotIn('x-object-meta-test-meta1', resp)
self.assertIn('x-object-meta-test-meta2', resp)
self.assertEqual(resp['x-object-meta-test-meta2'], 'Meta2')
开发者ID:flyingfish007,项目名称:tempest,代码行数:25,代码来源:test_object_services.py
注:本文中的tempest.common.utils.data_utils.arbitrary_string函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论