• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testing_helpers.build_test_backup_info函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testing_helpers.build_test_backup_info函数的典型用法代码示例。如果您正苦于以下问题：Python build_test_backup_info函数的具体用法？Python build_test_backup_info怎么用？Python build_test_backup_info使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了build_test_backup_info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_delete_running_backup

    def test_delete_running_backup(self, delete_mock, get_first_backup_mock, tmpdir, capsys):
        """
        Check how server.delete_backup behaves for running and finished backups.

        While the server backup lock is held, deleting a backup whose status
        is STARTED or EMPTY must print a "Cannot delete a running backup"
        error, while a DONE backup must reach the mocked delete method.
        Once the lock is no longer held, the STARTED backup is deleted too.

        :param delete_mock: mock on which the delete calls are asserted
        :param get_first_backup_mock: mock whose return value is set to the
            id of the backup under test
        :param tmpdir: pytest temporary directory, used as barman_home
        :param capsys: pytest fixture capturing stdout/stderr
        """
        server = build_real_server({"barman_home": tmpdir.strpath})
        error_template = "Cannot delete a running backup (%s %s)"

        # Test the removal of a running backup. status STARTED
        backup_info_started = build_test_backup_info(status=BackupInfo.STARTED, server_name=server.config.name)
        get_first_backup_mock.return_value = backup_info_started.backup_id
        with ServerBackupLock(tmpdir.strpath, server.config.name):
            server.delete_backup(backup_info_started)
            out, err = capsys.readouterr()
            assert error_template % (server.config.name, backup_info_started.backup_id) in err

        # Test the removal of a running backup. status EMPTY
        backup_info_empty = build_test_backup_info(status=BackupInfo.EMPTY, server_name=server.config.name)
        get_first_backup_mock.return_value = backup_info_empty.backup_id
        with ServerBackupLock(tmpdir.strpath, server.config.name):
            server.delete_backup(backup_info_empty)
            out, err = capsys.readouterr()
            # The message must name the backup that was actually submitted
            # (the EMPTY one), not the STARTED backup of the previous step.
            assert error_template % (server.config.name, backup_info_empty.backup_id) in err

        # Test the removal of a running backup. status DONE
        backup_info_done = build_test_backup_info(status=BackupInfo.DONE, server_name=server.config.name)
        with ServerBackupLock(tmpdir.strpath, server.config.name):
            server.delete_backup(backup_info_done)
            delete_mock.assert_called_with(backup_info_done)

        # Test the removal of a backup not running. status STARTED
        # (no lock is held here, so the deletion is expected to go through)
        server.delete_backup(backup_info_started)
        delete_mock.assert_called_with(backup_info_started)
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:32,代码来源:test_server.py


示例2: test_backup_copy_with_included_files

 def test_backup_copy_with_included_files(self, rsync_moc, tmpdir, capsys):
     """
     Check that backup_copy warns about files pulled in by include directives.

     :param rsync_moc: mock injected by the test decorator (not used directly)
     :param tmpdir: pytest temporary directory; a 'home' subdir is barman_home
     :param capsys: pytest fixture capturing stdout/stderr
     """
     home_dir = tmpdir.mkdir('home')
     backup_manager = build_backup_manager(
         global_conf={'barman_home': home_dir.strpath})

     # Backup info carrying one additional (included) configuration file
     backup_attributes = {
         'server': backup_manager.server,
         'pgdata': "/pg/data",
         'config_file': "/etc/postgresql.conf",
         'hba_file': "/pg/data/pg_hba.conf",
         'ident_file': "/pg/data/pg_ident.conf",
         'begin_xlog': "0/2000028",
         'begin_wal': "000000010000000000000002",
         'included_files': ["/tmp/config/file.conf"],
         'begin_offset': 28,
     }
     backup_info = build_test_backup_info(**backup_attributes)
     backup_info.save()
     # Sanity check: the backup info file must exist before the copy starts
     assert os.path.exists(backup_info.filename)

     # Run the copy and collect what was printed
     backup_manager.executor.backup_copy(backup_info)
     captured = capsys.readouterr()
     err = captured[1]

     # The warning must be on stderr ...
     assert "WARNING: The usage of include directives is not supported" in err
     # ... and it must mention the additional configuration file
     assert backup_info.included_files[0] in err
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py


示例3: test_get_wal_info

 def test_get_wal_info(self, get_wal_mock, tmpdir):
     """
     Check the 'wal_total_seconds' and 'wals_per_second' values returned
     by server.get_wal_info.

     :param get_wal_mock: mock whose return value is the fake WAL list
     :param tmpdir: pytest temporary directory, used as barman_home
     """
     server = build_real_server(global_conf={'barman_home': tmpdir.strpath})

     # Three fake WAL files; the first one is both the begin and the end
     # WAL of the backup built below
     xlogdb_lines = (
         "000000010000000000000002\t16777216\t1434450086.53\tNone\n",
         "000000010000000000000003\t16777216\t1434450087.54\tNone\n",
         "000000010000000000000004\t16777216\t1434450088.55\tNone\n",
     )
     wal_list = [WalFileInfo.from_xlogdb_line(line) for line in xlogdb_lines]
     get_wal_mock.return_value = wal_list
     first_wal = wal_list[0]
     last_wal = wal_list[-1]

     backup_info = build_test_backup_info(
         server=server, begin_wal=first_wal.name, end_wal=first_wal.name)
     backup_info.save()

     # Expected elapsed time: timestamp of the last WAL minus the first
     expected_seconds = last_wal.time - first_wal.time
     # Expected rate: number of WAL files divided by the elapsed time
     expected_rate = len(wal_list) / expected_seconds

     wal_info = server.get_wal_info(backup_info)
     assert wal_info
     assert wal_info['wal_total_seconds'] == expected_seconds
     assert wal_info['wals_per_second'] == expected_rate
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_server.py


示例4: test_backup_cache_add

    def test_backup_cache_add(self, tmpdir):
        """
        Check that backup_cache_add registers a BackupInfo object in the
        backups cache, both when the cache is uninitialized and when it has
        been set to an empty dict.

        :param tmpdir: pytest temporary directory, used as barman_home
        """
        manager = build_backup_manager(
            name='TestServer', global_conf={'barman_home': tmpdir.strpath})

        # Build a backup info and store it on disk
        b_info = build_test_backup_info(
            server=manager.server, backup_id='fake_backup_id')
        b_info.save()

        # Nothing has touched the cache yet
        assert manager._backup_cache is None

        # First registration: the cache starts out uninitialized
        manager.backup_cache_add(b_info)
        cached = manager.get_backup(b_info.backup_id)
        assert cached is b_info

        # Second registration: start from an explicitly empty cache
        manager._backup_cache = {}
        manager.backup_cache_add(b_info)
        cached = manager.get_backup(b_info.backup_id)
        assert cached is b_info
开发者ID:terrorobe,项目名称:barman,代码行数:33,代码来源:test_backup.py


示例5: test_backup_copy

    def test_backup_copy(self, rsync_mock, tmpdir):
        """
        Test the execution of a rsync copy

        A backup info is saved under a temporary barman home, then
        executor.backup_copy is run and the exact sequence of calls recorded
        by the rsync mock is compared with the expected one.

        :param rsync_mock: mock for the rsync command
        :param tmpdir: temporary dir
        """
        backup_manager = build_backup_manager(global_conf={
            'barman_home': tmpdir.mkdir('home').strpath
        })
        backup_info = build_test_backup_info(
            server=backup_manager.server,
            pgdata="/pg/data",
            config_file="/etc/postgresql.conf",
            hba_file="/pg/data/pg_hba.conf",
            ident_file="/pg/data/pg_ident.conf",
            begin_xlog="0/2000028",
            begin_wal="000000010000000000000002",
            begin_offset=28)
        backup_info.save()
        # This is to check that all the preparation is done correctly
        assert os.path.exists(backup_info.filename)

        backup_manager.executor.backup_copy(backup_info)

        # Expected call sequence, in order: two (constructor, smart_copy)
        # pairs for ':/fake/location/' and ':/another/location/', one pair
        # for ':/pg/data/' that excludes both pg_tblspc entries, then two
        # plain invocations for pg_control and postgresql.conf.
        # 16387 and 16405 are presumably the tablespace OIDs of the default
        # test backup info -- TODO confirm against testing_helpers.
        # NOTE(review): '[email protected]' looks like e-mail obfuscation
        # applied by the site this snippet was scraped from; the real ssh
        # option is most likely a user@host value -- verify against the
        # upstream test before relying on this expectation.
        assert rsync_mock.mock_calls == [
            mock.call(check=True, network_compression=False, args=[],
                      bwlimit=None, ssh='ssh',
                      ssh_options=['-c', '"arcfour"', '-p', '22',
                                   '[email protected]', '-o',
                                   'BatchMode=yes', '-o',
                                   'StrictHostKeyChecking=no']),
            mock.call().smart_copy(':/fake/location/',
                                   backup_info.get_data_directory(16387),
                                   None, None),
            mock.call(check=True, network_compression=False, args=[],
                      bwlimit=None, ssh='ssh',
                      ssh_options=['-c', '"arcfour"', '-p', '22',
                                   '[email protected]', '-o',
                                   'BatchMode=yes', '-o',
                                   'StrictHostKeyChecking=no']),
            mock.call().smart_copy(':/another/location/',
                                   backup_info.get_data_directory(16405),
                                   None, None),
            mock.call(network_compression=False,
                      exclude_and_protect=['/pg_tblspc/16387',
                                           '/pg_tblspc/16405'],
                      args=[], bwlimit=None, ssh='ssh',
                      ssh_options=['-c', '"arcfour"', '-p', '22',
                                   '[email protected]',
                                   '-o', 'BatchMode=yes',
                                   '-o', 'StrictHostKeyChecking=no']),
            mock.call().smart_copy(':/pg/data/',
                                   backup_info.get_data_directory(),
                                   None, None),
            mock.call()(
                ':/pg/data/global/pg_control',
                '%s/global/pg_control' % backup_info.get_data_directory()),
            mock.call()(':/etc/postgresql.conf',
                        backup_info.get_data_directory())]
开发者ID:forndb,项目名称:barman,代码行数:60,代码来源:test_executor.py


示例6: test_concurrent_stop_backup

    def test_concurrent_stop_backup(self, label_mock, stop_mock,):
        """
        Basic test for the stop_backup method of the executor strategy when
        the server is configured with BackupOptions.CONCURRENT_BACKUP

        :param label_mock: mimic the response of _write_backup_label
        :param stop_mock: mimic the response of _pgespresso_stop_backup
        """
        # Server configured for concurrent backups, plus its backup manager
        concurrent_conf = {'backup_options': BackupOptions.CONCURRENT_BACKUP}
        server = build_mocked_server(main_conf=concurrent_conf)
        backup_manager = build_backup_manager(server=server)

        # Value the mocked stop call hands back: (end WAL name, stop time)
        stop_time = datetime.datetime.now()
        end_wal = "000000060000A25700000044"
        stop_mock.return_value = (end_wal, stop_time)

        backup_info = build_test_backup_info()
        strategy = backup_manager.executor.strategy
        strategy.stop_backup(backup_info)

        # The end position must have been stored in the backup info
        assert backup_info.end_xlog == 'A257/45000000'
        assert backup_info.end_wal == end_wal
        assert backup_info.end_offset == 0
        assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py


示例7: test_get_backup

    def test_get_backup(self, tmpdir):
        """
        Check that get_backup retrieves a backup by id through the backups
        cache, and returns None once the cache has been replaced by an
        empty dict.

        :param tmpdir: pytest temporary directory, used as barman_home
        """
        manager = build_backup_manager(
            name='TestServer', global_conf={'barman_home': tmpdir.strpath})

        # Build a backup info and store it on disk
        b_info = build_test_backup_info(
            server=manager.server, backup_id='fake_backup_id')
        b_info.save()

        # The cache has not been touched so far
        assert manager._backup_cache is None

        # The retrieved backup must carry the same data as the saved one
        retrieved = manager.get_backup(b_info.backup_id)
        assert retrieved.to_dict() == b_info.to_dict()

        # With an empty cache the backup is no longer found
        manager._backup_cache = {}
        assert manager.get_backup(b_info.backup_id) is None
开发者ID:terrorobe,项目名称:barman,代码行数:31,代码来源:test_backup.py


示例8: test_backup_cache_remove

    def test_backup_cache_remove(self, tmpdir):
        """
        Check that backup_cache_remove drops a BackupInfo object from the
        backups cache and leaves an uninitialized cache untouched.

        :param tmpdir: pytest temporary directory, used as barman_home
        """
        manager = build_backup_manager(
            name='TestServer', global_conf={'barman_home': tmpdir.strpath})
        assert manager._backup_cache is None

        # Backup info that is never saved to disk in this test
        b_info = build_test_backup_info(
            server=manager.server, backup_id='fake_backup_id')

        # Removing from an uninitialized cache must leave it uninitialized
        manager.backup_cache_remove(b_info)
        assert manager._backup_cache is None

        # Seed the cache with the backup, then remove it again
        manager._backup_cache = {b_info.backup_id: b_info}
        manager.backup_cache_remove(b_info)
        assert b_info.backup_id not in manager._backup_cache
开发者ID:terrorobe,项目名称:barman,代码行数:30,代码来源:test_backup.py


示例9: test_recover_basebackup_copy

 def test_recover_basebackup_copy(self, rsync_pg_mock, tmpdir):
     """
     Check the rsync calls issued while copying a backup during recovery.

     :param rsync_pg_mock: mock rsync object on which calls are asserted
     :param tmpdir: pytest temporary directory holding the destination
     """
     destination = tmpdir.mkdir('destination')
     server = testing_helpers.build_real_server()
     tablespaces = [('tbs1', 16387, '/fake/location')]
     backup_info = testing_helpers.build_test_backup_info(
         server=server, tablespaces=tablespaces)

     # Executor with a global and a per-tablespace bandwidth limit
     recovery_executor = RecoveryExecutor(server.backup_manager)
     recovery_executor.config.tablespace_bandwidth_limit = {'tbs1': ''}
     recovery_executor.config.bandwidth_limit = 10
     recovery_executor.basebackup_copy(
         backup_info, destination.strpath, tablespaces=None)

     # Last constructor call: global limit, tablespace dir excluded
     rsync_pg_mock.assert_called_with(
         network_compression=False, bwlimit=10, ssh=None, path=None,
         exclude_and_protect=['/pg_tblspc/16387'])
     # Some constructor call used the tablespace-specific limit
     rsync_pg_mock.assert_any_call(
         network_compression=False, bwlimit='', ssh=None, path=None,
         check=True)

     smart_copy = rsync_pg_mock.return_value.smart_copy
     # The tablespace content goes to its configured location
     smart_copy.assert_any_call(
         '/some/barman/home/main/base/1234567890/16387/',
         '/fake/location', None)
     # The last copy moves the data directory to the destination
     smart_copy.assert_called_with(
         '/some/barman/home/main/base/1234567890/data/',
         destination.strpath, None)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:29,代码来源:test_recovery_executor.py


示例10: test_map_temporary_config_files

    def test_map_temporary_config_files(self, tmpdir):
        """
        Check the method that maps configuration files into the temporary
        directory for the final steps of a recovery.

        :param tmpdir: pytest temporary directory used for all test files
        """
        tempdir = tmpdir.mkdir("tempdir")
        results = {"changes": [], "warnings": [], "missing_files": []}
        recovery_info = {
            "configuration_files": ["postgresql.conf", "postgresql.auto.conf"],
            "tempdir": tempdir.strpath,
            "temporary_configuration_files": [],
            "results": results,
        }

        # Backup whose data directory lives below tmpdir/<backup_id>/data
        backup_info = testing_helpers.build_test_backup_info()
        backup_info.config.basebackups_directory = tmpdir.strpath
        datadir = tmpdir.mkdir(backup_info.backup_id).mkdir("data")
        config_content = "archive_command = something\ndata_directory = something"
        postgresql_conf_local = datadir.join("postgresql.conf")
        postgresql_auto_local = datadir.join("postgresql.auto.conf")
        postgresql_conf_local.write(config_content)
        postgresql_auto_local.write(config_content)

        # Build a RecoveryExecutor on top of a test backup manager
        backup_manager = testing_helpers.build_backup_manager()
        executor = RecoveryExecutor(backup_manager)
        # NOTE(review): "[email protected]" looks like e-mail obfuscation
        # applied by the site this snippet was scraped from; the original
        # argument is presumably a remote command -- verify upstream.
        executor._map_temporary_config_files(recovery_info, backup_info, "[email protected]")

        # Both files must now exist in tempdir with identical content
        for local_file in (postgresql_conf_local, postgresql_auto_local):
            mapped_file = tempdir.join(local_file.basename)
            assert mapped_file.check()
            assert mapped_file.computehash() == local_file.computehash()
        # Files absent from the data directory are reported as missing
        assert recovery_info["results"]["missing_files"] == ["pg_hba.conf", "pg_ident.conf"]
开发者ID:moench-tegeder,项目名称:barman,代码行数:32,代码来源:test_recovery_executor.py


示例11: test_setup

    def test_setup(self, rsync_mock):
        """
        Check the method that sets up a recovery (and its teardown).

        :param rsync_mock: mock injected by the test decorator (not used
            directly)
        """
        backup_info = testing_helpers.build_test_backup_info()
        executor = RecoveryExecutor(testing_helpers.build_backup_manager())
        backup_info.version = 90300
        auto_conf = "postgresql.auto.conf"

        # _setup creates a temporary directory, _teardown removes it
        recovery_info = executor._setup(backup_info, None, "/tmp")
        assert os.path.exists(recovery_info["tempdir"])
        executor._teardown(recovery_info)
        assert not os.path.exists(recovery_info["tempdir"])

        # With version 90300 postgresql.auto.conf must not be listed
        recovery_info = executor._setup(backup_info, None, "/tmp")
        executor._teardown(recovery_info)
        assert auto_conf not in recovery_info["configuration_files"]

        # With version 90400 postgresql.auto.conf must be listed
        backup_info.version = 90400
        recovery_info = executor._setup(backup_info, None, "/tmp")
        executor._teardown(recovery_info)
        assert auto_conf in recovery_info["configuration_files"]

        # An invalid remote command must terminate with SystemExit
        with pytest.raises(SystemExit):
            executor.server.path = None
            executor._setup(backup_info, "invalid", "/tmp")
开发者ID:moench-tegeder,项目名称:barman,代码行数:31,代码来源:test_recovery_executor.py


示例12: test_set_pitr_targets

    def test_set_pitr_targets(self, tmpdir):
        """
        Check the evaluation of the targets for point in time recovery.

        :param tmpdir: pytest temporary directory used for all test dirs
        """
        # Directory layout used by the test
        tempdir = tmpdir.mkdir("temp_dir")
        dest = tmpdir.mkdir("dest")
        wal_dest = tmpdir.mkdir("wal_dest")
        recovery_info = {
            "configuration_files": ["postgresql.conf", "postgresql.auto.conf"],
            "tempdir": tempdir.strpath,
            "results": {"changes": [], "warnings": []},
            "is_pitr": False,
            "wal_dest": wal_dest.strpath,
            "get_wal": False,
        }
        backup_info = testing_helpers.build_test_backup_info()
        executor = RecoveryExecutor(testing_helpers.build_backup_manager())

        # Empty targets (no PITR): nothing is set, wal_dest is untouched
        executor._set_pitr_targets(recovery_info, backup_info, dest.strpath, "", "", "", "")
        assert recovery_info["target_epoch"] is None
        assert recovery_info["target_datetime"] is None
        assert recovery_info["wal_dest"] == wal_dest.strpath

        # Real PITR targets: name, time and a third value of "2"
        executor._set_pitr_targets(
            recovery_info, backup_info, dest.strpath, "target_name", "2015-06-03 16:11:03.71038+02", "2", None
        )
        target_datetime = dateutil.parser.parse("2015-06-03 16:11:03.710380+02:00")
        whole_seconds = time.mktime(target_datetime.timetuple())
        fraction = target_datetime.microsecond / 1000000.0
        target_epoch = whole_seconds + fraction

        assert recovery_info["target_datetime"] == target_datetime
        assert recovery_info["target_epoch"] == target_epoch
        # WAL files now go to a barman_xlog dir inside the destination
        assert recovery_info["wal_dest"] == dest.join("barman_xlog").strpath
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_recovery_executor.py


示例13: test_prepare_tablespaces

 def test_prepare_tablespaces(self, tmpdir):
     """
     Check the preparation of tablespaces for a recovery.

     :param tmpdir: pytest temporary directory used for all test dirs
     """
     dest = tmpdir.mkdir('destination')
     wals = tmpdir.mkdir('wals')
     tablespace_location = '/fake/location'
     backup_info = testing_helpers.build_test_backup_info(
         tablespaces=[('tbs1', 16387, tablespace_location)])

     # Executor bound to a real server that uses the temporary WAL dir
     server = testing_helpers.build_real_server(
         main_conf={'wals_directory': wals.strpath})
     executor = RecoveryExecutor(server.backup_manager)

     # A mock stands in for the command object, so calls can be inspected
     cmd_mock = Mock()
     executor.prepare_tablespaces(backup_info, cmd_mock, dest.strpath, {})

     tblspc_dir = dest.join('pg_tblspc')
     link_path = tblspc_dir.join('16387').strpath
     # Both pg_tblspc and the tablespace location must be created
     cmd_mock.create_dir_if_not_exists.assert_any_call(tblspc_dir.strpath)
     cmd_mock.create_dir_if_not_exists.assert_any_call(tablespace_location)
     # The old entry is removed and replaced by a link to the location
     cmd_mock.delete_if_exists.assert_called_once_with(link_path)
     cmd_mock.create_symbolic_link.assert_called_once_with(
         tablespace_location, link_path)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:25,代码来源:test_recovery_executor.py


示例14: test_pgespresso_stop_backup

    def test_pgespresso_stop_backup(self, tbs_map_mock, label_mock):
        """
        Basic test for stop_backup when the end position comes from
        pgespresso_stop_backup.

        :param tbs_map_mock: mock injected by the test decorator (not used
            directly)
        :param label_mock: mock injected by the test decorator (not used
            directly)
        """
        # Server configured for concurrent backups, plus its backup manager
        concurrent_conf = {'backup_options': BackupOptions.CONCURRENT_BACKUP}
        server = build_mocked_server(main_conf=concurrent_conf)
        backup_manager = build_backup_manager(server=server)

        # Configure what the mocked PostgreSQL connection reports
        stop_time = datetime.datetime.now()
        end_wal = "000000060000A25700000044"
        server.postgres.server_version = 90500
        server.postgres.pgespresso_stop_backup.return_value = {
            'end_wal': end_wal,
            'timestamp': stop_time,
        }

        backup_info = build_test_backup_info(timeline=6)
        strategy = backup_manager.executor.strategy
        strategy.stop_backup(backup_info)

        # The end position must have been stored in the backup info
        assert backup_info.end_xlog == 'A257/44FFFFFF'
        assert backup_info.end_wal == end_wal
        assert backup_info.end_offset == 0xFFFFFF
        assert backup_info.end_time == stop_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:26,代码来源:test_executor.py


示例15: test_load_backup_cache

    def test_load_backup_cache(self, tmpdir):
        """
        Check that _load_backup_cache loads saved backups into the
        backups cache.

        :param tmpdir: pytest temporary directory, used as barman_home
        """
        manager = build_backup_manager(
            name='TestServer', global_conf={'barman_home': tmpdir.strpath})

        # The cache must start out uninitialized
        assert manager._backup_cache is None

        # Build a backup info and store it on disk
        b_info = build_test_backup_info(
            server=manager.server, backup_id='fake_backup_id')
        b_info.save()

        # Populate the cache
        manager._load_backup_cache()

        # The cached entry must carry the same data as the saved backup
        cached = manager._backup_cache[b_info.backup_id]
        assert cached.to_dict() == b_info.to_dict()
开发者ID:terrorobe,项目名称:barman,代码行数:27,代码来源:test_backup.py


示例16: test_exclusive_stop_backup

    def test_exclusive_stop_backup(self, stop_mock):
        """
        Basic test for the stop_backup method of the executor strategy when
        the server is configured with BackupOptions.EXCLUSIVE_BACKUP

        :param stop_mock: mimic the response of _pg_stop_backup
        """
        # Server configured for exclusive backups, plus its backup manager
        exclusive_conf = {'backup_options': BackupOptions.EXCLUSIVE_BACKUP}
        server = build_mocked_server(main_conf=exclusive_conf)
        backup_manager = build_backup_manager(server=server)

        # Value the mocked stop call hands back:
        # (end xlog, end WAL name, end offset, stop time)
        stop_time = datetime.datetime.now()
        end_xlog = "266/4A9C1EF8"
        end_wal = "00000010000002660000004A"
        end_offset = 10231544
        stop_mock.return_value = (end_xlog, end_wal, end_offset, stop_time)

        backup_info = build_test_backup_info()
        strategy = backup_manager.executor.strategy
        strategy.stop_backup(backup_info)

        # The submitted values must be stored inside the BackupInfo object
        assert backup_info.end_xlog == end_xlog
        assert backup_info.end_wal == end_wal
        assert backup_info.end_offset == end_offset
        assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:27,代码来源:test_executor.py


示例17: test_first_backup

    def test_first_backup(self):
        """
        Check first_backup() for both retention policy types.

        The same scenario is run for a recovery-window policy and for a
        redundancy policy: with a single available backup, first_backup()
        must return the key of that backup.
        """
        server = build_mocked_server()
        policy_cases = (
            ('RECOVERY WINDOW OF 4 WEEKS', RecoveryWindowRetentionPolicy),
            ('REDUNDANCY 2', RedundancyRetentionPolicy),
        )
        for policy_value, policy_class in policy_cases:
            rp = RetentionPolicyFactory.create(
                server, 'retention_policy', policy_value)
            assert isinstance(rp, policy_class)

            # Backup info ending right now
            backup_info = build_test_backup_info(
                server=rp.server,
                backup_id='test1',
                end_time=datetime.now(tzlocal()))

            # get_available_backups returns a map holding only that backup;
            # minimum_redundancy is set to 1
            rp.server.get_available_backups.return_value = {
                "test_backup": backup_info
            }
            rp.server.config.minimum_redundancy = 1

            # Ask the policy for its first backup
            report = rp.first_backup()
            assert report == 'test_backup'
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:47,代码来源:test_retention_policies.py


示例18: test_get_wal_until_next_backup

    def test_get_wal_until_next_backup(self, get_backup_mock, tmpdir):
        """
        Check that .history files are returned by get_wal_until_next_backup
        when include_history is requested.

        :param get_backup_mock: mock whose return value is the next backup
        :param tmpdir: pytest temporary directory for the wals and lock dirs
        """
        # One regular WAL entry and one .history entry, same metadata
        wal_entries = []
        for entry_name in ('000000010000000000000003', '00000001.history'):
            entry = WalFileInfo()
            entry.name = entry_name
            entry.size = 42
            entry.time = 43
            entry.compression = None
            wal_entries.append(entry)
        wfile_info, history_info = wal_entries

        # Write both entries into a fresh xlog.db
        wals_dir = tmpdir.mkdir("wals")
        xlog = wals_dir.join("xlog.db")
        xlog.write(wfile_info.to_xlogdb_line() + history_info.to_xlogdb_line())

        # Backup under test, spanning WAL ...01 to ...04
        backup = build_test_backup_info(
            begin_wal='000000010000000000000001',
            end_wal='000000010000000000000004')

        # Real server reading the xlog.db written above
        lock_dir = tmpdir.mkdir('lock')
        server = build_real_server(
            global_conf={"barman_lock_directory": lock_dir.strpath},
            main_conf={"wals_directory": wals_dir.strpath})
        # The mocked lookup returns a later backup, spanning ...05 to ...09
        get_backup_mock.return_value = build_test_backup_info(
            backup_id="1234567899",
            begin_wal='000000010000000000000005',
            end_wal='000000010000000000000009')

        # Collect the names of everything read back from the xlog.db
        wals = [
            wal_file.name
            for wal_file in server.get_wal_until_next_backup(
                backup, include_history=True)
        ]
        # The .history file must be among them
        assert history_info.name in wals
开发者ID:moench-tegeder,项目名称:barman,代码行数:47,代码来源:test_server.py


示例19: test_recovery_window_report

    def test_recovery_window_report(self, caplog):
        """
        Basic unit test of RecoveryWindowRetentionPolicy

        The policy report is computed over three available backups and
        compared with the expected VALID/OBSOLETE classification. An invalid
        context must raise ValueError, and raising minimum_redundancy must
        produce a "Keeping obsolete backup" warning in the log.

        :param caplog: pytest fixture capturing log records
        """
        import re

        server = build_mocked_server()
        rp = RetentionPolicyFactory.create(
            server,
            'retention_policy',
            'RECOVERY WINDOW OF 4 WEEKS')
        assert isinstance(rp, RecoveryWindowRetentionPolicy)

        # Build a BackupInfo object ending right now
        backup_info = build_test_backup_info(
            server=rp.server,
            backup_id='test1',
            end_time=datetime.now(tzlocal()))

        # NOTE: the very same BackupInfo object is stored under all three
        # keys, so each reassignment of end_time below affects every entry
        # (all of them end up 6 weeks old). The expected report below was
        # written against this setup, so it is deliberately left unchanged.
        backup_source = {'test_backup3': backup_info}
        # Add a obsolete backup
        backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=5)
        backup_source['test_backup2'] = backup_info
        # Add a second obsolete backup
        backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=6)
        backup_source['test_backup'] = backup_info
        rp.server.get_available_backups.return_value = backup_source
        # instruct the get_available_backups method to return a map with
        # our mock as result and minimum_redundancy = 1
        rp.server.config.minimum_redundancy = 1
        rp.server.config.name = "test"
        # execute retention policy report
        report = rp.report()
        # check that our mock is valid for the retention policy
        assert report == {'test_backup3': 'VALID',
                          'test_backup2': 'OBSOLETE',
                          'test_backup': 'OBSOLETE'}

        # Expect a ValueError if passed context is invalid
        with pytest.raises(ValueError):
            rp.report(context='invalid')
        # Set a new minimum_redundancy parameter, enforcing the usage of the
        # configuration parameter instead of the retention policy default
        rp.server.config.minimum_redundancy = 4
        # Set the capture level BEFORE the call that is expected to log
        caplog.set_level(logging.WARNING)
        # execute retention policy report
        rp.report()
        # Check for the warning inside the log.
        # The previous `assert log.find(warn)` could not fail in practice:
        # str.find returns -1 (truthy) when the text is missing. A regular
        # expression is used because the point of recoverability embedded
        # in the message is computed at logging time and cannot be
        # reproduced exactly here, and because the prefix caplog adds
        # between level and message depends on the pytest version.
        log = caplog.text
        warn = (r"WARNING  .*Keeping obsolete backup test_backup2 for "
                r"server test \(older than .*\) due to minimum redundancy "
                r"requirements \(4\)")
        assert re.search(warn, log)
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:56,代码来源:test_retention_policies.py


示例20: test_stop_backup

    def test_stop_backup(self, espressostop_mock, stop_mock):
        """
        Basic test for the stop_backup method, first with the default
        configuration and then with BackupOptions.CONCURRENT_BACKUP

        :param espressostop_mock: mimic the response of pg_espresso_stop_backup
        :param stop_mock: mimic the response of pg_stop_backup
        """
        backup_manager = build_backup_manager()

        # Test 1: stop exclusive backup
        # The mocked stop call hands back
        # (end xlog, end WAL name, end offset, stop time)
        stop_time = datetime.datetime.now()
        exclusive_end = ("266/4A9C1EF8", "00000010000002660000004A", 10231544)
        stop_mock.return_value = exclusive_end + (stop_time,)

        backup_info = build_test_backup_info()
        backup_manager.executor.stop_backup(backup_info)

        # The submitted values must be stored inside the BackupInfo object
        assert backup_info.end_xlog == '266/4A9C1EF8'
        assert backup_info.end_wal == '00000010000002660000004A'
        assert backup_info.end_offset == 10231544
        assert backup_info.end_time == stop_time

        # Test 2: stop concurrent backup
        # Switch the executor configuration to concurrent backup
        concurrent_options = [BackupOptions.CONCURRENT_BACKUP]
        backup_manager.executor.config.backup_options = concurrent_options

        # The mocked stop call hands back (end WAL name, stop time)
        stop_time = datetime.datetime.now()
        espressostop_mock.return_value = ("000000060000A25700000044", stop_time)

        backup_info = build_test_backup_info()
        backup_manager.executor.stop_backup(backup_info)

        assert backup_info.end_xlog == 'A257/45000000'
        assert backup_info.end_wal == '000000060000A25700000044'
        assert backup_info.end_offset == 0
        assert backup_info.end_time == stop_time
开发者ID:terrorobe,项目名称:barman,代码行数:43,代码来源:test_executor.py



注:本文中的testing_helpers.build_test_backup_info函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testing_helpers.expect函数代码示例发布时间:2022-05-27
下一篇:
Python testing_helpers.build_real_server函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap