本文整理汇总了Python中neutron.ipam.utils.check_subnet_ip函数的典型用法代码示例。如果您正苦于以下问题：Python check_subnet_ip函数的具体用法？Python check_subnet_ip怎么用？Python check_subnet_ip使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了check_subnet_ip函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _get_subnet_for_fixed_ip
def _get_subnet_for_fixed_ip(self, context, fixed, network_id):
    """Resolve the subnet that a fixed-IP request refers to.

    ``fixed`` may carry a ``subnet_id``, an ``ip_address`` or both.

    :param context: request context handed to the subnet lookups.
    :param fixed: mapping describing the requested fixed IP.
    :param network_id: network the port is being created on.
    :returns: the subnet named by ``subnet_id`` or, when only an address
        is given, the first subnet of the network for which
        ``ipam_utils.check_subnet_ip`` accepts that address.
    :raises n_exc.InvalidInput: neither key is present, or the named
        subnet belongs to a different network.
    :raises n_exc.InvalidIpForSubnet: the address is rejected for the
        named subnet.
    :raises n_exc.InvalidIpForNetwork: no subnet of the network accepts
        the address.
    """
    has_address = 'ip_address' in fixed

    if 'subnet_id' not in fixed:
        if not has_address:
            msg = _('IP allocation requires subnet_id or ip_address')
            raise n_exc.InvalidInput(error_message=msg)
        # Only an address was supplied: search the network's subnets.
        candidates = self._get_subnets(
            context, filters={'network_id': [network_id]})
        for candidate in candidates:
            if ipam_utils.check_subnet_ip(candidate['cidr'],
                                          fixed['ip_address']):
                return candidate
        raise n_exc.InvalidIpForNetwork(ip_address=fixed['ip_address'])

    subnet = self._get_subnet(context, fixed['subnet_id'])
    if subnet['network_id'] != network_id:
        details = {'network_id': network_id,
                   'subnet_id': fixed['subnet_id']}
        msg = _("Failed to create port on network %(network_id)s"
                ", because fixed_ips included invalid subnet "
                "%(subnet_id)s") % details
        raise n_exc.InvalidInput(error_message=msg)

    # When an address accompanies the subnet it must be valid on it.
    if has_address and not ipam_utils.check_subnet_ip(
            subnet['cidr'], fixed['ip_address']):
        raise n_exc.InvalidIpForSubnet(ip_address=fixed['ip_address'])
    return subnet
开发者ID:asgard-lab,项目名称:neutron,代码行数:29,代码来源:ipam_backend_mixin.py
示例2: _set_default_route
def _set_default_route(self, network, device_name):
    """Point the DHCP namespace's default route at a subnet gateway.

    The first subnet that is IPv4, has DHCP enabled and defines a gateway
    decides the route. Routes are only modified when the gateway found on
    the device differs from that subnet's gateway, so repeated calls with
    the same state change nothing. When no subnet qualifies, a gateway
    found on the device is removed.

    :param network: object exposing ``namespace``, ``id`` and ``subnets``.
    :param device_name: name of the interface inside the namespace.
    """
    device = ip_lib.IPDevice(device_name, namespace=network.namespace)

    # Reduce the reported gateway record to the bare gateway value.
    gateway = device.route.get_gateway()
    if gateway:
        gateway = gateway.get("gateway")

    for subnet in network.subnets:
        eligible = (subnet.ip_version == 4 and subnet.enable_dhcp
                    and subnet.gateway_ip is not None)
        if not eligible:
            continue

        if gateway != subnet.gateway_ip:
            LOG.debug(
                "Setting gateway for dhcp netns on net %(n)s to %(ip)s",
                {"n": network.id, "ip": subnet.gateway_ip},
            )

            # The gateway being replaced may have required an on-link
            # route because it lies outside this subnet; drop that route
            # if the device still has it.
            if gateway and not ipam_utils.check_subnet_ip(subnet.cidr,
                                                          gateway):
                v4_routes = device.route.list_onlink_routes(
                    constants.IP_VERSION_4)
                v6_routes = device.route.list_onlink_routes(
                    constants.IP_VERSION_6)
                onlink_cidrs = {
                    route["cidr"] for route in v4_routes + v6_routes}
                if gateway in onlink_cidrs:
                    device.route.delete_route(gateway, scope="link")

            # A new gateway outside the subnet needs its own on-link route
            # before it can be installed as the default gateway.
            if subnet.gateway_ip and not ipam_utils.check_subnet_ip(
                    subnet.cidr, subnet.gateway_ip):
                device.route.add_route(subnet.gateway_ip, scope="link")

            device.route.add_gateway(subnet.gateway_ip)

        # Only the first eligible subnet is ever considered.
        return

    # No subnet on the network has a usable gateway; remove the stale one
    # so it does not cause confusion.
    if gateway is not None:
        LOG.debug("Removing gateway for dhcp netns on net %s", network.id)
        device.route.delete_gateway(gateway)
开发者ID:leftyLin,项目名称:neutron,代码行数:49,代码来源:dhcp.py
示例3: _update_gateway_route
def _update_gateway_route(self, agent_gateway_port,
                          interface_name, tbl_index):
    """Advertise the gateway port's addresses and refresh its default route.

    :param agent_gateway_port: mapping with ``fixed_ips`` and ``subnets``.
    :param interface_name: device name inside this namespace.
    :param tbl_index: passed through to ``_add_default_gateway_for_fip``.
    :raises l3_exc.FloatingIpSetupException: the device is missing from
        the namespace.
    """
    namespace = self.get_name()
    device = ip_lib.IPDevice(interface_name, namespace=namespace)

    # Sending advertisements or configuring routes on a missing device
    # would only raise, so fail early with a message asking for a retry.
    if not device.exists():
        LOG.warning('DVR: FIP gateway port with interface '
                    'name: %(device)s does not exist in the given '
                    'namespace: %(ns)s', {'device': interface_name,
                                          'ns': namespace})
        msg = _('DVR: Gateway update route in FIP namespace failed, retry '
                'should be attempted on next call')
        raise l3_exc.FloatingIpSetupException(msg)

    for fixed_ip in agent_gateway_port['fixed_ips']:
        ip_lib.send_ip_addr_adv_notif(namespace,
                                      interface_name,
                                      fixed_ip['ip_address'])

    for subnet in agent_gateway_port['subnets']:
        gw_ip = subnet.get('gateway_ip')
        if not gw_ip:
            # The subnet has no gateway: remove the installed one, if any.
            installed = device.route.get_gateway()
            if installed and installed.get('gateway'):
                device.route.delete_gateway(installed.get('gateway'))
            continue

        # A gateway outside the subnet's CIDR needs an on-link route first.
        if not ipam_utils.check_subnet_ip(subnet.get('cidr'), gw_ip):
            device.route.add_route(gw_ip, scope='link')
        self._add_default_gateway_for_fip(gw_ip, device, tbl_index)
开发者ID:igordcard,项目名称:neutron,代码行数:34,代码来源:dvr_fip_ns.py
示例4: update_gateway_port
def update_gateway_port(self, agent_gateway_port):
    """Store the new gateway port and reconfigure routes if its IP changed.

    The port is always cached on ``self.agent_gateway_port``. Nothing else
    happens when a port was already cached and
    ``_check_for_gateway_ip_change`` reports no change.

    :param agent_gateway_port: mapping with ``id``, ``fixed_ips`` and
        ``subnets``.
    """
    # The change check has to run before the cached port is overwritten.
    previous_port = self.agent_gateway_port
    unchanged = previous_port and (
        not self._check_for_gateway_ip_change(agent_gateway_port))
    self.agent_gateway_port = agent_gateway_port
    if unchanged:
        return

    namespace = self.get_name()
    interface_name = self.get_ext_device_name(agent_gateway_port['id'])
    for fixed_ip in agent_gateway_port['fixed_ips']:
        ip_lib.send_ip_addr_adv_notif(namespace,
                                      interface_name,
                                      fixed_ip['ip_address'],
                                      self.agent_conf)

    device = ip_lib.IPDevice(interface_name, namespace=namespace)
    for subnet in agent_gateway_port['subnets']:
        gw_ip = subnet.get('gateway_ip')
        if not gw_ip:
            # The subnet has no gateway: remove the installed one, if any.
            installed = device.route.get_gateway()
            if installed and installed.get('gateway'):
                device.route.delete_gateway(installed.get('gateway'))
            continue

        # A gateway outside the subnet's CIDR needs an on-link route first.
        if not ipam_utils.check_subnet_ip(subnet.get('cidr'), gw_ip):
            device.route.add_route(gw_ip, scope='link')
        device.route.add_gateway(gw_ip)
开发者ID:annp,项目名称:neutron,代码行数:28,代码来源:dvr_fip_ns.py
示例5: _gateway_added
def _gateway_added(self, ex_gw_port, interface_name):
    """Add Floating IP gateway port.

    Plugs the interface into this namespace, initialises its L3
    addresses, advertises every fixed IP, installs a default gateway for
    each subnet that defines one and finally enables proxy ARP on the
    interface.

    :param ex_gw_port: mapping with ``network_id``, ``id``,
        ``mac_address``, ``fixed_ips`` and ``subnets``.
    :param interface_name: name of the device to create and configure.
    """
    LOG.debug("add gateway interface(%s)", interface_name)
    namespace = self.get_name()

    self.driver.plug(
        ex_gw_port["network_id"],
        ex_gw_port["id"],
        interface_name,
        ex_gw_port["mac_address"],
        bridge=self.agent_conf.external_network_bridge,
        namespace=namespace,
        prefix=FIP_EXT_DEV_PREFIX,
    )

    cidrs = common_utils.fixed_ip_cidrs(ex_gw_port["fixed_ips"])
    self.driver.init_l3(
        interface_name,
        cidrs,
        namespace=namespace,
        clean_connections=True,
    )

    for fixed_ip in ex_gw_port["fixed_ips"]:
        ip_lib.send_ip_addr_adv_notif(
            namespace,
            interface_name,
            fixed_ip["ip_address"],
            self.agent_conf,
        )

    for subnet in ex_gw_port["subnets"]:
        gw_ip = subnet.get("gateway_ip")
        if not gw_ip:
            continue
        gateway_outside_subnet = not ipam_utils.check_subnet_ip(
            subnet.get("cidr"), gw_ip)
        device = ip_lib.IPDevice(interface_name, namespace=namespace)
        # A gateway outside the subnet's CIDR needs an on-link route first.
        if gateway_outside_subnet:
            device.route.add_route(gw_ip, scope="link")
        device.route.add_gateway(gw_ip)

    proxy_arp_cmd = [
        "sysctl",
        "-w",
        "net.ipv4.conf.%s.proxy_arp=1" % interface_name,
    ]
    # TODO(Carl) mlavelle's work has self.ip_wrapper
    wrapper = ip_lib.IPWrapper(namespace=namespace)
    wrapper.netns.execute(proxy_arp_cmd, check_exit_code=False)
开发者ID:dims,项目名称:neutron,代码行数:33,代码来源:dvr_fip_ns.py
示例6: _get_subnet_id_for_given_fixed_ip
def _get_subnet_id_for_given_fixed_ip(self, context, fixed_ip, port_dict):
    """Return the id of the subnet on the port's network that fits the IP.

    :param context: request context handed to the core plugin.
    :param fixed_ip: address to look up.
    :param port_dict: mapping whose ``network_id`` selects the network.
    :returns: id of the first subnet for which
        ``ipam_utils.check_subnet_ip`` accepts ``fixed_ip``, or ``None``
        when no subnet matches.
    """
    network_filter = {'network_id': [port_dict['network_id']]}
    candidates = self._core_plugin.get_subnets(context, network_filter)
    return next(
        (candidate['id'] for candidate in candidates
         if ipam_utils.check_subnet_ip(candidate['cidr'], fixed_ip)),
        None)
开发者ID:andreitira,项目名称:neutron,代码行数:8,代码来源:l3_dvr_db.py
示例7: _add_route_to_gw
def _add_route_to_gw(self, ex_gw_port, device_name,
                     namespace, preserve_ips):
    """Add on-link routes for gateways that lie outside their subnet.

    Each such gateway is also appended to ``preserve_ips``, which is
    modified in place.

    :param ex_gw_port: mapping whose optional ``subnets`` list is scanned.
    :param device_name: device the routes are added on.
    :param namespace: namespace the device lives in.
    :param preserve_ips: list that receives the out-of-subnet gateways.
    """
    # Note: ipv6_gateway is an ipv6 LLA
    # and so doesn't need a special route
    for subnet in ex_gw_port.get('subnets', []):
        gw_ip = subnet['gateway_ip']
        if not gw_ip:
            continue
        if ipam_utils.check_subnet_ip(subnet['cidr'], gw_ip):
            continue
        preserve_ips.append(gw_ip)
        device = ip_lib.IPDevice(device_name, namespace=namespace)
        device.route.add_route(gw_ip, scope='link')
开发者ID:MODITDC,项目名称:neutron,代码行数:13,代码来源:router_info.py
示例8: _get_subnet_for_fixed_ip
def _get_subnet_for_fixed_ip(self, context, fixed, subnets):
    """Pick the subnet a fixed-IP request refers to from ``subnets``.

    :param context: request context, used only to load the offending
        subnet for the error message.
    :param fixed: mapping that may carry ``subnet_id`` and ``ip_address``.
    :param subnets: all the subnets belonging to the same network.
    :returns: the subnet whose id equals ``subnet_id`` or, when only an
        address is given, the first subnet for which
        ``ipam_utils.check_subnet_ip`` accepts the address.
    :raises exc.InvalidInput: ``subnets`` is empty, neither key is
        present, or ``subnet_id`` is not among ``subnets``.
    :raises exc.InvalidIpForSubnet: the address is rejected for the
        named subnet.
    :raises exc.InvalidIpForNetwork: no subnet accepts the address.
    """
    if not subnets:
        msg = _('IP allocation requires subnets for network')
        raise exc.InvalidInput(error_message=msg)

    if 'subnet_id' not in fixed:
        if 'ip_address' not in fixed:
            msg = _('IP allocation requires subnet_id or ip_address')
            raise exc.InvalidInput(error_message=msg)
        for candidate in subnets:
            if ipam_utils.check_subnet_ip(candidate['cidr'],
                                          fixed['ip_address']):
                return candidate
        raise exc.InvalidIpForNetwork(ip_address=fixed['ip_address'])

    subnet = next(
        (candidate for candidate in subnets
         if candidate['id'] == fixed['subnet_id']),
        None)
    if not subnet:
        # The subnet is not on this network; load it so the message can
        # report the network id stored on that subnet.
        subnet_obj = self._get_subnet_object(context, fixed['subnet_id'])
        details = {'network_id': subnet_obj.network_id,
                   'subnet_id': fixed['subnet_id']}
        msg = _("Failed to create port on network %(network_id)s"
                ", because fixed_ips included invalid subnet "
                "%(subnet_id)s") % details
        raise exc.InvalidInput(error_message=msg)

    # When an address accompanies the subnet it must be valid on it.
    if 'ip_address' in fixed and not ipam_utils.check_subnet_ip(
            subnet['cidr'], fixed['ip_address']):
        raise exc.InvalidIpForSubnet(ip_address=fixed['ip_address'])
    return subnet
开发者ID:noironetworks,项目名称:neutron,代码行数:37,代码来源:ipam_backend_mixin.py
示例9: _validate_create_redirect_target_vip
def _validate_create_redirect_target_vip(self, context, redirect_target,
                                         subnet_mapping, vip):
    """Validate the virtual IP requested for a redirect target.

    :param context: request context handed to the core plugin.
    :param redirect_target: mapping read for ``redundancy_enabled`` and
        ``virtual_ip_address``.
    :param subnet_mapping: mapping whose ``subnet_id`` names the subnet
        the VIP has to belong to.
    :param vip: requested virtual IP; a falsy value skips the subnet
        check.
    :raises nuage_exc.NuageBadRequest: a VIP was supplied while
        redundancy is disabled, or ``ipam_utils.check_subnet_ip`` rejects
        the VIP for the mapped subnet.
    """
    # A VIP is not allowed when redundancy is disabled. The flag is
    # compared against the string "False", not the boolean.
    if (redirect_target.get('redundancy_enabled') == "False" and
            redirect_target.get('virtual_ip_address')):
        msg = _("VIP can be added to a redirect target only "
                "when redundancyEnabled is True")
        raise nuage_exc.NuageBadRequest(msg=msg)

    # The VIP has to be in the subnet named by the subnet mapping.
    if vip:
        subnet = self.core_plugin.get_subnet(context,
                                             subnet_mapping['subnet_id'])
        if not ipam_utils.check_subnet_ip(subnet['cidr'], vip):
            msg = (_("VIP should be in the same subnet as subnet %s") %
                   subnet_mapping['subnet_id'])
            raise nuage_exc.NuageBadRequest(msg=msg)
开发者ID:nuagenetworks,项目名称:nuage-openstack-neutron,代码行数:17,代码来源:nuage_redirect_target.py
示例10: _update_gateway_port
def _update_gateway_port(self, agent_gateway_port, interface_name):
    """Reconfigure the FIP gateway device when the gateway port changed.

    Returns immediately when a port is already cached and
    ``_check_for_gateway_ip_change`` reports no change. Otherwise the
    port's addresses are advertised, the default gateway is updated per
    subnet and the port is cached on ``self.agent_gateway_port``.

    :param agent_gateway_port: mapping with ``network_id``, ``fixed_ips``
        and ``subnets``.
    :param interface_name: device name inside this namespace.
    :raises n_exc.FloatingIpSetupException: the device is missing from
        the namespace.
    """
    already_configured = self.agent_gateway_port and (
        not self._check_for_gateway_ip_change(agent_gateway_port))
    if already_configured:
        return

    namespace = self.get_name()
    device = ip_lib.IPDevice(interface_name, namespace=namespace)

    # If the 'fg-' device doesn't exist in the namespace, advertising or
    # routing through it would only raise. Unsubscribe this external
    # network so that the next call triggers the interface to be plugged.
    if not device.exists():
        self.unsubscribe(agent_gateway_port['network_id'])
        LOG.warning(_LW('DVR: FIP gateway port with interface '
                        'name: %(device)s does not exist in the given '
                        'namespace: %(ns)s'), {'device': interface_name,
                                               'ns': namespace})
        msg = _('DVR: Gateway setup in FIP namespace failed, retry '
                'should be attempted on next call')
        raise n_exc.FloatingIpSetupException(msg)

    for fixed_ip in agent_gateway_port['fixed_ips']:
        ip_lib.send_ip_addr_adv_notif(namespace,
                                      interface_name,
                                      fixed_ip['ip_address'],
                                      self.agent_conf.send_arp_for_ha)

    for subnet in agent_gateway_port['subnets']:
        gw_ip = subnet.get('gateway_ip')
        if not gw_ip:
            # The subnet has no gateway: remove the installed one, if any.
            installed = device.route.get_gateway()
            if installed and installed.get('gateway'):
                device.route.delete_gateway(installed.get('gateway'))
            continue

        # A gateway outside the subnet's CIDR needs an on-link route first.
        if not ipam_utils.check_subnet_ip(subnet.get('cidr'), gw_ip):
            device.route.add_route(gw_ip, scope='link')
        device.route.add_gateway(gw_ip)

    # Cache the port only after the gateway was configured successfully,
    # so that later checks against self.agent_gateway_port are valid.
    self.agent_gateway_port = agent_gateway_port
开发者ID:2020human,项目名称:neutron,代码行数:43,代码来源:dvr_fip_ns.py
示例11: _allocate_specific_subnet
def _allocate_specific_subnet(self, request):
    """Allocate the exact CIDR named by ``request`` from the subnet pool.

    The whole operation runs inside a (sub)transaction after locking the
    pool and checking the tenant's quota.

    :param request: object exposing ``tenant_id``, ``prefixlen``,
        ``subnet_cidr``, ``gateway_ip``, ``subnet_id`` and
        ``allocation_pools``.
    :returns: an ``IpamSubnet`` describing the allocated subnet.
    :raises n_exc.SubnetAllocationError: the gateway is rejected by
        ``ipam_utils.check_subnet_ip`` for the requested CIDR, or the
        CIDR is not covered by exactly one available prefix.
    """
    with self._context.session.begin(subtransactions=True):
        self._lock_subnetpool()
        self._check_subnetpool_tenant_quota(request.tenant_id,
                                            request.prefixlen)
        cidr = request.subnet_cidr
        gateway = request.gateway_ip
        if gateway and not ipam_utils.check_subnet_ip(cidr, gateway):
            msg = _("Cannot allocate requested subnet due to bad gateway "
                    "address")
            raise n_exc.SubnetAllocationError(reason=msg)

        available = self._get_available_prefix_list()
        matched = netaddr.all_matching_cidrs(cidr, available)
        # Compare by value: ``is 1`` tests object identity, which only
        # works by accident of small-int caching and triggers a
        # SyntaxWarning on Python 3.8+.
        if len(matched) == 1 and matched[0].prefixlen <= cidr.prefixlen:
            return IpamSubnet(request.tenant_id,
                              request.subnet_id,
                              cidr,
                              gateway_ip=gateway,
                              allocation_pools=request.allocation_pools)
        msg = _("Cannot allocate requested subnet from the available "
                "set of prefixes")
        raise n_exc.SubnetAllocationError(reason=msg)
开发者ID:jocado,项目名称:neutron,代码行数:23,代码来源:subnet_alloc.py
示例12: test_check_subnet_ip_v6_network
def test_check_subnet_ip_v6_network(self):
    """check_subnet_ip is expected to reject an IPv6 network address."""
    cidr = 'F111::0/64'
    network_address = 'F111::0'
    self.assertFalse(utils.check_subnet_ip(cidr, network_address))
开发者ID:21atlas,项目名称:neutron,代码行数:2,代码来源:test_utils.py
示例13: _extract_valid_peer_ips
def _extract_valid_peer_ips(self, cidr, peer_ips):
    """Return the peer IPs that ``ipam_utils.check_subnet_ip`` accepts.

    :param cidr: subnet CIDR each peer is checked against.
    :param peer_ips: iterable of candidate peer addresses.
    :returns: list of the accepted addresses, in input order.
    """
    return [
        candidate
        for candidate in peer_ips
        if ipam_utils.check_subnet_ip(cidr, candidate)
    ]
开发者ID:Prabhat2015,项目名称:networking-midonet,代码行数:3,代码来源:bgp_db_midonet.py
示例14: _check_gateway_in_subnet
def _check_gateway_in_subnet(cls, cidr, gateway):
    """Validate that the gateway is on the subnet.

    IPv4 gateways and IPv6 gateways that are not link-local are checked
    with ``ipam_utils.check_subnet_ip``; any other gateway is accepted
    without a check.

    :param cidr: subnet CIDR.
    :param gateway: gateway address, parsed with ``netaddr.IPAddress``.
    :returns: ``True`` when the gateway is exempt, otherwise the result of
        ``ipam_utils.check_subnet_ip``.
    """
    address = netaddr.IPAddress(gateway)
    exempt_from_check = address.version != 4 and (
        address.version != 6 or address.is_link_local())
    if exempt_from_check:
        return True
    return ipam_utils.check_subnet_ip(cidr, gateway)
开发者ID:JioCloud,项目名称:neutron,代码行数:6,代码来源:db_base_plugin_common.py
示例15: _test_fixed_ips_for_port
def _test_fixed_ips_for_port(self, context, network_id, fixed_ips,
                             device_owner):
    """Test fixed IPs for port.

    Check that configured subnets are valid prior to allocating any
    IPs. Include the subnet_id in the result if only an IP address is
    configured.

    :param context: request context handed to the subnet lookups.
    :param network_id: network the port belongs to.
    :param fixed_ips: iterable of mappings that may carry ``subnet_id``
        and ``ip_address``.
    :param device_owner: owner of the port; router owners may use
        auto-address subnets directly.
    :returns: list of mappings with ``subnet_id`` and, when an address was
        requested, ``ip_address``.
    :raises: InvalidInput, IpAddressInUse, InvalidIpForNetwork,
             InvalidIpForSubnet
    """
    fixed_ip_set = []
    for fixed in fixed_ips:
        # ``found`` records that the address was already matched against
        # a subnet, so it is not validated a second time below.
        found = False
        if 'subnet_id' not in fixed:
            if 'ip_address' not in fixed:
                msg = _('IP allocation requires subnet_id or ip_address')
                raise n_exc.InvalidInput(error_message=msg)

            subnet_filter = {'network_id': [network_id]}
            subnets = self._get_subnets(context, filters=subnet_filter)
            for subnet in subnets:
                if ipam_utils.check_subnet_ip(subnet['cidr'],
                                              fixed['ip_address']):
                    found = True
                    subnet_id = subnet['id']
                    break
            if not found:
                raise n_exc.InvalidIpForNetwork(
                    ip_address=fixed['ip_address'])
        else:
            subnet = self._get_subnet(context, fixed['subnet_id'])
            if subnet['network_id'] != network_id:
                msg = (_("Failed to create port on network %(network_id)s"
                         ", because fixed_ips included invalid subnet "
                         "%(subnet_id)s") %
                       {'network_id': network_id,
                        'subnet_id': fixed['subnet_id']})
                raise n_exc.InvalidInput(error_message=msg)
            subnet_id = subnet['id']

        is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
        if 'ip_address' in fixed:
            # Ensure that the IP's are unique
            if not IpamNonPluggableBackend._check_unique_ip(
                    context, network_id,
                    subnet_id, fixed['ip_address']):
                raise n_exc.IpAddressInUse(net_id=network_id,
                                           ip_address=fixed['ip_address'])

            # Ensure that the IP is valid on the subnet
            if (not found and
                    not ipam_utils.check_subnet_ip(subnet['cidr'],
                                                   fixed['ip_address'])):
                raise n_exc.InvalidIpForSubnet(
                    ip_address=fixed['ip_address'])
            if (is_auto_addr_subnet and
                    device_owner not in
                    constants.ROUTER_INTERFACE_OWNERS):
                msg = (_("IPv6 address %(address)s can not be directly "
                         "assigned to a port on subnet %(id)s since the "
                         "subnet is configured for automatic addresses") %
                       {'address': fixed['ip_address'],
                        'id': subnet_id})
                raise n_exc.InvalidInput(error_message=msg)
            fixed_ip_set.append({'subnet_id': subnet_id,
                                 'ip_address': fixed['ip_address']})
        else:
            # A scan for auto-address subnets on the network is done
            # separately so that all such subnets (not just those
            # listed explicitly here by subnet ID) are associated
            # with the port.
            if (device_owner in constants.ROUTER_INTERFACE_OWNERS or
                    device_owner == constants.DEVICE_OWNER_ROUTER_SNAT or
                    not is_auto_addr_subnet):
                fixed_ip_set.append({'subnet_id': subnet_id})

    if len(fixed_ip_set) > cfg.CONF.max_fixed_ips_per_port:
        msg = _('Exceeded maximum amount of fixed ips per port')
        raise n_exc.InvalidInput(error_message=msg)
    return fixed_ip_set
开发者ID:tealover,项目名称:neutron,代码行数:81,代码来源:ipam_non_pluggable_backend.py
示例16: test_check_subnet_ip_v6_valid
def test_check_subnet_ip_v6_valid(self):
    """check_subnet_ip is expected to accept addresses inside an IPv6 /64."""
    cidr = 'F111::0/64'
    for address in ('F111::1', 'F111::FFFF:FFFF:FFFF:FFFF'):
        self.assertTrue(utils.check_subnet_ip(cidr, address))
开发者ID:21atlas,项目名称:neutron,代码行数:4,代码来源:test_utils.py
示例17: _make_route_info
def _make_route_info(self, nexthop, dest_networks):
    """Pair ``nexthop`` with each destination that does not contain it.

    :param nexthop: next-hop address for the routes.
    :param dest_networks: iterable of destination CIDRs.
    :returns: list of ``(network, nexthop)`` tuples for every network for
        which ``ipam_utils.check_subnet_ip(network, nexthop)`` is falsy.
    """
    return [
        (destination, nexthop)
        for destination in dest_networks
        if not ipam_utils.check_subnet_ip(destination, nexthop)
    ]
开发者ID:Prabhat2015,项目名称:networking-midonet,代码行数:3,代码来源:bgp_db_midonet.py
示例18: test_check_subnet_ip_v4_valid
def test_check_subnet_ip_v4_valid(self):
    """check_subnet_ip is expected to accept host addresses of an IPv4 /24."""
    cidr = '1.1.1.0/24'
    for address in ('1.1.1.1', '1.1.1.254'):
        self.assertTrue(utils.check_subnet_ip(cidr, address))
开发者ID:21atlas,项目名称:neutron,代码行数:3,代码来源:test_utils.py
示例19: test_check_subnet_ip_v4_broadcast
def test_check_subnet_ip_v4_broadcast(self):
    """check_subnet_ip is expected to reject the IPv4 broadcast address."""
    cidr = '1.1.1.0/24'
    broadcast_address = '1.1.1.255'
    self.assertFalse(utils.check_subnet_ip(cidr, broadcast_address))
开发者ID:21atlas,项目名称:neutron,代码行数:2,代码来源:test_utils.py
示例20: test_check_subnet_ip_v4_network
def test_check_subnet_ip_v4_network(self):
    """check_subnet_ip is expected to reject the IPv4 network address."""
    cidr = '1.1.1.0/24'
    network_address = '1.1.1.0'
    self.assertFalse(utils.check_subnet_ip(cidr, network_address))
开发者ID:21atlas,项目名称:neutron,代码行数:2,代码来源:test_utils.py
注：本文中的neutron.ipam.utils.check_subnet_ip函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论