• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testing_helpers.build_mocked_server函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testing_helpers.build_mocked_server函数的典型用法代码示例。如果您正苦于以下问题:Python build_mocked_server函数的具体用法?Python build_mocked_server怎么用?Python build_mocked_server使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了build_mocked_server函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_concurrent_stop_backup

    def test_concurrent_stop_backup(self, label_mock, stop_mock,):
        """
        Basic test for the start_backup method

        :param label_mock: mimic the response of _write_backup_label
        :param stop_mock: mimic the response of _pgespresso_stop_backup
        """
        # Build a backup info and configure the mocks
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)

        # Mock executor._pgespresso_stop_backup(backup_info) call
        stop_time = datetime.datetime.now()
        stop_mock.return_value = ("000000060000A25700000044", stop_time)

        backup_info = build_test_backup_info()
        backup_manager.executor.strategy.stop_backup(backup_info)

        assert backup_info.end_xlog == 'A257/45000000'
        assert backup_info.end_wal == '000000060000A25700000044'
        assert backup_info.end_offset == 0
        assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py


示例2: test_setup

    def test_setup(self):
        """
        Test the method that set up a recovery
        """
        backup_info = testing_helpers.build_test_backup_info()
        server = testing_helpers.build_mocked_server()
        backup_manager = Mock(server=server, config=server.config)
        executor = RecoveryExecutor(backup_manager)
        backup_info.version = 90300

        # setup should create a temporary directory
        # and teardown should delete it
        ret = executor.setup(backup_info, None, "/tmp")
        assert os.path.exists(ret['tempdir'])
        executor.teardown(ret)
        assert not os.path.exists(ret['tempdir'])

        # no postgresql.auto.conf on version 9.3
        ret = executor.setup(backup_info, None, "/tmp")
        executor.teardown(ret)
        assert "postgresql.auto.conf" not in ret['configuration_files']

        # Check the present for postgresql.auto.conf on version 9.4
        backup_info.version = 90400
        ret = executor.setup(backup_info, None, "/tmp")
        executor.teardown(ret)
        assert "postgresql.auto.conf" in ret['configuration_files']

        # Receive a error if the remote command is invalid
        with pytest.raises(SystemExit):
            executor.server.path = None
            executor.setup(backup_info, "invalid", "/tmp")
开发者ID:gcalacoci,项目名称:barman,代码行数:32,代码来源:test_recovery_executor.py


示例3: test_exclusive_stop_backup

    def test_exclusive_stop_backup(self, stop_mock):
        """
        Basic test for the start_backup method

        :param stop_mock: mimic the response od _pg_stop_backup
        """
        # Build a backup info and configure the mocks
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.EXCLUSIVE_BACKUP
        })
        backup_manager = build_backup_manager(server=server)
        # Mock executor._pg_stop_backup(backup_info) call
        stop_time = datetime.datetime.now()
        stop_mock.return_value = ("266/4A9C1EF8",
                                  "00000010000002660000004A",
                                  10231544,
                                  stop_time)

        backup_info = build_test_backup_info()
        backup_manager.executor.strategy.stop_backup(backup_info)

        # check that the submitted values are stored inside the BackupInfo obj
        assert backup_info.end_xlog == '266/4A9C1EF8'
        assert backup_info.end_wal == '00000010000002660000004A'
        assert backup_info.end_offset == 10231544
        assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:27,代码来源:test_executor.py


示例4: test_backup_info_from_backup_id

 def test_backup_info_from_backup_id(self, tmpdir):
     """
     Test the initialization of a BackupInfo object
     using a backup_id as argument
     """
     # We want to test the loading system using a backup_id.
     # So we create a backup.info file into the tmpdir then
     # we instruct the configuration on the position of the
     # testing backup.info file
     server = build_mocked_server(
         main_conf={
             'basebackups_directory': tmpdir.strpath
         },
     )
     infofile = tmpdir.mkdir('fake_name').join('backup.info')
     infofile.write(BASE_BACKUP_INFO)
     # Load the backup.info file using the backup_id
     b_info = BackupInfo(server, backup_id="fake_name")
     assert b_info
     assert b_info.begin_offset == 40
     assert b_info.begin_wal == '000000010000000000000004'
     assert b_info.timeline == 1
     assert isinstance(b_info.tablespaces, list)
     assert b_info.tablespaces[0].name == 'fake_tbs'
     assert b_info.tablespaces[0].oid == 16384
     assert b_info.tablespaces[0].location == '/fake_tmp/tbs'
开发者ID:zacchiro,项目名称:barman,代码行数:26,代码来源:test_infofile.py


示例5: test_pgespresso_stop_backup

    def test_pgespresso_stop_backup(self, tbs_map_mock, label_mock):
        """
        Basic test for the pgespresso_stop_backup method
        """
        # Build a backup info and configure the mocks
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)

        # Mock executor._pgespresso_stop_backup(backup_info) call
        stop_time = datetime.datetime.now()
        server.postgres.server_version = 90500
        server.postgres.pgespresso_stop_backup.return_value = {
            'end_wal': "000000060000A25700000044",
            'timestamp': stop_time
        }

        backup_info = build_test_backup_info(timeline=6)
        backup_manager.executor.strategy.stop_backup(backup_info)

        assert backup_info.end_xlog == 'A257/44FFFFFF'
        assert backup_info.end_wal == '000000060000A25700000044'
        assert backup_info.end_offset == 0xFFFFFF
        assert backup_info.end_time == stop_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:26,代码来源:test_executor.py


示例6: test_pgespresso_start_backup

    def test_pgespresso_start_backup(self):
        """
        Simple test for _pgespresso_start_backup method
        of the RsyncBackupExecutor class
        """
        # Build and configure a server using a mock
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)
        backup_label = 'test label'

        # Expect an exception because pgespresso is not installed
        backup_manager.server.pgespresso_installed.return_value = False
        with pytest.raises(Exception):
            backup_manager.executor.pgespresso_start_backup(backup_label)

        # Report pgespresso installed. Expect no error and the correct call
        # sequence
        backup_manager.server.reset_mock()
        backup_manager.executor.server.pgespresso_installed.return_value = True
        backup_manager.executor.strategy._pgespresso_start_backup(backup_label)

        pg_connect = mock.call.pg_connect()
        with_pg_connect = pg_connect.__enter__()
        cursor = with_pg_connect.cursor()
        assert backup_manager.server.mock_calls == [
            pg_connect,
            with_pg_connect,
            pg_connect.__enter__().rollback(),
            cursor,
            cursor.execute(ANY, ANY),
            cursor.fetchone(),
            pg_connect.__exit__(None, None, None)]
开发者ID:forndb,项目名称:barman,代码行数:35,代码来源:test_executor.py


示例7: test_rsync_backup_executor_init

    def test_rsync_backup_executor_init(self):
        """
        Test the construction of a RecoveryExecutor
        """

        # Test
        server = testing_helpers.build_mocked_server()
        backup_manager = Mock(server=server, config=server.config)
        assert RecoveryExecutor(backup_manager)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:9,代码来源:test_recovery_executor.py


示例8: test_postgres_backup_executor_init

    def test_postgres_backup_executor_init(self):
        """
        Test the construction of a PostgresBackupExecutor
        """
        server = build_mocked_server(global_conf={'backup_method': 'postgres'})
        executor = PostgresBackupExecutor(server.backup_manager)
        assert executor
        assert executor.strategy

        # Expect an error if the tablespace_bandwidth_limit option
        # is set for this server.
        server = build_mocked_server(
            global_conf={'backup_method': 'postgres',
                         'tablespace_bandwidth_limit': 1})
        executor = PostgresBackupExecutor(server.backup_manager)
        assert executor
        assert executor.strategy
        assert server.config.disabled
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:18,代码来源:test_executor.py


示例9: test_exclusive_start_backup

    def test_exclusive_start_backup(self):
        """
        Basic test for the start_backup method

        :param start_mock: mock for the _pgespresso_start_backup
        :param start_mock: mock for the pg_start_backup
        """
        # Build a backup_manager using a mocked server
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.EXCLUSIVE_BACKUP
        })
        backup_manager = build_backup_manager(server=server)

        # Mock server.get_pg_setting('data_directory') call
        backup_manager.server.postgres.get_setting.return_value = '/pg/data'
        # Mock server.get_pg_configuration_files() call
        server.postgres.get_configuration_files.return_value = dict(
            config_file="/etc/postgresql.conf",
            hba_file="/pg/pg_hba.conf",
            ident_file="/pg/pg_ident.conf",
        )
        # Mock server.get_pg_tablespaces() call
        tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
        server.postgres.get_tablespaces.return_value = tablespaces

        # Test 1: start exclusive backup
        # Mock executor.pg_start_backup(label) call
        start_time = datetime.datetime.now()
        server.postgres.start_exclusive_backup.return_value = (
            "A257/44B4C0D8",
            "000000060000A25700000044",
            11845848,
            start_time)

        # Build a test empty backup info
        backup_info = BackupInfo(server=backup_manager.server,
                                 backup_id='fake_id')

        backup_manager.executor.strategy.start_backup(backup_info)

        # Check that all the values are correctly saved inside the BackupInfo
        assert backup_info.pgdata == '/pg/data'
        assert backup_info.config_file == "/etc/postgresql.conf"
        assert backup_info.hba_file == "/pg/pg_hba.conf"
        assert backup_info.ident_file == "/pg/pg_ident.conf"
        assert backup_info.tablespaces == tablespaces
        assert backup_info.status == 'STARTED'
        assert backup_info.timeline == 6
        assert backup_info.begin_xlog == 'A257/44B4C0D8'
        assert backup_info.begin_wal == '000000060000A25700000044'
        assert backup_info.begin_offset == 11845848
        assert backup_info.begin_time == start_time
        # Check that the correct call to pg_start_backup has been made
        server.postgres.start_exclusive_backup.assert_called_with(
            'Barman backup main fake_id')
开发者ID:zacchiro,项目名称:barman,代码行数:56,代码来源:test_executor.py


示例10: test_recovery_window_report

    def test_recovery_window_report(self, caplog):
        """
        Basic unit test of RecoveryWindowRetentionPolicy

        Given a mock simulating a Backup with status DONE and
        the end_date not over the point of recoverability,
        the report method of the RecoveryWindowRetentionPolicy class must mark
        it as valid
        """
        server = build_mocked_server()
        rp = RetentionPolicyFactory.create(
            server,
            'retention_policy',
            'RECOVERY WINDOW OF 4 WEEKS')
        assert isinstance(rp, RecoveryWindowRetentionPolicy)

        # Build a BackupInfo object with status to DONE
        backup_info = build_test_backup_info(
            server=rp.server,
            backup_id='test1',
            end_time=datetime.now(tzlocal()))

        backup_source = {'test_backup3': backup_info}
        # Add a obsolete backup
        backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=5)
        backup_source['test_backup2'] = backup_info
        # Add a second obsolete backup
        backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=6)
        backup_source['test_backup'] = backup_info
        rp.server.get_available_backups.return_value = backup_source
        # instruct the get_available_backups method to return a map with
        # our mock as result and minimum_redundancy = 1
        rp.server.config.minimum_redundancy = 1
        rp.server.config.name = "test"
        # execute retention policy report
        report = rp.report()
        # check that our mock is valid for the retention policy
        assert report == {'test_backup3': 'VALID',
                          'test_backup2': 'OBSOLETE',
                          'test_backup': 'OBSOLETE'}

        # Expect a ValueError if passed context is invalid
        with pytest.raises(ValueError):
            rp.report(context='invalid')
        # Set a new minimum_redundancy parameter, enforcing the usage of the
        # configuration parameter instead of the retention policy default
        rp.server.config.minimum_redundancy = 4
        # execute retention policy report
        rp.report()
        # Check for the warning inside the log
        caplog.set_level(logging.WARNING)
        log = caplog.text
        warn = "WARNING  Keeping obsolete backup test_backup2 for " \
               "server test (older than %s) due to minimum redundancy " \
               "requirements (4)\n" % rp._point_of_recoverability()
        assert log.find(warn)
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:56,代码来源:test_retention_policies.py


示例11: test_to_json

    def test_to_json(self, tmpdir):
        server = build_mocked_server(main_conf={"basebackups_directory": tmpdir.strpath})

        # Build a fake backup
        backup_dir = tmpdir.mkdir("fake_backup_id")
        info_file = backup_dir.join("backup.info")
        info_file.write(BASE_BACKUP_INFO)
        b_info = BackupInfo(server, backup_id="fake_backup_id")

        # This call should not raise
        assert json.dumps(b_info.to_json())
开发者ID:moench-tegeder,项目名称:barman,代码行数:11,代码来源:test_infofile.py


示例12: test_pgespresso_start_backup

    def test_pgespresso_start_backup(self):
        """
        Test concurrent backup using pgespresso
        """
        # Test: start concurrent backup
        # Build a backup_manager using a mocked server
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)
        # Mock server.get_pg_setting('data_directory') call
        backup_manager.server.postgres.get_setting.return_value = '/pg/data'
        # Mock server.get_pg_configuration_files() call
        server.postgres.get_configuration_files.return_value = dict(
            config_file="/etc/postgresql.conf",
            hba_file="/pg/pg_hba.conf",
            ident_file="/pg/pg_ident.conf",
        )
        # Mock server.get_pg_tablespaces() call
        tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
        server.postgres.get_tablespaces.return_value = tablespaces
        server.postgres.server_version = 90500

        # Mock executor._pgespresso_start_backup(label) call
        start_time = datetime.datetime.now(tz.tzlocal()).replace(microsecond=0)
        server.postgres.pgespresso_start_backup.return_value = {
            'backup_label':
                "START WAL LOCATION: 266/4A9C1EF8 "
                "(file 00000010000002660000004A)\n"
                "START TIME: %s" % start_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
        }
        # Build a test empty backup info
        backup_info = BackupInfo(server=backup_manager.server,
                                 backup_id='fake_id2')

        backup_manager.executor.strategy.start_backup(backup_info)

        # Check that all the values are correctly saved inside the BackupInfo
        assert backup_info.pgdata == '/pg/data'
        assert backup_info.config_file == "/etc/postgresql.conf"
        assert backup_info.hba_file == "/pg/pg_hba.conf"
        assert backup_info.ident_file == "/pg/pg_ident.conf"
        assert backup_info.tablespaces == tablespaces
        assert backup_info.status == 'STARTED'
        assert backup_info.timeline == 16
        assert backup_info.begin_xlog == '266/4A9C1EF8'
        assert backup_info.begin_wal == '00000010000002660000004A'
        assert backup_info.begin_offset == 10231544
        assert backup_info.begin_time == start_time
        # Check that the correct call to pg_start_backup has been made
        server.postgres.pgespresso_start_backup.assert_called_with(
            'Barman backup main fake_id2')
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:53,代码来源:test_executor.py


示例13: test_from_json

    def test_from_json(self, tmpdir):
        server = build_mocked_server(main_conf={"basebackups_directory": tmpdir.strpath})

        # Build a fake backup
        backup_dir = tmpdir.mkdir("fake_backup_id")
        info_file = backup_dir.join("backup.info")
        info_file.write(BASE_BACKUP_INFO)
        b_info = BackupInfo(server, backup_id="fake_backup_id")

        # Build another BackupInfo from the json dump
        new_binfo = BackupInfo.from_json(server, b_info.to_json())

        assert b_info.to_dict() == new_binfo.to_dict()
开发者ID:moench-tegeder,项目名称:barman,代码行数:13,代码来源:test_infofile.py


示例14: test_concurrent_start_backup

    def test_concurrent_start_backup(self):
        """
        Test concurrent backup using 9.6 api
        """
        # Test: start concurrent backup
        # Build a backup_manager using a mocked server
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)
        # Mock server.get_pg_setting('data_directory') call
        backup_manager.server.postgres.get_setting.return_value = '/pg/data'
        # Mock server.get_pg_configuration_files() call
        server.postgres.get_configuration_files.return_value = dict(
            config_file="/etc/postgresql.conf",
            hba_file="/pg/pg_hba.conf",
            ident_file="/pg/pg_ident.conf",
        )
        # Mock server.get_pg_tablespaces() call
        tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
        server.postgres.get_tablespaces.return_value = tablespaces
        # this is a postgres 9.6
        server.postgres.server_version = 90600

        # Mock call to new api method
        start_time = datetime.datetime.now()
        server.postgres.start_concurrent_backup.return_value = {
            'location': "A257/44B4C0D8",
            'timeline': 6,
            'timestamp': start_time,
        }
        # Build a test empty backup info
        backup_info = BackupInfo(server=backup_manager.server,
                                 backup_id='fake_id2')

        backup_manager.executor.strategy.start_backup(backup_info)

        # Check that all the values are correctly saved inside the BackupInfo
        assert backup_info.pgdata == '/pg/data'
        assert backup_info.config_file == "/etc/postgresql.conf"
        assert backup_info.hba_file == "/pg/pg_hba.conf"
        assert backup_info.ident_file == "/pg/pg_ident.conf"
        assert backup_info.tablespaces == tablespaces
        assert backup_info.status == 'STARTED'
        assert backup_info.timeline == 6
        assert backup_info.begin_xlog == 'A257/44B4C0D8'
        assert backup_info.begin_wal == '000000060000A25700000044'
        assert backup_info.begin_offset == 11845848
        assert backup_info.begin_time == start_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:50,代码来源:test_executor.py


示例15: test_rsync_backup_executor_init

    def test_rsync_backup_executor_init(self):
        """
        Test the construction of a RsyncBackupExecutor
        """

        # Test
        server = build_mocked_server()
        backup_manager = Mock(server=server, config=server.config)
        assert RsyncBackupExecutor(backup_manager)

        # Test exception for the missing ssh_command
        with pytest.raises(SshCommandException):
            server.config.ssh_command = None
            RsyncBackupExecutor(server)
开发者ID:forndb,项目名称:barman,代码行数:14,代码来源:test_executor.py


示例16: test_redundancy_report

    def test_redundancy_report(self, caplog):
        """
        Test of the management of the minimum_redundancy parameter
        into the backup_report method of the RedundancyRetentionPolicy class

        """
        server = build_mocked_server()
        rp = RetentionPolicyFactory.create(
            server,
            'retention_policy',
            'REDUNDANCY 2')
        assert isinstance(rp, RedundancyRetentionPolicy)

        # Build a BackupInfo object with status to DONE
        backup_info = build_test_backup_info(
            server=rp.server,
            backup_id='test1',
            end_time=datetime.now(tzlocal()))

        # instruct the get_available_backups method to return a map with
        # our mock as result and minimum_redundancy = 1
        rp.server.get_available_backups.return_value = {
            "test_backup": backup_info,
            "test_backup2": backup_info,
            "test_backup3": backup_info,
        }
        rp.server.config.minimum_redundancy = 1
        # execute retention policy report
        report = rp.report()
        # check that our mock is valid for the retention policy because
        # the total number of valid backups is lower than the retention policy
        # redundancy.
        assert report == {'test_backup': BackupInfo.OBSOLETE,
                          'test_backup2': BackupInfo.VALID,
                          'test_backup3': BackupInfo.VALID}
        # Expect a ValueError if passed context is invalid
        with pytest.raises(ValueError):
            rp.report(context='invalid')
        # Set a new minimum_redundancy parameter, enforcing the usage of the
        # configuration parameter instead of the retention policy default
        rp.server.config.minimum_redundancy = 3
        # execute retention policy report
        rp.report()
        # Check for the warning inside the log
        caplog.set_level(logging.WARNING)

        log = caplog.text
        assert log.find("WARNING  Retention policy redundancy (2) "
                        "is lower than the required minimum redundancy (3). "
                        "Enforce 3.")
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:50,代码来源:test_retention_policies.py


示例17: test_data_dir

    def test_data_dir(self, tmpdir):
        """
        Simple test for the method that is responsible of the build of the
        path to the datadir and to the tablespaces dir according
        with backup_version
        """
        server = build_mocked_server(
            main_conf={
                'basebackups_directory': tmpdir.strpath
            },
        )

        # Build a fake v2 backup
        backup_dir = tmpdir.mkdir('fake_backup_id')
        data_dir = backup_dir.mkdir('data')
        info_file = backup_dir.join('backup.info')
        info_file.write(BASE_BACKUP_INFO)
        b_info = BackupInfo(server, backup_id="fake_backup_id")

        # Check that the paths are built according with version
        assert b_info.backup_version == 2
        assert b_info.get_data_directory() == data_dir.strpath
        assert b_info.get_data_directory(16384) == (backup_dir.strpath +
                                                    '/16384')

        # Build a fake v1 backup
        backup_dir = tmpdir.mkdir('another_fake_backup_id')
        pgdata_dir = backup_dir.mkdir('pgdata')
        info_file = backup_dir.join('backup.info')
        info_file.write(BASE_BACKUP_INFO)
        b_info = BackupInfo(server, backup_id="another_fake_backup_id")

        # Check that the paths are built according with version
        assert b_info.backup_version == 1
        assert b_info.get_data_directory(16384) == \
            backup_dir.strpath + '/pgdata/pg_tblspc/16384'
        assert b_info.get_data_directory() == pgdata_dir.strpath

        # Check that an exception is raised if an invalid oid
        # is provided to the method
        with pytest.raises(ValueError):
            b_info.get_data_directory(12345)

        # Check that a ValueError exception is raised with an
        # invalid oid when the tablespaces list is None
        b_info.tablespaces = None
        # and expect a value error
        with pytest.raises(ValueError):
            b_info.get_data_directory(16384)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:49,代码来源:test_infofile.py


示例18: test_first_backup

    def test_first_backup(self):
        server = build_mocked_server()
        rp = RetentionPolicyFactory.create(
            server,
            'retention_policy',
            'RECOVERY WINDOW OF 4 WEEKS')
        assert isinstance(rp, RecoveryWindowRetentionPolicy)

        # Build a BackupInfo object with status to DONE
        backup_info = build_test_backup_info(
            server=rp.server,
            backup_id='test1',
            end_time=datetime.now(tzlocal()))

        # instruct the get_available_backups method to return a map with
        # our mock as result and minimum_redundancy = 1
        rp.server.get_available_backups.return_value = {
            "test_backup": backup_info
        }
        rp.server.config.minimum_redundancy = 1
        # execute retention policy report
        report = rp.first_backup()

        assert report == 'test_backup'

        rp = RetentionPolicyFactory.create(
            server,
            'retention_policy',
            'REDUNDANCY 2')
        assert isinstance(rp, RedundancyRetentionPolicy)

        # Build a BackupInfo object with status to DONE
        backup_info = build_test_backup_info(
            server=rp.server,
            backup_id='test1',
            end_time=datetime.now(tzlocal()))

        # instruct the get_available_backups method to return a map with
        # our mock as result and minimum_redundancy = 1
        rp.server.get_available_backups.return_value = {
            "test_backup": backup_info
        }
        rp.server.config.minimum_redundancy = 1
        # execute retention policy report
        report = rp.first_backup()

        assert report == 'test_backup'
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:47,代码来源:test_retention_policies.py


示例19: test_analyse_temporary_config_files

 def test_analyse_temporary_config_files(self, tmpdir):
     """
     Test the method that identifies dangerous options into the configuration
      files
     """
     # Build directory/files structure for testing
     tempdir = tmpdir.mkdir('tempdir')
     recovery_info = {
         'configuration_files': ['postgresql.conf', 'postgresql.auto.conf'],
         'tempdir': tempdir.strpath,
         'temporary_configuration_files': [],
         'results': {'changes': [], 'warnings': []}
     }
     postgresql_conf = tempdir.join('postgresql.conf')
     postgresql_auto = tempdir.join('postgresql.auto.conf')
     postgresql_conf.write('archive_command = something\n'
                           'data_directory = something')
     postgresql_auto.write('archive_command = something\n'
                           'data_directory = something')
     local_rec = tmpdir.mkdir('local_rec')
     recovery_info['temporary_configuration_files'].append(
         postgresql_conf.strpath)
     recovery_info['temporary_configuration_files'].append(
         postgresql_auto.strpath)
     # Build a RecoveryExecutor object (using a mock as server and backup
     # manager.
     server = testing_helpers.build_mocked_server()
     backup_manager = Mock(server=server, config=server.config)
     executor = RecoveryExecutor(backup_manager)
     # Identify dangerous options into config files for remote recovery
     executor.analyse_temporary_config_files(recovery_info)
     assert len(recovery_info['results']['changes']) == 2
     assert len(recovery_info['results']['warnings']) == 2
     # Prepare for a local recovery test
     recovery_info['results']['changes'] = []
     recovery_info['results']['warnings'] = []
     postgresql_conf_local = local_rec.join('postgresql.conf')
     postgresql_auto_local = local_rec.join('postgresql.auto.conf')
     postgresql_conf_local.write('archive_command = something\n'
                                 'data_directory = something')
     postgresql_auto_local.write('archive_command = something\n'
                                 'data_directory = something')
     # Identify dangerous options for local recovery
     executor.analyse_temporary_config_files(recovery_info)
     assert len(recovery_info['results']['changes']) == 2
     assert len(recovery_info['results']['warnings']) == 2
开发者ID:stig,项目名称:barman,代码行数:46,代码来源:test_recovery_executor.py


示例20: test_backup_info_save

 def test_backup_info_save(self, tmpdir):
     """
     Test the save method of a BackupInfo object
     """
     # Check the saving method.
     # Load a backup.info file, modify the BackupInfo object
     # then save it.
     server = build_mocked_server(main_conf={"basebackups_directory": tmpdir.strpath})
     backup_dir = tmpdir.mkdir("fake_name")
     infofile = backup_dir.join("backup.info")
     b_info = BackupInfo(server, backup_id="fake_name")
     b_info.status = BackupInfo.FAILED
     b_info.save()
     # read the file looking for the modified line
     for line in infofile.readlines():
         if line.startswith("status"):
             assert line.strip() == "status=FAILED"
开发者ID:moench-tegeder,项目名称:barman,代码行数:17,代码来源:test_infofile.py



注:本文中的testing_helpers.build_mocked_server函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap