• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python registry.subscribe函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron_lib.callbacks.registry.subscribe函数的典型用法代码示例。如果您正苦于以下问题:Python subscribe函数的具体用法?Python subscribe怎么用?Python subscribe使用的例子?那么恭喜您,这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了subscribe函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: register

def register(callback, agent_type):
    """Hook ``callback`` up to the init event of the given agent type.

    The callback is subscribed through the callback registry with
    ``agent_type`` as the resource and ``events.AFTER_INIT`` as the event.

    :param callback: a callback that can process the agent init event.
    :param agent_type: an agent type as defined in neutron_lib.constants.
    """
    init_event = events.AFTER_INIT
    registry.subscribe(callback, agent_type, init_event)
开发者ID:cubeek,项目名称:neutron,代码行数:7,代码来源:capabilities.py


示例2: test__flood_cache_for_query_pulls_once

    def test__flood_cache_for_query_pulls_once(self):
        """Check that an already-answered query does not pull again.

        The first id+name query must reach ``bulk_pull`` exactly once.
        Narrower id+name queries must then not pull at all, whereas a
        query by id alone is expected to pull once more. Finally, every
        pulled resource must have been delivered to the AFTER_UPDATE
        subscriber under the ``updated`` keyword.
        """
        resources = [OVOLikeThing(66), OVOLikeThing(67)]
        received_kw = []

        def receiver(*args, **kwargs):
            # only the keyword arguments of each notification are recorded
            received_kw.append(kwargs)

        registry.subscribe(receiver, 'goose', events.AFTER_UPDATE)

        self._pullmock.bulk_pull.side_effect = [
            resources,
            [resources[0]],
            [resources[1]],
            [resources[1]]
        ]

        self.rcache._flood_cache_for_query('goose', id=(66, 67),
                                           name=('a', 'b'))
        self._pullmock.bulk_pull.assert_called_once_with(
            mock.ANY, 'goose',
            filter_kwargs={'id': (66, 67), 'name': ('a', 'b')})

        self._pullmock.bulk_pull.reset_mock()
        # Assert on the bulk_pull child mock: ``called`` on the parent mock
        # only records direct calls of the parent itself, so checking it
        # would pass even if bulk_pull had been invoked again.
        self.rcache._flood_cache_for_query('goose', id=(66, ), name=('a', ))
        self.assertFalse(self._pullmock.bulk_pull.called)
        self.rcache._flood_cache_for_query('goose', id=(67, ), name=('b', ))
        self.assertFalse(self._pullmock.bulk_pull.called)

        # querying by just ID should trigger a new call since ID+name is a more
        # specific query
        self.rcache._flood_cache_for_query('goose', id=(67, ))
        self._pullmock.bulk_pull.assert_called_once_with(
            mock.ANY, 'goose', filter_kwargs={'id': (67, )})

        self.assertItemsEqual(
            resources, [rec['updated'] for rec in received_kw])
开发者ID:cubeek,项目名称:neutron,代码行数:33,代码来源:test_resource_cache.py


示例3: test_treat_devices_removed_notify

 def test_treat_devices_removed_notify(self):
     """Removing a device must notify PORT_DEVICE AFTER_DELETE subscribers.

     The subscriber is expected to be called exactly once, with the agent
     as third positional argument and a ``payload`` keyword.
     """
     on_device_deleted = mock.Mock()
     registry.subscribe(
         on_device_deleted, resources.PORT_DEVICE, events.AFTER_DELETE)
     self.agent.treat_devices_removed([DEVICE_1])
     on_device_deleted.assert_called_once_with(
         mock.ANY, mock.ANY, self.agent, payload=mock.ANY)
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:test__common_agent.py


示例4: setUp

 def setUp(self):
     """Build the fixtures shared by the tests of this class.

     Sets up the core plugin, an admin context, a port, and a mock that is
     subscribed to the PROVISIONING_COMPLETE event of ports.
     """
     super(TestStatusBarriers, self).setUp()
     self.setup_coreplugin(CORE_PLUGIN)
     self.ctx = n_ctx.get_admin_context()
     # mock callback; tests can inspect it to see whether it was notified
     self.provisioned = mock.Mock()
     self.port = self._make_port()
     registry.subscribe(
         self.provisioned, resources.PORT, pb.PROVISIONING_COMPLETE)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:8,代码来源:test_provisioning_blocks.py


示例5: __init__

 def __init__(self, rcache):
     """Store the resource cache and subscribe to SecurityGroup events.

     :param rcache: the resource cache object kept on ``self.rcache``.
     """
     self.rcache = rcache
     # (handler, event) pairs, subscribed in this order
     sg_subscriptions = (
         (self._clear_child_sg_rules, events.AFTER_DELETE),
         (self._add_child_sg_rules, events.AFTER_UPDATE),
     )
     for handler, sg_event in sg_subscriptions:
         registry.subscribe(handler, 'SecurityGroup', sg_event)
     # expose the client as an attribute so the agent can adjust its timeout
     self.client = resources_rpc.ResourcesPullRpcApi().client
开发者ID:igordcard,项目名称:neutron,代码行数:8,代码来源:securitygroups_rpc.py


示例6: register

def register():
    """Create the module-level OVS driver and subscribe its bridge handler.

    Rebinds the global ``DRIVER`` to a freshly created ``OVSDriver`` and
    subscribes ``vif_details_bridge_name_handler`` to the BEFORE_READ event
    of ``agent_consts.OVS_BRIDGE_NAME``.
    """
    global DRIVER
    DRIVER = OVSDriver.create()
    # The handler is what sets bridge_name in a parent port's vif_details.
    registry.subscribe(
        vif_details_bridge_name_handler,
        agent_consts.OVS_BRIDGE_NAME,
        events.BEFORE_READ,
    )
    LOG.debug('Open vSwitch trunk driver registered')
开发者ID:cubeek,项目名称:neutron,代码行数:9,代码来源:driver.py


示例7: _unsubscribe_callback_events

    def _unsubscribe_callback_events(self):
        """Replace the generic L3 port-delete callback with this plugin's."""
        # Drop the callback that is meant to run on all plugins other than
        # NSX-T.
        l3_mixin = l3_db.L3_NAT_dbonly_mixin
        registry.unsubscribe_all(l3_mixin._prevent_l3_port_delete_callback)

        # Our internal callback is subscribed in its place.
        registry.subscribe(
            self._prevent_l3_port_delete_callback,
            resources.PORT,
            events.BEFORE_DELETE)
开发者ID:openstack,项目名称:vmware-nsx,代码行数:9,代码来源:plugin.py


示例8: initialize

 def initialize(self):
     """Initialize the driver and subscribe to port update events."""
     super(AristaHAScaleSimulationDriver, self).initialize()
     self.context = context.get_admin_context_without_session()
     self.plugin_rpc = agent_rpc.PluginApi(topics.PLUGIN)
     # A fake virt driver is in use, so OVS never sees the libvirt
     # interfaces come up and OVS provisioning is never triggered. Port
     # updates are therefore watched so that ports can be forced to
     # active after binding.
     registry.subscribe(
         self._port_update_callback, resources.PORT, events.AFTER_UPDATE)
开发者ID:openstack,项目名称:networking-arista,代码行数:9,代码来源:mechanism_ha_simulator.py


示例9: test_security_group_precommit_create_event_fail

 def test_security_group_precommit_create_event_fail(self):
     """A PRECOMMIT_CREATE subscriber makes security group creation fail.

     With ``fake_callback`` subscribed, creating a security group is
     expected to raise SecurityGroupConflict and to roll back the
     transaction.
     """
     registry.subscribe(
         fake_callback, resources.SECURITY_GROUP, events.PRECOMMIT_CREATE)
     rollback_patch = mock.patch.object(
         sqlalchemy.orm.session.SessionTransaction, 'rollback')
     with rollback_patch as mock_rollback:
         self.assertRaises(
             securitygroup.SecurityGroupConflict,
             self.mixin.create_security_group,
             self.ctx, FAKE_SECGROUP)
         self.assertTrue(mock_rollback.called)
开发者ID:cubeek,项目名称:neutron,代码行数:9,代码来源:test_securitygroups_db.py


示例10: __init__

 def __init__(self, resource, object_class, resource_push_api):
     """Record the collaborators and subscribe to the resource's events.

     :param resource: resource whose AFTER_CREATE, AFTER_UPDATE and
         AFTER_DELETE events are routed to ``handle_event``.
     :param object_class: stored on ``self._obj_class``.
     :param resource_push_api: stored on ``self._resource_push_api``.
     """
     self._resource = resource
     self._obj_class = object_class
     self._resource_push_api = resource_push_api
     self._resources_to_push = {}
     self._worker_pool = eventlet.GreenPool()
     watched_events = (
         events.AFTER_CREATE,
         events.AFTER_UPDATE,
         events.AFTER_DELETE,
     )
     for watched in watched_events:
         registry.subscribe(self.handle_event, resource, watched)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:9,代码来源:ovo_rpc.py


示例11: register_legacy_notification_callbacks

    def register_legacy_notification_callbacks(self, legacy_interface):
        """Emulates the server-side notifications from ml2 AgentNotifierApi.

        ``legacy_interface`` is an object with 'delete'/'update' methods for
        core resources. It is stored on the instance, and the legacy
        notifier is subscribed to the update and delete events of ports
        and networks.
        """
        self._legacy_interface = legacy_interface
        notified_events = (callback_events.AFTER_UPDATE,
                           callback_events.AFTER_DELETE)
        # flatten to (resource, event) pairs, event-major like a nested loop
        subscriptions = [(res, evt)
                         for evt in notified_events
                         for res in (resources.PORT, resources.NETWORK)]
        for res, evt in subscriptions:
            registry.subscribe(self._legacy_notifier, res, evt)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:10,代码来源:rpc.py


示例12: __new__

 def __new__(cls, *args, **kwargs):
     """Create the instance and (re)subscribe the network callbacks."""
     # NOTE(kevinbenton): subscribing happens on object construction
     # because the tests blow away the callback manager for each run
     instance = super(AutoAllocatedTopologyMixin, cls).__new__(
         cls, *args, **kwargs)
     for precommit_event in (events.PRECOMMIT_UPDATE,
                             events.PRECOMMIT_CREATE):
         registry.subscribe(
             _ensure_external_network_default_value_callback,
             resources.NETWORK, precommit_event)
     return instance
开发者ID:cubeek,项目名称:neutron,代码行数:10,代码来源:db.py


示例13: test_record_resource_delete_ignores_dups

 def test_record_resource_delete_ignores_dups(self):
     """Deleting the same resource id twice must notify only once."""
     received_kw = []
     receiver = lambda *a, **k: received_kw.append(k)
     registry.subscribe(receiver, 'goose', events.AFTER_DELETE)
     # (id to delete, total notifications expected after that delete);
     # the repeated id 3 must not raise the total
     steps = ((3, 1), (4, 2), (3, 2))
     for goose_id, expected_total in steps:
         self.rcache.record_resource_delete(self.ctx, 'goose', goose_id)
         self.assertEqual(expected_total, len(received_kw))
开发者ID:cubeek,项目名称:neutron,代码行数:10,代码来源:test_resource_cache.py


示例14: test_adding_component_for_new_resource_type

 def test_adding_component_for_new_resource_type(self):
     """Provisioning components need a registered model for their type.

     Adding a component for 'NETWORK' must raise RuntimeError until a model
     is registered for it. After registration, the component can be added
     and completed, and the subscriber is expected to have been called.
     """
     rtype = 'NETWORK'
     on_complete = mock.Mock()
     registry.subscribe(on_complete, rtype, pb.PROVISIONING_COMPLETE)
     network = self._make_net()
     # no model is registered for the type yet, so this has to fail
     with testtools.ExpectedException(RuntimeError):
         pb.add_provisioning_component(self.ctx, network.id, rtype, 'ent')
     pb.add_model_for_resource(rtype, models_v2.Network)
     pb.add_provisioning_component(self.ctx, network.id, rtype, 'ent')
     pb.provisioning_complete(self.ctx, network.id, rtype, 'ent')
     self.assertTrue(on_complete.called)
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:11,代码来源:test_provisioning_blocks.py


示例15: initialize

    def initialize(self):
        """Create the NSX-v driver handle and run the init steps.

        When ``self._is_new_security_group`` is set, ``init_complete`` is
        additionally subscribed to the BEFORE_SPAWN event of the process
        resource.
        """
        self._nsxv = vcns_driver.VcnsDriver(None)
        self.init_profile_id()
        self.init_security_group()
        self.init_security_group_in_profile()

        if not self._is_new_security_group:
            return

        # hook an event at the end of the init to handle the first upgrade
        registry.subscribe(
            self.init_complete, resources.PROCESS, events.BEFORE_SPAWN)
开发者ID:openstack,项目名称:vmware-nsx,代码行数:11,代码来源:driver.py


示例16: test_start_all_workers

    def test_start_all_workers(self):
        """start_all_workers must notify PROCESS AFTER_SPAWN exactly once."""
        cfg.CONF.set_override('api_workers', 0)
        # replace the worker helpers of the service module with mocks
        for helper_name in ('_get_rpc_workers',
                            '_get_plugins_workers',
                            '_start_workers'):
            mock.patch.object(service, helper_name).start()

        spawn_listener = mock.Mock()
        registry.subscribe(
            spawn_listener, resources.PROCESS, events.AFTER_SPAWN)
        service.start_all_workers()
        spawn_listener.assert_called_once_with(
            resources.PROCESS, events.AFTER_SPAWN, mock.ANY)
开发者ID:eayunstack,项目名称:neutron,代码行数:11,代码来源:test_service.py


示例17: test__set_bridge_name_notify

    def test__set_bridge_name_notify(self):
        """A BEFORE_READ subscriber can supply the bridge name.

        The subscriber calls the trigger it receives with 'fake-br-name';
        that name is then expected under VIF_DETAILS_BRIDGE_NAME in the
        vif details dict passed to ``_set_bridge_name``.
        """

        def fake_callback(resource, event, trigger, payload=None):
            trigger('fake-br-name')

        registry.subscribe(
            fake_callback, a_const.OVS_BRIDGE_NAME, events.BEFORE_READ)
        vif_details = {}
        self.driver._set_bridge_name('foo', vif_details)
        reported_name = vif_details.get(
            portbindings.VIF_DETAILS_BRIDGE_NAME, '')
        self.assertEqual('fake-br-name', reported_name)
开发者ID:cubeek,项目名称:neutron,代码行数:12,代码来源:test_mech_openvswitch.py


示例18: register

 def register(self, resource, event, trigger, payload=None):
     """Register the driver and subscribe the trunk handler callbacks.

     The arguments are forwarded unchanged to the parent ``register``.
     A ``NsxV3TrunkHandler`` is then created, and its trunk and subport
     callbacks are subscribed to the AFTER_CREATE and AFTER_DELETE events.
     """
     super(NsxV3TrunkDriver, self).register(
         resource, event, trigger, payload=payload)
     self._handler = NsxV3TrunkHandler(self.plugin_driver)
     # a distinct loop name keeps the ``event`` argument from being rebound
     for trunk_evt in (events.AFTER_CREATE, events.AFTER_DELETE):
         registry.subscribe(
             self._handler.trunk_event, resources.TRUNK, trunk_evt)
         registry.subscribe(
             self._handler.subport_event, resources.SUBPORTS, trunk_evt)
     LOG.debug("VMware NSXv3 trunk driver initialized.")
开发者ID:openstack,项目名称:vmware-nsx,代码行数:12,代码来源:driver.py


示例19: test_security_groups_created_outside_transaction

    def test_security_groups_created_outside_transaction(self):
        """AFTER_CREATE of a security group must see an inactive session.

        The subscriber records ``context.session.is_active`` each time it
        is notified. The recorded value is checked after a subnet is
        created, and again after the first listed security group has been
        deleted and a port created.
        """
        def record_after_state(_resource, _event, _trigger, context,
                               *args, **kwargs):
            self.was_active = context.session.is_active

        registry.subscribe(
            record_after_state, resources.SECURITY_GROUP,
            events.AFTER_CREATE)
        with self.subnet() as subnet:
            self.assertFalse(self.was_active)
            first_sg = self._list('security-groups')['security_groups'][0]
            self._delete('security-groups', first_sg['id'])
            with self.port(subnet=subnet):
                self.assertFalse(self.was_active)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:13,代码来源:test_security_group.py


示例20: test_security_group_rule_precommit_create_event_fail

 def test_security_group_rule_precommit_create_event_fail(self):
     """A PRECOMMIT_CREATE subscriber makes rule creation fail.

     With ``fake_callback`` subscribed to SECURITY_GROUP_RULE
     PRECOMMIT_CREATE, creating a rule for a freshly created security
     group is expected to raise SecurityGroupConflict and to roll back
     the transaction.
     """
     registry.subscribe(fake_callback, resources.SECURITY_GROUP_RULE,
                        events.PRECOMMIT_CREATE)
     sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
     # Copy the two dict levels that get modified instead of aliasing the
     # module-level FAKE_SECGROUP_RULE: writing the group id into the
     # shared fixture would leak this test's state into other tests.
     fake_rule = dict(FAKE_SECGROUP_RULE)
     fake_rule['security_group_rule'] = dict(
         FAKE_SECGROUP_RULE['security_group_rule'],
         security_group_id=sg_dict['id'])
     with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                            'rollback') as mock_rollback,\
         mock.patch.object(self.mixin, '_get_security_group'):
         self.assertRaises(securitygroup.SecurityGroupConflict,
                           self.mixin.create_security_group_rule,
                           self.ctx, fake_rule)
         self.assertTrue(mock_rollback.called)
开发者ID:cubeek,项目名称:neutron,代码行数:13,代码来源:test_securitygroups_db.py



注:本文中的neutron_lib.callbacks.registry.subscribe函数示例由纯净天空整理自GitHub/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python context.get_admin_context函数代码示例发布时间:2022-05-27
下一篇:
Python registry.publish函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap