本文整理汇总了 Python 中 ujson.loads 函数的典型用法代码示例。如果您正苦于以下问题：Python loads 函数的具体用法？Python loads 怎么用？Python loads 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 loads 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: main
def main(j, args, params, tags, tasklet):
    """Render a wiki table with the Redis health-check status of one node.

    Reads the ``results`` and ``errors`` fields of the
    ``healthcheck:monitoring`` hash from the ``system`` Redis instance,
    decodes each as JSON and emits one table row for every Redis entry that
    is recorded for the node named by the ``nid`` tag and carries a ``state``.

    :param j: framework root object; supplies the Redis client and
        ``j.core.grid.healthchecker.getWikiStatus``.
    :param args: macro arguments; ``args.doc`` and ``args.getTag('nid')``
        are read.
    :param params: result holder; ``params.result`` is set to
        ``(wiki_text, doc)``.
    :param tags: not used by this function.
    :param tasklet: not used by this function.
    :returns: the ``params`` object that was passed in.
    """
    doc = args.doc
    # Node ids are looked up in their string form in the decoded JSON.
    nidstr = str(args.getTag('nid'))
    rediscl = j.clients.redis.getByInstance('system')
    out = ['||Port||Status||Memory Used||']
    rstatus = rediscl.hget('healthcheck:monitoring', 'results')
    errors = rediscl.hget('healthcheck:monitoring', 'errors')
    # An absent/empty hash field means "nothing recorded", not a failure.
    rstatus = ujson.loads(rstatus) if rstatus else {}
    errors = ujson.loads(errors) if errors else {}
    for data in (rstatus, errors):
        for stat in data.get(nidstr, {}).get('redis', ()):
            if 'state' not in stat:
                # Entries without a state cannot be rendered; skip them.
                continue
            state = j.core.grid.healthchecker.getWikiStatus(stat['state'])
            usage = "%s / %s" % (stat.get('memory_usage', ''), stat.get('memory_max', ''))
            out.append('|%s|%s|%s|' % (stat.get('port', -1), state, usage))
    params.result = ('\n'.join(out), doc)
    return params
开发者ID:jumpscale7,项目名称:jumpscale_portal,代码行数:30,代码来源:1_redisstatus.py
示例2: test_ReadBadObjectSyntax
def test_ReadBadObjectSyntax(self):
try:
ujson.loads('{"age", 44}')
except ValueError:
pass
else:
assert False, "expected ValueError"
开发者ID:karanlyons,项目名称:ultrajson,代码行数:7,代码来源:tests.py
示例3: test_decodeArrayFaultyUnicode
def test_decodeArrayFaultyUnicode(self):
    """Check that ``ujson.loads`` raises ``ValueError`` for ``[18446744073709551616]``.

    NOTE(review): the literal is 2**64, so despite the test name this looks
    like an integer-range check rather than a Unicode one - confirm intent.
    """
    try:
        ujson.loads('[18446744073709551616]')
    except ValueError:
        return
    # Raised explicitly: a plain ``assert False`` is removed under ``python -O``
    # and would let the test pass without checking anything.
    raise AssertionError("expected ValueError")
开发者ID:karanlyons,项目名称:ultrajson,代码行数:7,代码来源:tests.py
示例4: test_set_active
def test_set_active(self):
    # type: () -> None
    """Check that posted presence statuses are reported by get_active_statuses.

    NOTE(review): the e-mail literals read "[email protected]" because the page
    this snippet was taken from obfuscated them; they are kept verbatim here
    and must be restored from the upstream project before the test can run.
    """
    def fetch_presences():
        # Fetch the active statuses and return the decoded 'presences' mapping.
        result = self.client_post("/json/get_active_statuses", {})
        self.assert_json_success(result)
        return ujson.loads(result.content)['presences']

    self.login("[email protected]")
    client = 'website'

    # First user goes idle and must be reported as idle.
    self.client_post("/json/users/me/presence", {'status': 'idle'})
    presences = fetch_presences()
    self.assertEqual(presences["[email protected]"][client]['status'], 'idle')

    # Second login goes idle as well; both entries must be idle.
    email = "[email protected]"
    self.login("[email protected]")
    self.client_post("/json/users/me/presence", {'status': 'idle'})
    presences = fetch_presences()
    self.assertEqual(presences[email][client]['status'], 'idle')
    self.assertEqual(presences['[email protected]'][client]['status'], 'idle')

    # Switching the current user to active must leave the other entry idle.
    self.client_post("/json/users/me/presence", {'status': 'active'})
    presences = fetch_presences()
    self.assertEqual(presences[email][client]['status'], 'active')
    self.assertEqual(presences['[email protected]'][client]['status'], 'idle')
开发者ID:tobby2002,项目名称:zulip,代码行数:27,代码来源:test_presence.py
示例5: post
def post(request):
    """Sets a key to a value on the currently logged in users preferences

    The payload is taken from ``request.POST``; when that is empty the
    request body is decoded as JSON and its ``'body'`` member is used.

    :param key: Key to set
    :type key: str
    :param val: Value to set
    :type val: primitive
    :returns: json
    """
    data = request.POST or json.loads(request.body)['body']
    key = data.get('key', None)
    val = data.get('val', None)
    res = Result()
    if key is not None and val is not None:
        obj, created = UserPref.objects.get_or_create(user=request.user)
        if created:
            # A brand-new preference row starts from a copy of the defaults.
            obj.data = json.dumps(DefaultPrefs.copy())
            obj.save()
        try:
            # Values may arrive JSON-encoded; anything that does not decode
            # is stored unchanged.
            val = json.loads(val)
        except (TypeError, ValueError):
            pass
        obj.setKey(key, val)
        obj.save()
        res.append(obj.json())
    return JsonResponse(res.asDict())
开发者ID:theiviaxx,项目名称:Frog,代码行数:27,代码来源:userpref.py
示例6: from_json
def from_json(text):
    """Parse one line of tweet-stream output into the matching object.

    :param text: one raw line of stream output.
    :returns: the result of ``from_json`` on the class selected by the first
        recognised key (``delete``, ``limit``, ``scrub_geo``,
        ``status_withheld``, ``text``, ``warning``), tested in that order.
    :raises Nothing_To_Parse_Error: if the line holds only whitespace and/or
        lowercase hexadecimal digits.
    :raises ValueError: if the line is not valid JSON.
    :raises Unknown_Object_Error: if no recognised key is present.
    """
    # Skip lines containing nothing but whitespace and lines that contain just
    # a hexadecimal number. The latter arises when the tweet stream is using
    # "Transfer-Encoding: chunked" and the chunk delimiters made their way into
    # the output file (i.e., before issue #92 was fixed).
    if re.search(r'^[0-9a-f\s]*$', text):
        raise Nothing_To_Parse_Error()
    try:
        # Isaac: was having problems parsing PostgreSQL JSONs
        j = json.loads(text.replace(r'\\"', r'\"'))
    except ValueError:
        # Retry on the untouched text; raises ValueError on parse failure.
        j = json.loads(text)
    if 'delete' in j:
        return Deletion_Notice.from_json(j)
    if 'limit' in j:
        return Limit_Notice.from_json(j)
    if 'scrub_geo' in j:
        return Scrub_Geo_Notice.from_json(j)
    if 'status_withheld' in j:
        return Status_Withheld.from_json(j)
    if 'text' in j:
        return Tweet.from_json(j)
    if 'warning' in j:
        return Warning.from_json(j)
    raise Unknown_Object_Error()
开发者ID:joh12041,项目名称:quac,代码行数:25,代码来源:tweet.py
示例7: loads
def loads(*args, **kwargs):
    """Decode JSON with whichever ``json`` module this file has bound.

    When the bound module is not ``ujson``, ``strict=False`` is applied
    unless the caller passes ``strict`` explicitly. Previously an explicit
    ``strict`` collided with the hard-coded keyword and raised ``TypeError``.

    :param args: positional arguments forwarded to ``json.loads``.
    :param kwargs: keyword arguments forwarded to ``json.loads``.
    :returns: the decoded object.
    :raises ResultParseError: if decoding raises ``ValueError``.
    """
    try:
        if json.__name__ != "ujson":
            # ``strict`` is only passed to non-ujson backends, as before.
            kwargs.setdefault("strict", False)
        return json.loads(*args, **kwargs)
    except ValueError:
        raise ResultParseError("The JSON result could not be parsed")
开发者ID:weineran,项目名称:ripe.atlas.sagan,代码行数:7,代码来源:base.py
示例8: test_get_samples
def test_get_samples(self):
    """Check that ``dispatcher.get_samples`` answers 200 with the expected sample.

    ``requests.post`` is patched to return a canned reply built from
    ``self.response_str``; the test then inspects ``res.status`` and the
    JSON in ``res.body``.
    """
    res = mock.Mock()
    req = mock.Mock()

    def _side_effect(arg):
        # Canned query parameters; any other name yields None.
        if arg == 'name':
            return 'tongli'
        elif arg == 'dimensions':
            return 'key1:100, key2:200'

    req.get_param.side_effect = _side_effect

    req_result = mock.Mock()
    req_result.json.return_value = json.loads(self.response_str)
    req_result.status_code = 200

    with mock.patch.object(requests, 'post', return_value=req_result):
        self.dispatcher.get_samples(req, res)

    # test that the response code is 200
    self.assertEqual(res.status, falcon.HTTP_200)
    obj = json.loads(res.body)
    self.assertEqual(len(obj), 1)
    sample = obj[0]
    self.assertEqual(sample['meter'], 'BABMGD')
    self.assertEqual(sample['id'], 'AVOziWmP6-pxt0dRmr7j')
    self.assertEqual(sample['type'], 'metrics')
    self.assertEqual(sample['user_id'],
                     'efd87807-12d2-4b38-9c70-5f5c2ac427ff')
    self.assertEqual(sample['project_id'],
                     '35b17138-b364-4e6a-a131-8f3099c5be68')
    self.assertEqual(sample['timestamp'], 1461337094000)
    self.assertEqual(sample['volume'], 4)
开发者ID:openstack,项目名称:kiloeyes,代码行数:32,代码来源:test_samples.py
示例9: loadFromFilesystem
def loadFromFilesystem(self):
    """Load the user profile JSON from disk, creating or migrating it as needed.

    * File exists: load it. If it contains ``selectedTopics`` it is migrated,
      written back and reloaded; otherwise the boot status is updated.
    * File missing: a default profile is produced, given a listening port,
      written out and loaded.

    The migration branch used to call ``f.read()`` on the handle it had just
    opened for writing, which cannot succeed; the file is now reopened for
    reading, as the first-load branch already did.
    """
    profile_path = PROFILE_DIR+'UserProfile/UserProfile.json'
    if os.path.isfile(profile_path):
        # We already have a JSON file. Load the details from the file at the start.
        with open(profile_path, 'rb') as f:
            self.__settingsAndProfile = ujson.loads(f.read())
        # Check for old version.
        if 'selectedTopics' in self.__settingsAndProfile:
            # This is a 1.1.2 JSON file. needs to be migrated.
            migrationResult = self.__migrateFrom112to120(self.__settingsAndProfile)
            with open(profile_path, 'wb') as f:
                f.write(ujson.encode(migrationResult))
            # Reload from disk so memory reflects exactly what was persisted.
            with open(profile_path, 'rb') as f:
                self.__settingsAndProfile = ujson.loads(f.read())
        else:
            self.__updateBootStatus()
    else:
        # We don't have a JSON file. This means it's not created yet. Create it.
        # Defaults are computed before the file is opened so that a failure
        # here does not leave an empty profile file behind.
        newProfileFile = self.__produceProfileWithDefaults()
        newProfileFile['machineDetails']['listeningPort'] = self.__getRandomOpenPort()
        with open(profile_path, 'wb') as f:
            f.write(ujson.encode(newProfileFile))
        # This is the first load ever.
        with open(profile_path, 'rb') as f:
            self.__settingsAndProfile = ujson.loads(f.read())
开发者ID:anastiel,项目名称:aether-public,代码行数:26,代码来源:globals.py
示例10: init_thread_proc
def init_thread_proc(self):
    """Initialise city dictionaries and register this service instance.

    The dictionaries are first restored from the local pickle dump, or
    ``get_city_service()`` is called when no dump exists. The process then
    registers itself, fetches the instance list, and when more than one
    instance is listed copies the dictionaries from the first one.

    :returns: ``{'result': '0'}``.
    """
    dump_file = DUMP_PATH + '/' + DUMP_FILE
    try:
        if os.path.isfile(dump_file):
            # Pickle data is binary; 'rb' plus a context manager also closes
            # the handle when loading fails.
            # NOTE: pickle.load executes arbitrary code from the file - only
            # safe while the dump is written exclusively by this service.
            with open(dump_file, 'rb') as fpdump:
                totaldict = pickle.load(fpdump)
            self._spellcitydict = totaldict['spellcitydict']
            self._citydict = totaldict['citydict']
        else:
            self.get_city_service()
    except Exception:
        # Best-effort warm start: the original swallowed failures here, and
        # the data may still be fetched from a peer below. Narrowed from a
        # bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
        pass
    self._selfparmeter = sys.argv
    post_service_data('192.168.0.24', '8030', 'register', sys.argv)
    rawretdata = get_active_service('192.168.0.24', '8030', 'getinstance')
    retdata = json.loads(rawretdata)
    self._totalservicelist = retdata['data']
    # ``> 1`` instead of ``!= 1``: an empty list used to raise IndexError on [0].
    if len(retdata['data']) > 1:
        tempnode = retdata['data'][0]
        rawtotaldict = get_active_service(tempnode['host'], tempnode['port'], 'othersget')
        totaldict = json.loads(rawtotaldict)['data']
        self._citydict = totaldict['citydict']
        self._spellcitydict = totaldict['spellcitydict']
    # NOTE(review): the source's indentation was lost; this call is assumed
    # to run unconditionally - confirm against the upstream project.
    self.set_auto_update(1)
    return {'result' : '0'}
开发者ID:jizhouli,项目名称:realtime-booking-board,代码行数:29,代码来源:service_interface.py
示例11: get_updated_account_data_for_user_txn
def get_updated_account_data_for_user_txn(txn):
    """Fetch account data newer than ``stream_id`` for ``user_id``.

    ``user_id`` and ``stream_id`` are not parameters; they are taken from
    the enclosing scope.

    :param txn: database transaction/cursor offering ``execute`` and
        ``fetchall``.
    :returns: ``(global_account_data, account_data_by_room)`` where the first
        maps account-data type to decoded content and the second maps room id
        to such a mapping.
    """
    sql = (
        "SELECT account_data_type, content FROM account_data"
        " WHERE user_id = ? AND stream_id > ?"
    )
    txn.execute(sql, (user_id, stream_id))
    global_account_data = {
        data_type: json.loads(content) for data_type, content in txn.fetchall()
    }

    sql = (
        "SELECT room_id, account_data_type, content FROM room_account_data"
        " WHERE user_id = ? AND stream_id > ?"
    )
    txn.execute(sql, (user_id, stream_id))
    account_data_by_room = {}
    for room_id, data_type, content in txn.fetchall():
        # Group rows by room, creating the per-room dict on first sight.
        account_data_by_room.setdefault(room_id, {})[data_type] = json.loads(content)

    return (global_account_data, account_data_by_room)
开发者ID:0-T-0,项目名称:synapse,代码行数:25,代码来源:account_data.py
示例12: main
def main(j, args, params, tags, tasklet):
    """Render a wiki table with the disk health-check status of one node.

    Reads the ``results`` and ``errors`` fields of the
    ``healthcheck:monitoring`` hash from Redis at ``127.0.0.1:9999``, decodes
    each as JSON and emits one row per disk entry recorded for the node named
    by the ``nid`` tag. Error entries are listed before regular results.

    :param j: framework root object; supplies the Redis client and
        ``j.core.grid.healthchecker.getWikiStatus``.
    :param args: macro arguments; ``args.doc`` and ``args.getTag('nid')``
        are read.
    :param params: result holder; ``params.result`` is set to
        ``(wiki_text, doc)``.
    :param tags: not used by this function.
    :param tasklet: not used by this function.
    :returns: the ``params`` object that was passed in.
    """
    doc = args.doc
    # Node ids are looked up in their string form in the decoded JSON.
    nidstr = str(args.getTag('nid'))
    rediscl = j.clients.redis.getGeventRedisClient('127.0.0.1', 9999)
    disks = rediscl.hget('healthcheck:monitoring', 'results')
    errors = rediscl.hget('healthcheck:monitoring', 'errors')
    # An absent/empty hash field means "nothing recorded", not a failure.
    disks = ujson.loads(disks) if disks else {}
    errors = ujson.loads(errors) if errors else {}
    out = ['||Disk||Free Space||Status||']
    for kind, data in (('error', errors), ('disk', disks)):
        for diskstat in data.get(nidstr, {}).get('disks', ()):
            if kind == 'error':
                # Error entries wrap the real record in a one-item dict.
                # ``values()[0]`` only works on Python 2; this form works on
                # both and treats an empty wrapper as "no record".
                diskstat = next(iter(diskstat.values()), {})
            if 'state' not in diskstat:
                continue
            state = j.core.grid.healthchecker.getWikiStatus(diskstat['state'])
            out.append('|%s|%s|%s|' % (diskstat.get('path', ''), diskstat.get('message', ''), state))
    # NOTE(review): the source's indentation was lost; this terminator is
    # assumed to be appended once after all rows - confirm upstream.
    out.append('\n')
    params.result = ('\n'.join(out), doc)
    return params
开发者ID:jumpscale7,项目名称:jumpscale6_core,代码行数:31,代码来源:1_diskcheck.py
示例13: list_of_all_data_in_bioactivities
def list_of_all_data_in_bioactivities(request):
    """Lists all requested data for filtering bioactivities

    The GET parameters ``exclude_questionable``, ``pubchem``,
    ``target_types`` and ``organisms`` are each decoded as JSON. From the
    last two, only the ``name`` of entries whose ``is_selected`` is exactly
    ``True`` is kept.

    :param request: HTTP request carrying the GET parameters above.
    :returns: a ``JSONResponse`` wrapping
        ``generate_list_of_all_data_in_bioactivities(...)``.
    """
    def selected_names(entries):
        # Keep the names of entries explicitly flagged as selected.
        return [x.get('name') for x in entries if x.get('is_selected') is True]

    exclude_questionable = json.loads(request.GET.get('exclude_questionable'))
    pubchem = json.loads(request.GET.get('pubchem'))
    target_types = json.loads(request.GET.get('target_types'))
    organisms = json.loads(request.GET.get('organisms'))

    desired_target_types = selected_names(target_types)
    desired_organisms = selected_names(organisms)

    return JSONResponse(
        generate_list_of_all_data_in_bioactivities(
            exclude_questionable,
            pubchem,
            desired_organisms,
            desired_target_types
        )
    )
开发者ID:UPDDI,项目名称:mps-database-server,代码行数:33,代码来源:views.py
示例14: thread_main
def thread_main():
    """Poll the camera daemon in a loop and record the samples it returns.

    Each iteration sends a ``record`` request over ``self.sock``, decodes the
    reply, refreshes a clock-offset estimate (``delta``), stores every
    returned sample via ``self.add`` and symlinks each reported video chunk
    into ``self.trace_dir``. The loop stops when the daemon reports an error
    or ``self.want_axis_video`` was false for that iteration.

    ``self``, ``remote_traces_dir``, ``timeseq_name``, ``ipaddr``,
    ``resolution``, ``auth_header``, ``daemon_endpoint`` and
    ``local_link_prefix`` come from the enclosing scope.
    """
    delta = None
    rpc_count = 0
    while True:
        # Sampled once so request and exit check agree within an iteration.
        want = self.want_axis_video
        req = {
            '__type': 'AxisCameraDaemonReq',
            'traceSpec': {
                '__type': 'VideoTraceSpec',
                'traceDir': path.join(remote_traces_dir, self.trace_name),
                'traceName': self.trace_name,
                'timeseqName': timeseq_name,
            },
            'cameraConfig': {
                '__type': 'AxisCameraConfig',
                'ipaddr': ipaddr,
                'url': '/axis-cgi/mjpg/video.cgi?compression=30&rotation=0&resolution=' + resolution,
                'authHeader': auth_header,
            },
            'txTime': time.time(),
            'recordFor': 10.0 if want else 0.0,
        }
        rpc_id = 'rpc%d' % rpc_count
        rpc_count += 1
        tx = ['record', rpc_id, ujson.dumps(req, escape_forward_slashes=False)]
        logger.info('camera %s < %s', daemon_endpoint, tx)
        self.sock.send_multipart(tx, flags=0, copy=True, track=False)
        rx = self.sock.recv_multipart(flags=0, copy=True, track=False)
        rx_time = time.time()
        logger.info('%s > %s %s', daemon_endpoint, rx[1], rx[2])
        # Reply frames: rx[0] is the echoed rpc id (not used), rx[1] the
        # error, rx[2] the result.
        rpc_err = ujson.loads(rx[1])
        rpc_result = ujson.loads(rx[2])
        if rpc_err:
            logger.info('%s > error %s', daemon_endpoint, rpc_err)
            break

        # Bounds on the offset subtracted from sample timestamps below; the
        # first estimate is their midpoint, later ones are clamped into the
        # current bounds.
        min_delta = rpc_result['txTime'] - rx_time
        max_delta = rpc_result['txTime'] - rpc_result['reqTxTime']
        if delta is None:
            delta = (min_delta + max_delta) * 0.5
        delta = max(min_delta, min(max_delta, delta))
        # Was guarded by ``if 0:``; kept as a debug-level message instead.
        logger.debug("timing %0.6f %+0.6f %+0.6f min_delta=%+0.6f max_delta=%+0.6f delta=%+0.6f",
                     rpc_result['reqTxTime'], rpc_result['txTime'], rx_time, min_delta, max_delta, delta)

        for ts, sample in zip(rpc_result['times'], rpc_result['samples']):
            self.add(ts - delta, timeseq_name, sample)

        for chunk_ts in rpc_result['chunks']:
            chunk_fn = 'chunk_%s_%.0f.video' % (timeseq_name, chunk_ts)
            chunk_path = path.join(self.trace_dir, chunk_fn)
            src_file = path.join(local_link_prefix, self.trace_name, chunk_fn)
            logger.info('Create symlink(%s, %s) llp=%s rtd=%s', src_file, chunk_path, local_link_prefix, remote_traces_dir)
            os.symlink(src_file, chunk_path)

        if not want:
            break
        time.sleep(0.1)
开发者ID:openai,项目名称:rosbridge,代码行数:60,代码来源:timeseq.py
示例15: notify
def notify(result):
    """Forward replicated stream updates in ``result`` to the notifier.

    Rows of the ``events`` stream are rebuilt into ``FrozenEvent`` objects
    and passed to ``notifier.on_new_room_event``; the remaining streams are
    handed to ``notify_from_stream`` with their key name and routing column.

    :param result: mapping of stream name to stream data; the ``events``
        entry, when present and truthy, must provide ``position`` and
        ``rows``.
    """
    stream = result.get("events")
    if stream:
        max_position = stream["position"]
        for row in stream["rows"]:
            position = row[0]
            internal = json.loads(row[1])
            event_json = json.loads(row[2])
            event = FrozenEvent(event_json, internal_metadata_dict=internal)
            extra_users = ()
            if event.type == EventTypes.Member:
                # The state key of a membership event is passed as an extra user.
                extra_users = (event.state_key,)
            notifier.on_new_room_event(
                event, position, max_position, extra_users
            )

    # NOTE(review): the source's indentation was lost; these calls are
    # assumed to run whether or not an "events" stream is present - confirm.
    # (stream name, stream key, routing keyword, column name), in call order.
    other_streams = (
        ("push_rules", "push_rules_key", "user", "user_id"),
        ("user_account_data", "account_data_key", "user", "user_id"),
        ("room_account_data", "account_data_key", "user", "user_id"),
        ("tag_account_data", "account_data_key", "user", "user_id"),
        ("receipts", "receipt_key", "room", "room_id"),
        ("typing", "typing_key", "room", "room_id"),
    )
    for stream_name, stream_key, route_kw, column in other_streams:
        notify_from_stream(result, stream_name, stream_key, **{route_kw: column})
开发者ID:pombredanne,项目名称:synapse-2,代码行数:34,代码来源:synchrotron.py
示例16: filterObjects
def filterObjects(request, obj_id):
    """
    Filters Gallery for the requested ImageVideo objects.  Returns a Result object with
    serialized objects

    :param request: HTTP request; the GET parameters ``filters``, ``rng``,
        ``more`` and ``models`` are read.
    :param obj_id: primary key of the Gallery to filter.
    :returns: an error ``JsonResponse`` when an anonymous user asks for a
        non-public gallery, otherwise the result of ``_filter``.
    """
    # Function-call form prints the same text on Python 2 and Python 3.
    print(obj_id)
    obj = Gallery.objects.get(pk=obj_id)
    if request.user.is_anonymous() and obj.security != Gallery.PUBLIC:
        res = Result()
        res.isError = True
        res.message = 'This gallery is not public'
        return JsonResponse(res)

    tags = json.loads(request.GET.get('filters', '[[]]'))
    rng = request.GET.get('rng', None)
    more = json.loads(request.GET.get('more', 'false'))
    # An empty ``models`` parameter falls back to the default, same as a missing one.
    models = request.GET.get('models', 'image,video') or 'image,video'

    # Drop empty filter buckets. A list is built explicitly because
    # ``filter()`` returns a lazy iterator on Python 3.
    tags = [tag for tag in tags if tag]
    models = [ContentType.objects.get(app_label='frog', model=x) for x in models.split(',')]

    return _filter(request, obj, tags=tags, rng=rng, models=models, more=more)
开发者ID:davideilering,项目名称:Frog,代码行数:28,代码来源:gallery.py
示例17: main
def main(JSONinput):
    """Run one API query and print headers plus the response.

    :param JSONinput: JSON-encoded query; ``method`` is required and
        ``format`` is optional.
    :returns: ``True``.
    """
    query = json.loads(JSONinput)
    # Set up the query.
    p = SQLAPIcall(query)
    # run the query.
    resp = p.execute()

    if query['method'] == 'data' and query.get('format') == 'json':
        try:
            resp = json.loads(resp)
        except (TypeError, ValueError):
            # Narrowed from a bare ``except``: only decoding failures are
            # turned into the synthetic error reply.
            resp = dict(status="error", code=500,
                        message="Internal error: server did not return json")
        # Print appropriate HTML headers
        if 'status' in resp and resp['status'] == 'error':
            code = resp['code'] if 'code' in resp else 500
            headers(query['method'], errorcode=code)
        else:
            headers(query['method'])
        print(json.dumps(resp))
    else:
        headers(query['method'])
        print(resp)
    return True
开发者ID:Bookworm-project,项目名称:BookwormDB,代码行数:28,代码来源:dbbindings.py
示例18: load_results
def load_results(dir, raw_episodes=False):
    """Load and merge all monitor files found in a directory.

    The first line of each file is a JSON header and every following line a
    JSON episode record. Each episode's relative ``t`` is replaced by
    ``abstime`` (the header's ``t_start`` plus ``t``), and all episodes are
    sorted by it.

    :param dir: directory passed to ``get_monitor_files``.
    :param raw_episodes: when true, return the sorted episode dicts instead
        of the summary.
    :returns: the sorted list of episodes, or a dict with ``env_info``,
        ``episode_end_times``, ``episode_lengths``, ``episode_rewards`` and
        ``initial_reset_time``.
    :raises LoadMonitorResultsError: if no monitor file is found.
    :raises AssertionError: if the files disagree on ``env_id``.
    """
    fnames = get_monitor_files(dir)
    if not fnames:
        raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir))

    episodes = []
    headers = []
    for fname in fnames:
        with open(fname, 'rt') as fh:
            lines = fh.readlines()
        header = json.loads(lines[0])
        headers.append(header)
        for line in lines[1:]:
            episode = json.loads(line)
            # Convert the per-file relative time into an absolute one.
            episode['abstime'] = header['t_start'] + episode.pop('t')
            episodes.append(episode)

    header0 = headers[0]
    for header in headers[1:]:
        if header['env_id'] != header0['env_id']:
            # Raised explicitly (same exception type as before) so the check
            # is not stripped under ``python -O``.
            raise AssertionError("mixing data from two envs")

    episodes = sorted(episodes, key=lambda e: e['abstime'])
    if raw_episodes:
        return episodes
    return {
        'env_info': {'env_id': header0['env_id'], 'gym_version': header0['gym_version']},
        'episode_end_times': [e['abstime'] for e in episodes],
        'episode_lengths': [e['l'] for e in episodes],
        'episode_rewards': [e['r'] for e in episodes],
        # Was ``min([min(...)])``: the outer min over a one-item list did nothing.
        'initial_reset_time': min(header['t_start'] for header in headers),
    }
开发者ID:tony32769,项目名称:treeqn,代码行数:30,代码来源:monitor.py
示例19: test_ia_search_itemlist
def test_ia_search_itemlist(capsys):
    """Check that ``ia search ... --itemlist`` prints 200 whitespace-separated tokens.

    Two scrape requests are stubbed: the first returns
    ``TEST_SCRAPE_RESPONSE`` as is, the second returns the same payload with
    its ``cursor`` removed.
    """
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        url1 = ('{0}//archive.org/services/search/beta/scrape.php'
                '?q=collection%3Aattentionkmartshoppers'
                '&REQUIRE_AUTH=true&size=10000'.format(protocol))
        url2 = ('{0}//archive.org/services/search/beta/scrape.php?'
                'cursor=W3siaWRlbnRpZmllciI6IjE5NjEtTC0wNTkxNCJ9XQ%3D%3D'
                '&REQUIRE_AUTH=true&q=collection%3Aattentionkmartshoppers'
                '&size=10000'.format(protocol))
        rsps.add(responses.POST, url1,
                 body=TEST_SCRAPE_RESPONSE,
                 status=200,
                 match_querystring=True)
        # Second page: identical payload minus the cursor.
        _j = json.loads(TEST_SCRAPE_RESPONSE)
        del _j['cursor']
        _r = json.dumps(_j)
        rsps.add(responses.POST, url2,
                 body=_r,
                 status=200,
                 match_querystring=True)

        sys.argv = ['ia', 'search', 'collection:attentionkmartshoppers', '--itemlist']
        try:
            ia.main()
        except SystemExit as exc:
            # A SystemExit is acceptable only with a falsy exit code.
            assert not exc.code

    out, err = capsys.readouterr()
    assert len(out.split()) == 200
开发者ID:FactMiners,项目名称:internetarchive,代码行数:30,代码来源:test_ia_search.py
示例20: etr_circ
def etr_circ():
    """Return constituencies as a GeoJSON ``FeatureCollection``.

    The ``q`` query parameter is decoded as JSON. When it is empty, every
    row with a non-null ``cir_num`` is returned; otherwise only rows whose
    ``cir_num`` is listed in ``q['cirid']``.

    :returns: a 400 response when ``q`` cannot be decoded, otherwise a
        ``Response`` with mimetype ``application/json``.
    """
    try:
        data = loads(request.args.get('q', '{}'))
    except (TypeError, ValueError, OverflowError):
        return jsonify_status_code(400, message='Unable to decode data')

    if not data:
        # ``!= None`` is kept as written: it is a column comparison inside a
        # query filter, not a plain Python comparison.
        query = World_Circonscriptions.query.filter(
            World_Circonscriptions.cir_num != None).all()
    else:
        query = World_Circonscriptions.query.filter(World_Circonscriptions.cir_num.in_(data['cirid'])).all()

    features = []
    for circo in query:
        geomjs = db.session.scalar(circo.geom.geojson)
        features.append({
            'geometry': loads(geomjs),
            'type': 'Feature',
            'id': circo.gid,
            'properties': {'name': circo.name, 'cir_num': circo.cir_num},
        })
    geojs = {"crs": None, "type": "FeatureCollection", "features": features}
    return Response(dumps(geojs), mimetype='application/json')
开发者ID:blackrez,项目名称:webcirconscriptionsrc,代码行数:26,代码来源:app.py
注：本文中的 ujson.loads 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论