• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    公众号

Python testing_helpers.build_backup_manager函数代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Python中testing_helpers.build_backup_manager函数的典型用法代码示例。如果您正苦于以下问题:Python build_backup_manager函数的具体用法?Python build_backup_manager怎么用?Python build_backup_manager使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。



在下文中一共展示了build_backup_manager函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。

示例1: test_mode

 def test_mode(self):
     # build a backup manager with a rsync executor (exclusive)
     backup_manager = build_backup_manager()
     # check the result of the mode property
     assert backup_manager.executor.mode == "rsync-exclusive"
     # build a backup manager with a postgres executor
     #  (strategy without mode)
     backup_manager = build_backup_manager(global_conf={"backup_method": "postgres"})
     # check the result of the mode property
     assert backup_manager.executor.mode == "postgres"
开发者ID:moench-tegeder,项目名称:barman,代码行数:10,代码来源:test_infofile.py


示例2: test_backup_info_exception

    def test_backup_info_exception(self, command_mock):
        # BackupManager mock
        backup_manager = build_backup_manager(name='test_server')
        backup_manager.config.pre_test_hook = 'not_existent_script'
        backup_manager.get_previous_backup = MagicMock()
        backup_manager.get_previous_backup.side_effect = \
            UnknownBackupIdException()

        # BackupInfo mock
        backup_info = MagicMock(name='backup_info')
        backup_info.get_basebackup_directory.return_value = 'backup_directory'
        backup_info.backup_id = '123456789XYZ'
        backup_info.error = None
        backup_info.status = 'OK'

        # the actual test
        script = HookScriptRunner(backup_manager, 'test_hook', 'pre')
        script.env_from_backup_info(backup_info)
        expected_env = {
            'BARMAN_PHASE': 'pre',
            'BARMAN_VERSION': version,
            'BARMAN_SERVER': 'test_server',
            'BARMAN_CONFIGURATION': 'build_config_from_dicts',
            'BARMAN_HOOK': 'test_hook',
            'BARMAN_BACKUP_DIR': 'backup_directory',
            'BARMAN_BACKUP_ID': '123456789XYZ',
            'BARMAN_ERROR': '',
            'BARMAN_STATUS': 'OK',
            'BARMAN_PREVIOUS_ID': '',
            'BARMAN_RETRY': '0',
        }
        script.run()
        assert command_mock.call_count == 1
        assert command_mock.call_args[1]['env_append'] == expected_env
开发者ID:zacchiro,项目名称:barman,代码行数:34,代码来源:test_hooks.py


示例3: test_wal_info_corner_cases

    def test_wal_info_corner_cases(self, command_mock):
        # BackupManager mock
        backup_manager = build_backup_manager(name="test_server")
        backup_manager.config.pre_test_hook = "not_existent_script"

        # WalFileInfo mock
        timestamp = time.time()
        wal_info = MagicMock(name="wal_info")
        wal_info.name = "XXYYZZAABBCC"
        wal_info.size = 1234567
        wal_info.time = timestamp
        wal_info.compression = None
        wal_info.fullpath.return_value = "/incoming/directory"

        # the actual test
        script = HookScriptRunner(backup_manager, "test_hook", "pre")
        script.env_from_wal_info(wal_info)
        expected_env = {
            "BARMAN_PHASE": "pre",
            "BARMAN_VERSION": version,
            "BARMAN_SERVER": "test_server",
            "BARMAN_CONFIGURATION": "build_config_from_dicts",
            "BARMAN_HOOK": "test_hook",
            "BARMAN_SEGMENT": "XXYYZZAABBCC",
            "BARMAN_FILE": "/incoming/directory",
            "BARMAN_SIZE": "1234567",
            "BARMAN_TIMESTAMP": str(timestamp),
            "BARMAN_COMPRESSION": "",
        }
        script.run()
        assert command_mock.call_count == 1
        assert command_mock.call_args[1]["env_append"] == expected_env
开发者ID:stig,项目名称:barman,代码行数:32,代码来源:test_hooks.py


示例4: test_backup_info_exception

    def test_backup_info_exception(self, command_mock):
        # BackupManager mock
        backup_manager = build_backup_manager(name="test_server")
        backup_manager.config.pre_test_hook = "not_existent_script"
        backup_manager.get_previous_backup = MagicMock()
        backup_manager.get_previous_backup.side_effect = UnknownBackupIdException()

        # BackupInfo mock
        backup_info = MagicMock(name="backup_info")
        backup_info.get_basebackup_directory.return_value = "backup_directory"
        backup_info.backup_id = "123456789XYZ"
        backup_info.error = None
        backup_info.status = "OK"

        # the actual test
        script = HookScriptRunner(backup_manager, "test_hook", "pre")
        script.env_from_backup_info(backup_info)
        expected_env = {
            "BARMAN_PHASE": "pre",
            "BARMAN_VERSION": version,
            "BARMAN_SERVER": "test_server",
            "BARMAN_CONFIGURATION": "build_config_from_dicts",
            "BARMAN_HOOK": "test_hook",
            "BARMAN_BACKUP_DIR": "backup_directory",
            "BARMAN_BACKUP_ID": "123456789XYZ",
            "BARMAN_ERROR": "",
            "BARMAN_STATUS": "OK",
            "BARMAN_PREVIOUS_ID": "",
        }
        script.run()
        assert command_mock.call_count == 1
        assert command_mock.call_args[1]["env_append"] == expected_env
开发者ID:stig,项目名称:barman,代码行数:32,代码来源:test_hooks.py


示例5: test_check_receivexlog_installed

    def test_check_receivexlog_installed(self, command_mock):
        """
        Test for the check method of the StreamingWalArchiver class
        """
        backup_manager = build_backup_manager()
        backup_manager.server.postgres.server_txt_version = "9.2"
        command_mock.side_effect = CommandFailedException

        archiver = StreamingWalArchiver(backup_manager)
        result = archiver.get_remote_status()

        assert result == {
            "pg_receivexlog_installed": False,
            "pg_receivexlog_path": None,
            "pg_receivexlog_compatible": None,
            'pg_receivexlog_synchronous': None,
            "pg_receivexlog_version": None,
            "pg_receivexlog_supports_slots": None,
        }

        backup_manager.server.postgres.server_txt_version = "9.2"
        command_mock.side_effect = None
        command_mock.return_value.cmd = '/some/path/to/pg_receivexlog'
        command_mock.return_value.side_effect = CommandFailedException
        archiver.reset_remote_status()
        result = archiver.get_remote_status()

        assert result == {
            "pg_receivexlog_installed": True,
            "pg_receivexlog_path": "/some/path/to/pg_receivexlog",
            "pg_receivexlog_compatible": None,
            'pg_receivexlog_synchronous': None,
            "pg_receivexlog_version": None,
            "pg_receivexlog_supports_slots": None,
        }
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_wal_archiver.py


示例6: test_check

    def test_check(self, remote_status_mock):
        """
        Very simple and basic test for the check method
        :param remote_status_mock: mock for the get_remote_status method
        """
        remote_status_mock.return_value = {
            'pg_basebackup_compatible': True,
            'pg_basebackup_installed': True,
            'pg_basebackup_path': '/fake/path',
            'pg_basebackup_bwlimit': True,
            'pg_basebackup_version': '9.5'
        }
        check_strat = CheckStrategy()
        backup_manager = build_backup_manager(global_conf={
            'backup_method': 'postgres'
        })
        backup_manager.executor.check(check_strategy=check_strat)
        # No errors detected
        assert check_strat.has_error is not True

        remote_status_mock.reset_mock()
        remote_status_mock.return_value = {
            'pg_basebackup_compatible': False,
            'pg_basebackup_installed': True,
            'pg_basebackup_path': True,
            'pg_basebackup_bwlimit': True,
            'pg_basebackup_version': '9.5'
        }

        check_strat = CheckStrategy()
        backup_manager.executor.check(check_strategy=check_strat)
        # Error present because of the 'pg_basebackup_compatible': False
        assert check_strat.has_error is True
开发者ID:jamonation,项目名称:barman,代码行数:33,代码来源:test_executor.py


示例7: test_wal_info_corner_cases

    def test_wal_info_corner_cases(self, command_mock):
        # BackupManager mock
        backup_manager = build_backup_manager(name='test_server')
        backup_manager.config.pre_test_hook = 'not_existent_script'

        # WalFileInfo mock
        timestamp = time.time()
        wal_info = MagicMock(name='wal_info')
        wal_info.name = 'XXYYZZAABBCC'
        wal_info.size = 1234567
        wal_info.time = timestamp
        wal_info.compression = None
        wal_info.fullpath.return_value = '/incoming/directory'

        # the actual test
        script = HookScriptRunner(backup_manager, 'test_hook', 'pre')
        script.env_from_wal_info(wal_info, '/somewhere', Exception('BOOM!'))
        expected_env = {
            'BARMAN_PHASE': 'pre',
            'BARMAN_VERSION': version,
            'BARMAN_SERVER': 'test_server',
            'BARMAN_CONFIGURATION': 'build_config_from_dicts',
            'BARMAN_HOOK': 'test_hook',
            'BARMAN_SEGMENT': 'XXYYZZAABBCC',
            'BARMAN_FILE': '/somewhere',
            'BARMAN_SIZE': '1234567',
            'BARMAN_TIMESTAMP': str(timestamp),
            'BARMAN_COMPRESSION': '',
            'BARMAN_RETRY': '0',
            'BARMAN_ERROR': 'BOOM!',
        }
        script.run()
        assert command_mock.call_count == 1
        assert command_mock.call_args[1]['env_append'] == expected_env
开发者ID:zacchiro,项目名称:barman,代码行数:34,代码来源:test_hooks.py


示例8: test_streamingwalarchiver_check_receivexlog_installed

    def test_streamingwalarchiver_check_receivexlog_installed(
            self, command_mock, which_mock):
        """
        Test for the check method of the StreamingWalArchiver class
        """
        backup_manager = build_backup_manager()
        backup_manager.server.postgres.server_txt_version = "9.2"
        which_mock.return_value = None

        archiver = StreamingWalArchiver(backup_manager)
        result = archiver.get_remote_status()

        which_mock.assert_called_with('pg_receivexlog', ANY)
        assert result == {
            "pg_receivexlog_installed": False,
            "pg_receivexlog_path": None,
            "pg_receivexlog_compatible": None,
            "pg_receivexlog_version": None,
        }

        backup_manager.server.postgres.server_txt_version = "9.2"
        which_mock.return_value = '/some/path/to/pg_receivexlog'
        command_mock.return_value.side_effect = CommandFailedException
        result = archiver.get_remote_status()

        assert result == {
            "pg_receivexlog_installed": True,
            "pg_receivexlog_path": "/some/path/to/pg_receivexlog",
            "pg_receivexlog_compatible": None,
            "pg_receivexlog_version": None,
        }
开发者ID:zacchiro,项目名称:barman,代码行数:31,代码来源:test_wal_archiver.py


示例9: test_archive_wal_no_backup

    def test_archive_wal_no_backup(self, tmpdir, capsys):
        """
        Test archive-wal behaviour when there are no backups.

        Expect it to trash WAL files
        """
        # Build a real backup manager
        backup_manager = build_backup_manager(name="TestServer", global_conf={"barman_home": tmpdir.strpath})
        backup_manager.compression_manager.get_compressor.return_value = None
        backup_manager.server.get_backup.return_value = None
        # Build the basic folder structure and files
        basedir = tmpdir.join("main")
        incoming_dir = basedir.join("incoming")
        archive_dir = basedir.join("wals")
        xlog_db = archive_dir.join("xlog.db")
        wal_name = "000000010000000000000001"
        wal_file = incoming_dir.join(wal_name)
        wal_file.ensure()
        archive_dir.ensure(dir=True)
        xlog_db.ensure()
        backup_manager.server.xlogdb.return_value.__enter__.return_value = xlog_db.open(mode="a")
        backup_manager.server.archivers = [FileWalArchiver(backup_manager)]

        backup_manager.archive_wal()

        # Check that the WAL file is not present inside the wal catalog
        with xlog_db.open() as f:
            line = str(f.readline())
            assert wal_name not in line
        wal_path = os.path.join(archive_dir.strpath, barman.xlog.hash_dir(wal_name), wal_name)
        # Check that the wal file have not been archived
        assert not os.path.exists(wal_path)
        out, err = capsys.readouterr()
        # Check the output for the removal of the wal file
        assert ("No base backup available. Trashing file %s" % wal_name) in out
开发者ID:girgen,项目名称:barman,代码行数:35,代码来源:test_wal_archiver.py


示例10: test_pgespresso_start_backup

    def test_pgespresso_start_backup(self):
        """
        Simple test for _pgespresso_start_backup method
        of the RsyncBackupExecutor class
        """
        # Build and configure a server using a mock
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)
        backup_label = 'test label'

        # Expect an exception because pgespresso is not installed
        backup_manager.server.pgespresso_installed.return_value = False
        with pytest.raises(Exception):
            backup_manager.executor.pgespresso_start_backup(backup_label)

        # Report pgespresso installed. Expect no error and the correct call
        # sequence
        backup_manager.server.reset_mock()
        backup_manager.executor.server.pgespresso_installed.return_value = True
        backup_manager.executor.strategy._pgespresso_start_backup(backup_label)

        pg_connect = mock.call.pg_connect()
        with_pg_connect = pg_connect.__enter__()
        cursor = with_pg_connect.cursor()
        assert backup_manager.server.mock_calls == [
            pg_connect,
            with_pg_connect,
            pg_connect.__enter__().rollback(),
            cursor,
            cursor.execute(ANY, ANY),
            cursor.fetchone(),
            pg_connect.__exit__(None, None, None)]
开发者ID:forndb,项目名称:barman,代码行数:35,代码来源:test_executor.py


示例11: test_map_temporary_config_files

    def test_map_temporary_config_files(self, tmpdir):
        """
        Test the method that prepares configuration files
        for the final steps of a recovery
        """
        # Build directory/files structure for testing
        tempdir = tmpdir.mkdir("tempdir")
        recovery_info = {
            "configuration_files": ["postgresql.conf", "postgresql.auto.conf"],
            "tempdir": tempdir.strpath,
            "temporary_configuration_files": [],
            "results": {"changes": [], "warnings": [], "missing_files": []},
        }

        backup_info = testing_helpers.build_test_backup_info()
        backup_info.config.basebackups_directory = tmpdir.strpath
        datadir = tmpdir.mkdir(backup_info.backup_id).mkdir("data")
        postgresql_conf_local = datadir.join("postgresql.conf")
        postgresql_auto_local = datadir.join("postgresql.auto.conf")
        postgresql_conf_local.write("archive_command = something\n" "data_directory = something")
        postgresql_auto_local.write("archive_command = something\n" "data_directory = something")
        # Build a RecoveryExecutor object (using a mock as server and backup
        # manager.
        backup_manager = testing_helpers.build_backup_manager()
        executor = RecoveryExecutor(backup_manager)
        executor._map_temporary_config_files(recovery_info, backup_info, "[email protected]")
        # check that configuration files have been moved by the method
        assert tempdir.join("postgresql.conf").check()
        assert tempdir.join("postgresql.conf").computehash() == postgresql_conf_local.computehash()
        assert tempdir.join("postgresql.auto.conf").check()
        assert tempdir.join("postgresql.auto.conf").computehash() == postgresql_auto_local.computehash()
        assert recovery_info["results"]["missing_files"] == ["pg_hba.conf", "pg_ident.conf"]
开发者ID:moench-tegeder,项目名称:barman,代码行数:32,代码来源:test_recovery_executor.py


示例12: test_retry

    def test_retry(self, sleep_moc):
        """
        Test the retry method

        :param sleep_moc: mimic the sleep timer
        """
        # BackupManager setup
        backup_manager = build_backup_manager()
        backup_manager.config.basebackup_retry_times = 5
        backup_manager.config.basebackup_retry_sleep = 10
        f = Mock()

        # check for correct return value
        r = backup_manager.retry_backup_copy(f, 'test string')
        f.assert_called_with('test string')
        assert f.return_value == r

        # check for correct number of calls
        expected = Mock()
        f = Mock(side_effect=[DataTransferFailure('testException'), expected])
        r = backup_manager.retry_backup_copy(f, 'test string')
        assert f.call_count == 2

        # check for correct number of tries and invocations of sleep method
        sleep_moc.reset_mock()
        e = DataTransferFailure('testException')
        f = Mock(side_effect=[e, e, e, e, e, e])
        with pytest.raises(DataTransferFailure):
            backup_manager.retry_backup_copy(f, 'test string')

        assert sleep_moc.call_count == 5
        assert f.call_count == 6
开发者ID:terrorobe,项目名称:barman,代码行数:32,代码来源:test_backup.py


示例13: test_load_backup_cache

    def test_load_backup_cache(self, tmpdir):
        """
        Check the loading of backups inside the backup_cache
        """
        # build a backup_manager and setup a basic configuration
        backup_manager = build_backup_manager(
            name='TestServer',
            global_conf={
                'barman_home': tmpdir.strpath
            })

        # Make sure the cache is uninitialized
        assert backup_manager._backup_cache is None

        # Create a BackupInfo object with status DONE
        b_info = build_test_backup_info(
            backup_id='fake_backup_id',
            server=backup_manager.server,
        )
        b_info.save()

        # Load backups inside the cache
        backup_manager._load_backup_cache()

        # Check that the test backup is inside the backups_cache
        assert backup_manager._backup_cache[b_info.backup_id].to_dict() == \
               b_info.to_dict()
开发者ID:terrorobe,项目名称:barman,代码行数:27,代码来源:test_backup.py


示例14: test_backup_cache_add

    def test_backup_cache_add(self, tmpdir):
        """
        Check the method responsible for the registration of a BackupInfo obj
        into the backups cache
        """
        # build a backup_manager and setup a basic configuration
        backup_manager = build_backup_manager(
            name='TestServer',
            global_conf={
                'barman_home': tmpdir.strpath
            })


        # Create a BackupInfo object with status DONE
        b_info = build_test_backup_info(
            backup_id='fake_backup_id',
            server=backup_manager.server,
        )
        b_info.save()

        assert backup_manager._backup_cache is None

        # Register the object to cache. The cache is not initialized, so it
        # must load the cache from disk.
        backup_manager.backup_cache_add(b_info)
        # Check that the test backup is in the cache
        assert backup_manager.get_backup(b_info.backup_id) is b_info

        # Initialize an empty cache
        backup_manager._backup_cache = {}
        # Add the backup again
        backup_manager.backup_cache_add(b_info)
        assert backup_manager.get_backup(b_info.backup_id) is b_info
开发者ID:terrorobe,项目名称:barman,代码行数:33,代码来源:test_backup.py


示例15: test_get_next_batch

    def test_get_next_batch(self, from_file_mock, isfile_mock, glob_mock):
        """
        Test the FileWalArchiver.get_next_batch method
        """

        # WAL batch no errors
        glob_mock.return_value = ['000000010000000000000001']
        isfile_mock.return_value = True
        # This is an hack, instead of a WalFileInfo we use a simple string to
        # ease all the comparisons. The resulting string is the name enclosed
        # in colons. e.g. ":000000010000000000000001:"
        from_file_mock.side_effect = lambda wal_name: ':%s:' % wal_name

        backup_manager = build_backup_manager(
            name='TestServer'
        )
        archiver = FileWalArchiver(backup_manager)
        backup_manager.server.archivers = [archiver]

        batch = archiver.get_next_batch()
        assert [':000000010000000000000001:'] == batch

        # WAL batch with errors
        wrong_file_name = 'test_wrong_wal_file.2'
        glob_mock.return_value = ['test_wrong_wal_file.2']
        batch = archiver.get_next_batch()
        assert [wrong_file_name] == batch.errors
开发者ID:damnski,项目名称:barman,代码行数:27,代码来源:test_wal_archiver.py


示例16: test_keyboard_interrupt

    def test_keyboard_interrupt(self, mock_infofile ):
        """
        Unit test for a quick check on exception catching
        during backup operations

        Test case 1: raise a general exception, backup status in
        BackupInfo should be FAILED.

        Test case 2: raise a KeyboardInterrupt exception, simulating
        a user pressing CTRL + C while a backup is in progress,
        backup status in BackupInfo should be FAILED.
        """
        # BackupManager setup
        backup_manager = build_backup_manager()
        instance = mock_infofile.return_value
        # Instruct the patched method to raise a general exception
        backup_manager.executor.start_backup = Mock(
            side_effect=Exception('abc'))
        # invoke backup method
        backup_manager.backup()
        # verify that mock status is FAILED
        assert call.set_attribute('status', 'FAILED') in instance.mock_calls
        # Instruct the patched method to raise a KeyboardInterrupt
        backup_manager.executor.start_backup = Mock(
            side_effect=KeyboardInterrupt())
        # invoke backup method
        backup_manager.backup()
        # verify that mock status is FAILED
        assert call.set_attribute('status', 'FAILED') in instance.mock_calls
开发者ID:terrorobe,项目名称:barman,代码行数:29,代码来源:test_backup.py


示例17: test_setup

    def test_setup(self, rsync_mock):
        """
        Test the method that set up a recovery
        """
        backup_info = testing_helpers.build_test_backup_info()
        backup_manager = testing_helpers.build_backup_manager()
        executor = RecoveryExecutor(backup_manager)
        backup_info.version = 90300

        # setup should create a temporary directory
        # and teardown should delete it
        ret = executor._setup(backup_info, None, "/tmp")
        assert os.path.exists(ret["tempdir"])
        executor._teardown(ret)
        assert not os.path.exists(ret["tempdir"])

        # no postgresql.auto.conf on version 9.3
        ret = executor._setup(backup_info, None, "/tmp")
        executor._teardown(ret)
        assert "postgresql.auto.conf" not in ret["configuration_files"]

        # Check the present for postgresql.auto.conf on version 9.4
        backup_info.version = 90400
        ret = executor._setup(backup_info, None, "/tmp")
        executor._teardown(ret)
        assert "postgresql.auto.conf" in ret["configuration_files"]

        # Receive a error if the remote command is invalid
        with pytest.raises(SystemExit):
            executor.server.path = None
            executor._setup(backup_info, "invalid", "/tmp")
开发者ID:moench-tegeder,项目名称:barman,代码行数:31,代码来源:test_recovery_executor.py


示例18: test_concurrent_stop_backup

    def test_concurrent_stop_backup(self, label_mock, stop_mock,):
        """
        Basic test for the start_backup method

        :param label_mock: mimic the response of _write_backup_label
        :param stop_mock: mimic the response of _pgespresso_stop_backup
        """
        # Build a backup info and configure the mocks
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)

        # Mock executor._pgespresso_stop_backup(backup_info) call
        stop_time = datetime.datetime.now()
        stop_mock.return_value = ("000000060000A25700000044", stop_time)

        backup_info = build_test_backup_info()
        backup_manager.executor.strategy.stop_backup(backup_info)

        assert backup_info.end_xlog == 'A257/45000000'
        assert backup_info.end_wal == '000000060000A25700000044'
        assert backup_info.end_offset == 0
        assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py


示例19: test_backup_copy_with_included_files

 def test_backup_copy_with_included_files(self, rsync_moc, tmpdir, capsys):
     backup_manager = build_backup_manager(global_conf={
         'barman_home': tmpdir.mkdir('home').strpath
     })
     # Create a backup info with additional configuration files
     backup_info = build_test_backup_info(
         server=backup_manager.server,
         pgdata="/pg/data",
         config_file="/etc/postgresql.conf",
         hba_file="/pg/data/pg_hba.conf",
         ident_file="/pg/data/pg_ident.conf",
         begin_xlog="0/2000028",
         begin_wal="000000010000000000000002",
         included_files=["/tmp/config/file.conf"],
         begin_offset=28)
     backup_info.save()
     # This is to check that all the preparation is done correctly
     assert os.path.exists(backup_info.filename)
     # Execute a backup
     backup_manager.executor.backup_copy(backup_info)
     out, err = capsys.readouterr()
     # check for the presence of the warning in the stderr
     assert "WARNING: The usage of include directives is not supported" in err
     # check that the additional configuration file is present in the output
     assert backup_info.included_files[0] in err
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py


示例20: test_pgespresso_stop_backup

    def test_pgespresso_stop_backup(self, tbs_map_mock, label_mock):
        """
        Basic test for the pgespresso_stop_backup method
        """
        # Build a backup info and configure the mocks
        server = build_mocked_server(main_conf={
            'backup_options':
            BackupOptions.CONCURRENT_BACKUP
        })
        backup_manager = build_backup_manager(server=server)

        # Mock executor._pgespresso_stop_backup(backup_info) call
        stop_time = datetime.datetime.now()
        server.postgres.server_version = 90500
        server.postgres.pgespresso_stop_backup.return_value = {
            'end_wal': "000000060000A25700000044",
            'timestamp': stop_time
        }

        backup_info = build_test_backup_info(timeline=6)
        backup_manager.executor.strategy.stop_backup(backup_info)

        assert backup_info.end_xlog == 'A257/44FFFFFF'
        assert backup_info.end_wal == '000000060000A25700000044'
        assert backup_info.end_offset == 0xFFFFFF
        assert backup_info.end_time == stop_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:26,代码来源:test_executor.py



注:本文中的testing_helpers.build_backup_manager函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Python testing_helpers.build_config_dictionary函数代码示例发布时间:2022-05-27
下一篇:
Python util.get_resource_path函数代码示例发布时间:2022-05-27
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap