本文整理汇总了Python中testing_helpers.build_mocked_server函数的典型用法代码示例。如果您正苦于以下问题:Python build_mocked_server函数的具体用法?Python build_mocked_server怎么用?Python build_mocked_server使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了build_mocked_server函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_concurrent_stop_backup
def test_concurrent_stop_backup(self, label_mock, stop_mock,):
"""
Basic test for the start_backup method
:param label_mock: mimic the response of _write_backup_label
:param stop_mock: mimic the response of _pgespresso_stop_backup
"""
# Build a backup info and configure the mocks
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.CONCURRENT_BACKUP
})
backup_manager = build_backup_manager(server=server)
# Mock executor._pgespresso_stop_backup(backup_info) call
stop_time = datetime.datetime.now()
stop_mock.return_value = ("000000060000A25700000044", stop_time)
backup_info = build_test_backup_info()
backup_manager.executor.strategy.stop_backup(backup_info)
assert backup_info.end_xlog == 'A257/45000000'
assert backup_info.end_wal == '000000060000A25700000044'
assert backup_info.end_offset == 0
assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:25,代码来源:test_executor.py
示例2: test_setup
def test_setup(self):
"""
Test the method that set up a recovery
"""
backup_info = testing_helpers.build_test_backup_info()
server = testing_helpers.build_mocked_server()
backup_manager = Mock(server=server, config=server.config)
executor = RecoveryExecutor(backup_manager)
backup_info.version = 90300
# setup should create a temporary directory
# and teardown should delete it
ret = executor.setup(backup_info, None, "/tmp")
assert os.path.exists(ret['tempdir'])
executor.teardown(ret)
assert not os.path.exists(ret['tempdir'])
# no postgresql.auto.conf on version 9.3
ret = executor.setup(backup_info, None, "/tmp")
executor.teardown(ret)
assert "postgresql.auto.conf" not in ret['configuration_files']
# Check the present for postgresql.auto.conf on version 9.4
backup_info.version = 90400
ret = executor.setup(backup_info, None, "/tmp")
executor.teardown(ret)
assert "postgresql.auto.conf" in ret['configuration_files']
# Receive a error if the remote command is invalid
with pytest.raises(SystemExit):
executor.server.path = None
executor.setup(backup_info, "invalid", "/tmp")
开发者ID:gcalacoci,项目名称:barman,代码行数:32,代码来源:test_recovery_executor.py
示例3: test_exclusive_stop_backup
def test_exclusive_stop_backup(self, stop_mock):
"""
Basic test for the start_backup method
:param stop_mock: mimic the response od _pg_stop_backup
"""
# Build a backup info and configure the mocks
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.EXCLUSIVE_BACKUP
})
backup_manager = build_backup_manager(server=server)
# Mock executor._pg_stop_backup(backup_info) call
stop_time = datetime.datetime.now()
stop_mock.return_value = ("266/4A9C1EF8",
"00000010000002660000004A",
10231544,
stop_time)
backup_info = build_test_backup_info()
backup_manager.executor.strategy.stop_backup(backup_info)
# check that the submitted values are stored inside the BackupInfo obj
assert backup_info.end_xlog == '266/4A9C1EF8'
assert backup_info.end_wal == '00000010000002660000004A'
assert backup_info.end_offset == 10231544
assert backup_info.end_time == stop_time
开发者ID:forndb,项目名称:barman,代码行数:27,代码来源:test_executor.py
示例4: test_backup_info_from_backup_id
def test_backup_info_from_backup_id(self, tmpdir):
"""
Test the initialization of a BackupInfo object
using a backup_id as argument
"""
# We want to test the loading system using a backup_id.
# So we create a backup.info file into the tmpdir then
# we instruct the configuration on the position of the
# testing backup.info file
server = build_mocked_server(
main_conf={
'basebackups_directory': tmpdir.strpath
},
)
infofile = tmpdir.mkdir('fake_name').join('backup.info')
infofile.write(BASE_BACKUP_INFO)
# Load the backup.info file using the backup_id
b_info = BackupInfo(server, backup_id="fake_name")
assert b_info
assert b_info.begin_offset == 40
assert b_info.begin_wal == '000000010000000000000004'
assert b_info.timeline == 1
assert isinstance(b_info.tablespaces, list)
assert b_info.tablespaces[0].name == 'fake_tbs'
assert b_info.tablespaces[0].oid == 16384
assert b_info.tablespaces[0].location == '/fake_tmp/tbs'
开发者ID:zacchiro,项目名称:barman,代码行数:26,代码来源:test_infofile.py
示例5: test_pgespresso_stop_backup
def test_pgespresso_stop_backup(self, tbs_map_mock, label_mock):
"""
Basic test for the pgespresso_stop_backup method
"""
# Build a backup info and configure the mocks
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.CONCURRENT_BACKUP
})
backup_manager = build_backup_manager(server=server)
# Mock executor._pgespresso_stop_backup(backup_info) call
stop_time = datetime.datetime.now()
server.postgres.server_version = 90500
server.postgres.pgespresso_stop_backup.return_value = {
'end_wal': "000000060000A25700000044",
'timestamp': stop_time
}
backup_info = build_test_backup_info(timeline=6)
backup_manager.executor.strategy.stop_backup(backup_info)
assert backup_info.end_xlog == 'A257/44FFFFFF'
assert backup_info.end_wal == '000000060000A25700000044'
assert backup_info.end_offset == 0xFFFFFF
assert backup_info.end_time == stop_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:26,代码来源:test_executor.py
示例6: test_pgespresso_start_backup
def test_pgespresso_start_backup(self):
"""
Simple test for _pgespresso_start_backup method
of the RsyncBackupExecutor class
"""
# Build and configure a server using a mock
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.CONCURRENT_BACKUP
})
backup_manager = build_backup_manager(server=server)
backup_label = 'test label'
# Expect an exception because pgespresso is not installed
backup_manager.server.pgespresso_installed.return_value = False
with pytest.raises(Exception):
backup_manager.executor.pgespresso_start_backup(backup_label)
# Report pgespresso installed. Expect no error and the correct call
# sequence
backup_manager.server.reset_mock()
backup_manager.executor.server.pgespresso_installed.return_value = True
backup_manager.executor.strategy._pgespresso_start_backup(backup_label)
pg_connect = mock.call.pg_connect()
with_pg_connect = pg_connect.__enter__()
cursor = with_pg_connect.cursor()
assert backup_manager.server.mock_calls == [
pg_connect,
with_pg_connect,
pg_connect.__enter__().rollback(),
cursor,
cursor.execute(ANY, ANY),
cursor.fetchone(),
pg_connect.__exit__(None, None, None)]
开发者ID:forndb,项目名称:barman,代码行数:35,代码来源:test_executor.py
示例7: test_rsync_backup_executor_init
def test_rsync_backup_executor_init(self):
"""
Test the construction of a RecoveryExecutor
"""
# Test
server = testing_helpers.build_mocked_server()
backup_manager = Mock(server=server, config=server.config)
assert RecoveryExecutor(backup_manager)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:9,代码来源:test_recovery_executor.py
示例8: test_postgres_backup_executor_init
def test_postgres_backup_executor_init(self):
"""
Test the construction of a PostgresBackupExecutor
"""
server = build_mocked_server(global_conf={'backup_method': 'postgres'})
executor = PostgresBackupExecutor(server.backup_manager)
assert executor
assert executor.strategy
# Expect an error if the tablespace_bandwidth_limit option
# is set for this server.
server = build_mocked_server(
global_conf={'backup_method': 'postgres',
'tablespace_bandwidth_limit': 1})
executor = PostgresBackupExecutor(server.backup_manager)
assert executor
assert executor.strategy
assert server.config.disabled
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:18,代码来源:test_executor.py
示例9: test_exclusive_start_backup
def test_exclusive_start_backup(self):
"""
Basic test for the start_backup method
:param start_mock: mock for the _pgespresso_start_backup
:param start_mock: mock for the pg_start_backup
"""
# Build a backup_manager using a mocked server
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.EXCLUSIVE_BACKUP
})
backup_manager = build_backup_manager(server=server)
# Mock server.get_pg_setting('data_directory') call
backup_manager.server.postgres.get_setting.return_value = '/pg/data'
# Mock server.get_pg_configuration_files() call
server.postgres.get_configuration_files.return_value = dict(
config_file="/etc/postgresql.conf",
hba_file="/pg/pg_hba.conf",
ident_file="/pg/pg_ident.conf",
)
# Mock server.get_pg_tablespaces() call
tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
server.postgres.get_tablespaces.return_value = tablespaces
# Test 1: start exclusive backup
# Mock executor.pg_start_backup(label) call
start_time = datetime.datetime.now()
server.postgres.start_exclusive_backup.return_value = (
"A257/44B4C0D8",
"000000060000A25700000044",
11845848,
start_time)
# Build a test empty backup info
backup_info = BackupInfo(server=backup_manager.server,
backup_id='fake_id')
backup_manager.executor.strategy.start_backup(backup_info)
# Check that all the values are correctly saved inside the BackupInfo
assert backup_info.pgdata == '/pg/data'
assert backup_info.config_file == "/etc/postgresql.conf"
assert backup_info.hba_file == "/pg/pg_hba.conf"
assert backup_info.ident_file == "/pg/pg_ident.conf"
assert backup_info.tablespaces == tablespaces
assert backup_info.status == 'STARTED'
assert backup_info.timeline == 6
assert backup_info.begin_xlog == 'A257/44B4C0D8'
assert backup_info.begin_wal == '000000060000A25700000044'
assert backup_info.begin_offset == 11845848
assert backup_info.begin_time == start_time
# Check that the correct call to pg_start_backup has been made
server.postgres.start_exclusive_backup.assert_called_with(
'Barman backup main fake_id')
开发者ID:zacchiro,项目名称:barman,代码行数:56,代码来源:test_executor.py
示例10: test_recovery_window_report
def test_recovery_window_report(self, caplog):
"""
Basic unit test of RecoveryWindowRetentionPolicy
Given a mock simulating a Backup with status DONE and
the end_date not over the point of recoverability,
the report method of the RecoveryWindowRetentionPolicy class must mark
it as valid
"""
server = build_mocked_server()
rp = RetentionPolicyFactory.create(
server,
'retention_policy',
'RECOVERY WINDOW OF 4 WEEKS')
assert isinstance(rp, RecoveryWindowRetentionPolicy)
# Build a BackupInfo object with status to DONE
backup_info = build_test_backup_info(
server=rp.server,
backup_id='test1',
end_time=datetime.now(tzlocal()))
backup_source = {'test_backup3': backup_info}
# Add a obsolete backup
backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=5)
backup_source['test_backup2'] = backup_info
# Add a second obsolete backup
backup_info.end_time = datetime.now(tzlocal()) - timedelta(weeks=6)
backup_source['test_backup'] = backup_info
rp.server.get_available_backups.return_value = backup_source
# instruct the get_available_backups method to return a map with
# our mock as result and minimum_redundancy = 1
rp.server.config.minimum_redundancy = 1
rp.server.config.name = "test"
# execute retention policy report
report = rp.report()
# check that our mock is valid for the retention policy
assert report == {'test_backup3': 'VALID',
'test_backup2': 'OBSOLETE',
'test_backup': 'OBSOLETE'}
# Expect a ValueError if passed context is invalid
with pytest.raises(ValueError):
rp.report(context='invalid')
# Set a new minimum_redundancy parameter, enforcing the usage of the
# configuration parameter instead of the retention policy default
rp.server.config.minimum_redundancy = 4
# execute retention policy report
rp.report()
# Check for the warning inside the log
caplog.set_level(logging.WARNING)
log = caplog.text
warn = "WARNING Keeping obsolete backup test_backup2 for " \
"server test (older than %s) due to minimum redundancy " \
"requirements (4)\n" % rp._point_of_recoverability()
assert log.find(warn)
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:56,代码来源:test_retention_policies.py
示例11: test_to_json
def test_to_json(self, tmpdir):
server = build_mocked_server(main_conf={"basebackups_directory": tmpdir.strpath})
# Build a fake backup
backup_dir = tmpdir.mkdir("fake_backup_id")
info_file = backup_dir.join("backup.info")
info_file.write(BASE_BACKUP_INFO)
b_info = BackupInfo(server, backup_id="fake_backup_id")
# This call should not raise
assert json.dumps(b_info.to_json())
开发者ID:moench-tegeder,项目名称:barman,代码行数:11,代码来源:test_infofile.py
示例12: test_pgespresso_start_backup
def test_pgespresso_start_backup(self):
"""
Test concurrent backup using pgespresso
"""
# Test: start concurrent backup
# Build a backup_manager using a mocked server
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.CONCURRENT_BACKUP
})
backup_manager = build_backup_manager(server=server)
# Mock server.get_pg_setting('data_directory') call
backup_manager.server.postgres.get_setting.return_value = '/pg/data'
# Mock server.get_pg_configuration_files() call
server.postgres.get_configuration_files.return_value = dict(
config_file="/etc/postgresql.conf",
hba_file="/pg/pg_hba.conf",
ident_file="/pg/pg_ident.conf",
)
# Mock server.get_pg_tablespaces() call
tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
server.postgres.get_tablespaces.return_value = tablespaces
server.postgres.server_version = 90500
# Mock executor._pgespresso_start_backup(label) call
start_time = datetime.datetime.now(tz.tzlocal()).replace(microsecond=0)
server.postgres.pgespresso_start_backup.return_value = {
'backup_label':
"START WAL LOCATION: 266/4A9C1EF8 "
"(file 00000010000002660000004A)\n"
"START TIME: %s" % start_time.strftime('%Y-%m-%d %H:%M:%S %Z'),
}
# Build a test empty backup info
backup_info = BackupInfo(server=backup_manager.server,
backup_id='fake_id2')
backup_manager.executor.strategy.start_backup(backup_info)
# Check that all the values are correctly saved inside the BackupInfo
assert backup_info.pgdata == '/pg/data'
assert backup_info.config_file == "/etc/postgresql.conf"
assert backup_info.hba_file == "/pg/pg_hba.conf"
assert backup_info.ident_file == "/pg/pg_ident.conf"
assert backup_info.tablespaces == tablespaces
assert backup_info.status == 'STARTED'
assert backup_info.timeline == 16
assert backup_info.begin_xlog == '266/4A9C1EF8'
assert backup_info.begin_wal == '00000010000002660000004A'
assert backup_info.begin_offset == 10231544
assert backup_info.begin_time == start_time
# Check that the correct call to pg_start_backup has been made
server.postgres.pgespresso_start_backup.assert_called_with(
'Barman backup main fake_id2')
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:53,代码来源:test_executor.py
示例13: test_from_json
def test_from_json(self, tmpdir):
server = build_mocked_server(main_conf={"basebackups_directory": tmpdir.strpath})
# Build a fake backup
backup_dir = tmpdir.mkdir("fake_backup_id")
info_file = backup_dir.join("backup.info")
info_file.write(BASE_BACKUP_INFO)
b_info = BackupInfo(server, backup_id="fake_backup_id")
# Build another BackupInfo from the json dump
new_binfo = BackupInfo.from_json(server, b_info.to_json())
assert b_info.to_dict() == new_binfo.to_dict()
开发者ID:moench-tegeder,项目名称:barman,代码行数:13,代码来源:test_infofile.py
示例14: test_concurrent_start_backup
def test_concurrent_start_backup(self):
"""
Test concurrent backup using 9.6 api
"""
# Test: start concurrent backup
# Build a backup_manager using a mocked server
server = build_mocked_server(main_conf={
'backup_options':
BackupOptions.CONCURRENT_BACKUP
})
backup_manager = build_backup_manager(server=server)
# Mock server.get_pg_setting('data_directory') call
backup_manager.server.postgres.get_setting.return_value = '/pg/data'
# Mock server.get_pg_configuration_files() call
server.postgres.get_configuration_files.return_value = dict(
config_file="/etc/postgresql.conf",
hba_file="/pg/pg_hba.conf",
ident_file="/pg/pg_ident.conf",
)
# Mock server.get_pg_tablespaces() call
tablespaces = [Tablespace._make(('test_tbs', 1234, '/tbs/test'))]
server.postgres.get_tablespaces.return_value = tablespaces
# this is a postgres 9.6
server.postgres.server_version = 90600
# Mock call to new api method
start_time = datetime.datetime.now()
server.postgres.start_concurrent_backup.return_value = {
'location': "A257/44B4C0D8",
'timeline': 6,
'timestamp': start_time,
}
# Build a test empty backup info
backup_info = BackupInfo(server=backup_manager.server,
backup_id='fake_id2')
backup_manager.executor.strategy.start_backup(backup_info)
# Check that all the values are correctly saved inside the BackupInfo
assert backup_info.pgdata == '/pg/data'
assert backup_info.config_file == "/etc/postgresql.conf"
assert backup_info.hba_file == "/pg/pg_hba.conf"
assert backup_info.ident_file == "/pg/pg_ident.conf"
assert backup_info.tablespaces == tablespaces
assert backup_info.status == 'STARTED'
assert backup_info.timeline == 6
assert backup_info.begin_xlog == 'A257/44B4C0D8'
assert backup_info.begin_wal == '000000060000A25700000044'
assert backup_info.begin_offset == 11845848
assert backup_info.begin_time == start_time
开发者ID:2ndquadrant-it,项目名称:barman,代码行数:50,代码来源:test_executor.py
示例15: test_rsync_backup_executor_init
def test_rsync_backup_executor_init(self):
"""
Test the construction of a RsyncBackupExecutor
"""
# Test
server = build_mocked_server()
backup_manager = Mock(server=server, config=server.config)
assert RsyncBackupExecutor(backup_manager)
# Test exception for the missing ssh_command
with pytest.raises(SshCommandException):
server.config.ssh_command = None
RsyncBackupExecutor(server)
开发者ID:forndb,项目名称:barman,代码行数:14,代码来源:test_executor.py
示例16: test_redundancy_report
def test_redundancy_report(self, caplog):
"""
Test of the management of the minimum_redundancy parameter
into the backup_report method of the RedundancyRetentionPolicy class
"""
server = build_mocked_server()
rp = RetentionPolicyFactory.create(
server,
'retention_policy',
'REDUNDANCY 2')
assert isinstance(rp, RedundancyRetentionPolicy)
# Build a BackupInfo object with status to DONE
backup_info = build_test_backup_info(
server=rp.server,
backup_id='test1',
end_time=datetime.now(tzlocal()))
# instruct the get_available_backups method to return a map with
# our mock as result and minimum_redundancy = 1
rp.server.get_available_backups.return_value = {
"test_backup": backup_info,
"test_backup2": backup_info,
"test_backup3": backup_info,
}
rp.server.config.minimum_redundancy = 1
# execute retention policy report
report = rp.report()
# check that our mock is valid for the retention policy because
# the total number of valid backups is lower than the retention policy
# redundancy.
assert report == {'test_backup': BackupInfo.OBSOLETE,
'test_backup2': BackupInfo.VALID,
'test_backup3': BackupInfo.VALID}
# Expect a ValueError if passed context is invalid
with pytest.raises(ValueError):
rp.report(context='invalid')
# Set a new minimum_redundancy parameter, enforcing the usage of the
# configuration parameter instead of the retention policy default
rp.server.config.minimum_redundancy = 3
# execute retention policy report
rp.report()
# Check for the warning inside the log
caplog.set_level(logging.WARNING)
log = caplog.text
assert log.find("WARNING Retention policy redundancy (2) "
"is lower than the required minimum redundancy (3). "
"Enforce 3.")
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:50,代码来源:test_retention_policies.py
示例17: test_data_dir
def test_data_dir(self, tmpdir):
"""
Simple test for the method that is responsible of the build of the
path to the datadir and to the tablespaces dir according
with backup_version
"""
server = build_mocked_server(
main_conf={
'basebackups_directory': tmpdir.strpath
},
)
# Build a fake v2 backup
backup_dir = tmpdir.mkdir('fake_backup_id')
data_dir = backup_dir.mkdir('data')
info_file = backup_dir.join('backup.info')
info_file.write(BASE_BACKUP_INFO)
b_info = BackupInfo(server, backup_id="fake_backup_id")
# Check that the paths are built according with version
assert b_info.backup_version == 2
assert b_info.get_data_directory() == data_dir.strpath
assert b_info.get_data_directory(16384) == (backup_dir.strpath +
'/16384')
# Build a fake v1 backup
backup_dir = tmpdir.mkdir('another_fake_backup_id')
pgdata_dir = backup_dir.mkdir('pgdata')
info_file = backup_dir.join('backup.info')
info_file.write(BASE_BACKUP_INFO)
b_info = BackupInfo(server, backup_id="another_fake_backup_id")
# Check that the paths are built according with version
assert b_info.backup_version == 1
assert b_info.get_data_directory(16384) == \
backup_dir.strpath + '/pgdata/pg_tblspc/16384'
assert b_info.get_data_directory() == pgdata_dir.strpath
# Check that an exception is raised if an invalid oid
# is provided to the method
with pytest.raises(ValueError):
b_info.get_data_directory(12345)
# Check that a ValueError exception is raised with an
# invalid oid when the tablespaces list is None
b_info.tablespaces = None
# and expect a value error
with pytest.raises(ValueError):
b_info.get_data_directory(16384)
开发者ID:ViktorStiskala,项目名称:pgbarman,代码行数:49,代码来源:test_infofile.py
示例18: test_first_backup
def test_first_backup(self):
server = build_mocked_server()
rp = RetentionPolicyFactory.create(
server,
'retention_policy',
'RECOVERY WINDOW OF 4 WEEKS')
assert isinstance(rp, RecoveryWindowRetentionPolicy)
# Build a BackupInfo object with status to DONE
backup_info = build_test_backup_info(
server=rp.server,
backup_id='test1',
end_time=datetime.now(tzlocal()))
# instruct the get_available_backups method to return a map with
# our mock as result and minimum_redundancy = 1
rp.server.get_available_backups.return_value = {
"test_backup": backup_info
}
rp.server.config.minimum_redundancy = 1
# execute retention policy report
report = rp.first_backup()
assert report == 'test_backup'
rp = RetentionPolicyFactory.create(
server,
'retention_policy',
'REDUNDANCY 2')
assert isinstance(rp, RedundancyRetentionPolicy)
# Build a BackupInfo object with status to DONE
backup_info = build_test_backup_info(
server=rp.server,
backup_id='test1',
end_time=datetime.now(tzlocal()))
# instruct the get_available_backups method to return a map with
# our mock as result and minimum_redundancy = 1
rp.server.get_available_backups.return_value = {
"test_backup": backup_info
}
rp.server.config.minimum_redundancy = 1
# execute retention policy report
report = rp.first_backup()
assert report == 'test_backup'
开发者ID:blueninj0r,项目名称:pgbarman,代码行数:47,代码来源:test_retention_policies.py
示例19: test_analyse_temporary_config_files
def test_analyse_temporary_config_files(self, tmpdir):
"""
Test the method that identifies dangerous options into the configuration
files
"""
# Build directory/files structure for testing
tempdir = tmpdir.mkdir('tempdir')
recovery_info = {
'configuration_files': ['postgresql.conf', 'postgresql.auto.conf'],
'tempdir': tempdir.strpath,
'temporary_configuration_files': [],
'results': {'changes': [], 'warnings': []}
}
postgresql_conf = tempdir.join('postgresql.conf')
postgresql_auto = tempdir.join('postgresql.auto.conf')
postgresql_conf.write('archive_command = something\n'
'data_directory = something')
postgresql_auto.write('archive_command = something\n'
'data_directory = something')
local_rec = tmpdir.mkdir('local_rec')
recovery_info['temporary_configuration_files'].append(
postgresql_conf.strpath)
recovery_info['temporary_configuration_files'].append(
postgresql_auto.strpath)
# Build a RecoveryExecutor object (using a mock as server and backup
# manager.
server = testing_helpers.build_mocked_server()
backup_manager = Mock(server=server, config=server.config)
executor = RecoveryExecutor(backup_manager)
# Identify dangerous options into config files for remote recovery
executor.analyse_temporary_config_files(recovery_info)
assert len(recovery_info['results']['changes']) == 2
assert len(recovery_info['results']['warnings']) == 2
# Prepare for a local recovery test
recovery_info['results']['changes'] = []
recovery_info['results']['warnings'] = []
postgresql_conf_local = local_rec.join('postgresql.conf')
postgresql_auto_local = local_rec.join('postgresql.auto.conf')
postgresql_conf_local.write('archive_command = something\n'
'data_directory = something')
postgresql_auto_local.write('archive_command = something\n'
'data_directory = something')
# Identify dangerous options for local recovery
executor.analyse_temporary_config_files(recovery_info)
assert len(recovery_info['results']['changes']) == 2
assert len(recovery_info['results']['warnings']) == 2
开发者ID:stig,项目名称:barman,代码行数:46,代码来源:test_recovery_executor.py
示例20: test_backup_info_save
def test_backup_info_save(self, tmpdir):
"""
Test the save method of a BackupInfo object
"""
# Check the saving method.
# Load a backup.info file, modify the BackupInfo object
# then save it.
server = build_mocked_server(main_conf={"basebackups_directory": tmpdir.strpath})
backup_dir = tmpdir.mkdir("fake_name")
infofile = backup_dir.join("backup.info")
b_info = BackupInfo(server, backup_id="fake_name")
b_info.status = BackupInfo.FAILED
b_info.save()
# read the file looking for the modified line
for line in infofile.readlines():
if line.startswith("status"):
assert line.strip() == "status=FAILED"
开发者ID:moench-tegeder,项目名称:barman,代码行数:17,代码来源:test_infofile.py
注:本文中的testing_helpers.build_mocked_server函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论