本文整理汇总了Python中util.json.loads函数的典型用法代码示例。如果您正苦于以下问题:Python loads函数的具体用法?Python loads怎么用?Python loads使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: dump
def dump(self):
"""Returns a dictionnary of all beans and attributes
keys are bean's names
values are bean's attributes in json format
ex:
{"org.apache.cassandra.db:instance=1826959904,type=DynamicEndpointSnitch":
{"UpdateInterval":100,
"Scores":{},
"SubsnitchClassName":"org.apache.cassandra.locator.SimpleSnitch",
"BadnessThreshold":0.1,
"ResetInterval":600000},
"org.apache.cassandra.db:columnfamily=NodeIdInfo,keyspace=system,type=ColumnFamilies":
{"LiveSSTableCount":0,
"LiveDiskSpaceUsed":0,
"MinimumCompactionThreshold":4,
"WriteCount":0,
"RowCacheKeysToSave":2147483647,
"RecentWriteLatencyHistogramMicros":[0,0,0,0,0,0,0,0,0,0]}
}
"""
self._jmx.sendline("dump")
self._wait_prompt()
content = self._jmx.before.replace('dump','').strip()
jsonvar = json.loads(content)
return jsonvar
开发者ID:potto007,项目名称:dd-agent,代码行数:30,代码来源:jmx_connector.py
示例2: parse_json
def parse_json(cls, raw, tags=None):
if tags is None:
tags = []
parsed = json.loads(raw)
metric_base = 'nginx'
output = []
all_keys = parsed.keys()
tagged_keys = [('caches', 'cache'), ('server_zones', 'server_zone'),
('upstreams', 'upstream')]
# Process the special keys that should turn into tags instead of
# getting concatenated to the metric name
for key, tag_name in tagged_keys:
metric_name = '%s.%s' % (metric_base, tag_name)
for tag_val, data in parsed.get(key, {}).iteritems():
tag = '%s:%s' % (tag_name, tag_val)
output.extend(cls._flatten_json(metric_name, data, tags + [tag]))
# Process the rest of the keys
rest = set(all_keys) - set([k for k, _ in tagged_keys])
for key in rest:
metric_name = '%s.%s' % (metric_base, key)
output.extend(cls._flatten_json(metric_name, parsed[key], tags))
return output
开发者ID:hungld,项目名称:dd-agent,代码行数:26,代码来源:nginx.py
示例3: get_node_stats
def get_node_stats(self, base_url):
url = urlparse.urljoin(base_url, 'nodes')
stats = []
try:
stats = json.loads(urllib2.urlopen(url).read())
except urllib2.URLError, e:
raise Exception('Cannot open RabbitMQ API url: %s %s' % (url, str(e)))
开发者ID:CaptTofu,项目名称:dd-agent,代码行数:7,代码来源:rabbitmq.py
示例4: check
def check(self, logger, agentConfig):
if 'rabbitMQStatusUrl' not in agentConfig or \
'rabbitMQUser' not in agentConfig or \
'rabbitMQPass' not in agentConfig or \
agentConfig['rabbitMQStatusUrl'] == 'http://www.example.com:55672/json':
return False
try:
logger.debug('getRabbitMQStatus: attempting authentication setup')
manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
manager.add_password(None, agentConfig['rabbitMQStatusUrl'], agentConfig['rabbitMQUser'], agentConfig['rabbitMQPass'])
handler = urllib2.HTTPBasicAuthHandler(manager)
opener = urllib2.build_opener(handler)
urllib2.install_opener(opener)
logger.debug('getRabbitMQStatus: attempting urlopen')
req = urllib2.Request(agentConfig['rabbitMQStatusUrl'], None, headers(agentConfig))
# Do the request, log any errors
request = urllib2.urlopen(req)
response = request.read()
return json.loads(response)
except:
logger.exception('Unable to get RabbitMQ status')
return False
开发者ID:lightkeeper,项目名称:dd-agent,代码行数:27,代码来源:queue.py
示例5: post
def post(self):
try:
payload = json.loads(zlib.decompress(self.request.body))
except:
#log.exception("Error parsing the agent's POST request body")
return
agent_update(payload)
开发者ID:Julian,项目名称:dd-agent,代码行数:7,代码来源:pup.py
示例6: post
def post(self):
try:
body = json.loads(self.request.body)
series = body['series']
except Exception:
return
update(series)
开发者ID:bakins,项目名称:dd-agent,代码行数:7,代码来源:pup.py
示例7: _get_json
def _get_json(self, uri, params=None, multi=False):
"""Utility method to get and parse JSON streams."""
if params:
uri = "%s?%s" % (uri, urllib.urlencode(params))
self.log.debug("Connecting to Docker API at: %s" % uri)
req = urllib2.Request(uri, None)
try:
request = self.url_opener.open(req)
except urllib2.URLError as e:
if "Errno 13" in str(e):
raise Exception("Unable to connect to socket. dd-agent user must be part of the 'docker' group")
raise
response = request.read()
response = response.replace('\n', '') # Some Docker API versions occassionally send newlines in responses
self.log.debug('Docker API response: %s', response)
if multi and "}{" in response: # docker api sometimes returns juxtaposed json dictionaries
response = "[{0}]".format(response.replace("}{", "},{"))
if not response:
return []
try:
return json.loads(response)
except Exception as e:
self.log.error('Failed to parse Docker API response: %s', response)
raise DockerJSONDecodeError
开发者ID:DylanFrese,项目名称:dd-agent,代码行数:28,代码来源:docker.py
示例8: get_node_stats
def get_node_stats(self, base_url):
url = urlparse.urljoin(base_url, 'nodes')
stats = []
try:
stats = json.loads(urllib2.urlopen(url).read())
except urllib2.URLError, e:
self.log.info('Cannot open RabbitMQ API url: %s', url)
开发者ID:potto007,项目名称:dd-agent,代码行数:7,代码来源:rabbitmq.py
示例9: _get_stats
def _get_stats(self, url):
"Hit a given URL and return the parsed json"
self.log.debug('Fetching Couchbase stats at url: %s' % url)
req = urllib2.Request(url, None, headers(self.agentConfig))
# Do the request, log any errors
request = urllib2.urlopen(req)
response = request.read()
return json.loads(response)
开发者ID:Vinceveve,项目名称:dd-agent,代码行数:9,代码来源:couchbase.py
示例10: _get_data
def _get_data(self, url, auth=None):
""" Hit a given URL and return the parsed json
`auth` is a tuple of (username, password) or None
"""
req = urllib2.Request(url, None, headers(self.agentConfig))
if auth:
add_basic_auth(req, *auth)
request = urllib2.urlopen(req)
response = request.read()
return json.loads(response)
开发者ID:temujin9,项目名称:dd-agent,代码行数:10,代码来源:elastic.py
示例11: getInitialUpdatedValue
def getInitialUpdatedValue(self):
"""Get updated value
"""
if self._repository.etcd is None:
raise ValueError("No etcd available")
if self._environ:
path = self._environ.getEtcdPath(self._key)
else:
path = self._repository.environ.getEtcdPath(self._key)
# Get value
return json.loads(self._repository.etcd.read(path).value)
开发者ID:lipixun,项目名称:pyconfigmslib,代码行数:11,代码来源:section.py
示例12: _get_stats
def _get_stats(self, url, instance):
"Hit a given URL and return the parsed json"
self.log.debug('Fetching Couchbase stats at url: %s' % url)
req = urllib2.Request(url, None, headers(self.agentConfig))
if 'user' in instance and 'password' in instance:
add_basic_auth(req, instance['user'], instance['password'])
# Do the request, log any errors
request = urllib2.urlopen(req)
response = request.read()
return json.loads(response)
开发者ID:ghessler,项目名称:dd-agent,代码行数:11,代码来源:couchbase.py
示例13: _get_stats
def _get_stats(self, agentConfig, url):
"Hit a given URL and return the parsed json"
try:
req = urllib2.Request(url, None, headers(agentConfig))
# Do the request, log any errors
request = urllib2.urlopen(req)
response = request.read()
return json.loads(response)
except:
self.logger.exception('Unable to get CouchDB statistics')
return None
开发者ID:lightkeeper,项目名称:dd-agent,代码行数:12,代码来源:couch.py
示例14: __autoupdate__
def __autoupdate__(self):
"""Auto update
"""
# TODO: Support modified index
initialized = False
self.logger.debug("[%s] Auto update thread started", self.Type)
while True:
# Get etcd client
client = None
while True:
if self._repository.etcd is None:
self.logger.error("[%s] Failed to watch config, no etcd client found, will retry in 30s", self.Type)
time.sleep(30)
continue
client = self._repository.etcd
break
# Wait for the config
# Get the read path
if self._environ:
path = self._environ.getEtcdPath(self._key)
else:
path = self._repository.environ.getEtcdPath(self._key)
# Wait the config
try:
if not initialized:
# Not initialized
self.logger.debug("[%s] Watching config at path [%s]", self.Type, path)
if self.update(json.loads(client.read(path).value)):
initialized = True
else:
# Initialized, just wait
self.logger.debug("[%s] Watching config at path [%s]", self.Type, path)
self.update(json.loads(client.read(path, wait = True).value))
except (EtcdKeyNotFound, EtcdWatchTimedOut, EtcdEventIndexCleared):
# A normal error
time.sleep(10)
except:
# Error, wait 30s and continue watch
self.logger.exception("[%s] Failed to watch etcd, will retry in 30s", self.Type)
time.sleep(30)
开发者ID:lipixun,项目名称:pyconfigmslib,代码行数:40,代码来源:section.py
示例15: import_main
def import_main(options, args):
if options.flavor == 'friendfeed':
i = ingest.Ingester(persistence.MongoPersistence())
flavor_object = friendfeed.FriendFeedInput()
if options.file is not None:
result = json.loads(open(options.file, "rb+").read())
else:
f = ff.FriendFeedImport(config.friendfeed['username'], config.friendfeed['password'])
result = f.get_all_home_entries()
flavor_object.data = result
i.ingest(flavor_object)
else:
print "Not implemented !"
开发者ID:adragomir,项目名称:wurbe22,代码行数:13,代码来源:import.py
示例16: _parse
def _parse(self, resp, content):
"""Parses a rabj response to get the envelope information
"""
if resp['content-type'] == 'application/json':
try:
envelope = json.loads(content)
if envelope['status']['code'] == 200:
return envelope
else:
error = envelope['error']
raise RabjError(error['code'], error['class'], error['detail'], envelope)
except ValueError, e:
_log.warn("Decode error %s in content %s", e, content)
raise RabjError(resp.status, resp.reason, {'msg': e.message}, content)
开发者ID:kochhar,项目名称:pyrabj,代码行数:14,代码来源:api.py
示例17: read
def read(self, rawdata):
# parse packets, throw out duplicates, forward to protocol
packets = json.loads(rawdata)
for key, encoding, data in packets:
if self.lastReceived >= key:
continue
self.lastReceived = key
if encoding == 1:
# the python 2.6 json library decodes JSON strings to
# unicode, while simplejson decodes JSON strings to
# str. since, at this point, the data should have
# been base64 encoded anyway, we can just cast
# the data to a string and call it a day.
data = base64.urlsafe_b64decode(str(data) + '==')
self.protocol.dataReceived(data)
开发者ID:skysbird,项目名称:csp,代码行数:15,代码来源:session.py
示例18: _get_stats
def _get_stats(self, url, instance):
"Hit a given URL and return the parsed json"
self.log.debug('Fetching Couchbase stats at url: %s' % url)
req = urllib2.Request(url, None, headers(self.agentConfig))
if 'user' in instance and 'password' in instance:
add_basic_auth(req, instance['user'], instance['password'])
if instance['is_recent_python']:
timeout = instance.get('timeout' , DEFAULT_TIMEOUT)
request = urllib2.urlopen(req,timeout=timeout)
else:
request = urllib2.urlopen(req)
response = request.read()
return json.loads(response)
开发者ID:dhapgood4thscreen,项目名称:dd-agent,代码行数:15,代码来源:couchbase.py
示例19: _get_primary_addr
def _get_primary_addr(self, agentConfig, url, node_name):
''' Returns a list of primary interface addresses as seen by ES.
Used in ES < 0.19
'''
req = urllib2.Request(url, None, headers(agentConfig))
request = urllib2.urlopen(req)
response = request.read()
data = json.loads(response)
if node_name in data['nodes']:
node = data['nodes'][node_name]
if 'network' in node\
and 'primary_interface' in node['network']\
and 'address' in node['network']['primary_interface']:
return node['network']['primary_interface']['address']
raise NodeNotFound()
开发者ID:potto007,项目名称:dd-agent,代码行数:17,代码来源:elastic.py
示例20: _get_primary_addr
def _get_primary_addr(self, agentConfig, url, node_name):
""" Returns a list of primary interface addresses as seen by ES.
Used in ES < 0.19
"""
req = urllib2.Request(url, None, headers(agentConfig))
request = urllib2.urlopen(req)
response = request.read()
data = json.loads(response)
if node_name in data["nodes"]:
node = data["nodes"][node_name]
if (
"network" in node
and "primary_interface" in node["network"]
and "address" in node["network"]["primary_interface"]
):
return node["network"]["primary_interface"]["address"]
raise NodeNotFound()
开发者ID:stefan-mees,项目名称:dd-agent,代码行数:19,代码来源:elastic.py
注:本文中的util.json.loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论