• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python manager.init函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中neutron.manager.init函数的典型用法代码示例。如果您正苦于以下问题:Python init函数的具体用法?Python init怎么用?Python init使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了init函数的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: main

def main():
    cfg.CONF.register_cli_opts(cli_opts)
    config.init(sys.argv[1:])

    # Enable logging but prevent output to stderr.
    cfg.CONF.use_stderr = False
    config.setup_logging()

    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))

    router.APIRouter.factory({})
    manager.init()

    gbp_plugin = directory.get_plugin('GROUP_POLICY')
    if not gbp_plugin:
        sys.exit("GBP service plugin not configured.")

    result = gbp_plugin.validate_state(cfg.CONF.repair)
    if result in [api.VALIDATION_FAILED_REPAIRABLE,
                  api.VALIDATION_FAILED_UNREPAIRABLE,
                  api.VALIDATION_FAILED_WITH_EXCEPTION]:
        sys.exit(result)
    return 0
开发者ID:openstack,项目名称:group-based-policy,代码行数:26,代码来源:cli.py


示例2: setUp

    def setUp(self):
        super(TestLoggingPlugin, self).setUp()
        self.setup_coreplugin(load_plugins=False)

        mock.patch('neutron.objects.db.api.create_object').start()
        mock.patch('neutron.objects.db.api.update_object').start()
        mock.patch('neutron.objects.db.api.delete_object').start()
        mock.patch('neutron.objects.db.api.get_object').start()
        # We don't use real models as per mocks above. We also need to mock-out
        # methods that work with real data types
        mock.patch(
            'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
        ).start()

        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
        cfg.CONF.set_override("service_plugins",
            ["neutron.services.logapi.logging_plugin.LoggingPlugin"])

        manager.init()
        self.log_plugin = directory.get_plugin(plugin_const.LOG_API)
        self.log_plugin.driver_manager = mock.Mock()
        log_types = mock.PropertyMock(return_value=SUPPORTED_LOGGING_TYPES)
        self.log_plugin.driver_manager.supported_logging_types = \
            mock.patch('neutron.services.logapi.drivers.manager.'
                       'LoggingServiceDriverManager.supported_logging_types',
                       new_callable=log_types).start()
        self.ctxt = context.Context('admin', 'fake_tenant')
开发者ID:mmalchuk,项目名称:openstack-neutron,代码行数:27,代码来源:test_logging_plugin.py


示例3: setup_coreplugin

 def setup_coreplugin(self, core_plugin=None, load_plugins=True):
     cp = PluginFixture(core_plugin)
     self.useFixture(cp)
     self.patched_dhcp_periodic = cp.patched_dhcp_periodic
     self.patched_default_svc_plugins = cp.patched_default_svc_plugins
     if load_plugins:
         manager.init()
开发者ID:openstack,项目名称:neutron,代码行数:7,代码来源:base.py


示例4: setUp

    def setUp(self):
        super(TestQosPlugin, self).setUp()
        self.setup_coreplugin(load_plugins=False)

        mock.patch('neutron.objects.db.api.create_object').start()
        mock.patch('neutron.objects.db.api.update_object').start()
        mock.patch('neutron.objects.db.api.delete_object').start()
        mock.patch('neutron.objects.db.api.get_object').start()
        mock.patch(
            'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
        # We don't use real models as per mocks above. We also need to mock-out
        # methods that work with real data types
        mock.patch(
            'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
        ).start()

        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
        cfg.CONF.set_override("service_plugins", ["qos"])

        manager.init()
        self.qos_plugin = directory.get_plugin(constants.QOS)

        self.qos_plugin.driver_manager = mock.Mock()

        self.rpc_push = mock.patch('neutron.api.rpc.handlers.resources_rpc'
                                   '.ResourcesPushRpcApi.push').start()

        self.ctxt = context.Context('fake_user', 'fake_tenant')
        mock.patch.object(self.ctxt.session, 'refresh').start()
        mock.patch.object(self.ctxt.session, 'expunge').start()

        self.policy_data = {
            'policy': {'id': uuidutils.generate_uuid(),
                       'project_id': uuidutils.generate_uuid(),
                       'name': 'test-policy',
                       'description': 'Test policy description',
                       'shared': True}}

        self.rule_data = {
            'bandwidth_limit_rule': {'id': uuidutils.generate_uuid(),
                                     'max_kbps': 100,
                                     'max_burst_kbps': 150},
            'dscp_marking_rule': {'id': uuidutils.generate_uuid(),
                                  'dscp_mark': 16},
            'minimum_bandwidth_rule': {
                'id': uuidutils.generate_uuid(),
                'min_kbps': 10}}

        self.policy = policy_object.QosPolicy(
            self.ctxt, **self.policy_data['policy'])

        self.rule = rule_object.QosBandwidthLimitRule(
            self.ctxt, **self.rule_data['bandwidth_limit_rule'])

        self.dscp_rule = rule_object.QosDscpMarkingRule(
            self.ctxt, **self.rule_data['dscp_marking_rule'])

        self.min_rule = rule_object.QosMinimumBandwidthRule(
            self.ctxt, **self.rule_data['minimum_bandwidth_rule'])
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:59,代码来源:test_qos_plugin.py


示例5: setUp

    def setUp(self):
        super(QosRuleTypeObjectTestCase, self).setUp()
        self.config_parse()

        self.setup_coreplugin(load_plugins=False)
        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
        cfg.CONF.set_override("service_plugins", ["qos"])
        manager.init()
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:8,代码来源:test_rule_type.py


示例6: test_service_plugin_is_loaded

    def test_service_plugin_is_loaded(self):
        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
        cfg.CONF.set_override("service_plugins",
                              ["neutron.tests.unit.dummy_plugin."
                               "DummyServicePlugin"])
        manager.init()
        plugin = directory.get_plugin(dummy_plugin.DUMMY_SERVICE_TYPE)

        self.assertIsInstance(
            plugin, dummy_plugin.DummyServicePlugin,
            "loaded plugin should be of type neutronDummyPlugin")
开发者ID:igordcard,项目名称:neutron,代码行数:11,代码来源:test_manager.py


示例7: eventlet_rpc_server

def eventlet_rpc_server():
    LOG.info("Eventlet based AMQP RPC server starting...")

    try:
        manager.init()
        rpc_workers_launcher = service.start_all_workers()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")
    else:
        rpc_workers_launcher.wait()
开发者ID:eayunstack,项目名称:neutron,代码行数:11,代码来源:rpc_eventlet.py


示例8: eventlet_rpc_server

def eventlet_rpc_server():
    LOG.info("Eventlet based AMQP RPC server starting...")

    try:
        manager.init()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCES)
        rpc_workers_launcher = service.start_rpc_workers()
    except NotImplementedError:
        LOG.info("RPC was already started in parent process by "
                 "plugin.")
    else:
        rpc_workers_launcher.wait()
开发者ID:cubeek,项目名称:neutron,代码行数:13,代码来源:rpc_eventlet.py


示例9: __init__

    def __init__(self, **local_config):
        mapper = routes_mapper.Mapper()
        manager.init()
        plugin = directory.get_plugin()
        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
        ext_mgr.extend_resources("2.0", attributes.RESOURCE_ATTRIBUTE_MAP)

        col_kwargs = dict(collection_actions=COLLECTION_ACTIONS,
                          member_actions=MEMBER_ACTIONS)

        def _map_resource(collection, resource, params, parent=None):
            allow_bulk = cfg.CONF.allow_bulk
            controller = base.create_resource(
                collection, resource, plugin, params, allow_bulk=allow_bulk,
                parent=parent, allow_pagination=True,
                allow_sorting=True)
            path_prefix = None
            if parent:
                path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
                                                  parent['member_name'],
                                                  collection)
            mapper_kwargs = dict(controller=controller,
                                 requirements=REQUIREMENTS,
                                 path_prefix=path_prefix,
                                 **col_kwargs)
            return mapper.collection(collection, resource,
                                     **mapper_kwargs)

        mapper.connect('index', '/', controller=Index(RESOURCES))
        for resource in RESOURCES:
            _map_resource(RESOURCES[resource], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              RESOURCES[resource], dict()))
            resource_registry.register_resource_by_name(resource)

        for resource in SUB_RESOURCES:
            _map_resource(SUB_RESOURCES[resource]['collection_name'], resource,
                          attributes.RESOURCE_ATTRIBUTE_MAP.get(
                              SUB_RESOURCES[resource]['collection_name'],
                              dict()),
                          SUB_RESOURCES[resource]['parent'])

        # Certain policy checks require that the extensions are loaded
        # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
        # properly initialized. This can only be claimed with certainty
        # once this point in the code has been reached. In the event
        # that the policies have been initialized before this point,
        # calling reset will cause the next policy check to
        # re-initialize with all of the required data in place.
        policy.reset()
        super(APIRouter, self).__init__(mapper)
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:51,代码来源:router.py


示例10: main

def main():
    config.init(sys.argv[1:])
    config.setup_logging()

    cxt = context.get_admin_context()
    manager.init()
    plugin = directory.get_plugin()
    l3_plugin = directory.get_plugin(constants.L3)
    notifier = n_rpc.get_notifier('network')
    for network in plugin.get_networks(cxt):
        notifier.info(cxt, 'network.exists', {'network': network})
    for subnet in plugin.get_subnets(cxt):
        notifier.info(cxt, 'subnet.exists', {'subnet': subnet})
    for port in plugin.get_ports(cxt):
        notifier.info(cxt, 'port.exists', {'port': port})
    for router in l3_plugin.get_routers(cxt):
        notifier.info(cxt, 'router.exists', {'router': router})
    for floatingip in l3_plugin.get_floatingips(cxt):
        notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip})
开发者ID:AradhanaSingh,项目名称:neutron,代码行数:19,代码来源:usage_audit.py


示例11: setUp

    def setUp(self):
        super(TestPortForwardingPlugin, self).setUp()

        with mock.patch.object(
            resource_manager.ResourceCallbacksManager, '_singleton',
            new_callable=mock.PropertyMock(return_value=False)):

            self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager()
            self.prod_mgr = resource_manager.ProducerResourceCallbacksManager()
            for mgr in (self.cons_mgr, self.prod_mgr):
                mgr.clear()

        mock.patch.object(
            cons_registry, '_get_manager', return_value=self.cons_mgr).start()

        mock.patch.object(
            prod_registry, '_get_manager', return_value=self.prod_mgr).start()
        self.setup_coreplugin(load_plugins=False)

        mock.patch('neutron.objects.db.api.create_object').start()
        mock.patch('neutron.objects.db.api.update_object').start()
        mock.patch('neutron.objects.db.api.delete_object').start()
        mock.patch('neutron.objects.db.api.get_object').start()
        # We don't use real models as per mocks above. We also need to mock-out
        # methods that work with real data types
        mock.patch(
            'neutron.objects.base.NeutronDbObject.modify_fields_from_db'
        ).start()

        cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS)
        cfg.CONF.set_override("service_plugins", ["router", "port_forwarding"])

        manager.init()
        self.pf_plugin = directory.get_plugin(lib_plugin_conts.PORTFORWARDING)
        self.ctxt = context.Context('admin', 'fake_tenant')
        mock.patch.object(self.ctxt.session, 'refresh').start()
        mock.patch.object(self.ctxt.session, 'expunge').start()
开发者ID:igordcard,项目名称:neutron,代码行数:37,代码来源:test_pf_plugin.py


示例12: setup_extension

    def setup_extension(self, plugin, service_type,
                        extension_class,
                        resource_prefix, plural_mappings=None,
                        translate_resource_name=False,
                        allow_pagination=False, allow_sorting=False,
                        supported_extension_aliases=None,
                        use_quota=False):

        self._resource_prefix = resource_prefix
        self._plural_mappings = plural_mappings or {}
        self._translate_resource_name = translate_resource_name

        # Ensure existing ExtensionManager is not used
        extensions.PluginAwareExtensionManager._instance = None

        self.useFixture(fixture.APIDefinitionFixture())

        # Create the default configurations
        self.config_parse()

        core_plugin = CORE_PLUGIN if service_type else plugin
        self.setup_coreplugin(core_plugin, load_plugins=False)
        if service_type:
            cfg.CONF.set_override('service_plugins', [plugin])

        self._plugin_patcher = mock.patch(plugin, autospec=True)
        self.plugin = self._plugin_patcher.start()
        instance = self.plugin.return_value
        if service_type:
            instance.get_plugin_type.return_value = service_type
        manager.init()

        if supported_extension_aliases is not None:
            instance.supported_extension_aliases = supported_extension_aliases
        if allow_pagination:
            # instance.__native_pagination_support = True
            native_pagination_attr_name = ("_%s__native_pagination_support"
                                           % instance.__class__.__name__)
            setattr(instance, native_pagination_attr_name, True)
        if allow_sorting:
            # instance.__native_sorting_support = True
            native_sorting_attr_name = ("_%s__native_sorting_support"
                                        % instance.__class__.__name__)
            setattr(instance, native_sorting_attr_name, True)
        if use_quota:
            quota.QUOTAS._driver = None
            cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
                                  group='QUOTAS')
        setattr(instance, 'path_prefix', resource_prefix)

        class ExtensionTestExtensionManager(object):
            def get_resources(self):
                return extension_class.get_resources()

            def get_actions(self):
                return []

            def get_request_extensions(self):
                return []

        ext_mgr = ExtensionTestExtensionManager()
        self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
        self.api = webtest.TestApp(self.ext_mdw)
开发者ID:cubeek,项目名称:neutron,代码行数:63,代码来源:base.py


示例13: test_can_load_core_plugin_without_datastore

 def test_can_load_core_plugin_without_datastore(self):
     cfg.CONF.set_override("core_plugin", 'neutron.tests.unit.dummy_plugin.'
                           'DummyCorePluginWithoutDatastore')
     manager.init()
开发者ID:cubeek,项目名称:neutron,代码行数:4,代码来源:test_neutron_plugin_base_v2.py


示例14: initialize_all

def initialize_all():
    manager.init()
    ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
    ext_mgr.extend_resources("2.0", attributes.RESOURCES)
    # At this stage we have a fully populated resource attribute map;
    # build Pecan controllers and routes for all core resources
    plugin = directory.get_plugin()
    for resource, collection in RESOURCES.items():
        resource_registry.register_resource_by_name(resource)
        new_controller = res_ctrl.CollectionsController(collection, resource,
                                                        plugin=plugin)
        manager.NeutronManager.set_controller_for_resource(
            collection, new_controller)
        manager.NeutronManager.set_plugin_for_resource(collection, plugin)

    pecanized_resources = ext_mgr.get_pecan_resources()
    for pec_res in pecanized_resources:
        manager.NeutronManager.set_controller_for_resource(
            pec_res.collection, pec_res.controller)
        manager.NeutronManager.set_plugin_for_resource(
            pec_res.collection, pec_res.plugin)

    # Now build Pecan Controllers and routes for all extensions
    resources = ext_mgr.get_resources()
    # Extensions controller is already defined, we don't need it.
    resources.pop(0)
    for ext_res in resources:
        path_prefix = ext_res.path_prefix.strip('/')
        collection = ext_res.collection
        # Retrieving the parent resource.  It is expected the format of
        # the parent resource to be:
        # {'collection_name': 'name-of-collection',
        #  'member_name': 'name-of-resource'}
        # collection_name does not appear to be used in the legacy code
        # inside the controller logic, so we can assume we do not need it.
        parent = ext_res.parent or {}
        parent_resource = parent.get('member_name')
        collection_key = collection
        if parent_resource:
            collection_key = '/'.join([parent_resource, collection])
        collection_actions = ext_res.collection_actions
        member_actions = ext_res.member_actions
        if manager.NeutronManager.get_controller_for_resource(collection_key):
            # This is a collection that already has a pecan controller, we
            # do not need to do anything else
            continue
        legacy_controller = getattr(ext_res.controller, 'controller',
                                    ext_res.controller)
        new_controller = None
        if isinstance(legacy_controller, base.Controller):
            resource = legacy_controller.resource
            plugin = legacy_controller.plugin
            attr_info = legacy_controller.attr_info
            member_actions = legacy_controller.member_actions
            pagination = legacy_controller.allow_pagination
            sorting = legacy_controller.allow_sorting
            # NOTE(blogan): legacy_controller and ext_res both can both have
            # member_actions.  the member_actions for ext_res are strictly for
            # routing, while member_actions for legacy_controller are used for
            # handling the request once the routing has found the controller.
            # They're always the same so we will just use the ext_res
            # member_action.
            new_controller = res_ctrl.CollectionsController(
                collection, resource, resource_info=attr_info,
                parent_resource=parent_resource, member_actions=member_actions,
                plugin=plugin, allow_pagination=pagination,
                allow_sorting=sorting, collection_actions=collection_actions)
            # new_controller.collection has replaced hyphens with underscores
            manager.NeutronManager.set_plugin_for_resource(
                new_controller.collection, plugin)
            if path_prefix:
                manager.NeutronManager.add_resource_for_path_prefix(
                    collection, path_prefix)
        else:
            new_controller = utils.ShimCollectionsController(
                collection, None, legacy_controller,
                collection_actions=collection_actions,
                member_actions=member_actions,
                action_status=ext_res.controller.action_status,
                collection_methods=ext_res.collection_methods)

        manager.NeutronManager.set_controller_for_resource(
            collection_key, new_controller)

    # Certain policy checks require that the extensions are loaded
    # and the RESOURCE_ATTRIBUTE_MAP populated before they can be
    # properly initialized. This can only be claimed with certainty
    # once this point in the code has been reached. In the event
    # that the policies have been initialized before this point,
    # calling reset will cause the next policy check to
    # re-initialize with all of the required data in place.
    policy.reset()
开发者ID:cubeek,项目名称:neutron,代码行数:92,代码来源:startup.py


示例15: setUp

 def setUp(self):
     super(TestPluginWorker, self).setUp()
     self.setup_coreplugin('ml2', load_plugins=False)
     self._plugin_patcher = mock.patch(TARGET_PLUGIN, autospec=True)
     self.plugin = self._plugin_patcher.start()
     manager.init()
开发者ID:igordcard,项目名称:neutron,代码行数:6,代码来源:test_server.py



注:本文中的neutron.manager.init函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python manager.NeutronManager类代码示例发布时间:2022-05-27
下一篇:
Python utils.generate_pools函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap