本文整理汇总了Python中neutron.plugins.nec.db.api.get_portinfo函数的典型用法代码示例。如果您正苦于以下问题:Python get_portinfo函数的具体用法?Python get_portinfo怎么用?Python get_portinfo使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了get_portinfo函数的19个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: _test_portinfo_change
def _test_portinfo_change(self, portinfo_change_first=True):
    """Check OFC create/delete call counts as portinfo is reported twice.

    The same port is reported with port_no 123 and then with port_no 456.

    :param portinfo_change_first: when True the second report is sent while
        the port still exists (inside the ``with`` block); when False it is
        sent after the ``with`` block has exited.
    """

    def check_ofc_calls(created, deleted):
        # Assert the number of create/delete calls seen so far on the OFC.
        self.assertEqual(self.ofc.create_ofc_port.call_count, created)
        self.assertEqual(self.ofc.delete_ofc_port.call_count, deleted)

    def report_port_no(port_no):
        # Send a "port added" notification carrying the given port_no.
        self.rpcapi_update_ports(added=[{'id': port_id, 'port_no': port_no}])

    def stored_portinfo():
        return ndb.get_portinfo(self.context.session, port_id)

    with self.port() as port:
        port_id = port['port']['id']
        self.assertEqual(self.ofc.create_ofc_port.call_count, 0)

        report_port_no(123)
        check_ofc_calls(1, 0)
        self.assertEqual(stored_portinfo().port_no, 123)

        if portinfo_change_first:
            report_port_no(456)
            # OFC port is recreated.
            check_ofc_calls(2, 1)
            self.assertEqual(stored_portinfo().port_no, 456)

    if portinfo_change_first:
        return

    # The port is expected to delete when exiting with-clause.
    check_ofc_calls(1, 1)

    report_port_no(456)
    # No OFC operations are expected.
    check_ofc_calls(1, 1)
    self.assertIsNone(stored_portinfo())
开发者ID:Taejun,项目名称:neutron,代码行数:32,代码来源:test_nec_plugin.py
示例2: update_ports
def update_ports(self, rpc_context, **kwargs):
    """Update ports' information and activate/deactivate them.

    Expected input format is:
        {'topic': 'q-agent-notifier',
         'agent_id': 'nec-q-agent.' + <hostname>,
         'datapath_id': <datapath_id of br-int on remote host>,
         'port_added': [<new PortInfo>,...],
         'port_removed': [<removed Port ID>,...]}
    """
    LOG.debug(_("NECPluginV2RPCCallbacks.update_ports() called, "
                "kwargs=%s ."), kwargs)
    datapath_id = kwargs["datapath_id"]
    session = rpc_context.session

    for added in kwargs.get("port_added", []):
        port_id = added["id"]
        registered = ndb.get_portinfo(session, port_id)
        if registered:
            unchanged = (
                necutils.cmp_dpid(registered.datapath_id, datapath_id)
                and registered.port_no == added["port_no"]
            )
            if unchanged:
                LOG.debug(_("update_ports(): ignore unchanged portinfo in "
                            "port_added message (port_id=%s)."), port_id)
                continue
            # The stored portinfo is stale; drop it before re-registering.
            ndb.del_portinfo(session, port_id)

        port = self._get_port(rpc_context, port_id)
        if not port:
            continue

        ndb.add_portinfo(session, port_id, datapath_id, added["port_no"],
                         mac=added.get("mac", ""))
        # NOTE: Make sure that packet filters on this port exist while
        # the port is active to avoid unexpected packet transfer.
        if registered:
            self.plugin.deactivate_port(rpc_context, port)
            self.plugin.deactivate_packet_filters_by_port(rpc_context,
                                                          port_id)
        self.plugin.activate_packet_filters_by_port(rpc_context, port_id)
        self.plugin.activate_port_if_ready(rpc_context, port)

    for port_id in kwargs.get("port_removed", []):
        registered = ndb.get_portinfo(session, port_id)
        if not registered:
            LOG.debug(_("update_ports(): ignore port_removed message "
                        "due to portinfo for port_id=%s was not "
                        "registered"), port_id)
            continue
        if not necutils.cmp_dpid(registered.datapath_id, datapath_id):
            LOG.debug(_("update_ports(): ignore port_removed message "
                        "received from different host "
                        "(registered_datapath_id=%(registered)s, "
                        "received_datapath_id=%(received)s)."),
                      {"registered": registered.datapath_id,
                       "received": datapath_id})
            continue

        ndb.del_portinfo(session, port_id)
        port = self._get_port(rpc_context, port_id)
        if port:
            self.plugin.deactivate_port(rpc_context, port)
            self.plugin.deactivate_packet_filters_by_port(rpc_context,
                                                          port_id)
开发者ID:raceli,项目名称:neutron,代码行数:59,代码来源:nec_plugin.py
示例3: testf_del_portinfo
def testf_del_portinfo(self):
    """test delete portinfo.

    Adds a portinfo row, checks it can be read back by id, deletes it and
    checks that a subsequent lookup returns None.
    """
    # The sixth value returned by the params helper is not needed here.
    i, d, p, v, m, _unused = self.get_portinfo_random_params()
    ndb.add_portinfo(self.session, i, d, p, v, m)
    portinfo = ndb.get_portinfo(self.session, i)
    self.assertEqual(portinfo.id, i)

    ndb.del_portinfo(self.session, i)
    # assertIsNone checks identity with None and gives a clearer failure
    # message than assertEqual(None, ...).
    self.assertIsNone(ndb.get_portinfo(self.session, i))
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:9,代码来源:test_db.py
示例4: testf_del_portinfo
def testf_del_portinfo(self):
    """test delete portinfo."""
    with self.portinfo_random_params() as params:
        self._add_portinfo(self.session, params)
        port_id = params['port_id']

        # The freshly added row must be retrievable by its port id.
        stored = ndb.get_portinfo(self.session, port_id)
        self.assertEqual(stored.id, port_id)

        # After deletion the same lookup must yield None.
        ndb.del_portinfo(self.session, port_id)
        self.assertIsNone(ndb.get_portinfo(self.session, port_id))
开发者ID:CiscoSystems,项目名称:neutron,代码行数:9,代码来源:test_db.py
示例5: teste_get_portinfo
def teste_get_portinfo(self):
    """test get portinfo."""
    with self.portinfo_random_params() as params:
        self._add_portinfo(self.session, params)

        # Lookup of the row that was just added.
        stored = ndb.get_portinfo(self.session, params['port_id'])
        self._compare_portinfo(stored, params)

        # Lookup with a newly generated id returns None.
        missing = ndb.get_portinfo(self.session, uuidutils.generate_uuid())
        self.assertIsNone(missing)
开发者ID:CiscoSystems,项目名称:neutron,代码行数:10,代码来源:test_db.py
示例6: teste_get_portinfo
def teste_get_portinfo(self):
    """test get portinfo.

    Adds a portinfo row and checks every stored field, then checks that a
    lookup with the sixth generated value (``n``) returns None.
    """
    i, d, p, v, m, n = self.get_portinfo_random_params()
    ndb.add_portinfo(self.session, i, d, p, v, m)

    portinfo = ndb.get_portinfo(self.session, i)
    self.assertEqual(portinfo.id, i)
    self.assertEqual(portinfo.datapath_id, d)
    self.assertEqual(portinfo.port_no, p)
    self.assertEqual(portinfo.vlan_id, v)
    self.assertEqual(portinfo.mac, m)

    # assertIsNone checks identity with None and gives a clearer failure
    # message than assertEqual(None, ...).
    self.assertIsNone(ndb.get_portinfo(self.session, n))
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:13,代码来源:test_db.py
示例7: test_portinfo_added_unknown_port
def test_portinfo_added_unknown_port(self):
    """Report portinfo for a port id that was never created.

    No portinfo must be stored and neither exists_ofc_port nor
    create_ofc_port may be called on the OFC.
    """
    unknown_id = 'dummy-p1'
    self.rpcapi_update_ports(added=[{'id': unknown_id, 'port_no': 123}])

    self.assertIsNone(ndb.get_portinfo(self.context.session, unknown_id))
    for ofc_call in (self.ofc.exists_ofc_port, self.ofc.create_ofc_port):
        self.assertEqual(ofc_call.call_count, 0)
开发者ID:Taejun,项目名称:neutron,代码行数:7,代码来源:test_nec_plugin.py
示例8: _process_portbindings_portinfo_update
def _process_portbindings_portinfo_update(self, context, port_data, port):
    """Update portinfo according to bindings:profile in update_port().

    :param context: neutron api request context
    :param port_data: port attributes passed in PUT request
    :param port: port attributes to be returned
    :returns: 'ADD', 'MOD', 'DEL' or None
    """
    if portbindings.PROFILE not in port_data:
        return
    profile = port_data.get(portbindings.PROFILE)
    # If binding:profile is None or an empty dict,
    # it means binding:.profile needs to be cleared.
    # TODO(amotoki): Allow Make None in binding:profile in
    # the API layer. See LP bug #1220011.
    profile_set = attrs.is_attr_set(profile) and profile
    cur_portinfo = ndb.get_portinfo(context.session, port['id'])

    # Defaults cover the case "profile cleared and nothing registered":
    # there is nothing to change, so the result is None. Without these the
    # names below would be unbound on that path.
    portinfo = None
    portinfo_changed = None

    if profile_set:
        portinfo = self._validate_portinfo(profile)
        portinfo_changed = 'ADD'
        if cur_portinfo:
            if (portinfo['datapath_id'] == cur_portinfo.datapath_id and
                    portinfo['port_no'] == cur_portinfo.port_no):
                # Same location as already registered: no change.
                return
            ndb.del_portinfo(context.session, port['id'])
            portinfo_changed = 'MOD'
        portinfo['mac'] = port['mac_address']
        ndb.add_portinfo(context.session, port['id'], **portinfo)
    elif cur_portinfo:
        portinfo_changed = 'DEL'
        ndb.del_portinfo(context.session, port['id'])

    self._extend_port_dict_binding_portinfo(port, portinfo)
    return portinfo_changed
开发者ID:fyafighter,项目名称:neutron,代码行数:34,代码来源:nec_plugin.py
示例9: update_ports
def update_ports(self, rpc_context, **kwargs):
    """Update ports' information and activate/deactivate them.

    Expected input format is:
        {'topic': 'q-agent-notifier',
         'agent_id': 'nec-q-agent.' + <hostname>,
         'datapath_id': <datapath_id of br-int on remote host>,
         'port_added': [<new PortInfo>,...],
         'port_removed': [<removed Port ID>,...]}
    """
    LOG.debug(_("NECPluginV2RPCCallbacks.update_ports() called, "
                "kwargs=%s ."), kwargs)
    datapath_id = kwargs['datapath_id']
    session = rpc_context.session

    for added in kwargs.get('port_added', []):
        port_id = added['id']
        previous = ndb.get_portinfo(session, port_id)
        if previous:
            # Replace any existing entry with the newly reported one.
            ndb.del_portinfo(session, port_id)
        ndb.add_portinfo(session, port_id, datapath_id, added['port_no'],
                         mac=added.get('mac', ''))

        port = self._get_port(rpc_context, port_id)
        if not port:
            continue
        if previous:
            self.plugin.deactivate_port(rpc_context, port)
        self.plugin.activate_port_if_ready(rpc_context, port)

    for port_id in kwargs.get('port_removed', []):
        registered = ndb.get_portinfo(session, port_id)
        if not registered:
            LOG.debug(_("update_ports(): ignore port_removed message "
                        "due to portinfo for port_id=%s was not "
                        "registered"), port_id)
            continue
        if registered.datapath_id != datapath_id:
            LOG.debug(_("update_ports(): ignore port_removed message "
                        "received from different host "
                        "(registered_datapath_id=%(registered)s, "
                        "received_datapath_id=%(received)s)."),
                      {'registered': registered.datapath_id,
                       'received': datapath_id})
            continue

        ndb.del_portinfo(session, port_id)
        port = self._get_port(rpc_context, port_id)
        if port:
            self.plugin.deactivate_port(rpc_context, port)
开发者ID:matrohon,项目名称:quantum,代码行数:45,代码来源:nec_plugin.py
示例10: activate_port_if_ready
def activate_port_if_ready(self, context, port, network=None):
    """Activate port by creating port on OFC if ready.

    Activate port and packet_filters associated with the port.
    Conditions to activate port on OFC are:
        * port admin_state is UP
        * network admin_state is UP
        * portinfo are available (to identify port on OFC)

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param port: port dict; 'id', 'network_id', 'admin_state_up' and
        'status' are read
    :param network: network dict the port belongs to; looked up from
        ``port['network_id']`` when not given
    """
    if not network:
        network = super(NECPluginV2, self).get_network(context,
                                                       port['network_id'])

    port_status = OperationalStatus.ACTIVE
    if not port['admin_state_up']:
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "port.admin_state_up is False."))
        port_status = OperationalStatus.DOWN
    elif not network['admin_state_up']:
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "network.admin_state_up is False."))
        port_status = OperationalStatus.DOWN
    elif not ndb.get_portinfo(context.session, port['id']):
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "no portinfo for this port."))
        port_status = OperationalStatus.DOWN

    # Status values are compared by value (==/!=). Identity checks only
    # succeed when both sides happen to be the very same object.
    ready = port_status == OperationalStatus.ACTIVE

    # activate packet_filters before creating port on OFC.
    if self.packet_filter_enabled and ready:
        filters = dict(in_port=[port['id']],
                       status=[OperationalStatus.DOWN],
                       admin_state_up=[True])
        pfs = (super(NECPluginV2, self).
               get_packet_filters(context, filters=filters))
        for pf in pfs:
            self._activate_packet_filter_if_ready(context, pf,
                                                  network=network,
                                                  in_port=port)

    if ready:
        if self.ofc.exists_ofc_port(context, port['id']):
            LOG.debug(_("activate_port_if_ready(): skip, "
                        "ofc_port already exists."))
        else:
            try:
                self.ofc.create_ofc_port(context, port['id'], port)
            except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
                reason = _("create_ofc_port() failed due to %s") % exc
                LOG.error(reason)
                port_status = OperationalStatus.ERROR

    # Only write the status back when it actually changed.
    if port_status != port['status']:
        self._update_resource_status(context, "port", port['id'],
                                     port_status)
开发者ID:matrohon,项目名称:quantum,代码行数:55,代码来源:nec_plugin.py
示例11: create_ofc_port
def create_ofc_port(self, context, port_id, port):
    """Create the port on the OFC via the driver and record the mapping.

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param port_id: id of the port to create on the OFC
    :param port: port dict; 'network_id' and 'tenant_id' are read
    :raises nexc.PortInfoNotFound: if no portinfo is registered for port_id
    """
    mapped_net_id = self._get_ofc_id(context, "ofc_network",
                                     port['network_id'])
    ofc_net_id = self.driver.convert_ofc_network_id(
        context, mapped_net_id, port['tenant_id'])

    portinfo = ndb.get_portinfo(context.session, port_id)
    if not portinfo:
        raise nexc.PortInfoNotFound(id=port_id)

    created_id = self.driver.create_port(ofc_net_id, portinfo, port_id)
    self._add_ofc_item(context, "ofc_port", port_id, created_id)
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:11,代码来源:ofc_manager.py
示例12: _activate_packet_filter_if_ready
def _activate_packet_filter_if_ready(self, context, packet_filter,
                                     network=None, in_port=None):
    """Activate packet_filter by creating filter on OFC if ready.

    Conditions to create packet_filter on OFC are:
        * packet_filter admin_state is UP
        * network admin_state is UP
        * (if 'in_port' is specified) portinfo is available

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param packet_filter: packet filter dict; 'id', 'network_id',
        'admin_state_up', 'status' and optionally 'in_port' are read
    :param network: network dict of the filter; looked up when not given
    :param in_port: port dict for the filter's 'in_port'; looked up when the
        filter names an in_port and none is given
    """
    net_id = packet_filter['network_id']
    if not network:
        network = super(NECPluginV2, self).get_network(context, net_id)
    in_port_id = packet_filter.get("in_port")
    if in_port_id and not in_port:
        in_port = super(NECPluginV2, self).get_port(context, in_port_id)

    pf_status = OperationalStatus.ACTIVE
    if not packet_filter['admin_state_up']:
        LOG.debug(_("_activate_packet_filter_if_ready(): skip, "
                    "packet_filter.admin_state_up is False."))
        pf_status = OperationalStatus.DOWN
    elif not network['admin_state_up']:
        LOG.debug(_("_activate_packet_filter_if_ready(): skip, "
                    "network.admin_state_up is False."))
        pf_status = OperationalStatus.DOWN
    elif in_port_id and in_port_id != in_port.get('id'):
        # The supplied in_port does not match the filter's in_port id.
        # This must be a value comparison for a *mismatch*; an identity
        # test for a match would skip exactly the valid case.
        LOG.debug(_("_activate_packet_filter_if_ready(): skip, "
                    "invalid in_port_id."))
        pf_status = OperationalStatus.DOWN
    elif in_port_id and not ndb.get_portinfo(context.session, in_port_id):
        LOG.debug(_("_activate_packet_filter_if_ready(): skip, "
                    "no portinfo for in_port."))
        pf_status = OperationalStatus.DOWN

    if pf_status == OperationalStatus.ACTIVE:
        if self.ofc.exists_ofc_packet_filter(context, packet_filter['id']):
            LOG.debug(_("_activate_packet_filter_if_ready(): skip, "
                        "ofc_packet_filter already exists."))
        else:
            try:
                self.ofc.create_ofc_packet_filter(context,
                                                  packet_filter['id'],
                                                  packet_filter)
            except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
                reason = _("create_ofc_packet_filter() failed due to "
                           "%s") % exc
                LOG.error(reason)
                pf_status = OperationalStatus.ERROR

    # Compare by value so the status is only written when it changed.
    if pf_status != packet_filter['status']:
        self._update_resource_status(context, "packet_filter",
                                     packet_filter['id'], pf_status)
开发者ID:matrohon,项目名称:quantum,代码行数:53,代码来源:nec_plugin.py
示例13: create_ofc_packet_filter
def create_ofc_packet_filter(self, context, filter_id, filter_dict):
    """Create the packet filter on the OFC and record the mapping.

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param filter_id: id of the packet filter
    :param filter_dict: packet filter dict; 'network_id', 'tenant_id' and
        optionally 'in_port' are read
    :raises nexc.PortInfoNotFound: if 'in_port' is set but no portinfo is
        registered for it
    """
    mapped_net_id = self._get_ofc_id(context, "ofc_network",
                                     filter_dict['network_id'])
    ofc_net_id = self.driver.convert_ofc_network_id(
        context, mapped_net_id, filter_dict['tenant_id'])

    in_port_id = filter_dict.get('in_port')
    # portinfo stays None for filters that do not name an in_port.
    portinfo = (ndb.get_portinfo(context.session, in_port_id)
                if in_port_id else None)
    if in_port_id and not portinfo:
        raise nexc.PortInfoNotFound(id=in_port_id)

    created_id = self.driver.create_filter(ofc_net_id, filter_dict,
                                           portinfo, filter_id)
    self._add_ofc_item(context, "ofc_packet_filter", filter_id, created_id)
开发者ID:Brocade-OpenSource,项目名称:OpenStack-DNRM-Neutron,代码行数:15,代码来源:ofc_manager.py
示例14: create_ofc_port
def create_ofc_port(self, context, port_id, port):
    """Create the port on the OFC via the driver and record the mapping.

    Packet filters returned by the plugin for this port are passed to the
    driver as the ``filters`` keyword argument when the plugin returns
    something other than None.

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param port_id: id of the port to create on the OFC
    :param port: port dict; 'network_id' is read
    :raises nexc.PortInfoNotFound: if no portinfo is registered for port_id
    """
    ofc_net_id = self._get_ofc_id(context, "ofc_network",
                                  port['network_id'])
    portinfo = ndb.get_portinfo(context.session, port_id)
    if not portinfo:
        raise nexc.PortInfoNotFound(id=port_id)

    # Associate packet filters
    filters = self.plugin.get_packet_filters_for_port(context, port)
    extra_kwargs = {'filters': filters} if filters is not None else {}

    created_id = self.driver.create_port(ofc_net_id, portinfo, port_id,
                                         **extra_kwargs)
    self._add_ofc_item(context, "ofc_port", port_id, created_id)
开发者ID:50infivedays,项目名称:neutron,代码行数:17,代码来源:ofc_manager.py
示例15: create_ofc_packet_filter
def create_ofc_packet_filter(self, context, filter_id, filter_dict):
    """Create the packet filter on the OFC and record the mapping.

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param filter_id: id of the packet filter
    :param filter_dict: packet filter dict; 'network_id' and optionally
        'in_port' are read
    :raises nexc.PortInfoNotFound: if 'in_port' is set but no portinfo is
        registered for it
    """
    network_id = filter_dict['network_id']
    ofc_net_id = self._get_ofc_id(context, "ofc_network", network_id)

    in_port_id = filter_dict.get('in_port')
    # portinfo stays None for filters that do not name an in_port.
    portinfo = (ndb.get_portinfo(context.session, in_port_id)
                if in_port_id else None)
    if in_port_id and not portinfo:
        raise nexc.PortInfoNotFound(id=in_port_id)

    # Collect ports to be associated with the filter
    apply_ports = ndb.get_active_ports_on_ofc(context, network_id,
                                              in_port_id)
    created_id = self.driver.create_filter(ofc_net_id, filter_dict,
                                           portinfo, filter_id, apply_ports)
    self._add_ofc_item(context, "ofc_packet_filter", filter_id, created_id)
开发者ID:50infivedays,项目名称:neutron,代码行数:17,代码来源:ofc_manager.py
示例16: activate_port_if_ready
def activate_port_if_ready(self, context, port, network=None):
    """Activate port by creating port on OFC if ready.

    Conditions to activate port on OFC are:
        * port admin_state is UP
        * network admin_state is UP
        * portinfo are available (to identify port on OFC)

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param port: port dict; 'id', 'network_id', 'admin_state_up' and
        'status' are read, and 'status' is updated when it changes
    :param network: network dict the port belongs to; looked up from
        ``port['network_id']`` when not given
    :returns: the (possibly updated) port dict
    """
    if not network:
        network = super(NECPluginV2, self).get_network(context,
                                                       port['network_id'])

    if not port['admin_state_up']:
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "port.admin_state_up is False."))
        return port
    elif not network['admin_state_up']:
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "network.admin_state_up is False."))
        return port
    elif not ndb.get_portinfo(context.session, port['id']):
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "no portinfo for this port."))
        return port
    elif self.ofc.exists_ofc_port(context, port['id']):
        LOG.debug(_("activate_port_if_ready(): skip, "
                    "ofc_port already exists."))
        return port

    try:
        self.ofc.create_ofc_port(context, port['id'], port)
        port_status = const.PORT_STATUS_ACTIVE
    except (nexc.OFCException, nexc.OFCConsistencyBroken) as exc:
        LOG.error(_("create_ofc_port() failed due to %s"), exc)
        port_status = const.PORT_STATUS_ERROR

    # Compare by value: an identity check would report a change whenever
    # the two equal status values are distinct objects.
    if port_status != port['status']:
        self._update_resource_status(context, "port", port['id'],
                                     port_status)
        port['status'] = port_status

    return port
开发者ID:ChengZuo,项目名称:neutron,代码行数:42,代码来源:nec_plugin.py
示例17: activate_packet_filter_if_ready
def activate_packet_filter_if_ready(self, context, packet_filter):
    """Activate packet_filter by creating filter on OFC if ready.

    Conditions to create packet_filter on OFC are:
        * packet_filter admin_state is UP
        * (if 'in_port' is specified) portinfo is available

    :param context: request context; ``context.session`` is used for the
        portinfo lookup
    :param packet_filter: packet filter dict; 'id', 'admin_state_up',
        'status' and optionally 'in_port' are read, and 'status' is updated
        when it changes
    :returns: the (possibly updated) packet_filter dict
    """
    LOG.debug(_("activate_packet_filter_if_ready() called, "
                "packet_filter=%s."), packet_filter)

    pf_id = packet_filter['id']
    in_port_id = packet_filter.get('in_port')
    current = packet_filter['status']
    # The status is left untouched unless creation on the OFC is attempted.
    new_status = current

    if not packet_filter['admin_state_up']:
        LOG.debug(_("activate_packet_filter_if_ready(): skip pf_id=%s, "
                    "packet_filter.admin_state_up is False."), pf_id)
    elif in_port_id and not ndb.get_portinfo(context.session, in_port_id):
        LOG.debug(_("activate_packet_filter_if_ready(): skip "
                    "pf_id=%s, no portinfo for the in_port."), pf_id)
    elif self.ofc.exists_ofc_packet_filter(context, pf_id):
        LOG.debug(_("_activate_packet_filter_if_ready(): skip, "
                    "ofc_packet_filter already exists."))
    else:
        LOG.debug(_("activate_packet_filter_if_ready(): create "
                    "packet_filter id=%s on OFC."), pf_id)
        try:
            self.ofc.create_ofc_packet_filter(context, pf_id,
                                              packet_filter)
            new_status = pf_db.PF_STATUS_ACTIVE
        except (nexc.OFCException, nexc.OFCMappingNotFound) as exc:
            LOG.error(_("Failed to create packet_filter id=%(id)s on "
                        "OFC: %(exc)s"), {'id': pf_id, 'exc': str(exc)})
            new_status = pf_db.PF_STATUS_ERROR

    if new_status != current:
        self._update_resource_status(context, "packet_filter", pf_id,
                                     new_status)
        packet_filter.update({'status': new_status})

    return packet_filter
开发者ID:kavonm,项目名称:neutron,代码行数:42,代码来源:packet_filter.py
示例18: create_filter
def create_filter(self, context, filter_dict, filter_id=None):
    """Build a packet filter request body, POST it and return its path.

    Fields of ``filter_dict`` that are unset are added to the
    'ofp_wildcards' list instead of the request body.

    :param context: request context; ``context.session`` is used for the
        ndb lookups
    :param filter_dict: packet filter dict; 'network_id', 'action',
        'priority', 'src_mac', 'dst_mac', 'src_cidr', 'dst_cidr',
        'protocol', 'eth_type', 'src_port', 'dst_port' and optionally
        'in_port' are read
    :param filter_id: id to use for the filter on the OFC; a new UUID is
        generated when not given
    :returns: ``self.filter_path`` formatted with the OFC filter id
    :raises nexc.PortInfoNotFound: if 'in_port' is set but no portinfo is
        registered for it
    :raises ValueError: if 'action' is not one of ACCEPT, ALLOW, DROP, DENY
        (case-insensitive)
    """
    ofc_network_id = ndb.get_ofc_id(context.session, "ofc_network",
                                    filter_dict['network_id'])

    # Prepare portinfo
    in_port_id = filter_dict.get('in_port')
    if in_port_id:
        portinfo = ndb.get_portinfo(context.session, in_port_id)
        if not portinfo:
            raise nexc.PortInfoNotFound(id=in_port_id)
    else:
        portinfo = None

    # Prepare filter body
    action = filter_dict['action'].upper()
    if action in ("ACCEPT", "ALLOW"):
        ofc_action = "ALLOW"
    elif action in ("DROP", "DENY"):
        ofc_action = "DENY"
    else:
        # Fail with a clear message rather than an unbound local below.
        raise ValueError("unsupported packet filter action: %r"
                         % (filter_dict['action'],))

    body = {'priority': filter_dict['priority'],
            'slice': self._get_network_id(ofc_network_id),
            'action': ofc_action}
    # The order of entries is kept because the list is joined into the
    # 'ofp_wildcards' string sent to the OFC.
    ofp_wildcards = ["dl_vlan", "dl_vlan_pcp", "nw_tos"]

    if portinfo:
        body['in_datapath_id'] = portinfo.datapath_id
        body['in_port'] = portinfo.port_no
    else:
        body['wildcards'] = "in_datapath_id"
        ofp_wildcards.append("in_port")

    if filter_dict['src_mac']:
        body['dl_src'] = filter_dict['src_mac']
    else:
        ofp_wildcards.append("dl_src")

    if filter_dict['dst_mac']:
        body['dl_dst'] = filter_dict['dst_mac']
    else:
        ofp_wildcards.append("dl_dst")

    if filter_dict['src_cidr']:
        body['nw_src'] = filter_dict['src_cidr']
    else:
        ofp_wildcards.append("nw_src:32")

    if filter_dict['dst_cidr']:
        body['nw_dst'] = filter_dict['dst_cidr']
    else:
        ofp_wildcards.append("nw_dst:32")

    # Named protocols that are sent with dl_type 0x800 and a numeric
    # nw_proto value.
    ip_proto_numbers = {"ICMP": 1, "TCP": 6, "UDP": 17}
    protocol = filter_dict['protocol']
    if protocol:
        proto_name = protocol.upper()
        if proto_name in ip_proto_numbers:
            body['dl_type'] = "0x800"
            body['nw_proto'] = hex(ip_proto_numbers[proto_name])
        elif proto_name == "ARP":
            body['dl_type'] = "0x806"
            ofp_wildcards.append("nw_proto")
        else:
            body['nw_proto'] = protocol
    else:
        ofp_wildcards.append("nw_proto")

    # A dl_type derived from the protocol takes precedence over eth_type.
    if 'dl_type' in body:
        pass
    elif filter_dict['eth_type']:
        body['dl_type'] = filter_dict['eth_type']
    else:
        ofp_wildcards.append("dl_type")

    if filter_dict['src_port']:
        body['tp_src'] = hex(filter_dict['src_port'])
    else:
        ofp_wildcards.append("tp_src")

    if filter_dict['dst_port']:
        body['tp_dst'] = hex(filter_dict['dst_port'])
    else:
        ofp_wildcards.append("tp_dst")

    ofc_filter_id = filter_id or uuidutils.generate_uuid()
    body['id'] = ofc_filter_id
    body['ofp_wildcards'] = ','.join(ofp_wildcards)

    self.client.post(self.filters_path, body=body)
    return self.filter_path % ofc_filter_id
开发者ID:CiscoSystems,项目名称:neutron,代码行数:92,代码来源:trema.py
示例19: _get_portinfo
def _get_portinfo(self, port_id):
    """Return ``ndb.get_portinfo`` for port_id using the test context's session."""
    session = self.context.session
    return ndb.get_portinfo(session, port_id)
开发者ID:Taejun,项目名称:neutron,代码行数:2,代码来源:test_nec_plugin.py
注:本文中的neutron.plugins.nec.db.api.get_portinfo函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论