• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python bucket_helper.BucketOperationHelper类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中membase.helper.bucket_helper.BucketOperationHelper的典型用法代码示例。如果您正苦于以下问题:Python BucketOperationHelper类的具体用法?Python BucketOperationHelper怎么用?Python BucketOperationHelper使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



在下文中一共展示了BucketOperationHelper类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: _create_plasma_buckets

 def _create_plasma_buckets(self):
     for bucket in self.buckets:
         if bucket.name.startswith("standard"):
             BucketOperationHelper.delete_bucket_or_assert(
                 serverInfo=self.dgmServer, bucket=bucket.name)
     self.buckets = [bu for bu in self.buckets if not bu.name.startswith("standard")]
     buckets = []
     for i in range(self.num_plasma_buckets):
         name = "plasma_dgm_" + str(i)
         buckets.append(name)
     bucket_size = self._get_bucket_size(self.quota,
                                         len(self.buckets)+len(buckets))
     self._create_buckets(server=self.master, bucket_list=buckets,
                          bucket_size=bucket_size)
     testuser = []
     rolelist = []
     for bucket in buckets:
         testuser.append({'id': bucket, 'name': bucket, 'password': 'password'})
         rolelist.append({'id': bucket, 'name': bucket, 'roles': 'admin'})
     self.add_built_in_server_user(testuser=testuser, rolelist=rolelist)
     buckets = []
     for bucket in self.buckets:
         if bucket.name.startswith("plasma_dgm"):
             buckets.append(bucket)
     return buckets
开发者ID:arod1987,项目名称:testrunner,代码行数:25,代码来源:plasma_data_size.py


示例2: common_setup

 def common_setup(input, testcase):
     servers = input.servers
     RemoteUtilHelper.common_basic_setup(servers)
     BucketOperationHelper.delete_all_buckets_or_assert(servers, testcase)
     for server in servers:
         ClusterOperationHelper.cleanup_cluster([server])
     ClusterHelper.wait_for_ns_servers_or_assert(servers, testcase)
开发者ID:jchris,项目名称:testrunner,代码行数:7,代码来源:failovertests.py


示例3: setUp

 def setUp(self):
     super(SGConfigTests, self).setUp()
     for server in self.servers:
         if self.case_number == 1:
             with open('pytests/sg/resources/gateway_config_walrus_template.json', 'r') as file:
                 filedata = file.read()
                 filedata = filedata.replace('LOCAL_IP', server.ip)
             with open('pytests/sg/resources/gateway_config_walrus.json', 'w') as file:
                 file.write(filedata)
             shell = RemoteMachineShellConnection(server)
             shell.execute_command("rm -rf {0}/tmp/*".format(self.folder_prefix))
             shell.copy_files_local_to_remote('pytests/sg/resources', '{0}/tmp'.format(self.folder_prefix))
             # will install sg only the first time
             self.install(shell)
             pid = self.is_sync_gateway_process_running(shell)
             self.assertNotEqual(pid, 0)
             exist = shell.file_exists('{0}/tmp/'.format(self.folder_prefix), 'gateway.log')
             self.assertTrue(exist)
             shell.disconnect()
     if self.case_number == 1:
         shutil.copy2('pytests/sg/resources/gateway_config_backup.json', 'pytests/sg/resources/gateway_config.json')
         BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
         self.cluster = Cluster()
         self.cluster.create_default_bucket(self.master, 150)
         task = self.cluster.async_create_sasl_bucket(self.master, 'test_%E-.5', 'password', 150, 1)
         task.result()
         task = self.cluster.async_create_standard_bucket(self.master, 'db', 11219, 150, 1)
         task.result()
开发者ID:EricACooper,项目名称:testrunner,代码行数:28,代码来源:sgconfigtests.py


示例4: common_tearDown

    def common_tearDown(servers, testcase):
        log = logger.Logger.get_logger()
        log.info(
            "==============  common_tearDown was started for test #{0} {1} ==============".format(
                testcase.case_number, testcase._testMethodName
            )
        )
        RemoteUtilHelper.common_basic_setup(servers)

        log.info("10 seconds delay to wait for couchbase-server to start")
        time.sleep(10)
        ClusterOperationHelper.wait_for_ns_servers_or_assert(
            servers, testcase, wait_time=AutoFailoverBaseTest.MAX_FAIL_DETECT_TIME * 15, wait_if_warmup=True
        )
        try:
            rest = RestConnection(self._servers[0])
            buckets = rest.get_buckets()
            for bucket in buckets:
                MemcachedClientHelper.flush_bucket(servers[0], bucket.name)
        except Exception:
            pass
        BucketOperationHelper.delete_all_buckets_or_assert(servers, testcase)
        ClusterOperationHelper.cleanup_cluster(servers)
        log.info(
            "==============  common_tearDown was finished for test #{0} {1} ==============".format(
                testcase.case_number, testcase._testMethodName
            )
        )
开发者ID:jason-hou,项目名称:testrunner,代码行数:28,代码来源:autofailovertests.py


示例5: test_backup_with_spatial_data

    def test_backup_with_spatial_data(self):
        num_docs = self.helper.input.param("num-docs", 5000)
        self.log.info("description : Make limit queries on a simple "
                      "dataset with {0} docs".format(num_docs))
        data_set = SimpleDataSet(self.helper, num_docs)
        data_set.add_limit_queries()
        self._query_test_init(data_set)

        if not self.command_options:
            self.command_options = []
        options = self.command_options + [' -m full']

        self.total_backups = 1
        self.shell.execute_cluster_backup(self.couchbase_login_info, self.backup_location, options)
        time.sleep(2)

        self.buckets = RestConnection(self.master).get_buckets()
        bucket_names = [bucket.name for bucket in self.buckets]
        BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
        gc.collect()

        self.helper._create_default_bucket()
        self.shell.restore_backupFile(self.couchbase_login_info, self.backup_location, bucket_names)

        SimpleDataSet(self.helper, num_docs)._create_views()
        self._query_test_init(data_set)
开发者ID:EricACooper,项目名称:testrunner,代码行数:26,代码来源:ibr.py


示例6: tearDown

 def tearDown(self):
         try:
             if (hasattr(self, '_resultForDoCleanups') and len(self._resultForDoCleanups.failures) > 0 \
                 and TestInputSingleton.input.param("stop-on-failure", False))\
                     or self.input.param("skip_cleanup", False):
                 self.log.warn("CLEANUP WAS SKIPPED")
             else:
                 self.log.info("==============  basetestcase cleanup was started for test #{0} {1} =============="\
                       .format(self.case_number, self._testMethodName))
                 rest = RestConnection(self.master)
                 alerts = rest.get_alerts()
                 if alerts is not None and len(alerts) != 0:
                     self.log.warn("Alerts were found: {0}".format(alerts))
                 if rest._rebalance_progress_status() == 'running':
                     self.log.warning("rebalancing is still running, test should be verified")
                     stopped = rest.stop_rebalance()
                     self.assertTrue(stopped, msg="unable to stop rebalance")
                 BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
                 ClusterOperationHelper.cleanup_cluster(self.servers)
                 self.sleep(10)
                 ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
                 self.log.info("==============  basetestcase cleanup was finished for test #{0} {1} =============="\
                       .format(self.case_number, self._testMethodName))
         except BaseException:
             # increase case_number to retry tearDown in setup for the next test
             self.case_number += 1000
         finally:
             # stop all existing task manager threads
             self.cluster.shutdown()
             self._log_finish(self)
开发者ID:strategist922,项目名称:testrunner,代码行数:30,代码来源:basetestcase.py


示例7: cleanup_cluster

 def cleanup_cluster(self):
     if not "skip_cleanup" in TestInputSingleton.input.test_params:
         BucketOperationHelper.delete_all_buckets_or_assert(
             self.servers, self.testcase)
         ClusterOperationHelper.cleanup_cluster(self.servers)
         ClusterOperationHelper.wait_for_ns_servers_or_assert(
             self.servers, self.testcase)
开发者ID:EricACooper,项目名称:testrunner,代码行数:7,代码来源:subdoc_helper.py


示例8: tearDown

 def tearDown(self):
     if not self.input.param("skip_cleanup", True):
         if self.times_teardown_called > 1 :
             self.shell.disconnect()
     if self.input.param("skip_cleanup", True):
         if self.case_number > 1 or self.times_teardown_called > 1:
             self.shell.disconnect()
     self.times_teardown_called += 1
     serverInfo = self.servers[0]
     rest = RestConnection(serverInfo)
     zones = rest.get_zone_names()
     for zone in zones:
         if zone != "Group 1":
             rest.delete_zone(zone)
     self.clusters_dic = self.input.clusters
     if self.clusters_dic:
         if len(self.clusters_dic) > 1:
             self.dest_nodes = self.clusters_dic[1]
             self.dest_master = self.dest_nodes[0]
             if self.dest_nodes and len(self.dest_nodes) > 1:
                 self.log.info("======== clean up destination cluster =======")
                 rest = RestConnection(self.dest_nodes[0])
                 rest.remove_all_remote_clusters()
                 rest.remove_all_replications()
                 BucketOperationHelper.delete_all_buckets_or_assert(self.dest_nodes, self)
                 ClusterOperationHelper.cleanup_cluster(self.dest_nodes)
         elif len(self.clusters_dic) == 1:
             self.log.error("=== need 2 cluster to setup xdcr in ini file ===")
     else:
         self.log.info("**** If run xdcr test, need cluster config is setup in ini file. ****")
     super(CliBaseTest, self).tearDown()
开发者ID:arod1987,项目名称:testrunner,代码行数:31,代码来源:cli_base.py


示例9: common_setup

    def common_setup(self, replica):
        self._input = TestInputSingleton.input
        self._servers = self._input.servers
        first = self._servers[0]
        self.log = logger.Logger().get_logger()
        self.log.info(self._input)
        rest = RestConnection(first)
        for server in self._servers:
            RestHelper(RestConnection(server)).is_ns_server_running()

        ClusterOperationHelper.cleanup_cluster(self._servers)
        BucketOperationHelper.delete_all_buckets_or_assert(self._servers, self)
        ClusterOperationHelper.add_all_nodes_or_assert(self._servers[0], self._servers, self._input.membase_settings, self)
        nodes = rest.node_statuses()
        otpNodeIds = []
        for node in nodes:
            otpNodeIds.append(node.id)
        info = rest.get_nodes_self()
        bucket_ram = info.mcdMemoryReserved * 3 / 4
        rest.create_bucket(bucket="default",
                           ramQuotaMB=int(bucket_ram),
                           replicaNumber=replica,
                           proxyPort=rest.get_nodes_self().moxi)
        msg = "wait_for_memcached fails"
        ready = BucketOperationHelper.wait_for_memcached(first, "default"),
        self.assertTrue(ready, msg)
        rebalanceStarted = rest.rebalance(otpNodeIds, [])
        self.assertTrue(rebalanceStarted,
                        "unable to start rebalance on master node {0}".format(first.ip))
        self.log.info('started rebalance operation on master node {0}'.format(first.ip))
        rebalanceSucceeded = rest.monitorRebalance()
        # without a bucket this seems to fail
        self.assertTrue(rebalanceSucceeded,
                        "rebalance operation for nodes: {0} was not successful".format(otpNodeIds))
        self.awareness = VBucketAwareMemcached(rest, "default")
开发者ID:steveyen,项目名称:testrunner,代码行数:35,代码来源:syncreplicationtests.py


示例10: setUp

    def setUp(self):
        self.log = logger.Logger.get_logger()
        self.master = TestInputSingleton.input.servers[0]
        self.input = TestInputSingleton.input
        self.servers = self.input.servers
        self.num_of_docs = self.input.param("num_of_docs", 1000)

        rest = RestConnection(self.master)
        for server in self.servers:
            rest.init_cluster(server.rest_username, server.rest_password)

        info = rest.get_nodes_self()

        for server in self.servers:
            rest.init_cluster_memoryQuota(
                server.rest_username, server.rest_password, memoryQuota=info.mcdMemoryReserved
            )

        ClusterOperationHelper.cleanup_cluster(self.servers)
        BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
        self._create_default_bucket()

        # Rebalance the nodes
        ClusterOperationHelper.begin_rebalance_in(self.master, self.servers)
        ClusterOperationHelper.end_rebalance(self.master)
        self._log_start()
开发者ID:paul-guo-,项目名称:appstack,代码行数:26,代码来源:warmupcluster.py


示例11: setUp_bucket

    def setUp_bucket(self, unittest):
        self.log = logger.Logger.get_logger()
        self.input = TestInputSingleton.input
        unittest.assertTrue(self.input, msg="input parameters missing...")
        self.test = unittest
        self.master = self.input.servers[0]
        rest = RestConnection(self.master)
        rest.init_cluster(username=self.master.rest_username, password=self.master.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=rest.get_nodes_self().mcdMemoryReserved)
        ClusterOperationHelper.cleanup_cluster([self.master])
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self.test)

        serverInfo = self.master
        rest = RestConnection(serverInfo)
        info = rest.get_nodes_self()
        rest.init_cluster(username=serverInfo.rest_username,
                          password=serverInfo.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=info.memoryQuota)

        # Add built-in user
        testuser = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'password': 'password'}]
        RbacBase().create_user_source(testuser, 'builtin', self.master)
        time.sleep(10)

        # Assign user to role
        role_list = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'roles': 'admin'}]
        RbacBase().add_user_role(role_list, RestConnection(self.master), 'builtin')
        time.sleep(10)
开发者ID:arod1987,项目名称:testrunner,代码行数:28,代码来源:setgettests.py


示例12: cleanup

 def cleanup(self):
     rest = RestConnection(self.master)
     rest.stop_rebalance()
     BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
     for server in self.servers:
         ClusterOperationHelper.cleanup_cluster([server])
     ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
开发者ID:Boggypop,项目名称:testrunner,代码行数:7,代码来源:moxi.py


示例13: setUp

    def setUp(self):
        self.log = logger.Logger().get_logger()
        self.input = TestInputSingleton.input
        self.servers = self.input.servers
        self.master = self.servers[0]
        self.ip = self.master.ip
        self.finished = False
        self.keys = []
        self.keycount = 0
        self.failure_string = ""

        self.cleanup()

        rest = RestConnection(self.master)
        info = rest.get_nodes_self()
        self.port = info.moxi+1

        rest.init_cluster(username=self.master.rest_username,
                          password=self.master.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=info.mcdMemoryReserved)
        created = BucketOperationHelper.create_multiple_buckets(self.master,
                                                                replica=1,
                                                                bucket_ram_ratio=(2.0 / 3.0),
                                                                howmany=10,
                                                                sasl=False)
        self.assertTrue(created, "bucket creation failed")

        ready = BucketOperationHelper.wait_for_memcached(self.master, "bucket-0")
        self.assertTrue(ready, "wait_for_memcached failed")
开发者ID:Boggypop,项目名称:testrunner,代码行数:29,代码来源:moxi.py


示例14: tearDown

 def tearDown(self):
     try:
         test_failed = len(self._resultForDoCleanups.errors)
         if self.driver and test_failed:
             BaseHelper(self).create_screenshot()
         if self.driver:
             self.driver.close()
         if test_failed and TestInputSingleton.input.param("stop-on-failure", False):
             print "test fails, teardown will be skipped!!!"
             return
         rest = RestConnection(self.servers[0])
         try:
             reb_status = rest._rebalance_progress_status()
         except ValueError as e:
             if e.message == 'No JSON object could be decoded':
                 print "cluster not initialized!!!"
                 return
         if reb_status == 'running':
             stopped = rest.stop_rebalance()
             self.assertTrue(stopped, msg="unable to stop rebalance")
         BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
         for server in self.servers:
             ClusterOperationHelper.cleanup_cluster([server])
         ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
     except Exception as e:
         raise e
     finally:
         if self.driver:
             self.shell.disconnect()
开发者ID:arod1987,项目名称:testrunner,代码行数:29,代码来源:uibasetest.py


示例15: tearDown

 def tearDown(self):
     try:
         if self.driver:
             path_screen = self.input.ui_conf['screenshots'] or 'logs/screens'
             full_path = '{1}/screen_{0}.png'.format(time.time(), path_screen)
             self.log.info('screenshot is available: %s' % full_path)
             if not os.path.exists(path_screen):
                 os.mkdir(path_screen)
             self.driver.get_screenshot_as_file(os.path.abspath(full_path))
         rest = RestConnection(self.servers[0])
         if rest._rebalance_progress_status() == 'running':
             stopped = rest.stop_rebalance()
             self.assertTrue(stopped, msg="unable to stop rebalance")
         BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
         for server in self.servers:
             ClusterOperationHelper.cleanup_cluster([server])
         ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
         if self.driver:
             self.driver.close()
     except Exception as e:
         raise e
     finally:
         if self.driver:
             self.shell.disconnect()
         self.cluster.shutdown()
开发者ID:DavidAlphaFox,项目名称:couchbase,代码行数:25,代码来源:uibasetest.py


示例16: _cluster_setup

    def _cluster_setup(self):
        replicas = self.input.param("replicas", 1)
        keys_count = self.input.param("keys-count", 0)
        num_buckets = self.input.param("num-buckets", 1)

        bucket_name = "default"
        master = self.servers[0]
        credentials = self.input.membase_settings
        rest = RestConnection(self.master)
        info = rest.get_nodes_self()
        rest.init_cluster(username=self.master.rest_username,
                          password=self.master.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=info.mcdMemoryReserved)
        rest.reset_autofailover()
        ClusterOperationHelper.add_and_rebalance(self.servers, True)

        if num_buckets == 1:
            bucket_ram = info.memoryQuota * 2 / 3
            rest.create_bucket(bucket=bucket_name,
                               ramQuotaMB=bucket_ram,
                               replicaNumber=replicas,
                               proxyPort=info.moxi)
        else:
            created = BucketOperationHelper.create_multiple_buckets(self.master, replicas, howmany=num_buckets)
            self.assertTrue(created, "unable to create multiple buckets")

        buckets = rest.get_buckets()
        for bucket in buckets:
                ready = BucketOperationHelper.wait_for_memcached(self.master, bucket.name)
                self.assertTrue(ready, msg="wait_for_memcached failed")

        for bucket in buckets:
            inserted_keys_cnt = self.load_data(self.master, bucket.name, keys_count)
            log.info('inserted {0} keys'.format(inserted_keys_cnt))
开发者ID:arod1987,项目名称:testrunner,代码行数:34,代码来源:autofailovertests.py


示例17: reset

 def reset(self):
     self.log.info(
         "==============  SwapRebalanceBase cleanup was started for test #{0} {1} ==============".format(
             self.case_number, self._testMethodName
         )
     )
     self.log.info("Stopping load in Teardown")
     SwapRebalanceBase.stop_load(self.loaders)
     for server in self.servers:
         rest = RestConnection(server)
         if rest._rebalance_progress_status() == "running":
             self.log.warning("rebalancing is still running, test should be verified")
             stopped = rest.stop_rebalance()
             self.assertTrue(stopped, msg="unable to stop rebalance")
     BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
     for server in self.servers:
         ClusterOperationHelper.cleanup_cluster([server])
         if server.data_path:
             rest = RestConnection(server)
             rest.set_data_path(data_path=server.data_path)
     ClusterOperationHelper.wait_for_ns_servers_or_assert(self.servers, self)
     self.log.info(
         "==============  SwapRebalanceBase cleanup was finished for test #{0} {1} ==============".format(
             self.case_number, self._testMethodName
         )
     )
开发者ID:jason-hou,项目名称:testrunner,代码行数:26,代码来源:swaprebalance.py


示例18: setUp

 def setUp(self):
     self.log = logger.Logger.get_logger()
     self.input = TestInputSingleton.input
     self.assertTrue(self.input, msg="input parameters missing...")
     self.servers = self.input.servers
     self.master = self.servers[0]
     rest = RestConnection(self.master)
     rest.init_cluster(username=self.master.rest_username,
                       password=self.master.rest_password)
     info = rest.get_nodes_self()
     node_ram_ratio = BucketOperationHelper.base_bucket_ratio(self.servers)
     rest.init_cluster_memoryQuota(memoryQuota=int(info.mcdMemoryReserved * node_ram_ratio))
     BucketOperationHelper.delete_all_buckets_or_assert(servers=self.servers, test_case=self)
     ClusterOperationHelper.cleanup_cluster(servers=self.servers)
     credentials = self.input.membase_settings
     ClusterOperationHelper.add_all_nodes_or_assert(master=self.master, all_servers=self.servers, rest_settings=credentials, test_case=self)
     rest = RestConnection(self.master)
     nodes = rest.node_statuses()
     otpNodeIds = []
     for node in nodes:
         otpNodeIds.append(node.id)
     rebalanceStarted = rest.rebalance(otpNodeIds, [])
     self.assertTrue(rebalanceStarted,
                     "unable to start rebalance on master node {0}".format(self.master.ip))
     self.log.info('started rebalance operation on master node {0}'.format(self.master.ip))
     rebalanceSucceeded = rest.monitorRebalance()
开发者ID:jchris,项目名称:testrunner,代码行数:26,代码来源:createtests.py


示例19: test_oom_delete_bucket

 def test_oom_delete_bucket(self):
     """
     1. Get OOM
     2. Delete a bucket
     3. Verify if state of indexes is changed
     :return:
     """
     self.assertTrue(self._push_indexer_off_the_cliff(), "OOM Can't be achieved")
     for i in range(len(self.buckets)):
         log.info("Deleting bucket {0}...".format(self.buckets[i].name))
         BucketOperationHelper.delete_bucket_or_assert(serverInfo=self.oomServer, bucket=self.buckets[i].name)
         self.sleep(120)
         check = self._validate_indexer_status_oom()
         if not check:
             if i < len(self.buckets):
                 self.buckets = self.buckets[i+1:]
             else:
                 #TODO: Pras: Need better solution here
                 self.buckets = []
             break
         log.info("Indexer Still in OOM...")
     self.sleep(120)
     self.assertFalse(self._validate_indexer_status_oom(), "Indexer still in OOM")
     self._verify_bucket_count_with_index_count(self.load_query_definitions)
     self.multi_query_using_index(buckets=self.buckets,
                     query_definitions=self.load_query_definitions)
开发者ID:chethanrao,项目名称:testrunner-archive,代码行数:26,代码来源:memdb_oom_2i.py


示例20: setUp

 def setUp(self):
     super(XDCRTests, self).setUp()
     self.bucket = Bucket()
     self._initialize_nodes()
     self.master = self.servers[0]
     for server in self.servers:
         rest=RestConnection(server)
         cluster_status = rest.cluster_status()
         self.log.info("Initial status of {0} cluster is {1}".format(server.ip,
                                                                     cluster_status['nodes'][0]['status']))
         while cluster_status['nodes'][0]['status'] == 'warmup':
             self.log.info("Waiting for cluster to become healthy")
             self.sleep(5)
             cluster_status = rest.cluster_status()
         self.log.info("current status of {0}  is {1}".format(server.ip,
                                                              cluster_status['nodes'][0]['status']))
     # Delete all buckets before creating new buckets
     self.log.info("Deleting all existing buckets")
     BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
     self.log.info("Creating new buckets")
     src_bucket = self.input.param('src_bucket', self.bucket)
     dest_bucket = self.input.param('dest_bucket', self.bucket)
     if src_bucket:
         RestConnection(self.servers[0]).create_bucket(bucket='default', ramQuotaMB=500)
     if dest_bucket:
         RestConnection(self.servers[1]).create_bucket(bucket='default', ramQuotaMB=500)
     helper = BaseHelper(self)
     helper.login()
开发者ID:EricACooper,项目名称:testrunner,代码行数:28,代码来源:uixdcrtests.py



注:本文中的membase.helper.bucket_helper.BucketOperationHelper类示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python cluster_helper.ClusterOperationHelper类代码示例发布时间:2022-05-27
下一篇:
Python rest_client.RestHelper类代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap