• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python ujson.loads函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中ujson.loads函数的典型用法代码示例。如果您正苦于以下问题:Python loads函数的具体用法?Python loads怎么用?Python loads使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

def main(j, args, params, tags, tasklet):
    doc = args.doc
    nid = args.getTag('nid')
    nidstr = str(nid)
    rediscl = j.clients.redis.getByInstance('system')

    out = list()

    out.append('||Port||Status||Memory Used||')

    rstatus = rediscl.hget('healthcheck:monitoring', 'results')
    errors = rediscl.hget('healthcheck:monitoring', 'errors')
    rstatus = ujson.loads(rstatus) if rstatus else dict()
    errors = ujson.loads(errors) if errors else dict()

    for data in [rstatus, errors]:
        if nidstr in data:
            if 'redis' in data.get(nidstr, dict()):
                rnstatus = data[nidstr].get('redis', dict())
                for stat in rnstatus:
                    if 'state' not in stat:
                        continue
                    state = j.core.grid.healthchecker.getWikiStatus(stat.get('state', 'UNKNOWN'))
                    usage = "%s / %s" % (stat.get('memory_usage', ''), stat.get('memory_max', ''))
                    out.append('|%s|%s|%s|' % (stat.get('port', -1), state, usage))

    out = '\n'.join(out)

    params.result = (out, doc)
    return params
开发者ID:jumpscale7,项目名称:jumpscale_portal,代码行数:30,代码来源:1_redisstatus.py


示例2: test_ReadBadObjectSyntax

 def test_ReadBadObjectSyntax(self):
     try:
         ujson.loads('{"age", 44}')
     except ValueError:
         pass
     else:
         assert False, "expected ValueError"
开发者ID:karanlyons,项目名称:ultrajson,代码行数:7,代码来源:tests.py


示例3: test_decodeArrayFaultyUnicode

 def test_decodeArrayFaultyUnicode(self):
     try:
         ujson.loads('[18446744073709551616]')
     except ValueError:
         pass
     else:
         assert False, "expected ValueError"
开发者ID:karanlyons,项目名称:ultrajson,代码行数:7,代码来源:tests.py


示例4: test_set_active

    def test_set_active(self):
        # type: () -> None
        self.login("[email protected]")
        client = 'website'

        self.client_post("/json/users/me/presence", {'status': 'idle'})
        result = self.client_post("/json/get_active_statuses", {})

        self.assert_json_success(result)
        json = ujson.loads(result.content)
        self.assertEqual(json['presences']["[email protected]"][client]['status'], 'idle')

        email = "[email protected]"
        self.login("[email protected]")
        self.client_post("/json/users/me/presence", {'status': 'idle'})
        result = self.client_post("/json/get_active_statuses", {})
        self.assert_json_success(result)
        json = ujson.loads(result.content)
        self.assertEqual(json['presences'][email][client]['status'], 'idle')
        self.assertEqual(json['presences']['[email protected]'][client]['status'], 'idle')

        self.client_post("/json/users/me/presence", {'status': 'active'})
        result = self.client_post("/json/get_active_statuses", {})
        self.assert_json_success(result)
        json = ujson.loads(result.content)
        self.assertEqual(json['presences'][email][client]['status'], 'active')
        self.assertEqual(json['presences']['[email protected]'][client]['status'], 'idle')
开发者ID:tobby2002,项目名称:zulip,代码行数:27,代码来源:test_presence.py


示例5: post

def post(request):
    """Sets a key to a value on the currently logged in users preferences

    :param key: Key to set
    :type key: str
    :param val: Value to set
    :type val: primitive
    :returns: json
    """
    data = request.POST or json.loads(request.body)['body']
    key = data.get('key', None)
    val = data.get('val', None)
    res = Result()
    if key is not None and val is not None:
        obj, created = UserPref.objects.get_or_create(user=request.user)
        if created:
            obj.data = json.dumps(DefaultPrefs.copy())
            obj.save()
        try:
            val = json.loads(val)
        except (TypeError, ValueError):
            pass
        obj.setKey(key, val)
        obj.save()
        res.append(obj.json())

    return JsonResponse(res.asDict())
