本文整理汇总了Python中swift.common.swob.Request类的典型用法代码示例。如果您正苦于以下问题:Python Request类的具体用法?Python Request怎么用?Python Request使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Request类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_bucket_GET_v2_with_delimiter_max_keys
def test_bucket_GET_v2_with_delimiter_max_keys(self):
    """Page through a V2 listing that combines a delimiter with max-keys.

    The first request must come back truncated with a continuation token;
    the follow-up request made with that token must succeed and report
    'lily' as its first key.
    """
    bucket_name = 'junk'
    first_path = '/%s?list-type=2&delimiter=a&max-keys=2' % bucket_name

    # First page: expect a truncated listing plus a continuation token.
    first_req = Request.blank(
        first_path,
        environ={'REQUEST_METHOD': 'GET'},
        headers={'Authorization': 'AWS test:tester:hmac',
                 'Date': self.get_date_header()})
    status, headers, body = self.call_s3api(first_req)
    self.assertEqual(status.split()[0], '200')
    listing = fromstring(body, 'ListBucketResult')
    next_token = listing.find('./NextContinuationToken')
    self.assertIsNotNone(next_token)
    self.assertEqual(listing.find('./MaxKeys').text, '2')
    self.assertEqual(listing.find('./IsTruncated').text, 'true')

    # Second page: same query, resumed from the token returned above.
    second_path = '%s&continuation-token=%s' % (first_path, next_token.text)
    second_req = Request.blank(
        second_path,
        environ={'REQUEST_METHOD': 'GET'},
        headers={'Authorization': 'AWS test:tester:hmac',
                 'Date': self.get_date_header()})
    status, headers, body = self.call_s3api(second_req)
    self.assertEqual(status.split()[0], '200')
    listing = fromstring(body, 'ListBucketResult')
    names = []
    for entry in listing.iterchildren('Contents'):
        names.append(entry.find('./Key').text)
    self.assertEqual(names[0], 'lily')
开发者ID:jgmerritt,项目名称:swift,代码行数:26,代码来源:test_bucket.py
示例2: test_policy_index
def test_policy_index(self):
    """Check that the storage policy index shows up as log field 20.

    Two sources are exercised: a ``FakeApp`` built with ``policy_idx='1'``
    for an object PUT, and an ``X-Backend-Storage-Policy-Index`` response
    header injected by a patched ``FakeApp.__call__`` for a container GET.
    """
    # Policy index can be specified by X-Backend-Storage-Policy-Index
    # in the request header for object API
    app = proxy_logging.ProxyLoggingMiddleware(FakeApp(policy_idx='1'), {})
    app.access_logger = FakeLogger()
    req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'PUT'})
    resp = app(req.environ, start_response)
    ''.join(resp)  # drain the response iterator before reading the log
    log_parts = self._log_parts(app)
    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(log_parts[20], '1')

    # Policy index can be specified by X-Backend-Storage-Policy-Index
    # in the response header for container API
    app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
    app.access_logger = FakeLogger()
    req = Request.blank('/v1/a/c', environ={'REQUEST_METHOD': 'GET'})

    def fake_call(app, env, start_response):
        # Replacement for FakeApp.__call__ that adds the policy header.
        start_response(app.response_str,
                       [('Content-Type', 'text/plain'),
                        ('Content-Length', str(sum(map(len, app.body)))),
                        ('X-Backend-Storage-Policy-Index', '1')])
        # Consume the request body in 5-byte reads until it is exhausted.
        while env['wsgi.input'].read(5):
            pass
        return app.body

    with mock.patch.object(FakeApp, '__call__', fake_call):
        resp = app(req.environ, start_response)
        ''.join(resp)  # drain the response iterator before reading the log
    log_parts = self._log_parts(app)
    self.assertEqual(log_parts[20], '1')
开发者ID:hbhdytf,项目名称:mac,代码行数:31,代码来源:test_proxy_logging.py
示例3: test_extract_tar_works
def test_extract_tar_works(self):
    """Upload the same directory tree as plain, gzip and bzip2 tar archives.

    For every compression format the archive is submitted three times:
    once expecting a JSON summary with six created files, once expecting
    an XML summary, and once with an unsupported ``Accept`` value that
    must produce a 406 response status in the body.
    """
    def fake_start_response(*args, **kwargs):
        # The test only inspects the body, so the status line is ignored.
        pass

    # On systems where $TMPDIR is long (like OS X), we need to do this
    # or else every upload will fail due to the path being too long.
    self.app.max_pathlen += len(self.testdir)
    for compress_format in ['', 'gz', 'bz2']:
        base_name = 'base_works_%s' % compress_format
        dir_tree = [
            {base_name: [{'sub_dir1': ['sub1_file1', 'sub1_file2']},
                         {'sub_dir2': ['sub2_file1', u'test obj \u2661']},
                         'sub_file1',
                         {'sub_dir3': [{'sub4_dir1': '../sub4 file1'}]},
                         {'sub_dir4': None},
                         ]}]
        build_dir_tree(self.testdir, dir_tree)
        mode = 'w'
        extension = ''
        if compress_format:
            mode += ':' + compress_format
            extension += '.' + compress_format
        tar_path = os.path.join(self.testdir, 'tar_works.tar' + extension)
        # The context manager closes the archive even if add() raises.
        with tarfile.open(name=tar_path, mode=mode) as tar:
            tar.add(os.path.join(self.testdir, base_name))

        # Archives are binary data, so they are opened in 'rb' mode, and
        # each handle is closed as soon as the request has been processed.
        req = Request.blank('/tar_works/acc/cont/')
        with open(tar_path, 'rb') as tar_file:
            req.environ['wsgi.input'] = tar_file
            req.headers['transfer-encoding'] = 'chunked'
            resp_body = self.handle_extract_and_iter(req, compress_format)
        resp_data = utils.json.loads(resp_body)
        self.assertEqual(resp_data['Number Files Created'], 6)

        # test out xml
        req = Request.blank('/tar_works/acc/cont/')
        with open(tar_path, 'rb') as tar_file:
            req.environ['wsgi.input'] = tar_file
            req.headers['transfer-encoding'] = 'chunked'
            resp_body = self.handle_extract_and_iter(
                req, compress_format, 'application/xml')
        self.assertIn('<response_status>201 Created</response_status>',
                      resp_body)
        self.assertIn('<number_files_created>6</number_files_created>',
                      resp_body)

        # test out nonexistent format
        req = Request.blank('/tar_works/acc/cont/?extract-archive=tar',
                            headers={'Accept': 'good_xml'})
        req.environ['REQUEST_METHOD'] = 'PUT'
        with open(tar_path, 'rb') as tar_file:
            req.environ['wsgi.input'] = tar_file
            req.headers['transfer-encoding'] = 'chunked'
            app_iter = self.bulk(req.environ, fake_start_response)
            # Join while the input file is still open: the iterator may
            # read the request body lazily.
            resp_body = ''.join(app_iter)
        self.assertIn('Response Status: 406', resp_body)
开发者ID:2015-ucsc-hp,项目名称:swift,代码行数:60,代码来源:test_bulk.py
示例4: test_proxy_client_logging
def test_proxy_client_logging(self):
    """Check which address is logged as client IP and as remote address.

    With either ``X-Forwarded-For`` (first entry of the list) or
    ``X-Cluster-Client-Ip`` present, log field 0 must hold the forwarded
    client address and log field 1 the ``REMOTE_ADDR``.
    """
    forwarding_headers = (
        {"HTTP_X_FORWARDED_FOR": "4.5.6.7,8.9.10.11"},
        {"HTTP_X_CLUSTER_CLIENT_IP": "4.5.6.7"},
    )
    for extra_env in forwarding_headers:
        app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {})
        app.access_logger = FakeLogger()
        environ = {"REQUEST_METHOD": "GET", "REMOTE_ADDR": "1.2.3.4"}
        environ.update(extra_env)
        req = Request.blank("/", environ=environ)
        resp = app(req.environ, start_response)
        # exhaust generator
        for _chunk in resp:
            pass
        log_parts = self._log_parts(app)
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(log_parts[0], "4.5.6.7")  # client ip
        self.assertEqual(log_parts[1], "1.2.3.4")  # remote addr
开发者ID:JioCloud,项目名称:swift,代码行数:25,代码来源:test_proxy_logging.py
示例5: __call__
def __call__(self, env, start_response):
    """Dispatch a request to the matching ``EncrypterObjContext`` handler.

    The request is handed unchanged to the wrapped app when the
    ``swift.crypto.override`` environ flag is true, when encryption is
    disabled and the method is PUT or POST, when the path does not split
    into exactly four segments, or when the method is not one of
    GET/HEAD/PUT/POST. An ``HTTPException`` raised by a handler is
    rendered as the response.
    """
    def passthrough():
        return self.app(env, start_response)

    # If override is set in env, then just pass along
    if config_true_value(env.get('swift.crypto.override')):
        return passthrough()

    req = Request(env)
    writes = ('PUT', 'POST')
    if self.disable_encryption and req.method in writes:
        return passthrough()

    try:
        req.split_path(4, 4, True)
    except ValueError:
        return passthrough()

    handler_names = {
        'GET': 'handle_get_or_head',
        'HEAD': 'handle_get_or_head',
        'PUT': 'handle_put',
        'POST': 'handle_post',
    }
    handler_name = handler_names.get(req.method)
    if handler_name is None:
        # anything else
        return passthrough()

    handler = getattr(EncrypterObjContext(self, self.logger), handler_name)
    try:
        return handler(req, start_response)
    except HTTPException as err_resp:
        return err_resp(env, start_response)
开发者ID:jgmerritt,项目名称:swift,代码行数:28,代码来源:encrypter.py
示例6: test_check_object_creation_copy
def test_check_object_creation_copy(self):
    """Exercise ``check_object_creation`` for requests with X-Copy-From.

    A zero Content-Length or chunked transfer encoding is accepted
    (``None`` is returned); a non-zero Content-Length yields a 400 and a
    missing length yields a 411.
    """
    def check(headers):
        # Run the constraint check for a blank request with these headers.
        return constraints.check_object_creation(
            Request.blank('/', headers=headers), 'object_name')

    # assertIsNone/assertEqual replace the deprecated assertEquals alias.
    headers = {'Content-Length': '0',
               'X-Copy-From': 'c/o2',
               'Content-Type': 'text/plain'}
    self.assertIsNone(check(headers))

    headers = {'Content-Length': '1',
               'X-Copy-From': 'c/o2',
               'Content-Type': 'text/plain'}
    self.assertEqual(check(headers).status_int, HTTP_BAD_REQUEST)

    headers = {'Transfer-Encoding': 'chunked',
               'X-Copy-From': 'c/o2',
               'Content-Type': 'text/plain'}
    self.assertIsNone(check(headers))

    # a content-length header is always required
    headers = {'X-Copy-From': 'c/o2',
               'Content-Type': 'text/plain'}
    self.assertEqual(check(headers).status_int, HTTP_LENGTH_REQUIRED)
开发者ID:BReduardokramer,项目名称:swift,代码行数:26,代码来源:test_constraints.py
示例7: test_to_swift_req_subrequest_proxy_access_log
def test_to_swift_req_subrequest_proxy_access_log(self):
    """Check how ``force_request_log`` affects the access-log environ flag.

    The incoming request always carries
    ``swift.proxy_access_log_made = True``. With ``force_request_log=True``
    the Swift sub-request must have the flag falsy; with
    ``force_request_log=False`` it must stay truthy.
    """
    container, obj, method = 'bucket', 'obj', 'GET'
    # (force_request_log value, assertion applied to the resulting flag)
    scenarios = (
        (True, self.assertFalse),
        (False, self.assertTrue),
    )
    for force_log, check_flag in scenarios:
        req = Request.blank(
            '/%s/%s' % (container, obj),
            environ={'REQUEST_METHOD': method,
                     'swift.proxy_access_log_made': True},
            headers={'Authorization': 'AWS test:tester:hmac',
                     'Date': self.get_date_header()})
        with patch.object(Request, 'get_response') as m_swift_resp, \
                patch.object(Request, 'remote_user', 'authorized'):
            m_swift_resp.return_value = FakeSwiftResponse()
            s3_req = S3AclRequest(
                req.environ, MagicMock(), force_request_log=force_log)
            sw_req = s3_req.to_swift_req(method, container, obj)
        check_flag(sw_req.environ['swift.proxy_access_log_made'])
开发者ID:mahak,项目名称:swift,代码行数:32,代码来源:test_s3request.py
示例8: create_container
def create_container(self, req, container_path):
    """
    Checks if the container exists and if not try to create it.

    A HEAD sub-request is sent first; only a 404 answer triggers the PUT
    sub-request. Both sub-requests reuse a copy of the caller's environ
    with ``swift.source`` set to 'EA'.

    :param req: the request whose environ is copied for the sub-requests
    :param container_path: an unquoted path to a container to be created
    :returns: True if created container, False if container exists
    :raises: CreateContainerError when unable to create container
    """
    def send(method):
        # Issue one sub-request against the container with this method.
        sub_env = req.environ.copy()
        sub_env['PATH_INFO'] = container_path
        sub_env['swift.source'] = 'EA'
        sub_env['REQUEST_METHOD'] = method
        sub_req = Request.blank(container_path, environ=sub_env)
        return sub_req.get_response(self.app)

    resp = send('HEAD')
    if resp.is_success:
        return False
    if resp.status_int == 404:
        resp = send('PUT')
        if resp.is_success:
            return True
    # Reached for a non-404 HEAD failure as well as for a failed PUT.
    raise CreateContainerError(
        "Create Container Failed: " + container_path,
        resp.status_int, resp.status)
开发者ID:hbhdytf,项目名称:mac,代码行数:27,代码来源:bulk.py
示例9: test_upload_size
def test_upload_size(self):
    """Check logged byte counts and transfer stats for an object PUT.

    Log field 10 must hold the request body length and field 11 the
    response body length. With the default policy both the generic and
    the per-policy xfer stats are expected; with ``policy_idx="-1"`` only
    the generic one is.
    """
    sent_len = len("some stuff")
    received_len = len("FAKE APP")
    total_xfer = sent_len + received_len

    def run_put(fake_app):
        # Push one PUT through the middleware and return (app, log fields).
        middleware = proxy_logging.ProxyLoggingMiddleware(
            fake_app, {"log_headers": "yes"})
        middleware.access_logger = FakeLogger()
        put_req = Request.blank(
            "/v1/a/c/o/foo",
            environ={"REQUEST_METHOD": "PUT",
                     "wsgi.input": BytesIO(b"some stuff")})
        resp = middleware(put_req.environ, start_response)
        # exhaust generator
        for _chunk in resp:
            pass
        return middleware, self._log_parts(middleware)

    # Using default policy
    app, log_parts = run_put(FakeApp())
    self.assertEqual(log_parts[11], str(received_len))
    self.assertEqual(log_parts[10], str(sent_len))
    self.assertUpdateStats(
        [
            ("object.PUT.200.xfer", total_xfer),
            ("object.policy.0.PUT.200.xfer", total_xfer),
        ],
        app,
    )

    # Using a non-existent policy
    app, log_parts = run_put(FakeApp(policy_idx="-1"))
    self.assertEqual(log_parts[11], str(received_len))
    self.assertEqual(log_parts[10], str(sent_len))
    self.assertUpdateStats([("object.PUT.200.xfer", total_xfer)], app)
开发者ID:iloveyou416068,项目名称:swift-1,代码行数:30,代码来源:test_proxy_logging.py
示例10: test_PUT_encryption_override
def test_PUT_encryption_override(self):
    """PUT with ``swift.crypto.override`` set must bypass the encrypter.

    The footers supplied through the ``swift.callback.update_footers``
    callback must show up in the headers recorded by the app, and a GET
    issued straight to the app must return the original body.
    """
    body = "FAKE APP"
    # simulate another middleware wanting to set footers
    other_footers = {
        "Etag": "other etag",
        "X-Object-Sysmeta-Other": "other sysmeta",
        "X-Object-Sysmeta-Container-Update-Override-Etag": "other override",
    }

    def add_other_footers(footers):
        footers.update(other_footers)

    # set crypto override to disable encryption.
    env = {
        "REQUEST_METHOD": "PUT",
        "swift.crypto.override": True,
        "swift.callback.update_footers": add_other_footers,
    }
    hdrs = {"content-type": "text/plain", "content-length": str(len(body))}
    put_req = Request.blank("/v1/a/c/o", environ=env, body=body, headers=hdrs)
    self.app.register("PUT", "/v1/a/c/o", HTTPCreated, {})
    put_resp = put_req.get_response(self.encrypter)
    self.assertEqual("201 Created", put_resp.status)

    # verify that other middleware's footers made it to app
    req_hdrs = self.app.headers[0]
    for name in other_footers:
        self.assertEqual(other_footers[name], req_hdrs[name])

    # verify object is NOT encrypted by getting direct from the app
    get_req = Request.blank("/v1/a/c/o", environ={"REQUEST_METHOD": "GET"})
    stored = get_req.get_response(self.app).body
    self.assertEqual(body, stored)
开发者ID:notmyname,项目名称:swift,代码行数:28,代码来源:test_encrypter.py
示例11: test_headers_to_sign_sigv4
def test_headers_to_sign_sigv4(self):
    """Check ``SigV4Request._headers_to_sign`` against ``SignedHeaders``.

    Three cases: x-amz-date signed and present, x-amz-date neither signed
    nor present, and x-amz-date signed but missing, which must raise
    ``SignatureDoesNotMatch``.
    """
    environ = {'REQUEST_METHOD': 'GET'}

    def authorization(signed_headers):
        # Build the Authorization value for a given SignedHeaders list.
        return ('AWS4-HMAC-SHA256 '
                'Credential=test/20130524/US/s3/aws4_request, '
                'SignedHeaders=%s,'
                'Signature=X' % signed_headers)

    # host and x-amz-date
    x_amz_date = self.get_v4_amz_date_header()
    headers = {
        'Authorization': authorization(
            'host;x-amz-content-sha256;x-amz-date'),
        'X-Amz-Content-SHA256': '0123456789',
        'Date': self.get_date_header(),
        'X-Amz-Date': x_amz_date}
    req = Request.blank('/', environ=environ, headers=headers)
    signed = SigV4Request(req.environ)._headers_to_sign()
    self.assertEqual(['host', 'x-amz-content-sha256', 'x-amz-date'],
                     sorted(signed.keys()))
    self.assertEqual(signed['host'], 'localhost:80')
    self.assertEqual(signed['x-amz-date'], x_amz_date)
    self.assertEqual(signed['x-amz-content-sha256'], '0123456789')

    # no x-amz-date
    headers = {
        'Authorization': authorization('host;x-amz-content-sha256'),
        'X-Amz-Content-SHA256': '0123456789',
        'Date': self.get_date_header()}
    req = Request.blank('/', environ=environ, headers=headers)
    signed = SigV4Request(req.environ)._headers_to_sign()
    self.assertEqual(['host', 'x-amz-content-sha256'],
                     sorted(signed.keys()))
    self.assertEqual(signed['host'], 'localhost:80')
    self.assertEqual(signed['x-amz-content-sha256'], '0123456789')

    # SignedHeaders says, host and x-amz-date included but there is not
    # X-Amz-Date header
    headers = {
        'Authorization': authorization(
            'host;x-amz-content-sha256;x-amz-date'),
        'X-Amz-Content-SHA256': '0123456789',
        'Date': self.get_date_header()}
    req = Request.blank('/', environ=environ, headers=headers)
    with self.assertRaises(SignatureDoesNotMatch):
        sigv4_req = SigV4Request(req.environ)
        sigv4_req._headers_to_sign()
开发者ID:notmyname,项目名称:swift3,代码行数:60,代码来源:test_request.py
示例12: test_sysmeta_replaced_by_PUT
def test_sysmeta_replaced_by_PUT(self):
    """A second PUT must replace the metadata stored by the first one.

    After both PUTs, a request with an empty environ must return the
    changed and new sysmeta/meta headers, and must not return the
    ``*_headers_2`` sets that were only part of the first PUT.
    """
    path = '/v1/a/c/o'

    def put_with(*header_sets):
        # Merge the header sets in order and PUT them with a one-byte body.
        merged = {}
        for header_set in header_sets:
            merged.update(header_set)
        put_req = Request.blank(path, environ={'REQUEST_METHOD': 'PUT'},
                                headers=merged, body='x')
        return put_req.get_response(self.app)

    resp = put_with(self.original_sysmeta_headers_1,
                    self.original_sysmeta_headers_2,
                    self.original_meta_headers_1,
                    self.original_meta_headers_2)
    self._assertStatus(resp, 201)

    resp = put_with(self.changed_sysmeta_headers,
                    self.new_sysmeta_headers,
                    self.changed_meta_headers,
                    self.new_meta_headers,
                    self.bad_headers)
    self._assertStatus(resp, 201)

    resp = Request.blank(path, environ={}).get_response(self.app)
    self._assertStatus(resp, 200)
    self._assertInHeaders(resp, self.changed_sysmeta_headers)
    self._assertInHeaders(resp, self.new_sysmeta_headers)
    self._assertNotInHeaders(resp, self.original_sysmeta_headers_2)
    self._assertInHeaders(resp, self.changed_meta_headers)
    self._assertInHeaders(resp, self.new_meta_headers)
    self._assertNotInHeaders(resp, self.original_meta_headers_2)
开发者ID:igusher,项目名称:swift,代码行数:31,代码来源:test_sysmeta.py
示例13: test_bucket_PUT
def test_bucket_PUT(self):
    """Bucket PUT must answer 200 with an empty body and a Location header.

    Covered variants: a plain PUT, a PUT that declares chunked
    transfer-encoding without sending a body, and a PUT whose
    ``wsgi.input`` is an ``UnreadableInput``.
    """
    def assert_bucket_created(status, headers, body):
        self.assertEqual(body, '')
        self.assertEqual(status.split()[0], '200')
        self.assertEqual(headers['Location'], '/bucket')

    plain_req = Request.blank(
        '/bucket',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'Authorization': 'AWS test:tester:hmac',
                 'Date': self.get_date_header()})
    assert_bucket_created(*self.call_s3api(plain_req))

    # Apparently some clients will include a chunked transfer-encoding
    # even with no body
    chunked_req = Request.blank(
        '/bucket',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'Authorization': 'AWS test:tester:hmac',
                 'Date': self.get_date_header(),
                 'Transfer-Encoding': 'chunked'})
    assert_bucket_created(*self.call_s3api(chunked_req))

    with UnreadableInput(self) as fake_input:
        unreadable_req = Request.blank(
            '/bucket',
            environ={'REQUEST_METHOD': 'PUT',
                     'wsgi.input': fake_input},
            headers={'Authorization': 'AWS test:tester:hmac',
                     'Date': self.get_date_header()})
        status, headers, body = self.call_s3api(unreadable_req)
    assert_bucket_created(status, headers, body)
开发者ID:jgmerritt,项目名称:swift,代码行数:33,代码来源:test_bucket.py
示例14: test_bucket_GET_v2_fetch_owner
def test_bucket_GET_v2_fetch_owner(self):
    """V2 listings include an Owner element only with ``fetch-owner=true``.

    Without the parameter every Contents entry must lack an Owner child;
    with it, every entry must have one.
    """
    bucket_name = 'junk'
    # (path template, assertion applied to each entry's Owner element)
    scenarios = (
        ('/%s?list-type=2', self.assertIsNone),
        ('/%s?list-type=2&fetch-owner=true', self.assertIsNotNone),
    )
    for path_template, check_owner in scenarios:
        req = Request.blank(
            path_template % bucket_name,
            environ={'REQUEST_METHOD': 'GET'},
            headers={'Authorization': 'AWS test:tester:hmac',
                     'Date': self.get_date_header()})
        status, headers, body = self.call_s3api(req)
        self.assertEqual(status.split()[0], '200')

        listing = fromstring(body, 'ListBucketResult')
        self.assertEqual(listing.find('./Name').text, bucket_name)
        for entry in listing.iterchildren('Contents'):
            check_owner(entry.find('./Owner'))
开发者ID:jgmerritt,项目名称:swift,代码行数:31,代码来源:test_bucket.py
示例15: test_handle_multipart_put_bad_data
def test_handle_multipart_put_bad_data(self):
    """Every malformed manifest body must make the SLO PUT handler raise.

    Each payload below is sent as the body of a ``multipart-manifest=put``
    request and ``handle_multipart_put`` is expected to raise
    ``HTTPException``.
    """
    manifest_path = "/test_good/AUTH_test/c/man?multipart-manifest=put"
    bad_payloads = (
        json.dumps([{"path": "/cont/object", "etag": "etagoftheobj", "size_bytes": "lala"}]),
        json.dumps([{"path": "/cont", "etag": "etagoftheobj", "size_bytes": 100}]),
        json.dumps("asdf"),
        json.dumps(None),
        json.dumps(5),
        "not json",
        "1234",
        None,
        "",
        json.dumps({"path": None}),
        json.dumps([{"path": "/c/o", "etag": None, "size_bytes": 12}]),
        json.dumps([{"path": "/c/o", "etag": "asdf", "size_bytes": "sd"}]),
        json.dumps([{"path": 12, "etag": "etagoftheobj", "size_bytes": 100}]),
        json.dumps([{"path": u"/cont/object\u2661", "etag": "etagoftheobj", "size_bytes": 100}]),
        json.dumps([{"path": 12, "size_bytes": 100}]),
        # NOTE(review): this entry repeats the previous one; kept as-is.
        json.dumps([{"path": 12, "size_bytes": 100}]),
        json.dumps([{"path": None, "etag": "etagoftheobj", "size_bytes": 100}]),
    )
    for payload in bad_payloads:
        req = Request.blank(
            manifest_path, environ={"REQUEST_METHOD": "PUT"}, body=payload
        )
        self.assertRaises(HTTPException, self.slo.handle_multipart_put, req)
开发者ID:mygoda,项目名称:openstack,代码行数:29,代码来源:test_slo.py
示例16: test_policy_index
def test_policy_index(self):
    """Check that the storage policy index shows up as log field 20.

    First the index comes from a ``FakeApp`` built with ``policy_idx="1"``
    (object PUT); then from an ``X-Backend-Storage-Policy-Index`` response
    header added by a patched ``FakeApp.__call__`` (container GET).
    """
    def logging_app(fake_app):
        # Wrap the fake app in the middleware with a fake access logger.
        middleware = proxy_logging.ProxyLoggingMiddleware(fake_app, {})
        middleware.access_logger = FakeLogger()
        return middleware

    # Policy index can be specified by X-Backend-Storage-Policy-Index
    # in the request header for object API
    app = logging_app(FakeApp(policy_idx="1"))
    put_req = Request.blank("/v1/a/c/o", environ={"REQUEST_METHOD": "PUT"})
    resp = app(put_req.environ, start_response)
    "".join(resp)
    self.assertEqual(self._log_parts(app)[20], "1")

    # Policy index can be specified by X-Backend-Storage-Policy-Index
    # in the response header for container API
    app = logging_app(FakeApp())
    get_req = Request.blank("/v1/a/c", environ={"REQUEST_METHOD": "GET"})

    def fake_call(app, env, start_response):
        body_length = sum(len(chunk) for chunk in app.body)
        response_headers = [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(body_length)),
            ("X-Backend-Storage-Policy-Index", "1"),
        ]
        start_response(app.response_str, response_headers)
        # Read the request body in 5-byte pieces until nothing is left.
        while True:
            if not env["wsgi.input"].read(5):
                break
        return app.body

    with mock.patch.object(FakeApp, "__call__", fake_call):
        resp = app(get_req.environ, start_response)
        "".join(resp)
    self.assertEqual(self._log_parts(app)[20], "1")
开发者ID:iloveyou416068,项目名称:swift-1,代码行数:35,代码来源:test_proxy_logging.py
示例17: __call__
def __call__(self, env, start_response):
    """
    WSGI entry point

    Requests whose path does not split into exactly four segments go
    straight to the wrapped app. Otherwise a copy hook is installed in
    the environ and the request is routed: ``multipart-manifest=put`` on
    PUT, ``multipart-manifest=delete`` on DELETE, and every GET/HEAD have
    dedicated handlers. Any other request carrying an
    ``X-Static-Large-Object`` header is rejected with a 400; the rest are
    passed through to the wrapped app.
    """
    req = Request(env)
    try:
        _vrs, _account, _container, _obj = req.split_path(4, 4, True)
    except ValueError:
        return self.app(env, start_response)

    # install our COPY-callback hook
    def passthrough_hook(src_req, src_resp, sink_req):
        return src_resp

    previous_hook = env.get('swift.copy_hook', passthrough_hook)
    env['swift.copy_hook'] = self.copy_hook(previous_hook)

    try:
        if req.method == 'PUT':
            if req.params.get('multipart-manifest') == 'put':
                return self.handle_multipart_put(req, start_response)
        if req.method == 'DELETE':
            if req.params.get('multipart-manifest') == 'delete':
                delete_resp = self.handle_multipart_delete(req)
                return delete_resp(env, start_response)
        if req.method in ('GET', 'HEAD'):
            return self.handle_multipart_get_or_head(req, start_response)
        if 'X-Static-Large-Object' in req.headers:
            raise HTTPBadRequest(
                request=req,
                body='X-Static-Large-Object is a reserved header. '
                     'To create a static large object add query param '
                     'multipart-manifest=put.')
    except HTTPException as err_resp:
        return err_resp(env, start_response)

    return self.app(env, start_response)
开发者ID:iloveyou416068,项目名称:swift-1,代码行数:34,代码来源:slo.py
示例18: test_bucket_GET_max_keys
def test_bucket_GET_max_keys(self):
    """Check how ``max-keys`` is reported and translated into ``limit``.

    For ``max-keys=5`` the response must report MaxKeys 5 and the backend
    query must carry ``limit=6``; for ``max-keys=5000`` the response must
    report 1000 and the backend query ``limit=1001``.
    """
    class FakeApp(object):
        # Minimal WSGI app that records the query string it was called with.
        def __call__(self, env, start_response):
            self.query_string = env['QUERY_STRING']
            start_response('200 OK', [])
            return '[]'

    fake_app = FakeApp()
    local_app = swift3.filter_factory({})(fake_app)
    bucket_name = 'junk'

    # (requested max-keys, MaxKeys reported to the client, backend limit)
    scenarios = (
        ('5', '5', '6'),
        ('5000', '1000', '1001'),
    )
    for requested, reported, backend_limit in scenarios:
        req = Request.blank(
            '/%s' % bucket_name,
            environ={'REQUEST_METHOD': 'GET',
                     'QUERY_STRING': 'max-keys=%s' % requested},
            headers={'Authorization': 'AWS test:tester:hmac'})
        resp = local_app(req.environ, lambda *args: None)
        dom = xml.dom.minidom.parseString("".join(resp))
        # assertEqual replaces the deprecated assertEquals/assert_ aliases
        # and reports both values when the comparison fails.
        self.assertEqual(dom.getElementsByTagName('MaxKeys')[0].
                         childNodes[0].nodeValue, reported)
        args = dict(cgi.parse_qsl(fake_app.query_string))
        self.assertEqual(args['limit'], backend_limit)
开发者ID:blaxter,项目名称:swift3,代码行数:31,代码来源:test_swift3.py
示例19: test_PUT_encryption_override
def test_PUT_encryption_override(self):
    """With ``swift.crypto.override`` set, a PUT must not be encrypted.

    Footers added through the ``swift.callback.update_footers`` callback
    must appear in the headers the app recorded, and reading the object
    directly from the app must give back the original body.
    """
    object_path = '/v1/a/c/o'
    body = 'FAKE APP'

    # simulate another middleware wanting to set footers
    other_footers = {
        'Etag': 'other etag',
        'X-Object-Sysmeta-Other': 'other sysmeta',
        'X-Object-Sysmeta-Container-Update-Override-Etag':
            'other override'}

    # set crypto override to disable encryption.
    env = {'REQUEST_METHOD': 'PUT'}
    env['swift.crypto.override'] = True
    env['swift.callback.update_footers'] = (
        lambda footers: footers.update(other_footers))

    hdrs = {'content-type': 'text/plain'}
    hdrs['content-length'] = str(len(body))

    req = Request.blank(object_path, environ=env, body=body, headers=hdrs)
    self.app.register('PUT', object_path, HTTPCreated, {})
    resp = req.get_response(self.encrypter)
    self.assertEqual('201 Created', resp.status)

    # verify that other middleware's footers made it to app
    req_hdrs = self.app.headers[0]
    for footer_name, footer_value in other_footers.items():
        self.assertEqual(footer_value, req_hdrs[footer_name])

    # verify object is NOT encrypted by getting direct from the app
    get_req = Request.blank(object_path, environ={'REQUEST_METHOD': 'GET'})
    get_resp = get_req.get_response(self.app)
    self.assertEqual(body, get_resp.body)
开发者ID:jgmerritt,项目名称:swift,代码行数:28,代码来源:test_encrypter.py
示例20: test_parse_account_that_looks_like_version
def test_parse_account_that_looks_like_version(self):
    """
    Demonstrate the failure mode for accounts that looks like versions,
    if you can make _parse_path better and this is the *only* test that
    fails you can delete it ;)
    """
    path_template = '/endpoints/%s/c/o'

    # These account names make _parse_path raise ValueError.
    for bad_account in ('v3.0', 'verybaddaccountwithnoprefix'):
        req = Request.blank(path_template % bad_account)
        self.assertRaises(ValueError,
                          self.list_endpoints._parse_path, req)

    # These are taken for a version, shifting every later path segment.
    even_worse_accounts = {'v1': 1.0, 'v2.0': 2.0}
    for bad_account in even_worse_accounts:
        guessed_version = even_worse_accounts[bad_account]
        req = Request.blank(path_template % bad_account)
        parsed = self.list_endpoints._parse_path(req)
        version, account, container, obj = parsed
        self.assertEqual(version, guessed_version)
        self.assertEqual(account, 'c')
        self.assertEqual(container, 'o')
        self.assertEqual(obj, None)
开发者ID:bouncestorage,项目名称:swift,代码行数:25,代码来源:test_list_endpoints.py
注:本文中的swift.common.swob.Request类示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论