本文整理汇总了Python中neutron_lib.api.validators.is_attr_set函数的典型用法代码示例。如果您正苦于以下问题:Python is_attr_set函数的具体用法?Python is_attr_set怎么用?Python is_attr_set使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了is_attr_set函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _validate_flow_classifier
def _validate_flow_classifier(self, context):
fc = context.current
# Verify L7 params are set
l7_p = fc['l7_parameters']
if any(x for x in sfc_cts.AIM_FLC_L7_PARAMS.keys()
if not validators.is_attr_set(l7_p.get(x))):
raise sfc_exc.BadFlowClassifier(
params=sfc_cts.AIM_FLC_L7_PARAMS.keys())
# Verify standard params are set
# TODO(ivar): src and dst prefix are needed only for SVI networks
if any(x for x in sfc_cts.AIM_FLC_PARAMS
if not validators.is_attr_set(fc.get(x))):
raise sfc_exc.BadFlowClassifier(params=sfc_cts.AIM_FLC_PARAMS)
# Verify networks exist
src_net = self.plugin.get_network(
context._plugin_context, l7_p[sfc_cts.LOGICAL_SRC_NET])
if l7_p[sfc_cts.LOGICAL_SRC_NET] != l7_p[sfc_cts.LOGICAL_DST_NET]:
# Verify dst existence
self.plugin.get_network(context._plugin_context,
l7_p[sfc_cts.LOGICAL_DST_NET])
elif src_net.get('apic:svi') is False:
# Same network, not SVI
raise sfc_exc.FlowClassifierSameSrcDstNetworks()
if validators.is_attr_set(fc.get('source_ip_prefix')) and (
fc.get('source_ip_prefix') == fc.get('destination_ip_prefix')):
# Same subnet for source and dst is not allowed. For overlapping
# (but not same) subnets LPM will be applied.
raise sfc_exc.FlowClassifierSameSubnet()
开发者ID:openstack,项目名称:group-based-policy,代码行数:29,代码来源:flowc_driver.py
示例2: _check_invalid_security_groups_specified
def _check_invalid_security_groups_specified(self, context, port,
only_warn=False):
"""Check if the lists of security groups are valid
When only_warn is True we do not raise an exception here, because this
may fail nova boot.
Instead we will later remove provider security groups from the regular
security groups list of the port.
Since all the provider security groups of the tenant will be on this
list anyway, the result will be the same.
"""
if validators.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
for sg in port.get(ext_sg.SECURITYGROUPS, []):
# makes sure user doesn't add non-provider secgrp as secgrp
if self._is_provider_security_group(context, sg):
if only_warn:
LOG.warning(
"Ignored provider security group %(sg)s in "
"security groups list for port %(id)s",
{'sg': sg, 'id': port['id']})
else:
raise provider_sg.SecurityGroupIsProvider(id=sg)
if validators.is_attr_set(
port.get(provider_sg.PROVIDER_SECURITYGROUPS)):
# also check all provider groups are provider.
for sg in port.get(provider_sg.PROVIDER_SECURITYGROUPS, []):
self._check_provider_security_group_exists(context, sg)
开发者ID:openstack,项目名称:vmware-nsx,代码行数:29,代码来源:extended_security_group.py
示例3: _fill_provider_info
def _fill_provider_info(self, from_net, to_net):
provider_attrs = provider_net.ATTRIBUTES
for provider_attr in provider_attrs:
if validators.is_attr_set(from_net.get(provider_attr)):
to_net[provider_attr] = from_net[provider_attr]
if validators.is_attr_set(from_net.get(az_def.AZ_HINTS)):
to_net[az_def.AZ_HINTS] = from_net[az_def.AZ_HINTS]
开发者ID:openstack,项目名称:tricircle,代码行数:7,代码来源:helper.py
示例4: _l7_params_conflict
def _l7_params_conflict(fc1, fc2):
if (validators.is_attr_set(fc1['l7_parameters']) and
validators.is_attr_set(fc2['l7_parameters'])):
if fc1['l7_parameters'] == fc2['l7_parameters']:
return True
return all(not validators.is_attr_set(fc['l7_parameters'])
for fc in [fc1, fc2])
开发者ID:openstack,项目名称:group-based-policy,代码行数:7,代码来源:patch.py
示例5: process_create_policy_target_group
def process_create_policy_target_group(self, session, data, result):
data = data['policy_target_group']
proxied = data.get('proxied_group_id')
if validators.is_attr_set(proxied):
# Set value for proxied group
record = (session.query(db.GroupProxyMapping).filter_by(
policy_target_group_id=proxied).first())
if record:
if record.proxy_group_id:
raise driver_proxy_group.InvalidProxiedGroup(
group_id=proxied)
record.proxy_group_id = result['id']
else:
# Record may not exist for that PTG yet
record = db.GroupProxyMapping(
policy_target_group_id=proxied,
proxy_group_id=result['id'],
proxied_group_id=None)
session.add(record)
if not validators.is_attr_set(data.get('proxy_type')):
data['proxy_type'] = driver_proxy_group.DEFAULT_PROXY_TYPE
record = (session.query(db.GroupProxyMapping).filter_by(
policy_target_group_id=result['id']).one())
record.proxy_type = data['proxy_type']
result['proxy_type'] = data['proxy_type']
# Proxy PTGs can't have chains enforced
data['enforce_service_chains'] = False
record = (session.query(db.GroupProxyMapping).filter_by(
policy_target_group_id=result['id']).one())
record.enforce_service_chains = data['enforce_service_chains']
result['enforce_service_chains'] = data['enforce_service_chains']
elif validators.is_attr_set(data.get('proxy_type')):
raise driver_proxy_group.ProxyTypeSetWithoutProxiedPTG()
开发者ID:openstack,项目名称:group-based-policy,代码行数:33,代码来源:proxy_group_driver.py
示例6: _determine_port_security_and_has_ip
def _determine_port_security_and_has_ip(self, context, port):
"""Returns a tuple of booleans (port_security_enabled, has_ip).
Port_security is the value associated with the port if one is present
otherwise the value associated with the network is returned. has_ip is
if the port is associated with an ip or not.
"""
has_ip = self._ip_on_port(port)
# we don't apply security groups for dhcp, router
if port.get('device_owner') and net.is_port_trusted(port):
return (False, has_ip)
if validators.is_attr_set(port.get(psec.PORTSECURITY)):
port_security_enabled = port[psec.PORTSECURITY]
# If port has an ip and security_groups are passed in
# conveniently set port_security_enabled to true this way
# user doesn't also have to pass in port_security_enabled=True
# when creating ports.
elif has_ip and validators.is_attr_set(port.get('security_groups')):
port_security_enabled = True
else:
port_security_enabled = self._get_network_security_binding(
context, port['network_id'])
return (port_security_enabled, has_ip)
开发者ID:cubeek,项目名称:neutron,代码行数:26,代码来源:portsecurity_db.py
示例7: _process_portbindings_create_and_update
def _process_portbindings_create_and_update(self, context, port_data,
port):
binding_profile = port.get(portbindings.PROFILE)
binding_profile_set = validators.is_attr_set(binding_profile)
if not binding_profile_set and binding_profile is not None:
del port[portbindings.PROFILE]
binding_vnic = port.get(portbindings.VNIC_TYPE)
binding_vnic_set = validators.is_attr_set(binding_vnic)
if not binding_vnic_set and binding_vnic is not None:
del port[portbindings.VNIC_TYPE]
# REVISIT(irenab) Add support for vnic_type for plugins that
# can handle more than one type.
# Currently implemented for ML2 plugin that does not use
# PortBindingMixin.
host = port_data.get(portbindings.HOST_ID)
host_set = validators.is_attr_set(host)
with context.session.begin(subtransactions=True):
bind_port = context.session.query(
PortBindingPort).filter_by(port_id=port['id']).first()
if host_set:
if not bind_port:
context.session.add(PortBindingPort(port_id=port['id'],
host=host))
else:
bind_port.host = host
else:
host = bind_port.host if bind_port else None
self._extend_port_dict_binding_host(port, host)
开发者ID:21atlas,项目名称:neutron,代码行数:30,代码来源:portbindings_db.py
示例8: _save_subnet
def _save_subnet(self, context,
network,
subnet_args,
dns_nameservers,
host_routes,
subnet_request):
self._validate_subnet_cidr(context, network, subnet_args['cidr'])
self._validate_network_subnetpools(network,
subnet_args['subnetpool_id'],
subnet_args['ip_version'])
subnet = models_v2.Subnet(**subnet_args)
context.session.add(subnet)
# NOTE(changzhi) Store DNS nameservers with order into DB one
# by one when create subnet with DNS nameservers
if validators.is_attr_set(dns_nameservers):
for order, server in enumerate(dns_nameservers):
dns = models_v2.DNSNameServer(
address=server,
order=order,
subnet_id=subnet.id)
context.session.add(dns)
if validators.is_attr_set(host_routes):
for rt in host_routes:
route = models_v2.SubnetRoute(
subnet_id=subnet.id,
destination=rt['destination'],
nexthop=rt['nexthop'])
context.session.add(route)
self.save_allocation_pools(context, subnet,
subnet_request.allocation_pools)
return subnet
开发者ID:gangzip,项目名称:neutron,代码行数:35,代码来源:ipam_backend_mixin.py
示例9: _save_subnet
def _save_subnet(self, context,
network,
subnet_args,
dns_nameservers,
host_routes,
subnet_request):
self._validate_subnet_cidr(context, network, subnet_args['cidr'])
self._validate_network_subnetpools(network,
subnet_args['subnetpool_id'],
subnet_args['ip_version'])
service_types = subnet_args.pop('service_types', [])
segment_id = subnet_args.get('segment_id')
if segment_id:
# TODO(slaweq): integrate check if segment exists in
# self._validate_segment() method
segment = network_obj.NetworkSegment.get_object(context,
id=segment_id)
if not segment:
raise segment_exc.SegmentNotFound(segment_id=segment_id)
subnet = subnet_obj.Subnet(context, **subnet_args)
subnet.create()
# TODO(slaweq): when check is segment exists will be integrated in
# self._validate_segment() method, it should be moved to be done before
# subnet object is created
self._validate_segment(context, network['id'], segment_id)
# NOTE(changzhi) Store DNS nameservers with order into DB one
# by one when create subnet with DNS nameservers
if validators.is_attr_set(dns_nameservers):
for order, server in enumerate(dns_nameservers):
dns = subnet_obj.DNSNameServer(context,
address=server,
order=order,
subnet_id=subnet.id)
dns.create()
if validators.is_attr_set(host_routes):
for rt in host_routes:
route = subnet_obj.Route(
context,
subnet_id=subnet.id,
destination=common_utils.AuthenticIPNetwork(
rt['destination']),
nexthop=netaddr.IPAddress(rt['nexthop']))
route.create()
if validators.is_attr_set(service_types):
for service_type in service_types:
service_type_obj = subnet_obj.SubnetServiceType(
context, subnet_id=subnet.id, service_type=service_type)
service_type_obj.create()
self.save_allocation_pools(context, subnet,
subnet_request.allocation_pools)
return subnet_obj.Subnet.get_object(context, id=subnet.id)
开发者ID:noironetworks,项目名称:neutron,代码行数:59,代码来源:ipam_backend_mixin.py
示例10: _make_subnet_args
def _make_subnet_args(self, detail, subnet, subnetpool_id):
args = super(IpamBackendMixin, self)._make_subnet_args(
detail, subnet, subnetpool_id)
if validators.is_attr_set(subnet.get(segment.SEGMENT_ID)):
args['segment_id'] = subnet[segment.SEGMENT_ID]
if validators.is_attr_set(subnet.get('service_types')):
args['service_types'] = subnet['service_types']
return args
开发者ID:noironetworks,项目名称:neutron,代码行数:8,代码来源:ipam_backend_mixin.py
示例11: test_is_attr_set
def test_is_attr_set(self):
data = constants.ATTR_NOT_SPECIFIED
self.assertIs(validators.is_attr_set(data), False)
data = None
self.assertIs(validators.is_attr_set(data), False)
data = "I'm set"
self.assertIs(validators.is_attr_set(data), True)
开发者ID:muraliran,项目名称:neutron-lib,代码行数:9,代码来源:test_validators.py
示例12: network_matches_filters
def network_matches_filters(self, network, filters):
if not filters:
return True
if any(validators.is_attr_set(network.get(attr)) for attr in provider.ATTRIBUTES):
segments = [self._get_provider_segment(network)]
elif validators.is_attr_set(network.get(mpnet.SEGMENTS)):
segments = self._get_attribute(network, mpnet.SEGMENTS)
else:
return True
return any(self._match_segment(s, filters) for s in segments)
开发者ID:bigswitch,项目名称:neutron,代码行数:10,代码来源:managers.py
示例13: update_port
def update_port(self, context, old_port_db, old_port, new_port):
"""Update the port IPs
Updates the port's IPs based on any new fixed_ips passed in or if
deferred IP allocation is in effect because allocation requires host
binding information that wasn't provided until port update.
:param old_port_db: The port database record
:param old_port: A port dict created by calling _make_port_dict. This
must be called before calling this method in order to
load data from extensions, specifically host binding.
:param new_port: The new port data passed through the API.
"""
old_host = old_port.get(portbindings.HOST_ID)
new_host = new_port.get(portbindings.HOST_ID)
host = new_host if validators.is_attr_set(new_host) else old_host
changes = self.update_port_with_ips(context,
host,
old_port_db,
new_port,
new_port.get('mac_address'))
fixed_ips_requested = validators.is_attr_set(new_port.get('fixed_ips'))
old_ips = old_port.get('fixed_ips')
deferred_ip_allocation = (
old_port.get('ip_allocation') ==
ipalloc_apidef.IP_ALLOCATION_DEFERRED and
host and not old_host and
not old_ips and
not fixed_ips_requested)
if not deferred_ip_allocation:
# Check that any existing IPs are valid on the new segment
new_host_requested = host and host != old_host
if old_ips and new_host_requested and not fixed_ips_requested:
valid_subnets = self._ipam_get_subnets(
context, old_port['network_id'], host,
service_type=old_port.get('device_owner'))
valid_subnet_ids = {s['id'] for s in valid_subnets}
for fixed_ip in old_ips:
if fixed_ip['subnet_id'] not in valid_subnet_ids:
raise segment_exc.HostNotCompatibleWithFixedIps(
host=host, port_id=old_port['id'])
return changes
# Allocate as if this were the port create.
port_copy = copy.deepcopy(old_port)
port_copy['fixed_ips'] = const.ATTR_NOT_SPECIFIED
port_copy.update(new_port)
context.session.expire(old_port_db, ['fixed_ips'])
ips = self.allocate_ips_for_port_and_store(
context, {'port': port_copy}, port_copy['id'])
getattr(old_port_db, 'fixed_ips') # refresh relationship before return
return self.Changes(add=ips, original=[], remove=[])
开发者ID:noironetworks,项目名称:neutron,代码行数:54,代码来源:ipam_backend_mixin.py
示例14: validate_pools_with_subnetpool
def validate_pools_with_subnetpool(self, subnet):
"""Verifies that allocation pools are set correctly
Allocation pools can be set for specific subnet request only
"""
has_allocpool = validators.is_attr_set(subnet['allocation_pools'])
is_any_subnetpool_request = not validators.is_attr_set(subnet['cidr'])
if is_any_subnetpool_request and has_allocpool:
reason = _("allocation_pools allowed only "
"for specific subnet requests.")
raise exc.BadRequest(resource='subnets', msg=reason)
开发者ID:annp,项目名称:neutron,代码行数:11,代码来源:ipam_backend_mixin.py
示例15: _process_provider_create
def _process_provider_create(self, network):
if any(validators.is_attr_set(network.get(attr)) for attr in provider.ATTRIBUTES):
# Verify that multiprovider and provider attributes are not set
# at the same time.
if validators.is_attr_set(network.get(mpnet.SEGMENTS)):
raise mpnet.SegmentsSetInConjunctionWithProviders()
segment = self._get_provider_segment(network)
return [self._process_provider_segment(segment)]
elif validators.is_attr_set(network.get(mpnet.SEGMENTS)):
segments = [self._process_provider_segment(s) for s in network[mpnet.SEGMENTS]]
mpnet.check_duplicate_segments(segments, self.is_partial_segment)
return segments
开发者ID:bigswitch,项目名称:neutron,代码行数:12,代码来源:managers.py
示例16: _save_subnet
def _save_subnet(self, context,
network,
subnet_args,
dns_nameservers,
host_routes,
subnet_request):
self._validate_subnet_cidr(context, network, subnet_args['cidr'])
self._validate_network_subnetpools(network,
subnet_args['subnetpool_id'],
subnet_args['ip_version'])
service_types = subnet_args.pop('service_types', [])
subnet = models_v2.Subnet(**subnet_args)
segment_id = subnet_args.get('segment_id')
try:
context.session.add(subnet)
context.session.flush()
except db_exc.DBReferenceError:
raise segment_exc.SegmentNotFound(segment_id=segment_id)
self._validate_segment(context, network['id'], segment_id)
# NOTE(changzhi) Store DNS nameservers with order into DB one
# by one when create subnet with DNS nameservers
if validators.is_attr_set(dns_nameservers):
for order, server in enumerate(dns_nameservers):
dns = subnet_obj.DNSNameServer(context,
address=server,
order=order,
subnet_id=subnet.id)
dns.create()
if validators.is_attr_set(host_routes):
for rt in host_routes:
route = subnet_obj.Route(
context,
subnet_id=subnet.id,
destination=common_utils.AuthenticIPNetwork(
rt['destination']),
nexthop=netaddr.IPAddress(rt['nexthop']))
route.create()
if validators.is_attr_set(service_types):
for service_type in service_types:
service_type_entry = sst_model.SubnetServiceType(
subnet_id=subnet.id,
service_type=service_type)
context.session.add(service_type_entry)
self.save_allocation_pools(context, subnet,
subnet_request.allocation_pools)
return subnet
开发者ID:sebrandon1,项目名称:neutron,代码行数:53,代码来源:ipam_backend_mixin.py
示例17: _process_dns_floatingip_create_precommit
def _process_dns_floatingip_create_precommit(self, context,
floatingip_data, req_data):
# expects to be called within a plugin's session
dns_domain = req_data.get(dns.DNSDOMAIN)
if not validators.is_attr_set(dns_domain):
return
if not self.dns_driver:
return
dns_name = req_data[dns.DNSNAME]
self._validate_floatingip_dns(dns_name, dns_domain)
current_dns_name, current_dns_domain = (
self._get_requested_state_for_external_dns_service_create(
context, floatingip_data, req_data))
dns_actions_data = None
if current_dns_name and current_dns_domain:
context.session.add(FloatingIPDNS(
floatingip_id=floatingip_data['id'],
dns_name=req_data[dns.DNSNAME],
dns_domain=req_data[dns.DNSDOMAIN],
published_dns_name=current_dns_name,
published_dns_domain=current_dns_domain))
dns_actions_data = DNSActionsData(
current_dns_name=current_dns_name,
current_dns_domain=current_dns_domain)
floatingip_data['dns_name'] = dns_name
floatingip_data['dns_domain'] = dns_domain
return dns_actions_data
开发者ID:annp,项目名称:neutron,代码行数:29,代码来源:dns_db.py
示例18: _get_security_groups_on_port
def _get_security_groups_on_port(self, context, port):
"""Check that all security groups on port belong to tenant.
:returns: all security groups IDs on port belonging to tenant.
"""
port = port['port']
if not validators.is_attr_set(port.get(ext_sg.SECURITYGROUPS)):
return
if port.get('device_owner') and net.is_port_trusted(port):
return
port_sg = port.get(ext_sg.SECURITYGROUPS, [])
tenant_id = port.get('tenant_id')
sg_objs = sg_obj.SecurityGroup.get_objects(context, id=port_sg)
valid_groups = set(
g.id for g in sg_objs
if (not tenant_id or g.tenant_id == tenant_id or
sg_obj.SecurityGroup.is_shared_with_tenant(
context, g.id, tenant_id))
)
requested_groups = set(port_sg)
port_sg_missing = requested_groups - valid_groups
if port_sg_missing:
raise ext_sg.SecurityGroupNotFound(id=', '.join(port_sg_missing))
return list(requested_groups)
开发者ID:openstack,项目名称:neutron,代码行数:29,代码来源:securitygroups_db.py
示例19: _process_l3_update
def _process_l3_update(self, context, net_data, req_data, allow_all=True):
new_value = req_data.get(extnet_apidef.EXTERNAL)
net_id = net_data['id']
if not validators.is_attr_set(new_value):
return
if net_data.get(extnet_apidef.EXTERNAL) == new_value:
return
if new_value:
net_obj.ExternalNetwork(
context, network_id=net_id).create()
net_data[extnet_apidef.EXTERNAL] = True
if allow_all:
context.session.add(rbac_db.NetworkRBAC(
object_id=net_id, action='access_as_external',
target_tenant='*', tenant_id=net_data['tenant_id']))
else:
# must make sure we do not have any external gateway ports
# (and thus, possible floating IPs) on this network before
# allow it to be update to external=False
port = context.session.query(models_v2.Port).filter_by(
device_owner=DEVICE_OWNER_ROUTER_GW,
network_id=net_data['id']).first()
if port:
raise extnet_exc.ExternalNetworkInUse(net_id=net_id)
net_obj.ExternalNetwork.delete_objects(
context, network_id=net_id)
for rbdb in (context.session.query(rbac_db.NetworkRBAC).filter_by(
object_id=net_id, action='access_as_external')):
context.session.delete(rbdb)
net_data[extnet_apidef.EXTERNAL] = False
开发者ID:eayunstack,项目名称:neutron,代码行数:33,代码来源:external_net_db.py
示例20: _get_security_groups_on_port
def _get_security_groups_on_port(self, context, port):
"""Check that all security groups on port belong to tenant.
:returns: all security groups IDs on port belonging to tenant.
"""
p = port['port']
if not validators.is_attr_set(
p.get(securitygroups_db.ext_sg.SECURITYGROUPS)):
return
if p.get('device_owner') and p['device_owner'].startswith('network:'):
return
port_sg = p.get(securitygroups_db.ext_sg.SECURITYGROUPS, [])
filters = {'id': port_sg}
valid_groups = set(g['id'] for g in
self.get_security_groups(context, fields=['id'],
filters=filters))
requested_groups = set(port_sg)
port_sg_missing = requested_groups - valid_groups
if port_sg_missing:
raise securitygroups_db.ext_sg.SecurityGroupNotFound(
id=', '.join(port_sg_missing))
return requested_groups
开发者ID:openstack,项目名称:group-based-policy,代码行数:25,代码来源:patch.py
注:本文中的neutron_lib.api.validators.is_attr_set函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论