本文整理汇总了Python中swift.common.utils.json.loads函数的典型用法代码示例。如果您正苦于以下问题：Python loads函数的具体用法？Python loads怎么用？Python loads使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了loads函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _handle_sync_response
def _handle_sync_response(self, node, response, info, broker, http,
                          different_region=False):
    """
    Act on the remote node's reply to a sync request.

    :param node: node dict, forwarded to ``_rsync_db``
    :param response: sync reply; its ``status`` and ``data`` are read
    :param info: local database info; the ``id``, ``max_row`` and
                 ``count`` keys are read
    :param broker: broker for the local database
    :param http: connection object forwarded to the rsync/usync helpers
    :param different_region: forwarded to ``_rsync_db``
    :returns: True when the databases are already in sync, otherwise the
              result of ``_rsync_db`` / ``_usync_db``; None for any
              status that is not 404, 507 or 2xx
    :raises DriveNotMounted: when the remote answers
                             HTTP_INSUFFICIENT_STORAGE
    """
    status = response.status
    if status == HTTP_NOT_FOUND:
        # the remote has no database at all: ship the whole file
        self.stats['rsync'] += 1
        self.logger.increment('rsyncs')
        return self._rsync_db(broker, node, http, info['id'],
                              different_region=different_region)
    if status == HTTP_INSUFFICIENT_STORAGE:
        raise DriveNotMounted()
    if not (status >= 200 and status < 300):
        return None

    rinfo = json.loads(response.data)
    local_sync = broker.get_sync(rinfo['id'], incoming=False)
    remote_metadata = rinfo.get('metadata', '')
    if remote_metadata:
        broker.update_metadata(json.loads(remote_metadata))
    if self._in_sync(rinfo, info, broker, local_sync):
        return True

    # Fall back to rsync followed by a remote merge only when the remote
    # holds less than half of the local rows AND the absolute gap exceeds
    # per_diff; the second test keeps small containers, which have only a
    # few rows to sync, on the cheaper diff path.
    remote_rows = rinfo['max_row']
    local_rows = info['max_row']
    far_behind = remote_rows / float(local_rows) < 0.5
    if far_behind and local_rows - remote_rows > self.per_diff:
        self.stats['remote_merge'] += 1
        self.logger.increment('remote_merges')
        return self._rsync_db(broker, node, http, info['id'],
                              replicate_method='rsync_then_merge',
                              replicate_timeout=(info['count'] / 2000),
                              different_region=different_region)

    # otherwise send only the missing rows to the remote server
    sync_point = max(rinfo['point'], local_sync)
    return self._usync_db(sync_point, broker, http, rinfo['id'], info['id'])
开发者ID:matthewoliver,项目名称:swift,代码行数:32,代码来源:db_replicator.py
示例2: test_handle_multipart_put_check_data_bad
def test_handle_multipart_put_check_data_bad(self):
    """
    PUT a manifest whose segments do not match the stored objects and
    check the per-segment errors reported in the JSON response body.
    """
    segments = [
        {"path": "/checktest/a_1", "etag": "a", "size_bytes": "2"},
        {"path": "/checktest/badreq", "etag": "a", "size_bytes": "1"},
        {"path": "/checktest/b_2", "etag": "not-b", "size_bytes": "2"},
        {"path": "/checktest/slob", "etag": "not-slob", "size_bytes": "2"},
    ]
    req = Request.blank(
        "/v1/AUTH_test/checktest/man?multipart-manifest=put",
        environ={"REQUEST_METHOD": "PUT"},
        headers={"Accept": "application/json"},
        body=json.dumps(segments),
    )
    _status, _headers, body = self.call_slo(req)
    self.assertEquals(self.app.call_count, 4)

    errors = json.loads(body)["Errors"]
    # (segment path, reported problem) in the order the errors must appear
    expected = [
        ("/checktest/a_1", "Size Mismatch"),
        ("/checktest/badreq", "400 Bad Request"),
        ("/checktest/b_2", "Etag Mismatch"),
        ("/checktest/slob", "Size Mismatch"),
        ("/checktest/slob", "Etag Mismatch"),
    ]
    self.assertEquals(len(errors), 5)
    for position, (path, problem) in enumerate(expected):
        self.assertEquals(errors[position][0], path)
        self.assertEquals(errors[position][1], problem)
开发者ID:jettang,项目名称:icehouse,代码行数:30,代码来源:test_slo.py
示例3: update_data_record
def update_data_record(self, record, list_meta=False):
    """
    Turn one container listing row into the dict shared by every
    serialization format.

    The created time becomes an ISO 8601 ``last_modified`` string, and
    ``override_bytes_from_content_type`` is applied to the result.

    :param record: object entry record, a 6-tuple of
                   (name, created, size, content_type, etag, metadata)
    :param list_meta: when true, the JSON-encoded metadata column is
                      decoded and returned under the ``metadata`` key
    :returns: ``{'subdir': name}`` when the record has no content type,
              otherwise the modified record as a dict
    """
    name, created, size, content_type, etag, metadata = record
    if content_type is None:
        return {'subdir': name}

    response = {'bytes': size, 'hash': etag, 'name': name,
                'content_type': content_type}
    if list_meta:
        decoded_meta = json.loads(metadata)
        utf8encodekeys(decoded_meta)
        response['metadata'] = decoded_meta

    stamp = datetime.utcfromtimestamp(float(created)).isoformat()
    # isoformat() omits the fractional part when microseconds are zero;
    # pad it so every timestamp has the same width
    if len(stamp) < len("1970-01-01T00:00:00.000000"):
        stamp = stamp + ".000000"
    response['last_modified'] = stamp

    override_bytes_from_content_type(response, logger=self.logger)
    return response
开发者ID:pkit,项目名称:zwift,代码行数:27,代码来源:server.py
示例4: test_extract_tar_works
def test_extract_tar_works(self):
    """
    Build a directory tree, archive it uncompressed, gzipped and bzip2ed,
    then upload each archive and check the JSON response, the XML
    response and the 406 returned for an unsupported Accept type.
    """
    # On systems where $TMPDIR is long (like OS X), we need to do this
    # or else every upload will fail due to the path being too long.
    self.app.max_pathlen += len(self.testdir)
    for compress_format in ['', 'gz', 'bz2']:
        base_name = 'base_works_%s' % compress_format
        dir_tree = [
            {base_name: [{'sub_dir1': ['sub1_file1', 'sub1_file2']},
                         {'sub_dir2': ['sub2_file1', u'test obj \u2661']},
                         'sub_file1',
                         {'sub_dir3': [{'sub4_dir1': '../sub4 file1'}]},
                         {'sub_dir4': None},
                         ]}]
        build_dir_tree(self.testdir, dir_tree)
        mode = 'w'
        extension = ''
        if compress_format:
            mode += ':' + compress_format
            extension += '.' + compress_format
        tar_path = os.path.join(self.testdir, 'tar_works.tar' + extension)
        tar = tarfile.open(name=tar_path, mode=mode)
        try:
            tar.add(os.path.join(self.testdir, base_name))
        finally:
            tar.close()

        # The archive is binary data, so it is read in 'rb' mode, and each
        # handle is closed as soon as its request has been fully consumed.
        with open(tar_path, 'rb') as tar_fp:
            req = Request.blank('/tar_works/acc/cont/')
            req.environ['wsgi.input'] = tar_fp
            req.headers['transfer-encoding'] = 'chunked'
            resp_body = self.handle_extract_and_iter(req, compress_format)
        resp_data = json.loads(resp_body)
        self.assertEquals(resp_data['Number Files Created'], 6)

        # test out xml
        with open(tar_path, 'rb') as tar_fp:
            req = Request.blank('/tar_works/acc/cont/')
            req.environ['wsgi.input'] = tar_fp
            req.headers['transfer-encoding'] = 'chunked'
            resp_body = self.handle_extract_and_iter(
                req, compress_format, 'application/xml')
        self.assert_('<response_status>201 Created</response_status>' in
                     resp_body)
        self.assert_('<number_files_created>6</number_files_created>' in
                     resp_body)

        # test out nonexistent format
        def fake_start_response(*args, **kwargs):
            pass

        with open(tar_path, 'rb') as tar_fp:
            req = Request.blank('/tar_works/acc/cont/?extract-archive=tar',
                                headers={'Accept': 'good_xml'})
            req.environ['REQUEST_METHOD'] = 'PUT'
            req.environ['wsgi.input'] = tar_fp
            req.headers['transfer-encoding'] = 'chunked'
            app_iter = self.bulk(req.environ, fake_start_response)
            # drain the response while the archive is still open
            resp_body = ''.join(app_iter)
        self.assert_('Response Status: 406' in resp_body)
开发者ID:BlueSkyChina,项目名称:swift,代码行数:60,代码来源:test_bulk.py
示例5: get_or_head_response
def get_or_head_response(self, req, resp_headers, resp_iter):
    """
    Build the GET or HEAD response for a manifest whose body is given by
    ``resp_iter``.

    The manifest is JSON-decoded (an undecodable body is treated as an
    empty segment list), then the Etag and Content-Length headers are
    recomputed from the segments.

    :param req: the client request; only ``method`` is read here
    :param resp_headers: iterable of (name, value) header pairs
    :param resp_iter: iterable yielding the manifest body
    :returns: result of ``_manifest_head_response`` for HEAD, otherwise
              of ``_manifest_get_response``
    """
    with closing_if_possible(resp_iter):
        manifest_body = ''.join(resp_iter)
    try:
        segments = json.loads(manifest_body)
    except ValueError:
        segments = []

    digest = md5()
    total_bytes = 0
    for seg_dict in segments:
        # ranged segments contribute "hash:range;" to the digest, whole
        # segments contribute just their hash
        if seg_dict.get('range'):
            piece = '%s:%s;' % (seg_dict['hash'], seg_dict['range'])
        else:
            piece = seg_dict['hash']
        digest.update(piece)

        if config_true_value(seg_dict.get('sub_slo')):
            override_bytes_from_content_type(
                seg_dict, logger=self.slo.logger)
        total_bytes += self._segment_length(seg_dict)

    # drop the stored Etag/Content-Length in favour of the computed ones
    response_headers = []
    for header, value in resp_headers:
        if header.lower() not in ('etag', 'content-length'):
            response_headers.append((header, value))
    response_headers.append(('Content-Length', str(total_bytes)))
    response_headers.append(('Etag', '"%s"' % digest.hexdigest()))

    if req.method == 'HEAD':
        return self._manifest_head_response(req, response_headers)
    return self._manifest_get_response(
        req, total_bytes, response_headers, segments)
开发者ID:bkolli,项目名称:swift,代码行数:31,代码来源:slo.py
示例6: test_handle_multipart_put_check_data_bad
def test_handle_multipart_put_check_data_bad(self):
    """
    PUT a manifest with mismatching segments and check that
    ``handle_multipart_put`` raises an HTTPException whose JSON body
    lists the expected errors.
    """
    bad_data = json.dumps(
        [
            {"path": "/c/a_1", "etag": "a", "size_bytes": "1"},
            {"path": "/c/a_2", "etag": "a", "size_bytes": "1"},
            {"path": "/d/b_2", "etag": "b", "size_bytes": "2"},
        ]
    )
    req = Request.blank(
        "/test_good/A/c/man?multipart-manifest=put",
        environ={"REQUEST_METHOD": "PUT"},
        headers={"Accept": "application/json"},
        body=bad_data,
    )
    try:
        self.slo.handle_multipart_put(req)
    except HTTPException as e:
        self.assertEquals(self.app.calls, 3)
        data = json.loads(e.body)
        errors = data["Errors"]
        self.assertEquals(errors[0][0], "/test_good/A/c/a_1")
        self.assertEquals(errors[0][1], "Size Mismatch")
        self.assertEquals(errors[2][1], "400 Bad Request")
        self.assertEquals(errors[-1][0], "/test_good/A/d/b_2")
        self.assertEquals(errors[-1][1], "Etag Mismatch")
    else:
        # Without this branch the test would pass without checking
        # anything whenever the bad manifest is silently accepted.
        self.fail("handle_multipart_put did not raise HTTPException")
开发者ID:mygoda,项目名称:openstack,代码行数:25,代码来源:test_slo.py
示例7: _handle_sync_response
def _handle_sync_response(self, node, response, info, broker, http,
                          different_region=False):
    """
    Pick the replication strategy from the remote node's sync reply.

    :param node: node dict, forwarded to ``_rsync_db``
    :param response: sync reply; its ``status`` and ``data`` are read
    :param info: local database info; the ``id``, ``max_row`` and
                 ``count`` keys are read
    :param broker: broker for the local database
    :param http: connection object forwarded to the rsync/usync helpers
    :param different_region: forwarded to ``_rsync_db``
    :returns: True when already in sync, otherwise the result of
              ``_rsync_db`` / ``_usync_db``; None for any status that is
              not 404, 507 or 2xx
    :raises DriveNotMounted: when the remote answers
                             HTTP_INSUFFICIENT_STORAGE
    """
    code = response.status
    if code == HTTP_NOT_FOUND:
        # nothing on the remote side yet, so copy the whole database
        self.stats['rsync'] += 1
        self.logger.increment('rsyncs')
        return self._rsync_db(broker, node, http, info['id'],
                              different_region=different_region)
    if code == HTTP_INSUFFICIENT_STORAGE:
        raise DriveNotMounted()
    if not 200 <= code < 300:
        return None

    rinfo = json.loads(response.data)
    local_sync = broker.get_sync(rinfo['id'], incoming=False)
    if self._in_sync(rinfo, info, broker, local_sync):
        return True

    # when the remote has fewer than half of the local rows, rsync the
    # file and let the remote merge it
    row_ratio = rinfo['max_row'] / float(info['max_row'])
    if row_ratio < 0.5:
        self.stats['remote_merge'] += 1
        self.logger.increment('remote_merges')
        return self._rsync_db(broker, node, http, info['id'],
                              replicate_method='rsync_then_merge',
                              replicate_timeout=(info['count'] / 2000),
                              different_region=different_region)

    # otherwise push only the differing rows
    start_point = max(rinfo['point'], local_sync)
    return self._usync_db(start_point, broker, http, rinfo['id'],
                          info['id'])
开发者ID:hoangv2,项目名称:swift,代码行数:26,代码来源:db_replicator.py
示例8: deserialize_v1
def deserialize_v1(cls, gz_file, metadata_only=False):
    """
    Deserialize a v1 ring file into a dictionary with `devs`, `part_shift`,
    and `replica2part2dev_id` keys.

    The stream holds a 4-byte big-endian length, that many bytes of JSON
    metadata, and then one block of ``2 * partition_count`` bytes per
    replica, each loaded as an array of unsigned shorts in native byte
    order.

    If the optional kwarg `metadata_only` is True, then the
    `replica2part2dev_id` is not loaded and that key in the returned
    dictionary just has the value `[]`.

    :param file gz_file: An opened file-like object which has already
                         consumed the 6 bytes of magic and version.
    :param bool metadata_only: If True, only load `devs` and `part_shift`
    :returns: A dict containing `devs`, `part_shift`, and
              `replica2part2dev_id`
    """
    json_len, = struct.unpack('!I', gz_file.read(4))
    ring_dict = json.loads(gz_file.read(json_len))
    ring_dict['replica2part2dev_id'] = []
    if metadata_only:
        return ring_dict

    partition_count = 1 << (32 - ring_dict['part_shift'])
    row_bytes = 2 * partition_count  # two bytes per 'H' entry
    # ``range`` rather than ``xrange`` so the loader also runs on Python 3;
    # the replica count is small, so the Python 2 list cost is negligible.
    ring_dict['replica2part2dev_id'] = [
        array.array('H', gz_file.read(row_bytes))
        for _ in range(ring_dict['replica_count'])]
    return ring_dict
开发者ID:linkedinyou,项目名称:swift,代码行数:28,代码来源:ring.py
示例9: get_slo_segments
def get_slo_segments(self, obj_name, req):
    """
    Issue an internal GET for an SLO manifest and return its segments.

    :param obj_name: container/object path of the manifest, with or
                     without leading slashes
    :param req: the originating swob.Request; its environ is copied
    :raises HTTPServerError: on unable to load obj_name or
                             on unable to load the SLO manifest data.
    :raises HTTPBadRequest: on not an SLO manifest
    :raises HTTPNotFound: on SLO manifest not found
    :raises HTTPUnauthorized: when the manifest GET returns 401
    :returns: SLO manifest's segments
    """
    vrs, account, _junk = req.split_path(2, 3, True)
    manifest_path = "/%s/%s/%s" % (vrs, account, obj_name.lstrip("/"))

    new_env = req.environ.copy()
    del new_env["wsgi.input"]
    new_env.update({
        "REQUEST_METHOD": "GET",
        "QUERY_STRING": "multipart-manifest=get",
        "CONTENT_LENGTH": 0,
        "HTTP_USER_AGENT":
            "%s MultipartDELETE" % new_env.get("HTTP_USER_AGENT"),
        "swift.source": "SLO",
        "PATH_INFO": manifest_path.encode("utf-8"),
    })
    resp = Request.blank("", new_env).get_response(self.app)

    if resp.is_success:
        if not config_true_value(resp.headers.get("X-Static-Large-Object")):
            raise HTTPBadRequest("Not an SLO manifest")
        try:
            return json.loads(resp.body)
        except ValueError:
            raise HTTPServerError("Unable to load SLO manifest")
    if resp.status_int == HTTP_NOT_FOUND:
        raise HTTPNotFound("SLO manifest not found")
    if resp.status_int == HTTP_UNAUTHORIZED:
        raise HTTPUnauthorized("401 Unauthorized")
    raise HTTPServerError("Unable to load SLO manifest or segment.")
