本文整理汇总了Python中neutronclient.neutron.v2_0.find_resourceid_by_name_or_id函数的典型用法代码示例。如果您正苦于以下问题:Python find_resourceid_by_name_or_id函数的具体用法?Python find_resourceid_by_name_or_id怎么用?Python find_resourceid_by_name_or_id使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了find_resourceid_by_name_or_id函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_ip_address_is_cidr
def test_ip_address_is_cidr(self):
    """Create a port whose allowed address pair ``ip_address`` is a CIDR.

    The expected neutron calls are recorded first, then the stack is built
    from the address-pair template (with the pair's ``ip_address`` replaced
    by ``10.0.3.0/24``) and the port's create task is run.
    """
    port_id = '2e00180a-ff9d-42c4-b701-a0606b243447'

    # Record phase: the network lookup may be hit any number of times.
    network_lookup = neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client), 'network', 'abcd1234')
    network_lookup.MultipleTimes().AndReturn('abcd1234')

    expected_request = {'port': {
        'network_id': u'abcd1234',
        'allowed_address_pairs': [{
            'ip_address': u'10.0.3.0/24',
            'mac_address': u'00-B0-D0-86-BB-F7',
        }],
        'name': utils.PhysName('test_stack', 'port'),
        'admin_state_up': True,
    }}
    neutronclient.Client.create_port(expected_request).AndReturn(
        {'port': {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {'port': {"status": "ACTIVE", "id": port_id}})
    self.m.ReplayAll()

    # Replay phase: patch the template, then create the port.
    template = template_format.parse(neutron_port_with_address_pair_template)
    port_props = template['resources']['port']['properties']
    port_props['allowed_address_pairs'][0]['ip_address'] = '10.0.3.0/24'
    stack = utils.parse_stack(template)
    scheduler.TaskRunner(stack['port'].create)()
    self.m.VerifyAll()
开发者ID:noako,项目名称:heat,代码行数:35,代码来源:test_neutron_port.py
示例2: validate
def validate(self):
    """Validate the provided params.

    Runs the parent class validation, then checks that ``object_type`` is a
    key of ``SUPPORTED_TYPES_ACTIONS`` and that ``action`` is listed for
    that type. Finally the ``object_id`` value is passed through the
    neutron name-or-ID lookup.

    :raises exception.StackValidationFailed: if the object type or the
        action is not supported.
    """
    super(RBACPolicy, self).validate()

    action = self.properties[self.ACTION]
    obj_type = self.properties[self.OBJECT_TYPE]
    obj_id_or_name = self.properties[self.OBJECT_ID]

    # Validate obj_type and action per SUPPORTED_TYPES_ACTIONS.
    if obj_type not in self.SUPPORTED_TYPES_ACTIONS:
        # list() keeps the message readable on Python 3, where keys()
        # returns a view whose repr is "dict_keys([...])".
        msg = (_("Invalid object_type: %(obj_type)s. "
                 "Valid object_type :%(value)s") %
               {'obj_type': obj_type,
                'value': list(self.SUPPORTED_TYPES_ACTIONS.keys())})
        raise exception.StackValidationFailed(message=msg)

    if action not in self.SUPPORTED_TYPES_ACTIONS[obj_type]:
        msg = (_("Invalid action %(action)s for object type "
                 "%(obj_type)s. Valid actions :%(value)s") %
               {'action': action, 'obj_type': obj_type,
                'value': self.SUPPORTED_TYPES_ACTIONS[obj_type]})
        raise exception.StackValidationFailed(message=msg)

    # Make sure the value of object_id is correct.
    # NOTE(review): presumably the lookup raises for an unknown object;
    # that behaviour lives in neutronclient and is not visible here.
    neutronV20.find_resourceid_by_name_or_id(self.client(),
                                             obj_type,
                                             obj_id_or_name)
开发者ID:Hopebaytech,项目名称:heat,代码行数:26,代码来源:rbac_policy.py
示例3: test_port_security_enabled
def test_port_security_enabled(self):
    """Create a port from the port-security template.

    The recorded ``create_port`` request is expected to carry
    ``port_security_enabled`` set to ``False``.
    """
    port_id = "fc68ea2c-b60b-4b4f-bd82-94ec81110766"

    # Record phase.
    network_lookup = neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client), "network", "abcd1234")
    network_lookup.MultipleTimes().AndReturn("abcd1234")

    expected_request = {
        "port": {
            "network_id": u"abcd1234",
            "port_security_enabled": False,
            "name": utils.PhysName("test_stack", "port"),
            "admin_state_up": True,
        }
    }
    neutronclient.Client.create_port(expected_request).AndReturn(
        {"port": {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {"port": {"status": "ACTIVE", "id": port_id}})
    self.m.ReplayAll()

    # Replay phase.
    template = template_format.parse(neutron_port_security_template)
    stack = utils.parse_stack(template)
    scheduler.TaskRunner(stack["port"].create)()
    self.m.VerifyAll()
开发者ID:ricolin,项目名称:heat,代码行数:28,代码来源:test_neutron_port.py
示例4: create_vpnservice
def create_vpnservice(self, resolve_neutron=True, resolve_router=True):
    """Record the VPN service expectations and build the resource.

    :param resolve_neutron: when true, parse ``vpnservice_template``;
        otherwise parse ``vpnservice_template_deprecated``.
    :param resolve_router: when true, replace the ``router_id`` property
        of the ``VPNService`` resource with ``router`` set to ``'rou123'``.
    :returns: a ``vpnservice.VPNService`` built from the parsed stack.
    """
    self.stub_SubnetConstraint_validate()
    self.stub_RouterConstraint_validate()

    # One lookup expectation per resource type, recorded in this order.
    for resource_type, identifier in (('subnet', 'sub123'),
                                      ('router', 'rou123')):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            resource_type,
            identifier,
            cmd_resource=None,
        ).AndReturn(identifier)

    snippet = template_format.parse(
        vpnservice_template if resolve_neutron
        else vpnservice_template_deprecated)

    if resolve_router:
        props = snippet['resources']['VPNService']['properties']
        props['router'] = 'rou123'
        del props['router_id']

    created = {'vpnservice': {'id': 'vpn123'}}
    neutronclient.Client.create_vpnservice(
        self.VPN_SERVICE_CONF).AndReturn(created)

    self.stack = utils.parse_stack(snippet)
    definitions = self.stack.t.resource_definitions(self.stack)
    return vpnservice.VPNService(
        'vpnservice', definitions['VPNService'], self.stack)
开发者ID:MatMaul,项目名称:heat,代码行数:31,代码来源:test_neutron_vpnservice.py
示例5: test_ip_address_is_cidr
def test_ip_address_is_cidr(self):
    """Create a port whose allowed address pair ``ip_address`` is a CIDR.

    Same scenario as a plain address-pair port, except that the template's
    first pair is rewritten to ``10.0.3.0/24`` before the stack is parsed.
    """
    port_id = "2e00180a-ff9d-42c4-b701-a0606b243447"

    # Record phase.
    network_lookup = neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client), "network", "abcd1234",
        cmd_resource=None)
    network_lookup.MultipleTimes().AndReturn("abcd1234")

    address_pair = {"ip_address": u"10.0.3.0/24",
                    "mac_address": u"00-B0-D0-86-BB-F7"}
    expected_request = {
        "port": {
            "network_id": u"abcd1234",
            "allowed_address_pairs": [address_pair],
            "name": utils.PhysName("test_stack", "port"),
            "admin_state_up": True,
        }
    }
    neutronclient.Client.create_port(expected_request).AndReturn(
        {"port": {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {"port": {"status": "ACTIVE", "id": port_id}})
    self.m.ReplayAll()

    # Replay phase.
    template = template_format.parse(neutron_port_with_address_pair_template)
    pairs = template["resources"]["port"]["properties"]["allowed_address_pairs"]
    pairs[0]["ip_address"] = "10.0.3.0/24"
    stack = utils.parse_stack(template)
    scheduler.TaskRunner(stack["port"].create)()
    self.m.VerifyAll()
开发者ID:zhengxiaochuan-3,项目名称:heat,代码行数:27,代码来源:test_neutron_port.py
示例6: test_port_security_enabled
def test_port_security_enabled(self):
    """Create a port from the port-security template.

    The recorded ``create_port`` request is expected to carry
    ``port_security_enabled`` set to ``False``.
    """
    port_id = 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'

    # Record phase.
    network_lookup = neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client), 'network', 'abcd1234')
    network_lookup.MultipleTimes().AndReturn('abcd1234')

    port_body = {
        'network_id': u'abcd1234',
        'port_security_enabled': False,
        'name': utils.PhysName('test_stack', 'port'),
        'admin_state_up': True,
    }
    neutronclient.Client.create_port({'port': port_body}).AndReturn(
        {'port': {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {'port': {"status": "ACTIVE", "id": port_id}})
    self.m.ReplayAll()

    # Replay phase.
    stack = utils.parse_stack(
        template_format.parse(neutron_port_security_template))
    scheduler.TaskRunner(stack['port'].create)()
    self.m.VerifyAll()
开发者ID:sizowie,项目名称:heat,代码行数:32,代码来源:test_neutron_port.py
示例7: args2body
def args2body(self, parsed_args):
    """Build the ``security_group_rule`` request body from parsed arguments.

    ``security_group_id`` and, when given, ``remote_group_id`` are resolved
    through ``find_resourceid_by_name_or_id``. Optional attributes are only
    added to the body when their parsed value is truthy.

    :param parsed_args: parsed command-line arguments.
    :returns: dict of the form ``{"security_group_rule": {...}}``.
    """
    resolved_group = neutronV20.find_resourceid_by_name_or_id(
        self.get_client(), "security_group", parsed_args.security_group_id
    )
    rule = {
        "security_group_id": resolved_group,
        "direction": parsed_args.direction,
        "ethertype": parsed_args.ethertype,
    }

    # Attributes copied verbatim when set.
    for attr in ("protocol", "port_range_min", "port_range_max",
                 "remote_ip_prefix"):
        value = getattr(parsed_args, attr)
        if value:
            rule[attr] = value

    if parsed_args.remote_group_id:
        rule["remote_group_id"] = neutronV20.find_resourceid_by_name_or_id(
            self.get_client(), "security_group", parsed_args.remote_group_id
        )
    if parsed_args.tenant_id:
        rule["tenant_id"] = parsed_args.tenant_id

    return {"security_group_rule": rule}
开发者ID:Cerberus98,项目名称:python-neutronclient,代码行数:27,代码来源:securitygroup.py
示例8: test_update_router_gateway_as_property
def test_update_router_gateway_as_property(self):
    """Update a router's ``external_gateway_info`` property.

    After the update task runs, the resource must be in
    ``(UPDATE, COMPLETE)`` and its ``external_gateway_info`` attribute must
    show the new network ID with ``enable_snat`` false.
    """
    router_id = '3e46229d-8fce-4733-819a-b5fe630550f8'
    new_network_id = '91e47a57-7508-46fe-afc9-fc454e8580e1'

    self._create_router_with_gateway()

    # Record phase: one lookup per (type, name), in this order.
    for resource_type, name, resolved in (
            ('network', 'other_public', new_network_id),
            ('subnet', 'sub1234', 'sub1234')):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            resource_type,
            name,
            cmd_resource=None,
        ).AndReturn(resolved)

    expected_update = {'router': {
        "external_gateway_info": {
            'network_id': '91e47a57-7508-46fe-afc9-fc454e8580e1',
            'enable_snat': False,
        },
    }}
    neutronclient.Client.update_router(
        router_id, expected_update).AndReturn(None)

    shown_router = {
        "router": {
            "status": "ACTIVE",
            "external_gateway_info": {
                "network_id": "91e47a57-7508-46fe-afc9-fc454e8580e1",
                "enable_snat": False,
            },
            "name": "Test Router",
            "admin_state_up": True,
            "tenant_id": "3e21026f2dc94372b105808c0e721661",
            "routes": [],
            "id": "3e46229d-8fce-4733-819a-b5fe630550f8",
        }
    }
    neutronclient.Client.show_router(
        router_id).MultipleTimes().AndReturn(shown_router)
    self.m.ReplayAll()

    # Replay phase: create, then update with the new gateway info.
    t = template_format.parse(neutron_external_gateway_template)
    stack = utils.parse_stack(t)
    rsrc = self.create_router(t, stack, 'router')

    props = t['resources']['router']['properties'].copy()
    props['external_gateway_info'] = {
        "network": "other_public",
        "enable_snat": False,
    }
    update_template = rsrc.t.freeze(properties=props)
    scheduler.TaskRunner(rsrc.update, update_template)()
    self.assertEqual((rsrc.UPDATE, rsrc.COMPLETE), rsrc.state)

    gateway_info = rsrc.FnGetAtt('external_gateway_info')
    self.assertEqual(new_network_id, gateway_info.get('network_id'))
    self.assertFalse(gateway_info.get('enable_snat'))
    self.m.VerifyAll()
开发者ID:hongbin,项目名称:heat,代码行数:60,代码来源:test_neutron_router.py
示例9: run
def run(self, parsed_args):
    """Set the gateway of a router from parsed command-line arguments.

    The router, the external network and any ``subnet_id`` given in a
    ``fixed_ip`` spec are resolved through ``find_resourceid_by_name_or_id``
    before ``add_gateway_router`` is called. A confirmation line is printed
    to ``self.app.stdout``.
    """
    self.log.debug('run(%s)' % parsed_args)
    client = self.get_client()
    client.format = parsed_args.request_format

    resolve = neutronV20.find_resourceid_by_name_or_id
    router_id = resolve(client, self.resource, parsed_args.router)
    gateway_info = {
        'network_id': resolve(client, 'network',
                              parsed_args.external_network),
    }
    if parsed_args.disable_snat:
        gateway_info['enable_snat'] = False

    def _parse_fixed_ip(ip_spec):
        # Turn one fixed-ip spec into a dict with its subnet resolved.
        ip_dict = utils.str2dict(ip_spec)
        subnet_name_id = ip_dict.get('subnet_id')
        if subnet_name_id:
            ip_dict['subnet_id'] = resolve(client, 'subnet', subnet_name_id)
        return ip_dict

    if parsed_args.fixed_ip:
        gateway_info['external_fixed_ips'] = [
            _parse_fixed_ip(spec) for spec in parsed_args.fixed_ip]

    client.add_gateway_router(router_id, gateway_info)
    print(_('Set gateway for router %s') % parsed_args.router,
          file=self.app.stdout)
开发者ID:Gvkrishna,项目名称:python-neutronclient,代码行数:25,代码来源:router.py
示例10: args2body
def args2body(self, parsed_args):
    """Build the ``port`` request body from parsed arguments.

    The network and any ``subnet_id`` inside a ``fixed_ip`` spec are
    resolved through ``find_resourceid_by_name_or_id``. Optional attributes
    are added only when their parsed value is truthy. The security-group
    and extra-DHCP-option helpers then get to amend the port dict.

    :param parsed_args: parsed command-line arguments.
    :returns: dict of the form ``{"port": {...}}``.
    """
    network_id = neutronV20.find_resourceid_by_name_or_id(
        self.get_client(), "network", parsed_args.network_id)
    port = {"admin_state_up": parsed_args.admin_state,
            "network_id": network_id}
    body = {"port": port}

    # Attributes copied verbatim when set.
    for attr in ("mac_address", "device_id", "tenant_id", "name"):
        value = getattr(parsed_args, attr)
        if value:
            port[attr] = value

    fixed_ips = []
    for ip_spec in parsed_args.fixed_ip or ():
        ip_dict = utils.str2dict(ip_spec)
        if "subnet_id" in ip_dict:
            ip_dict["subnet_id"] = neutronV20.find_resourceid_by_name_or_id(
                self.get_client(), "subnet", ip_dict["subnet_id"])
        fixed_ips.append(ip_dict)
    if fixed_ips:
        port["fixed_ips"] = fixed_ips

    self.args2body_secgroup(parsed_args, body["port"])
    self.args2body_extradhcpopt(parsed_args, body["port"])
    return body
开发者ID:kbijon,项目名称:OpenStack-CVRM,代码行数:27,代码来源:port.py
示例11: _test_network_gateway_create
def _test_network_gateway_create(self, resolve_neutron=True):
    """Create a network gateway, then delete it twice.

    The first delete is recorded as succeeding. For the second delete the
    neutron client is recorded as raising 404 on disconnect, show and
    delete, and the resource is still expected to end in
    ``(DELETE, COMPLETE)``.

    :param resolve_neutron: forwarded to
        ``prepare_create_network_gateway``.
    """
    gateway_id = u'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37'

    rsrc = self.prepare_create_network_gateway(resolve_neutron)
    neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client),
        'network',
        '6af055d3-26f6-48dd-a597-7611d7e58d35'
    ).MultipleTimes().AndReturn(
        '6af055d3-26f6-48dd-a597-7611d7e58d35')

    # First delete: disconnect succeeds.
    neutronclient.Client.disconnect_network_gateway(
        'ed4c03b9-8251-4c09-acc4-e59ee9e6aa37', {
            'network_id': u'6af055d3-26f6-48dd-a597-7611d7e58d35',
            'segmentation_id': 10,
            'segmentation_type': u'vlan'
        }
    ).AndReturn(None)

    # Second delete: the connection is already gone. The 404 must be
    # *raised*; returning the exception object would never exercise the
    # not-found path.
    neutronclient.Client.disconnect_network_gateway(
        gateway_id, {
            'network_id': u'6af055d3-26f6-48dd-a597-7611d7e58d35',
            'segmentation_id': 10,
            'segmentation_type': u'vlan'
        }
    ).AndRaise(qe.NeutronClientException(status_code=404))

    neutronclient.Client.delete_network_gateway(
        gateway_id
    ).AndReturn(None)

    neutronclient.Client.show_network_gateway(
        gateway_id
    ).AndReturn(sng)

    neutronclient.Client.show_network_gateway(
        gateway_id
    ).AndRaise(qe.NeutronClientException(status_code=404))

    neutronclient.Client.delete_network_gateway(
        gateway_id
    ).AndRaise(qe.NeutronClientException(status_code=404))

    self.m.ReplayAll()

    rsrc.validate()
    scheduler.TaskRunner(rsrc.create)()
    self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)

    ref_id = rsrc.FnGetRefId()
    self.assertEqual(gateway_id, ref_id)

    self.assertRaises(
        exception.InvalidTemplateAttribute, rsrc.FnGetAtt, 'Foo')

    self.assertIsNone(scheduler.TaskRunner(rsrc.delete)())
    self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)

    # Reset the state so that delete can be run a second time.
    rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE, 'to delete again')
    scheduler.TaskRunner(rsrc.delete)()
    self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
    self.m.VerifyAll()
开发者ID:ricolin,项目名称:heat,代码行数:60,代码来源:test_neutron_network_gateway.py
示例12: args2body
def args2body(self, parsed_args):
    """Build the ``subnet`` request body from parsed arguments.

    The network and, unless it is the literal string ``'None'``, the subnet
    pool are resolved through ``find_resourceid_by_name_or_id``. A warning
    is logged when the CIDR ends in ``/32`` (IPv4) or ``/128`` (any other
    IP version).

    :param parsed_args: parsed command-line arguments.
    :returns: dict of the form ``{'subnet': {...}}``.
    """
    resolve = neutronV20.find_resourceid_by_name_or_id
    network_id = resolve(self.get_client(), 'network', parsed_args.network_id)
    body = {'subnet': {'network_id': network_id,
                       'ip_version': parsed_args.ip_version}}

    cidr = parsed_args.cidr
    if cidr:
        # With subnetpool, cidr is now optional for creating subnet.
        body['subnet']['cidr'] = cidr
        if parsed_args.ip_version == 4:
            unusable_cidr = '/32'
        else:
            unusable_cidr = '/128'
        if cidr.endswith(unusable_cidr):
            self.log.warning(_("An IPv%(ip)d subnet with a %(cidr)s CIDR "
                               "will have only one usable IP address so "
                               "the device attached to it will not have "
                               "any IP connectivity.")
                             % {"ip": parsed_args.ip_version,
                                "cidr": unusable_cidr})

    if parsed_args.prefixlen:
        body['subnet']['prefixlen'] = parsed_args.prefixlen

    if parsed_args.subnetpool:
        # The literal string 'None' is sent as an explicit null pool.
        subnetpool_id = None
        if parsed_args.subnetpool != 'None':
            subnetpool_id = resolve(
                self.get_client(), 'subnetpool', parsed_args.subnetpool)
        body['subnet']['subnetpool_id'] = subnetpool_id

    updatable_args2body(parsed_args, body)

    if parsed_args.tenant_id:
        body['subnet']['tenant_id'] = parsed_args.tenant_id
    return body
开发者ID:annp,项目名称:python-neutronclient,代码行数:34,代码来源:subnet.py
示例13: find
def find(self, id_or_name, ref_only=False):
    """Look up the backing object for this wrapper by ID or name.

    Depending on the type of ``self``, the lookup goes either through
    ``utils.find_resource`` with a manager taken from ``self._agent.client``
    or through the neutron client (subnets and routers).

    :param id_or_name: identifier or name to look up.
    :param ref_only: when true, return the found reference instead of
        storing it on ``self``.
    :returns: the reference if ``ref_only`` is true, otherwise ``self``
        with ``self._ref`` set (``None`` if no type matched).
    """
    service = None
    ref = None

    # Checked on its own: it is not part of the first-match chain below.
    if isinstance(self, Key):
        service = self._agent.client.compute.keypairs

    # (type, manager accessor) pairs, tested in order; first match wins.
    manager_getters = (
        (Image, lambda client: client.compute.images),
        (Server, lambda client: client.compute.servers),
        (Flavor, lambda client: client.compute.flavors),
        (Volume, lambda client: client.volume.volumes),
        (SecurityGroup, lambda client: client.compute.security_groups),
        (Network, lambda client: client.compute.networks),
    )
    # (type, neutron resource name, neutron "show" method name) triples.
    neutron_kinds = (
        (SubNet, "subnet", "show_subnet"),
        (Router, "router", "show_router"),
    )

    for kind, get_manager in manager_getters:
        if isinstance(self, kind):
            service = get_manager(self._agent.client)
            break
    else:
        for kind, resource, show_method in neutron_kinds:
            if isinstance(self, kind):
                found_id = neutronV20.find_resourceid_by_name_or_id(
                    self._agent.clientneutron, resource, id_or_name)
                ref = getattr(self._agent.clientneutron, show_method)(found_id)
                break

    if service:
        ref = utils.find_resource(service, id_or_name)

    if not ref_only:
        self._ref = ref
        return self
    return ref
开发者ID:bharat109puri,项目名称:warm,代码行数:32,代码来源:__init__.py
示例14: _mock_create_with_security_groups
def _mock_create_with_security_groups(self, port_prop):
    """Record the expectations for creating a port, then switch to replay.

    :param port_prop: the port dict expected inside the ``create_port``
        request body.
    """
    port_id = 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'

    # Network and subnet lookups may each happen any number of times.
    for resource_type, identifier in (('network', 'net1234'),
                                      ('subnet', 'sub1234')):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            resource_type,
            identifier,
            cmd_resource=None,
        ).MultipleTimes().AndReturn(identifier)

    neutronclient.Client.create_port({'port': port_prop}).AndReturn(
        {'port': {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {'port': {"status": "ACTIVE", "id": port_id}})

    self.m.ReplayAll()
开发者ID:MarcDufresne,项目名称:heat,代码行数:25,代码来源:test_neutron_port.py
示例15: args2body
def args2body(self, parsed_args):
    """Build the ``security_group_rule`` request body from parsed arguments.

    ``security_group_id`` and, when given, ``remote_group_id`` are resolved
    through ``find_resourceid_by_name_or_id``. Optional attributes are only
    added to the body when their parsed value is truthy.

    :param parsed_args: parsed command-line arguments.
    :returns: dict of the form ``{'security_group_rule': {...}}``.
    """
    group_id = neutronV20.find_resourceid_by_name_or_id(
        self.get_client(), 'security_group', parsed_args.security_group_id)
    rule = {
        'security_group_id': group_id,
        'direction': parsed_args.direction,
        'ethertype': parsed_args.ethertype,
    }

    # Attributes copied verbatim when set.
    passthrough = ('protocol', 'port_range_min', 'port_range_max',
                   'remote_ip_prefix')
    for attr in passthrough:
        value = getattr(parsed_args, attr)
        if value:
            rule[attr] = value

    if parsed_args.remote_group_id:
        rule['remote_group_id'] = neutronV20.find_resourceid_by_name_or_id(
            self.get_client(), 'security_group',
            parsed_args.remote_group_id)
    if parsed_args.tenant_id:
        rule['tenant_id'] = parsed_args.tenant_id

    return {'security_group_rule': rule}
开发者ID:yuvarajan07,项目名称:python-neutronclient,代码行数:29,代码来源:securitygroup.py
示例16: test_subnet
def test_subnet(self):
    """Exercise create, attribute reads, updates and repeated delete.

    NOTE(review): ``update_props`` is handed to the recorded
    ``update_subnet`` expectation and mutated later in the test, so the
    very same dict objects must be used throughout.
    """
    subnet_id = '91e47a57-7508-46fe-afc9-fc454e8580e1'
    subnet_update = {
        'dns_nameservers': ['8.8.8.8', '192.168.1.254'],
        'name': 'mysubnet',
        'enable_dhcp': True,
        'host_routes': [{'destination': '192.168.1.0/24',
                         'nexthop': '194.168.1.2'}],
        'gateway_ip': '10.0.3.105',
        'allocation_pools': [
            {'start': '10.0.3.20', 'end': '10.0.3.100'},
            {'start': '10.0.3.110', 'end': '10.0.3.200'}],
    }
    update_props = {'subnet': subnet_update}

    t = self._test_subnet(u_props=update_props)

    # Record phase: one lookup per resource type, in this order.
    for resource_type in ('network', 'router'):
        neutronV20.find_resourceid_by_name_or_id(
            mox.IsA(neutronclient.Client),
            resource_type,
            'None',
            cmd_resource=None,
        ).AndReturn('None')
    neutronclient.Client.update_subnet(subnet_id, update_props)

    stack = utils.parse_stack(t)
    rsrc = self.create_subnet(t, stack, 'sub_net')
    self.m.ReplayAll()

    # Replay phase.
    scheduler.TaskRunner(rsrc.create)()
    self.assertEqual((rsrc.CREATE, rsrc.COMPLETE), rsrc.state)
    rsrc.validate()
    self.assertEqual(subnet_id, rsrc.FnGetRefId())

    # Two consecutive reads of the same attribute are expected to differ.
    # NOTE(review): presumably driven by the expectations recorded in
    # _test_subnet -- confirm there.
    self.assertIsNone(rsrc.FnGetAtt('network_id'))
    self.assertEqual('fc68ea2c-b60b-4b4f-bd82-94ec81110766',
                     rsrc.FnGetAtt('network_id'))
    self.assertEqual('8.8.8.8', rsrc.FnGetAtt('dns_nameservers')[0])

    # assert the dependency (implicit or explicit) between the ports
    # and the subnet
    subnet_dependencies = stack.dependencies[stack['sub_net']]
    self.assertIn(stack['port'], subnet_dependencies)
    self.assertIn(stack['port2'], stack.dependencies[stack['sub_net']])

    update_snippet = rsrc_defn.ResourceDefinition(
        rsrc.name, rsrc.type(), update_props['subnet'])
    rsrc.handle_update(update_snippet, {}, update_props['subnet'])

    # with name None
    del update_props['subnet']['name']
    rsrc.handle_update(update_snippet, {}, update_props['subnet'])

    # with no prop_diff
    rsrc.handle_update(update_snippet, {}, {})

    self.assertIsNone(scheduler.TaskRunner(rsrc.delete)())
    rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE, 'to delete again')
    self.assertIsNone(scheduler.TaskRunner(rsrc.delete)())
    self.m.VerifyAll()
开发者ID:TonyChengTW,项目名称:OpenStack_Liberty_Control,代码行数:60,代码来源:test_neutron_subnet.py
示例17: test_allowed_address_pair
def test_allowed_address_pair(self):
    """Create a port from the address-pair template, unmodified.

    The recorded ``create_port`` request is expected to contain a single
    allowed address pair with IP ``10.0.3.21``.
    """
    port_id = 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'

    # Record phase.
    network_lookup = neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client), 'network', 'abcd1234')
    network_lookup.MultipleTimes().AndReturn('abcd1234')

    port_body = {
        'network_id': u'abcd1234',
        'allowed_address_pairs': [{
            'ip_address': u'10.0.3.21',
            'mac_address': u'00-B0-D0-86-BB-F7',
        }],
        'name': utils.PhysName('test_stack', 'port'),
        'admin_state_up': True,
    }
    neutronclient.Client.create_port({'port': port_body}).AndReturn(
        {'port': {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {'port': {"status": "ACTIVE", "id": port_id}})
    self.m.ReplayAll()

    # Replay phase.
    stack = utils.parse_stack(
        template_format.parse(neutron_port_with_address_pair_template))
    scheduler.TaskRunner(stack['port'].create)()
    self.m.VerifyAll()
开发者ID:sizowie,项目名称:heat,代码行数:33,代码来源:test_neutron_port.py
示例18: test_gateway_router
def test_gateway_router(self):
    """Create a router gateway and delete it twice.

    ``remove_gateway_router`` is recorded as succeeding on the first delete
    and raising 404 on the second.
    """
    router_id = '3e46229d-8fce-4733-819a-b5fe630550f8'
    network_id = 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'

    # Record phase.
    neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client),
        'network',
        network_id,
        cmd_resource=None,
    ).MultipleTimes().AndReturn(network_id)
    neutronclient.Client.add_gateway_router(
        router_id, {'network_id': network_id}).AndReturn(None)
    neutronclient.Client.remove_gateway_router(router_id).AndReturn(None)
    neutronclient.Client.remove_gateway_router(router_id).AndRaise(
        qe.NeutronClientException(status_code=404))
    self.stub_RouterConstraint_validate()
    self.m.ReplayAll()

    # Replay phase.
    t = template_format.parse(neutron_template)
    stack = utils.parse_stack(t)
    gateway_props = {
        'router_id': '3e46229d-8fce-4733-819a-b5fe630550f8',
        'network': 'fc68ea2c-b60b-4b4f-bd82-94ec81110766',
    }
    rsrc = self.create_gateway_router(
        t, stack, 'gateway', properties=gateway_props)

    scheduler.TaskRunner(rsrc.delete)()
    # Reset the state so that delete can be run a second time.
    rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE, 'to delete again')
    scheduler.TaskRunner(rsrc.delete)()
    self.m.VerifyAll()
开发者ID:MarcDufresne,项目名称:heat,代码行数:33,代码来源:test_neutron_router.py
示例19: test_missing_subnet_id
def test_missing_subnet_id(self):
    """Create a port whose first fixed IP has no ``subnet`` entry.

    The ``subnet`` key is popped from the template's first fixed IP, and
    the recorded ``create_port`` request carries only the ``ip_address``.
    """
    port_id = 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'

    # Record phase.
    network_lookup = neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client), 'network', 'net1234')
    network_lookup.MultipleTimes().AndReturn('net1234')

    port_body = {
        'network_id': u'net1234',
        'fixed_ips': [
            {'ip_address': u'10.0.3.21'},
        ],
        'name': utils.PhysName('test_stack', 'port'),
        'admin_state_up': True,
        'device_owner': u'network:dhcp',
    }
    neutronclient.Client.create_port({'port': port_body}).AndReturn(
        {'port': {"status": "BUILD", "id": port_id}})
    neutronclient.Client.show_port(port_id).AndReturn(
        {'port': {"status": "ACTIVE", "id": port_id}})
    self.m.ReplayAll()

    # Replay phase.
    template = template_format.parse(neutron_port_template)
    fixed_ips = template['resources']['port']['properties']['fixed_ips']
    fixed_ips[0].pop('subnet')
    stack = utils.parse_stack(template)
    scheduler.TaskRunner(stack['port'].create)()
    self.m.VerifyAll()
开发者ID:sizowie,项目名称:heat,代码行数:35,代码来源:test_neutron_port.py
示例20: test_create_router_gateway_enable_snat
def test_create_router_gateway_enable_snat(self):
    """Create a router whose gateway info omits ``enable_snat``.

    ``enable_snat`` and ``external_fixed_ips`` are popped from the template
    before parsing, so the recorded ``create_router`` request carries only
    the gateway's ``network_id``.
    """
    router_id = '3e46229d-8fce-4733-819a-b5fe630550f8'
    network_id = 'fc68ea2c-b60b-4b4f-bd82-94ec81110766'

    # Record phase.
    neutronV20.find_resourceid_by_name_or_id(
        mox.IsA(neutronclient.Client),
        'network',
        'public',
        cmd_resource=None,
    ).AndReturn(network_id)

    create_request = {
        "router": {
            "name": "Test Router",
            "external_gateway_info": {
                'network_id': 'fc68ea2c-b60b-4b4f-bd82-94ec81110766',
            },
            "admin_state_up": True,
        }
    }
    create_response = {
        "router": {
            "status": "BUILD",
            "external_gateway_info": None,
            "name": "Test Router",
            "admin_state_up": True,
            "tenant_id": "3e21026f2dc94372b105808c0e721661",
            "id": "3e46229d-8fce-4733-819a-b5fe630550f8",
        }
    }
    neutronclient.Client.create_router(
        create_request).AndReturn(create_response)

    shown_router = {
        "router": {
            "status": "ACTIVE",
            "external_gateway_info": {
                "network_id":
                "fc68ea2c-b60b-4b4f-bd82-94ec81110766",
                "enable_snat": True,
            },
            "name": "Test Router",
            "admin_state_up": True,
            "tenant_id": "3e21026f2dc94372b105808c0e721661",
            "routes": [],
            "id": "3e46229d-8fce-4733-819a-b5fe630550f8",
        }
    }
    neutronclient.Client.show_router(
        router_id).MultipleTimes().AndReturn(shown_router)
    self.m.ReplayAll()

    # Replay phase: strip the optional gateway keys, then create.
    t = template_format.parse(neutron_external_gateway_template)
    for optional_key in ("enable_snat", "external_fixed_ips"):
        t["resources"]["router"]["properties"]["external_gateway_info"].pop(
            optional_key)
    stack = utils.parse_stack(t)
    rsrc = self.create_router(t, stack, 'router')
    rsrc.validate()

    self.assertEqual(router_id, rsrc.FnGetRefId())
    gateway_info = rsrc.FnGetAtt('external_gateway_info')
    self.assertEqual(network_id, gateway_info.get('network_id'))
    self.m.VerifyAll()
开发者ID:MarcDufresne,项目名称:heat,代码行数:60,代码来源:test_neutron_router.py
注:本文中的neutronclient.neutron.v2_0.find_resourceid_by_name_or_id函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论