开发者ID:theiviaxx,项目名称:Frog,代码行数:27,代码来源:userpref.py


示例6: from_json

def from_json(text):
   # Skip lines containing nothing but whitespace and lines that contain just
   # a hexadecimal number. The latter arises when the tweet stream is using
   # "Transfer-Encoding: chunked" and the chunk delimeters made their way into
   # the output file (i.e., before issue #92 was fixed).
   if (re.search(r'^[0-9a-f\s]*$', text)):
      raise Nothing_To_Parse_Error()
   try:
      j = json.loads(text.replace(r'\\"', r'\"'))  # Isaac: was having problems parsing PostgreSQL JSONs
   except ValueError:
      j = json.loads(text)  # raises ValueError on parse failure
   if ('delete' in j):
      return Deletion_Notice.from_json(j)
   elif ('limit' in j):
      return Limit_Notice.from_json(j)
   elif ('scrub_geo' in j):
      return Scrub_Geo_Notice.from_json(j)
   elif ('status_withheld' in j):
      return Status_Withheld.from_json(j)
   elif ('text' in j):
      return Tweet.from_json(j)
   elif ('warning' in j):
      return Warning.from_json(j)
   else:
      raise Unknown_Object_Error()
开发者ID:joh12041,项目名称:quac,代码行数:25,代码来源:tweet.py


示例7: loads

 def loads(*args, **kwargs):
     try:
         if json.__name__ == "ujson":
             return json.loads(*args, **kwargs)
         return json.loads(strict=False, *args, **kwargs)
     except ValueError:
         raise ResultParseError("The JSON result could not be parsed")
开发者ID:weineran,项目名称:ripe.atlas.sagan,代码行数:7,代码来源:base.py


示例8: test_get_samples

    def test_get_samples(self):
        res = mock.Mock()
        req = mock.Mock()

        def _side_effect(arg):
            if arg == 'name':
                return 'tongli'
            elif arg == 'dimensions':
                return 'key1:100, key2:200'
        req.get_param.side_effect = _side_effect

        req_result = mock.Mock()

        req_result.json.return_value = json.loads(self.response_str)
        req_result.status_code = 200

        with mock.patch.object(requests, 'post', return_value=req_result):
            self.dispatcher.get_samples(req, res)

        # test that the response code is 200
        self.assertEqual(res.status, getattr(falcon, 'HTTP_200'))
        obj = json.loads(res.body)
        self.assertEqual(obj[0]['meter'], 'BABMGD')
        self.assertEqual(obj[0]['id'], 'AVOziWmP6-pxt0dRmr7j')
        self.assertEqual(obj[0]['type'], 'metrics')
        self.assertEqual(obj[0]['user_id'],
                         'efd87807-12d2-4b38-9c70-5f5c2ac427ff')
        self.assertEqual(obj[0]['project_id'],
                         '35b17138-b364-4e6a-a131-8f3099c5be68')
        self.assertEqual(obj[0]['timestamp'], 1461337094000)
        self.assertEqual(obj[0]['volume'], 4)
        self.assertEqual(len(obj), 1)
开发者ID:openstack,项目名称:kiloeyes,代码行数:32,代码来源:test_samples.py


示例9: loadFromFilesystem

    def loadFromFilesystem(self):
        if os.path.isfile(PROFILE_DIR+'UserProfile/UserProfile.json'):
            # We already have a JSON file. Load the details from the file at the start.
            with open(PROFILE_DIR+'UserProfile/UserProfile.json', 'rb') as f:
                self.__settingsAndProfile = ujson.loads(f.read())

                # Check for old version.
                if 'selectedTopics' in self.__settingsAndProfile:
                    # This is a 1.1.2 JSON file. needs to be migrated.
                    migrationResult = self.__migrateFrom112to120(self.__settingsAndProfile)
                    with open(PROFILE_DIR+'UserProfile/UserProfile.json', 'wb') as f:
                        f.write(ujson.encode(migrationResult))
                    self.__settingsAndProfile = ujson.loads(f.read())
                else:
                    # The main
                    self.__updateBootStatus()
        else:
            # We don't have a JSON file. This means it's not created yet. Create it.
            with open(PROFILE_DIR+'UserProfile/UserProfile.json', 'wb') as f:
            # Now, time to set some defaults.
                newProfileFile = self.__produceProfileWithDefaults()
                newProfileFile['machineDetails']['listeningPort'] = self.__getRandomOpenPort()
                f.write(ujson.encode(newProfileFile))
            # This is the first load ever.
            with open(PROFILE_DIR+'UserProfile/UserProfile.json', 'rb') as f:
                self.__settingsAndProfile = ujson.loads(f.read())
开发者ID:anastiel,项目名称:aether-public,代码行数:26,代码来源:globals.py


示例10: init_thread_proc

    def init_thread_proc(self):
        try:
            if os.path.isfile(DUMP_PATH + '/' + DUMP_FILE):
                fpdump = open(DUMP_PATH + '/' + DUMP_FILE)
                totaldict = pickle.load(fpdump)
                fpdump.close()
                self._spellcitydict = totaldict['spellcitydict']
                self._citydict = totaldict['citydict']
            else:
                self.get_city_service()
        except:
            pass

        self._selfparmeter = sys.argv
        
        post_service_data('192.168.0.24', '8030', 'register', sys.argv)
        rawretdata = get_active_service('192.168.0.24', '8030', 'getinstance')
        retdata = json.loads(rawretdata)
        self._totalservicelist = retdata['data']
        if len(retdata['data']) != 1:
            tempnode = retdata['data'][0]
            rawtotaldict = get_active_service(tempnode['host'], tempnode['port'], 'othersget')
            totaldict = json.loads(rawtotaldict)['data']
            self._citydict = totaldict['citydict']
            self._spellcitydict = totaldict['spellcitydict']

        self.set_auto_update(1)

        return {'result' : '0'}
开发者ID:jizhouli,项目名称:realtime-booking-board,代码行数:29,代码来源:service_interface.py


示例11: get_updated_account_data_for_user_txn

        def get_updated_account_data_for_user_txn(txn):
            sql = (
                "SELECT account_data_type, content FROM account_data"
                " WHERE user_id = ? AND stream_id > ?"
            )

            txn.execute(sql, (user_id, stream_id))

            global_account_data = {
                row[0]: json.loads(row[1]) for row in txn.fetchall()
            }

            sql = (
                "SELECT room_id, account_data_type, content FROM room_account_data"
                " WHERE user_id = ? AND stream_id > ?"
            )

            txn.execute(sql, (user_id, stream_id))

            account_data_by_room = {}
            for row in txn.fetchall():
                room_account_data = account_data_by_room.setdefault(row[0], {})
                room_account_data[row[1]] = json.loads(row[2])

            return (global_account_data, account_data_by_room)
开发者ID:0-T-0,项目名称:synapse,代码行数:25,代码来源:account_data.py


示例12: main

def main(j, args, params, tags, tasklet):
    doc = args.doc
    nid = args.getTag('nid')
    nidstr = str(nid)
    rediscl = j.clients.redis.getGeventRedisClient('127.0.0.1', 9999)

    out = list()

    disks = rediscl.hget('healthcheck:monitoring', 'results')
    errors = rediscl.hget('healthcheck:monitoring', 'errors')
    disks = ujson.loads(disks) if disks else dict()
    errors = ujson.loads(errors) if errors else dict()

    out.append('||Disk||Free Space||Status||')
    for type, data in (('error', errors), ('disk', disks)):
        if nidstr in data:
            if 'disks' in data.get(nidstr, dict()):
                ddata = data[nidstr].get('disks', list())
                for diskstat in ddata:
                    if type == 'error':
                        diskstat = diskstat.values()[0]
                    if 'state' not in diskstat:
                        continue
                    state = j.core.grid.healthchecker.getWikiStatus(diskstat.get('state', 'UNKNOWN'))
                    out.append('|%s|%s|%s|' % (diskstat.get('path', ''), diskstat.get('message', ''), state))
                out.append('\n')

    out = '\n'.join(out)

    params.result = (out, doc)
    return params
开发者ID:jumpscale7,项目名称:jumpscale6_core,代码行数:31,代码来源:1_diskcheck.py


示例13: list_of_all_data_in_bioactivities

def list_of_all_data_in_bioactivities(request):
    """Lists all requested data for filtering bioactivities"""
    exclude_questionable = json.loads(request.GET.get('exclude_questionable'))
    pubchem = json.loads(request.GET.get('pubchem'))
    target_types = json.loads(request.GET.get('target_types'))
    organisms = json.loads(request.GET.get('organisms'))

    desired_target_types = [
        x.get(
            'name'
        ) for x in target_types
        if x.get(
            'is_selected'
        ) is True
    ]

    desired_organisms = [
        x.get(
            'name'
        ) for x in organisms
        if x.get(
            'is_selected'
        ) is True
    ]

    return JSONResponse(
        generate_list_of_all_data_in_bioactivities(
            exclude_questionable,
            pubchem,
            desired_organisms,
            desired_target_types
        )
    )
开发者ID:UPDDI,项目名称:mps-database-server,代码行数:33,代码来源:views.py


示例14: thread_main

        def thread_main():
            delta = None
            rpc_count = 0
            while True:
                want = self.want_axis_video
                req = {
                    '__type': 'AxisCameraDaemonReq',
                    'traceSpec': {
                        '__type': 'VideoTraceSpec',
                        'traceDir': path.join(remote_traces_dir, self.trace_name),
                        'traceName': self.trace_name,
                        'timeseqName': timeseq_name,
                    },
                    'cameraConfig': {
                        '__type': 'AxisCameraConfig',
                        'ipaddr': ipaddr,
                        'url': '/axis-cgi/mjpg/video.cgi?compression=30&rotation=0&resolution=' + resolution,
                        'authHeader': auth_header,
                    },
                    'txTime': time.time(),
                    'recordFor': 10.0 if want else 0.0,
                }
                rpc_id = 'rpc%d' % rpc_count
                rpc_count += 1
                tx = ['record', rpc_id, ujson.dumps(req, escape_forward_slashes=False)]
                if 1: logger.info('camera %s < %s', daemon_endpoint, tx)
                self.sock.send_multipart(tx, flags=0, copy=True, track=False)

                rx = self.sock.recv_multipart(flags=0, copy=True, track=False)
                rx_time = time.time()
                if 1: logger.info('%s > %s %s', daemon_endpoint, rx[1], rx[2])
                rpc_id2 = rx[0]
                rpc_err = ujson.loads(rx[1])
                rpc_result = ujson.loads(rx[2])
                if rpc_err:
                    logger.info('%s > error %s', daemon_endpoint, rpc_err)
                    break

                min_delta = rpc_result['txTime'] - rx_time
                max_delta = rpc_result['txTime'] - rpc_result['reqTxTime']
                if delta is None:
                    delta = (min_delta + max_delta) * 0.5
                delta = max(min_delta, min(max_delta, delta))
                if 0: logger.info("timing %0.6f %+0.6f %+0.6f min_delta=%+0.6f max_delta=%+0.6f delta=%+0.6f",
                    rpc_result['reqTxTime'], rpc_result['txTime'], rx_time, min_delta, max_delta, delta)

                rep_times = rpc_result['times']
                rep_samples = rpc_result['samples']
                for ts, sample in zip(rep_times, rep_samples):
                    self.add(ts - delta, timeseq_name, sample)
                rep_chunks = rpc_result['chunks']
                for chunk_ts in rep_chunks:
                    chunk_fn = 'chunk_%s_%.0f.video' % (timeseq_name, chunk_ts)
                    chunk_path = path.join(self.trace_dir, chunk_fn)
                    src_file = path.join(local_link_prefix, self.trace_name, chunk_fn)
                    logger.info('Create symlink(%s, %s)  llp=%s rtd=%s', src_file, chunk_path, local_link_prefix, remote_traces_dir)
                    os.symlink(src_file, chunk_path)

                if not want: break
                time.sleep(0.1)
开发者ID:openai,项目名称:rosbridge,代码行数:60,代码来源:timeseq.py


示例15: notify

        def notify(result):
            stream = result.get("events")
            if stream:
                max_position = stream["position"]
                for row in stream["rows"]:
                    position = row[0]
                    internal = json.loads(row[1])
                    event_json = json.loads(row[2])
                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
                    extra_users = ()
                    if event.type == EventTypes.Member:
                        extra_users = (event.state_key,)
                    notifier.on_new_room_event(
                        event, position, max_position, extra_users
                    )

            notify_from_stream(
                result, "push_rules", "push_rules_key", user="user_id"
            )
            notify_from_stream(
                result, "user_account_data", "account_data_key", user="user_id"
            )
            notify_from_stream(
                result, "room_account_data", "account_data_key", user="user_id"
            )
            notify_from_stream(
                result, "tag_account_data", "account_data_key", user="user_id"
            )
            notify_from_stream(
                result, "receipts", "receipt_key", room="room_id"
            )
            notify_from_stream(
                result, "typing", "typing_key", room="room_id"
            )
开发者ID:pombredanne,项目名称:synapse-2,代码行数:34,代码来源:synchrotron.py


示例16: filterObjects

def filterObjects(request, obj_id):
    """
    Filters Gallery for the requested ImageVideo objects.  Returns a Result object with 
    serialized objects
    """
    print obj_id
    obj = Gallery.objects.get(pk=obj_id)

    if request.user.is_anonymous() and obj.security != Gallery.PUBLIC:
        res = Result()
        res.isError = True
        res.message = 'This gallery is not public'

        return JsonResponse(res)

    
    tags = json.loads(request.GET.get('filters', '[[]]'))
    rng = request.GET.get('rng', None)
    more = json.loads(request.GET.get('more', 'false'))
    models = request.GET.get('models', 'image,video')
    if models == '':
        models = 'image,video'

    tags = filter(None, tags)

    models = [ContentType.objects.get(app_label='frog', model=x) for x in models.split(',')]

    return _filter(request, obj, tags=tags, rng=rng, models=models, more=more)
开发者ID:davideilering,项目名称:Frog,代码行数:28,代码来源:gallery.py


示例17: main

def main(JSONinput):

    query = json.loads(JSONinput)
    # Set up the query.
    p = SQLAPIcall(query)

    # run the query.
    resp = p.execute()

    if query['method'] == 'data' and 'format' in query and query['format'] == 'json':
        try:
            resp = json.loads(resp)
        except:
            resp = dict(status="error", code=500,
                        message="Internal error: server did not return json")

        # Print appropriate HTML headers
        if 'status' in resp and resp['status'] == 'error':
            code = resp['code'] if 'code' in resp else 500
            headers(query['method'], errorcode=code)
        else:
            headers(query['method'])
        print json.dumps(resp)
    else:
        headers(query['method'])
        print resp

    return True
开发者ID:Bookworm-project,项目名称:BookwormDB,代码行数:28,代码来源:dbbindings.py


示例18: load_results

