本文整理汇总了Python中swift.proxy.controllers.base.Controller类的典型用法代码示例。如果您正苦于以下问题:Python Controller类的具体用法?Python Controller怎么用?Python Controller使用的例子?那么恭喜您,这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Controller类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_best_response_overrides
def test_best_response_overrides(self):
    # Feed three canned backend responses (302, 100, 404) to
    # ``best_response`` with two different ``overrides`` maps and check
    # the status (and body) of the response it builds.
    controller = Controller(self.app)
    backend_responses = [
        (302, 'Found', '', b'The resource has moved temporarily.'),
        (100, 'Continue', '', b''),
        (404, 'Not Found', '', b'Custom body'),
    ]
    server_type = "Base DELETE"
    request = Request.blank('/v1/a/c/o', method='DELETE')
    statuses, reasons, headers, bodies = zip(*backend_responses)

    def respond_with(overrides):
        # Same inputs every time; only the override map varies.
        return controller.best_response(
            request, statuses, reasons, bodies, server_type,
            headers=headers, overrides=overrides)

    # A quorum cannot be made out of overridden responses alone.
    resp = respond_with({302: 204, 100: 204})
    self.assertEqual(resp.status, '503 Service Unavailable')

    # With a 404 quorum, the last (real) 404 from the delete is the one
    # that gets returned.
    resp = respond_with({100: 404})
    self.assertEqual(resp.status, '404 Not Found')
    self.assertEqual(resp.body, b'Custom body')
开发者ID:mahak,项目名称:swift,代码行数:25,代码来源:test_base.py
示例2: __init__
def __init__(self, app, account_name, **kwargs):
    """Initialise the controller for a single account.

    :param app: application object handed on to ``Controller.__init__``
    :param account_name: account name; stored after passing it
                         through ``unquote``
    :param kwargs: accepted but not used by this method
    """
    Controller.__init__(self, app)
    # URL-decode the account name before keeping it.
    self.account_name = unquote(account_name)
    if self.app.allow_account_management:
        return
    # Account management is disabled: withdraw the PUT and DELETE verbs.
    for verb in ('PUT', 'DELETE'):
        self.allowed_methods.remove(verb)
开发者ID:sunzz679,项目名称:swift-2.4.0--source-read,代码行数:7,代码来源:account.py
示例3: test_get_shard_ranges_for_object_put
def test_get_shard_ranges_for_object_put(self):
    # Call ``_get_shard_ranges`` for an object PUT against a mocked
    # backend, then check the two captured backend requests and the
    # shard ranges that come back.
    ts_iter = make_timestamp_iter()
    shard_ranges = []
    for i in range(3):
        shard_range = ShardRange(
            '.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
            '%d_upper' % i, object_count=i, bytes_used=1024 * i,
            meta_timestamp=next(ts_iter))
        shard_ranges.append(dict(shard_range))
    # Only the middle shard range is served by the mocked container GET.
    expected_ranges = shard_ranges[1:2]
    listing_body = json.dumps(expected_ranges).encode('ascii')

    base = Controller(self.app)
    req = Request.blank('/v1/a/c/o', method='PUT')
    resp_headers = {'X-Backend-Record-Type': 'shard'}
    with mocked_http_conn(
            200, 200,
            body_iter=iter([b'', listing_body]),
            headers=resp_headers) as fake_conn:
        actual = base._get_shard_ranges(req, 'a', 'c', '1_test')

    # First captured request: account info.
    account_request = fake_conn.requests[0]
    self.assertEqual('HEAD', account_request['method'])
    self.assertEqual('a', account_request['path'][7:])

    # Second captured request: container GET.
    container_request = fake_conn.requests[1]
    self.assertEqual('GET', container_request['method'])
    self.assertEqual('a/c', container_request['path'][7:])
    # Sort the query parameters so the comparison ignores their order.
    params = sorted(container_request['qs'].split('&'))
    self.assertEqual(['format=json', 'includes=1_test'], params)
    self.assertEqual(
        'shard',
        container_request['headers'].get('X-Backend-Record-Type'))

    self.assertEqual(expected_ranges, [dict(pr) for pr in actual])
    self.assertFalse(self.app.logger.get_lines_for_level('error'))
开发者ID:mahak,项目名称:swift,代码行数:32,代码来源:test_base.py
示例4: __init__
def __init__(self, app, account_name, **kwargs):
    """Initialise the controller for a single account.

    :param app: application object handed on to ``Controller.__init__``
    :param account_name: account name; stored after passing it
                         through ``unquote``
    :param kwargs: accepted but not used by this method
    """
    Controller.__init__(self, app)
    # Trace output kept from the original. It is written as a call so
    # the module also parses on Python 3; with a single string argument
    # the output is the same on Python 2.
    print('in __init__ of accountcontroller class')
    self.account_name = unquote(account_name)
    if not self.app.allow_account_management:
        # Account management is disabled: withdraw the mutating verbs.
        self.allowed_methods.remove('PUT')
        self.allowed_methods.remove('DELETE')
开发者ID:jannatunnoor,项目名称:test_swift,代码行数:7,代码来源:account.py
示例5: __init__
def __init__(self, app, conf):
    """Initialise the controller and load the three rings.

    :param app: application object handed on to ``Controller.__init__``
    :param conf: configuration mapping; ``swift_dir`` is read from it
                 and defaults to ``/etc/swift``
    """
    Controller.__init__(self, app)
    self.conf = conf
    self.logger = get_logger(conf, log_route='ringinfo')
    swift_dir = self.conf.get('swift_dir', '/etc/swift')
    self.swift_dir = swift_dir
    # One ring per kind, all loaded from the same directory.
    self.object_ring = Ring(swift_dir, ring_name='object')
    self.container_ring = Ring(swift_dir, ring_name='container')
    self.account_ring = Ring(swift_dir, ring_name='account')
开发者ID:HeyManLeader,项目名称:OpenStack-Swift,代码行数:8,代码来源:gziptojson.py
示例6: __init__
def __init__(self, app, version, expose_info, disallowed_sections,
             admin_key):
    """Store the settings of the info controller.

    :param app: application object handed on to ``Controller.__init__``
    :param version: accepted but not stored by this method
    :param expose_info: stored as ``self.expose_info``
    :param disallowed_sections: stored as ``self.disallowed_sections``
    :param admin_key: stored as ``self.admin_key``
    """
    Controller.__init__(self, app)
    self.expose_info = expose_info
    self.disallowed_sections = disallowed_sections
    self.admin_key = admin_key
    # Maps a request method to a list of method names.
    # NOTE(review): presumably the methods an HMAC signature may have
    # been made for -- confirm against the code that reads this.
    hmac_methods = {}
    hmac_methods['HEAD'] = ['HEAD', 'GET']
    hmac_methods['GET'] = ['GET']
    self.allowed_hmac_methods = hmac_methods
开发者ID:sunzz679,项目名称:swift-2.4.0--source-read,代码行数:9,代码来源:info.py
示例7: test_container_info_without_req
def test_container_info_without_req(self):
    # ``container_info`` called with only account and container names
    # (no request object) is expected to report a status of 0.
    account, container = 'a', 'c'
    base = Controller(self.app)
    base.account_name = account
    base.container_name = container
    info = base.container_info(base.account_name, base.container_name)
    self.assertEqual(info['status'], 0)
开发者ID:mahak,项目名称:swift,代码行数:9,代码来源:test_base.py
示例8: test_transfer_headers_with_sysmeta
def test_transfer_headers_with_sysmeta(self):
    # ``transfer_headers`` must copy the well-formed sysmeta headers and
    # leave out the one whose name stops right after the prefix.
    good_hdrs = {
        "x-base-sysmeta-foo": "ok",
        "X-Base-sysmeta-Bar": "also ok",
    }
    bad_hdrs = {"x-base-sysmeta-": "too short"}
    source_hdrs = good_hdrs.copy()
    source_hdrs.update(bad_hdrs)

    base = Controller(self.app)
    dst_hdrs = HeaderKeyDict()
    base.transfer_headers(source_hdrs, dst_hdrs)

    self.assertEqual(HeaderKeyDict(good_hdrs), dst_hdrs)
开发者ID:helen5haha,项目名称:swift,代码行数:9,代码来源:test_base.py
示例9: test_generate_request_headers
def test_generate_request_headers(self):
    # ``generate_request_headers(req, transfer=True)`` is expected to
    # blank the header named by ``x-remove-base-meta-owner``, carry the
    # ``x-base-meta-size`` value over, add ``connection: close`` and
    # leave out the unrelated ``new-owner`` header.
    base = Controller(self.app)
    src_headers = {
        "x-remove-base-meta-owner": "x",
        "x-base-meta-size": "151M",
        "new-owner": "Kun",
    }
    req = Request.blank("/v1/a/c/o", headers=src_headers)
    dst_headers = base.generate_request_headers(req, transfer=True)
    expected_headers = {
        "x-base-meta-owner": "",
        "x-base-meta-size": "151M",
        "connection": "close",
    }
    # items() works on Python 2 and 3; iteritems() exists only on 2.
    for k, v in expected_headers.items():
        self.assertIn(k, dst_headers)
        self.assertEqual(v, dst_headers[k])
    self.assertNotIn("new-owner", dst_headers)
开发者ID:helen5haha,项目名称:swift,代码行数:10,代码来源:test_base.py
示例10: test_transfer_headers_with_sysmeta
def test_transfer_headers_with_sysmeta(self):
    # Only sysmeta headers with a non-empty name after the prefix should
    # reach the destination dict.
    base = Controller(self.app)
    good_hdrs = {'x-base-sysmeta-foo': 'ok',
                 'X-Base-sysmeta-Bar': 'also ok'}
    bad_hdrs = {'x-base-sysmeta-': 'too short'}
    # Source headers are the good and the bad ones combined.
    hdrs = {}
    for group in (good_hdrs, bad_hdrs):
        hdrs.update(group)
    dst_hdrs = HeaderKeyDict()
    base.transfer_headers(hdrs, dst_hdrs)
    expected = HeaderKeyDict(good_hdrs)
    self.assertEqual(expected, dst_hdrs)
开发者ID:mahak,项目名称:swift,代码行数:10,代码来源:test_base.py
示例11: _check_get_shard_ranges_bad_data
def _check_get_shard_ranges_bad_data(self, body):
    # Helper: serve ``body`` as the second mocked backend response,
    # assert that ``_get_shard_ranges`` returns None, and hand back the
    # logged error lines for the caller to inspect.
    controller = Controller(self.app)
    request = Request.blank('/v1/a/c/o', method='PUT')
    record_type_headers = {'X-Backend-Record-Type': 'shard'}
    # The first mocked response has an empty body; ``body`` is second.
    response_bodies = iter([b'', body])
    with mocked_http_conn(200, 200, body_iter=response_bodies,
                          headers=record_type_headers):
        actual = controller._get_shard_ranges(
            request, 'a', 'c', '1_test')
    self.assertIsNone(actual)
    return self.app.logger.get_lines_for_level('error')
开发者ID:mahak,项目名称:swift,代码行数:11,代码来源:test_base.py
示例12: test_get_shard_ranges_request_failed
def test_get_shard_ranges_request_failed(self):
    # The mocked backend answers 200 once and then 404 three times.
    # NOTE(review): presumably the 200 serves the account info request
    # and the 404s the container listing attempts -- confirm.
    controller = Controller(self.app)
    request = Request.blank('/v1/a/c/o', method='PUT')
    with mocked_http_conn(200, 404, 404, 404):
        actual = controller._get_shard_ranges(request, 'a', 'c', '1_test')
    self.assertIsNone(actual)

    logger = self.app.logger
    # Nothing is logged at error level ...
    self.assertFalse(logger.get_lines_for_level('error'))
    # ... and exactly one warning names the failed container listing.
    warning_lines = logger.get_lines_for_level('warning')
    first_warning = warning_lines[0]
    self.assertIn('Failed to get container listing', first_warning)
    self.assertIn('/a/c', first_warning)
    self.assertFalse(warning_lines[1:])
开发者ID:mahak,项目名称:swift,代码行数:11,代码来源:test_base.py
示例13: test_generate_request_headers_with_no_orig_req
def test_generate_request_headers_with_no_orig_req(self):
    # With no original request, the headers passed as ``additional``
    # are the only input. The result must contain the expected pairs
    # below and an empty ``Referer``.
    base = Controller(self.app)
    src_headers = {'x-remove-base-meta-owner': 'x',
                   'x-base-meta-size': '151M',
                   'new-owner': 'Kun'}
    dst_headers = base.generate_request_headers(None,
                                                additional=src_headers)
    expected_headers = {'x-base-meta-size': '151M',
                        'connection': 'close'}
    # Check each expected pair on its own. The previous loop ignored its
    # loop variables and repeated a single whole-dict
    # assertDictContainsSubset call, which is deprecated and has been
    # removed in Python 3.12.
    for k, v in expected_headers.items():
        self.assertIn(k, dst_headers)
        self.assertEqual(v, dst_headers[k])
    self.assertEqual('', dst_headers['Referer'])
开发者ID:bkolli,项目名称:swift,代码行数:12,代码来源:test_base.py
示例14: test_generate_request_headers_with_sysmeta
def test_generate_request_headers_with_sysmeta(self):
    # Well-formed sysmeta headers must be present (under lower-cased
    # names) in the generated headers. The header whose name stops
    # right after the prefix must be absent.
    base = Controller(self.app)
    good_hdrs = {"x-base-sysmeta-foo": "ok",
                 "X-Base-sysmeta-Bar": "also ok"}
    bad_hdrs = {"x-base-sysmeta-": "too short"}
    hdrs = dict(good_hdrs)
    hdrs.update(bad_hdrs)
    req = Request.blank("/v1/a/c/o", headers=hdrs)
    dst_headers = base.generate_request_headers(req, transfer=True)
    # items() works on Python 2 and 3; iteritems() exists only on 2.
    for k, v in good_hdrs.items():
        self.assertIn(k.lower(), dst_headers)
        self.assertEqual(v, dst_headers[k.lower()])
    # Only the names matter for the rejected headers.
    for k in bad_hdrs:
        self.assertNotIn(k.lower(), dst_headers)
开发者ID:helen5haha,项目名称:swift,代码行数:13,代码来源:test_base.py
示例15: test_generate_request_headers
def test_generate_request_headers(self):
    # ``generate_request_headers(req, transfer=True)`` is expected to
    # blank the header named by ``x-remove-base-meta-owner``, carry the
    # ``x-base-meta-size`` value over and leave out the unrelated
    # ``new-owner`` header.
    base = Controller(self.app)
    src_headers = {'x-remove-base-meta-owner': 'x',
                   'x-base-meta-size': '151M',
                   'new-owner': 'Kun'}
    req = Request.blank('/v1/a/c/o', headers=src_headers)
    dst_headers = base.generate_request_headers(req, transfer=True)
    expected_headers = {'x-base-meta-owner': '',
                        'x-base-meta-size': '151M'}
    # items() works on Python 2 and 3; iteritems() exists only on 2.
    for k, v in expected_headers.items():
        self.assertIn(k, dst_headers)
        self.assertEqual(v, dst_headers[k])
    self.assertNotIn('new-owner', dst_headers)
开发者ID:10389030,项目名称:swift,代码行数:13,代码来源:test_base.py
示例16: test_generate_request_headers_with_sysmeta
def test_generate_request_headers_with_sysmeta(self):
    # Well-formed sysmeta headers must be present (under lower-cased
    # names) in the generated headers. The header whose name stops
    # right after the prefix must be absent.
    good_hdrs = {'x-base-sysmeta-foo': 'ok',
                 'X-Base-sysmeta-Bar': 'also ok'}
    bad_hdrs = {'x-base-sysmeta-': 'too short'}
    request_hdrs = good_hdrs.copy()
    request_hdrs.update(bad_hdrs)

    base = Controller(self.app)
    req = Request.blank('/v1/a/c/o', headers=request_hdrs)
    dst_headers = base.generate_request_headers(req, transfer=True)

    for name, value in good_hdrs.items():
        lowered = name.lower()
        self.assertIn(lowered, dst_headers)
        self.assertEqual(value, dst_headers[lowered])
    for name in bad_hdrs:
        self.assertNotIn(name.lower(), dst_headers)
开发者ID:mahak,项目名称:swift,代码行数:14,代码来源:test_base.py
示例17: test_options_unauthorized
def test_options_unauthorized(self):
    # An OPTIONS request whose Origin is not in the app's
    # ``cors_allow_origin`` list is expected to get a 401.
    base = Controller(self.app)
    base.account_name = 'a'
    base.container_name = 'c'
    self.app.cors_allow_origin = ['http://NOT_IT']

    cors_headers = {'Origin': 'http://m.com',
                    'Access-Control-Request-Method': 'GET'}
    req = Request.blank('/v1/a/c/o',
                        environ={'swift.cache': FakeCache()},
                        headers=cors_headers)
    # Replace the backend connection factory with a canned 200 reply.
    connect_target = 'swift.proxy.controllers.base.http_connect'
    with mock.patch(connect_target, fake_http_connect(200)):
        resp = base.OPTIONS(req)
    self.assertEqual(resp.status_int, 401)
开发者ID:mahak,项目名称:swift,代码行数:14,代码来源:test_base.py
示例18: test_options_unauthorized
def test_options_unauthorized(self):
    # The request's Origin does not match the single allowed origin, so
    # the OPTIONS call must answer 401.
    controller = Controller(self.app)
    controller.account_name = "a"
    controller.container_name = "c"
    self.app.cors_allow_origin = ["http://NOT_IT"]

    environ = {"swift.cache": FakeCache()}
    headers = {
        "Origin": "http://m.com",
        "Access-Control-Request-Method": "GET",
    }
    req = Request.blank("/v1/a/c/o", environ=environ, headers=headers)

    # Backend connections are patched to return a canned 200.
    with patch("swift.proxy.controllers.base.http_connect",
               fake_http_connect(200)):
        resp = controller.OPTIONS(req)
    self.assertEqual(resp.status_int, 401)
开发者ID:helen5haha,项目名称:swift,代码行数:14,代码来源:test_base.py
示例19: test_get_shard_ranges_missing_record_type
def test_get_shard_ranges_missing_record_type(self):
    # The mocked responses carry a valid shard range body but no
    # X-Backend-Record-Type header. ``_get_shard_ranges`` must return
    # None and log exactly one error line describing the problem.
    controller = Controller(self.app)
    request = Request.blank('/v1/a/c/o', method='PUT')
    shard_range = ShardRange('a/c', Timestamp.now())
    listing_body = json.dumps([dict(shard_range)]).encode('ascii')
    with mocked_http_conn(200, 200, body_iter=iter([b'', listing_body])):
        actual = controller._get_shard_ranges(request, 'a', 'c', '1_test')
    self.assertIsNone(actual)

    error_lines = self.app.logger.get_lines_for_level('error')
    first_error = error_lines[0]
    for fragment in ('Failed to get shard ranges',
                     'unexpected record type',
                     '/a/c'):
        self.assertIn(fragment, first_error)
    # No further error lines beyond the first.
    self.assertFalse(error_lines[1:])
开发者ID:mahak,项目名称:swift,代码行数:14,代码来源:test_base.py
示例20: test_base_have_quorum
def test_base_have_quorum(self):
    # Table-driven check of ``have_quorum``. Each row is
    # (statuses, node_count, expected result).
    base = Controller(self.app)
    cases = (
        ([201, 404], 3, False),
        ([201, 201], 4, True),
        ([201], 4, False),
        ([201, 201, 404, 404], 4, True),
        ([201, 302, 418, 503], 4, False),
        ([201, 503, 503, 201], 4, True),
        ([201, 201], 3, True),
        ([404, 404], 3, True),
        ([201, 201], 2, True),
        ([201, 404], 2, True),
        ([404, 404], 2, True),
        ([201, 404, 201, 201], 4, True),
    )
    for statuses, node_count, expected in cases:
        self.assertEqual(base.have_quorum(statuses, node_count), expected)
开发者ID:harrisonfeng,项目名称:swift,代码行数:15,代码来源:test_base.py
注:本文中的swift.proxy.controllers.base.Controller类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论