本文整理汇总了Python中neutron.db.api.get_session函数的典型用法代码示例。如果您正苦于以下问题:Python get_session函数的具体用法?Python get_session怎么用?Python get_session使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_session函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: get_ip_availability
def get_ip_availability(**kwargs):
LOG.debug("Begin querying %s" % kwargs)
used_ips = get_used_ips(neutron_db_api.get_session(), **kwargs)
unused_ips = get_unused_ips(neutron_db_api.get_session(), used_ips,
**kwargs)
LOG.debug("End querying")
return dict(used=used_ips, unused=unused_ips)
开发者ID:evanscottgray,项目名称:quark,代码行数:7,代码来源:ip_availability.py
示例2: _get_agent_fdb
def _get_agent_fdb(self, segment, port, agent_host):
if not agent_host:
return
network_id = port['network_id']
session = db_api.get_session()
agent_active_ports = l2pop_db.get_agent_network_active_port_count(
session, agent_host, network_id)
agent = l2pop_db.get_agent_by_host(db_api.get_session(), agent_host)
if not self._validate_segment(segment, port['id'], agent):
return
agent_ip = l2pop_db.get_agent_ip(agent)
other_fdb_entries = self._get_fdb_entries_template(
segment, agent_ip, port['network_id'])
if agent_active_ports == 0:
# Agent is removing its last activated port in this network,
# other agents needs to be notified to delete their flooding entry.
other_fdb_entries[network_id]['ports'][agent_ip].append(
const.FLOODING_ENTRY)
# Notify other agents to remove fdb rules for current port
if port['device_owner'] != const.DEVICE_OWNER_DVR_INTERFACE:
fdb_entries = self._get_port_fdb_entries(port)
other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
return other_fdb_entries
开发者ID:21atlas,项目名称:neutron,代码行数:28,代码来源:mech_driver.py
示例3: update_port_state_with_notifier
def update_port_state_with_notifier(self, rpc_context, **kwargs):
port_id = kwargs.get('port_id')
network_id = kwargs.get('network_id')
network_type = kwargs.get('network_type')
segmentation_id = kwargs.get('segmentation_id')
physical_network = kwargs.get('physical_network')
# 1 update segment
session = db_api.get_session()
with session.begin(subtransactions=True):
try:
query = (session.query(models_ml2.NetworkSegment).
filter_by(network_id=network_id))
query = query.filter_by(physical_network=physical_network)
query = query.filter_by(is_dynamic=True)
record = query.one()
record.segmentation_id = segmentation_id
except sa_exc.NoResultFound:
pass
# 2 change port state
plugin = manager.NeutronManager.get_plugin()
plugin.update_port_status(
rpc_context,
port_id,
constants.PORT_STATUS_ACTIVE
)
# 3 serch db from port_id
session = db_api.get_session()
port = None
with session.begin(subtransactions=True):
try:
port_db = (session.query(models_v2.Port).
enable_eagerloads(False).
filter(models_v2.Port.id.startswith(port_id)).
one())
port = plugin._make_port_dict(port_db)
except sa_exc.NoResultFound:
LOG.error(_LE("Can't find port with port_id %s"),
port_id)
except sa_exc.MultipleResultsFound:
LOG.error(_LE("Multiple ports have port_id starting with %s"),
port_id)
# 4 send notifier
if port is not None:
LOG.debug("notifier port_update %(net_type)s, %(seg_id)s, "
"%(physnet)s",
{'net_type': network_type,
'seg_id': segmentation_id,
'physnet': physical_network})
plugin.notifier.port_update(
rpc_context, port,
network_type,
segmentation_id,
physical_network
)
return {}
开发者ID:nec-openstack,项目名称:networking-nec-nwa,代码行数:59,代码来源:nwa_l2_server_callback.py
示例4: _test_and_create_object
def _test_and_create_object(id):
try:
session = db_api.get_session()
with session.begin():
session.query(models.DFLockedObjects).filter_by(
object_uuid=id).one()
except orm_exc.NoResultFound:
try:
session = db_api.get_session()
with session.begin():
_create_db_row(session, oid=id)
except db_exc.DBDuplicateEntry:
# the lock is concurrently created.
pass
开发者ID:almightyyeh,项目名称:dragonflow,代码行数:14,代码来源:lockedobjects_db.py
示例5: _verify_get_nsx_switch_and_port_id
def _verify_get_nsx_switch_and_port_id(self, exp_ls_uuid, exp_lp_uuid):
# The nvplib and db calls are mocked, therefore the cluster
# and the neutron_port_id parameters can be set to None
ls_uuid, lp_uuid = nsx_utils.get_nsx_switch_and_port_id(
db_api.get_session(), None, None)
self.assertEqual(exp_ls_uuid, ls_uuid)
self.assertEqual(exp_lp_uuid, lp_uuid)
开发者ID:Taejun,项目名称:neutron,代码行数:7,代码来源:test_nsx_utils.py
示例6: test_populate_policy_profile_delete
def test_populate_policy_profile_delete(self):
# Patch the Client class with the TestClient class
with mock.patch(n1kv_client.__name__ + ".Client",
new=fake_client.TestClient):
# Patch the _get_total_profiles() method to return a custom value
with mock.patch(fake_client.__name__ +
'.TestClient._get_total_profiles') as obj_inst:
# Return 3 policy profiles
obj_inst.return_value = 3
plugin = manager.NeutronManager.get_plugin()
plugin._populate_policy_profiles()
db_session = db.get_session()
profile = n1kv_db_v2.get_policy_profile(
db_session, '00000000-0000-0000-0000-000000000001')
# Verify that DB contains only 3 policy profiles
self.assertEqual('pp-1', profile['name'])
profile = n1kv_db_v2.get_policy_profile(
db_session, '00000000-0000-0000-0000-000000000002')
self.assertEqual('pp-2', profile['name'])
profile = n1kv_db_v2.get_policy_profile(
db_session, '00000000-0000-0000-0000-000000000003')
self.assertEqual('pp-3', profile['name'])
self.assertRaises(c_exc.PolicyProfileIdNotFound,
n1kv_db_v2.get_policy_profile,
db_session,
'00000000-0000-0000-0000-000000000004')
# Return 2 policy profiles
obj_inst.return_value = 2
plugin._populate_policy_profiles()
# Verify that the third policy profile is deleted
self.assertRaises(c_exc.PolicyProfileIdNotFound,
n1kv_db_v2.get_policy_profile,
db_session,
'00000000-0000-0000-0000-000000000003')
开发者ID:aignatov,项目名称:neutron,代码行数:34,代码来源:test_n1kv_plugin.py
示例7: update_ip_owner
def update_ip_owner(self, ip_owner_info):
ports_to_update = set()
port_id = ip_owner_info.get('port')
ipv4 = ip_owner_info.get('ip_address_v4')
ipv6 = ip_owner_info.get('ip_address_v6')
network_id = ip_owner_info.get('network_id')
if not port_id or (not ipv4 and not ipv6):
return ports_to_update
LOG.debug("Got IP owner update: %s", ip_owner_info)
core_plugin = self._get_plugin()
# REVISIT: just use SQLAlchemy session and models_v2.Port?
port = core_plugin.get_port(nctx.get_admin_context(), port_id)
if not port:
LOG.debug("Ignoring update for non-existent port: %s", port_id)
return ports_to_update
ports_to_update.add(port_id)
for ipa in [ipv4, ipv6]:
if not ipa:
continue
try:
session = db_api.get_session()
with session.begin(subtransactions=True):
old_owner = self.ha_ip_handler.get_port_for_ha_ipaddress(
ipa, network_id or port['network_id'], session=session)
self.ha_ip_handler.set_port_id_for_ha_ipaddress(port_id,
ipa,
session)
if old_owner and old_owner['port_id'] != port_id:
self.ha_ip_handler.delete_port_id_for_ha_ipaddress(
old_owner['port_id'], ipa, session=session)
ports_to_update.add(old_owner['port_id'])
except db_exc.DBReferenceError as dbe:
LOG.debug("Ignoring FK error for port %s: %s", port_id, dbe)
return ports_to_update
开发者ID:noironetworks,项目名称:apic-ml2-driver,代码行数:34,代码来源:port_ha_ipaddress_binding.py
示例8: get_port_and_sgs
def get_port_and_sgs(port_id):
"""Get port from database with security group info."""
LOG.debug(_("get_port_and_sgs() called for port_id %s"), port_id)
session = db_api.get_session()
sg_binding_port = sg_db.SecurityGroupPortBinding.port_id
with session.begin(subtransactions=True):
query = session.query(models_v2.Port,
sg_db.SecurityGroupPortBinding.security_group_id)
query = query.outerjoin(sg_db.SecurityGroupPortBinding,
models_v2.Port.id == sg_binding_port)
query = query.filter(models_v2.Port.id.startswith(port_id))
port_and_sgs = query.all()
if not port_and_sgs:
return
port = port_and_sgs[0][0]
plugin = manager.NeutronManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict['security_groups'] = [
sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
for ip in port['fixed_ips']]
return port_dict
开发者ID:50infivedays,项目名称:neutron,代码行数:26,代码来源:db.py
示例9: sanitize_policy_profile_table
def sanitize_policy_profile_table(self):
"""Clear policy profiles from stale VSM."""
db_session = db.get_session()
hosts = config.get_vsm_hosts()
vsm_info = db_session.query(
n1kv_models.PolicyProfile.vsm_ip).distinct()
if vsm_info is None or hosts is None:
return
vsm_ips = [vsm_ip[0] for vsm_ip in vsm_info if vsm_ip[0] not in hosts]
for vsm_ip in vsm_ips:
pprofiles = n1kv_db.get_policy_profiles_by_host(vsm_ip, db_session)
for pprofile in pprofiles:
# Do not delete profile if it is in use and if it
# is the only VSM to have it configured
pp_in_use = n1kv_db.policy_profile_in_use(pprofile['id'],
db_session)
num_vsm_using_pp = db_session.query(
n1kv_models.PolicyProfile).filter_by(
id=pprofile['id']).count()
if (not pp_in_use) or (num_vsm_using_pp > 1):
db_session.delete(pprofile)
db_session.flush()
else:
LOG.warning(_LW('Cannot delete policy profile %s '
'as it is in use.'), pprofile['id'])
开发者ID:JoelCapitao,项目名称:networking-cisco,代码行数:25,代码来源:policy_profile_service.py
示例10: remove_reserved_binding
def remove_reserved_binding(vlan_id, switch_ip, instance_id,
port_id):
"""Removes reserved binding.
This overloads port bindings to support reserved Switch binding
used to maintain the state of a switch so it can be viewed by
all other neutron processes. There's also the case of
a reserved port binding to keep switch information on a given
interface.
The values of these arguments is as follows:
:param vlan_id: 0
:param switch_ip: ip address of the switch
:param instance_id: fixed string RESERVED_NEXUS_SWITCH_DEVICE_ID_R1
: or RESERVED_NEXUS_PORT_DEVICE_ID_R1
:param port_id: switch-state of ACTIVE, RESTORE_S1, RESTORE_S2, INACTIVE
: port-expected port_id
"""
if not port_id:
LOG.warning(_LW("remove_reserved_binding called with no state"))
return
LOG.debug("remove_reserved_binding called")
session = db.get_session()
binding = _lookup_one_nexus_binding(session=session,
vlan_id=vlan_id,
switch_ip=switch_ip,
instance_id=instance_id,
port_id=port_id)
for bind in binding:
session.delete(bind)
session.flush()
return binding
开发者ID:JoelCapitao,项目名称:networking-cisco,代码行数:31,代码来源:nexus_db_v2.py
示例11: sync_tunnel_allocations
def sync_tunnel_allocations(tunnel_id_ranges):
"""Synchronize tunnel_allocations table with configured tunnel ranges."""
# determine current configured allocatable tunnels
tunnel_ids = set()
for tunnel_id_range in tunnel_id_ranges:
tun_min, tun_max = tunnel_id_range
if tun_max + 1 - tun_min > 1000000:
LOG.error(
_("Skipping unreasonable tunnel ID range " "%(tun_min)s:%(tun_max)s"),
{"tun_min": tun_min, "tun_max": tun_max},
)
else:
tunnel_ids |= set(xrange(tun_min, tun_max + 1))
session = db.get_session()
with session.begin():
# remove from table unallocated tunnels not currently allocatable
allocs = session.query(ovs_models_v2.TunnelAllocation).all()
for alloc in allocs:
try:
# see if tunnel is allocatable
tunnel_ids.remove(alloc.tunnel_id)
except KeyError:
# it's not allocatable, so check if its allocated
if not alloc.allocated:
# it's not, so remove it from table
LOG.debug(_("Removing tunnel %s from pool"), alloc.tunnel_id)
session.delete(alloc)
# add missing allocatable tunnels to table
for tunnel_id in sorted(tunnel_ids):
alloc = ovs_models_v2.TunnelAllocation(tunnel_id)
session.add(alloc)
开发者ID:kobtea,项目名称:neutron,代码行数:34,代码来源:ovs_db_v2.py
示例12: _get_mock_port_operation_context
def _get_mock_port_operation_context():
current = {'status': 'DOWN',
'binding:host_id': '',
'allowed_address_pairs': [],
'device_owner': 'fake_owner',
'binding:profile': {},
'fixed_ips': [{
'subnet_id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839'}],
'id': '83d56c48-e9b8-4dcf-b3a7-0813bb3bd940',
'security_groups': [SECURITY_GROUP],
'device_id': 'fake_device',
'name': '',
'admin_state_up': True,
'network_id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
'tenant_id': 'test-tenant',
'binding:vif_details': {},
'binding:vnic_type': 'normal',
'binding:vif_type': 'unbound',
'mac_address': '12:34:56:78:21:b6'}
context = mock.Mock(current=current)
context._plugin.get_security_group = mock.Mock(
return_value=SECURITY_GROUP)
context._plugin.get_port = mock.Mock(return_value=current)
context._plugin_context.session = neutron_db_api.get_session()
context._network_context = mock.Mock(
_network=OpenDaylightMechanismDriverTestCase.
_get_mock_network_operation_context().current)
return context
开发者ID:heekof,项目名称:networking-odl,代码行数:28,代码来源:test_mechanism_odl_v2.py
示例13: _sync_create_ports
def _sync_create_ports(self, combined_res_info, vsm_ip):
"""
Sync ports by creating missing ones on VSM.
:param combined_res_info: tuple containing VSM and neutron information
:param vsm_ip: string representing the IP address of the VSM
"""
(vsm_vmn_dict, neutron_ports) = combined_res_info
vsm_port_uuids = set()
for (k, v) in vsm_vmn_dict.items():
port_dict = v['properties']
port_ids = set(port_dict['portId'].split(','))
vsm_port_uuids = vsm_port_uuids.union(port_ids)
for port in neutron_ports:
if port['id'] not in vsm_port_uuids:
# create these ports on VSM
network_uuid = port['network_id']
binding = n1kv_db.get_policy_binding(port['id'])
policy_profile_id = binding.profile_id
policy_profile = n1kv_db.get_policy_profile_by_uuid(
db.get_session(), policy_profile_id)
vmnetwork_name = "%s%s_%s" % (n1kv_const.VM_NETWORK_PREFIX,
policy_profile_id,
network_uuid)
try:
self.n1kvclient.create_n1kv_port(port, vmnetwork_name,
policy_profile,
vsm_ip=vsm_ip)
except n1kv_exc.VSMError as e:
LOG.warning(_LW('Sync Exception: Port creation on VSM '
'failed: %s'), e.message)
开发者ID:cisco-openstack,项目名称:networking-cisco,代码行数:31,代码来源:n1kv_sync.py
示例14: update_port_ext
def update_port_ext(port_id, commit=None, hardware_id=None,
trunked=None, session=None):
if not session:
session = db_api.get_session()
updated = False
with session.begin(subtransactions=True):
port = (session.query(models.PortExt).
get(port_id))
if commit is not None:
port.commit = commit
updated = True
if hardware_id is not None:
port.hardware_id = hardware_id
updated = True
if trunked is not None:
port.trunked = trunked
updated = True
if updated:
session.add(port)
session.flush()
return port
开发者ID:kumarom,项目名称:ironic-neutron-plugin,代码行数:28,代码来源:db.py
示例15: filter_port_ext
def filter_port_ext(session=None, **kwargs):
if not session:
session = db_api.get_session()
with session.begin(subtransactions=True):
return (session.query(models.PortExt).
filter_by(**kwargs))
开发者ID:kumarom,项目名称:ironic-neutron-plugin,代码行数:7,代码来源:db.py
示例16: get_device_details
def get_device_details(self, rpc_context, **kwargs):
"""Agent requests device details."""
agent_id = kwargs.get('agent_id')
device = kwargs.get('device')
LOG.debug("Device %(device)s details requested from %(agent_id)s",
{'device': device, 'agent_id': agent_id})
plugin = manager.NeutronManager.get_plugin()
port = plugin.get_port_from_device(device)
if port:
binding = db.get_network_binding(db_api.get_session(),
port['network_id'])
entry = {'device': device,
'physical_network': binding.physical_network,
'network_type': binding.network_type,
'segmentation_id': binding.segmentation_id,
'network_id': port['network_id'],
'port_mac': port['mac_address'],
'port_id': port['id'],
'admin_state_up': port['admin_state_up']}
if cfg.CONF.AGENT.rpc_support_old_agents:
entry['vlan_id'] = binding.segmentation_id
new_status = (q_const.PORT_STATUS_ACTIVE if port['admin_state_up']
else q_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
db.set_port_status(port['id'], new_status)
else:
entry = {'device': device}
LOG.debug("%s can not be found in database", device)
return entry
开发者ID:gabriel-samfira,项目名称:neutron,代码行数:29,代码来源:rpc_callbacks.py
示例17: setUp
def setUp(self):
super(VlanAllocationsTest, self).setUp()
db.configure_db()
self.session = db.get_session()
self.net_p = _create_test_network_profile_if_not_there(self.session)
n1kv_db_v2.sync_vlan_allocations(self.session, self.net_p)
self.addCleanup(db.clear_db)
开发者ID:PFZheng,项目名称:neutron,代码行数:7,代码来源:test_n1kv_db.py
示例18: get_port
def get_port(port_id):
session = db.get_session()
try:
port = session.query(models_v2.Port).filter_by(id=port_id).one()
except exc.NoResultFound:
port = None
return port
开发者ID:kobtea,项目名称:neutron,代码行数:7,代码来源:ovs_db_v2.py
示例19: sync_network_states
def sync_network_states(network_vlan_ranges):
"""Synchronize network_states table with current configured VLAN ranges."""
session = db.get_session()
with session.begin():
# get existing allocations for all physical networks
allocations = dict()
entries = session.query(mlnx_models_v2.SegmentationIdAllocation).all()
for entry in entries:
allocations.setdefault(entry.physical_network, set()).add(entry)
# process vlan ranges for each configured physical network
for physical_network, vlan_ranges in network_vlan_ranges.iteritems():
# determine current configured allocatable vlans for this
# physical network
vlan_ids = set()
for vlan_range in vlan_ranges:
vlan_ids |= set(moves.xrange(vlan_range[0], vlan_range[1] + 1))
# remove from table unallocated vlans not currently allocatable
_remove_non_allocatable_vlans(session, allocations, physical_network, vlan_ids)
# add missing allocatable vlans to table
_add_missing_allocatable_vlans(session, physical_network, vlan_ids)
# remove from table unallocated vlans for any unconfigured physical
# networks
_remove_unconfigured_vlans(session, allocations)
开发者ID:armando-migliaccio,项目名称:neutron-1,代码行数:28,代码来源:mlnx_db_v2.py
示例20: add_tunnel_endpoint
def add_tunnel_endpoint(ip, max_retries=10):
"""Return the endpoint of the given IP address or generate a new one."""
# NOTE(rpodolyaka): generation of a new tunnel endpoint must be put into a
# repeatedly executed transactional block to ensure it
# doesn't conflict with any other concurrently executed
# DB transactions in spite of the specified transactions
# isolation level value
for i in xrange(max_retries):
LOG.debug(_("Adding a tunnel endpoint for %s"), ip)
try:
session = db.get_session()
with session.begin(subtransactions=True):
tunnel = (
session.query(ovs_models_v2.TunnelEndpoint).filter_by(ip_address=ip).with_lockmode("update").first()
)
if tunnel is None:
tunnel_id = _generate_tunnel_id(session)
tunnel = ovs_models_v2.TunnelEndpoint(ip, tunnel_id)
session.add(tunnel)
return tunnel
except db_exc.DBDuplicateEntry:
# a concurrent transaction has been commited, try again
LOG.debug(
_(
"Adding a tunnel endpoint failed due to a concurrent"
"transaction had been commited (%s attempts left)"
),
max_retries - (i + 1),
)
raise q_exc.NeutronException(message="Unable to generate a new tunnel id")
开发者ID:kobtea,项目名称:neutron,代码行数:34,代码来源:ovs_db_v2.py
注:本文中的neutron.db.api.get_session函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论