• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python rpc.init函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.common.rpc.init函数的典型用法代码示例。如果您正苦于以下问题:Python init函数的具体用法?Python init怎么用?Python init使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了init函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: setup_rpc_mocks

    def setup_rpc_mocks(self):
        # don't actually start RPC listeners when testing
        self.useFixture(fixtures.MonkeyPatch(
            'neutron.common.rpc.Connection.consume_in_threads',
            fake_consume_in_threads))

        # immediately return RPC calls
        self.useFixture(fixtures.MonkeyPatch(
            'neutron.common.rpc.RpcProxy._RpcProxy__call_rpc_method',
            mock.MagicMock()))

        self.useFixture(fixtures.MonkeyPatch(
            'oslo.messaging.Notifier', fake_notifier.FakeNotifier))

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_driver = 'fake'
        # NOTE(russellb) We want all calls to return immediately.
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(n_rpc.clear_extra_exmods)
        n_rpc.add_extra_exmods('neutron.test')

        self.addCleanup(n_rpc.cleanup)
        n_rpc.init(CONF)
开发者ID:absolutarin,项目名称:neutron,代码行数:25,代码来源:base.py


示例2: test_init

    def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser,
                  mock_exmods):
        notifier = mock.Mock()
        transport = mock.Mock()
        noti_transport = mock.Mock()
        serializer = mock.Mock()
        conf = mock.Mock()

        mock_exmods.return_value = ['foo']
        mock_trans.return_value = transport
        mock_noti_trans.return_value = noti_transport
        mock_ser.return_value = serializer
        mock_not.return_value = notifier

        rpc.init(conf)

        mock_exmods.assert_called_once_with()
        mock_trans.assert_called_once_with(conf, allowed_remote_exmods=['foo'],
                                           aliases=rpc.TRANSPORT_ALIASES)
        mock_noti_trans.assert_called_once_with(conf,
                                                allowed_remote_exmods=['foo'],
                                                aliases=rpc.TRANSPORT_ALIASES)
        mock_not.assert_called_once_with(noti_transport,
                                         serializer=serializer)
        self.assertIsNotNone(rpc.TRANSPORT)
        self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
        self.assertIsNotNone(rpc.NOTIFIER)
开发者ID:Blahhhhh,项目名称:neutron,代码行数:27,代码来源:test_rpc.py


示例3: setUp

 def setUp(self):
     n_rpc.init(cfg.CONF)
     self.p_notification = pull_notification('sc', 'conf')
     self.context = TestContext().get_context_dict()
     self.ev = ''
     self.import_lib = 'gbpservice.nfp.lib.transport'
     self.import_cast = 'oslo_messaging.rpc.client._CallContext.cast'
开发者ID:openstack,项目名称:group-based-policy,代码行数:7,代码来源:test_pull_notifications.py


示例4: setUp

    def setUp(self):
        super(QuarkIpamBaseFunctionalTest, self).setUp()

        patcher = mock.patch("neutron.common.rpc.oslo_messaging")
        patcher.start()
        self.addCleanup(patcher.stop)
        rpc.init(mock.MagicMock())
开发者ID:Cerberus98,项目名称:quark,代码行数:7,代码来源:test_ipam.py


示例5: __init__

    def __init__(self):

        # Required to bypass an error when instantiating Midonet plugin.
        rpc.init(cfg.CONF)

        self.ctx = ncntxt.get_admin_context()
        self.client = plugin.MidonetPluginV2()
        self.lb_client = loadbalancer_db.LoadBalancerPluginDb()
开发者ID:332054781,项目名称:midonet,代码行数:8,代码来源:context.py


示例6: init

def init(args, **kwargs):
    product_name = 'bambuk-dispatcher-agent'
    log.register_options(cfg.CONF)
    cfg.CONF(args=args, project=product_name,
             version='%%(prog)s %s' % version.version_info.release_string(),
             **kwargs)
    log.setup(cfg.CONF, product_name)
    rpc.init(cfg.CONF)
开发者ID:lionelz,项目名称:networking-bambuk,代码行数:8,代码来源:config.py


示例7: setUp

    def setUp(self, f1, f2, f3, f4, f5):
        super(TestNWAAgentBase, self).setUp()

        cli = mock.patch('networking_nec.nwa.nwalib.client.NwaClient').start()
        self.nwacli = cli.return_value
        _init_nwa_client_patch(self.nwacli)

        self.agent = nwa_agent.NECNWANeutronAgent(10)
        rpc.init(cfg.ConfigOpts())
开发者ID:nec-openstack,项目名称:networking-nec,代码行数:9,代码来源:base.py


示例8: init

def init(args, default_config_files=None, **kwargs):
    cfg.CONF(args=args, project='neutron',
             version='%%(prog)s %s' % version.version_info.release_string(),
             default_config_files=default_config_files,
             **kwargs)

    n_rpc.init(cfg.CONF)

    # Validate that the base_mac is of the correct format
    msg = validators.validate_regex(cfg.CONF.base_mac, validators.MAC_PATTERN)
    if msg:
        msg = _("Base MAC: %s") % msg
        raise Exception(msg)
开发者ID:noironetworks,项目名称:neutron,代码行数:13,代码来源:config.py


示例9: init

def init(args, **kwargs):
    cfg.CONF(args=args, project="neutron", version="%%(prog)s %s" % version.version_info.release_string(), **kwargs)

    # FIXME(ihrachys): if import is put in global, circular import
    # failure occurs
    from neutron.common import rpc as n_rpc

    n_rpc.init(cfg.CONF)

    # Validate that the base_mac is of the correct format
    msg = attributes._validate_regex(cfg.CONF.base_mac, attributes.MAC_PATTERN)
    if msg:
        msg = _("Base MAC: %s") % msg
        raise Exception(msg)
开发者ID:hzhou8,项目名称:neutron,代码行数:14,代码来源:config.py


示例10: setUp

    def setUp(self):
        super(CastExceptionTestCase, self).setUp()

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_url = 'fake://'
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
        rpc.TRANSPORT = mock.MagicMock()
        rpc.TRANSPORT._send.side_effect = Exception
        target = messaging.Target(version='1.0', topic='testing')
        self.client = rpc.get_client(target)
        self.cast_context = mock.Mock()
开发者ID:huntxu,项目名称:neutron,代码行数:15,代码来源:test_rpc.py


示例11: setUp

    def setUp(self):
        super(ServiceTestCase, self).setUp()
        self.host = 'foo'
        self.topic = 'neutron-agent'

        self.target_mock = mock.patch('oslo_messaging.Target')
        self.target_mock.start()

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_driver = 'fake'
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
开发者ID:Blahhhhh,项目名称:neutron,代码行数:15,代码来源:test_rpc.py


示例12: setUp

    def setUp(self):
        super(TimeoutTestCase, self).setUp()

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_driver = 'fake'
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(rpc.cleanup)
        rpc.init(CONF)
        rpc.TRANSPORT = mock.MagicMock()
        rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
        target = messaging.Target(version='1.0', topic='testing')
        self.client = rpc.get_client(target)
        self.call_context = mock.Mock()
        self.sleep = mock.patch('time.sleep').start()
        rpc.TRANSPORT.conf.rpc_response_timeout = 10
开发者ID:sebrandon1,项目名称:neutron,代码行数:17,代码来源:test_rpc.py


示例13: setup_rpc_mocks

    def setup_rpc_mocks(self):
        # don't actually start RPC listeners when testing
        mock.patch("neutron.common.rpc.Connection.consume_in_threads", return_value=[]).start()

        self.useFixture(fixtures.MonkeyPatch("oslo_messaging.Notifier", fake_notifier.FakeNotifier))

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_driver = "fake"
        # NOTE(russellb) We want all calls to return immediately.
        self.messaging_conf.response_timeout = 0
        self.useFixture(self.messaging_conf)

        self.addCleanup(n_rpc.clear_extra_exmods)
        n_rpc.add_extra_exmods("neutron.test")

        self.addCleanup(n_rpc.cleanup)
        n_rpc.init(CONF)
开发者ID:openstack,项目名称:neutron,代码行数:17,代码来源:base.py


示例14: __init__

    def __init__(self, **kwargs):
        super(TestRpcReportState, self).__init__(**kwargs)
        cfg.CONF([], project='neutron', default_config_files=['/etc/neutron/neutron.conf'])
        rpc.init(cfg.CONF)
        self.ctxt = context.get_admin_context_without_session()
        self.agent_state = {
                           'binary': 'neutron-openvswitch-agent',
                           'host': 'fakehost.com',
                           'topic': 'N/A',
                           'configurations': {'bridge_mappings': {"physnet2": "br-bond1"},
                                              'tunnel_types': [],
                                              'tunneling_ip': "",
                                              'l2_population': False},
                           'agent_type': "Open vSwitch agent",
                           'start_flag': True
                          }

        self.state_rpc = nu_rpc.PluginReportStateAPI('q-plugin')
开发者ID:xiayuu,项目名称:neutron-performance-test,代码行数:18,代码来源:neutron_test.py


示例15: init

def init(args, **kwargs):
    cfg.CONF(args=args, project='neutron',
             version='%%(prog)s %s' % version.version_info.release_string(),
             **kwargs)

    # FIXME(ihrachys): if import is put in global, circular import
    # failure occurs
    from neutron.common import rpc as n_rpc
    # 进行notification的初始化操作,此时没有指定publish_ip
    n_rpc.init(cfg.CONF)

    # Validate that the base_mac is of the correct format

    # Checking mac format is right or not
    msg = attributes._validate_regex(cfg.CONF.base_mac,
                                     attributes.MAC_PATTERN)
    if msg:
        msg = _("Base MAC: %s") % msg
        raise Exception(msg)
开发者ID:ytwxy99,项目名称:neutron,代码行数:19,代码来源:config.py


示例16: test_init

    def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser):
        notifier = mock.Mock()
        transport = mock.Mock()
        noti_transport = mock.Mock()
        serializer = mock.Mock()
        conf = mock.Mock()

        mock_trans.return_value = transport
        mock_noti_trans.return_value = noti_transport
        mock_ser.return_value = serializer
        mock_not.return_value = notifier

        rpc.init(conf, rpc_ext_mods=['foo'])

        expected_mods = list(set(['foo'] + rpc._DFT_EXMODS))
        mock_trans.assert_called_once_with(
            conf, allowed_remote_exmods=expected_mods)
        mock_noti_trans.assert_called_once_with(
            conf, allowed_remote_exmods=expected_mods)
        mock_not.assert_called_once_with(noti_transport,
                                         serializer=serializer)
        self.assertIsNotNone(rpc.TRANSPORT)
        self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
        self.assertIsNotNone(rpc.NOTIFIER)
开发者ID:huntxu,项目名称:neutron,代码行数:24,代码来源:test_rpc.py


示例17: main

def main():
    username = ""
    password = ""
    auth_url = ""

    if 'OS_USERNAME' in os.environ:
        username = os.environ['OS_USERNAME']
    else:
        print("OS_USERNAME not defined in environment")
        sys.exit(1)

    if 'OS_PASSWORD' in os.environ:
        password = os.environ['OS_PASSWORD']
    else:
        print("OS_PASSWORD not defined in environment")
        sys.exit(1)

    if 'OS_TENANT_NAME' in os.environ:
        tenant_name = os.environ['OS_TENANT_NAME']
    else:
        print("OS_TENANT_NAME not defined in environment")
        sys.exit(1)

    if 'OS_AUTH_URL' in os.environ:
        auth_url = os.environ['OS_AUTH_URL']
    else:
        print("OS_AUTH_URL not defined in environment")
        sys.exit(1)

    neutron = q_client.Client(username=username,
                              password=password,
                              tenant_name=tenant_name,
                              auth_url=auth_url)

    subnets = neutron.list_subnets()['subnets']
    for subnet in subnets:
        if subnet['name'] == 'private-subnet':
            lb_dict['loadbalancer']['vip_subnet_id'] = subnet['id']
            lb_dict['loadbalancer']['tenant_id'] = subnet['tenant_id']

    neutron.create_loadbalancer(lb_dict)
    loadbalancers = neutron.list_loadbalancers()['loadbalancers']
    for loadbalancer in loadbalancers:
        if loadbalancer['name'] == lb_dict['loadbalancer']['name']:
            break

    environment_prefix = 'Test'
    topic = '%s_%s'\
        % (constants_v2.TOPIC_PROCESS_ON_HOST_V2, environment_prefix)
    print(topic)

    q_rpc.init(cfg.CONF)

    transport = messaging.get_transport(cfg.CONF)
    target = messaging.Target(topic=topic)
    rpc_client = messaging.RPCClient(transport, target)

    ctxt = context.get_admin_context().to_dict()
    print(loadbalancer['id'])
    time.sleep(5)
    service = rpc_client.call(ctxt, 'get_service_by_loadbalancer_id',
                              loadbalancer_id=loadbalancer['id'],
                              global_routed_mode=True,
                              host=None)
    print(service)

    neutron.delete_loadbalancer(loadbalancer['id'])
开发者ID:F5Networks,项目名称:f5-openstack-lbaasv2-driver,代码行数:67,代码来源:testgetservice.py


示例18: setUp

 def setUp(self):
     n_rpc.init(cfg.CONF)
     self.imprt_rc = 'gbpservice.nfp.lib.rest_client_over_unix'
开发者ID:openstack,项目名称:group-based-policy,代码行数:3,代码来源:test_transport.py


示例19: Controller

#    License for the specific language governing permissions and limitations
#    under the License.

import oslo_serialization.jsonutils as jsonutils

from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
import pecan
import pika

from gbpservice.nfp.pecan import base_controller

LOG = logging.getLogger(__name__)
n_rpc.init(cfg.CONF)


class Controller(base_controller.BaseController):
    """Implements all the APIs Invoked by HTTP requests.

    Implements following HTTP methods.
        -get
        -post
        -put
    According to the HTTP request received from config-agent this class make
    call/cast to configurator and return response to config-agent

    """

    def __init__(self, method_name):
开发者ID:openstack,项目名称:group-based-policy,代码行数:31,代码来源:controller.py


示例20: setUp

    def setUp(self):
        super(BaseTestCase, self).setUp()
        # Ensure plugin cleanup is triggered last so that
        # test-specific cleanup has a chance to release references.
        self.addCleanup(self.cleanup_core_plugin)

        # Configure this first to ensure pm debugging support for setUp()
        if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING:
            self.addOnException(post_mortem_debug.exception_handler)

        if os.environ.get('OS_DEBUG') in TRUE_STRING:
            _level = std_logging.DEBUG
        else:
            _level = std_logging.INFO
        capture_logs = os.environ.get('OS_LOG_CAPTURE') in TRUE_STRING
        if not capture_logs:
            std_logging.basicConfig(format=LOG_FORMAT, level=_level)
        self.log_fixture = self.useFixture(
            fixtures.FakeLogger(
                format=LOG_FORMAT,
                level=_level,
                nuke_handlers=capture_logs,
            ))

        # suppress all but errors here
        self.useFixture(
            fixtures.FakeLogger(
                name='neutron.api.extensions',
                format=LOG_FORMAT,
                level=std_logging.ERROR,
                nuke_handlers=capture_logs,
            ))

        test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
        if test_timeout == -1:
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))

        # If someone does use tempfile directly, ensure that it's cleaned up
        self.useFixture(fixtures.NestedTempfile())
        self.useFixture(fixtures.TempHomeDir())

        self.temp_dir = self.useFixture(fixtures.TempDir()).path
        cfg.CONF.set_override('state_path', self.temp_dir)

        self.addCleanup(mock.patch.stopall)
        self.addCleanup(CONF.reset)

        if os.environ.get('OS_STDOUT_CAPTURE') in TRUE_STRING:
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if os.environ.get('OS_STDERR_CAPTURE') in TRUE_STRING:
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        self.useFixture(fixtures.MonkeyPatch(
            'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
            fake_use_fatal_exceptions))

        # don't actually start RPC listeners when testing
        self.useFixture(fixtures.MonkeyPatch(
            'neutron.common.rpc.Connection.consume_in_threads',
            fake_consume_in_threads))

        self.useFixture(fixtures.MonkeyPatch(
            'oslo.messaging.Notifier', fake_notifier.FakeNotifier))

        self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
        self.messaging_conf.transport_driver = 'fake'
        self.messaging_conf.response_timeout = 15
        self.useFixture(self.messaging_conf)

        self.addCleanup(n_rpc.clear_extra_exmods)
        n_rpc.add_extra_exmods('neutron.test')

        self.addCleanup(n_rpc.cleanup)
        n_rpc.init(CONF)

        if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
            raise self.skipException('XML Testing Skipped in Py26')

        self.setup_config()
开发者ID:AsherBond,项目名称:quantum,代码行数:82,代码来源:base.py



注:本文中的neutron.common.rpc.init函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python topics.get_topic_name函数代码示例发布时间:2022-05-27
下一篇:
Python rpc.get_notifier函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap