本文整理汇总了Python中neutron.plugins.ml2.db.get_dynamic_segment函数的典型用法代码示例。如果您正苦于以下问题:Python get_dynamic_segment函数的具体用法?Python get_dynamic_segment怎么用?Python get_dynamic_segment使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_dynamic_segment函数的11个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_allocate_dynamic_segment_multiple_physnets
def test_allocate_dynamic_segment_multiple_physnets(self):
data = {'network': {'name': 'net1',
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
network = self.deserialize(self.fmt,
network_req.get_response(self.api))
segment = {driver_api.NETWORK_TYPE: 'vlan',
driver_api.PHYSICAL_NETWORK: 'physnet1'}
network_id = network['network']['id']
self.driver.type_manager.allocate_dynamic_segment(
self.context.session, network_id, segment)
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
network_id,
'physnet1')
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
self.assertEqual('physnet1',
dynamic_segment[driver_api.PHYSICAL_NETWORK])
dynamic_segmentation_id = dynamic_segment[driver_api.SEGMENTATION_ID]
self.assertTrue(dynamic_segmentation_id > 0)
dynamic_segment1 = ml2_db.get_dynamic_segment(self.context.session,
network_id,
'physnet1')
dynamic_segment1_id = dynamic_segment1[driver_api.SEGMENTATION_ID]
self.assertEqual(dynamic_segmentation_id, dynamic_segment1_id)
segment2 = {driver_api.NETWORK_TYPE: 'vlan',
driver_api.PHYSICAL_NETWORK: 'physnet2'}
self.driver.type_manager.allocate_dynamic_segment(
self.context.session, network_id, segment2)
dynamic_segment2 = ml2_db.get_dynamic_segment(self.context.session,
network_id,
'physnet2')
dynamic_segmentation2_id = dynamic_segment2[driver_api.SEGMENTATION_ID]
self.assertNotEqual(dynamic_segmentation_id, dynamic_segmentation2_id)
开发者ID:caboucha,项目名称:neutron,代码行数:33,代码来源:test_ml2_plugin.py
示例2: test_release_network_segments
def test_release_network_segments(self):
data = {'network': {'name': 'net1',
'admin_state_up': True,
'shared': False,
pnet.NETWORK_TYPE: 'vlan',
pnet.PHYSICAL_NETWORK: 'physnet1',
pnet.SEGMENTATION_ID: 1,
'tenant_id': 'tenant_one'}}
network_req = self.new_create_request('networks', data)
res = network_req.get_response(self.api)
network = self.deserialize(self.fmt, res)
network_id = network['network']['id']
segment = {driver_api.NETWORK_TYPE: 'vlan',
driver_api.PHYSICAL_NETWORK: 'physnet2'}
self.driver.type_manager.allocate_dynamic_segment(
self.context.session, network_id, segment)
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session,
network_id,
'physnet2')
self.assertEqual('vlan', dynamic_segment[driver_api.NETWORK_TYPE])
self.assertEqual('physnet2',
dynamic_segment[driver_api.PHYSICAL_NETWORK])
self.assertTrue(dynamic_segment[driver_api.SEGMENTATION_ID] > 0)
with mock.patch.object(type_vlan.VlanTypeDriver,
'release_segment') as rs:
req = self.new_delete_request('networks', network_id)
res = req.get_response(self.api)
self.assertEqual(2, rs.call_count)
self.assertEqual(ml2_db.get_network_segments(
self.context.session, network_id), [])
self.assertIsNone(ml2_db.get_dynamic_segment(
self.context.session, network_id, 'physnet2'))
开发者ID:caboucha,项目名称:neutron,代码行数:33,代码来源:test_ml2_plugin.py
示例3: test_release_network_segments
def test_release_network_segments(self):
data = {
"network": {
"name": "net1",
"admin_state_up": True,
"shared": False,
pnet.NETWORK_TYPE: "vlan",
pnet.PHYSICAL_NETWORK: "physnet1",
pnet.SEGMENTATION_ID: 1,
"tenant_id": "tenant_one",
}
}
network_req = self.new_create_request("networks", data)
res = network_req.get_response(self.api)
network = self.deserialize(self.fmt, res)
network_id = network["network"]["id"]
segment = {driver_api.NETWORK_TYPE: "vlan", driver_api.PHYSICAL_NETWORK: "physnet2"}
self.driver.type_manager.allocate_dynamic_segment(self.context.session, network_id, segment)
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session, network_id, "physnet2")
self.assertEqual("vlan", dynamic_segment[driver_api.NETWORK_TYPE])
self.assertEqual("physnet2", dynamic_segment[driver_api.PHYSICAL_NETWORK])
self.assertTrue(dynamic_segment[driver_api.SEGMENTATION_ID] > 0)
with mock.patch.object(type_vlan.VlanTypeDriver, "release_segment") as rs:
req = self.new_delete_request("networks", network_id)
res = req.get_response(self.api)
self.assertEqual(2, rs.call_count)
self.assertEqual(ml2_db.get_network_segments(self.context.session, network_id), [])
self.assertIsNone(ml2_db.get_dynamic_segment(self.context.session, network_id, "physnet2"))
开发者ID:CiscoSystems,项目名称:neutron,代码行数:29,代码来源:test_ml2_plugin.py
示例4: test_allocate_release_dynamic_segment
def test_allocate_release_dynamic_segment(self):
data = {"network": {"name": "net1", "tenant_id": "tenant_one"}}
network_req = self.new_create_request("networks", data)
network = self.deserialize(self.fmt, network_req.get_response(self.api))
segment = {driver_api.NETWORK_TYPE: "vlan", driver_api.PHYSICAL_NETWORK: "physnet1"}
network_id = network["network"]["id"]
self.driver.type_manager.allocate_dynamic_segment(self.context.session, network_id, segment)
dynamic_segment = ml2_db.get_dynamic_segment(self.context.session, network_id, "physnet1")
self.assertEqual("vlan", dynamic_segment[driver_api.NETWORK_TYPE])
self.assertEqual("physnet1", dynamic_segment[driver_api.PHYSICAL_NETWORK])
dynamic_segmentation_id = dynamic_segment[driver_api.SEGMENTATION_ID]
self.assertTrue(dynamic_segmentation_id > 0)
self.driver.type_manager.release_dynamic_segment(self.context.session, dynamic_segment[driver_api.ID])
self.assertIsNone(ml2_db.get_dynamic_segment(self.context.session, network_id, "physnet1"))
开发者ID:CiscoSystems,项目名称:neutron,代码行数:14,代码来源:test_ml2_plugin.py
示例5: _bind_segment_to_vif_type
def _bind_segment_to_vif_type(self, context, agent=None):
mappings = {}
if agent:
mappings = agent['configurations'].get('bridge_mappings', {})
for res in self.resource_groups:
if agent and res['ResourceGroupName'] not in mappings:
continue
if res['device_owner'] != context._port['device_owner']:
continue
network_id = context.network.current['id']
dummy_segment = db.get_dynamic_segment(
context.network._plugin_context.session,
network_id, physical_network=res['ResourceGroupName'])
LOG.debug("1st: dummy segment is %s", dummy_segment)
if not dummy_segment:
dummy_segment = {
api.PHYSICAL_NETWORK: res['ResourceGroupName'],
api.NETWORK_TYPE: plugin_const.TYPE_VLAN,
api.SEGMENTATION_ID: 0
}
db.add_network_segment(
context.network._plugin_context.session,
network_id, dummy_segment, is_dynamic=True)
LOG.debug("2nd: dummy segment is %s", dummy_segment)
context.set_binding(dummy_segment[api.ID],
self.vif_type,
{portbindings.CAP_PORT_FILTER: True,
portbindings.OVS_HYBRID_PLUG: True})
return True
return False
开发者ID:sajeevm,项目名称:networking-nec,代码行数:32,代码来源:mech_necnwa.py
示例6: _l2_delete_segment
def _l2_delete_segment(self, context, nwa_info):
session = context.network._plugin_context.session
del_segment = db.get_dynamic_segment(
session,
context.network.current['id'],
physical_network=nwa_info['physical_network'])
if del_segment:
LOG.debug('delete_network_segment %s', del_segment)
db.delete_network_segment(session, del_segment['id'])
开发者ID:sajeevm,项目名称:networking-nec,代码行数:9,代码来源:mech_necnwa.py
示例7: release_dynamic_segment_from_agent
def release_dynamic_segment_from_agent(self, context, **kwargs):
network_id = kwargs.get('network_id')
physical_network = kwargs.get('physical_network')
session = db_api.get_session()
del_segment = db_ml2.get_dynamic_segment(
session, network_id, physical_network=physical_network,
)
if del_segment:
LOG.debug("release_dynamic_segment segment=%s", del_segment)
if del_segment['segmentation_id'] != 0:
db_ml2.delete_network_segment(session, del_segment['id'])
开发者ID:nec-openstack,项目名称:networking-nec-nwa,代码行数:12,代码来源:nwa_l2_server_callback.py
示例8: allocate_dynamic_segment
def allocate_dynamic_segment(self, session, network_id, segment):
"""Allocate a dynamic segment using a partial or full segment dict."""
dynamic_segment = db.get_dynamic_segment(
session, network_id, segment.get(api.PHYSICAL_NETWORK), segment.get(api.SEGMENTATION_ID)
)
if dynamic_segment:
return dynamic_segment
driver = self.drivers.get(segment.get(api.NETWORK_TYPE))
dynamic_segment = driver.obj.reserve_provider_segment(session, segment)
db.add_network_segment(session, network_id, dynamic_segment, is_dynamic=True)
return dynamic_segment
开发者ID:bigswitch,项目名称:neutron,代码行数:13,代码来源:managers.py
示例9: bind_port
def bind_port(self, context):
LOG.debug("Attempting to bind port %(port)s on network %(network)s",
{'port': context.current['id'],
'network': context.network.current['id']})
for segment in context.segments_to_bind:
if self._is_segment_nexus_vxlan(segment):
# Find physical network setting for this host.
host_id = context.current.get(portbindings.HOST_ID)
host_connections = self._get_switch_info(host_id)
if not host_connections:
return
for switch_ip, attr2, attr3 in host_connections:
physnet = self._nexus_switches.get((switch_ip, 'physnet'))
if physnet:
break
else:
raise excep.PhysnetNotConfigured(host_id=host_id,
host_connections=host_connections)
# Allocate dynamic vlan segment.
vlan_segment = {api.NETWORK_TYPE: p_const.TYPE_VLAN,
api.PHYSICAL_NETWORK: physnet}
context.allocate_dynamic_segment(vlan_segment)
# Retrieve the dynamically allocated segment.
# Database has provider_segment dictionary key.
network_id = context.current['network_id']
dynamic_segment = ml2_db.get_dynamic_segment(
db_api.get_session(), network_id, physnet)
# Have other drivers bind the VLAN dynamic segment.
if dynamic_segment:
context.continue_binding(segment[api.ID],
[dynamic_segment])
else:
raise excep.NoDynamicSegmentAllocated(
network_id=network_id, physnet=physnet)
else:
LOG.debug("No binding required for segment ID %(id)s, "
"segment %(seg)s, phys net %(physnet)s, and "
"network type %(nettype)s",
{'id': segment[api.ID],
'seg': segment[api.SEGMENTATION_ID],
'physnet': segment[api.PHYSICAL_NETWORK],
'nettype': segment[api.NETWORK_TYPE]})
开发者ID:svarma1,项目名称:networking-cisco,代码行数:47,代码来源:mech_cisco_nexus.py
示例10: bind_port
def bind_port(self, context):
if not (context.current['device_owner'].startswith('compute') or
context.current['device_owner'] == n_const.DEVICE_OWNER_DHCP):
return
LOG.info("bind_port Attempting to bind port %(port)s"
" on network %(network)s",
{'port': context.current['id'],
'network': context.network.current['id']})
for segment in context.segments_to_bind:
if self._is_segment_h3c_vxlan(segment):
physnet = 'vlanphy'
# Allocate dynamic vlan segment.
vlan_segment = {api.NETWORK_TYPE: p_const.TYPE_VLAN,
api.PHYSICAL_NETWORK: physnet}
context.allocate_dynamic_segment(vlan_segment)
# Retrieve the dynamically allocated segment.
# Database has provider_segment dictionary key.
network_id = context.current['network_id']
dynamic_segment = ml2_db.get_dynamic_segment(
db_api.get_session(), network_id, physnet)
# Have other drivers bind the VLAN dynamic segment.
if dynamic_segment:
context.continue_binding(segment[api.ID],
[dynamic_segment])
else:
LOG.debug("VLAN dynamic segment not allocated."
"VLAN dynamic segment not created for VXLAN "
"overlay static segment. Network segment = %s "
"physnet = %s" % (network_id, physnet))
else:
LOG.debug("No binding required for segment ID %(id)s, "
"segment %(seg)s, phys net %(physnet)s, and "
"network type %(nettype)s",
{'id': segment[api.ID],
'seg': segment[api.SEGMENTATION_ID],
'physnet': segment[api.PHYSICAL_NETWORK],
'nettype': segment[api.NETWORK_TYPE]})
开发者ID:XiaolongHu,项目名称:neutron_agent,代码行数:41,代码来源:mechanism_h3c.py
示例11: _bind_segment_to_vif_type
def _bind_segment_to_vif_type(self, context, physical_network):
if self.multi_dc:
return
network_id = context.network.current['id']
session = context.network._plugin_context.session
dummy_segment = db.get_dynamic_segment(
session, network_id, physical_network=physical_network)
LOG.debug("1st: dummy segment is %s", dummy_segment)
if not dummy_segment:
dummy_segment = {
api.PHYSICAL_NETWORK: physical_network,
api.NETWORK_TYPE: plugin_const.TYPE_VLAN,
api.SEGMENTATION_ID: 0
}
db.add_network_segment(
session, network_id, dummy_segment, is_dynamic=True)
LOG.debug("2nd: dummy segment is %s", dummy_segment)
context.set_binding(dummy_segment[api.ID],
self.vif_type,
{portbindings.CAP_PORT_FILTER: True,
portbindings.OVS_HYBRID_PLUG: True})
开发者ID:nec-openstack,项目名称:networking-nec-nwa,代码行数:21,代码来源:mech_necnwa.py
注:本文中的neutron.plugins.ml2.db.get_dynamic_segment函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论