开发者ID:pchng,项目名称:swift,代码行数:35,代码来源:slo.py
示例10: get_or_head_response
def get_or_head_response(self, req, resp_headers, resp_iter):
    """
    Produce the manifest GET/HEAD response from the raw manifest body.

    The body is read from ``resp_iter`` and JSON-decoded; a body that
    fails to decode counts as an empty segment list. Etag and
    Content-Length are then recomputed from the segments.

    :param req: the client request; only ``method`` is read here
    :param resp_headers: iterable of (name, value) header pairs
    :param resp_iter: iterable yielding the manifest body
    :returns: result of ``_manifest_head_response`` for HEAD, otherwise
              of ``_manifest_get_response``
    """
    with closing_if_possible(resp_iter):
        raw_manifest = "".join(resp_iter)
    try:
        segments = json.loads(raw_manifest)
    except ValueError:
        segments = []

    hasher = md5()
    content_length = 0
    for seg_dict in segments:
        hasher.update(
            "%s:%s;" % (seg_dict["hash"], seg_dict["range"])
            if seg_dict.get("range") else seg_dict["hash"])
        if config_true_value(seg_dict.get("sub_slo")):
            override_bytes_from_content_type(
                seg_dict, logger=self.slo.logger)
        content_length += self._segment_length(seg_dict)

    # the stored Etag/Content-Length are replaced by the computed values
    replaced = ("etag", "content-length")
    response_headers = [
        pair for pair in ((h, v) for h, v in resp_headers)
        if pair[0].lower() not in replaced]
    response_headers += [
        ("Content-Length", str(content_length)),
        ("Etag", '"%s"' % hasher.hexdigest()),
    ]

    if req.method != "HEAD":
        return self._manifest_get_response(
            req, content_length, response_headers, segments)
    return self._manifest_head_response(req, response_headers)
开发者ID:pchng,项目名称:swift,代码行数:28,代码来源:slo.py
示例11: _fetch_sub_slo_segments
def _fetch_sub_slo_segments(self, req, version, acc, con, obj):
    """
    GET a submanifest and return its JSON-decoded content.

    :param req: the request being served; its environ, path and
                x-auth-token header are reused for the subrequest
    :param version: API version path component
    :param acc: account path component
    :param con: container path component
    :param obj: object path component
    :returns: the decoded submanifest
    :raises ListingIterError: when the GET is not successful or the body
                              is not valid JSON
    """
    submanifest_path = "/".join(["", version, acc, con, obj])
    auth_headers = {"x-auth-token": req.headers.get("x-auth-token")}
    sub_req = make_subrequest(
        req.environ,
        path=submanifest_path,
        method="GET",
        headers=auth_headers,
        agent="%(orig)s SLO MultipartGET",
        swift_source="SLO",
    )
    sub_resp = sub_req.get_response(self.slo.app)

    if not is_success(sub_resp.status_int):
        close_if_possible(sub_resp.app_iter)
        message = (
            "ERROR: while fetching %s, GET of submanifest %s "
            "failed with status %d"
            % (req.path, sub_req.path, sub_resp.status_int))
        raise ListingIterError(message)

    try:
        with closing_if_possible(sub_resp.app_iter):
            body = "".join(sub_resp.app_iter)
            return json.loads(body)
    except ValueError as err:
        message = (
            "ERROR: while fetching %s, JSON-decoding of submanifest %s "
            "failed with %s" % (req.path, sub_req.path, err))
        raise ListingIterError(message)
开发者ID:pchng,项目名称:swift,代码行数:30,代码来源:slo.py
示例12: parse_input
def parse_input(raw_data):
    """
    Parse a manifest request body into a list of segment dictionaries.

    Every segment must carry exactly the keys ``path``, ``etag`` and
    ``size_bytes``, optionally plus ``range``, and its path must contain
    a "/" after leading slashes are stripped. A truthy ``range`` value is
    replaced in place by a ``Range`` built from ``"bytes=<value>"``.

    :param raw_data: the JSON text of the manifest
    :raises: HTTPException on parse errors
    :returns: a list of dictionaries on success
    """
    try:
        parsed_data = json.loads(raw_data)
    except ValueError:
        raise HTTPBadRequest("Manifest must be valid json.")

    required = frozenset(("path", "etag", "size_bytes"))
    permitted = required | frozenset(("range",))
    try:
        for seg_dict in parsed_data:
            present = set(seg_dict)
            if not (required <= present and present <= permitted):
                raise HTTPBadRequest("Invalid SLO Manifest File")
            if "/" not in seg_dict["path"].lstrip("/"):
                raise HTTPBadRequest("Invalid SLO Manifest File")

            byte_range = seg_dict.get("range")
            if not byte_range:
                continue
            try:
                seg_dict["range"] = Range("bytes=%s" % byte_range)
            except ValueError:
                raise HTTPBadRequest("Invalid SLO Manifest File")
    except (AttributeError, TypeError):
        # the decoded JSON did not have the list-of-dicts shape
        raise HTTPBadRequest("Invalid SLO Manifest File")
    return parsed_data
开发者ID:pchng,项目名称:swift,代码行数:27,代码来源:slo.py
示例13: test_direct_get_account
def test_direct_get_account(self):
    """
    direct_get_account must issue a GET for the account path, return the
    stubbed headers with the decoded JSON listing, and put every listing
    option into the query string.
    """
    body = '[{"count": 1, "bytes": 20971520, "name": "c1"}]'
    stub_headers = HeaderKeyDict({
        'X-Account-Container-Count': '1',
        'X-Account-Object-Count': '1',
        'X-Account-Bytes-Used': '1',
        'X-Timestamp': '1234567890',
        'X-PUT-Timestamp': '1234567890'})

    with mocked_http_conn(200, stub_headers, body) as conn:
        resp_headers, resp = direct_client.direct_get_account(
            self.node, self.part, self.account, marker='marker',
            prefix='prefix', delimiter='delimiter', limit=1000)
        self.assertEqual(conn.method, 'GET')
        self.assertEqual(conn.path, self.account_path)

    self.assertEqual(conn.req_headers['user-agent'], self.user_agent)
    self.assertEqual(resp_headers, stub_headers)
    self.assertEqual(json.loads(body), resp)
    expected_params = ('marker=marker', 'delimiter=delimiter',
                       'limit=1000', 'prefix=prefix', 'format=json')
    for param in expected_params:
        self.assertTrue(param in conn.query_string)
开发者ID:LSBDOpenstackDev,项目名称:swift,代码行数:25,代码来源:test_direct_client.py
示例14: _listing_pages_iter
def _listing_pages_iter(self, lcontainer, lprefix, env):
    """
    Generate the JSON listing of a container one page at a time.

    Each page is fetched with the name of the previous page's last entry
    as the marker; iteration ends on the first empty body or empty page.

    :param lcontainer: name of the container to list
    :param lprefix: only names with this prefix are requested
    :param env: WSGI environment the listing requests are based on; when
                it holds ``swift.authorize``, that callable is run on
                every page request
    :raises ListingIterNotAuthorized: when the authorize callback returns
                                      a response
    :raises ListingIterNotFound: when the listing GET returns 404
    :raises ListingIterError: on any other unsuccessful listing GET
    """
    lpartition = self.app.container_ring.get_part(
        self.account_name, lcontainer)
    marker = ""
    while True:
        lreq = Request.blank("i will be overridden by env", environ=env)
        # Don't quote PATH_INFO, by WSGI spec
        page_env = lreq.environ
        page_env["PATH_INFO"] = "/v1/%s/%s" % (
            self.account_name, lcontainer)
        page_env["REQUEST_METHOD"] = "GET"
        page_env["QUERY_STRING"] = "format=json&prefix=%s&marker=%s" % (
            quote(lprefix), quote(marker))
        lresp = self.GETorHEAD_base(
            lreq, _("Container"), self.app.container_ring, lpartition,
            lreq.swift_entity_path)

        if "swift.authorize" in env:
            lreq.acl = lresp.headers.get("x-container-read")
            denial = env["swift.authorize"](lreq)
            if denial:
                raise ListingIterNotAuthorized(denial)

        if lresp.status_int == HTTP_NOT_FOUND:
            raise ListingIterNotFound()
        if not is_success(lresp.status_int):
            raise ListingIterError()

        if not lresp.body:
            return
        sublisting = json.loads(lresp.body)
        if not sublisting:
            return
        # the next page starts after the last name seen
        marker = sublisting[-1]["name"].encode("utf-8")
        yield sublisting
开发者ID:krishna-kashyap,项目名称:swift,代码行数:28,代码来源:obj.py
示例15: __call__
def __call__(self, req):
    """
    Render account/container listings through a template, or redirect to
    ``index.html``, for requests that carry the ``x-web-mode`` header.

    Requests without an account, without the header, or addressed to an
    object are passed to the wrapped app unchanged.

    :param req: the incoming request
    :returns: a redirect to the container's ``index.html``, the rendered
              template, or the wrapped app's own response
    """
    # Pre-bind the path components so they exist even when split_path
    # rejects the path.
    account = container = obj = None
    try:
        (version, account, container, obj) = \
            split_path(req.path_info, 2, 4, True)
    except ValueError:
        pass
    if not account or not req.headers.get('x-web-mode'):
        return req.get_response(self.app)
    if obj:
        # Only listings are templated. Previously no response was ever
        # assigned on this path, so the final return failed.
        return req.get_response(self.app)

    req.query_string = 'format=json'
    resp = req.get_response(self.app)
    if resp.content_type != 'application/json':
        return resp

    listing = json.loads(resp.body)
    template = self.get_template(req, account, container)

    # With a template, only container listings are checked for an index
    # page; without one the check always runs.
    # NOTE(review): without a template an account listing containing a
    # container named index.html redirects with container=None in the
    # Location -- kept as in the original, confirm whether intended.
    if container or not template:
        if any(o['name'] == 'index.html' for o in listing):
            headers = {'Location': '/v1/%s/%s/index.html' %
                       (account, container)}
            return HTTPSeeOther(headers=headers)

    if template:
        ctx = {
            'account': account,
            'container': container,
            'listing': listing,
        }
        return Response(body=template.render(**ctx))
    return resp
开发者ID:clayg,项目名称:swift-webmode,代码行数:35,代码来源:webtemplates.py
示例16: test_GET_OBJscope_conAttrs_metadata
def test_GET_OBJscope_conAttrs_metadata(self):
    """
    Request container attributes on an object path; the reply must be a
    single entry describing the container that holds the object.
    """
    container_uri = '/TEST_acc1/TEST_con1'
    req2 = Request.blank(
        '/v1/TEST_acc1/TEST_con1/TEST_obj1',
        environ={'REQUEST_METHOD': 'GET', 'HTTP_X_TIMESTAMP': '0'},
        headers={'attributes': Cattrs, 'format': 'json'})
    resp2 = req2.get_response(self.controller)
    self.assert_(resp2.status.startswith('200'))

    testList = json.loads(resp2.body)
    self.assert_(len(testList) == 1)
    testDict = testList[0]
    self.assert_(container_uri in testDict)

    metaReturned = testDict[container_uri]
    # (attribute, expected value) pairs, checked in this order
    expected = [
        ('container_uri', '/TEST_acc1/TEST_con1'),
        ('container_name', 'TEST_con1'),
        ('container_account_name', 'TEST_acc1'),
        ('container_create_time', self.t),
        ('container_object_count', 33),
        ('container_bytes_used', 3342),
        ('container_meta_TESTCUSTOM', 'CUSTOM'),
    ]
    for attribute, value in expected:
        self.assertEquals(metaReturned[attribute], value)
开发者ID:ucsc-hp-group,项目名称:swift,代码行数:27,代码来源:test_server.py
示例17: test_no_attributes_in_request_con_scope
def test_no_attributes_in_request_con_scope(self):
    """
    A container-scope GET without an attributes header must return four
    entries: the account, the container and its two objects, each keyed
    by its URI and carrying the matching ``*_uri`` attribute.
    """
    req = Request.blank(
        '/v1/TEST_acc1/TEST_con1',
        environ={'REQUEST_METHOD': 'GET', 'HTTP_X_TIMESTAMP': '0'},
        headers={'format': 'json'})
    resp = req.get_response(self.controller)
    self.assert_(resp.status.startswith('200'))

    testList = json.loads(resp.body)
    self.assertEquals(len(testList), 4)

    # (uri, attribute holding that uri) for each entry, in reply order
    expected = [
        ('/TEST_acc1', 'account_uri'),
        ('/TEST_acc1/TEST_con1', 'container_uri'),
        ('/TEST_acc1/TEST_con1/TEST_obj1', 'object_uri'),
        ('/TEST_acc1/TEST_con1/TEST_obj2', 'object_uri'),
    ]
    for position, (uri, attribute) in enumerate(expected):
        testDict = testList[position]
        self.assert_(uri in testDict)
        metaReturned = testDict[uri]
        self.assertEquals(metaReturned[attribute], uri)
开发者ID:ucsc-hp-group,项目名称:swift,代码行数:32,代码来源:test_server.py
示例18: handle_multipart_delete
def handle_multipart_delete(self, req):
    """
    Delete every segment named in an SLO manifest before the manifest
    itself is deleted.

    The manifest is fetched with an internal GET on a copy of the
    request environ, and its segment names are handed to the bulk
    deleter.

    :param req: the DELETE request for the manifest
    :returns: the wrapped app when all segments were deleted (so that it
              deletes the manifest), the bulk deleter's response when
              they were not, or the manifest GET response when that GET
              did not succeed
    :raises HTTPBadRequest: when the object is not an SLO manifest
    :raises HTTPServerError: when the manifest body is not valid JSON
    """
    new_env = req.environ.copy()
    del new_env['wsgi.input']
    new_env.update({
        'REQUEST_METHOD': 'GET',
        'QUERY_STRING': 'multipart-manifest=get',
        'CONTENT_LENGTH': 0,
        'HTTP_USER_AGENT':
            '%s MultipartDELETE' % req.environ.get('HTTP_USER_AGENT'),
        'swift.source': 'SLO',
    })
    get_man_resp = Request.blank('', new_env).get_response(self.app)
    if get_man_resp.status_int // 100 != 2:
        return get_man_resp

    is_slo = config_true_value(
        get_man_resp.headers.get('X-Static-Large-Object'))
    if not is_slo:
        raise HTTPBadRequest('Not an SLO manifest')
    try:
        manifest = json.loads(get_man_resp.body)
    except ValueError:
        raise HTTPServerError('Invalid manifest file')

    segment_names = [o['name'].encode('utf-8') for o in manifest]
    delete_resp = self.bulk_deleter.handle_delete(
        req,
        objs_to_delete=segment_names,
        user_agent='MultipartDELETE', swift_source='SLO')
    if delete_resp.status_int // 100 != 2:
        return delete_resp
    # delete the manifest file itself
    return self.app
开发者ID:sun3shines,项目名称:swift-1.7.4,代码行数:31,代码来源:slo.py
示例19: _reclaim
def _reclaim(self, conn, timestamp):
"""
Removes any empty metadata values older than the timestamp using the
given database connection. This function will not call commit on the
conn, but will instead return True if the database needs committing.
This function was created as a worker to limit transactions and commits
from other related functions.
:param conn: Database connection to reclaim metadata within.
:param timestamp: Empty metadata items last updated before this
timestamp will be removed.
:returns: True if conn.commit() should be called
"""
try:
md = conn.execute('SELECT metadata FROM %s_stat' %
self.db_type).fetchone()[0]
if md:
md = json.loads(md)
keys_to_delete = []
for key, (value, value_timestamp) in md.iteritems():
if value == '' and value_timestamp < timestamp:
keys_to_delete.append(key)
if keys_to_delete:
for key in keys_to_delete:
del md[key]
conn.execute('UPDATE %s_stat SET metadata = ?' %
self.db_type, (json.dumps(md),))
return True
except sqlite3.OperationalError as err:
if 'no such column: metadata' not in str(err):
raise
return False
开发者ID:HoO-Group,项目名称:swift,代码行数:32,代码来源:db.py
示例20: test_bulk_delete_container_delete
def test_bulk_delete_container_delete(self):
    """
    Bulk-delete a single container whose deletion fails: nothing is
    counted as deleted and the first error is a 409 Conflict.
    """
    req = Request.blank(
        "/delete_cont_fail/AUTH_Acc",
        body="c\n",
        headers={"Accept": "application/json"},
    )
    req.method = "DELETE"
    resp_data = json.loads(self.handle_delete_and_iter(req))
    self.assertEquals(resp_data["Number Deleted"], 0)
    first_error = resp_data["Errors"][0]
    self.assertEquals(first_error[1], "409 Conflict")
开发者ID:chmouel,项目名称:swift,代码行数:7,代码来源:test_bulk.py
注：本文中的swift.common.utils.json.loads函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论