def load_results(dir, raw_episodes=False):
    fnames = get_monitor_files(dir)
    if not fnames:
        raise LoadMonitorResultsError("no monitor files of the form *%s found in %s" % (Monitor.EXT, dir))
    episodes = []
    headers = []
    for fname in fnames:
        with open(fname, 'rt') as fh:
            lines = fh.readlines()
        header = json.loads(lines[0])
        headers.append(header)
        for line in lines[1:]:
            episode = json.loads(line)
            episode['abstime'] = header['t_start'] + episode['t']
            del episode['t']
            episodes.append(episode)
    header0 = headers[0]
    for header in headers[1:]:
        assert header['env_id'] == header0['env_id'], "mixing data from two envs"
    episodes = sorted(episodes, key=lambda e: e['abstime'])
    if raw_episodes:
        return episodes
    else:
        return {
            'env_info': {'env_id': header0['env_id'], 'gym_version': header0['gym_version']},
            'episode_end_times': [e['abstime'] for e in episodes],
            'episode_lengths': [e['l'] for e in episodes],
            'episode_rewards': [e['r'] for e in episodes],
            'initial_reset_time': min([min(header['t_start'] for header in headers)])
        }
开发者ID:tony32769,项目名称:treeqn,代码行数:30,代码来源:monitor.py


示例19: test_ia_search_itemlist

def test_ia_search_itemlist(capsys):
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        url1 = ('{0}//archive.org/services/search/beta/scrape.php'
                '?q=collection%3Aattentionkmartshoppers'
                '&REQUIRE_AUTH=true&size=10000'.format(protocol))
        url2 = ('{0}//archive.org/services/search/beta/scrape.php?'
                'cursor=W3siaWRlbnRpZmllciI6IjE5NjEtTC0wNTkxNCJ9XQ%3D%3D'
                '&REQUIRE_AUTH=true&q=collection%3Aattentionkmartshoppers'
                '&size=10000'.format(protocol))
        rsps.add(responses.POST, url1,
                 body=TEST_SCRAPE_RESPONSE,
                 status=200,
                 match_querystring=True)
        _j = json.loads(TEST_SCRAPE_RESPONSE)
        del _j['cursor']
        _r = json.dumps(_j)
        rsps.add(responses.POST, url2,
                 body=_r,
                 status=200,
                 match_querystring=True)

        sys.argv = ['ia', 'search', 'collection:attentionkmartshoppers', '--itemlist']
        try:
            ia.main()
        except SystemExit as exc:
            assert not exc.code

    out, err = capsys.readouterr()
    j = json.loads(TEST_SEARCH_RESPONSE)
    assert len(out.split()) == 200
开发者ID:FactMiners,项目名称:internetarchive,代码行数:30,代码来源:test_ia_search.py


示例20: etr_circ

def etr_circ():

    try:
        data = loads(request.args.get('q', '{}'))
    except (TypeError, ValueError, OverflowError):
        return jsonify_status_code(400, message='Unable to decode data')

    if not data:
        query = World_Circonscriptions.query.filter(
                    World_Circonscriptions.cir_num != None).all()
    else:
        query = World_Circonscriptions.query.filter(World_Circonscriptions.cir_num.in_(data['cirid'])).all()
    
    geojs = {"crs" : None, "type" : "FeatureCollection", "features" : list()}
    
    for circo in query:
        geomjs = db.session.scalar(circo.geom.geojson)
        geompy = loads(geomjs)
        geojs['features'].append(
            {'geometry': geompy, 
            'type':'Feature',
            'id': circo.gid, 
            'properties': {'name' : circo.name, 'cir_num' : circo.cir_num}
        })
    
    return Response(dumps(geojs), mimetype='application/json')
开发者ID:blackrez,项目名称:webcirconscriptionsrc,代码行数:26,代码来源:app.py



注:本文中的ujson.loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python analysis.SDAPlotter类代码示例发布时间:2022-05-27
下一篇:
Python ujson.load函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap