Skip to content

[pbckp-128] dry-run option for catchup #477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ env:
- PG_VERSION=9.5 PG_BRANCH=REL9_5_STABLE
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=OFF MODE=archive
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=backup
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=catchup
- PG_VERSION=14 PG_BRANCH=REL_14_STABLE PTRACK_PATCH_PG_BRANCH=REL_14_STABLE MODE=catchup
- PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=REL_13_STABLE MODE=catchup
- PG_VERSION=12 PG_BRANCH=REL_12_STABLE PTRACK_PATCH_PG_BRANCH=REL_12_STABLE MODE=catchup
- PG_VERSION=11 PG_BRANCH=REL_11_STABLE PTRACK_PATCH_PG_BRANCH=REL_11_STABLE MODE=catchup
- PG_VERSION=10 PG_BRANCH=REL_10_STABLE PTRACK_PATCH_PG_BRANCH=OFF MODE=catchup
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=OFF MODE=compression
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=OFF MODE=delta
# - PG_VERSION=13 PG_BRANCH=REL_13_STABLE PTRACK_PATCH_PG_BRANCH=OFF MODE=locking
Expand Down
84 changes: 52 additions & 32 deletions src/catchup.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* catchup.c: sync DB cluster
*
* Copyright (c) 2021, Postgres Professional
* Copyright (c) 2022, Postgres Professional
*
*-------------------------------------------------------------------------
*/
Expand Down Expand Up @@ -507,16 +507,20 @@ catchup_multithreaded_copy(int num_threads,
/* Run threads */
thread_interrupted = false;
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
for (i = 0; i < num_threads; i++)
if (!dry_run)
{
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
for (i = 0; i < num_threads; i++)
{
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
}
}

/* Wait threads */
for (i = 0; i < num_threads; i++)
{
pthread_join(threads[i], NULL);
if (!dry_run)
pthread_join(threads[i], NULL);
all_threads_successful &= threads_args[i].completed;
transfered_bytes_result += threads_args[i].transfered_bytes;
}
Expand Down Expand Up @@ -706,9 +710,14 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,

/* Start stream replication */
join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
if (!dry_run)
{
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
}
else
elog(INFO, "WAL streaming skipping with --dry-run option");

source_filelist = parray_new();

Expand Down Expand Up @@ -779,9 +788,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,

/* Build the page map from ptrack information */
make_pagemap_from_ptrack_2(source_filelist, source_conn,
source_node_info.ptrack_schema,
source_node_info.ptrack_version_num,
dest_redo.lsn);
source_node_info.ptrack_schema,
source_node_info.ptrack_version_num,
dest_redo.lsn);
time(&end_time);
elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",
difftime(end_time, start_time));
Expand Down Expand Up @@ -820,9 +829,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char dirpath[MAXPGPATH];

join_path_components(dirpath, dest_pgdata, file->rel_path);

elog(VERBOSE, "Create directory '%s'", dirpath);
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
if (!dry_run)
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
}
else
{
Expand Down Expand Up @@ -853,15 +862,18 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
elog(VERBOSE, "Create directory \"%s\" and symbolic link \"%s\"",
linked_path, to_path);

/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));

/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
if (!dry_run)
{
/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));

/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
}
}
}

Expand Down Expand Up @@ -930,7 +942,10 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char fullpath[MAXPGPATH];

join_path_components(fullpath, dest_pgdata, file->rel_path);
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
if (!dry_run)
{
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
}
elog(VERBOSE, "Deleted file \"%s\"", fullpath);

/* shrink dest pgdata list */
Expand Down Expand Up @@ -961,7 +976,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
catchup_isok = transfered_datafiles_bytes != -1;

/* at last copy control file */
if (catchup_isok)
if (catchup_isok && !dry_run)
{
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
Expand All @@ -972,7 +987,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
transfered_datafiles_bytes += source_pg_control_file->size;
}

if (!catchup_isok)
if (!catchup_isok && !dry_run)
{
char pretty_time[20];
char pretty_transfered_data_bytes[20];
Expand Down Expand Up @@ -1010,14 +1025,18 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
pg_free(stop_backup_query_text);
}

wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);
if (!dry_run)
wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);

#if PG_VERSION_NUM >= 90600
/* Write backup_label */
Assert(stop_backup_result.backup_label_content != NULL);
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
if (!dry_run)
{
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
}
free(stop_backup_result.backup_label_content);
stop_backup_result.backup_label_content = NULL;
stop_backup_result.backup_label_content_len = 0;
Expand All @@ -1040,6 +1059,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
#endif

/* wait for end of wal streaming and calculate wal size transfered */
if (!dry_run)
{
parray *wal_files_list = NULL;
wal_files_list = parray_new();
Expand Down Expand Up @@ -1091,17 +1111,17 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
}

/* Sync all copied files unless '--no-sync' flag is used */
if (sync_dest_files)
if (sync_dest_files && !dry_run)
catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
else
elog(WARNING, "Files are not synced to disk");

/* Cleanup */
if (dest_filelist)
if (dest_filelist && !dry_run)
{
parray_walk(dest_filelist, pgFileFree);
parray_free(dest_filelist);
}
parray_free(dest_filelist);
parray_walk(source_filelist, pgFileFree);
parray_free(source_filelist);
pgFileFree(source_pg_control_file);
Expand Down
3 changes: 3 additions & 0 deletions src/help.c
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ help_catchup(void)
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--dry-run]\n"));
printf(_(" [--help]\n\n"));

printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n"));
Expand Down Expand Up @@ -1081,4 +1082,6 @@ help_catchup(void)
printf(_(" --remote-user=username user name for ssh connection (default: current user)\n"));
printf(_(" --ssh-options=ssh_options additional ssh options (default: none)\n"));
printf(_(" (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));

printf(_(" --dry-run perform a trial run without any changes\n\n"));
}
154 changes: 154 additions & 0 deletions tests/catchup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1455,3 +1455,157 @@ def test_config_exclusion(self):
dst_pg.stop()
#self.assertEqual(1, 0, 'Stop test')
self.del_test_dir(module_name, self.fname)

#########################################
# --dry-run
#########################################
def test_dry_run_catchup_full(self):
    """
    Test the --dry-run option for a FULL catchup: the catchup must run
    through its planning phase without writing anything, so the
    destination data directory must be byte-identical before and after.
    """
    # preparation 1: create and start the source node with replication enabled
    src_pg = self.make_simple_node(
        base_dir = os.path.join(module_name, self.fname, 'src'),
        set_replication = True
        )
    src_pg.slow_start()

    # preparation 2: create an empty destination node directory
    # (FULL catchup targets an empty PGDATA; no replica is needed here)
    dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))

    # generate some load on the source so there is real data to (not) copy
    src_pg.pgbench_init(scale = 10)
    pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
    pgbench.wait()

    # save the condition before dry-run
    content_before = self.pgdata_content(dst_pg.data_dir)

    # do full catchup with --dry-run: must not modify the destination
    self.catchup_node(
        backup_mode = 'FULL',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
        )

    # compare data dirs before and after catchup; any difference means
    # --dry-run leaked a write to the destination
    self.compare_pgdata(
        content_before,
        self.pgdata_content(dst_pg.data_dir)
        )

    # Cleanup
    src_pg.stop()
    self.del_test_dir(module_name, self.fname)

def test_dry_run_catchup_ptrack(self):
    """
    Test the --dry-run option for an incremental PTRACK catchup: after a
    real FULL catchup seeds the destination, a PTRACK catchup with
    --dry-run must leave the destination data directory unchanged.
    """
    if not self.ptrack:
        return unittest.skip('Skipped because ptrack support is disabled')

    # preparation 1: create and start the source node with ptrack enabled
    src_pg = self.make_simple_node(
        base_dir = os.path.join(module_name, self.fname, 'src'),
        set_replication = True,
        ptrack_enable = True,
        initdb_params = ['--data-checksums']
        )
    src_pg.slow_start()
    src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")

    # generate some load so the subsequent PTRACK pass has changed pages
    src_pg.pgbench_init(scale = 10)
    pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
    pgbench.wait()

    # preparation 2: make a cleanly shut-down replica lagging behind the
    # source (real FULL catchup, start as replica, then stop it)
    dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
    self.catchup_node(
        backup_mode = 'FULL',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
        )
    self.set_replica(src_pg, dst_pg)
    dst_options = {}
    dst_options['port'] = str(dst_pg.port)
    self.set_auto_conf(dst_pg, dst_options)
    dst_pg.slow_start(replica = True)
    dst_pg.stop()

    # save the condition before dry-run
    content_before = self.pgdata_content(dst_pg.data_dir)

    # do incremental catchup with --dry-run: must not modify the destination
    self.catchup_node(
        backup_mode = 'PTRACK',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
        )

    # compare data dirs before and after catchup
    self.compare_pgdata(
        content_before,
        self.pgdata_content(dst_pg.data_dir)
        )

    # Cleanup
    src_pg.stop()
    self.del_test_dir(module_name, self.fname)

def test_dry_run_catchup_delta(self):
    """
    Test the --dry-run option for an incremental DELTA catchup: after a
    real FULL catchup seeds the destination, a DELTA catchup with
    --dry-run must leave the destination data directory unchanged.
    """

    # preparation 1: create and start the source node
    # (wal_log_hints is required for DELTA mode page comparison)
    src_pg = self.make_simple_node(
        base_dir = os.path.join(module_name, self.fname, 'src'),
        set_replication = True,
        initdb_params = ['--data-checksums'],
        pg_options = { 'wal_log_hints': 'on' }
        )
    src_pg.slow_start()

    # generate some load so the subsequent DELTA pass has changed pages
    src_pg.pgbench_init(scale = 10)
    pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
    pgbench.wait()

    # preparation 2: make a cleanly shut-down replica lagging behind the
    # source (real FULL catchup, start as replica, then stop it)
    dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
    self.catchup_node(
        backup_mode = 'FULL',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
        )
    self.set_replica(src_pg, dst_pg)
    dst_options = {}
    dst_options['port'] = str(dst_pg.port)
    self.set_auto_conf(dst_pg, dst_options)
    dst_pg.slow_start(replica = True)
    dst_pg.stop()

    # save the condition before dry-run
    content_before = self.pgdata_content(dst_pg.data_dir)

    # do delta catchup with --dry-run: must not modify the destination
    self.catchup_node(
        backup_mode = 'DELTA',
        source_pgdata = src_pg.data_dir,
        destination_node = dst_pg,
        options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', "--dry-run"]
        )

    # compare data dirs before and after catchup
    self.compare_pgdata(
        content_before,
        self.pgdata_content(dst_pg.data_dir)
        )

    # Cleanup
    src_pg.stop()
    self.del_test_dir(module_name, self.fname)

1 change: 1 addition & 0 deletions tests/expected/option_help.out
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
[--remote-proto] [--remote-host]
[--remote-port] [--remote-path] [--remote-user]
[--ssh-options]
[--dry-run]
[--help]

Read the website for details. <https://github.com/postgrespro/pg_probackup>
Expand Down
Loading