本文整理汇总了Python中neutron.common.rpc.init函数的典型用法代码示例。如果您正苦于以下问题:Python init函数的具体用法?Python init怎么用?Python init使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了init函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: setup_rpc_mocks
def setup_rpc_mocks(self):
# don't actually start RPC listeners when testing
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.rpc.Connection.consume_in_threads',
fake_consume_in_threads))
# immediately return RPC calls
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.rpc.RpcProxy._RpcProxy__call_rpc_method',
mock.MagicMock()))
self.useFixture(fixtures.MonkeyPatch(
'oslo.messaging.Notifier', fake_notifier.FakeNotifier))
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
# NOTE(russellb) We want all calls to return immediately.
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(n_rpc.clear_extra_exmods)
n_rpc.add_extra_exmods('neutron.test')
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
开发者ID:absolutarin,项目名称:neutron,代码行数:25,代码来源:base.py
示例2: test_init
def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser,
mock_exmods):
notifier = mock.Mock()
transport = mock.Mock()
noti_transport = mock.Mock()
serializer = mock.Mock()
conf = mock.Mock()
mock_exmods.return_value = ['foo']
mock_trans.return_value = transport
mock_noti_trans.return_value = noti_transport
mock_ser.return_value = serializer
mock_not.return_value = notifier
rpc.init(conf)
mock_exmods.assert_called_once_with()
mock_trans.assert_called_once_with(conf, allowed_remote_exmods=['foo'],
aliases=rpc.TRANSPORT_ALIASES)
mock_noti_trans.assert_called_once_with(conf,
allowed_remote_exmods=['foo'],
aliases=rpc.TRANSPORT_ALIASES)
mock_not.assert_called_once_with(noti_transport,
serializer=serializer)
self.assertIsNotNone(rpc.TRANSPORT)
self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNotNone(rpc.NOTIFIER)
开发者ID:Blahhhhh,项目名称:neutron,代码行数:27,代码来源:test_rpc.py
示例3: setUp
def setUp(self):
n_rpc.init(cfg.CONF)
self.p_notification = pull_notification('sc', 'conf')
self.context = TestContext().get_context_dict()
self.ev = ''
self.import_lib = 'gbpservice.nfp.lib.transport'
self.import_cast = 'oslo_messaging.rpc.client._CallContext.cast'
开发者ID:openstack,项目名称:group-based-policy,代码行数:7,代码来源:test_pull_notifications.py
示例4: setUp
def setUp(self):
super(QuarkIpamBaseFunctionalTest, self).setUp()
patcher = mock.patch("neutron.common.rpc.oslo_messaging")
patcher.start()
self.addCleanup(patcher.stop)
rpc.init(mock.MagicMock())
开发者ID:Cerberus98,项目名称:quark,代码行数:7,代码来源:test_ipam.py
示例5: __init__
def __init__(self):
# Required to bypass an error when instantiating Midonet plugin.
rpc.init(cfg.CONF)
self.ctx = ncntxt.get_admin_context()
self.client = plugin.MidonetPluginV2()
self.lb_client = loadbalancer_db.LoadBalancerPluginDb()
开发者ID:332054781,项目名称:midonet,代码行数:8,代码来源:context.py
示例6: init
def init(args, **kwargs):
product_name = 'bambuk-dispatcher-agent'
log.register_options(cfg.CONF)
cfg.CONF(args=args, project=product_name,
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
log.setup(cfg.CONF, product_name)
rpc.init(cfg.CONF)
开发者ID:lionelz,项目名称:networking-bambuk,代码行数:8,代码来源:config.py
示例7: setUp
def setUp(self, f1, f2, f3, f4, f5):
super(TestNWAAgentBase, self).setUp()
cli = mock.patch('networking_nec.nwa.nwalib.client.NwaClient').start()
self.nwacli = cli.return_value
_init_nwa_client_patch(self.nwacli)
self.agent = nwa_agent.NECNWANeutronAgent(10)
rpc.init(cfg.ConfigOpts())
开发者ID:nec-openstack,项目名称:networking-nec,代码行数:9,代码来源:base.py
示例8: init
def init(args, default_config_files=None, **kwargs):
cfg.CONF(args=args, project='neutron',
version='%%(prog)s %s' % version.version_info.release_string(),
default_config_files=default_config_files,
**kwargs)
n_rpc.init(cfg.CONF)
# Validate that the base_mac is of the correct format
msg = validators.validate_regex(cfg.CONF.base_mac, validators.MAC_PATTERN)
if msg:
msg = _("Base MAC: %s") % msg
raise Exception(msg)
开发者ID:noironetworks,项目名称:neutron,代码行数:13,代码来源:config.py
示例9: init
def init(args, **kwargs):
cfg.CONF(args=args, project="neutron", version="%%(prog)s %s" % version.version_info.release_string(), **kwargs)
# FIXME(ihrachys): if import is put in global, circular import
# failure occurs
from neutron.common import rpc as n_rpc
n_rpc.init(cfg.CONF)
# Validate that the base_mac is of the correct format
msg = attributes._validate_regex(cfg.CONF.base_mac, attributes.MAC_PATTERN)
if msg:
msg = _("Base MAC: %s") % msg
raise Exception(msg)
开发者ID:hzhou8,项目名称:neutron,代码行数:14,代码来源:config.py
示例10: setUp
def setUp(self):
super(CastExceptionTestCase, self).setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake://'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
rpc.TRANSPORT = mock.MagicMock()
rpc.TRANSPORT._send.side_effect = Exception
target = messaging.Target(version='1.0', topic='testing')
self.client = rpc.get_client(target)
self.cast_context = mock.Mock()
开发者ID:huntxu,项目名称:neutron,代码行数:15,代码来源:test_rpc.py
示例11: setUp
def setUp(self):
super(ServiceTestCase, self).setUp()
self.host = 'foo'
self.topic = 'neutron-agent'
self.target_mock = mock.patch('oslo_messaging.Target')
self.target_mock.start()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
开发者ID:Blahhhhh,项目名称:neutron,代码行数:15,代码来源:test_rpc.py
示例12: setUp
def setUp(self):
super(TimeoutTestCase, self).setUp()
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(rpc.cleanup)
rpc.init(CONF)
rpc.TRANSPORT = mock.MagicMock()
rpc.TRANSPORT._send.side_effect = messaging.MessagingTimeout
target = messaging.Target(version='1.0', topic='testing')
self.client = rpc.get_client(target)
self.call_context = mock.Mock()
self.sleep = mock.patch('time.sleep').start()
rpc.TRANSPORT.conf.rpc_response_timeout = 10
开发者ID:sebrandon1,项目名称:neutron,代码行数:17,代码来源:test_rpc.py
示例13: setup_rpc_mocks
def setup_rpc_mocks(self):
# don't actually start RPC listeners when testing
mock.patch("neutron.common.rpc.Connection.consume_in_threads", return_value=[]).start()
self.useFixture(fixtures.MonkeyPatch("oslo_messaging.Notifier", fake_notifier.FakeNotifier))
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = "fake"
# NOTE(russellb) We want all calls to return immediately.
self.messaging_conf.response_timeout = 0
self.useFixture(self.messaging_conf)
self.addCleanup(n_rpc.clear_extra_exmods)
n_rpc.add_extra_exmods("neutron.test")
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
开发者ID:openstack,项目名称:neutron,代码行数:17,代码来源:base.py
示例14: __init__
def __init__(self, **kwargs):
super(TestRpcReportState, self).__init__(**kwargs)
cfg.CONF([], project='neutron', default_config_files=['/etc/neutron/neutron.conf'])
rpc.init(cfg.CONF)
self.ctxt = context.get_admin_context_without_session()
self.agent_state = {
'binary': 'neutron-openvswitch-agent',
'host': 'fakehost.com',
'topic': 'N/A',
'configurations': {'bridge_mappings': {"physnet2": "br-bond1"},
'tunnel_types': [],
'tunneling_ip': "",
'l2_population': False},
'agent_type': "Open vSwitch agent",
'start_flag': True
}
self.state_rpc = nu_rpc.PluginReportStateAPI('q-plugin')
开发者ID:xiayuu,项目名称:neutron-performance-test,代码行数:18,代码来源:neutron_test.py
示例15: init
def init(args, **kwargs):
cfg.CONF(args=args, project='neutron',
version='%%(prog)s %s' % version.version_info.release_string(),
**kwargs)
# FIXME(ihrachys): if import is put in global, circular import
# failure occurs
from neutron.common import rpc as n_rpc
# 进行notification的初始化操作,此时没有指定publish_ip
n_rpc.init(cfg.CONF)
# Validate that the base_mac is of the correct format
# Checking mac format is right or not
msg = attributes._validate_regex(cfg.CONF.base_mac,
attributes.MAC_PATTERN)
if msg:
msg = _("Base MAC: %s") % msg
raise Exception(msg)
开发者ID:ytwxy99,项目名称:neutron,代码行数:19,代码来源:config.py
示例16: test_init
def test_init(self, mock_not, mock_noti_trans, mock_trans, mock_ser):
notifier = mock.Mock()
transport = mock.Mock()
noti_transport = mock.Mock()
serializer = mock.Mock()
conf = mock.Mock()
mock_trans.return_value = transport
mock_noti_trans.return_value = noti_transport
mock_ser.return_value = serializer
mock_not.return_value = notifier
rpc.init(conf, rpc_ext_mods=['foo'])
expected_mods = list(set(['foo'] + rpc._DFT_EXMODS))
mock_trans.assert_called_once_with(
conf, allowed_remote_exmods=expected_mods)
mock_noti_trans.assert_called_once_with(
conf, allowed_remote_exmods=expected_mods)
mock_not.assert_called_once_with(noti_transport,
serializer=serializer)
self.assertIsNotNone(rpc.TRANSPORT)
self.assertIsNotNone(rpc.NOTIFICATION_TRANSPORT)
self.assertIsNotNone(rpc.NOTIFIER)
开发者ID:huntxu,项目名称:neutron,代码行数:24,代码来源:test_rpc.py
示例17: main
def main():
username = ""
password = ""
auth_url = ""
if 'OS_USERNAME' in os.environ:
username = os.environ['OS_USERNAME']
else:
print("OS_USERNAME not defined in environment")
sys.exit(1)
if 'OS_PASSWORD' in os.environ:
password = os.environ['OS_PASSWORD']
else:
print("OS_PASSWORD not defined in environment")
sys.exit(1)
if 'OS_TENANT_NAME' in os.environ:
tenant_name = os.environ['OS_TENANT_NAME']
else:
print("OS_TENANT_NAME not defined in environment")
sys.exit(1)
if 'OS_AUTH_URL' in os.environ:
auth_url = os.environ['OS_AUTH_URL']
else:
print("OS_AUTH_URL not defined in environment")
sys.exit(1)
neutron = q_client.Client(username=username,
password=password,
tenant_name=tenant_name,
auth_url=auth_url)
subnets = neutron.list_subnets()['subnets']
for subnet in subnets:
if subnet['name'] == 'private-subnet':
lb_dict['loadbalancer']['vip_subnet_id'] = subnet['id']
lb_dict['loadbalancer']['tenant_id'] = subnet['tenant_id']
neutron.create_loadbalancer(lb_dict)
loadbalancers = neutron.list_loadbalancers()['loadbalancers']
for loadbalancer in loadbalancers:
if loadbalancer['name'] == lb_dict['loadbalancer']['name']:
break
environment_prefix = 'Test'
topic = '%s_%s'\
% (constants_v2.TOPIC_PROCESS_ON_HOST_V2, environment_prefix)
print(topic)
q_rpc.init(cfg.CONF)
transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic=topic)
rpc_client = messaging.RPCClient(transport, target)
ctxt = context.get_admin_context().to_dict()
print(loadbalancer['id'])
time.sleep(5)
service = rpc_client.call(ctxt, 'get_service_by_loadbalancer_id',
loadbalancer_id=loadbalancer['id'],
global_routed_mode=True,
host=None)
print(service)
neutron.delete_loadbalancer(loadbalancer['id'])
开发者ID:F5Networks,项目名称:f5-openstack-lbaasv2-driver,代码行数:67,代码来源:testgetservice.py
示例18: setUp
def setUp(self):
n_rpc.init(cfg.CONF)
self.imprt_rc = 'gbpservice.nfp.lib.rest_client_over_unix'
开发者ID:openstack,项目名称:group-based-policy,代码行数:3,代码来源:test_transport.py
示例19: Controller
# License for the specific language governing permissions and limitations
# under the License.
import oslo_serialization.jsonutils as jsonutils
from neutron.common import rpc as n_rpc
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
import pecan
import pika
from gbpservice.nfp.pecan import base_controller
LOG = logging.getLogger(__name__)
n_rpc.init(cfg.CONF)
class Controller(base_controller.BaseController):
"""Implements all the APIs Invoked by HTTP requests.
Implements following HTTP methods.
-get
-post
-put
According to the HTTP request received from config-agent this class make
call/cast to configurator and return response to config-agent
"""
def __init__(self, method_name):
开发者ID:openstack,项目名称:group-based-policy,代码行数:31,代码来源:controller.py
示例20: setUp
def setUp(self):
super(BaseTestCase, self).setUp()
# Ensure plugin cleanup is triggered last so that
# test-specific cleanup has a chance to release references.
self.addCleanup(self.cleanup_core_plugin)
# Configure this first to ensure pm debugging support for setUp()
if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING:
self.addOnException(post_mortem_debug.exception_handler)
if os.environ.get('OS_DEBUG') in TRUE_STRING:
_level = std_logging.DEBUG
else:
_level = std_logging.INFO
capture_logs = os.environ.get('OS_LOG_CAPTURE') in TRUE_STRING
if not capture_logs:
std_logging.basicConfig(format=LOG_FORMAT, level=_level)
self.log_fixture = self.useFixture(
fixtures.FakeLogger(
format=LOG_FORMAT,
level=_level,
nuke_handlers=capture_logs,
))
# suppress all but errors here
self.useFixture(
fixtures.FakeLogger(
name='neutron.api.extensions',
format=LOG_FORMAT,
level=std_logging.ERROR,
nuke_handlers=capture_logs,
))
test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
if test_timeout == -1:
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
# If someone does use tempfile directly, ensure that it's cleaned up
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
self.temp_dir = self.useFixture(fixtures.TempDir()).path
cfg.CONF.set_override('state_path', self.temp_dir)
self.addCleanup(mock.patch.stopall)
self.addCleanup(CONF.reset)
if os.environ.get('OS_STDOUT_CAPTURE') in TRUE_STRING:
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if os.environ.get('OS_STDERR_CAPTURE') in TRUE_STRING:
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
fake_use_fatal_exceptions))
# don't actually start RPC listeners when testing
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.rpc.Connection.consume_in_threads',
fake_consume_in_threads))
self.useFixture(fixtures.MonkeyPatch(
'oslo.messaging.Notifier', fake_notifier.FakeNotifier))
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
self.messaging_conf.transport_driver = 'fake'
self.messaging_conf.response_timeout = 15
self.useFixture(self.messaging_conf)
self.addCleanup(n_rpc.clear_extra_exmods)
n_rpc.add_extra_exmods('neutron.test')
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml':
raise self.skipException('XML Testing Skipped in Py26')
self.setup_config()
开发者ID:AsherBond,项目名称:quantum,代码行数:82,代码来源:base.py
注:本文中的neutron.common.rpc.init函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论