本文整理汇总了Python中neutron.openstack.common.jsonutils.loads函数的典型用法代码示例。如果您正苦于以下问题:Python loads函数的具体用法?Python loads怎么用?Python loads使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了loads函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_plugin_prefix_with_parent_resource
def test_plugin_prefix_with_parent_resource(self):
    """Check a prefixed resource extension nested under a parent resource.

    'tweedles' is registered beneath the 'tenant' parent with the
    "/dummy_svc" path prefix; the index, the custom member action and the
    custom collection action must all answer 200.
    """
    parent_resource = {'member_name': "tenant",
                       'collection_name': "tenants"}
    extension = extensions.ResourceExtension(
        'tweedles', self.DummySvcPlugin(), parent_resource,
        path_prefix="/dummy_svc",
        member_actions={'custom_member_action': "GET"},
        collection_actions={'collection_action': "GET"})
    app = _setup_extensions_test_app(SimpleExtensionManager(extension))

    # Plain index of the nested collection.
    listing = app.get("/dummy_svc/tenants/1/tweedles")
    self.assertEqual(200, listing.status_int)

    # Custom action on a single member.
    member_reply = app.get("/dummy_svc/tenants/1/"
                           "tweedles/1/custom_member_action")
    self.assertEqual(200, member_reply.status_int)
    member_body = jsonutils.loads(member_reply.body)
    self.assertEqual(member_body['member_action'], "value")

    # Custom action on the collection, under a different parent id.
    collection_reply = app.get("/dummy_svc/tenants/2/"
                               "tweedles/collection_action")
    self.assertEqual(200, collection_reply.status_int)
    collection_body = jsonutils.loads(collection_reply.body)
    self.assertEqual(collection_body['collection'], "value")
开发者ID:basilbaby,项目名称:neutron,代码行数:26,代码来源:test_extensions.py
示例2: _get_handler
def _get_handler(resource):
    """Fake GET handler returning a ``(status, '', '', data)`` 4-tuple.

    ``GET_200[2]`` answers 200 with a two-element list, any other member of
    ``GET_200`` answers 200 with an empty payload, and every remaining
    resource answers 202 with a completion document.
    """
    if resource == GET_200[2]:
        payload = json.loads('[{"name":"a"},{"name":"b"}]')
        return 200, '', '', payload
    if resource not in GET_200:
        payload = json.loads('{"complete":"True", "success": "True"}')
        return 202, '', '', payload
    return 200, '', '', ''
开发者ID:JoeMido,项目名称:neutron,代码行数:10,代码来源:test_plugin_driver.py
示例3: _fetch_template_and_params
def _fetch_template_and_params(self, context, sc_instance,
                               sc_spec, sc_node):
    """Return a node's stack template and the parameters that apply to it.

    :param context: request context, passed through to the PTG helpers.
    :param sc_instance: service chain instance; its 'config_param_values'
        (a JSON string, possibly empty or None) and 'provider_ptg_id'
        are read.
    :param sc_spec: service chain spec; its 'config_param_names' (a
        Python-literal string) is read.
    :param sc_node: service chain node; its 'config' holds the JSON stack
        template.
    :returns: ``(stack_template, stack_params)``, or ``None`` when the
        node has no 'config' (an error is logged in that case).
    """
    stack_template = sc_node.get('config')
    # TODO(magesh):Raise an exception ??
    if not stack_template:
        LOG.error(_("Service Config is not defined for the service"
                    " chain Node"))
        return
    stack_template = jsonutils.loads(stack_template)

    # config_param_values has the parameters for all Nodes. Only apply
    # the ones relevant for this Node.
    # A stored None or empty string must still end up as a dict, because
    # the loops below assign into it and call .keys() on it.
    raw_param_values = sc_instance.get('config_param_values')
    if raw_param_values:
        config_param_values = jsonutils.loads(raw_param_values)
    else:
        config_param_values = {}

    config_param_names = sc_spec.get('config_param_names', [])
    if config_param_names:
        config_param_names = ast.literal_eval(config_param_names)
    config_param_names = config_param_names or []

    # This service chain driver knows how to fill in two parameter values
    # for the template at present.
    # 1)Subnet -> Provider PTG subnet is used
    # 2)PoolMemberIPs -> List of IP Addresses of all PTs in Provider PTG
    # TODO(magesh):Process on the basis of ResourceType rather than Name
    # eg: Type: OS::Neutron::PoolMember
    # Variable number of pool members is not handled yet. We may have to
    # dynamically modify the template json to achieve that
    provider_ptg_id = sc_instance.get("provider_ptg_id")

    # Only fetch the member IPs when some template input asks for them.
    member_ips = []
    if any("PoolMemberIP" in key for key in config_param_names):
        member_ips = self._get_member_ips(context, provider_ptg_id)

    member_count = 0
    for key in config_param_names:
        if "PoolMemberIP" in key:
            # Parameters beyond the number of known members get '0'.
            value = (member_ips[member_count]
                     if len(member_ips) > member_count else '0')
            member_count = member_count + 1
            config_param_values[key] = value
        elif key == "Subnet":
            value = self._get_ptg_subnet(context, provider_ptg_id)
            config_param_values[key] = value

    # Keep only the values the template actually declares as parameters.
    stack_params = {}
    node_params = (stack_template.get('Parameters')
                   or stack_template.get('parameters'))
    if node_params:
        for parameter in config_param_values.keys():
            if parameter in node_params.keys():
                stack_params[parameter] = config_param_values[parameter]
    return (stack_template, stack_params)
开发者ID:philpraxis,项目名称:group-based-policy,代码行数:54,代码来源:simplechain_driver.py
示例4: test_resource_extension_for_get_custom_collection_action
def test_resource_extension_for_get_custom_collection_action(self):
    """GET on a custom collection action answers 200 with the payload."""
    extension = extensions.ResourceExtension(
        'tweedles', self.ResourceExtensionController(),
        collection_actions={'custom_collection_action': "GET"})
    app = _setup_extensions_test_app(SimpleExtensionManager(extension))

    reply = app.get("/tweedles/custom_collection_action")
    self.assertEqual(200, reply.status_int)
    LOG.debug(jsonutils.loads(reply.body))
    self.assertEqual(jsonutils.loads(reply.body)['collection'], "value")
开发者ID:basilbaby,项目名称:neutron,代码行数:11,代码来源:test_extensions.py
示例5: _post_handler
def _post_handler(resource, binary):
    """Fake POST handler returning a ``(status, '', '', data)`` 4-tuple.

    Workflow actions answer 202 with a URI document, service lookups by
    name answer 201 with a links document, and anything else answers 201
    when ``binary`` is truthy or 202 otherwise, both with an empty payload.
    """
    if re.search(r'/api/workflow/.+/action/.+', resource):
        return 202, '', '', jsonutils.loads('{"uri":"some_uri"}')
    if re.search(r'/api/service\?name=.+', resource):
        links = jsonutils.loads(
            '{"links":{"actions":{"provision":"someuri"}}}')
        return 201, '', '', links
    status = 201 if binary else 202
    return status, '', '', ''
开发者ID:basilbaby,项目名称:neutron,代码行数:11,代码来源:test_plugin_driver.py
示例6: test_sync_multi_chunk
def test_sync_multi_chunk(self):
    """Run two synchronization chunks and check the cursors after each.

    ``_fetch_data`` on the synchronizer is patched to hand back canned
    replies in order, so the test only checks how the sync parameters
    advance from one chunk to the next.
    """
    # The fake NVP API client cannot be used for this test
    admin_ctx = context.get_admin_context()
    # Generate 4 networks, 1 port per network, and 4 routers
    with self._populate_data(admin_ctx, net_size=4, port_size=1,
                             router_size=4):
        lswitches = json.loads(
            self.fc.handle_get('/ws.v1/lswitch'))['results']
        lrouters = json.loads(
            self.fc.handle_get('/ws.v1/lrouter'))['results']
        lswitchports = json.loads(
            self.fc.handle_get('/ws.v1/lswitch/*/lport'))['results']
        canned_replies = [
            # Chunk 0 - lswitches
            (lswitches, None, 4),
            # Chunk 0 - lrouters
            (lrouters[:2], 'xxx', 4),
            # Chunk 0 - lports (size only)
            ([], 'start', 4),
            # Chunk 1 - lrouters (2 more) (lswitches are skipped)
            (lrouters[2:], None, None),
            # Chunk 1 - lports
            (lswitchports, None, 4),
        ]

        def fake_fetch_data(*args, **kwargs):
            # Replies are consumed strictly in the order listed above.
            return canned_replies.pop(0)

        # 2 Chunks, with 6 resources each.
        # 1st chunk lswitches and lrouters
        # 2nd chunk lrouters and lports
        # Mock _fetch_data
        synchronizer = self._plugin._synchronizer
        with mock.patch.object(synchronizer, '_fetch_data',
                               side_effect=fake_fetch_data):
            sp = sync.SyncParameters(6)

            def do_chunk(chunk_idx, ls_cursor, lr_cursor, lp_cursor):
                synchronizer._synchronize_state(sp)
                self.assertEqual(chunk_idx, sp.current_chunk)
                self.assertEqual(ls_cursor, sp.ls_cursor)
                self.assertEqual(lr_cursor, sp.lr_cursor)
                self.assertEqual(lp_cursor, sp.lp_cursor)

            # check 1st chunk
            do_chunk(1, None, 'xxx', 'start')
            # check 2nd chunk
            do_chunk(0, None, None, None)
            # Chunk size should have stayed the same
            self.assertEqual(sp.chunk_size, 6)
开发者ID:noxhana,项目名称:neutron,代码行数:48,代码来源:test_nvp_sync.py
示例7: _goose_handler
def _goose_handler(req, res):
    """Add the request's 'chewing' query value to the JSON response body.

    The value is stored under the 'FOXNSOX:googoose' key and the same
    response object is returned.
    """
    # NOTE: This only handles JSON responses.
    # You can use content type header to test for XML.
    payload = jsonutils.loads(res.body)
    payload['FOXNSOX:googoose'] = req.GET.get('chewing')
    res.body = jsonutils.dumps(payload)
    return res
开发者ID:AsherBond,项目名称:quantum,代码行数:7,代码来源:foxinsocks.py
示例8: get_port_tag_dict
def get_port_tag_dict(self):
    """Get a dict of port names and associated vlan tags.

    e.g. the returned dict is of the following form::

        {u'int-br-eth2': [],
         u'patch-tun': [],
         u'qr-76d9e6b6-21': 1,
         u'tapce5318ff-78': 1,
         u'tape1400310-e6': 1}

    The TAG ID is only available in the "Port" table and is not available
    in the "Interface" table queried by the get_vif_port_set() method.
    """
    known_ports = self.get_port_name_list()
    cmd = ['--format=json', '--', '--columns=name,tag', 'list', 'Port']
    output = self.run_vsctl(cmd, check_error=True)
    tags_by_port = {}
    if output:
        for name, tag in jsonutils.loads(output)['data']:
            if name in known_ports:
                # 'tag' can be [u'set', []] or an integer
                tags_by_port[name] = (tag[1] if isinstance(tag, list)
                                      else tag)
    return tags_by_port
开发者ID:Zemeio,项目名称:neutron,代码行数:29,代码来源:ovs_lib.py
示例9: _show
def _show(self, resource_type, response_file,
          uuid1, uuid2=None, relations=None):
    """Render one fake resource through a response template file.

    :param resource_type: fake resource kind; a trailing 'attachment'
        suffix is stripped before the ``_fake_<type>_dict`` lookup.
    :param response_file: template file name under ``self.fake_files_path``.
    :param uuid1: resource uuid, used when ``uuid2`` is not given.
    :param uuid2: takes precedence over ``uuid1`` when truthy.
    :param relations: accepted but not used by this method.
    :returns: the rendered resource as a JSON string.
    :raises api_exc.ResourceNotFound: when no resource has the target uuid.
    """
    target_uuid = uuid2 or uuid1
    if resource_type.endswith('attachment'):
        resource_type = resource_type[:resource_type.index('attachment')]
    with open("%s/%s" % (self.fake_files_path, response_file)) as f:
        response_template = f.read()
    res_dict = getattr(self, '_fake_%s_dict' % resource_type)

    def jsonify_rules(item, rule_key):
        # replace sec prof rules with their json dump
        if rule_key in item:
            item['%s_json' % rule_key] = jsonutils.dumps(item[rule_key])

    # Every stored item gets its *_json companions, not just the target,
    # exactly as before. values() works on both Python 2 and Python 3.
    for item in res_dict.values():
        if 'tags' in item:
            item['tags_json'] = jsonutils.dumps(item['tags'])
        jsonify_rules(item, 'logical_port_egress_rules')
        jsonify_rules(item, 'logical_port_ingress_rules')

    # Direct lookup instead of scanning every key for the matching one.
    if target_uuid not in res_dict:
        raise api_exc.ResourceNotFound()
    rendered = jsonutils.loads(response_template % res_dict[target_uuid])
    return jsonutils.dumps(rendered)
开发者ID:AsherBond,项目名称:quantum,代码行数:25,代码来源:fake.py
示例10: _build_item
def _build_item(resource):
    """Render ``resource`` through the response template and return it.

    When relations are set, ``self._build_relation`` is called once per
    relation with the resource and the rendered item.

    NOTE(review): ``response_template``, ``relations``, ``resource_type``
    and ``self`` are not parameters; presumably they come from an
    enclosing method that is not visible here.
    """
    rendered = jsonutils.loads(response_template % resource)
    for relation in relations or ():
        self._build_relation(resource, rendered, resource_type, relation)
    return rendered
开发者ID:AsherBond,项目名称:quantum,代码行数:7,代码来源:fake.py
示例11: _add_lswitch_lport
def _add_lswitch_lport(self, body, ls_uuid):
    """Create a fake logical port on switch ``ls_uuid`` from a JSON body.

    The port is stored in ``_fake_lswitch_lport_dict``, the switch's
    'lport_count' is incremented, and a copy of the port annotated with
    switch details is stored in ``_fake_lswitch_lportstatus_dict``.

    :returns: the new fake port dict.
    """
    lport = jsonutils.loads(body)
    port_uuid = uuidutils.generate_uuid()
    lport['uuid'] = port_uuid
    # put the tenant_id and the ls_uuid in the main dict
    # for simplifying templating
    lport['ls_uuid'] = ls_uuid
    tag_fields = (('tenant_id', 'os_tid'),
                  ('neutron_port_id', 'q_port_id'),
                  ('neutron_device_id', 'vm_id'))
    for field, tag_scope in tag_fields:
        lport[field] = self._get_tag(lport, tag_scope)
    lport['att_type'] = "NoAttachment"
    lport['att_info_json'] = ''
    self._fake_lswitch_lport_dict[port_uuid] = lport

    lswitch = self._fake_lswitch_dict[ls_uuid]
    lswitch['lport_count'] += 1

    # The status record is copied before 'status' is set on the port,
    # so the copy does not carry that key.
    lport_status = lport.copy()
    lport_status['ls_tenant_id'] = lswitch['tenant_id']
    lport_status['ls_uuid'] = lswitch['uuid']
    lport_status['ls_name'] = lswitch['display_name']
    lport_status['ls_zone_uuid'] = lswitch['zone_uuid']
    # set status value
    lport['status'] = 'true'
    self._fake_lswitch_lportstatus_dict[port_uuid] = lport_status
    return lport
开发者ID:AsherBond,项目名称:quantum,代码行数:26,代码来源:fake.py
示例12: _request_api_server
def _request_api_server(self, url, data=None, headers=None):
    """POST ``data`` to the API server, authenticating once on a 401.

    On an "unauthorized" reply a token is requested from Keystone, saved
    in ``self._authn_token`` and the original request is re-issued a
    single time with an 'X-AUTH-TOKEN' header. The caller's ``headers``
    dict is never modified.

    :param url: API server URL to post to.
    :param data: request body, passed through to ``requests.post``.
    :param headers: optional request headers.
    :returns: the response of the last request made to the API server;
        this may still be a 401 if the fresh token is rejected.
    :raises RuntimeError: when Keystone does not answer with "ok".
    """
    # TLS verification settings for the API server, resolved once so the
    # first attempt and the retry use the same ones.
    if self._apiinsecure:
        api_tls = {'verify': False}
    elif self._use_api_certs:
        api_tls = {'verify': self._apicertbundle}
    else:
        api_tls = {}

    # Attempt to post to Api-Server
    response = requests.post(url, data=data, headers=headers, **api_tls)
    if response.status_code != requests.codes.unauthorized:
        return response

    # Get token from keystone and save it for next request
    if self._ksinsecure:
        ks_tls = {'verify': False}
    elif self._use_ks_certs:
        ks_tls = {'verify': self._kscertbundle}
    else:
        ks_tls = {}
    auth_response = requests.post(
        self._keystone_url,
        data=self._authn_body,
        headers={'Content-type': 'application/json'},
        **ks_tls)
    if auth_response.status_code != requests.codes.ok:
        raise RuntimeError('Authentication Failure')

    authn_content = json.loads(auth_response.text)
    self._authn_token = authn_content['access']['token']['id']

    # Re-issue the original request with the new token. Work on a copy of
    # the headers so the caller's dict is left alone, and retry only once
    # so a server that keeps answering 401 cannot cause endless recursion.
    auth_headers = dict(headers or {})
    auth_headers['X-AUTH-TOKEN'] = self._authn_token
    return requests.post(url, data=data, headers=auth_headers, **api_tls)
开发者ID:dattamiruke,项目名称:contrail-neutron-plugin,代码行数:32,代码来源:contrail_plugin.py
示例13: get_vif_port_by_id
def get_vif_port_by_id(self, port_id):
    """Find the Interface row whose external_ids iface-id is ``port_id``.

    :param port_id: value matched against ``external_ids:iface-id``.
    :returns: a ``VifPort``, or ``None`` when ovs-vsctl gives no output,
        the ofport is not an integer or is -1, or the reply cannot be
        parsed (a warning is logged in the last two cases).
    """
    args = ['--format=json', '--', '--columns=external_ids,name,ofport',
            'find', 'Interface',
            'external_ids:iface-id="%s"' % port_id]
    result = self.run_vsctl(args)
    if not result:
        return
    json_result = jsonutils.loads(result)
    try:
        # Retrieve the indexes of the columns we're looking for
        headings = json_result['headings']
        ext_ids_idx = headings.index('external_ids')
        name_idx = headings.index('name')
        ofport_idx = headings.index('ofport')
        # If data attribute is missing or empty the line below will raise
        # an exception which will be captured in this block.
        # We won't deal with the possibility of ovs-vsctl return multiple
        # rows since the interface identifier is unique
        data = json_result['data'][0]
        port_name = data[name_idx]
        ofport = data[ofport_idx]
        # ofport must be integer otherwise return None
        if not isinstance(ofport, int) or ofport == -1:
            # The trailing space after "a" keeps the two adjacent string
            # literals from running together in the logged message.
            LOG.warn(_("ofport: %(ofport)s for VIF: %(vif)s is not a "
                       "positive integer"), {'ofport': ofport,
                                             'vif': port_id})
            return
        # Find VIF's mac address in external ids
        ext_id_dict = dict((item[0], item[1]) for item in
                           data[ext_ids_idx][1])
        vif_mac = ext_id_dict['attached-mac']
        return VifPort(port_name, ofport, port_id, vif_mac, self)
    except Exception as e:
        # Best effort: any parsing problem is logged and reported as None.
        LOG.warn(_("Unable to parse interface details. Exception: %s"), e)
        return
开发者ID:greynolds123,项目名称:neutron,代码行数:35,代码来源:ovs_lib.py
示例14: get_qos_by_port
def get_qos_by_port(self, port_id):
    """Find the qos row whose external_ids port-id is ``port_id``.

    Returns a ``PortQos`` built from the first row of the reply, or
    ``None`` when ovs-vsctl gives no output or the reply cannot be parsed
    (a warning is logged in the latter case).
    """
    cmd = ['--format=json', '--', 'find', 'qos',
           'external_ids:port-id="%s"' % port_id]
    output = self.run_vsctl(cmd)
    if not output:
        return
    parsed = jsonutils.loads(output)

    def pairs_to_dict(cell):
        # The pairs are read from index 1 of the cell.
        return dict((pair[0], pair[1]) for pair in cell[1])

    try:
        # Retrieve the indexes of the columns we're looking for
        columns = parsed['headings']
        uuid_col = columns.index('_uuid')
        ext_ids_col = columns.index('external_ids')
        other_col = columns.index('other_config')
        queues_col = columns.index('queues')
        type_col = columns.index('type')
        # If data attribute is missing or empty the line below will raise
        # an exception which will be captured in this block.
        # Only the first returned row is used.
        row = parsed['data'][0]
        qos_id = row[uuid_col][1]
        ext_ids = pairs_to_dict(row[ext_ids_col])
        port_id = ext_ids['port-id']
        other_config = pairs_to_dict(row[other_col])
        queues = pairs_to_dict(row[queues_col])
        qos_type = row[type_col]
        return PortQos(port_id, qos_id, other_config, queues, qos_type)
    except Exception as e:
        LOG.warn(_("Unable to parse qos details. Exception: %s"), e)
        return
开发者ID:nkapotoxin,项目名称:fs_spc111t_plus_hc,代码行数:34,代码来源:openflow.py
示例15: api_providers
def api_providers(self):
    """Parse api_providers from response.

    Returns: api_providers in [(host, port, is_ssl), ...] format, or None
    when the request was not successful or the body cannot be parsed.
    """
    def _provider_from_listen_addr(addr):
        # (pssl|ptcp):<ip>:<port> => (host, port, is_ssl)
        fields = addr.split(':')
        return (fields[1], int(fields[2]), fields[0] == 'pssl')

    try:
        if self.successful():
            providers = []
            results = json.loads(self.value.body).get('results', [])
            for node in results:
                for role in node.get('roles', []):
                    if role.get('role') != 'api_provider':
                        continue
                    listen_addr = role.get('listen_addr')
                    if not listen_addr:
                        continue
                    providers.append(
                        _provider_from_listen_addr(listen_addr))
            return providers
    except Exception as e:
        LOG.warn(_("[%(rid)d] Failed to parse API provider: %(e)s"),
                 {'rid': self._rid(), 'e': e})
        # intentionally fall through
    return None
开发者ID:cisco-pnsc,项目名称:neutron,代码行数:26,代码来源:eventlet_request.py
示例16: _get_keystone_token_v2
def _get_keystone_token_v2(self):
    """Request a token from the Keystone v2.0 tokens endpoint.

    Credentials and endpoint location are read from the
    ``keystone_authtoken`` config group. On an "ok" reply the token id is
    saved in ``self._authn_token`` and the response is returned; any
    other status raises ``RuntimeError``.
    """
    kcfg = cfg.CONF.keystone_authtoken
    credentials = {
        "username": kcfg.admin_user,
        "password": kcfg.admin_password,
        "tenantName": kcfg.admin_tenant_name
    }
    auth_body = {"auth": {"passwordCredentials": credentials}}
    keystone_url = "%s://%s:%s%s" % (
        kcfg.auth_protocol,
        kcfg.auth_host,
        kcfg.auth_port,
        "/v2.0/tokens"
    )

    response = self._query_keystone_server(keystone_url, auth_body)
    if response.status_code != requests.codes.ok:
        raise RuntimeError('Authentication Failure')
    authn_content = json.loads(response.text)
    self._authn_token = authn_content['access']['token']['id']
    return response
开发者ID:kklimonda,项目名称:contrail-neutron-plugin,代码行数:26,代码来源:contrail_plugin.py
示例17: get_vif_port_set
def get_vif_port_set(self):
    """Return the set of iface-ids of VIF ports with a positive ofport.

    Rows of the "Interface" table are skipped when their name is not in
    ``get_port_name_list()``. A warning is logged for rows whose ofport is
    not an integer or is not greater than zero.
    """
    known_ports = self.get_port_name_list()
    vif_ids = set()
    cmd = ["--format=json", "--", "--columns=name,external_ids,ofport",
           "list", "Interface"]
    output = self.run_vsctl(cmd, check_error=True)
    if not output:
        return vif_ids

    for row in jsonutils.loads(output)["data"]:
        name = row[0]
        if name not in known_ports:
            continue
        external_ids = dict(row[1][1])
        # Do not consider VIFs which aren't yet ready
        # This can happen when ofport values are either [] or ["set", []]
        # We will therefore consider only integer values for ofport
        ofport = row[2]
        try:
            port_no = int(ofport)
        except (ValueError, TypeError):
            LOG.warn(_("Found not yet ready openvswitch port: %s"), row)
            continue
        if port_no <= 0:
            LOG.warn(_("Found failed openvswitch port: %s"), row)
            continue
        # Both accepted shapes require an attached MAC address.
        if "attached-mac" not in external_ids:
            continue
        if "iface-id" in external_ids:
            vif_ids.add(external_ids["iface-id"])
        elif "xs-vif-uuid" in external_ids:
            # if this is a xenserver and iface-id is not
            # automatically synced to OVS from XAPI, we grab it
            # from XAPI directly
            vif_ids.add(
                self.get_xapi_iface_id(external_ids["xs-vif-uuid"]))
    return vif_ids
