本文整理汇总了 Python 中 testing_helpers.build_test_backup_info 函数的典型用法代码示例。如果您正苦于以下问题：Python build_test_backup_info 函数的具体用法？Python build_test_backup_info 怎么用？Python build_test_backup_info 使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了build_test_backup_info函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_delete_running_backup
def test_delete_running_backup(self, delete_mock, get_first_backup_mock, tmpdir, capsys):
    """
    Check the behaviour of server.delete_backup on a running backup.

    Inside a ServerBackupLock context a backup in status STARTED or EMPTY
    must be refused with an error message on stderr, while a DONE backup
    is passed to the mocked delete method. Outside of the lock context
    the STARTED backup is passed to the mocked delete method as well.

    :param delete_mock: mock expected to be called with the backup to delete
    :param get_first_backup_mock: mock whose return value is set to the id
        of the backup under test
    :param tmpdir: temporary directory used as barman_home
    :param capsys: pytest fixture used to read the captured stderr
    """
    server = build_real_server({"barman_home": tmpdir.strpath})
    error_template = "Cannot delete a running backup (%s %s)"

    # Test the removal of a running backup. status STARTED
    backup_info_started = build_test_backup_info(
        status=BackupInfo.STARTED, server_name=server.config.name)
    get_first_backup_mock.return_value = backup_info_started.backup_id
    with ServerBackupLock(tmpdir.strpath, server.config.name):
        server.delete_backup(backup_info_started)
        out, err = capsys.readouterr()
        assert error_template % (
            server.config.name, backup_info_started.backup_id) in err

    # Test the removal of a running backup. status EMPTY
    backup_info_empty = build_test_backup_info(
        status=BackupInfo.EMPTY, server_name=server.config.name)
    get_first_backup_mock.return_value = backup_info_empty.backup_id
    with ServerBackupLock(tmpdir.strpath, server.config.name):
        server.delete_backup(backup_info_empty)
        out, err = capsys.readouterr()
        # The message must mention the EMPTY backup that was just submitted,
        # not the STARTED one used in the previous scenario
        assert error_template % (
            server.config.name, backup_info_empty.backup_id) in err

    # Test the removal of a running backup. status DONE
    backup_info_done = build_test_backup_info(
        status=BackupInfo.DONE, server_name=server.config.name)
    with ServerBackupLock(tmpdir.strpath, server.config.name):
        server.delete_backup(backup_info_done)
        delete_mock.assert_called_with(backup_info_done)

    # Test the removal of a backup not running. status STARTED
    server.delete_backup(backup_info_started)
    delete_mock.assert_called_with(backup_info_started)
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:32,代码来源:test_server.py
示例2: test_backup_copy_with_included_files
def test_backup_copy_with_included_files(self, rsync_moc, tmpdir, capsys):
    """
    Run backup_copy on a backup that lists included configuration files.

    The test expects a warning about include directives on stderr, and
    expects the included file path to be reported there too.

    :param rsync_moc: mock injected by the test decorator (not used directly)
    :param tmpdir: temporary directory hosting the barman home
    :param capsys: pytest fixture used to read the captured stderr
    """
    home_dir = tmpdir.mkdir('home')
    backup_manager = build_backup_manager(
        global_conf={'barman_home': home_dir.strpath})

    # Describe a backup whose configuration pulls in an extra file
    backup_attributes = dict(
        pgdata="/pg/data",
        config_file="/etc/postgresql.conf",
        hba_file="/pg/data/pg_hba.conf",
        ident_file="/pg/data/pg_ident.conf",
        begin_xlog="0/2000028",
        begin_wal="000000010000000000000002",
        included_files=["/tmp/config/file.conf"],
        begin_offset=28,
    )
    backup_info = build_test_backup_info(
        server=backup_manager.server, **backup_attributes)
    backup_info.save()

    # Sanity check: the backup info file must have been written
    assert os.path.exists(backup_info.filename)

    # Run the copy and collect what has been printed
    backup_manager.executor.backup_copy(backup_info)
    captured = capsys.readouterr()
    stderr_text = captured[1]

    # The warning must be printed on stderr ...
    expected_warning = \
        "WARNING: The usage of include directives is not supported"
    assert expected_warning in stderr_text
    # ... together with the path of the included file
    first_included = backup_info.included_files[0]
    assert first_included in stderr_text
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py
示例3: test_get_wal_info
def test_get_wal_info(self, get_wal_mock, tmpdir):
    """
    Basic check of the statistics returned by server.get_wal_info.

    Verifies the 'wal_total_seconds' and 'wals_per_second' entries against
    values computed from a list of three fake WAL files.

    :param get_wal_mock: mock whose return value is the fake WAL list
    :param tmpdir: temporary directory used as barman_home
    """
    # Test server rooted in the temporary directory
    server = build_real_server(
        global_conf={'barman_home': tmpdir.strpath})

    # Three fake WAL entries; the first one is used as both the begin
    # and the end WAL of the backup
    xlogdb_lines = (
        "000000010000000000000002\t16777216\t1434450086.53\tNone\n",
        "000000010000000000000003\t16777216\t1434450087.54\tNone\n",
        "000000010000000000000004\t16777216\t1434450088.55\tNone\n",
    )
    wal_list = [WalFileInfo.from_xlogdb_line(line) for line in xlogdb_lines]
    get_wal_mock.return_value = wal_list

    first_wal = wal_list[0]
    last_wal = wal_list[-1]
    backup_info = build_test_backup_info(
        server=server,
        begin_wal=first_wal.name,
        end_wal=first_wal.name)
    backup_info.save()

    # Expected elapsed time: last WAL timestamp minus first WAL timestamp
    expected_seconds = last_wal.time - first_wal.time
    # Expected rate: number of WAL files divided by the elapsed time
    expected_rate = len(wal_list) / expected_seconds

    wal_info = server.get_wal_info(backup_info)
    assert wal_info
    assert wal_info['wal_total_seconds'] == expected_seconds
    assert wal_info['wals_per_second'] == expected_rate
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_server.py
示例4: test_backup_cache_add
def test_backup_cache_add(self, tmpdir):
    """
    Verify that backup_cache_add registers a BackupInfo in the cache.

    Two situations are exercised: the cache attribute still being None,
    and the cache being an explicitly assigned empty dictionary.

    :param tmpdir: temporary directory used as barman_home
    """
    # Backup manager with a minimal configuration
    manager = build_backup_manager(
        name='TestServer',
        global_conf={'barman_home': tmpdir.strpath})

    # Test BackupInfo, saved on disk
    backup = build_test_backup_info(
        backup_id='fake_backup_id',
        server=manager.server)
    backup.save()

    # Nothing has populated the cache so far
    assert manager._backup_cache is None

    # First registration, starting from the uninitialised cache
    manager.backup_cache_add(backup)
    assert manager.get_backup(backup.backup_id) is backup

    # Second registration, starting from an empty cache
    manager._backup_cache = {}
    manager.backup_cache_add(backup)
    assert manager.get_backup(backup.backup_id) is backup
开发者ID:terrorobe,项目名称:barman,代码行数:33,代码来源:test_backup.py
示例5: test_backup_copy
def test_backup_copy(self, rsync_mock, tmpdir):
    """
    Check the sequence of rsync invocations issued by backup_copy.

    :param rsync_mock: mock for the rsync command
    :param tmpdir: temporary dir
    """
    home_dir = tmpdir.mkdir('home')
    backup_manager = build_backup_manager(
        global_conf={'barman_home': home_dir.strpath})
    backup_info = build_test_backup_info(
        server=backup_manager.server,
        pgdata="/pg/data",
        config_file="/etc/postgresql.conf",
        hba_file="/pg/data/pg_hba.conf",
        ident_file="/pg/data/pg_ident.conf",
        begin_xlog="0/2000028",
        begin_wal="000000010000000000000002",
        begin_offset=28)
    backup_info.save()

    # Sanity check: the backup info file must have been written
    assert os.path.exists(backup_info.filename)

    backup_manager.executor.backup_copy(backup_info)

    # NOTE(review): '[email protected]' looks like an e-mail
    # obfuscation artifact of the page this snippet was taken from;
    # presumably the original literal was a user@host ssh target.
    # Confirm against the upstream test before relying on it.
    ssh_options = ['-c', '"arcfour"', '-p', '22',
                   '[email protected]', '-o',
                   'BatchMode=yes', '-o',
                   'StrictHostKeyChecking=no']
    # Keyword arguments expected for each tablespace rsync object
    tablespace_kwargs = dict(
        check=True, network_compression=False, args=[],
        bwlimit=None, ssh='ssh', ssh_options=ssh_options)
    # Keyword arguments expected for the PGDATA rsync object
    pgdata_kwargs = dict(
        network_compression=False,
        exclude_and_protect=['/pg_tblspc/16387', '/pg_tblspc/16405'],
        args=[], bwlimit=None, ssh='ssh', ssh_options=ssh_options)

    expected_calls = [
        # first tablespace
        mock.call(**tablespace_kwargs),
        mock.call().smart_copy(':/fake/location/',
                               backup_info.get_data_directory(16387),
                               None, None),
        # second tablespace
        mock.call(**tablespace_kwargs),
        mock.call().smart_copy(':/another/location/',
                               backup_info.get_data_directory(16405),
                               None, None),
        # PGDATA, with both tablespace links excluded
        mock.call(**pgdata_kwargs),
        mock.call().smart_copy(':/pg/data/',
                               backup_info.get_data_directory(),
                               None, None),
        # pg_control
        mock.call()(
            ':/pg/data/global/pg_control',
            '%s/global/pg_control' % backup_info.get_data_directory()),
        # configuration file located outside of PGDATA
        mock.call()(':/etc/postgresql.conf',
                    backup_info.get_data_directory()),
    ]
    assert rsync_mock.mock_calls == expected_calls
开发者ID:forndb,项目名称:barman,代码行数:60,代码来源:test_executor.py
示例6: test_concurrent_stop_backup
def test_concurrent_stop_backup(self, label_mock, stop_mock):
    """
    Basic test for the stop_backup method of the concurrent strategy.

    :param label_mock: mimic the response of _write_backup_label
    :param stop_mock: mimic the response of _pgespresso_stop_backup
    """
    # Server configured for concurrent backups, wrapped by a manager
    concurrent_conf = {'backup_options': BackupOptions.CONCURRENT_BACKUP}
    server = build_mocked_server(main_conf=concurrent_conf)
    backup_manager = build_backup_manager(server=server)

    # Value handed back by the mocked stop call: (WAL name, timestamp)
    stop_time = datetime.datetime.now()
    final_wal = "000000060000A25700000044"
    stop_mock.return_value = (final_wal, stop_time)

    backup_info = build_test_backup_info()
    strategy = backup_manager.executor.strategy
    strategy.stop_backup(backup_info)

    # The end-of-backup fields must have been filled in
    assert backup_info.end_xlog == 'A257/45000000'
    assert backup_info.end_wal == '000000060000A25700000044'
    assert backup_info.end_offset == 0
    assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py
示例7: test_get_backup
def test_get_backup(self, tmpdir):
    """
    Check that get_backup retrieves a backup by id through the cache.

    With the cache attribute still None the saved backup must be found;
    once the cache is replaced by an empty dictionary, the lookup must
    return None.

    :param tmpdir: temporary directory used as barman_home
    """
    # Backup manager with a minimal configuration
    manager = build_backup_manager(
        name='TestServer',
        global_conf={'barman_home': tmpdir.strpath})

    # Test BackupInfo, saved on disk
    backup = build_test_backup_info(
        backup_id='fake_backup_id',
        server=manager.server)
    backup.save()

    assert manager._backup_cache is None

    # The backup returned must carry the same content as the saved one
    retrieved = manager.get_backup(backup.backup_id).to_dict()
    expected = backup.to_dict()
    assert retrieved == expected

    # With an emptied cache the backup can no longer be found
    manager._backup_cache = {}
    assert manager.get_backup(backup.backup_id) is None
开发者ID:terrorobe,项目名称:barman,代码行数:31,代码来源:test_backup.py
示例8: test_backup_cache_remove
def test_backup_cache_remove(self, tmpdir):
    """
    Verify that backup_cache_remove drops a BackupInfo from the cache.

    Removing from a cache that is still None must leave it None;
    removing from a populated cache must delete the entry.

    :param tmpdir: temporary directory used as barman_home
    """
    # Backup manager with a minimal configuration
    manager = build_backup_manager(
        name='TestServer',
        global_conf={'barman_home': tmpdir.strpath})
    assert manager._backup_cache is None

    # Test BackupInfo (not saved on disk)
    backup = build_test_backup_info(
        backup_id='fake_backup_id',
        server=manager.server)

    # Removal against the uninitialised cache: nothing must change
    manager.backup_cache_remove(backup)
    assert manager._backup_cache is None

    # Removal against a cache holding exactly this backup
    manager._backup_cache = {backup.backup_id: backup}
    manager.backup_cache_remove(backup)
    assert backup.backup_id not in manager._backup_cache
开发者ID:terrorobe,项目名称:barman,代码行数:30,代码来源:test_backup.py
示例9: test_recover_basebackup_copy
def test_recover_basebackup_copy(self, rsync_pg_mock, tmpdir):
    """
    Check the rsync calls issued while copying a backup during recovery.

    :param rsync_pg_mock: Mock rsync object for the purpose of this test
    :param tmpdir: temporary directory hosting the recovery destination
    """
    # Destination directory and a backup with one tablespace
    destination = tmpdir.mkdir('destination')
    server = testing_helpers.build_real_server()
    tablespace = ('tbs1', 16387, '/fake/location')
    backup_info = testing_helpers.build_test_backup_info(
        server=server,
        tablespaces=[tablespace])

    # Executor with a per-tablespace and a global bandwidth limit
    executor = RecoveryExecutor(server.backup_manager)
    executor.config.tablespace_bandwidth_limit = {'tbs1': ''}
    executor.config.bandwidth_limit = 10

    executor.basebackup_copy(
        backup_info, destination.strpath, tablespaces=None)

    # Last rsync object built: global limit, tablespace link excluded
    rsync_pg_mock.assert_called_with(
        network_compression=False, bwlimit=10, ssh=None, path=None,
        exclude_and_protect=['/pg_tblspc/16387'])
    # One rsync object must have been built with the tablespace limit
    rsync_pg_mock.assert_any_call(
        network_compression=False, bwlimit='', ssh=None, path=None,
        check=True)

    smart_copy = rsync_pg_mock.return_value.smart_copy
    # Tablespace content is copied to the tablespace location
    smart_copy.assert_any_call(
        '/some/barman/home/main/base/1234567890/16387/',
        '/fake/location', None)
    # The last copy moves the data directory to the destination
    smart_copy.assert_called_with(
        '/some/barman/home/main/base/1234567890/data/',
        destination.strpath, None)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:29,代码来源:test_recovery_executor.py
示例10: test_map_temporary_config_files
def test_map_temporary_config_files(self, tmpdir):
    """
    Check the preparation of configuration files for the last recovery steps.

    After _map_temporary_config_files runs, both configuration files must
    exist in the temporary directory with the same content as in the
    backup, and pg_hba.conf / pg_ident.conf must be reported as missing.

    :param tmpdir: temporary directory hosting the fake backup layout
    """
    # Temporary working directory and the recovery_info structure
    work_dir = tmpdir.mkdir("tempdir")
    recovery_info = {
        "configuration_files": ["postgresql.conf", "postgresql.auto.conf"],
        "tempdir": work_dir.strpath,
        "temporary_configuration_files": [],
        "results": {"changes": [], "warnings": [], "missing_files": []},
    }

    # Fake backup whose data directory lives under tmpdir
    backup_info = testing_helpers.build_test_backup_info()
    backup_info.config.basebackups_directory = tmpdir.strpath
    data_dir = tmpdir.mkdir(backup_info.backup_id).mkdir("data")
    source_conf = data_dir.join("postgresql.conf")
    source_auto_conf = data_dir.join("postgresql.auto.conf")
    source_conf.write(
        "archive_command = something\n" "data_directory = something")
    source_auto_conf.write(
        "archive_command = something\n" "data_directory = something")

    # RecoveryExecutor built on top of a mocked backup manager
    backup_manager = testing_helpers.build_backup_manager()
    executor = RecoveryExecutor(backup_manager)
    # NOTE(review): "[email protected]" looks like an e-mail obfuscation
    # artifact of the page this snippet was taken from; presumably the
    # original literal was a user@host remote command. Confirm upstream.
    executor._map_temporary_config_files(
        recovery_info, backup_info, "[email protected]")

    # Both files must now be in the temporary directory, unchanged
    mapped_conf = work_dir.join("postgresql.conf")
    assert mapped_conf.check()
    assert mapped_conf.computehash() == source_conf.computehash()
    mapped_auto_conf = work_dir.join("postgresql.auto.conf")
    assert mapped_auto_conf.check()
    assert mapped_auto_conf.computehash() == source_auto_conf.computehash()

    # Files that were never created must be reported as missing
    missing = recovery_info["results"]["missing_files"]
    assert missing == ["pg_hba.conf", "pg_ident.conf"]
开发者ID:moench-tegeder,项目名称:barman,代码行数:32,代码来源:test_recovery_executor.py
示例11: test_setup
def test_setup(self, rsync_mock):
    """
    Check the method that sets up a recovery, and the matching teardown.

    :param rsync_mock: mock injected by the test decorator (not used
        directly)
    """
    backup_info = testing_helpers.build_test_backup_info()
    backup_manager = testing_helpers.build_backup_manager()
    executor = RecoveryExecutor(backup_manager)
    destination = "/tmp"

    # On version 9.3: _setup creates a temporary directory
    # and _teardown removes it
    backup_info.version = 90300
    recovery_info = executor._setup(backup_info, None, destination)
    assert os.path.exists(recovery_info["tempdir"])
    executor._teardown(recovery_info)
    assert not os.path.exists(recovery_info["tempdir"])

    # On version 9.3 postgresql.auto.conf must not be listed
    recovery_info = executor._setup(backup_info, None, destination)
    executor._teardown(recovery_info)
    assert "postgresql.auto.conf" not in recovery_info["configuration_files"]

    # On version 9.4 postgresql.auto.conf must be listed
    backup_info.version = 90400
    recovery_info = executor._setup(backup_info, None, destination)
    executor._teardown(recovery_info)
    assert "postgresql.auto.conf" in recovery_info["configuration_files"]

    # An invalid remote command must end with a SystemExit
    with pytest.raises(SystemExit):
        executor.server.path = None
        executor._setup(backup_info, "invalid", destination)
开发者ID:moench-tegeder,项目名称:barman,代码行数:31,代码来源:test_recovery_executor.py
示例12: test_set_pitr_targets
def test_set_pitr_targets(self, tmpdir):
    """
    Evaluate the targets computed for point in time recovery.

    First with empty target values (no PITR), then with a target name,
    a target time and a third target value of "2".

    :param tmpdir: temporary directory hosting the working directories
    """
    # Working directories
    work_dir = tmpdir.mkdir("temp_dir")
    dest = tmpdir.mkdir("dest")
    wal_dest = tmpdir.mkdir("wal_dest")
    recovery_info = {
        "configuration_files": ["postgresql.conf", "postgresql.auto.conf"],
        "tempdir": work_dir.strpath,
        "results": {"changes": [], "warnings": []},
        "is_pitr": False,
        "wal_dest": wal_dest.strpath,
        "get_wal": False,
    }
    backup_info = testing_helpers.build_test_backup_info()
    backup_manager = testing_helpers.build_backup_manager()
    executor = RecoveryExecutor(backup_manager)

    # Empty values (no PITR): no target, WAL destination untouched
    executor._set_pitr_targets(
        recovery_info, backup_info, dest.strpath, "", "", "", "")
    assert recovery_info["target_epoch"] is None
    assert recovery_info["target_datetime"] is None
    assert recovery_info["wal_dest"] == wal_dest.strpath

    # PITR targets supplied
    executor._set_pitr_targets(
        recovery_info,
        backup_info,
        dest.strpath,
        "target_name",
        "2015-06-03 16:11:03.71038+02",
        "2",
        None,
    )
    expected_datetime = dateutil.parser.parse(
        "2015-06-03 16:11:03.710380+02:00")
    whole_seconds = time.mktime(expected_datetime.timetuple())
    fraction = expected_datetime.microsecond / 1000000.0
    expected_epoch = whole_seconds + fraction
    assert recovery_info["target_datetime"] == expected_datetime
    assert recovery_info["target_epoch"] == expected_epoch
    # WAL files are now expected under the recovery destination
    assert recovery_info["wal_dest"] == dest.join("barman_xlog").strpath
开发者ID:moench-tegeder,项目名称:barman,代码行数:35,代码来源:test_recovery_executor.py
示例13: test_prepare_tablespaces
def test_prepare_tablespaces(self, tmpdir):
    """
    Check the commands issued to prepare tablespaces for a recovery.

    :param tmpdir: temporary directory hosting destination and WAL dirs
    """
    # Directory layout and a backup with one tablespace
    destination = tmpdir.mkdir('destination')
    wals_dir = tmpdir.mkdir('wals')
    backup_info = testing_helpers.build_test_backup_info(
        tablespaces=[('tbs1', 16387, '/fake/location')])

    # Executor built on a real server using the WAL directory above
    server = testing_helpers.build_real_server(
        main_conf={'wals_directory': wals_dir.strpath})
    executor = RecoveryExecutor(server.backup_manager)

    # A plain Mock stands in for the command object
    cmd = Mock()
    executor.prepare_tablespaces(backup_info, cmd, destination.strpath, {})

    tblspc_dir = destination.join('pg_tblspc')
    link_path = tblspc_dir.join('16387').strpath

    # pg_tblspc and the tablespace location must both be created
    cmd.create_dir_if_not_exists.assert_any_call(tblspc_dir.strpath)
    cmd.create_dir_if_not_exists.assert_any_call('/fake/location')
    # Any pre-existing link is removed, then created again
    cmd.delete_if_exists.assert_called_once_with(link_path)
    cmd.create_symbolic_link.assert_called_once_with(
        '/fake/location', link_path)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:25,代码来源:test_recovery_executor.py
示例14: test_pgespresso_stop_backup
def test_pgespresso_stop_backup(self, tbs_map_mock, label_mock):
    """
    Basic test for stop_backup when pgespresso_stop_backup is used.

    :param tbs_map_mock: mock injected by the test decorator (not used
        directly)
    :param label_mock: mock injected by the test decorator (not used
        directly)
    """
    # Server configured for concurrent backups, wrapped by a manager
    concurrent_conf = {'backup_options': BackupOptions.CONCURRENT_BACKUP}
    server = build_mocked_server(main_conf=concurrent_conf)
    backup_manager = build_backup_manager(server=server)

    # Response of the mocked server.postgres.pgespresso_stop_backup
    stop_time = datetime.datetime.now()
    server.postgres.server_version = 90500
    stop_response = {
        'end_wal': "000000060000A25700000044",
        'timestamp': stop_time,
    }
    server.postgres.pgespresso_stop_backup.return_value = stop_response

    backup_info = build_test_backup_info(timeline=6)
    strategy = backup_manager.executor.strategy
    strategy.stop_backup(backup_info)

    # The end-of-backup fields must have been filled in
    assert backup_info.end_xlog == 'A257/44FFFFFF'
    assert backup_info.end_wal == '000000060000A25700000044'
    assert backup_info.end_offset == 0xFFFFFF
    assert backup_info.end_time == stop_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:26,代码来源:test_executor.py
示例15: test_load_backup_cache
def test_load_backup_cache(self, tmpdir):
    """
    Check that _load_backup_cache fills the cache with saved backups.

    :param tmpdir: temporary directory used as barman_home
    """
    # Backup manager with a minimal configuration
    manager = build_backup_manager(
        name='TestServer',
        global_conf={'barman_home': tmpdir.strpath})

    # The cache must start uninitialised
    assert manager._backup_cache is None

    # Test BackupInfo, saved on disk
    backup = build_test_backup_info(
        backup_id='fake_backup_id',
        server=manager.server)
    backup.save()

    # Populate the cache
    manager._load_backup_cache()

    # The cached entry must carry the same content as the saved backup
    cached = manager._backup_cache[backup.backup_id].to_dict()
    expected = backup.to_dict()
    assert cached == expected
开发者ID:terrorobe,项目名称:barman,代码行数:27,代码来源:test_backup.py
示例16: test_exclusive_stop_backup
def test_exclusive_stop_backup(self, stop_mock):
    """
    Basic test for the stop_backup method of the exclusive strategy.

    :param stop_mock: mimic the response of _pg_stop_backup
    """
    # Server configured for exclusive backups, wrapped by a manager
    exclusive_conf = {'backup_options': BackupOptions.EXCLUSIVE_BACKUP}
    server = build_mocked_server(main_conf=exclusive_conf)
    backup_manager = build_backup_manager(server=server)

    # Value handed back by the mocked stop call
    stop_time = datetime.datetime.now()
    stop_response = (
        "266/4A9C1EF8",
        "00000010000002660000004A",
        10231544,
        stop_time,
    )
    stop_mock.return_value = stop_response

    backup_info = build_test_backup_info()
    strategy = backup_manager.executor.strategy
    strategy.stop_backup(backup_info)

    # The returned values must be stored inside the BackupInfo object
    assert backup_info.end_xlog == '266/4A9C1EF8'
    assert backup_info.end_wal == '00000010000002660000004A'
    assert backup_info.end_offset == 10231544
    assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:27,代码来源:test_executor.py
示例17: test_first_backup
def test_first_backup(self):
    """
    Check first_backup for both retention policy flavours.

    For a recovery window policy and for a redundancy policy alike,
    first_backup must return the key of the only available backup.
    """
    server = build_mocked_server()
    # (policy definition, class the factory is expected to build)
    scenarios = (
        ('RECOVERY WINDOW OF 4 WEEKS', RecoveryWindowRetentionPolicy),
        ('REDUNDANCY 2', RedundancyRetentionPolicy),
    )
    for policy_value, expected_class in scenarios:
        rp = RetentionPolicyFactory.create(
            server, 'retention_policy', policy_value)
        assert isinstance(rp, expected_class)

        # A test BackupInfo that has just ended
        backup_info = build_test_backup_info(
            server=rp.server,
            backup_id='test1',
            end_time=datetime.now(tzlocal()))

        # get_available_backups returns a map holding only that backup,
        # and minimum_redundancy is set to 1
        available = {"test_backup": backup_info}
        rp.server.get_available_backups.return_value = available
        rp.server.config.minimum_redundancy = 1

        # Ask the policy for the first backup
        report = rp.first_backup()
        assert report == 'test_backup'
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:47,代码来源:test_retention_policies.py
示例18: test_get_wal_until_next_backup
def test_get_wal_until_next_backup(self, get_backup_mock, tmpdir):
    """
    Simple test for the management of .history files.

    With include_history=True, the names produced by
    get_wal_until_next_backup must contain the .history entry
    written to xlog.db.

    :param get_backup_mock: mock returning the fake next backup
    :param tmpdir: temporary directory hosting WAL and lock directories
    """
    def fake_wal(name):
        # WalFileInfo with fixed size/time and no compression
        info = WalFileInfo()
        info.name = name
        info.size = 42
        info.time = 43
        info.compression = None
        return info

    # One regular WAL entry and one history entry
    wfile_info = fake_wal('000000010000000000000003')
    history_info = fake_wal('00000001.history')

    # Write both entries into xlog.db
    wals_dir = tmpdir.mkdir("wals")
    xlog_db = wals_dir.join("xlog.db")
    xlogdb_content = \
        wfile_info.to_xlogdb_line() + history_info.to_xlogdb_line()
    xlog_db.write(xlogdb_content)

    # Backup the WAL stream is requested for
    backup = build_test_backup_info(
        begin_wal='000000010000000000000001',
        end_wal='000000010000000000000004')

    # Real server object reading from the directories created above
    lock_dir = tmpdir.mkdir('lock')
    server = build_real_server(
        global_conf={"barman_lock_directory": lock_dir.strpath},
        main_conf={"wals_directory": wals_dir.strpath})

    # Fake next backup returned by the mock
    get_backup_mock.return_value = build_test_backup_info(
        backup_id="1234567899",
        begin_wal='000000010000000000000005',
        end_wal='000000010000000000000009')

    # Collect the names produced by the xlogdb read
    wals = [
        wal_file.name
        for wal_file in server.get_wal_until_next_backup(
            backup, include_history=True)
    ]

    # The .history file must be among them
    assert history_info.name in wals
开发者ID:moench-tegeder,项目名称:barman,代码行数:47,代码来源:test_server.py
示例19: test_recovery_window_report
def test_recovery_window_report(self, caplog):
    """
    Basic unit test of RecoveryWindowRetentionPolicy.report.

    Checks the VALID/OBSOLETE classification of three backup entries,
    the ValueError raised for an invalid context, and the warning logged
    when minimum_redundancy forces obsolete backups to be kept.

    :param caplog: pytest fixture capturing log records
    """
    server = build_mocked_server()
    rp = RetentionPolicyFactory.create(
        server,
        'retention_policy',
        'RECOVERY WINDOW OF 4 WEEKS')
    assert isinstance(rp, RecoveryWindowRetentionPolicy)

    # Build a BackupInfo object with status to DONE
    backup_info = build_test_backup_info(
        server=rp.server,
        backup_id='test1',
        end_time=datetime.now(tzlocal()))
    backup_source = {'test_backup3': backup_info}
    # NOTE(review): the very same BackupInfo object is stored under all
    # three keys, so every entry ends up with the last end_time assigned
    # below. Left untouched on purpose - confirm this is intended before
    # switching to three distinct objects.
    # Add a obsolete backup
    backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=5)
    backup_source['test_backup2'] = backup_info
    # Add a second obsolete backup
    backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=6)
    backup_source['test_backup'] = backup_info
    rp.server.get_available_backups.return_value = backup_source

    # instruct the get_available_backups method to return a map with
    # our mock as result and minimum_redundancy = 1
    rp.server.config.minimum_redundancy = 1
    rp.server.config.name = "test"

    # execute retention policy report
    report = rp.report()
    # check that our mock is valid for the retention policy
    assert report == {'test_backup3': 'VALID',
                      'test_backup2': 'OBSOLETE',
                      'test_backup': 'OBSOLETE'}

    # Expect a ValueError if passed context is invalid
    with pytest.raises(ValueError):
        rp.report(context='invalid')

    # Set a new minimum_redundancy parameter, enforcing the usage of the
    # configuration parameter instead of the retention policy default
    rp.server.config.minimum_redundancy = 4
    # Set the capture level before the call expected to emit the warning
    caplog.set_level(logging.WARNING)
    # execute retention policy report
    rp.report()

    # Check for the warning inside the log. A membership test is used
    # because str.find() returns -1 (truthy) when the text is absent.
    # Only the stable parts of the message are compared: the
    # "(older than ...)" part comes from a _point_of_recoverability()
    # call that is presumably evaluated again at report time, and the
    # record prefix depends on the log format in use.
    log = caplog.text
    assert "Keeping obsolete backup test_backup2 for server test" in log
    assert "due to minimum redundancy requirements (4)" in log
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:56,代码来源:test_retention_policies.py
示例20: test_stop_backup
def test_stop_backup(self, espressostop_mock, stop_mock):
    """
    Basic test for the stop_backup method of the executor.

    Covers an exclusive backup first and then, after switching the
    configuration, a concurrent backup.

    :param espressostop_mock: mimic the response of pg_espresso_stop_backup
    :param stop_mock: mimic the response of pg_stop_backup
    """
    backup_manager = build_backup_manager()
    executor = backup_manager.executor

    # Test 1: stop exclusive backup
    # Value handed back by the mocked pg_stop_backup call
    stop_time = datetime.datetime.now()
    exclusive_response = (
        "266/4A9C1EF8",
        "00000010000002660000004A",
        10231544,
        stop_time,
    )
    stop_mock.return_value = exclusive_response
    backup_info = build_test_backup_info()
    executor.stop_backup(backup_info)
    # The returned values must be stored inside the BackupInfo object
    assert backup_info.end_xlog == '266/4A9C1EF8'
    assert backup_info.end_wal == '00000010000002660000004A'
    assert backup_info.end_offset == 10231544
    assert backup_info.end_time == stop_time

    # Test 2: stop concurrent backup
    # Switch the configuration to concurrent backup
    executor.config.backup_options = [BackupOptions.CONCURRENT_BACKUP]
    # Value handed back by the mocked pgespresso_stop_backup call
    stop_time = datetime.datetime.now()
    concurrent_response = ("000000060000A25700000044", stop_time)
    espressostop_mock.return_value = concurrent_response
    backup_info = build_test_backup_info()
    executor.stop_backup(backup_info)
    assert backup_info.end_xlog == 'A257/45000000'
    assert backup_info.end_wal == '000000060000A25700000044'
    assert backup_info.end_offset == 0
    assert backup_info.end_time == stop_time
开发者ID:terrorobe,项目名称:barman,代码行数:43,代码来源:test_executor.py
注：本文中的 testing_helpers.build_test_backup_info 函数示例由纯净天空整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论