本文整理汇总了Python中neutron.openstack.common.importutils.import_object函数的典型用法代码示例。如果您正苦于以下问题:Python import_object函数的具体用法?Python import_object怎么用?Python import_object使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了import_object函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: __init__
def __init__(self):
    """Bring up the Linux Bridge plugin.

    Steps, in order: build the base port-binding dict, call
    ``db.initialize()``, parse and sync the network VLAN ranges, read the
    agent/VLAN settings from ``cfg.CONF``, validate
    ``tenant_network_type``, set up RPC, and instantiate the network and
    router schedulers from their configured driver paths.

    Calls ``sys.exit(1)`` when ``tenant_network_type`` is not one of
    ``TYPE_LOCAL``, ``TYPE_VLAN`` or ``TYPE_NONE``.
    """
    # Port filtering is advertised only when the security-group extension
    # is among the plugin's supported aliases.
    port_filter = 'security-group' in self.supported_extension_aliases
    self.base_binding_dict = {
        portbindings.VIF_TYPE: portbindings.VIF_TYPE_BRIDGE,
        portbindings.CAPABILITIES: {
            portbindings.CAP_PORT_FILTER: port_filter,
        },
    }

    db.initialize()
    self._parse_network_vlan_ranges()
    db.sync_network_states(self.network_vlan_ranges)

    self.mvrp = cfg.CONF.AGENT.mvrp
    self.tenant_network_type = cfg.CONF.VLANS.tenant_network_type
    accepted_types = (constants.TYPE_LOCAL,
                      constants.TYPE_VLAN,
                      constants.TYPE_NONE)
    if self.tenant_network_type not in accepted_types:
        LOG.error(_("Invalid tenant_network_type: %s. "
                    "Service terminated!"),
                  self.tenant_network_type)
        sys.exit(1)

    self._setup_rpc()

    scheduler_conf = cfg.CONF
    self.network_scheduler = importutils.import_object(
        scheduler_conf.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        scheduler_conf.router_scheduler_driver)
    LOG.debug(_("Linux Bridge Plugin initialization complete"))
开发者ID:noelbk,项目名称:neutron,代码行数:26,代码来源:lb_neutron_plugin.py
示例2: __init__
def __init__(self, configfile=None):
    """Bring up the Open vSwitch plugin.

    Builds the extra port-binding dict, initializes the OVS database
    layer, syncs VLAN allocations, validates ``tenant_network_type``,
    handles tunnel-id ranges when tunneling is enabled, sets up RPC and
    instantiates the network and router schedulers.

    :param configfile: accepted but not read anywhere in this method.

    Calls ``sys.exit(1)`` when ``tenant_network_type`` is not a known
    type, or when it is a tunnel type while tunneling is disabled.
    """
    port_filter = 'security-group' in self.supported_extension_aliases
    self.extra_binding_dict = {
        portbindings.VIF_TYPE: portbindings.VIF_TYPE_OVS,
        portbindings.CAPABILITIES: {
            portbindings.CAP_PORT_FILTER: port_filter,
        },
    }

    ovs_db_v2.initialize()
    self._parse_network_vlan_ranges()
    ovs_db_v2.sync_vlan_allocations(self.network_vlan_ranges)

    self.tenant_network_type = cfg.CONF.OVS.tenant_network_type
    accepted_types = (constants.TYPE_LOCAL,
                      constants.TYPE_VLAN,
                      constants.TYPE_GRE,
                      constants.TYPE_VXLAN,
                      constants.TYPE_NONE)
    if self.tenant_network_type not in accepted_types:
        LOG.error(_("Invalid tenant_network_type: %s. "
                    "Server terminated!"),
                  self.tenant_network_type)
        sys.exit(1)

    self.enable_tunneling = cfg.CONF.OVS.enable_tunneling
    self.tunnel_id_ranges = []
    if not self.enable_tunneling:
        # A tunnel-based tenant network type cannot work without tunneling.
        if self.tenant_network_type in constants.TUNNEL_NETWORK_TYPES:
            LOG.error(_("Tunneling disabled but tenant_network_type is '%s'. "
                        "Server terminated!"), self.tenant_network_type)
            sys.exit(1)
    else:
        self._parse_tunnel_id_ranges()
        ovs_db_v2.sync_tunnel_allocations(self.tunnel_id_ranges)

    self.setup_rpc()
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        cfg.CONF.router_scheduler_driver)
开发者ID:brandon-adams,项目名称:neutron,代码行数:35,代码来源:ovs_neutron_plugin.py
示例3: __init__
def __init__(self):
    """Bring up the NEC plugin.

    Initializes the plugin DB layer and the OFC manager, prepares the
    port-binding dict, applies a default ``api_extensions_path`` when
    none is configured, sets up RPC and the L3 agent notifier,
    instantiates the schedulers, loads the router driver and builds the
    per-device-owner port handler table.
    """
    ndb.initialize()
    self.ofc = ofc_manager.OFCManager()
    self.base_binding_dict = self._get_base_binding_dict()
    portbindings_base.register_port_dict_function()

    # Fall back to the plugin's own extension directory when the
    # configuration does not specify api_extensions_path.
    if not config.CONF.api_extensions_path:
        config.CONF.set_override('api_extensions_path',
                                 'neutron/plugins/nec/extensions')

    self.setup_rpc()
    self.l3_rpc_notifier = nec_router.L3AgentNotifyAPI()

    self.network_scheduler = importutils.import_object(
        config.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        config.CONF.router_scheduler_driver)

    nec_router.load_driver(self, self.ofc)

    # Handlers are looked up by operation, then by device owner, with a
    # 'default' entry for every other owner.
    on_create = {
        const.DEVICE_OWNER_ROUTER_GW: self.create_router_port,
        const.DEVICE_OWNER_ROUTER_INTF: self.create_router_port,
        'default': self.activate_port_if_ready,
    }
    on_delete = {
        const.DEVICE_OWNER_ROUTER_GW: self.delete_router_port,
        const.DEVICE_OWNER_ROUTER_INTF: self.delete_router_port,
        'default': self.deactivate_port,
    }
    self.port_handlers = {'create': on_create, 'delete': on_delete}
开发者ID:qingw,项目名称:neutron,代码行数:35,代码来源:nec_plugin.py
示例4: __init__
def __init__(self):
    """Bring up ``LinuxBridgePluginV2``.

    Runs the parent initializer, builds the base port-binding dict,
    parses and syncs the network VLAN ranges, validates
    ``tenant_network_type``, sets up RPC and instantiates the network
    and router schedulers from their configured driver paths.

    Calls ``sys.exit(1)`` when ``tenant_network_type`` is not one of
    ``TYPE_LOCAL``, ``TYPE_VLAN`` or ``TYPE_NONE``.
    """
    super(LinuxBridgePluginV2, self).__init__()

    port_filter = 'security-group' in self.supported_extension_aliases
    self.base_binding_dict = {
        portbindings.VIF_TYPE: portbindings.VIF_TYPE_BRIDGE,
        portbindings.VIF_DETAILS: {
            # TODO(rkukura): Replace with new VIF security details
            portbindings.CAP_PORT_FILTER: port_filter,
        },
    }

    self._parse_network_vlan_ranges()
    db.sync_network_states(self.network_vlan_ranges)

    self.tenant_network_type = cfg.CONF.VLANS.tenant_network_type
    accepted_types = (svc_constants.TYPE_LOCAL,
                      svc_constants.TYPE_VLAN,
                      svc_constants.TYPE_NONE)
    if self.tenant_network_type not in accepted_types:
        LOG.error(_("Invalid tenant_network_type: %s. "
                    "Service terminated!"),
                  self.tenant_network_type)
        sys.exit(1)

    self._setup_rpc()
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        cfg.CONF.router_scheduler_driver)
    LOG.debug(_("Linux Bridge Plugin initialization complete"))
开发者ID:kavonm,项目名称:neutron,代码行数:26,代码来源:lb_neutron_plugin.py
示例5: __init__
def __init__(self):
    """Bring up the Brocade plugin.

    Declares the supported extension aliases, reads the physical
    interface from configuration, prepares port bindings, configures the
    DB and an admin context with a DB session, creates the VLAN bitmap,
    sets up RPC, instantiates the schedulers and finally calls
    ``brocade_init()``.
    """
    self.supported_extension_aliases = [
        "binding",
        "security-group",
        "router",
        "extraroute",
        "agent",
        "l3_agent_scheduler",
        "dhcp_agent_scheduler",
    ]

    interface_conf = cfg.CONF.PHYSICAL_INTERFACE
    self.physical_interface = interface_conf.physical_interface

    self.base_binding_dict = self._get_base_binding_dict()
    portbindings_base.register_port_dict_function()

    db.configure_db()
    # The VLAN bitmap is built from an admin context carrying a DB session.
    self.ctxt = context.get_admin_context()
    self.ctxt.session = db.get_session()
    self._vlan_bitmap = vbm.VlanBitmap(self.ctxt)

    self._setup_rpc()
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        cfg.CONF.router_scheduler_driver)
    self.brocade_init()
开发者ID:noxhana,项目名称:neutron,代码行数:27,代码来源:NeutronPlugin.py
示例6: _load_device_plugins
def _load_device_plugins(self):
    """Load the optional OVS plug-in, the Omni DB object and Omni plug-in.

    When ``self.ovs_plugin`` is set, the OVS plug-in is instantiated and
    the Omni DB object is initialized with ``omni_const.OVS_TABLES``;
    otherwise it is initialized with ``omni_const.OMNI_TABLES``. VLAN
    ranges are then parsed and synced, agent-state and root-helper
    options are registered, and the Omni device plug-in is instantiated
    and initialized with the DB object.

    Returns early (after logging) when ``self.omni_plugin`` is empty.
    """
    # The same DB class is used either way; only the table set depends on
    # whether an OVS plug-in is configured.
    if self.ovs_plugin:
        self.ovs_plugin_obj = importutils.import_object(self.ovs_plugin)
        db_tables = omni_const.OVS_TABLES
    else:
        db_tables = omni_const.OMNI_TABLES
    self.omni_db_obj = importutils.import_object(omni_const.OMNI_DB_CLASS)
    self.omni_db_obj.initialize(None, None, None, db_tables)

    self._parse_network_vlan_ranges()
    self.omni_db_obj.sync_vlan_allocations(self.network_vlan_ranges)

    config.register_agent_state_opts_helper(cfg.CONF)
    config.register_root_helper(cfg.CONF)
    # NOTE: the original carried a disabled
    # cfg.CONF.register_opts(scheduler.AGENTS_SCHEDULER_OPTS) call marked
    # "for havana"; it is intentionally left disabled here.

    # Load Omni device plug-in
    if not self.omni_plugin:
        LOG.info("Omni Device plug-in is not specified in the config!!!")
        return
    self.omni_plugin_obj = importutils.import_object(self.omni_plugin)
    self.omni_plugin_obj.initialize(self.omni_db_obj)
    LOG.info("Device plug-ins loaded!")
开发者ID:Alcatel-LucentEnterpriseData,项目名称:ALUe-OONP_H_R01,代码行数:28,代码来源:omniswitch_network_plugin.py
示例7: __init__
def __init__(self):
    """Bring up the OmniSwitch network plugin.

    Loads configuration and device plug-ins, sets up RPC, then
    instantiates the network and router schedulers from the driver paths
    in ``cfg.CONF``.
    """
    self._load_config()
    self._load_device_plugins()
    self.setup_rpc()

    conf = cfg.CONF
    self.network_scheduler = importutils.import_object(
        conf.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        conf.router_scheduler_driver)
开发者ID:Alcatel-LucentEnterpriseData,项目名称:ALUe-OONP_H_R01,代码行数:7,代码来源:omniswitch_network_plugin.py
示例8: __init__
def __init__(self, conf):
    """Set up the load balancer agent manager.

    Instantiates the interface (VIF) driver and the load balancer device
    driver named in ``conf``, creates the plugin RPC API bound to an
    admin context without a DB session, and initializes the resync flag
    and the logical device cache.

    :param conf: configuration object; ``interface_driver``,
        ``device_driver``, ``loadbalancer_state_path`` and ``host`` are
        read from it.
    :raises SystemExit: when either driver cannot be imported.
    """
    self.conf = conf
    try:
        vif_driver = importutils.import_object(conf.interface_driver, conf)
    except ImportError:
        # The interface driver is required: a failed import aborts
        # start-up rather than falling back to "no driver".
        msg = _('Error importing interface driver: %s')
        raise SystemExit(msg % conf.interface_driver)

    try:
        self.driver = importutils.import_object(
            conf.device_driver,
            config.get_root_helper(self.conf),
            conf.loadbalancer_state_path,
            vif_driver,
            self._vip_plug_callback
        )
    except ImportError:
        msg = _('Error importing loadbalancer device driver: %s')
        raise SystemExit(msg % conf.device_driver)

    ctx = context.get_admin_context_without_session()
    self.plugin_rpc = agent_api.LbaasAgentApi(
        plugin_driver.TOPIC_PROCESS_ON_HOST,
        ctx,
        conf.host
    )
    self.needs_resync = False
    self.cache = LogicalDeviceCache()
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:29,代码来源:agent_manager.py
示例9: __init__
def __init__(self, conf):
    """Set up the load balancer agent manager.

    Instantiates the interface driver and the load balancer device
    driver named in ``conf``, builds the agent state report dict,
    creates an admin context without a DB session, sets up RPC and
    initializes the resync flag and the logical device cache.

    :param conf: configuration object; ``interface_driver``,
        ``device_driver``, ``loadbalancer_state_path`` and ``host`` are
        read from it.
    :raises SystemExit: when either driver cannot be imported.
    """
    self.conf = conf

    try:
        vif_driver = importutils.import_object(conf.interface_driver, conf)
    except ImportError:
        error_template = _('Error importing interface driver: %s')
        raise SystemExit(error_template % conf.interface_driver)

    try:
        self.driver = importutils.import_object(
            conf.device_driver,
            config.get_root_helper(self.conf),
            conf.loadbalancer_state_path,
            vif_driver,
            self._vip_plug_callback)
    except ImportError:
        error_template = _('Error importing loadbalancer device driver: %s')
        raise SystemExit(error_template % conf.device_driver)

    driver_configuration = {
        'device_driver': conf.device_driver,
        'interface_driver': conf.interface_driver,
    }
    self.agent_state = {
        'binary': 'neutron-loadbalancer-agent',
        'host': conf.host,
        'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
        'configurations': driver_configuration,
        'agent_type': constants.AGENT_TYPE_LOADBALANCER,
        'start_flag': True,
    }
    self.admin_state_up = True

    self.context = context.get_admin_context_without_session()
    self._setup_rpc()
    self.needs_resync = False
    self.cache = LogicalDeviceCache()
开发者ID:davecahill,项目名称:neutron,代码行数:34,代码来源:agent_manager.py
示例10: __init__
def __init__(self):
    """Bring up the NEC plugin.

    Initializes the plugin DB layer and the OFC manager, prepares the
    port-binding dict, appends this plugin's extension path to the API
    extension search path, sets up RPC and the L3 agent notifier,
    instantiates the schedulers, loads the router driver and builds the
    per-device-owner port handler table.
    """
    ndb.initialize()
    self.ofc = ofc_manager.OFCManager()
    self.base_binding_dict = self._get_base_binding_dict()
    portbindings_base.register_port_dict_function()
    neutron_extensions.append_api_extensions_path(extensions.__path__)

    self.setup_rpc()
    self.l3_rpc_notifier = nec_router.L3AgentNotifyAPI()

    self.network_scheduler = importutils.import_object(
        config.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        config.CONF.router_scheduler_driver)

    nec_router.load_driver(self, self.ofc)

    # Handlers are looked up by operation, then by device owner, with a
    # 'default' entry for every other owner.
    on_create = {
        const.DEVICE_OWNER_ROUTER_GW: self.create_router_port,
        const.DEVICE_OWNER_ROUTER_INTF: self.create_router_port,
        'default': self.activate_port_if_ready,
    }
    on_delete = {
        const.DEVICE_OWNER_ROUTER_GW: self.delete_router_port,
        const.DEVICE_OWNER_ROUTER_INTF: self.delete_router_port,
        'default': self.deactivate_port,
    }
    self.port_handlers = {'create': on_create, 'delete': on_delete}
开发者ID:ChengZuo,项目名称:neutron,代码行数:32,代码来源:nec_plugin.py
示例11: __init__
def __init__(self):
    """Bring up the NEC plugin.

    Initializes the plugin DB layer and the OFC manager, applies a
    default ``api_extensions_path`` when none is configured, sets up RPC
    and instantiates the network and router schedulers.
    """
    ndb.initialize()
    self.ofc = ofc_manager.OFCManager()

    # Fall back to the plugin's own extension directory when the
    # configuration does not specify api_extensions_path.
    extensions_path_configured = bool(config.CONF.api_extensions_path)
    if not extensions_path_configured:
        config.CONF.set_override(
            "api_extensions_path", "neutron/plugins/nec/extensions")

    self.setup_rpc()
    self.network_scheduler = importutils.import_object(
        config.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        config.CONF.router_scheduler_driver)
开发者ID:kevinbenton,项目名称:neutron,代码行数:13,代码来源:nec_plugin.py
示例12: __init__
def __init__(self):
    """Bring up the Modular L2 plugin.

    Creates the type and mechanism managers, initializes the DB, then
    initializes both managers, sets up RPC and instantiates the network
    and router schedulers.
    """
    # Ordering matters here: drivers are loaded first, the DB is
    # initialized next, and only then are the drivers initialized.
    type_manager = managers.TypeManager()
    self.type_manager = type_manager
    mechanism_manager = managers.MechanismManager()
    self.mechanism_manager = mechanism_manager
    db.initialize()
    type_manager.initialize()
    mechanism_manager.initialize()

    self._setup_rpc()

    # REVISIT(rkukura): Use stevedore for these?
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver)
    self.router_scheduler = importutils.import_object(
        cfg.CONF.router_scheduler_driver)
    LOG.info(_("Modular L2 Plugin initialization complete"))
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:15,代码来源:plugin.py
示例13: __init__
def __init__(self, host, conf=None):
    """Set up the L3 NAT agent.

    :param host: host name passed to the L3 plugin RPC API.
    :param conf: configuration object; ``cfg.CONF`` is used when this is
        falsy.
    :raises SystemExit: when no interface driver is configured or the
        configured one cannot be instantiated.
    """
    self.conf = conf or cfg.CONF
    self.root_helper = config.get_root_helper(self.conf)
    self.router_info = {}

    driver_path = self.conf.interface_driver
    if not driver_path:
        raise SystemExit(_('An interface driver must be specified'))
    try:
        self.driver = importutils.import_object(driver_path, self.conf)
    except Exception:
        # Any failure while loading the driver aborts agent start-up.
        msg = _("Error importing interface driver "
                "'%s'") % self.conf.interface_driver
        raise SystemExit(msg)

    self.context = context.get_admin_context_without_session()
    self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
    self.fullsync = True
    self.sync_sem = semaphore.Semaphore(1)

    if self.conf.use_namespaces:
        self._destroy_router_namespaces(self.conf.router_id)
    super(L3NATAgent, self).__init__(host=self.conf.host)
开发者ID:CiscoSystems,项目名称:quantum,代码行数:27,代码来源:l3_agent.py
示例14: new_nexus_init
def new_nexus_init(self):
    """Initialize the object with a Nexus client and fixed switch data.

    Instantiates ``NEXUS_DRIVER`` as ``self._client``, fills in the
    client's switch/port map and credentials and this object's own
    ``_nexus_switches`` map from module-level constants, then calls
    ``db.configure_db()``.

    NOTE(review): appears to be a replacement initializer used by tests
    (the credentials are literal placeholder values) -- confirm against
    the caller.
    """
    self._client = importutils.import_object(NEXUS_DRIVER)
    # The (NEXUS_IP_ADDRESS, 'ssh_port') key used to be listed twice with
    # the same value; a dict keeps only one entry per key, so it is
    # listed once here.
    self._client.nexus_switches = {
        (NEXUS_IP_ADDRESS, HOSTNAME1): NEXUS_PORT1,
        (NEXUS_IP_ADDRESS, HOSTNAME2): NEXUS_PORT2,
        (NEXUS_IP_ADDRESS, 'ssh_port'): NEXUS_SSH_PORT,
        (NEXUS_PC_IP_ADDRESS, 'ssh_port'): NEXUS_SSH_PORT,
    }
    self._nexus_switches = {
        ('NEXUS_SWITCH', NEXUS_IP_ADDRESS, HOSTNAME1): NEXUS_PORT1,
        ('NEXUS_SWITCH', NEXUS_IP_ADDRESS, HOSTNAME2): NEXUS_PORT2,
        ('NEXUS_SWITCH', NEXUS_PC_IP_ADDRESS, HOSTNAME3):
        NEXUS_PORTCHANNELS,
        ('NEXUS_SWITCH', NEXUS_PC_IP_ADDRESS, 'ssh_port'):
        NEXUS_SSH_PORT,
        ('NEXUS_SWITCH', NEXUS_IP_ADDRESS, HOSTNAME3):
        NEXUS_PORTCHANNELS,
        ('NEXUS_SWITCH', NEXUS_IP_ADDRESS, 'ssh_port'): NEXUS_SSH_PORT,
    }
    self._client.credentials = {
        NEXUS_IP_ADDRESS: {
            'username': 'admin',
            'password': 'pass1234'
        },
        NEXUS_PC_IP_ADDRESS: {
            'username': 'admin',
            'password': 'password'
        },
    }
    db.configure_db()
开发者ID:50infivedays,项目名称:neutron,代码行数:31,代码来源:test_nexus_plugin.py
示例15: __init__
def __init__(self):
    """Set up the device plugins listed in the Cisco configuration.

    Every non-``None`` entry of ``conf.CISCO_PLUGINS`` is instantiated
    and stored in ``self._plugins``. If the vswitch plugin was loaded
    and exposes ``supported_extension_aliases``, those aliases are added
    to this object's own list. The credential store is then initialized
    and ``self.is_nexus_plugin`` records whether the configured Nexus
    driver path ends with ``CiscoNEXUSDriver``.
    """
    conf.CiscoConfigOptions()

    self._plugins = {}
    for plugin_key, plugin_path in conf.CISCO_PLUGINS.items():
        if plugin_path is None:
            continue
        self._plugins[plugin_key] = importutils.import_object(plugin_path)
        LOG.debug(_("Loaded device plugin %s"), plugin_path)

    if const.VSWITCH_PLUGIN in self._plugins:
        vswitch_plugin = self._plugins[const.VSWITCH_PLUGIN]
        if hasattr(vswitch_plugin, "supported_extension_aliases"):
            self.supported_extension_aliases.extend(
                vswitch_plugin.supported_extension_aliases)

    # Initialize credential store after database initialization
    cred.Store.initialize()
    LOG.debug(_("%(module)s.%(name)s init done"),
              {'module': __name__,
               'name': self.__class__.__name__})

    # Check whether we have a valid Nexus driver loaded
    self.is_nexus_plugin = False
    if conf.CISCO.nexus_driver.endswith('CiscoNEXUSDriver'):
        self.is_nexus_plugin = True
开发者ID:AsherBond,项目名称:quantum,代码行数:34,代码来源:virt_phy_sw_v2.py
示例16: __init__
def __init__(self, conf):
    """Set up the firewall (FWaaS) part of the L3 agent.

    Reads the FWaaS driver path and enabled flag from ``cfg.CONF.fwaas``,
    reconciles the flag with the server-side service plugin list when
    that list is known, instantiates the driver if FWaaS ends up
    enabled, and creates the RPC API used to talk to the firewall
    plugin.

    :param conf: agent configuration; ``conf.host`` is used for RPC.
    :raises SystemExit: when the server has the FWaaS plugin configured
        but FWaaS is disabled in the agent.
    :raises ImportError: when the FWaaS driver cannot be imported.
    """
    LOG.debug(_("Initializing firewall agent"))
    self.conf = conf
    driver_path = cfg.CONF.fwaas.driver
    self.fwaas_enabled = cfg.CONF.fwaas.enabled

    # None means l3-agent has no information on the server
    # configuration due to the lack of RPC support.
    if self.neutron_service_plugins is not None:
        plugin_on_server = (
            constants.FIREWALL in self.neutron_service_plugins)
        if plugin_on_server and not self.fwaas_enabled:
            msg = _("FWaaS plugin is configured in the server side, but "
                    "FWaaS is disabled in L3-agent.")
            LOG.error(msg)
            raise SystemExit(1)
        self.fwaas_enabled = self.fwaas_enabled and plugin_on_server

    if self.fwaas_enabled:
        try:
            self.fwaas_driver = importutils.import_object(driver_path)
            LOG.debug(_("FWaaS Driver Loaded: '%s'"), driver_path)
        except ImportError:
            msg = _('Error importing FWaaS device driver: %s')
            raise ImportError(msg % driver_path)

    self.services_sync = False
    self.root_helper = config.get_root_helper(conf)
    # setup RPC to msg fwaas plugin
    self.fwplugin_rpc = FWaaSL3PluginApi(topics.FIREWALL_PLUGIN,
                                         conf.host)
    super(FWaaSL3AgentRpcCallback, self).__init__(host=conf.host)
开发者ID:HybridCloud-dew,项目名称:hws,代码行数:33,代码来源:firewall_l3_agent.py
示例17: __init__
def __init__(self, server_timeout=None):
    """Bring up ``NeutronRestProxyV2``.

    Registers plugin configuration, creates the green thread pool,
    appends the plugin's extension path, creates the controller server
    pool and wires its topology callback, instantiates the network
    scheduler, sets up RPC and, when ``sync_data`` is set, sends all
    data.

    :param server_timeout: passed through to ``ServerPool``.
    """
    super(NeutronRestProxyV2, self).__init__()
    LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
             version_string_with_vcs())
    pl_config.register_config()
    self.evpool = eventlet.GreenPool(cfg.CONF.RESTPROXY.thread_pool_size)

    # Include the BigSwitch Extensions path in the api_extensions
    neutron_extensions.append_api_extensions_path(extensions.__path__)

    self.add_meta_server_route = cfg.CONF.RESTPROXY.add_meta_server_route

    # init network ctrl connections
    self.servers = servermanager.ServerPool(server_timeout)
    self.servers.get_topo_function = self._get_all_data
    topo_kwargs = {
        'get_ports': True,
        'get_floating_ips': True,
        'get_routers': True,
    }
    self.servers.get_topo_function_args = topo_kwargs

    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver)

    # setup rpc for security and DHCP agents
    self._setup_rpc()

    if cfg.CONF.RESTPROXY.sync_data:
        self._send_all_data()
    LOG.debug(_("NeutronRestProxyV2: initialization done"))
开发者ID:dobriak,项目名称:neutron,代码行数:30,代码来源:plugin.py
示例18: __init__
def __init__(self, server_timeout=None):
    """Bring up ``NeutronRestProxyV2``.

    Registers plugin configuration, appends the plugin's extension path,
    creates the controller server pool, instantiates the network
    scheduler, registers the DHCP agent notifier, creates the RPC
    connection and consumer, and, when ``sync_data`` is set, sends all
    data.

    :param server_timeout: passed through to ``ServerPool``.
    """
    super(NeutronRestProxyV2, self).__init__()
    LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
             version_string_with_vcs())
    pl_config.register_config()

    # Include the BigSwitch Extensions path in the api_extensions
    neutron_extensions.append_api_extensions_path(extensions.__path__)

    self.add_meta_server_route = cfg.CONF.RESTPROXY.add_meta_server_route

    # init network ctrl connections
    self.servers = servermanager.ServerPool(server_timeout)

    # init dhcp support
    self.topic = topics.PLUGIN
    self.network_scheduler = importutils.import_object(
        cfg.CONF.network_scheduler_driver)
    self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
    dhcp_notifier = self._dhcp_agent_notifier
    self.agent_notifiers[const.AGENT_TYPE_DHCP] = dhcp_notifier

    self.conn = rpc.create_connection(new=True)
    self.callbacks = RpcProxy()
    self.dispatcher = self.callbacks.create_rpc_dispatcher()
    self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
    # Consume from all consumers in a thread
    self.conn.consume_in_thread()

    if cfg.CONF.RESTPROXY.sync_data:
        self._send_all_data()
    LOG.debug(_("NeutronRestProxyV2: initialization done"))
开发者ID:yuhui7red,项目名称:neutron,代码行数:34,代码来源:plugin.py
示例19: __init__
def __init__(self, host, conf=None):
    """Set up the L3 NAT agent.

    :param host: host name passed to the L3 plugin RPC API.
    :param conf: configuration object; ``cfg.CONF`` is used when this is
        falsy.
    :raises SystemExit: with status 1 (after logging the error) when the
        interface driver cannot be instantiated.
    """
    self.conf = conf or cfg.CONF
    self.root_helper = config.get_root_helper(self.conf)
    self.router_info = {}

    self._check_config_params()

    try:
        self.driver = importutils.import_object(
            self.conf.interface_driver, self.conf)
    except Exception:
        # Any failure while loading the driver aborts agent start-up.
        msg = _("Error importing interface driver "
                "'%s'") % self.conf.interface_driver
        LOG.error(msg)
        raise SystemExit(1)

    self.context = context.get_admin_context_without_session()
    self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)

    # Sync bookkeeping.
    self.fullsync = True
    self.updated_routers = set()
    self.removed_routers = set()
    self.sync_progress = False

    self._clean_stale_namespaces = self.conf.use_namespaces
    self._queue = RouterProcessingQueue()

    super(L3NATAgent, self).__init__(conf=self.conf)

    self.target_ex_net_id = None
开发者ID:aignatov,项目名称:neutron,代码行数:34,代码来源:l3_agent.py
示例20: _load_drivers
def _load_drivers(self):
    """Load the metering driver named in the configuration.

    The driver path is read from ``self.conf.driver`` and instantiated
    with this object and ``self.conf`` as constructor arguments; the
    result is stored in ``self.metering_driver``.

    :raises SystemExit: when no driver is configured.
    """
    # Validate before logging so an unset driver is not reported as
    # being loaded.
    if not self.conf.driver:
        raise SystemExit(_('A metering driver must be specified'))
    # Pass the value as a logging argument so the message is only
    # interpolated when the record is actually emitted.
    LOG.info(_("Loading Metering driver %s"), self.conf.driver)
    self.metering_driver = importutils.import_object(
        self.conf.driver, self, self.conf)
开发者ID:uramanan,项目名称:neutron,代码行数:7,代码来源:metering_agent.py
注:本文中的neutron.openstack.common.importutils.import_object函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论