• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python config.CONFIG类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中trove.tests.config.CONFIG的典型用法代码示例。如果您正苦于以下问题:Python CONFIG类的具体用法?Python CONFIG怎么用?Python CONFIG使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了CONFIG类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: create_client_args

def create_client_args(user):

    auth_strategy = None

    kwargs = {"service_type": "trove", "insecure": CONFIG.values["trove_client_insecure"]}

    def set_optional(kwargs_name, test_conf_name):
        value = CONFIG.values.get(test_conf_name, None)
        if value is not None:
            kwargs[kwargs_name] = value

    service_url = CONFIG.get("override_trove_api_url", None)
    if user.requirements.is_admin:
        service_url = CONFIG.get("override_admin_trove_api_url", service_url)
    if service_url:
        kwargs["service_url"] = service_url

    auth_strategy = None
    if user.requirements.is_admin:
        auth_strategy = CONFIG.get("admin_auth_strategy", CONFIG.auth_strategy)
    else:
        auth_strategy = CONFIG.auth_strategy
    set_optional("region_name", "trove_client_region_name")
    if CONFIG.values.get("override_trove_api_url_append_tenant", False):
        kwargs["service_url"] += "/" + user.tenant

    if auth_strategy == "fake":
        from troveclient.compat import auth

        class FakeAuth(auth.Authenticator):
            def authenticate(self):
                class FakeCatalog(object):
                    def __init__(self, auth):
                        self.auth = auth

                    def get_public_url(self):
                        return "%s/%s" % (CONFIG.dbaas_url, self.auth.tenant)

                    def get_token(self):
                        return self.auth.tenant

                return FakeCatalog(self)

        auth_strategy = FakeAuth

    if auth_strategy:
        kwargs["auth_strategy"] = auth_strategy

    if not user.requirements.is_admin:
        auth_url = CONFIG.trove_auth_url
    else:
        auth_url = CONFIG.values.get("trove_admin_auth_url", CONFIG.trove_auth_url)

    if CONFIG.values.get("trove_client_cls"):
        cls_name = CONFIG.trove_client_cls
        kwargs["client_cls"] = import_class(cls_name)

    kwargs["tenant"] = user.tenant
    kwargs["auth_url"] = auth_url
    return (user.auth_user, user.auth_key), kwargs
开发者ID:pombredanne,项目名称:RDS,代码行数:60,代码来源:snippets.py


示例2: main

def main(import_func):
    try:
        wsgi_install()
        add_support_for_localization()
        # Load Trove app
        # Paste file needs absolute path
        config_file = os.path.realpath('etc/trove/trove.conf.test')
        # 'etc/trove/test-api-paste.ini'
        app = initialize_trove(config_file)
        # Initialize sqlite database.
        initialize_database()
        # Swap out WSGI, httplib, and other components with test doubles.
        initialize_fakes(app)

        # Initialize the test configuration.
        test_config_file, repl = parse_args_for_test_config()
        CONFIG.load_from_file(test_config_file)

        import_func()

        from trove.tests.util import event_simulator
        event_simulator.run_main(functools.partial(run_tests, repl))

    except Exception as e:
        # Printing the error manually like this is necessary due to oddities
        # with sys.excepthook.
        print("Run tests failed: %s" % e)
        traceback.print_exc()
        raise
开发者ID:cretta,项目名称:trove,代码行数:29,代码来源:run_tests.py


示例3: expected_default_datastore_configs

 def expected_default_datastore_configs():
     """Returns the expected test configurations for the default datastore
     defined in the Test Config as dbaas_datastore.
     """
     default_datatstore = CONFIG.get('dbaas_datastore', None)
     datastore_test_configs = CONFIG.get(default_datatstore, {})
     return datastore_test_configs.get("configurations", {})
开发者ID:tangguochang,项目名称:trove,代码行数:7,代码来源:configurations.py


示例4: test_delete

    def test_delete(self):
        if do_not_delete_instance():
            CONFIG.get_report().log("TESTS_DO_NOT_DELETE_INSTANCE=True was "
                                    "specified, skipping delete...")
            raise SkipTest("TESTS_DO_NOT_DELETE_INSTANCE was specified.")
        global dbaas
        if not hasattr(instance_info, "initial_result"):
            raise SkipTest("Instance was never created, skipping test...")
        # Update the report so the logs inside the instance will be saved.
        CONFIG.get_report().update()
        dbaas.instances.delete(instance_info.id)
        instance_info.deleted_at = timeutils.utcnow().isoformat()

        attempts = 0
        try:
            time.sleep(1)
            result = True
            while result is not None:
                attempts += 1
                time.sleep(1)
                result = dbaas.instances.get(instance_info.id)
                assert_equal(200, dbaas.last_http_code)
                assert_equal("SHUTDOWN", result.status)
        except exceptions.NotFound:
            pass
        except Exception as ex:
            fail("A failure occured when trying to GET instance %s for the %d"
                 " time: %s" % (str(instance_info.id), attempts, str(ex)))
开发者ID:jeredding,项目名称:trove,代码行数:28,代码来源:instances.py


示例5: __cb

 def __cb(*args, **kwargs):
     # While %s turns a var into a string but in some rare cases explicit
     # str() is less likely to raise an exception.
     arg_strs = [repr(arg) for arg in args]
     arg_strs += ["%s=%s" % (repr(key), repr(value)) for (key, value) in kwargs.items()]
     CONFIG.get_reporter().log("[RDC] Calling : %s(%s)..." % (name, ",".join(arg_strs)))
     value = func(*args, **kwargs)
     CONFIG.get_reporter.log("[RDC]     returned %s." % str(value))
     return value
开发者ID:ReVolly,项目名称:trove,代码行数:9,代码来源:client.py


示例6: resize_should_not_delete_users

 def resize_should_not_delete_users(self):
     """Resize should not delete users."""
     # Resize has an incredibly weird bug where users are deleted after
     # a resize. The code below is an attempt to catch this while proceeding
     # with the rest of the test (note the use of runs_after).
     if USE_IP:
         self.connection.connect()
         if not self.connection.is_connected():
             # Ok, this is def. a failure, but before we toss up an error
             # lets recreate to see how far we can get.
             CONFIG.get_report().log("Having to recreate the test_user! Resizing killed it!")
             self.log_current_users()
             self.create_user()
             fail("Somehow, the resize made the test user disappear.")
开发者ID:ReVolly,项目名称:trove,代码行数:14,代码来源:instances_actions.py


示例7: run_initialized_instance_create

    def run_initialized_instance_create(
            self, with_dbs=True, with_users=True, configuration_id=None,
            expected_states=['BUILD', 'ACTIVE'], expected_http_code=200):
        # TODO(pmalik): Instance create should return 202 Accepted (cast)
        # rather than 200 OK (call).
        name = self.instance_info.name
        flavor = self._get_instance_flavor()
        trove_volume_size = CONFIG.get('trove_volume_size', 1)
        self.init_inst_dbs = (self.test_helper.get_valid_database_definitions()
                              if with_dbs else [])
        self.init_inst_users = (self.test_helper.get_valid_user_definitions()
                                if with_users else [])
        if configuration_id:
            self.init_config_group_id = configuration_id

        if (self.init_inst_dbs or self.init_inst_users or
                self.init_config_group_id):
            info = self.assert_instance_create(
                name, flavor, trove_volume_size,
                self.init_inst_dbs, self.init_inst_users,
                self.init_config_group_id, None,
                CONFIG.dbaas_datastore, CONFIG.dbaas_datastore_version,
                expected_states, expected_http_code)

            self.init_inst_id = info.id
        else:
            # There is no need to run this test as it's effectively the same as
            # the empty instance test.
            raise SkipTest("No testable initial properties provided.")
开发者ID:magictour,项目名称:trove,代码行数:29,代码来源:instance_create_runners.py


示例8: run_initialized_instance_create

    def run_initialized_instance_create(
            self, with_dbs=True, with_users=True, configuration_id=None,
            expected_states=['BUILD', 'ACTIVE'], expected_http_code=200,
            create_helper_user=True):
        name = self.instance_info.name + '_init'
        flavor = self._get_instance_flavor()
        trove_volume_size = CONFIG.get('trove_volume_size', 1)
        self.init_inst_dbs = (self.test_helper.get_valid_database_definitions()
                              if with_dbs else [])
        self.init_inst_users = (self.test_helper.get_valid_user_definitions()
                                if with_users else [])
        if configuration_id:
            self.init_config_group_id = configuration_id

        if self.is_using_existing_instance:
            raise SkipTest("Using existing instance.")

        if (self.init_inst_dbs or self.init_inst_users or
                self.init_config_group_id):
            info = self.assert_instance_create(
                name, flavor, trove_volume_size,
                self.init_inst_dbs, self.init_inst_users,
                self.init_config_group_id, None,
                CONFIG.dbaas_datastore, CONFIG.dbaas_datastore_version,
                expected_states, expected_http_code,
                create_helper_user=create_helper_user)

            self.init_inst_id = info.id
        else:
            # There is no need to run this test as it's effectively the same as
            # the empty instance test.
            raise SkipTest("No testable initial properties provided.")
开发者ID:zjtheone,项目名称:trove,代码行数:32,代码来源:instance_create_runners.py


示例9: test_create

    def test_create(self):
        databases = []
        databases.append({"name": "firstdb", "character_set": "latin2",
                          "collate": "latin2_general_ci"})
        databases.append({"name": "db2"})
        instance_info.databases = databases
        users = []
        users.append({"name": "lite", "password": "litepass",
                      "databases": [{"name": "firstdb"}]})
        instance_info.users = users
        if VOLUME_SUPPORT:
            instance_info.volume = {'size': 1}
        else:
            instance_info.volume = None

        if create_new_instance():
            instance_info.initial_result = dbaas.instances.create(
                instance_info.name,
                instance_info.dbaas_flavor_href,
                instance_info.volume,
                databases,
                users,
                availability_zone="nova")
            assert_equal(200, dbaas.last_http_code)
        else:
            id = existing_instance()
            instance_info.initial_result = dbaas.instances.get(id)

        result = instance_info.initial_result
        instance_info.id = result.id

        report = CONFIG.get_report()
        report.log("Instance UUID = %s" % instance_info.id)
        if create_new_instance():
            assert_equal("BUILD", instance_info.initial_result.status)

        else:
            report.log("Test was invoked with TESTS_USE_INSTANCE_ID=%s, so no "
                       "instance was actually created." % id)

        # Check these attrs only are returned in create response
        expected_attrs = ['created', 'flavor', 'addresses', 'id', 'links',
                          'name', 'status', 'updated']
        if ROOT_ON_CREATE:
            expected_attrs.append('password')
        if VOLUME_SUPPORT:
            expected_attrs.append('volume')
        if CONFIG.trove_dns_support:
            expected_attrs.append('hostname')

        with CheckInstance(result._info) as check:
            if create_new_instance():
                check.attrs_exist(result._info, expected_attrs,
                                  msg="Create response")
            # Don't CheckInstance if the instance already exists.
            check.flavor()
            check.links(result._info['links'])
            if VOLUME_SUPPORT:
                check.volume()
开发者ID:jeredding,项目名称:trove,代码行数:59,代码来源:instances.py


示例10: __init__

 def __init__(self, sleep_time=10, timeout=1200):
     self.def_sleep_time = sleep_time
     self.def_timeout = timeout
     self.instance_info = instance_info
     self.auth_client = create_dbaas_client(self.instance_info.user)
     self.unauth_client = None
     self.report = CONFIG.get_report()
     self._test_helper = None
开发者ID:jjmob,项目名称:trove,代码行数:8,代码来源:test_runners.py


示例11: expected_instance_datastore_configs

 def expected_instance_datastore_configs(instance_id):
     """Given an instance retrieve the expected test configurations for
     instance's datastore.
     """
     instance = instance_info.dbaas.instances.get(instance_id)
     datastore_type = instance.datastore['type']
     datastore_test_configs = CONFIG.get(datastore_type, {})
     return datastore_test_configs.get("configurations", {})
开发者ID:tangguochang,项目名称:trove,代码行数:8,代码来源:configurations.py


示例12: load_config_file

def load_config_file():
    global conf
    if CONFIG.get("examples", None) is None:
        fail("Missing 'examples' config in test config.")
    conf = CONFIG.examples
    global normal_user
    normal_user = CONFIG.users.find_user_by_name(conf['normal_user_name'])
    global admin_user
    admin_user = CONFIG.users.find_user_by_name(conf['admin_user_name'])
开发者ID:Tesora,项目名称:tesora-trove,代码行数:9,代码来源:snippets.py


示例13: call_xmllint

def call_xmllint(name, body):
    try:
        with open(CONFIG.xml_temp_file, "w") as file:
            file.write(body)

        # if CONFIG.get('xml_xsd', None):
        args = [CONFIG.xml_temp_file]
        if CONFIG.get("xml_xsd", None):
            args += ["--schema", CONFIG.xml_xsd]
        processutils.execute(CONFIG.xmllint_bin, *args, check_exit_code=0, shell=False)
    except processutils.ProcessExecutionError as pe:
        fail("Error validating XML! %s" % pe)
开发者ID:ReVolly,项目名称:trove,代码行数:12,代码来源:client.py


示例14: create_instance

 def create_instance(self):
     volume = None
     if VOLUME_SUPPORT:
         volume = {'size': 1}
     nics = None
     shared_network = CONFIG.get('shared_network', None)
     if shared_network:
         nics = [{'net-id': shared_network}]
     initial = self.client.instances.create(self.name, self.flavor_id,
                                            volume, [], [],
                                            nics=nics)
     self.id = initial.id
     self._wait_for_active()
开发者ID:Tesora,项目名称:tesora-trove,代码行数:13,代码来源:instances_mysql_down.py


示例15: __init__

    def __init__(self, sleep_time=10, timeout=1200):
        self.def_sleep_time = sleep_time
        self.def_timeout = timeout

        self.instance_info.name = "TEST_" + datetime.datetime.strftime(
            datetime.datetime.now(), '%Y_%m_%d__%H_%M_%S')
        self.instance_info.dbaas_datastore = CONFIG.dbaas_datastore
        self.instance_info.dbaas_datastore_version = (
            CONFIG.dbaas_datastore_version)
        self.instance_info.user = CONFIG.users.find_user_by_name('alt_demo')
        if self.VOLUME_SUPPORT:
            self.instance_info.volume_size = CONFIG.get('trove_volume_size', 1)
            self.instance_info.volume = {
                'size': self.instance_info.volume_size}
        else:
            self.instance_info.volume_size = None
            self.instance_info.volume = None
        self.instance_info.nics = None
        shared_network = CONFIG.get('shared_network', None)
        if shared_network:
            self.instance_info.nics = [{'net-id': shared_network}]

        self._auth_client = None
        self._unauth_client = None
        self._admin_client = None
        self._swift_client = None
        self._nova_client = None
        self._test_helper = None
        self._servers = {}

        # Attempt to register the main instance.  If it doesn't
        # exist, this will still set the 'report' and 'client' objects
        # correctly in LogOnFail
        inst_ids = []
        if hasattr(self.instance_info, 'id') and self.instance_info.id:
            inst_ids = [self.instance_info.id]
        self.register_debug_inst_ids(inst_ids)
开发者ID:Tesora,项目名称:tesora-trove,代码行数:37,代码来源:test_runners.py


示例16: setUp

    def setUp(self):
        rd_user = test_config.users.find_user(
            Requirements(is_admin=False, services=["trove"]))
        self.rd_client = create_dbaas_client(rd_user)

        self.datastore = self.rd_client.datastores.get(
            test_config.dbaas_datastore)
        self.name1 = "test_instance1"
        self.name2 = "test_instance2"
        self.volume = {'size': 2}
        self.instance_id = None
        self.nics = None
        shared_network = CONFIG.get('shared_network', None)
        if shared_network:
            self.nics = [{'net-id': shared_network}]
开发者ID:Tesora,项目名称:tesora-trove,代码行数:15,代码来源:flavors.py


示例17: __init__

    def __init__(self, sleep_time=10, timeout=1200):
        self.def_sleep_time = sleep_time
        self.def_timeout = timeout

        self.instance_info = instance_info
        instance_info.dbaas_datastore = CONFIG.dbaas_datastore
        instance_info.dbaas_datastore_version = CONFIG.dbaas_datastore_version
        if self.VOLUME_SUPPORT:
            instance_info.volume = {'size': CONFIG.get('trove_volume_size', 1)}
        else:
            instance_info.volume = None

        self.auth_client = create_dbaas_client(self.instance_info.user)
        self.unauth_client = None
        self._test_helper = None
开发者ID:pombredanne,项目名称:trove,代码行数:15,代码来源:test_runners.py


示例18: test_assign_name_to_instance_using_patch

 def test_assign_name_to_instance_using_patch(self):
     # test assigning a name to an instance
     new_name = 'new_name_1'
     report = CONFIG.get_report()
     report.log("instance_info.id: %s" % instance_info.id)
     report.log("instance name:%s" % instance_info.name)
     report.log("instance new name:%s" % new_name)
     instance_info.dbaas.instances.edit(instance_info.id, name=new_name)
     assert_equal(202, instance_info.dbaas.last_http_code)
     check = instance_info.dbaas.instances.get(instance_info.id)
     assert_equal(200, instance_info.dbaas.last_http_code)
     assert_equal(check.name, new_name)
     # Restore instance name
     instance_info.dbaas.instances.edit(instance_info.id,
                                        name=instance_info.name)
     assert_equal(202, instance_info.dbaas.last_http_code)
开发者ID:tangguochang,项目名称:trove,代码行数:16,代码来源:configurations.py


示例19: test_assign_config_and_name_to_instance_using_patch

    def test_assign_config_and_name_to_instance_using_patch(self):
        # test assigning a configuration and name to an instance
        new_name = 'new_name'
        report = CONFIG.get_report()
        report.log("instance_info.id: %s" % instance_info.id)
        report.log("configuration_info: %s" % configuration_info)
        report.log("configuration_info.id: %s" % configuration_info.id)
        report.log("instance name:%s" % instance_info.name)
        report.log("instance new name:%s" % new_name)
        saved_name = instance_info.name
        config_id = configuration_info.id
        instance_info.dbaas.instances.edit(instance_info.id,
                                           configuration=config_id,
                                           name=new_name)
        assert_equal(202, instance_info.dbaas.last_http_code)
        check = instance_info.dbaas.instances.get(instance_info.id)
        assert_equal(200, instance_info.dbaas.last_http_code)
        assert_equal(check.name, new_name)

        # restore instance name
        instance_info.dbaas.instances.edit(instance_info.id,
                                           name=saved_name)
        assert_equal(202, instance_info.dbaas.last_http_code)

        instance = instance_info.dbaas.instances.get(instance_info.id)
        assert_equal('RESTART_REQUIRED', instance.status)
        # restart to be sure configuration is applied
        instance_info.dbaas.instances.restart(instance_info.id)
        assert_equal(202, instance_info.dbaas.last_http_code)
        sleep(2)

        def result_is_active():
            instance = instance_info.dbaas.instances.get(
                instance_info.id)
            if instance.status == "ACTIVE":
                return True
            else:
                assert_equal("REBOOT", instance.status)
                return False
        poll_until(result_is_active)
        # test assigning a configuration to an instance that
        # already has an assigned configuration with patch
        config_id = configuration_info.id
        assert_raises(exceptions.BadRequest,
                      instance_info.dbaas.instances.edit,
                      instance_info.id, configuration=config_id)
开发者ID:tangguochang,项目名称:trove,代码行数:46,代码来源:configurations.py


示例20: run

 def run(self):
     # Print out everything to make it
     print("Looking for config value %s..." % self.service_path_root)
     print(CONFIG.values[self.service_path_root])
     path = self.service_path % CONFIG.values[self.service_path_root]
     print("Path = %s" % path)
     if not os.path.exists(path):
         path = self.alternate_path
     if path is None:
         fail("Could not find path to %s" % self.service_path_root)
     conf_path = str(CONFIG.values[self.conf_file_name])
     cmds = CONFIG.python_cmd_list() + [path] + self.extra_cmds + \
            [conf_path]
     print("Running cmds: %s" % cmds)
     self.service = Service(cmds)
     if not self.service.is_service_alive():
         self.service.start()
开发者ID:Simplit-openapps,项目名称:trove-integration,代码行数:17,代码来源:initialize.py



注:本文中的trove.tests.config.CONFIG类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python util.init_db函数代码示例发布时间:2022-05-27
下一篇:
Python test_utils.get_instance_id_bytenant函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap