本文整理汇总了Python中neutron.openstack.common.importutils.import_class函数的典型用法代码示例。如果您正苦于以下问题:Python import_class函数的具体用法?Python import_class怎么用?Python import_class使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了import_class函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self, host=None):
super(DhcpAgent, self).__init__(host=host)
self.needs_resync = False
self.conf = cfg.CONF
self.cache = NetworkCache()
self.root_helper = config.get_root_helper(self.conf)
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
# Work out if DHCP serving for bridged or routed VM interfaces.
try:
interface_driver = importutils.import_object(
self.conf.interface_driver, self.conf)
self.bridged = interface_driver.bridged()
except Exception as e:
msg = (_("Error importing interface driver '%(driver)s': "
"%(inner)s") % {'driver': self.conf.interface_driver,
'inner': e})
LOG.error(msg)
raise SystemExit(msg)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN,
ctx,
self.bridged and
self.conf.use_namespaces)
# create dhcp dir to store dhcp info
dhcp_dir = os.path.dirname("/%s/dhcp/" % self.conf.state_path)
if not os.path.isdir(dhcp_dir):
os.makedirs(dhcp_dir, 0o755)
self.dhcp_version = self.dhcp_driver_cls.check_version()
self._populate_networks_cache()
开发者ID:projectcalico,项目名称:calico-neutron,代码行数:31,代码来源:dhcp_agent.py
示例2: __init__
def __init__(self, plugin):
self._resource_name = RESOURCE_NAME
self._plugin = plugin
self._driver = importutils.import_class(
cfg.CONF.QUOTAS.quota_driver
)
self._update_extended_attributes = True
开发者ID:CampHarmony,项目名称:neutron,代码行数:7,代码来源:quotasv2.py
示例3: _parse_class_args
def _parse_class_args(self):
"""Parse the contrailplugin.ini file.
Opencontrail supports extension such as ipam, policy, these extensions
can be configured in the plugin configuration file as shown below.
Plugin then loads the specified extensions.
contrail_extensions=ipam:<classpath>,policy:<classpath>
"""
contrail_extensions = cfg.CONF.APISERVER.contrail_extensions
# If multiple class specified for same extension, last one will win
# according to DictOpt behavior
for ext_name, ext_class in contrail_extensions.items():
try:
if not ext_class or ext_class == 'None':
self.supported_extension_aliases.append(ext_name)
continue
ext_class = importutils.import_class(ext_class)
ext_instance = ext_class()
ext_instance.set_core(self)
for method in dir(ext_instance):
for prefix in ['get', 'update', 'delete', 'create']:
if method.startswith('%s_' % prefix):
setattr(self, method,
ext_instance.__getattribute__(method))
self.supported_extension_aliases.append(ext_name)
except Exception:
LOG.exception(_("Contrail Backend Error"))
# Converting contrail backend error to Neutron Exception
raise InvalidContrailExtensionError(
ext_name=ext_name, ext_class=ext_class)
self._build_auth_details()
开发者ID:litdong001,项目名称:contrail-neutron-plugin,代码行数:32,代码来源:contrail_plugin_base.py
示例4: _parse_class_args
def _parse_class_args(self):
"""Parse the contrailplugin.ini file.
Opencontrail supports extension such as ipam, policy, these extensions
can be configured in the plugin configuration file as shown below.
Plugin then loads the specified extensions.
contrail_extensions=ipam:<classpath>,policy:<classpath>
"""
self._contrail_extensions_instances = []
contrail_extensions = cfg.CONF.APISERVER.contrail_extensions
#If multiple class specified for same extension, last one will win
#according to DictOpt beheivior
for ext_name, ext_class in contrail_extensions.items():
try:
if not ext_class:
LOG.error(_('Malformed contrail extension...'))
continue
self.supported_extension_aliases.append(ext_name)
ext_class = importutils.import_class(ext_class)
ext_instance = ext_class()
ext_instance.set_core(self)
self._contrail_extensions_instances.append(ext_instance)
except Exception:
LOG.exception(_("Contrail Backend Error"))
#Converting contrail backend error to Neutron Exception
raise InvalidContrailExtensionError(
ext_name=ext_name, ext_class=ext_class)
self._multi_tenancy = cfg.CONF.APISERVER.multi_tenancy
self._max_retries = cfg.CONF.APISERVER.max_retries
self._retry_interval = cfg.CONF.APISERVER.retry_interval
开发者ID:nplanel,项目名称:contrail-neutron-plugin,代码行数:32,代码来源:contrail_plugin_core.py
示例5: load_driver
def load_driver(plugin, ofc_manager):
if (PROVIDER_OPENFLOW in ROUTER_DRIVER_MAP and
not ofc_manager.driver.router_supported):
LOG.warning(
_('OFC does not support router with provider=%(provider)s, '
'so removed it from supported provider '
'(new router driver map=%(driver_map)s)'),
{'provider': PROVIDER_OPENFLOW,
'driver_map': ROUTER_DRIVER_MAP})
del ROUTER_DRIVER_MAP[PROVIDER_OPENFLOW]
if config.PROVIDER.default_router_provider not in ROUTER_DRIVER_MAP:
LOG.error(_('default_router_provider %(default)s is supported! '
'Please specify one of %(supported)s'),
{'default': config.PROVIDER.default_router_provider,
'supported': ROUTER_DRIVER_MAP.keys()})
raise SystemExit(1)
enabled_providers = (set(config.PROVIDER.router_providers +
[config.PROVIDER.default_router_provider]) &
set(ROUTER_DRIVER_MAP.keys()))
for driver in enabled_providers:
driver_klass = importutils.import_class(ROUTER_DRIVER_MAP[driver])
ROUTER_DRIVERS[driver] = driver_klass(plugin, ofc_manager)
LOG.info(_('Enabled router drivers: %s'), ROUTER_DRIVERS.keys())
if not ROUTER_DRIVERS:
LOG.error(_('No router provider is enabled. neutron-server terminated!'
' (supported=%(supported)s, configured=%(config)s)'),
{'supported': ROUTER_DRIVER_MAP.keys(),
'config': config.PROVIDER.router_providers})
raise SystemExit(1)
开发者ID:li-ma,项目名称:neutron,代码行数:35,代码来源:nec_router.py
示例6: setUp
def setUp(self):
super(NeutronPolicyTestCase, self).setUp()
policy.reset()
policy.init()
self.addCleanup(policy.reset)
self.admin_only_legacy = "role:admin"
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
# Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
attributes.RESOURCE_ATTRIBUTE_MAP.update(FAKE_RESOURCE)
self.rules = dict((k, common_policy.parse_rule(v)) for k, v in {
"context_is_admin": "role:admin",
"admin_or_network_owner": "rule:context_is_admin or "
"tenant_id:%(network:tenant_id)s",
"admin_or_owner": ("rule:context_is_admin or "
"tenant_id:%(tenant_id)s"),
"admin_only": "rule:context_is_admin",
"regular_user": "role:user",
"shared": "field:networks:shared=True",
"external": "field:networks:router:external=True",
"default": '@',
"create_network": "rule:admin_or_owner",
"create_network:shared": "rule:admin_only",
"update_network": '@',
"update_network:shared": "rule:admin_only",
"get_network": "rule:admin_or_owner or "
"rule:shared or "
"rule:external",
"create_port:mac": "rule:admin_or_network_owner",
"create_something": "rule:admin_or_owner",
"create_something:attr": "rule:admin_or_owner",
"create_something:attr:sub_attr_1": "rule:admin_or_owner",
"create_something:attr:sub_attr_2": "rule:admin_only",
"get_firewall_policy": "rule:admin_or_owner or "
"rule:shared",
"get_firewall_rule": "rule:admin_or_owner or "
"rule:shared"
}.items())
def fakepolicyinit():
common_policy.set_rules(common_policy.Rules(self.rules))
def remove_fake_resource():
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
self.patcher = mock.patch.object(neutron.policy,
'init',
new=fakepolicyinit)
self.patcher.start()
self.addCleanup(remove_fake_resource)
self.context = context.Context('fake', 'fake', roles=['user'])
plugin_klass = importutils.import_class(
"neutron.db.db_base_plugin_v2.NeutronDbPluginV2")
self.manager_patcher = mock.patch('neutron.manager.NeutronManager')
fake_manager = self.manager_patcher.start()
fake_manager_instance = fake_manager.return_value
fake_manager_instance.plugin = plugin_klass()
开发者ID:ArifovicH,项目名称:neutron,代码行数:59,代码来源:test_policy.py
示例7: _load_plugin
def _load_plugin(self, plugin_provider):
LOG.debug(_("Plugin location: %s"), plugin_provider)
try:
plugin = importutils.import_class(plugin_provider)
return plugin()
except ImportError:
with excutils.save_and_reraise_exception():
LOG.exception(_('Load plugin error: %(provider)s'),
provider=plugin_provider)
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:9,代码来源:plugin_manager.py
示例8: __init__
def __init__(self, app, conf, **local_conf):
cfg.CONF.register_opts(self.opts)
# Determine the context class to use
self.ctxcls = RequestContext
if 'context_class' in local_conf:
self.ctxcls = importutils.import_class(local_conf['context_class'])
super(ContextMiddleware, self).__init__(app)
开发者ID:aliliang,项目名称:tricircle,代码行数:9,代码来源:neutron_proxy_context.py
示例9: __init__
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, periodic_fuzzy_delay=None,
*args, **kwargs):
self.binary = binary
self.manager_class_name = manager
manager_class = importutils.import_class(self.manager_class_name)
self.manager = manager_class(host=host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
super(Service, self).__init__(host, topic, manager=self.manager)
开发者ID:xiongmeng1108,项目名称:gcloud7_neutron-2014.2.2,代码行数:14,代码来源:service.py
示例10: _get_plugin_instance
def _get_plugin_instance(self, namespace, plugin_provider):
try:
# Try to resolve plugin by name
mgr = driver.DriverManager(namespace, plugin_provider)
plugin_class = mgr.driver
except RuntimeError as e1:
# fallback to class name
try:
plugin_class = importutils.import_class(plugin_provider)
except ImportError as e2:
LOG.exception(_("Error loading plugin by name, %s"), e1)
LOG.exception(_("Error loading plugin by class, %s"), e2)
raise ImportError(_("Plugin not found."))
return plugin_class()
开发者ID:KumarAcharya,项目名称:neutron,代码行数:14,代码来源:manager.py
示例11: __init__
def __init__(self, host=None):
super(DhcpAgent, self).__init__(host=host)
self.needs_resync = False
self.conf = cfg.CONF
self.cache = NetworkCache()
self.root_helper = config.get_root_helper(self.conf)
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
ctx = context.get_admin_context_without_session()
self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx)
self.device_manager = DeviceManager(self.conf, self.plugin_rpc)
self.lease_relay = DhcpLeaseRelay(self.update_lease)
self.dhcp_version = self.dhcp_driver_cls.check_version()
self._populate_networks_cache()
开发者ID:armando-migliaccio,项目名称:neutron,代码行数:14,代码来源:dhcp_agent.py
示例12: _parse_class_args
def _parse_class_args(self):
"""Parse the contrailplugin.ini file.
Opencontrail supports extension such as ipam, policy, these extensions
can be configured in the plugin configuration file as shown below.
Plugin then loads the specified extensions.
contrail_extensions=ipam:<classpath>,policy:<classpath>
"""
contrail_extensions = cfg.CONF.APISERVER.contrail_extensions
# If multiple class specified for same extension, last one will win
# according to DictOpt behavior
for ext_name, ext_class in contrail_extensions.items():
try:
if not ext_class:
LOG.error(_('Malformed contrail extension...'))
continue
self.supported_extension_aliases.append(ext_name)
if ext_class == 'None':
continue
ext_class = importutils.import_class(ext_class)
ext_instance = ext_class()
ext_instance.set_core(self)
for method in dir(ext_instance):
for prefix in ['get', 'update', 'delete', 'create']:
if method.startswith('%s_' % prefix):
setattr(self, method,
ext_instance.__getattribute__(method))
except Exception:
LOG.exception(_("Contrail Backend Error"))
# Converting contrail backend error to Neutron Exception
raise InvalidContrailExtensionError(
ext_name=ext_name, ext_class=ext_class)
#keystone
self._authn_token = None
if cfg.CONF.auth_strategy == 'keystone':
kcfg = cfg.CONF.keystone_authtoken
body = '{"auth":{"passwordCredentials":{'
body += ' "username": "%s",' % (kcfg.admin_user)
body += ' "password": "%s"},' % (kcfg.admin_password)
body += ' "tenantName":"%s"}}' % (kcfg.admin_tenant_name)
self._authn_body = body
self._authn_token = cfg.CONF.keystone_authtoken.admin_token
self._keystone_url = "%s://%s:%s%s" % (
cfg.CONF.keystone_authtoken.auth_protocol,
cfg.CONF.keystone_authtoken.auth_host,
cfg.CONF.keystone_authtoken.auth_port,
"/v2.0/tokens")
开发者ID:CodeWire,项目名称:Standup-a-3-node-OpenStack-HA-Controller-Cluster-with-Contrail-SDN,代码行数:50,代码来源:contrail_plugin.py
示例13: _parse_class_args
def _parse_class_args(self):
"""Parse the contrailplugin.ini file.
Opencontrail supports extension such as ipam, policy, these extensions
can be configured in the plugin configuration file as shown below.
Plugin then loads the specified extensions.
contrail_extensions=ipam:<classpath>,policy:<classpath>
"""
self._contrail_extensions_instances = []
contrail_extensions = cfg.CONF.APISERVER.contrail_extensions
#If multiple class specified for same extension, last one will win
#according to DictOpt beheivior
for ext_name, ext_class in contrail_extensions.items():
try:
if not ext_class:
LOG.error(_('Malformed contrail extension...'))
continue
self.supported_extension_aliases.append(ext_name)
ext_class = importutils.import_class(ext_class)
ext_instance = ext_class()
ext_instance.set_core(self)
self._contrail_extensions_instances.append(ext_instance)
except Exception:
LOG.exception(_("Contrail Backend Error"))
#Converting contrail backend error to Neutron Exception
raise InvalidContrailExtensionError(
ext_name=ext_name, ext_class=ext_class)
self._multi_tenancy = cfg.CONF.APISERVER.multi_tenancy
self._max_retries = cfg.CONF.APISERVER.max_retries
self._retry_interval = cfg.CONF.APISERVER.retry_interval
#keystone
self._authn_token = None
if cfg.CONF.auth_strategy == 'keystone':
kcfg = cfg.CONF.keystone_authtoken
body = '{"auth":{"passwordCredentials":{'
body += ' "username": "%s",' % (kcfg.admin_user)
body += ' "password": "%s"},' % (kcfg.admin_password)
body += ' "tenantName":"%s"}}' % (kcfg.admin_tenant_name)
self._authn_body = body
self._authn_token = cfg.CONF.keystone_authtoken.admin_token
self._keystone_url = "%s://%s:%s%s" % (
cfg.CONF.keystone_authtoken.auth_protocol,
cfg.CONF.keystone_authtoken.auth_host,
cfg.CONF.keystone_authtoken.auth_port,
"/v2.0/tokens")
开发者ID:haripk,项目名称:contrail-neutron-plugin,代码行数:49,代码来源:contrail_plugin_core.py
示例14: _load_service_plugins
def _load_service_plugins(self):
"""Loads service plugins.
Starts from the core plugin and checks if it supports
advanced services then loads classes provided in configuration.
"""
# load services from the core plugin first
self._load_services_from_core_plugin()
plugin_providers = cfg.CONF.service_plugins
LOG.debug(_("Loading service plugins: %s"), plugin_providers)
for provider in plugin_providers:
if provider == '':
continue
try:
LOG.info(_("Loading Plugin: %s"), provider)
plugin_class = importutils.import_class(provider)
except ImportError:
LOG.exception(_("Error loading plugin"))
raise ImportError(_("Plugin not found."))
plugin_inst = plugin_class()
# only one implementation of svc_type allowed
# specifying more than one plugin
# for the same type is a fatal exception
if plugin_inst.get_plugin_type() in self.service_plugins:
raise ValueError(_("Multiple plugins for service "
"%s were configured"),
plugin_inst.get_plugin_type())
self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
# search for possible agent notifiers declared in service plugin
# (needed by agent management extension)
if (hasattr(self.plugin, 'agent_notifiers') and
hasattr(plugin_inst, 'agent_notifiers')):
self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
LOG.debug(_("Successfully loaded %(type)s plugin. "
"Description: %(desc)s"),
{"type": plugin_inst.get_plugin_type(),
"desc": plugin_inst.get_plugin_description()})
开发者ID:CampHarmony,项目名称:neutron,代码行数:42,代码来源:manager.py
示例15: load_driver
def load_driver(plugin, ofc_manager):
if PROVIDER_OPENFLOW in ROUTER_DRIVER_MAP and not ofc_manager.driver.router_supported:
LOG.warning(
_(
"OFC does not support router with provider=%(provider)s, "
"so removed it from supported provider "
"(new router driver map=%(driver_map)s)"
),
{"provider": PROVIDER_OPENFLOW, "driver_map": ROUTER_DRIVER_MAP},
)
del ROUTER_DRIVER_MAP[PROVIDER_OPENFLOW]
if config.PROVIDER.default_router_provider not in ROUTER_DRIVER_MAP:
LOG.error(
_("default_router_provider %(default)s is supported! " "Please specify one of %(supported)s"),
{"default": config.PROVIDER.default_router_provider, "supported": ROUTER_DRIVER_MAP.keys()},
)
raise SystemExit(1)
enabled_providers = set(config.PROVIDER.router_providers + [config.PROVIDER.default_router_provider]) & set(
ROUTER_DRIVER_MAP.keys()
)
for driver in enabled_providers:
driver_klass = importutils.import_class(ROUTER_DRIVER_MAP[driver])
ROUTER_DRIVERS[driver] = driver_klass(plugin, ofc_manager)
LOG.info(_("Enabled router drivers: %s"), ROUTER_DRIVERS.keys())
if not ROUTER_DRIVERS:
LOG.error(
_(
"No router provider is enabled. neutron-server terminated!"
" (supported=%(supported)s, configured=%(config)s)"
),
{"supported": ROUTER_DRIVER_MAP.keys(), "config": config.PROVIDER.router_providers},
)
raise SystemExit(1)
开发者ID:sukhdevkapur,项目名称:neutron,代码行数:39,代码来源:nec_router.py
示例16: __init__
def __init__(self, options=None, config_file=None):
# If no options have been provided, create an empty dict
if not options:
options = {}
msg = validate_pre_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)
# NOTE(jkoelker) Testing for the subclass with the __subclasshook__
# breaks tach monitoring. It has been removed
# intentianally to allow v2 plugins to be monitored
# for performance metrics.
plugin_provider = cfg.CONF.core_plugin
LOG.debug(_("Plugin location: %s"), plugin_provider)
# If the plugin can't be found let them know gracefully
try:
LOG.info(_("Loading Plugin: %s"), plugin_provider)
plugin_klass = importutils.import_class(plugin_provider)
except ImportError:
LOG.exception(_("Error loading plugin"))
raise Exception(_("Plugin not found. "))
legacy.modernize_quantum_config(cfg.CONF)
self.plugin = plugin_klass()
msg = validate_post_plugin_load()
if msg:
LOG.critical(msg)
raise Exception(msg)
# core plugin as a part of plugin collection simplifies
# checking extensions
# TODO(enikanorov): make core plugin the same as
# the rest of service plugins
self.service_plugins = {constants.CORE: self.plugin}
self._load_service_plugins()
开发者ID:CampHarmony,项目名称:neutron,代码行数:37,代码来源:manager.py
示例17: run_migrations_offline
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
neutron_config = config.neutron_config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
logging_config.fileConfig(config.config_file_name)
plugin_class_path = neutron_config.core_plugin
active_plugins = [plugin_class_path]
active_plugins += neutron_config.service_plugins
for class_path in active_plugins:
importutils.import_class(class_path)
# set the target for 'autogenerate' support
target_metadata = model_base.BASEV2.metadata
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with either a URL
or an Engine.
Calls to context.execute() here emit the given string to the
script output.
"""
开发者ID:KumarAcharya,项目名称:neutron,代码行数:30,代码来源:env.py
示例18: _validate_security_group_rule
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron import quota
from oslo.config import cfg
from quark.db import api as db_api
from quark import plugin_views as v
CONF = cfg.CONF
LOG = logging.getLogger("neutron.quark")
DEFAULT_SG_UUID = "00000000-0000-0000-0000-000000000000"
net_driver = (importutils.import_class(CONF.QUARK.net_driver))()
net_driver.load_config()
def _validate_security_group_rule(context, rule):
PROTOCOLS = {"icmp": 1, "tcp": 6, "udp": 17}
ALLOWED_WITH_RANGE = [6, 17]
if rule.get("remote_ip_prefix") and rule.get("remote_group_id"):
raise sg_ext.SecurityGroupRemoteGroupAndRemoteIpPrefix()
protocol = rule.pop('protocol')
port_range_min = rule['port_range_min']
port_range_max = rule['port_range_max']
if protocol:
开发者ID:kilogram,项目名称:quark,代码行数:31,代码来源:security_groups.py
示例19: _load_plugin
def _load_plugin(self, plugin_provider):
LOG.debug(_("Plugin location: %s"), plugin_provider)
plugin_klass = importutils.import_class(plugin_provider)
return plugin_klass()
开发者ID:chenglong7997,项目名称:neutron,代码行数:4,代码来源:meta_neutron_plugin.py
示例20: fileConfig
DATABASE_QUOTA_DRIVER = "neutron.extensions._quotav2_driver.DbQuotaDriver"
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
neutron_config = config.neutron_config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
plugin_class_path = neutron_config.core_plugin
plugin_klass = importutils.import_class(plugin_class_path)
# set the target for 'autogenerate' support
target_metadata = model_base.BASEV2.metadata
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
开发者ID:read1984,项目名称:neutron,代码行数:29,代码来源:env.py
注:本文中的neutron.openstack.common.importutils.import_class函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论