• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python resources_rpc.resource_type_versioned_topic函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.api.rpc.handlers.resources_rpc.resource_type_versioned_topic函数的典型用法代码示例。如果您正苦于以下问题：Python resource_type_versioned_topic函数的具体用法？Python resource_type_versioned_topic怎么用？Python resource_type_versioned_topic使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了resource_type_versioned_topic函数的12个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test___init__

    def test___init__(self, mocked_register):
        """Verify TrunkSkeleton registers its handlers and RPC consumers.

        ``rpc.Connection.create_consumer`` is patched for the duration of
        the test, so consumer creation is recorded on a mock and can be
        inspected afterwards.
        """
        connection_mock = mock.MagicMock()
        with mock.patch.object(rpc.Connection, 'create_consumer',
                               new_callable=connection_mock):
            skeleton = agent.TrunkSkeleton()

            # Both resource handlers must have been registered, in any order.
            self.assertEqual(2, mocked_register.call_count)
            expected_registrations = [
                mock.call(skeleton.handle_trunks, resources.TRUNK),
                mock.call(skeleton.handle_subports, resources.SUBPORT),
            ]
            mocked_register.assert_has_calls(
                expected_registrations, any_order=True)

            # Build a fanout target per resource type; only the target's
            # topic is compared against the recorded create_consumer calls.
            subport_target = oslo_messaging.Target(
                topic=resources_rpc.resource_type_versioned_topic(
                    resources.SUBPORT),
                server=cfg.CONF.host, fanout=True)
            trunk_target = oslo_messaging.Target(
                topic=resources_rpc.resource_type_versioned_topic(
                    resources.TRUNK),
                server=cfg.CONF.host, fanout=True)
            for target in (subport_target, trunk_target):
                expected_consumer = mock.call(
                    target.topic, mock.ANY, fanout=True)
                self.assertIn(expected_consumer, connection_mock().mock_calls)

            # The push callback must appear among the recorded endpoints.
            self.assertIn("ResourcesPushRpcCallback",
                          str(connection_mock().call_args_list))
开发者ID:cubeek,项目名称:neutron,代码行数:26,代码来源:test_agent.py


示例2: __init__

    def __init__(self):
        """Register trunk/subport handlers and start consuming RPC pushes.

        One fanout consumer is created per resource type on a new
        connection, after which the connection starts consuming in threads.
        """
        registry.register(self.handle_trunks, resources.TRUNK)
        registry.register(self.handle_subports, resources.SUBPORT)

        self._connection = n_rpc.Connection()
        push_endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        # Subports are subscribed first, then trunks.
        for resource_type in (resources.SUBPORT, resources.TRUNK):
            versioned_topic = resources_rpc.resource_type_versioned_topic(
                resource_type)
            self._connection.create_consumer(
                versioned_topic, push_endpoints, fanout=True)
        self._connection.consume_in_threads()
开发者ID:igordcard,项目名称:neutron,代码行数:11,代码来源:agent.py


示例3: _register_rpc_consumers

 def _register_rpc_consumers(self, connection):
     """Create a fanout consumer on ``connection`` per supported resource.

     All consumers share a single ``ResourcesPushRpcCallback`` endpoint.
     """
     push_endpoints = [resources_rpc.ResourcesPushRpcCallback()]
     for supported_type in self.SUPPORTED_RESOURCES:
         # The topic is versioned on the assumption that neutron-server
         # always broadcasts the latest version known to the agent.
         versioned_topic = resources_rpc.resource_type_versioned_topic(
             supported_type)
         connection.create_consumer(
             versioned_topic, push_endpoints, fanout=True)
开发者ID:sckevmit,项目名称:neutron,代码行数:7,代码来源:qos.py


示例4: _init_rpc_listeners

 def _init_rpc_listeners(self):
     """Subscribe to pushes for every type in ``self.rcache.resource_types``.

     For each type the change handler is registered and a fanout consumer
     is created; the connection then starts consuming in threads.
     """
     push_endpoints = [resources_rpc.ResourcesPushRpcCallback()]
     self._connection = n_rpc.Connection()
     for resource_type in self.rcache.resource_types:
         registry_rpc.register(self.resource_change_handler, resource_type)
         versioned_topic = resources_rpc.resource_type_versioned_topic(
             resource_type)
         self._connection.create_consumer(
             versioned_topic, push_endpoints, fanout=True)
     self._connection.consume_in_threads()
开发者ID:cubeek,项目名称:neutron,代码行数:8,代码来源:resource_cache.py


示例5: test_resource_type_versioned_topic

 def test_resource_type_versioned_topic(self, validate_mock):
     """The topic for FakeResource follows RESOURCE_TOPIC_PATTERN."""
     resource_name = FakeResource.obj_name()
     topic_fields = {'resource_type': 'FakeResource', 'version': '1.0'}
     expected_topic = topics.RESOURCE_TOPIC_PATTERN % topic_fields
     # Make the resource-class lookup resolve to FakeResource.
     with mock.patch.object(resources_rpc.resources, 'get_resource_cls',
                            return_value=FakeResource):
         observed_topic = resources_rpc.resource_type_versioned_topic(
             resource_name)
     self.assertEqual(expected_topic, observed_topic)
开发者ID:danielmellado,项目名称:neutron,代码行数:8,代码来源:test_resources_rpc.py


示例6: _register_rpc_consumers

    def _register_rpc_consumers(self):
        """Register the QoS policy handler and consume its RPC pushes.

        A new connection is stored on ``self._connection``, given one fanout
        consumer for the QoS policy topic, and started in threads.
        """
        registry.register(self._handle_notification, resources.QOS_POLICY)

        self._connection = n_rpc.Connection()
        push_endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        policy_topic = resources_rpc.resource_type_versioned_topic(
            resources.QOS_POLICY)
        self._connection.create_consumer(
            policy_topic, push_endpoints, fanout=True)
        self._connection.consume_in_threads()
开发者ID:cubeek,项目名称:neutron,代码行数:9,代码来源:base.py


示例7: test_initialize_subscribed_to_rpc

 def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
     """initialize() creates a consumer per supported resource type."""
     self.qos_ext.initialize(self.connection, constants.EXTENSION_DRIVER_TYPE)

     # One fanout consumer is expected for each supported resource type.
     expected_consumers = []
     for supported_type in self.qos_ext.SUPPORTED_RESOURCES:
         versioned_topic = resources_rpc.resource_type_versioned_topic(
             supported_type)
         expected_consumers.append(
             mock.call(versioned_topic, [rpc_mock()], fanout=True))

     self.connection.create_consumer.assert_has_calls(expected_consumers)
     subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)
开发者ID:neoareslinux,项目名称:neutron,代码行数:9,代码来源:test_qos.py


示例8: _register_rpc_consumers

 def _register_rpc_consumers(self, connection):
     """Subscribe this extension to pushed updates for its resources.

     For every type in ``SUPPORTED_RESOURCE_TYPES`` the notification
     handler is registered and a fanout consumer is created on
     ``connection``.
     """
     push_endpoints = [resources_rpc.ResourcesPushRpcCallback()]
     for supported_type in self.SUPPORTED_RESOURCE_TYPES:
         # The topic is versioned on the assumption that the neutron server
         # always broadcasts the latest version known to the agent.
         registry.register(self._handle_notification, supported_type)
         versioned_topic = resources_rpc.resource_type_versioned_topic(
             supported_type)
         connection.create_consumer(
             versioned_topic, push_endpoints, fanout=True)
开发者ID:noironetworks,项目名称:neutron,代码行数:11,代码来源:qos.py


示例9: test___init__

    def test___init__(self, mocked_get_server, mocked_register):
        """Verify TrunkSkeleton registers handlers and requests RPC servers."""
        skeleton = agent.TrunkSkeleton()

        # Both resource handlers must have been registered, in any order.
        self.assertEqual(2, mocked_register.call_count)
        expected_registrations = [
            mock.call(skeleton.handle_trunks, resources.TRUNK),
            mock.call(skeleton.handle_subports, resources.SUBPORT),
        ]
        mocked_register.assert_has_calls(
            expected_registrations, any_order=True)

        # The mocked get_server must have been called with the right fanout
        # target for each resource type, and with the push callback endpoint.
        expected_targets = [
            oslo_messaging.Target(
                topic=resources_rpc.resource_type_versioned_topic(
                    resource_type),
                server=cfg.CONF.host, fanout=True)
            for resource_type in (resources.SUBPORT, resources.TRUNK)
        ]
        expected_server_calls = [
            mock.call(target, mock.ANY) for target in expected_targets]
        mocked_get_server.assert_has_calls(
            expected_server_calls, any_order=True)
        self.assertIn("ResourcesPushRpcCallback",
                      str(mocked_get_server.call_args_list))
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:20,代码来源:test_agent.py


示例10: test_initialize_subscribed_to_rpc

 def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
     """initialize() opens a connection and consumes QoS policy pushes."""
     with mock.patch.object(n_rpc, 'Connection',
                            return_value=self.connection) as create_connection:
         self.fip_qos_ext.initialize(
             self.connection, lib_const.L3_AGENT_MODE)

         # The connection factory is expected to be called with no arguments.
         create_connection.assert_has_calls([mock.call()])

         policy_topic = resources_rpc.resource_type_versioned_topic(
             resources.QOS_POLICY)
         expected_consumer = mock.call(
             policy_topic, [rpc_mock()], fanout=True)
         self.connection.create_consumer.assert_has_calls(
             [expected_consumer])
         subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)
开发者ID:openstack,项目名称:neutron,代码行数:14,代码来源:test_fip.py


示例11: test_initialize_subscribed_to_rpc

 def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
     """initialize() opens a connection and consumes port-forwarding pushes."""
     with mock.patch('neutron.common.rpc.Connection',
                     return_value=self.connection) as create_connection:
         self.fip_pf_ext.initialize(
             self.connection, lib_const.L3_AGENT_MODE)

         # The connection factory is expected to be called with no arguments.
         create_connection.assert_has_calls([mock.call()])

         forwarding_topic = resources_rpc.resource_type_versioned_topic(
             resources.PORTFORWARDING)
         expected_consumer = mock.call(
             forwarding_topic, [rpc_mock()], fanout=True)
         self.connection.create_consumer.assert_has_calls(
             [expected_consumer])
         subscribe_mock.assert_called_with(
             mock.ANY, resources.PORTFORWARDING)
开发者ID:cubeek,项目名称:neutron,代码行数:16,代码来源:test_port_forwarding.py


示例12: _register_rpc_consumers

 def _register_rpc_consumers(self, connection):
     """Register the handler and a fanout consumer per supported resource.

     All consumers are created on ``connection`` and share a single
     ``ResourcesPushRpcCallback`` endpoint.
     """
     push_endpoints = [resources_rpc.ResourcesPushRpcCallback()]
     for supported_type in self.SUPPORTED_RESOURCE_TYPES:
         registry.register(self._handle_notification, supported_type)
         versioned_topic = resources_rpc.resource_type_versioned_topic(
             supported_type)
         connection.create_consumer(
             versioned_topic, push_endpoints, fanout=True)
开发者ID:cubeek,项目名称:neutron,代码行数:6,代码来源:log_extension.py



注:本文中的neutron.api.rpc.handlers.resources_rpc.resource_type_versioned_topic函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python attributes._validate_dict函数代码示例发布时间:2022-05-27
下一篇:
Python extensions.get_extensions_path函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap