本文整理汇总了 Python 中 neutron_lib.callbacks.registry.subscribe 函数的典型用法代码示例。如果您正苦于以下问题：Python subscribe 函数的具体用法？Python subscribe 怎么用？Python subscribe 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了 subscribe 函数的 20 个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的 Python 代码示例。
示例1: register
def register(callback, agent_type):
    """Hook ``callback`` up to the init event of one agent type.

    The callback is subscribed for ``events.AFTER_INIT``, with
    ``agent_type`` passed as the resource.

    :param callback: a callback that can process the agent init event.
    :param agent_type: an agent type as defined in neutron_lib.constants.
    """
    init_event = events.AFTER_INIT
    registry.subscribe(callback, agent_type, init_event)
开发者ID:cubeek,项目名称:neutron,代码行数:7,代码来源:capabilities.py
示例2: test__flood_cache_for_query_pulls_once
def test__flood_cache_for_query_pulls_once(self):
    """Verify a query already covered by the cache is not pulled again.

    The first query (two IDs plus two names) must reach ``bulk_pull``
    exactly once. Narrower queries over the same ID/name pairs must not
    pull at all, while a query by ID alone must pull again. Finally the
    ``updated`` keyword delivered to an AFTER_UPDATE subscriber is compared
    with the pulled resources.
    """
    resources = [OVOLikeThing(66), OVOLikeThing(67)]
    received_kw = []
    # Collect only the keyword arguments of every notification.
    receiver = lambda *a, **k: received_kw.append(k)
    registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)

    # One canned answer per potential pull, consumed in order.
    self._pullmock.bulk_pull.side_effect = [
        resources,
        [resources[0]],
        [resources[1]],
        [resources[1]]
    ]

    self.rcache._flood_cache_for_query('goose', id=(66, 67),
                                       name=('a', 'b'))
    self._pullmock.bulk_pull.assert_called_once_with(
        mock.ANY, 'goose',
        filter_kwargs={'id': (66, 67), 'name': ('a', 'b')})

    self._pullmock.bulk_pull.reset_mock()
    # Assert on ``bulk_pull`` itself: calling a child of a Mock never sets
    # ``called`` on its parent, so checking ``self._pullmock.called`` could
    # not fail even if another pull had happened.
    self.rcache._flood_cache_for_query('goose', id=(66, ), name=('a', ))
    self.assertFalse(self._pullmock.bulk_pull.called)
    self.rcache._flood_cache_for_query('goose', id=(67, ), name=('b', ))
    self.assertFalse(self._pullmock.bulk_pull.called)

    # querying by just ID should trigger a new call since ID+name is a more
    # specific query
    self.rcache._flood_cache_for_query('goose', id=(67, ))
    self._pullmock.bulk_pull.assert_called_once_with(
        mock.ANY, 'goose', filter_kwargs={'id': (67, )})

    self.assertItemsEqual(
        resources, [rec['updated'] for rec in received_kw])
开发者ID:cubeek,项目名称:neutron,代码行数:33,代码来源:test_resource_cache.py
示例3: test_treat_devices_removed_notify
def test_treat_devices_removed_notify(self):
    """Removing a device must notify the PORT_DEVICE delete subscriber."""
    on_delete = mock.Mock()
    registry.subscribe(
        on_delete, resources.PORT_DEVICE, events.AFTER_DELETE)

    self.agent.treat_devices_removed([DEVICE_1])

    # Resource and event are not pinned; only the trigger (the agent) and
    # the presence of a ``payload`` keyword are checked.
    on_delete.assert_called_once_with(
        mock.ANY, mock.ANY, self.agent, payload=mock.ANY)
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:test__common_agent.py
示例4: setUp
def setUp(self):
    """Set up the core plugin, an admin context, a port and a listener.

    ``self.provisioned`` is a mock subscribed to the
    ``PROVISIONING_COMPLETE`` event for ports so tests can inspect whether
    it was called.
    """
    super(TestStatusBarriers, self).setUp()
    self.setup_coreplugin(CORE_PLUGIN)
    self.ctx = n_ctx.get_admin_context()
    self.provisioned = mock.Mock()
    self.port = self._make_port()

    completion_event = pb.PROVISIONING_COMPLETE
    registry.subscribe(self.provisioned, resources.PORT, completion_event)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:8,代码来源:test_provisioning_blocks.py
示例5: __init__
def __init__(self, rcache):
    """Store the resource cache and listen for security group changes.

    :param rcache: object kept as ``self.rcache``.
    """
    self.rcache = rcache

    # (handler, event) pairs, subscribed in this order for 'SecurityGroup'.
    sg_subscriptions = (
        (self._clear_child_sg_rules, events.AFTER_DELETE),
        (self._add_child_sg_rules, events.AFTER_UPDATE),
    )
    for handler, sg_event in sg_subscriptions:
        registry.subscribe(handler, 'SecurityGroup', sg_event)

    # set this attr so agent can adjust the timeout of the client
    self.client = resources_rpc.ResourcesPullRpcApi().client
开发者ID:igordcard,项目名称:neutron,代码行数:8,代码来源:securitygroups_rpc.py
示例6: register
def register():
    """Register the driver.

    Creates the driver through ``OVSDriver.create()``, stores it in the
    module-level ``DRIVER`` and subscribes the bridge-name handler.
    """
    global DRIVER
    DRIVER = OVSDriver.create()

    # To set the bridge_name in a parent port's vif_details.
    bridge_name_resource = agent_consts.OVS_BRIDGE_NAME
    registry.subscribe(
        vif_details_bridge_name_handler,
        bridge_name_resource,
        events.BEFORE_READ)

    LOG.debug('Open vSwitch trunk driver registered')
开发者ID:cubeek,项目名称:neutron,代码行数:9,代码来源:driver.py
示例7: _unsubscribe_callback_events
def _unsubscribe_callback_events(self):
    """Swap the generic L3 port-delete guard for this plugin's own one."""
    # Drop the callback that should be called on all plugins other than
    # NSX-T.
    generic_guard = l3_db.L3_NAT_dbonly_mixin._prevent_l3_port_delete_callback
    registry.unsubscribe_all(generic_guard)

    # Subscribe our internal callback in its place.
    registry.subscribe(
        self._prevent_l3_port_delete_callback,
        resources.PORT,
        events.BEFORE_DELETE)
开发者ID:openstack,项目名称:vmware-nsx,代码行数:9,代码来源:plugin.py
示例8: initialize
def initialize(self):
    """Initialize the parent driver, then watch port updates.

    Sets ``self.context`` and ``self.plugin_rpc`` and subscribes
    ``self._port_update_callback`` to port AFTER_UPDATE events.
    """
    super(AristaHAScaleSimulationDriver, self).initialize()
    self.context = context.get_admin_context_without_session()
    self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)

    # Subscribe to port updates to force ports to active after binding
    # since a fake virt driver is being used, so OVS will never see
    # the libvirt interfaces come up, triggering the OVS provisioning
    registry.subscribe(
        self._port_update_callback, resources.PORT, events.AFTER_UPDATE)
开发者ID:openstack,项目名称:networking-arista,代码行数:9,代码来源:mechanism_ha_simulator.py
示例9: test_security_group_precommit_create_event_fail
def test_security_group_precommit_create_event_fail(self):
    """A conflict on security group creation must roll the session back.

    ``fake_callback`` is subscribed to the PRECOMMIT_CREATE event of
    security groups; it is defined outside this block and presumably
    raises to force the failure (verify against its definition).
    """
    registry.subscribe(
        fake_callback, resources.SECURITY_GROUP, events.PRECOMMIT_CREATE)

    transaction_cls = sqlalchemy.orm.session.SessionTransaction
    with mock.patch.object(transaction_cls, 'rollback') as mock_rollback:
        self.assertRaises(
            securitygroup.SecurityGroupConflict,
            self.mixin.create_security_group,
            self.ctx, FAKE_SECGROUP)
        self.assertTrue(mock_rollback.called)
开发者ID:cubeek,项目名称:neutron,代码行数:9,代码来源:test_securitygroups_db.py
示例10: __init__
def __init__(self, resource, object_class, resource_push_api):
    """Store the collaborators and subscribe to the resource's events.

    :param resource: resource whose create/update/delete events are
        watched; kept as ``self._resource``.
    :param object_class: kept as ``self._obj_class``.
    :param resource_push_api: kept as ``self._resource_push_api``.
    """
    self._resource = resource
    self._obj_class = object_class
    self._resource_push_api = resource_push_api
    self._resources_to_push = {}
    self._worker_pool = eventlet.GreenPool()

    watched_events = (
        events.AFTER_CREATE,
        events.AFTER_UPDATE,
        events.AFTER_DELETE,
    )
    for watched in watched_events:
        registry.subscribe(self.handle_event, resource, watched)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:9,代码来源:ovo_rpc.py
示例11: register_legacy_notification_callbacks
def register_legacy_notification_callbacks(self, legacy_interface):
    """Emulate the server-side notifications from ml2 AgentNotifierApi.

    ``self._legacy_notifier`` is subscribed to the AFTER_UPDATE and
    AFTER_DELETE events of both ports and networks.

    :param legacy_interface: an object with 'delete'/'update' methods for
        core resources; kept as ``self._legacy_interface``.
    """
    self._legacy_interface = legacy_interface

    legacy_events = (
        callback_events.AFTER_UPDATE,
        callback_events.AFTER_DELETE,
    )
    core_resources = (resources.PORT, resources.NETWORK)
    for legacy_event in legacy_events:
        for core_resource in core_resources:
            registry.subscribe(
                self._legacy_notifier, core_resource, legacy_event)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:10,代码来源:rpc.py
示例12: __new__
def __new__(cls, *args, **kwargs):
    """Build the instance and (re)subscribe the network callbacks.

    The external-network default-value callback is subscribed for both
    PRECOMMIT_UPDATE and PRECOMMIT_CREATE of networks on every
    construction.
    """
    # NOTE(kevinbenton): we subscribe on object construction because
    # the tests blow away the callback manager for each run
    instance = super(AutoAllocatedTopologyMixin, cls).__new__(
        cls, *args, **kwargs)

    precommit_events = (events.PRECOMMIT_UPDATE, events.PRECOMMIT_CREATE)
    for precommit_event in precommit_events:
        registry.subscribe(
            _ensure_external_network_default_value_callback,
            resources.NETWORK,
            precommit_event)
    return instance
开发者ID:cubeek,项目名称:neutron,代码行数:10,代码来源:db.py
示例13: test_record_resource_delete_ignores_dups
def test_record_resource_delete_ignores_dups(self):
    """A repeated delete of the same resource ID must not notify again."""
    seen_kwargs = []

    def _record(*args, **kwargs):
        # Keep only the keyword arguments of each notification.
        seen_kwargs.append(kwargs)

    registry.subscribe(_record, 'goose', events.AFTER_DELETE)

    # First delete of ID 3: one notification.
    self.rcache.record_resource_delete(self.ctx, 'goose', 3)
    self.assertEqual(1, len(seen_kwargs))

    # First delete of ID 4: a second notification.
    self.rcache.record_resource_delete(self.ctx, 'goose', 4)
    self.assertEqual(2, len(seen_kwargs))

    # Deleting ID 3 again: the count stays unchanged.
    self.rcache.record_resource_delete(self.ctx, 'goose', 3)
    self.assertEqual(2, len(seen_kwargs))
开发者ID:cubeek,项目名称:neutron,代码行数:10,代码来源:test_resource_cache.py
示例14: test_adding_component_for_new_resource_type
def test_adding_component_for_new_resource_type(self):
    """Provisioning blocks work for a type once its model is registered."""
    on_complete = mock.Mock()
    registry.subscribe(on_complete, 'NETWORK', pb.PROVISIONING_COMPLETE)
    network = self._make_net()

    # expect failed because the model was not registered for the type
    with testtools.ExpectedException(RuntimeError):
        pb.add_provisioning_component(
            self.ctx, network.id, 'NETWORK', 'ent')

    # After registering the model the same call is expected to succeed,
    # and completing the component should reach the subscriber.
    pb.add_model_for_resource('NETWORK', models_v2.Network)
    pb.add_provisioning_component(self.ctx, network.id, 'NETWORK', 'ent')
    pb.provisioning_complete(self.ctx, network.id, 'NETWORK', 'ent')
    self.assertTrue(on_complete.called)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:11,代码来源:test_provisioning_blocks.py
示例15: initialize
def initialize(self):
    """Create the VCNS driver and run the profile/security group init.

    When ``self._is_new_security_group`` is truthy, ``self.init_complete``
    is additionally subscribed to the BEFORE_SPAWN event of the process
    resource.
    """
    self._nsxv = vcns_driver.VcnsDriver(None)
    self.init_profile_id()
    self.init_security_group()
    self.init_security_group_in_profile()

    if not self._is_new_security_group:
        return

    # register an event to the end of the init to handle the first upgrade
    registry.subscribe(
        self.init_complete, resources.PROCESS, events.BEFORE_SPAWN)
开发者ID:openstack,项目名称:vmware-nsx,代码行数:11,代码来源:driver.py
示例16: test_start_all_workers
def test_start_all_workers(self):
    """Starting all workers must emit PROCESS/AFTER_SPAWN exactly once."""
    cfg.CONF.set_override('api_workers', 0)

    # Replace the worker helpers of ``service`` with mocks, in this order.
    patched_helpers = (
        '_get_rpc_workers',
        '_get_plugins_workers',
        '_start_workers',
    )
    for helper_name in patched_helpers:
        mock.patch.object(service, helper_name).start()

    spawn_listener = mock.Mock()
    registry.subscribe(
        spawn_listener, resources.PROCESS, events.AFTER_SPAWN)

    service.start_all_workers()

    spawn_listener.assert_called_once_with(
        resources.PROCESS, events.AFTER_SPAWN, mock.ANY)
开发者ID:eayunstack,项目名称:neutron,代码行数:11,代码来源:test_service.py
示例17: test__set_bridge_name_notify
def test__set_bridge_name_notify(self):
    """The bridge name supplied by a subscriber ends up in vif_details."""

    def fake_callback(resource, event, trigger, payload=None):
        # The trigger is invoked with the bridge name to report.
        trigger('fake-br-name')

    registry.subscribe(
        fake_callback, a_const.OVS_BRIDGE_NAME, events.BEFORE_READ)

    vif_details = {}
    self.driver._set_bridge_name('foo', vif_details)

    stored_name = vif_details.get(portbindings.VIF_DETAILS_BRIDGE_NAME, '')
    self.assertEqual('fake-br-name', stored_name)
开发者ID:cubeek,项目名称:neutron,代码行数:12,代码来源:test_mech_openvswitch.py
示例18: register
def register(self, resource, event, trigger, payload=None):
    """Register the driver and wire up the trunk/subport handlers.

    After delegating to the parent ``register``, a ``NsxV3TrunkHandler``
    is created and its ``trunk_event`` / ``subport_event`` methods are
    subscribed to the AFTER_CREATE and AFTER_DELETE events of trunks and
    subports respectively.

    :param resource: forwarded unchanged to the parent ``register``.
    :param event: forwarded unchanged to the parent ``register``.
    :param trigger: forwarded unchanged to the parent ``register``.
    :param payload: forwarded unchanged to the parent ``register``.
    """
    super(NsxV3TrunkDriver, self).register(
        resource, event, trigger, payload=payload)
    self._handler = NsxV3TrunkHandler(self.plugin_driver)

    # A distinct loop name keeps the ``event`` argument from being shadowed.
    for trunk_event in (events.AFTER_CREATE, events.AFTER_DELETE):
        registry.subscribe(
            self._handler.trunk_event, resources.TRUNK, trunk_event)
        registry.subscribe(
            self._handler.subport_event, resources.SUBPORTS, trunk_event)

    LOG.debug("VMware NSXv3 trunk driver initialized.")
开发者ID:openstack,项目名称:vmware-nsx,代码行数:12,代码来源:driver.py
示例19: test_security_groups_created_outside_transaction
def test_security_groups_created_outside_transaction(self):
    """AFTER_CREATE for security groups fires with an inactive session.

    A subscriber records ``context.session.is_active`` on each
    notification; the flag is checked after creating a subnet and again
    after creating a port once the first listed security group has been
    deleted.
    """

    def record_after_state(r, e, t, context, *args, **kwargs):
        self.was_active = context.session.is_active

    registry.subscribe(
        record_after_state, resources.SECURITY_GROUP, events.AFTER_CREATE)

    with self.subnet() as subnet:
        self.assertFalse(self.was_active)

        listed_groups = self._list('security-groups')['security_groups']
        self._delete('security-groups', listed_groups[0]['id'])

        with self.port(subnet=subnet):
            self.assertFalse(self.was_active)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:13,代码来源:test_security_group.py
示例20: test_security_group_rule_precommit_create_event_fail
def test_security_group_rule_precommit_create_event_fail(self):
    """A conflict on rule creation must roll the session back.

    ``fake_callback`` is subscribed to the PRECOMMIT_CREATE event of
    security group rules; it is defined outside this block and presumably
    raises to force the failure (verify against its definition). The rule
    is pointed at a freshly created security group, and
    ``_get_security_group`` on the mixin is patched out while the rule is
    created.
    """
    import copy

    registry.subscribe(fake_callback, resources.SECURITY_GROUP_RULE,
                       events.PRECOMMIT_CREATE)
    sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)

    # Work on a private copy: assigning into FAKE_SECGROUP_RULE directly
    # would leave this test's security group id in the shared module-level
    # fixture for every later user of it.
    # NOTE(review): assumes the fixture is a plain nested dict - confirm.
    fake_rule = copy.deepcopy(FAKE_SECGROUP_RULE)
    fake_rule['security_group_rule']['security_group_id'] = sg_dict['id']

    with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                           'rollback') as mock_rollback,\
            mock.patch.object(self.mixin, '_get_security_group'):
        self.assertRaises(securitygroup.SecurityGroupConflict,
                          self.mixin.create_security_group_rule,
                          self.ctx, fake_rule)
        self.assertTrue(mock_rollback.called)
开发者ID:cubeek,项目名称:neutron,代码行数:13,代码来源:test_securitygroups_db.py
注：本文中的 neutron_lib.callbacks.registry.subscribe 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论