本文整理汇总了Python中neutron.db.api.get_engine函数的典型用法代码示例。如果您正苦于以下问题:Python get_engine函数的具体用法?Python get_engine怎么用?Python get_engine使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_engine函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: start
def start(self):
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing 500 errors later when they
# are discovered to be broken.
if CONF.database.connection:
api.get_engine().pool.dispose()
self._server = self._service.pool.spawn(self._service._run, self._application, self._service._socket)
开发者ID:balagopalraj,项目名称:clearlinux,代码行数:7,代码来源:wsgi.py
示例2: serve_rpc
def serve_rpc():
plugin = manager.NeutronManager.get_plugin()
# If 0 < rpc_workers then start_rpc_listeners would be called in a
# subprocess and we cannot simply catch the NotImplementedError. It is
# simpler to check this up front by testing whether the plugin supports
# multiple RPC workers.
if not plugin.rpc_workers_supported():
LOG.debug("Active plugin doesn't implement start_rpc_listeners")
if 0 < cfg.CONF.rpc_workers:
LOG.error(
_LE("'rpc_workers = %d' ignored because " "start_rpc_listeners is not implemented."),
cfg.CONF.rpc_workers,
)
raise NotImplementedError()
try:
rpc = RpcWorker(plugin)
if cfg.CONF.rpc_workers < 1:
rpc.start()
return rpc
else:
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
session.get_engine().pool.dispose()
launcher = common_service.ProcessLauncher(wait_interval=1.0)
launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
return launcher
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Unrecoverable error: please check log for " "details."))
开发者ID:kongseokhwan,项目名称:kulcloud-iitp-neutron,代码行数:33,代码来源:service.py
示例3: setUp
def setUp(self):
super(FlavorPluginTestCase, self).setUp()
self.config_parse()
cfg.CONF.set_override(
'core_plugin',
'neutron.tests.unit.extensions.test_flavors.DummyCorePlugin')
cfg.CONF.set_override(
'service_plugins',
['neutron.tests.unit.extensions.test_flavors.DummyServicePlugin'])
self.useFixture(
fixtures.MonkeyPatch('neutron.manager.NeutronManager._instance'))
self.plugin = flavors_plugin.FlavorsPlugin()
self.ctx = context.get_admin_context()
providers = [DummyServiceDriver.get_service_type() +
":" + _provider + ":" + _driver]
self.service_manager = servicetype_db.ServiceTypeManager.get_instance()
self.service_providers = mock.patch.object(
provconf.NeutronModule, 'service_providers').start()
self.service_providers.return_value = providers
for provider in providers:
self.service_manager.add_provider_configuration(
provider.split(':')[0], provconf.ProviderConfiguration())
dbapi.get_engine()
开发者ID:21atlas,项目名称:neutron,代码行数:28,代码来源:test_flavors.py
示例4: setUp
def setUp(self):
super(MySqlBaseFunctionalTest, self).setUp()
self.context = context.Context('fake', 'fake', is_admin=False)
configure_mappers()
engine = neutron_db_api.get_engine()
models.BASEV2.metadata.create_all(engine)
quota_driver.Quota.metadata.create_all(engine)
开发者ID:Anonymike,项目名称:quark,代码行数:7,代码来源:base.py
示例5: register_models
def register_models(self):
"""Register Models and create properties."""
try:
engine = db_api.get_engine()
model.PowerVCMapping.metadata.create_all(engine)
except sql.exc.OperationalError as e:
LOG.info(_("Database registration exception: %s"), e)
开发者ID:openstack,项目名称:powervc-driver,代码行数:7,代码来源:powervc_db_v2.py
示例6: setUp
def setUp(self, policy_drivers=None,
core_plugin=n_test_plugin.PLUGIN_NAME, ml2_options=None,
sc_plugin=None):
policy_drivers = policy_drivers or ['neutron_resources']
config.cfg.CONF.set_override('policy_drivers',
policy_drivers,
group='group_policy')
sc_cfg.cfg.CONF.set_override('servicechain_drivers',
['dummy'], group='servicechain')
config.cfg.CONF.set_override('allow_overlapping_ips', True)
super(CommonNeutronBaseTestCase, self).setUp(core_plugin=core_plugin,
ml2_options=ml2_options,
sc_plugin=sc_plugin)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
res = mock.patch('neutron.db.l3_db.L3_NAT_dbonly_mixin.'
'_check_router_needs_rescheduling').start()
res.return_value = None
self._plugin = manager.NeutronManager.get_plugin()
self._plugin.remove_networks_from_down_agents = mock.Mock()
self._plugin.is_agent_down = mock.Mock(return_value=False)
self._context = nctx.get_admin_context()
plugins = manager.NeutronManager.get_service_plugins()
self._gbp_plugin = plugins.get(pconst.GROUP_POLICY)
self._l3_plugin = plugins.get(pconst.L3_ROUTER_NAT)
config.cfg.CONF.set_override('debug', True)
config.cfg.CONF.set_override('verbose', True)
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:27,代码来源:test_neutron_resources_driver.py
示例7: setUp
def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None,
node_plumber=None):
if node_drivers:
cfg.CONF.set_override('node_drivers', node_drivers,
group='node_composition_plugin')
cfg.CONF.set_override('node_plumber', node_plumber or 'dummy_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping',
'chain_mapping'],
group='group_policy')
super(TestQuotasForServiceChain, self).setUp(
core_plugin=core_plugin or CORE_PLUGIN,
gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
self.driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
cfg.CONF.set_override('quota_servicechain_node', 1,
group='QUOTAS')
cfg.CONF.set_override('quota_servicechain_spec', 1,
group='QUOTAS')
cfg.CONF.set_override('quota_servicechain_instance', 1,
group='QUOTAS')
cfg.CONF.set_override('quota_service_profile', 1,
group='QUOTAS')
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:26,代码来源:test_ncp_plugin.py
示例8: setUp
def setUp(self, policy_drivers=None, core_plugin=None, ml2_options=None,
sc_plugin=None, **kwargs):
core_plugin = core_plugin or ML2PLUS_PLUGIN
policy_drivers = policy_drivers or ['aim_mapping']
ml2_opts = ml2_options or {'mechanism_drivers': ['logger', 'apic_aim'],
'extension_drivers': ['apic_aim'],
'type_drivers': ['opflex', 'local', 'vlan'],
'tenant_network_types': ['opflex']}
super(AIMBaseTestCase, self).setUp(
policy_drivers=policy_drivers, core_plugin=core_plugin,
ml2_options=ml2_opts, sc_plugin=sc_plugin)
config.cfg.CONF.set_override('network_vlan_ranges',
['physnet1:1000:1099'],
group='ml2_type_vlan')
self.saved_keystone_client = ksc_client.Client
ksc_client.Client = test_aim_md.FakeKeystoneClient
self._tenant_id = 'test-tenant'
self._neutron_context = nctx.Context(
'', kwargs.get('tenant_id', self._tenant_id),
is_admin_context=False)
self._neutron_admin_context = nctx.get_admin_context()
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self._aim_mgr = None
self._aim_context = aim_context.AimContext(
self._neutron_context.session)
self._db = model.DbModel()
self._name_mapper = None
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:31,代码来源:test_aim_mapping_driver.py
示例9: setUp
def setUp(self):
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
# driver apis.
config.cfg.CONF.set_override('mechanism_drivers',
['logger', 'apic_aim'],
'ml2')
config.cfg.CONF.set_override('extension_drivers',
['apic_aim'],
'ml2')
config.cfg.CONF.set_override('type_drivers',
['opflex', 'local', 'vlan'],
'ml2')
config.cfg.CONF.set_override('tenant_network_types',
['opflex'],
'ml2')
config.cfg.CONF.set_override('network_vlan_ranges',
['physnet1:1000:1099'],
group='ml2_type_vlan')
super(ApicAimTestCase, self).setUp(PLUGIN_NAME)
self.port_create_status = 'DOWN'
self.saved_keystone_client = ksc_client.Client
ksc_client.Client = FakeKeystoneClient
engine = db_api.get_engine()
aim_model_base.Base.metadata.create_all(engine)
self.plugin = manager.NeutronManager.get_plugin()
self.plugin.start_rpc_listeners()
self.driver = self.plugin.mechanism_manager.mech_drivers[
'apic_aim'].obj
开发者ID:ashutosh-mishra,项目名称:my-gbp,代码行数:33,代码来源:test_apic_aim.py
示例10: setUp
def setUp(self):
super(BaseFunctionalTest, self).setUp()
self.context = context.Context('fake', 'fake', is_admin=False)
cfg.CONF.set_override('connection', 'sqlite://', 'database')
configure_mappers()
self.engine = neutron_db_api.get_engine()
models.BASEV2.metadata.create_all(self.engine)
quota_driver.Quota.metadata.create_all(self.engine)
开发者ID:Anonymike,项目名称:quark,代码行数:8,代码来源:base.py
示例11: _launch
def _launch(self, application, workers=0):
service = WorkerService(self, application)
if workers < 1:
# The API service should run in the current process.
self._server = service
service.start()
systemd.notify_once()
else:
# dispose the whole pool before os.fork, otherwise there will
# be shared DB connections in child processes which may cause
# DB errors.
if CONF.database.connection:
api.get_engine().pool.dispose()
# The API service runs in a number of child processes.
# Minimize the cost of checking for child exit by extending the
# wait interval past the default of 0.01s.
self._server = common_service.ProcessLauncher(wait_interval=1.0)
self._server.launch_service(service, workers=workers)
开发者ID:kongseokhwan,项目名称:kulcloud-iitp-neutron,代码行数:18,代码来源:wsgi.py
示例12: setUp
def setUp(self):
super(FlavorManagerTestCase, self).setUp()
self.config_parse()
cfg.CONF.set_override(
'core_plugin',
'neutron.tests.unit.extensions.test_flavors.DummyCorePlugin')
cfg.CONF.set_override(
'service_plugins',
['neutron.tests.unit.extensions.test_flavors.DummyServicePlugin'])
self.useFixture(
fixtures.MonkeyPatch('neutron.manager.NeutronManager._instance'))
self.plugin = flavors_db.FlavorManager(
manager.NeutronManager().get_instance())
self.ctx = context.get_admin_context()
dbapi.get_engine()
开发者ID:dhanunjaya,项目名称:neutron,代码行数:18,代码来源:test_flavors.py
示例13: setUp
def setUp(self):
super(BaseFunctionalTest, self).setUp()
self.context = context.Context('fake', 'fake', is_admin=False)
cfg.CONF.set_override('connection', 'sqlite://', 'database')
configure_mappers()
# Must set the neutron's facade to none before each test
# otherwise the data will be shared between tests
neutron_db_api._FACADE = None
self.engine = neutron_db_api.get_engine()
models.BASEV2.metadata.create_all(self.engine)
quota_driver.Quota.metadata.create_all(self.engine)
开发者ID:Cerberus98,项目名称:quark,代码行数:11,代码来源:base.py
示例14: setUp
def setUp(self):
super(SqlTestCase, self).setUp()
# Register all data models
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
def unregister_models():
"""Unregister all data models."""
model_base.BASEV2.metadata.drop_all(engine)
self.addCleanup(unregister_models)
开发者ID:AsherBond,项目名称:quantum,代码行数:11,代码来源:testlib_api.py
示例15: setUp
def setUp(self, core_plugin=None, gp_plugin=None, service_plugins=None):
testlib_api.SqlTestCase._TABLES_ESTABLISHED = False
if not gp_plugin:
gp_plugin = DB_GP_PLUGIN_KLASS
if not service_plugins:
service_plugins = {'l3_plugin_name': "router",
'gp_plugin_name': gp_plugin,
'servicechain_plugin': SC_PLUGIN_KLASS}
super(GroupPolicyMappingDbTestCase, self).setUp(
core_plugin=core_plugin, gp_plugin=gp_plugin,
service_plugins=service_plugins
)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
开发者ID:tbachman,项目名称:group-based-policy,代码行数:14,代码来源:test_group_policy_mapping_db.py
示例16: _setUp
def _setUp(self):
# Register all data models
engine = db_api.get_engine()
if not SqlFixture._TABLES_ESTABLISHED:
model_base.BASEV2.metadata.create_all(engine)
SqlFixture._TABLES_ESTABLISHED = True
def clear_tables():
with engine.begin() as conn:
for table in reversed(
model_base.BASEV2.metadata.sorted_tables):
conn.execute(table.delete())
self.addCleanup(clear_tables)
开发者ID:21atlas,项目名称:neutron,代码行数:14,代码来源:testlib_api.py
示例17: setUp
def setUp(self):
super(QuarkIpAvailabilityBaseFunctionalTest, self).setUp()
self.connection = neutron_db_api.get_engine().connect()
self.networks = models.BASEV2.metadata.tables["quark_networks"]
self.subnets = models.BASEV2.metadata.tables["quark_subnets"]
self.ip_policy = models.BASEV2.metadata.tables[
"quark_ip_policy"]
self.ip_policy_cidr = models.BASEV2.metadata.tables[
"quark_ip_policy_cidrs"]
self.ip_addresses = models.BASEV2.metadata.tables[
"quark_ip_addresses"]
self.locks = models.BASEV2.metadata.tables[
"quark_locks"]
self.default_kwargs = {
"network_id": "00000000-0000-0000-0000-000000000000",
"ip_version": 4}
开发者ID:Anonymike,项目名称:quark,代码行数:16,代码来源:test_ip_availability.py
示例18: setUp
def setUp(self, core_plugin=None, gp_plugin=None, node_drivers=None,
node_plumber=None):
if node_drivers:
cfg.CONF.set_override('node_drivers', node_drivers,
group='node_composition_plugin')
cfg.CONF.set_override('node_plumber', node_plumber or 'dummy_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping'],
group='group_policy')
super(NodeCompositionPluginTestCase, self).setUp(
core_plugin=core_plugin or CORE_PLUGIN,
gp_plugin=gp_plugin or GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
self.driver = self.sc_plugin.driver_manager.ordered_drivers[0].obj
开发者ID:sriharshabarkuru,项目名称:group-based-policy,代码行数:17,代码来源:test_ncp_plugin.py
示例19: setUp
def setUp(self, core_plugin=None, sc_plugin=None, service_plugins=None,
ext_mgr=None, gp_plugin=None):
if not sc_plugin:
sc_plugin = DB_GP_PLUGIN_KLASS
if not service_plugins:
service_plugins = {
'l3_plugin_name': 'router',
'gp_plugin_name': gp_plugin or GP_PLUGIN_KLASS,
'sc_plugin_name': sc_plugin}
super(ServiceChainDbTestCase, self).setUp(
plugin=core_plugin, ext_mgr=ext_mgr,
service_plugins=service_plugins
)
self.plugin = importutils.import_object(sc_plugin)
if not ext_mgr:
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
开发者ID:tnahak,项目名称:group-based-policy,代码行数:20,代码来源:test_servicechain_db.py
示例20: setUp
def setUp(self):
config.cfg.CONF.set_override('service_delete_timeout',
SERVICE_DELETE_TIMEOUT,
group='nfp_node_driver')
config.cfg.CONF.set_override(
'extension_drivers', ['proxy_group'], group='group_policy')
config.cfg.CONF.set_override('node_drivers', ['nfp_node_driver'],
group='node_composition_plugin')
config.cfg.CONF.set_override('node_plumber', 'stitching_plumber',
group='node_composition_plugin')
config.cfg.CONF.set_override('policy_drivers',
['implicit_policy', 'resource_mapping',
'chain_mapping'],
group='group_policy')
super(NFPNodeDriverTestCase, self).setUp(
core_plugin=CORE_PLUGIN,
gp_plugin=GP_PLUGIN_KLASS,
sc_plugin=SC_PLUGIN_KLASS)
engine = db_api.get_engine()
model_base.BASEV2.metadata.create_all(engine)
开发者ID:ashutosh-mishra,项目名称:group-based-policy,代码行数:21,代码来源:test_nfp_node_driver.py
注:本文中的neutron.db.api.get_engine函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论