本文整理汇总了Python中neutron_lib.plugins.directory.get_plugin函数的典型用法代码示例。如果您正苦于以下问题:Python get_plugin函数的具体用法?Python get_plugin怎么用?Python get_plugin使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_plugin函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _extract
def _extract(self, resource_type, resource_id, field):
# NOTE(salv-orlando): This check currently assumes the parent
# resource is handled by the core plugin. It might be worth
# having a way to map resources to plugins so to make this
# check more general
plugin = directory.get_plugin()
if resource_type in service_const.EXT_PARENT_RESOURCE_MAPPING:
plugin = directory.get_plugin(
service_const.EXT_PARENT_RESOURCE_MAPPING[resource_type])
f = getattr(plugin, 'get_%s' % resource_type)
# f *must* exist, if not found it is better to let neutron
# explode. Check will be performed with admin context
try:
data = f(context.get_admin_context(),
resource_id,
fields=[field])
except exceptions.NotFound as e:
# NOTE(kevinbenton): a NotFound exception can occur if a
# list operation is happening at the same time as one of
# the parents and its children being deleted. So we issue
# a RetryRequest so the API will redo the lookup and the
# problem items will be gone.
raise db_exc.RetryRequest(e)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception('Policy check error while calling %s!', f)
return data[field]
开发者ID:openstack,项目名称:neutron,代码行数:27,代码来源:policy.py
示例2: add_router_interface
def add_router_interface(self, context, router_id, interface_info):
# Validate args
router = self._get_router(context, router_id)
tenant_id = router['tenant_id']
with db.context_manager.writer.using(context):
# create interface in DB
# TODO(wolverineav): hack until fixed at right place
setattr(context, 'GUARD_TRANSACTION', False)
new_intf_info = super(L3RestProxy,
self).add_router_interface(context,
router_id,
interface_info)
port = self._get_port(context, new_intf_info['port_id'])
subnet_id = new_intf_info['subnet_id']
# we will use the port's subnet id as interface's id
intf_details = self._get_router_intf_details(context,
subnet_id)
# get gateway_ip from port instead of gateway_ip
if port.get("fixed_ips"):
intf_details['ip_address'] = port["fixed_ips"][0]['ip_address']
# create interface on the network controller
self.servers.rest_add_router_interface(tenant_id, router_id,
intf_details)
directory.get_plugin().update_port(
context, port['id'], {'port': {'status': 'ACTIVE'}})
return new_intf_info
开发者ID:sarath-kumar,项目名称:networking-bigswitch,代码行数:29,代码来源:l3_router_plugin.py
示例3: get_routers_and_interfaces
def get_routers_and_interfaces(self):
core = directory.get_plugin()
ctx = nctx.get_admin_context()
routers = directory.get_plugin(plugin_constants.L3).get_routers(ctx)
router_interfaces = list()
for r in routers:
ports = core.get_ports(
ctx,
filters={
'device_id': [r['id']],
'device_owner': [n_const.DEVICE_OWNER_ROUTER_INTF]}) or []
for p in ports:
router_interface = r.copy()
net_id = p['network_id']
subnet_id = p['fixed_ips'][0]['subnet_id']
subnet = core.get_subnet(ctx, subnet_id)
ml2_db = NetworkContext(self, ctx, {'id': net_id})
seg_id = ml2_db.network_segments[0]['segmentation_id']
router_interface['seg_id'] = seg_id
router_interface['cidr'] = subnet['cidr']
router_interface['gip'] = subnet['gateway_ip']
router_interface['ip_version'] = subnet['ip_version']
router_interface['subnet_id'] = subnet_id
router_interfaces.append(router_interface)
return routers, router_interfaces
开发者ID:openstack,项目名称:networking-arista,代码行数:26,代码来源:l3_arista.py
示例4: setUp
def setUp(self):
self.cfg = self.useFixture(config_fixture.Config())
self.cfg.config(core_plugin='neutron.plugins.ml2.plugin.Ml2Plugin')
self.cfg.config(mechanism_drivers=[
'logger', 'opendaylight_v2'], group='ml2')
self.useFixture(odl_base.OpenDaylightRestClientFixture())
self.cfg.config(service_plugins=['odl-router_v2'])
core_plugin = cfg.CONF.core_plugin
service_plugins = {'l3_plugin_name': 'odl-router_v2'}
self.useFixture(odl_base.OpenDaylightJournalThreadFixture())
mock.patch.object(mech_driver_v2.OpenDaylightMechanismDriver,
'_record_in_journal').start()
mock.patch.object(mech_driver_v2.OpenDaylightMechanismDriver,
'sync_from_callback_precommit').start()
mock.patch.object(mech_driver_v2.OpenDaylightMechanismDriver,
'sync_from_callback_postcommit').start()
self.useFixture(odl_base.OpenDaylightPeriodicTaskFixture())
self.useFixture(odl_base.OpenDaylightFeaturesFixture())
self.useFixture(odl_base.OpenDaylightPseudoAgentPrePopulateFixture())
super(OpenDaylightL3TestCase, self).setUp(
plugin=core_plugin, service_plugins=service_plugins)
self.plugin = directory.get_plugin()
self.plugin._network_is_external = mock.Mock(return_value=True)
self.driver = directory.get_plugin(constants.L3)
self.thread = journal.OpenDaylightJournalThread()
开发者ID:openstack,项目名称:networking-odl,代码行数:25,代码来源:test_l3_odl_v2.py
示例5: setUp
def setUp(self):
config.cfg.CONF.set_override("extension_drivers", self._extension_drivers, group="ml2")
mock.patch("neutron.services.qos.notification_drivers.message_queue" ".RpcQosServiceNotificationDriver").start()
super(TestRevisionPlugin, self).setUp()
self.cp = directory.get_plugin()
self.l3p = directory.get_plugin(constants.L3)
self.ctx = nctx.get_admin_context()
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:test_revision_plugin.py
示例6: __init__
def __init__(self, sc_plugin, context, service_chain_instance,
service_chain_specs, current_service_chain_node, position,
current_service_profile, provider_group, consumer_group=None,
management_group=None, original_service_chain_node=None,
original_service_profile=None, service_targets=None,
classifier=None, is_consumer_external=False):
self._gbp_plugin = get_gbp_plugin()
self._sc_plugin = sc_plugin
self._plugin_context = context
self._admin_context = None
self._service_chain_instance = service_chain_instance
self._current_service_chain_node = current_service_chain_node
self._current_service_profile = current_service_profile
self._original_service_chain_node = original_service_chain_node
self._original_service_profile = original_service_profile
self._service_targets = service_targets
self._service_chain_specs = service_chain_specs
self._provider_group = provider_group
self._consumer_group = consumer_group
self._management_group = management_group
self._classifier = classifier
self._is_consumer_external = is_consumer_external
self._relevant_specs = None
self._core_plugin = directory.get_plugin()
self._l3_plugin = directory.get_plugin(constants.L3)
self._position = position
开发者ID:openstack,项目名称:group-based-policy,代码行数:26,代码来源:context.py
示例7: create_port_postcommit
def create_port_postcommit(self, context):
if not self._is_port_supported(context.current):
LOG.debug("Ignoring unsupported vnic type")
return
if self._is_port_sriov(context.current):
LOG.debug("SR-IOV port, nothing to do")
return
# If bsn_l3 plugin and it is a gateway port, bind to ivs.
if (self.l3_bsn_plugin and
context.current['device_owner'] == ROUTER_GATEWAY_PORT_OWNER):
directory.get_plugin().update_port_status(
context._plugin_context, context.current['id'],
const.PORT_STATUS_ACTIVE)
try:
# create port on the network controller
port = self._prepare_port_for_controller(context)
except servermanager.TenantIDNotFound as e:
LOG.warning("Skipping create port %(port)s as %(exp)s",
{'port': context.current.get('id'), 'exp': e})
return
if port:
# For vhostuser type ports, membership rule and endpoint was
# created during bind_port, so skip this
if port[portbindings.VIF_TYPE] == portbindings.VIF_TYPE_VHOST_USER:
return
self.async_port_create(port["network"]["tenant_id"],
port["network"]["id"], port)
开发者ID:sarath-kumar,项目名称:networking-bigswitch,代码行数:32,代码来源:driver.py
示例8: setUp
def setUp(self, policy_drivers=None, core_plugin=None, l3_plugin=None,
ml2_options=None, sc_plugin=None, qos_plugin=None,
trunk_plugin=None):
core_plugin = core_plugin or ML2PLUS_PLUGIN
policy_drivers = policy_drivers or ['neutron_resources']
config.cfg.CONF.set_override('policy_drivers',
policy_drivers,
group='group_policy')
sc_cfg.cfg.CONF.set_override('node_drivers',
['dummy_driver'],
group='node_composition_plugin')
sc_cfg.cfg.CONF.set_override('node_plumber', 'dummy_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('allow_overlapping_ips', True)
super(CommonNeutronBaseTestCase, self).setUp(core_plugin=core_plugin,
l3_plugin=l3_plugin,
ml2_options=ml2_options,
sc_plugin=sc_plugin,
qos_plugin=qos_plugin,
trunk_plugin=trunk_plugin)
res = mock.patch('neutron.db.l3_db.L3_NAT_dbonly_mixin.'
'_check_router_needs_rescheduling').start()
res.return_value = None
self._plugin = directory.get_plugin()
self._plugin.remove_networks_from_down_agents = mock.Mock()
self._plugin.is_agent_down = mock.Mock(return_value=False)
self._context = nctx.get_admin_context()
self._gbp_plugin = directory.get_plugin(pconst.GROUP_POLICY)
self._l3_plugin = directory.get_plugin(constants.L3)
config.cfg.CONF.set_override('debug', True)
开发者ID:openstack,项目名称:group-based-policy,代码行数:30,代码来源:test_neutron_resources_driver.py
示例9: setUp
def setUp(self):
cfg.CONF.set_override('extension_drivers',
self._extension_drivers,
group='ml2')
super(TestRevisionPlugin, self).setUp()
self.cp = directory.get_plugin()
self.l3p = directory.get_plugin(constants.L3)
self._ctx = nctx.get_admin_context()
开发者ID:igordcard,项目名称:neutron,代码行数:8,代码来源:test_revision_plugin.py
示例10: get_active_networks_info
def get_active_networks_info(self, context, **kwargs):
"""Returns all the networks/subnets/ports in system."""
host = kwargs.get('host')
LOG.debug('get_active_networks_info from %s', host)
networks = self._get_active_networks(context, **kwargs)
plugin = directory.get_plugin()
filters = {'network_id': [network['id'] for network in networks]}
ports = plugin.get_ports(context, filters=filters)
# default is to filter subnets based on 'enable_dhcp' flag
if kwargs.get('enable_dhcp_filter', True):
filters['enable_dhcp'] = [True]
# NOTE(kevinbenton): we sort these because the agent builds tags
# based on position in the list and has to restart the process if
# the order changes.
subnets = sorted(plugin.get_subnets(context, filters=filters),
key=operator.itemgetter('id'))
# Handle the possibility that the dhcp agent(s) only has connectivity
# inside a segment. If the segment service plugin is loaded and
# there are active dhcp enabled subnets, then filter out the subnets
# that are not on the host's segment.
seg_plug = directory.get_plugin(
segment_ext.SegmentPluginBase.get_plugin_type())
seg_subnets = [subnet for subnet in subnets
if subnet.get('segment_id')]
nonlocal_subnets = []
if seg_plug and seg_subnets:
host_segment_ids = seg_plug.get_segments_by_hosts(context, [host])
# Gather the ids of all the subnets that are on a segment that
# this host touches
seg_subnet_ids = {subnet['id'] for subnet in seg_subnets
if subnet['segment_id'] in host_segment_ids}
# Gather the ids of all the networks that are routed
routed_net_ids = {seg_subnet['network_id']
for seg_subnet in seg_subnets}
# Remove the subnets with segments that are not in the same
# segments as the host. Do this only for the networks that are
# routed because we want non-routed networks to work as
# before.
nonlocal_subnets = [subnet for subnet in seg_subnets
if subnet['id'] not in seg_subnet_ids]
subnets = [subnet for subnet in subnets
if subnet['network_id'] not in routed_net_ids or
subnet['id'] in seg_subnet_ids]
grouped_subnets = self._group_by_network_id(subnets)
grouped_nonlocal_subnets = self._group_by_network_id(nonlocal_subnets)
grouped_ports = self._group_by_network_id(ports)
for network in networks:
network['subnets'] = grouped_subnets.get(network['id'], [])
network['non_local_subnets'] = (
grouped_nonlocal_subnets.get(network['id'], []))
network['ports'] = grouped_ports.get(network['id'], [])
return networks
开发者ID:igordcard,项目名称:neutron,代码行数:54,代码来源:dhcp_rpc.py
示例11: test_net_tag_bumps_net_revision
def test_net_tag_bumps_net_revision(self):
with self.network() as network:
rev = network["network"]["revision_number"]
tag_plugin = directory.get_plugin("TAG")
tag_plugin.update_tag(self.ctx, "networks", network["network"]["id"], "mytag")
updated = directory.get_plugin().get_network(self.ctx, network["network"]["id"])
self.assertGreater(updated["revision_number"], rev)
tag_plugin.delete_tag(self.ctx, "networks", network["network"]["id"], "mytag")
rev = updated["revision_number"]
updated = directory.get_plugin().get_network(self.ctx, network["network"]["id"])
self.assertGreater(updated["revision_number"], rev)
开发者ID:openstack,项目名称:neutron,代码行数:11,代码来源:test_revision_plugin.py
示例12: __init__
def __init__(self):
# REVISIT: Defer until after validating config? Or pass in PD
# & MD?
self.core_plugin = directory.get_plugin()
self.md = self.core_plugin.mechanism_manager.mech_drivers[
'apic_aim'].obj
self.pd = self.md.gbp_driver
self.sfcd = None
sfc_plugin = directory.get_plugin('sfc')
if sfc_plugin:
driver = sfc_plugin.driver_manager.drivers.get('aim')
if driver:
self.sfcd = driver.obj
开发者ID:openstack,项目名称:group-based-policy,代码行数:13,代码来源:aim_validation.py
示例13: test_net_tag_bumps_net_revision
def test_net_tag_bumps_net_revision(self):
with self.network() as network:
rev = network['network']['revision_number']
tag_plugin = directory.get_plugin('TAG')
tag_plugin.update_tag(self.ctx, 'networks',
network['network']['id'], 'mytag')
updated = directory.get_plugin().get_network(
self.ctx, network['network']['id'])
self.assertGreater(updated['revision_number'], rev)
tag_plugin.delete_tag(self.ctx, 'networks',
network['network']['id'], 'mytag')
rev = updated['revision_number']
updated = directory.get_plugin().get_network(
self.ctx, network['network']['id'])
self.assertGreater(updated['revision_number'], rev)
开发者ID:igordcard,项目名称:neutron,代码行数:15,代码来源:test_revision_plugin.py
示例14: get_resources
def get_resources(cls):
"""Returns Ext Resources."""
ip_controller = IpAddressesController(
directory.get_plugin())
ip_port_controller = IpAddressPortController(
directory.get_plugin())
resources = []
resources.append(extensions.ResourceExtension(
Ip_addresses.get_alias(),
ip_controller))
parent = {'collection_name': 'ip_addresses',
'member_name': 'ip_address'}
resources.append(extensions.ResourceExtension(
'ports', ip_port_controller, parent=parent))
return resources
开发者ID:openstack,项目名称:quark,代码行数:15,代码来源:ip_addresses.py
示例15: tearDown
def tearDown(self):
# disables api_replay_mode for these tests
cfg.CONF.set_override('api_replay_mode', False)
# remove the extension from the plugin
directory.get_plugin().supported_extension_aliases.remove(
api_replay.ALIAS)
# Revert the attributes map back to normal
for attr_name in ('ports', 'networks', 'security_groups',
'security_group_rules', 'routers', 'policies'):
attr_info = attributes.RESOURCES[attr_name]
attr_info['id']['allow_post'] = False
super(TestApiReplay, self).tearDown()
开发者ID:openstack,项目名称:vmware-nsx,代码行数:15,代码来源:test_api_replay.py
示例16: test_validate_port_has_binding_host
def test_validate_port_has_binding_host(self):
with self.port() as port:
core_plugin = directory.get_plugin()
port['port']['binding:host_id'] = 'host'
core_plugin.update_port(self.context, port['port']['id'], port)
validator = rules.TrunkPortValidator(port['port']['id'])
self.assertTrue(validator.is_bound(self.context))
开发者ID:igordcard,项目名称:neutron,代码行数:7,代码来源:test_rules.py
示例17: index
def index(self, request, **kwargs):
plugin = directory.get_plugin()
policy.enforce(request.context,
"get_%s" % DHCP_AGENTS,
{})
return plugin.list_dhcp_agents_hosting_network(
request.context, kwargs['network_id'])
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:7,代码来源:dhcpagentscheduler.py
示例18: _add_az_to_response
def _add_az_to_response(router_res, router_db):
l3_plugin = directory.get_plugin(constants.L3)
if not extensions.is_extension_supported(
l3_plugin, 'router_availability_zone'):
return
router_res['availability_zones'] = (
l3_plugin.get_router_availability_zones(router_db))
开发者ID:cubeek,项目名称:neutron,代码行数:7,代码来源:router.py
示例19: _get_net_id_by_portchain_id
def _get_net_id_by_portchain_id(self, context, portchain_id):
sfc_plugin = directory.get_plugin('sfc')
port_chain = sfc_plugin.get_port_chain(context, portchain_id)
if not port_chain:
raise n_exceptions.PortChainNotFound(portchain_id=portchain_id)
port_pairs = sfc_plugin.get_port_pairs(
context, {'portpairgroup_id': port_chain['port_pair_groups']})
if not port_pairs:
raise n_exceptions.PortPairsNotFoundForPortPairGroup(
portpairgroup_id=port_chain['port_pair_groups'])
core_plugin = directory.get_plugin()
port = super(central_plugin.TricirclePlugin, core_plugin
).get_port(context, port_pairs[0]['ingress'])
if not port:
raise n_exceptions.PortNotFound(port_id=port_pairs[0]['ingress'])
return port['network_id']
开发者ID:openstack,项目名称:tricircle,代码行数:16,代码来源:central_fc_driver.py
示例20: _update_routed_network_host_routes
def _update_routed_network_host_routes(self, context, network_id,
deleted_cidr=None):
"""Update host routes on subnets on a routed network after event
Host routes on the subnets on a routed network may need updates after
any CREATE or DELETE event.
:param network_id: Network ID
:param deleted_cidr: The cidr of a deleted subnet.
"""
for subnet in self._get_subnets(context, network_id):
host_routes = [{'destination': str(route.destination),
'nexthop': route.nexthop}
for route in subnet.host_routes]
calc_host_routes = self._calculate_routed_network_host_routes(
context=context,
ip_version=subnet.ip_version,
network_id=subnet.network_id,
subnet_id=subnet.id,
segment_id=subnet.segment_id,
host_routes=copy.deepcopy(host_routes),
gateway_ip=subnet.gateway_ip,
deleted_cidr=deleted_cidr)
if self._host_routes_need_update(host_routes, calc_host_routes):
LOG.debug(
"Updating host routes for subnet %s on routed network %s",
(subnet.id, subnet.network_id))
plugin = directory.get_plugin()
plugin.update_subnet(context, subnet.id,
{'subnet': {
'host_routes': calc_host_routes}})
开发者ID:igordcard,项目名称:neutron,代码行数:31,代码来源:plugin.py
注:本文中的neutron_lib.plugins.directory.get_plugin函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论