开发者ID:AsherBond,项目名称:quantum,代码行数:33,代码来源:ovs_lib.py
示例18: _bands_handler
def _bands_handler(req, res):
    """Add a fixed 'FOXNSOX:big_bands' entry to the JSON response body.

    The same response object is returned; ``req`` is not read.
    """
    # NOTE: This only handles JSON responses.
    # You can use content type header to test for XML.
    payload = jsonutils.loads(res.body)
    payload['FOXNSOX:big_bands'] = 'Pig Bands!'
    res.body = jsonutils.dumps(payload)
    return res
开发者ID:AsherBond,项目名称:quantum,代码行数:7,代码来源:foxinsocks.py
示例19: _handler
def _handler(self, client_sock, client_addr):
    """Handle incoming lease relay stream connection.

    This method will only read the first 1024 bytes and then close the
    connection. The limit exists to limit the impact of misbehaving
    clients.

    The message must be a JSON object with 'network_id' (UUID-like),
    'ip_address' and 'lease_remaining'; on success ``self.callback`` is
    invoked with the three parsed values. Errors are logged, never raised.

    :param client_sock: connected client socket; always closed here.
    :param client_addr: client address (not used).
    """
    # Bound up front so the ValueError handler can always report it.
    msg = None
    try:
        try:
            msg = client_sock.recv(1024)
        finally:
            # Release the socket even when recv() fails; previously a
            # failed read or an unparsable message left it open.
            client_sock.close()
        data = jsonutils.loads(msg)

        network_id = data['network_id']
        if not uuidutils.is_uuid_like(network_id):
            raise ValueError(_("Network ID %s is not a valid UUID") %
                             network_id)
        ip_address = str(netaddr.IPAddress(data['ip_address']))
        lease_remaining = int(data['lease_remaining'])
        self.callback(network_id, ip_address, lease_remaining)
    except ValueError as e:
        LOG.warn(_('Unable to parse lease relay msg to dict.'))
        LOG.warn(_('Exception value: %s'), e)
        LOG.warn(_('Message representation: %s'), repr(msg))
    except Exception:
        LOG.exception(_('Unable update lease. Exception'))
开发者ID:armando-migliaccio,项目名称:neutron,代码行数:25,代码来源:dhcp_agent.py
示例20: get_port_tag_dict
def get_port_tag_dict(self):
    """Get a dict of port names and associated vlan tags.

    e.g. the returned dict is of the following form::

        {u'int-br-eth2': [],
         u'patch-tun': [],
         u'qr-76d9e6b6-21': 1,
         u'tapce5318ff-78': 1,
         u'tape1400310-e6': 1}

    The TAG ID is only available in the "Port" table and is not available
    in the "Interface" table queried by the get_vif_port_set() method.
    """
    # e.g. ['ha-0e10bc14-99', 'int-br-ex', 'patch-tun', 'qr-77bec540-b3']
    names = self.get_port_name_list()
    vsctl_args = ['--format=json', '--', '--columns=name,tag',
                  'list', 'Port']
    # e.g. '{"data":[["qr-8f9a5cdd-3d",1],["br-ex",["set",[]]]],
    #        "headings":["name","tag"]}\n'
    raw = self.run_vsctl(vsctl_args, check_error=True)

    tag_by_name = {}
    if not raw:
        return tag_by_name

    rows = jsonutils.loads(raw)['data']
    for port_name, port_tag in rows:
        if port_name not in names:
            continue
        # 'tag' can be [u'set', []] or an integer
        if isinstance(port_tag, list):
            port_tag = port_tag[1]
        tag_by_name[port_name] = port_tag
    return tag_by_name
开发者ID:xiongmeng1108,项目名称:gcloud7_neutron-2014.2.2,代码行数:32,代码来源:ovs_lib.py
注:本文中的neutron.openstack.common.jsonutils.loads函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论