本文整理汇总了Python中swift.common.utils.json.dumps函数的典型用法代码示例。如果您正苦于以下问题：Python dumps函数的具体用法？Python dumps怎么用？Python dumps使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了dumps函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_objects
def setup_objects(self):
    """
    Register the canned Swift responses used by the bucket tests.

    Stores the fixture tuples on ``self.objects`` and the prefix names on
    ``self.prefixes``, then registers HEAD and GET responses on
    ``self.swift`` for the ``junk``, ``nojunk`` and ``junk_subdir``
    containers.
    """
    self.objects = (('rose', '2011-01-05T02:19:14.275290', 0, 303),
                    ('viola', '2011-01-05T02:19:14.275290', 0, 3909),
                    ('lily', '2011-01-05T02:19:14.275290', 0, 3909),
                    ('with space', '2011-01-05T02:19:14.275290', 0, 390),
                    ('with%20space', '2011-01-05T02:19:14.275290', 0, 390))
    # Build a real list rather than using map(): on Python 3 map() returns a
    # lazy iterator, which json.dumps() refuses to serialize.
    objects = [
        {'name': str(name), 'last_modified': str(last_modified),
         'hash': str(etag), 'bytes': str(size)}
        for name, last_modified, etag, size in self.objects]
    object_list = json.dumps(objects)
    self.prefixes = ['rose', 'viola', 'lily']
    object_list_subdir = [{'subdir': prefix} for prefix in self.prefixes]
    self.swift.register('HEAD', '/v1/AUTH_test/junk', swob.HTTPNoContent,
                        {}, None)
    self.swift.register('HEAD', '/v1/AUTH_test/nojunk', swob.HTTPNotFound,
                        {}, None)
    self.swift.register('GET', '/v1/AUTH_test/junk', swob.HTTPOk, {},
                        object_list)
    self.swift.register('GET', '/v1/AUTH_test/junk_subdir', swob.HTTPOk,
                        {}, json.dumps(object_list_subdir))
开发者ID:thiagodasilva,项目名称:swift3,代码行数:26,代码来源:test_bucket.py
示例2: test_v2_obj_response
def test_v2_obj_response(self):
    """
    Check the v2 object endpoint listing, first with the default setup and
    then once per storage policy with ``get_container_info`` patched to
    report that policy's index.
    """
    req = Request.blank('/endpoints/v2/a/c/o1')
    resp = req.get_response(self.list_endpoints)
    expected = {
        'endpoints': ["http://10.1.1.1:6000/sdb1/1/a/c/o1",
                      "http://10.1.2.2:6000/sdd1/1/a/c/o1"],
        'headers': {'X-Backend-Storage-Policy-Index': "0"},
    }
    # Compare parsed JSON, not raw text: the serialized key order is an
    # implementation detail of the dict and must not decide the outcome.
    self.assertEqual(json.loads(resp.body), expected)
    patch_path = 'swift.common.middleware.list_endpoints' \
                 '.get_container_info'
    path = 'http://%(ip)s:%(port)s/%(device)s/%(part)s/a/c/o1'
    for policy in POLICIES:
        def mock_get_container_info(*args, **kwargs):
            return {'storage_policy': int(policy)}

        with mock.patch(patch_path, mock_get_container_info):
            resp = req.get_response(self.list_endpoints)
        part, nodes = policy.object_ring.get_nodes('a', 'c', 'o1')
        # Plain loop: the update is done purely for its side effect.
        for node in nodes:
            node.update({'part': part})
        expected = {
            'headers': {
                'X-Backend-Storage-Policy-Index': str(int(policy))},
            'endpoints': [path % node for node in nodes],
        }
        self.assertEqual(json.loads(resp.body), expected)
开发者ID:7yue,项目名称:swift,代码行数:25,代码来源:test_list_endpoints.py
示例3: test_parse_input
def test_parse_input(self):
    """
    ``slo.parse_input`` must reject non-JSON input and segments that lack
    an etag, and must return the parsed segments for a valid manifest.
    """
    self.assertRaises(HTTPException, slo.parse_input, "some non json")
    data = json.dumps([{"path": "/cont/object",
                        "etag": "etagoftheobjecitsegment",
                        "size_bytes": 100}])
    # assertEqual: the assertEquals alias is deprecated and was removed in
    # Python 3.12.
    self.assertEqual("/cont/object", slo.parse_input(data)[0]["path"])
    bad_data = json.dumps([{"path": "/cont/object", "size_bytes": 100}])
    self.assertRaises(HTTPException, slo.parse_input, bad_data)
开发者ID:mygoda,项目名称:openstack,代码行数:7,代码来源:test_slo.py
示例4: test_parse_input
def test_parse_input(self):
    """
    ``slo.parse_input`` must reject non-JSON input and segments that lack
    an etag, and must return the parsed segments for a valid manifest.
    """
    self.assertRaises(HTTPException, slo.parse_input, 'some non json')
    data = json.dumps(
        [{'path': '/cont/object', 'etag': 'etagoftheobjecitsegment',
          'size_bytes': 100}])
    # assertEqual: the assertEquals alias is deprecated and was removed in
    # Python 3.12.
    self.assertEqual('/cont/object',
                     slo.parse_input(data)[0]['path'])
    bad_data = json.dumps([{'path': '/cont/object', 'size_bytes': 100}])
    self.assertRaises(HTTPException, slo.parse_input, bad_data)
开发者ID:674009287,项目名称:swift,代码行数:10,代码来源:test_slo.py
示例5: getAll
def getAll(self):
    """
    Dump everything, used for debugging.

    :returns: one string holding three JSON documents (object, container
              and account data, in that order) separated by blank lines
    """
    # One ``oRecordData`` entry per record returned by the Metadata query.
    obj_data = [record.oRecordData for record in self.conn.query("SELECT FROM Metadata")]
    # NOTE(review): ``con_data`` and ``acc_data`` are never assigned in this
    # function as shown, so the return below raises NameError unless they
    # exist at module scope. The statements that build them appear to be
    # missing from this excerpt -- TODO confirm against the full source.
    return ''.join([
        json.dumps(obj_data), "\n\n", json.dumps(con_data), "\n\n",
        json.dumps(acc_data)
    ])
开发者ID:2015-ucsc-hp,项目名称:swift,代码行数:10,代码来源:backend.py
示例6: get_response_body
def get_response_body(data_format, data_dict, error_list):
    """
    Returns a properly formatted response body according to format. Handles
    json and xml, otherwise will return text/plain. Note: xml response does not
    include xml declaration.

    :params data_format: resulting format
    :params data_dict: generated data about results; it is not modified
    :params error_list: list of quoted filenames that failed, as
                        (name, status) pairs
    :returns: the response body as a string
    """
    if data_format == "application/json":
        # Serialize a copy so the caller's dict does not silently gain an
        # "Errors" key as a side effect of formatting.
        return json.dumps(dict(data_dict, Errors=error_list))
    if data_format and data_format.endswith("/xml"):
        parts = ["<delete>\n"]
        for key in sorted(data_dict):
            xml_key = key.replace(" ", "_").lower()
            # Escape the value as well as the names below; a summary value
            # containing "<" or "&" would otherwise yield malformed XML.
            value = saxutils.escape("%s" % (data_dict[key],))
            parts.append("<%s>%s</%s>\n" % (xml_key, value, xml_key))
        parts.append("<errors>\n")
        parts.append("\n".join(
            "<object>" "<name>%s</name><status>%s</status>" "</object>"
            % (saxutils.escape(name), status)
            for name, status in error_list))
        parts.append("</errors>\n</delete>\n")
        return "".join(parts)
    # Anything else (including no format at all) falls back to plain text.
    parts = ["%s: %s\n" % (key, data_dict[key]) for key in sorted(data_dict)]
    parts.append("Errors:\n")
    parts.append("\n".join(
        "%s, %s" % (name, status) for name, status in error_list))
    return "".join(parts)
开发者ID:zhouyuan,项目名称:swift,代码行数:33,代码来源:bulk.py
示例7: verify_request
def verify_request(self, req, env):
    """
    Check whether the token in the request belongs to the account named in
    the request path.

    :param req: the incoming request; its ``path_info`` supplies the account
    :param env: WSGI environment carrying ``HTTP_X_AUTH_TOKEN`` or
                ``HTTP_X_STORAGE_TOKEN``
    :returns: a 404 JSON response if the path cannot be split, otherwise a
              response whose JSON body is ``{"verify_flag": "true"}`` or
              ``{"verify_flag": "false"}``
    """
    try:
        _version, account, _container, _obj = split_path(
            req.path_info, minsegs=1, maxsegs=4, rest_with_last=True)
    except ValueError:
        self.logger.increment('errors')
        return jresponse('-1', 'not found', req, 404)
    token = env.get('HTTP_X_AUTH_TOKEN', env.get('HTTP_X_STORAGE_TOKEN'))
    verify_flag = False
    if token:
        # presumably the cached user info is the user's e-mail-like login
        # string -- verify against get_cache_user_info
        user_info = self.get_cache_user_info(env, token)
        if user_info:
            tenant = 'AUTH_' + user_info.replace('@', '').replace('.', '')
            # The flag must reflect the comparison. Previously it was reset
            # to True right after a mismatch was detected, so a valid token
            # for one account verified against every account.
            verify_flag = (account == tenant)
    if not verify_flag:
        self.logger.increment('unauthorized')
    oauth_data_list = json.dumps({'verify_flag': str(verify_flag).lower()})
    return Response(body=oauth_data_list, request=req)
开发者ID:sun3shines,项目名称:swift-1.7.4,代码行数:33,代码来源:oauth.py
示例8: test_handle_multipart_put_check_data_bad
def test_handle_multipart_put_check_data_bad(self):
    """
    PUT a manifest whose segments all fail validation and check that each
    failure is reported, in order, in the JSON "Errors" list.
    """
    bad_data = json.dumps(
        [
            {"path": "/checktest/a_1", "etag": "a", "size_bytes": "2"},
            {"path": "/checktest/badreq", "etag": "a", "size_bytes": "1"},
            {"path": "/checktest/b_2", "etag": "not-b", "size_bytes": "2"},
            {"path": "/checktest/slob", "etag": "not-slob", "size_bytes": "2"},
        ]
    )
    req = Request.blank(
        "/v1/AUTH_test/checktest/man?multipart-manifest=put",
        environ={"REQUEST_METHOD": "PUT"},
        headers={"Accept": "application/json"},
        body=bad_data,
    )
    status, headers, body = self.call_slo(req)
    # assertEqual: the assertEquals alias is deprecated and was removed in
    # Python 3.12.
    self.assertEqual(self.app.call_count, 4)
    errors = json.loads(body)["Errors"]
    # The last segment fails twice (size and etag), hence five entries.
    expected = [
        ("/checktest/a_1", "Size Mismatch"),
        ("/checktest/badreq", "400 Bad Request"),
        ("/checktest/b_2", "Etag Mismatch"),
        ("/checktest/slob", "Size Mismatch"),
        ("/checktest/slob", "Etag Mismatch"),
    ]
    self.assertEqual(len(errors), len(expected))
    for error, (path, reason) in zip(errors, expected):
        self.assertEqual(error[0], path)
        self.assertEqual(error[1], reason)
开发者ID:jettang,项目名称:icehouse,代码行数:30,代码来源:test_slo.py
示例9: object_update
def object_update(self, req, broker, name, timestamp):
    """
    Record an object in the broker using details from the request headers.

    :param req: request whose ``x-size``, ``x-content-type`` and ``x-etag``
                headers describe the object
    :param broker: broker whose ``put_object`` is called
    :param name: object name
    :param timestamp: timestamp passed through to ``put_object``
    """
    # Keep only the headers that is_user_meta() accepts for objects.
    # .items() works on both Python 2 and 3, unlike .iteritems().
    user_meta = {header: value for header, value in req.headers.items()
                 if is_user_meta('object', header)}
    metadata = json.dumps(user_meta)
    broker.put_object(name, timestamp, int(req.headers['x-size']),
                      req.headers['x-content-type'],
                      req.headers['x-etag'],
                      metadata=metadata)
开发者ID:pkit,项目名称:zwift,代码行数:7,代码来源:server.py
示例10: set
def set(self, key, value, serialize=True, timeout=0):
    """
    Set a key/value pair in memcache

    The command is sent to the first connection that accepts it; a failing
    connection is reported via ``_exception_occurred`` and the next one is
    tried. If every connection fails the method returns without raising.

    :param key: key
    :param value: value
    :param serialize: if True, value is serialized with JSON before sending
                      to memcache, or with pickle if configured to use
                      pickle instead of JSON (to avoid cache poisoning)
    :param timeout: ttl in memcache
    """
    key = md5hash(key)
    timeout = sanitize_timeout(timeout)
    flags = 0
    if serialize and self._allow_pickle:
        value = pickle.dumps(value, PICKLE_PROTOCOL)
        flags |= PICKLE_FLAG
    elif serialize:
        value = json.dumps(value)
        flags |= JSON_FLAG
    for (server, fp, sock) in self._get_conns(key):
        try:
            sock.sendall('set %s %d %d %s noreply\r\n%s\r\n' %
                         (key, flags, timeout, len(value), value))
            self._return_conn(server, fp, sock)
            return
        # "as" form: valid on Python 2.6+ and 3, and matches the other
        # handlers in this code base; "except Exception, e" is 2-only.
        except Exception as e:
            self._exception_occurred(server, e)
开发者ID:ChristopherMacGown,项目名称:swift,代码行数:28,代码来源:memcached.py
示例11: test_container_usage_error_key
def test_container_usage_error_key(self):
    """
    An object PUT to an account with this quota configuration must succeed
    with 201 whether memcache is absent, empty, or already populated.
    """
    self.conf['quota'] = json.dumps(
        {
            'container_count': {'default': 5, 'L10': 10},
            'object_count': {'default': 5, 'L10': 10},
            'container_usage': {'default': 5, 'L10': 10},
        }
    )
    qa = quota.filter_factory(self.conf)(FakeApp())
    req = Request.blank('/v1/a2/c/obj')
    req.method = 'PUT'
    req.environ['CONTENT_LENGTH'] = 1024
    # assertEqual throughout: the assertEquals alias is deprecated and was
    # removed in Python 3.12.
    # no memcached
    resp = req.get_response(qa)
    self.assertEqual(resp.status_int, 201)
    # no cache
    req.environ['swift.cache'] = FakeMemcache()
    resp = req.get_response(qa)
    self.assertEqual(resp.status_int, 201)
    # cached
    resp = req.get_response(qa)
    self.assertEqual(resp.status_int, 201)
开发者ID:NewpTone,项目名称:StackLab-swift,代码行数:31,代码来源:test_quota.py
示例12: jresponse
def jresponse(status, msg, req, status_int, headers=None, statusstr='',
              param=None):
    """
    Build a JSON response carrying a status code and a message.

    :param status: application-level status, stored as a string under
                   ``status`` in the body
    :param msg: message, lower-cased and stored under ``msg``
    :param req: the request the response belongs to
    :param status_int: HTTP status code for the response
    :param headers: optional headers passed to the response when non-empty
    :param statusstr: optional full status line; overrides ``status_int``
                      when non-empty
    :param param: optional dict of extra entries merged into the body
    :returns: the response, with content type ``application/json`` and
              charset ``utf-8``
    """
    msg = msg.lower()
    data = {'status': str(status), 'msg': str(msg)}
    if param:
        data.update(param)
    # Construct the response in one place; headers are only passed when
    # the caller actually supplied some.
    response_kwargs = {'body': json.dumps(data), 'request': req}
    if headers:
        response_kwargs['headers'] = headers
    ret = Response(**response_kwargs)
    ret.content_type = 'application/json'
    ret.charset = 'utf-8'
    ret.status_int = status_int
    if statusstr:
        ret.status = statusstr
    return ret
开发者ID:sun7shines,项目名称:Cloudfs,代码行数:28,代码来源:bufferedhttp.py
示例13: get_response_body
def get_response_body(self, data_format, data_dict, error_list):
    """
    Returns a properly formatted response body according to format.

    :params data_format: resulting format
    :params data_dict: generated data about results; it is not modified
    :params error_list: list of quoted filenames that failed, as
                        (name, status) pairs
    :returns: the response body as a string
    :raises HTTPNotAcceptable: if the format is missing or not one of
                               text/plain, application/json or */xml
    """
    if data_format == "text/plain":
        parts = ["%s: %s\n" % (key, data_dict[key])
                 for key in sorted(data_dict)]
        parts.append("Errors:\n")
        parts.append("\n".join(
            "%s, %s" % (name, status) for name, status in error_list))
        return "".join(parts)
    if data_format == "application/json":
        # Serialize a copy so the caller's dict does not silently gain an
        # "Errors" key as a side effect of formatting.
        return json.dumps(dict(data_dict, Errors=error_list))
    # Guard against a missing format: None has no endswith(), and the
    # caller should get the 406 below rather than an AttributeError.
    if data_format and data_format.endswith("/xml"):
        parts = ['<?xml version="1.0" encoding="UTF-8"?>\n<delete>\n']
        for key in sorted(data_dict):
            xml_key = key.replace(" ", "_").lower()
            # Escape the value as well as the names below; a summary value
            # containing "<" or "&" would otherwise yield malformed XML.
            value = saxutils.escape("%s" % (data_dict[key],))
            parts.append("<%s>%s</%s>\n" % (xml_key, value, xml_key))
        parts.append("<errors>\n")
        parts.append("\n".join(
            "<object>" "<name>%s</name><status>%s</status>" "</object>"
            % (saxutils.escape(name), status)
            for name, status in error_list))
        parts.append("</errors>\n</delete>\n")
        return "".join(parts)
    raise HTTPNotAcceptable("Invalid output type")
开发者ID:ChristopherMacGown,项目名称:swift,代码行数:32,代码来源:bulk.py
示例14: get_response_body
def get_response_body(data_format, data_dict, error_list):
    """
    Returns a properly formatted response body according to format.

    :params data_format: resulting format
    :params data_dict: generated data about results; it is not modified
    :params error_list: list of quoted filenames that failed, as
                        (name, status) pairs
    :returns: the response body as a string
    :raises HTTPNotAcceptable: if the format is missing or not one of
                               text/plain, application/json or */xml
    """
    if data_format == 'text/plain':
        parts = ['%s: %s\n' % (key, data_dict[key])
                 for key in sorted(data_dict)]
        parts.append('Errors:\n')
        parts.append('\n'.join(
            '%s, %s' % (name, status) for name, status in error_list))
        return ''.join(parts)
    if data_format == 'application/json':
        # Serialize a copy so the caller's dict does not silently gain an
        # 'Errors' key as a side effect of formatting.
        return json.dumps(dict(data_dict, Errors=error_list))
    # Guard against a missing format: None has no endswith(), and the
    # caller should get the 406 below rather than an AttributeError.
    if data_format and data_format.endswith('/xml'):
        parts = ['<?xml version="1.0" encoding="UTF-8"?>\n<delete>\n']
        for key in sorted(data_dict):
            xml_key = key.replace(' ', '_').lower()
            # Escape the value as well as the names below; a summary value
            # containing '<' or '&' would otherwise yield malformed XML.
            value = saxutils.escape('%s' % (data_dict[key],))
            parts.append('<%s>%s</%s>\n' % (xml_key, value, xml_key))
        parts.append('<errors>\n')
        parts.append('\n'.join(
            '<object>'
            '<name>%s</name><status>%s</status>'
            '</object>' % (saxutils.escape(name), status)
            for name, status in error_list))
        parts.append('</errors>\n</delete>\n')
        return ''.join(parts)
    raise HTTPNotAcceptable('Invalid output type')
开发者ID:jiongpan,项目名称:swift,代码行数:33,代码来源:bulk.py
示例15: _handle_sync_request
def _handle_sync_request(self, broker, remote_info):
    """
    Bring the local database's metadata, timestamps and sync point in line
    with what the remote side reported, then answer with the local info.

    :param broker: broker for the local database
    :param remote_info: dict of replication info sent by the remote side
    :returns: HTTPNotFound if the local database had to be quarantined,
              otherwise a Response whose body is the local info as JSON
    """
    with self.debug_timing('info'):
        try:
            info = self._get_synced_replication_info(broker, remote_info)
        except (Exception, Timeout) as e:
            # Only a missing table is handled here; anything else
            # propagates to the caller.
            if 'no such table' not in str(e):
                raise
            self.logger.error(_("Quarantining DB %s"), broker)
            quarantine_db(broker.db_file, broker.db_type)
            return HTTPNotFound()

    remote_metadata = remote_info['metadata']
    if remote_metadata:
        with self.debug_timing('update_metadata'):
            broker.update_metadata(remote_metadata)

    sync_timestamps = ('created_at', 'put_timestamp', 'delete_timestamp')
    if any(info[ts] != remote_info[ts] for ts in sync_timestamps):
        with self.debug_timing('merge_timestamps'):
            remote_timestamps = [remote_info[ts] for ts in sync_timestamps]
            broker.merge_timestamps(*remote_timestamps)

    with self.debug_timing('get_sync'):
        info['point'] = broker.get_sync(remote_info['id'])

    same_hash = remote_info['hash'] == info['hash']
    if same_hash and info['point'] < remote_info['point']:
        with self.debug_timing('merge_syncs'):
            # Same contents but the remote is further along: adopt its
            # sync point instead of re-sending rows.
            data = {
                'remote_id': remote_info['id'],
                'sync_point': remote_info['point'],
            }
            broker.merge_syncs([data])
            info['point'] = remote_info['point']
    return Response(json.dumps(info))
开发者ID:anishnarang,项目名称:gswift,代码行数:34,代码来源:db_replicator.py
示例16: get_response_body
def get_response_body(data_format, data_dict, error_list):
    """
    Returns a properly formatted response body according to format. Handles
    json and xml, otherwise will return text/plain. Note: xml response does not
    include xml declaration.

    :params data_format: resulting format
    :params data_dict: generated data about results; it is not modified
    :params error_list: list of quoted filenames that failed, as
                        (name, status) pairs
    :returns: the response body as a string
    """
    if data_format == 'application/json':
        # Serialize a copy so the caller's dict does not silently gain an
        # 'Errors' key as a side effect of formatting.
        return json.dumps(dict(data_dict, Errors=error_list))
    if data_format and data_format.endswith('/xml'):
        parts = ['<delete>\n']
        for key in sorted(data_dict):
            xml_key = key.replace(' ', '_').lower()
            # Escape the value as well as the names below; a summary value
            # containing '<' or '&' would otherwise yield malformed XML.
            value = saxutils.escape('%s' % (data_dict[key],))
            parts.append('<%s>%s</%s>\n' % (xml_key, value, xml_key))
        parts.append('<errors>\n')
        parts.append('\n'.join(
            '<object>'
            '<name>%s</name><status>%s</status>'
            '</object>' % (saxutils.escape(name), status)
            for name, status in error_list))
        parts.append('</errors>\n</delete>\n')
        return ''.join(parts)
    # Anything else (including no format at all) falls back to plain text.
    parts = ['%s: %s\n' % (key, data_dict[key]) for key in sorted(data_dict)]
    parts.append('Errors:\n')
    parts.append('\n'.join(
        '%s, %s' % (name, status) for name, status in error_list))
    return ''.join(parts)
开发者ID:hbhdytf,项目名称:mac,代码行数:34,代码来源:bulk.py
示例17: _reclaim
def _reclaim(self, conn, timestamp):
    """
    Removes any empty metadata values older than the timestamp using the
    given database connection. This function will not call commit on the
    conn, but will instead return True if the database needs committing.
    This function was created as a worker to limit transactions and commits
    from other related functions.

    :param conn: Database connection to reclaim metadata within.
    :param timestamp: Empty metadata items last updated before this
                      timestamp will be removed.
    :returns: True if conn.commit() should be called
    :raises sqlite3.OperationalError: for any database error other than a
                                      missing ``metadata`` column
    """
    try:
        md = conn.execute('SELECT metadata FROM %s_stat' %
                          self.db_type).fetchone()[0]
        if md:
            md = json.loads(md)
            # Collect first, delete afterwards: the dict must not change
            # size while it is being iterated. .items() works on both
            # Python 2 and 3, unlike .iteritems().
            keys_to_delete = [
                key for key, (value, value_timestamp) in md.items()
                if value == '' and value_timestamp < timestamp]
            if keys_to_delete:
                for key in keys_to_delete:
                    del md[key]
                conn.execute('UPDATE %s_stat SET metadata = ?' %
                             self.db_type, (json.dumps(md),))
                return True
    except sqlite3.OperationalError as err:
        # A stat table without a metadata column simply has nothing to
        # reclaim; every other error is real.
        if 'no such column: metadata' not in str(err):
            raise
    return False
开发者ID:HoO-Group,项目名称:swift,代码行数:32,代码来源:db.py
示例18: set_multi
def set_multi(self, mapping, server_key, serialize=True, timeout=0):
    """
    Sets multiple key/value pairs in memcache.

    All pairs are sent as one message to the first connection that accepts
    it; a failing connection is reported via ``_exception_occurred`` and
    the next one is tried. If every connection fails the method returns
    without raising.

    :param mapping: dictionary of keys and values to be set in memcache
    :param server_key: key to use in determining which server in the ring
                       is used
    :param serialize: if True, value is serialized with JSON before sending
                      to memcache, or with pickle if configured to use
                      pickle instead of JSON (to avoid cache poisoning)
    :param timeout: ttl for memcache
    """
    server_key = md5hash(server_key)
    timeout = sanitize_timeout(timeout)
    # Collect the commands and join once instead of growing a string with
    # += on every pair.
    commands = []
    # .items() works on both Python 2 and 3, unlike .iteritems().
    for key, value in mapping.items():
        key = md5hash(key)
        flags = 0
        if serialize and self._allow_pickle:
            value = pickle.dumps(value, PICKLE_PROTOCOL)
            flags |= PICKLE_FLAG
        elif serialize:
            value = json.dumps(value)
            flags |= JSON_FLAG
        commands.append('set %s %d %d %s noreply\r\n%s\r\n' %
                        (key, flags, timeout, len(value), value))
    msg = ''.join(commands)
    for (server, fp, sock) in self._get_conns(server_key):
        try:
            sock.sendall(msg)
            self._return_conn(server, fp, sock)
            return
        # "as" form: valid on Python 2.6+ and 3, and matches the other
        # handlers in this code base; "except Exception, e" is 2-only.
        except Exception as e:
            self._exception_occurred(server, e)
开发者ID:ChristopherMacGown,项目名称:swift,代码行数:33,代码来源:memcached.py
示例19: test_handle_multipart_put_check_data_bad
def test_handle_multipart_put_check_data_bad(self):
    """
    PUT a manifest whose segments fail validation and check the errors
    reported in the body of the resulting HTTPException.
    """
    bad_data = json.dumps(
        [
            {"path": "/c/a_1", "etag": "a", "size_bytes": "1"},
            {"path": "/c/a_2", "etag": "a", "size_bytes": "1"},
            {"path": "/d/b_2", "etag": "b", "size_bytes": "2"},
        ]
    )
    req = Request.blank(
        "/test_good/A/c/man?multipart-manifest=put",
        environ={"REQUEST_METHOD": "PUT"},
        headers={"Accept": "application/json"},
        body=bad_data,
    )
    try:
        self.slo.handle_multipart_put(req)
    # "as" form: valid on Python 2.6+ and 3; "except X, e" is 2-only.
    except HTTPException as e:
        self.assertEqual(self.app.calls, 3)
        data = json.loads(e.body)
        errors = data["Errors"]
        self.assertEqual(errors[0][0], "/test_good/A/c/a_1")
        self.assertEqual(errors[0][1], "Size Mismatch")
        self.assertEqual(errors[2][1], "400 Bad Request")
        self.assertEqual(errors[-1][0], "/test_good/A/d/b_2")
        self.assertEqual(errors[-1][1], "Etag Mismatch")
    else:
        # Without this branch the test passes vacuously when the bad
        # manifest is accepted and nothing is raised.
        self.fail("handle_multipart_put did not raise HTTPException")
开发者ID:mygoda,项目名称:openstack,代码行数:25,代码来源:test_slo.py
示例20: test_handle_multipart_put_check_data_bad
def test_handle_multipart_put_check_data_bad(self):
    """
    PUT a manifest whose segments fail validation and check the errors
    reported in the body of the resulting HTTPException.
    """
    bad_data = json.dumps(
        [{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
         {'path': '/c/a_2', 'etag': 'a', 'size_bytes': '1'},
         {'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'},
         {'path': '/d/slob', 'etag': 'a', 'size_bytes': '2'}])
    req = Request.blank(
        '/test_good/A/c/man?multipart-manifest=put',
        environ={'REQUEST_METHOD': 'PUT'},
        headers={'Accept': 'application/json'},
        body=bad_data)
    try:
        self.slo.handle_multipart_put(req, fake_start_response)
    except HTTPException as e:
        # assertEqual: the assertEquals alias is deprecated and was
        # removed in Python 3.12.
        self.assertEqual(self.app.calls, 4)
        data = json.loads(e.body)
        errors = data['Errors']
        self.assertEqual(errors[0][0], '/c/a_1')
        self.assertEqual(errors[0][1], 'Size Mismatch')
        self.assertEqual(errors[2][0], '/c/a_2')
        self.assertEqual(errors[2][1], '400 Bad Request')
        self.assertEqual(errors[4][0], '/d/b_2')
        self.assertEqual(errors[4][1], 'Etag Mismatch')
        self.assertEqual(errors[-1][0], '/d/slob')
        self.assertEqual(errors[-1][1], 'Etag Mismatch')
    else:
        # fail() states the reason; assert_(False) is a deprecated alias
        # that reports only "False is not true".
        self.fail('handle_multipart_put did not raise HTTPException')
开发者ID:674009287,项目名称:swift,代码行数:27,代码来源:test_slo.py
注：本文中的swift.common.utils.json.dumps函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论