diff --git a/Makefile b/Makefile index e449c9e5..4272e348 100644 --- a/Makefile +++ b/Makefile @@ -50,8 +50,21 @@ include $(top_srcdir)/contrib/contrib-global.mk endif PG_CPPFLAGS = -I$(libpq_srcdir) ${PTHREAD_CFLAGS} -Isrc -I$(srchome)/$(subdir)/src + +# LZ4 compression library detection +LZ4_LIBS = +ifneq ($(shell pkg-config --exists liblz4 2>/dev/null && echo yes),) +PG_CPPFLAGS += -DHAVE_LIBLZ4 $(shell pkg-config --cflags liblz4) +LZ4_LIBS = $(shell pkg-config --libs liblz4) +else +ifneq ($(wildcard /usr/include/lz4.h),) +PG_CPPFLAGS += -DHAVE_LIBLZ4 +LZ4_LIBS = -llz4 +endif +endif + override CPPFLAGS := -DFRONTEND $(CPPFLAGS) $(PG_CPPFLAGS) -PG_LIBS_INTERNAL = $(libpq_pgport) ${PTHREAD_CFLAGS} +PG_LIBS_INTERNAL = $(libpq_pgport) ${PTHREAD_CFLAGS} $(LZ4_LIBS) src/utils/configuration.o: src/datapagemap.h src/archive.o: src/instr_time.h diff --git a/doc/pgprobackup.xml b/doc/pgprobackup.xml index 3497dab3..2dd48b46 100644 --- a/doc/pgprobackup.xml +++ b/doc/pgprobackup.xml @@ -3014,7 +3014,7 @@ content-crc = 802820606 compress-alg — compression algorithm used during backup. Possible values: - zlib, pglz, none. + zlib, pglz, lz4, none. @@ -5885,11 +5885,15 @@ pg_probackup catchup -b catchup_mode Defines the algorithm to use for compressing data files. Possible values are zlib, - pglz, and none. If set - to zlib or pglz, this option enables compression. By default, + pglz, lz4, and none. If set + to zlib, pglz, or lz4, this option enables compression. By default, compression is disabled. For the command, the - pglz compression algorithm is not supported. + pglz and lz4 compression algorithms are not supported. + + + The lz4 algorithm requires the liblz4 library + to be installed at build time. Default: none @@ -5901,9 +5905,11 @@ pg_probackup catchup -b catchup_mode - Defines compression level (0 through 9, 0 being no compression - and 9 being best compression). This option can be used - together with the option. + Defines compression level. For zlib and pglz, + valid values are 0 through 9 (0 being no compression and 9 being best compression). + For lz4, valid values are 0 through 12: level 0 uses fast LZ4 compression, + levels 1-12 use LZ4_HC (high compression) mode with increasing compression ratio. + This option can be used together with the option. Default: 1 diff --git a/src/catalog.c b/src/catalog.c index 409d9141..9eff7f00 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -2882,6 +2882,8 @@ parse_compress_alg(const char *arg) return ZLIB_COMPRESS; else if (pg_strncasecmp("pglz", arg, len) == 0) return PGLZ_COMPRESS; + else if (pg_strncasecmp("lz4", arg, len) == 0) + return LZ4_COMPRESS; else if (pg_strncasecmp("none", arg, len) == 0) return NONE_COMPRESS; else @@ -2902,6 +2904,8 @@ deparse_compress_alg(int alg) return "zlib"; case PGLZ_COMPRESS: return "pglz"; + case LZ4_COMPRESS: + return "lz4"; } return NULL; diff --git a/src/data.c b/src/data.c index ef3325c1..78f37f53 100644 --- a/src/data.c +++ b/src/data.c @@ -22,6 +22,11 @@ #include #endif +#ifdef HAVE_LIBLZ4 +#include +#include +#endif + #include "utils/thread.h" /* Union to ease operations on relation pages */ @@ -58,6 +63,45 @@ zlib_decompress(void *dst, size_t dst_size, void const *src, size_t src_size) } #endif +#ifdef HAVE_LIBLZ4 +/* + * Implementation of LZ4 compression method. + * Uses LZ4_HC (high compression) mode when level is specified, + * otherwise uses fast LZ4 compression. + */ +static int32 +lz4_compress(void *dst, size_t dst_size, void const *src, size_t src_size, + int level) +{ + int compressed_size; + + /* + * Level 1-12: use LZ4_HC for better compression ratio. + * Level 0 or unspecified: use fast LZ4_compress_default. + */ + if (level >= LZ4_COMPRESS_LEVEL_MIN && level <= LZ4_COMPRESS_LEVEL_MAX) + compressed_size = LZ4_compress_HC((const char *) src, (char *) dst, + (int) src_size, (int) dst_size, level); + else + compressed_size = LZ4_compress_default((const char *) src, (char *) dst, + (int) src_size, (int) dst_size); + + return compressed_size > 0 ? compressed_size : -1; +} + +/* Implementation of LZ4 decompression method */ +static int32 +lz4_decompress(void *dst, size_t dst_size, void const *src, size_t src_size) +{ + int decompressed_size; + + decompressed_size = LZ4_decompress_safe((const char *) src, (char *) dst, + (int) src_size, (int) dst_size); + + return decompressed_size > 0 ? decompressed_size : -1; +} +#endif + /* * Compresses source into dest using algorithm. Returns the number of bytes * written in the destination buffer, or -1 if compression fails. @@ -83,6 +127,16 @@ do_compress(void *dst, size_t dst_size, void const *src, size_t src_size, #endif case PGLZ_COMPRESS: return pglz_compress(src, src_size, dst, PGLZ_strategy_always); +#ifdef HAVE_LIBLZ4 + case LZ4_COMPRESS: + { + int32 ret; + ret = lz4_compress(dst, dst_size, src, src_size, level); + if (ret < 0 && errormsg) + *errormsg = "LZ4 compression failed"; + return ret; + } +#endif } return -1; @@ -119,6 +173,16 @@ do_decompress(void *dst, size_t dst_size, void const *src, size_t src_size, return pglz_decompress(src, src_size, dst, dst_size, true); #else return pglz_decompress(src, src_size, dst, dst_size); +#endif +#ifdef HAVE_LIBLZ4 + case LZ4_COMPRESS: + { + int32 ret; + ret = lz4_decompress(dst, dst_size, src, src_size); + if (ret < 0 && errormsg) + *errormsg = "LZ4 decompression failed"; + return ret; + } #endif } diff --git a/src/help.c b/src/help.c index eacef9a4..d74e04f1 100644 --- a/src/help.c +++ b/src/help.c @@ -405,7 +405,7 @@ help_backup(void) printf(_("\n Compression options:\n")); printf(_(" --compress alias for --compress-algorithm='zlib' and --compress-level=1\n")); printf(_(" --compress-algorithm=compress-algorithm\n")); - printf(_(" available options: 'zlib', 'pglz', 'none' (default: none)\n")); + printf(_(" available options: 'zlib', 'pglz', 'lz4', 'none' (default: none)\n")); printf(_(" --compress-level=compress-level\n")); printf(_(" level of compression [0-9] (default: 1)\n")); @@ -909,7 +909,7 @@ help_set_config(void) printf(_("\n Compression options:\n")); printf(_(" --compress alias for --compress-algorithm='zlib' and --compress-level=1\n")); printf(_(" --compress-algorithm=compress-algorithm\n")); - printf(_(" available options: 'zlib','pglz','none' (default: 'none')\n")); + printf(_(" available options: 'zlib','pglz','lz4','none' (default: 'none')\n")); printf(_(" --compress-level=compress-level\n")); printf(_(" level of compression [0-9] (default: 1)\n")); @@ -1052,7 +1052,7 @@ help_archive_push(void) printf(_("\n Compression options:\n")); printf(_(" --compress alias for --compress-algorithm='zlib' and --compress-level=1\n")); printf(_(" --compress-algorithm=compress-algorithm\n")); - printf(_(" available options: 'zlib','pglz','none' (default: 'none')\n")); + printf(_(" available options: 'zlib','pglz','lz4','none' (default: 'none')\n")); printf(_(" --compress-level=compress-level\n")); printf(_(" level of compression [0-9] (default: 1)\n")); diff --git a/src/pg_probackup.c b/src/pg_probackup.c index 030d64b0..20aa631a 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -1154,7 +1154,15 @@ compress_init(ProbackupSubcmd const subcmd) "compress-algorithm option"); } - if (instance_config.compress_level < 0 || instance_config.compress_level > 9) + /* Validate compression level based on algorithm */ + if (instance_config.compress_alg == LZ4_COMPRESS) + { + if (instance_config.compress_level < 0 || + instance_config.compress_level > LZ4_COMPRESS_LEVEL_MAX) + elog(ERROR, "--compress-level for LZ4 must be in range 0-%d", + LZ4_COMPRESS_LEVEL_MAX); + } + else if (instance_config.compress_level < 0 || instance_config.compress_level > 9) elog(ERROR, "--compress-level value must be in the range from 0 to 9"); if (instance_config.compress_alg == ZLIB_COMPRESS && instance_config.compress_level == 0) @@ -1166,6 +1174,11 @@ compress_init(ProbackupSubcmd const subcmd) if (instance_config.compress_alg == ZLIB_COMPRESS) elog(ERROR, "This build does not support zlib compression"); else +#endif +#ifndef HAVE_LIBLZ4 + if (instance_config.compress_alg == LZ4_COMPRESS) + elog(ERROR, "This build does not support LZ4 compression"); + else #endif if (instance_config.compress_alg == PGLZ_COMPRESS && num_threads > 1) elog(ERROR, "Multithread backup does not support pglz compression"); diff --git a/src/pg_probackup.h b/src/pg_probackup.h index e5d03495..2338052a 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -217,8 +217,13 @@ typedef enum CompressAlg NONE_COMPRESS, PGLZ_COMPRESS, ZLIB_COMPRESS, + LZ4_COMPRESS, } CompressAlg; +/* LZ4 compression level limits (LZ4_HC mode) */ +#define LZ4_COMPRESS_LEVEL_MIN 1 +#define LZ4_COMPRESS_LEVEL_MAX 12 + typedef enum ForkName { none, diff --git a/tests/compression_test.py b/tests/compression_test.py index 448ea47c..1f956fd6 100644 --- a/tests/compression_test.py +++ b/tests/compression_test.py @@ -34,7 +34,11 @@ def test_basic_compression_stream_zlib(self): ) full_result = node.table_checksum("t_heap") full_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="full", options=["--stream", "--compress-algorithm=zlib"] + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=zlib"], ) # PAGE BACKUP @@ -46,7 +50,11 @@ def test_basic_compression_stream_zlib(self): ) page_result = node.table_checksum("t_heap") page_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="page", options=["--stream", "--compress-algorithm=zlib"] + backup_dir, + "node", + node, + backup_type="page", + options=["--stream", "--compress-algorithm=zlib"], ) # DELTA BACKUP @@ -58,7 +66,11 @@ def test_basic_compression_stream_zlib(self): ) delta_result = node.table_checksum("t_heap") delta_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="delta", options=["--stream", "--compress-algorithm=zlib"] + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=zlib"], ) # Drop Node @@ -143,7 +155,11 @@ def test_compression_archive_zlib(self): ) full_result = node.table_checksum("t_heap") full_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="full", options=["--compress-algorithm=zlib"] + backup_dir, + "node", + node, + backup_type="full", + options=["--compress-algorithm=zlib"], ) # PAGE BACKUP @@ -155,7 +171,11 @@ def test_compression_archive_zlib(self): ) page_result = node.table_checksum("t_heap") page_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="page", options=["--compress-algorithm=zlib"] + backup_dir, + "node", + node, + backup_type="page", + options=["--compress-algorithm=zlib"], ) # DELTA BACKUP @@ -166,7 +186,11 @@ def test_compression_archive_zlib(self): ) delta_result = node.table_checksum("t_heap") delta_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="delta", options=["--compress-algorithm=zlib"] + backup_dir, + "node", + node, + backup_type="delta", + options=["--compress-algorithm=zlib"], ) # Drop Node @@ -253,7 +277,11 @@ def test_compression_stream_pglz(self): ) full_result = node.table_checksum("t_heap") full_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="full", options=["--stream", "--compress-algorithm=pglz"] + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=pglz"], ) # PAGE BACKUP @@ -265,7 +293,11 @@ def test_compression_stream_pglz(self): ) page_result = node.table_checksum("t_heap") page_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="page", options=["--stream", "--compress-algorithm=pglz"] + backup_dir, + "node", + node, + backup_type="page", + options=["--stream", "--compress-algorithm=pglz"], ) # DELTA BACKUP @@ -277,7 +309,11 @@ def test_compression_stream_pglz(self): ) delta_result = node.table_checksum("t_heap") delta_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="delta", options=["--stream", "--compress-algorithm=pglz"] + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=pglz"], ) # Drop Node @@ -364,7 +400,11 @@ def test_compression_archive_pglz(self): ) full_result = node.table_checksum("t_heap") full_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="full", options=["--compress-algorithm=pglz"] + backup_dir, + "node", + node, + backup_type="full", + options=["--compress-algorithm=pglz"], ) # PAGE BACKUP @@ -376,7 +416,11 @@ def test_compression_archive_pglz(self): ) page_result = node.table_checksum("t_heap") page_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="page", options=["--compress-algorithm=pglz"] + backup_dir, + "node", + node, + backup_type="page", + options=["--compress-algorithm=pglz"], ) # DELTA BACKUP @@ -388,7 +432,11 @@ def test_compression_archive_pglz(self): ) delta_result = node.table_checksum("t_heap") delta_backup_id = self.backup_node( - backup_dir, "node", node, backup_type="delta", options=["--compress-algorithm=pglz"] + backup_dir, + "node", + node, + backup_type="delta", + options=["--compress-algorithm=pglz"], ) # Drop Node @@ -467,7 +515,13 @@ def test_compression_wrong_algorithm(self): node.slow_start() try: - self.backup_node(backup_dir, "node", node, backup_type="full", options=["--compress-algorithm=bla-blah"]) + self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--compress-algorithm=bla-blah"], + ) # we should die here because exception is what we expect to happen self.assertEqual( 1, @@ -503,12 +557,21 @@ def test_incompressible_pages(self): node.slow_start() # Full - self.backup_node(backup_dir, "node", node, options=["--compress-algorithm=zlib", "--compress-level=0"]) + self.backup_node( + backup_dir, + "node", + node, + options=["--compress-algorithm=zlib", "--compress-level=0"], + ) node.pgbench_init(scale=3) self.backup_node( - backup_dir, "node", node, backup_type="delta", options=["--compress-algorithm=zlib", "--compress-level=0"] + backup_dir, + "node", + node, + backup_type="delta", + options=["--compress-algorithm=zlib", "--compress-level=0"], ) pgdata = self.pgdata_content(node.data_dir) @@ -523,3 +586,808 @@ def test_incompressible_pages(self): self.compare_pgdata(pgdata, pgdata_restored) node.slow_start() + + def test_basic_compression_stream_lz4(self): + """ + make archive node, make full, page and delta stream backups with LZ4, + check data correctness in restored instance + """ + self.maxDiff = None + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # FULL BACKUP + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,256) i", + ) + full_result = node.table_checksum("t_heap") + full_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # PAGE BACKUP + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(256,512) i", + ) + page_result = node.table_checksum("t_heap") + page_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="page", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # DELTA BACKUP + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(512,768) i", + ) + delta_result = node.table_checksum("t_heap") + delta_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Drop Node + node.cleanup() + + # Check full backup + self.assertIn( + "INFO: Restore of backup {0} completed.".format(full_backup_id), + self.restore_node( + backup_dir, + "node", + node, + backup_id=full_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + full_result_new = node.table_checksum("t_heap") + self.assertEqual(full_result, full_result_new) + node.cleanup() + + # Check page backup + self.assertIn( + "INFO: Restore of backup {0} completed.".format(page_backup_id), + self.restore_node( + backup_dir, + "node", + node, + backup_id=page_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + page_result_new = node.table_checksum("t_heap") + self.assertEqual(page_result, page_result_new) + node.cleanup() + + # Check delta backup + self.assertIn( + "INFO: Restore of backup {0} completed.".format(delta_backup_id), + self.restore_node( + backup_dir, + "node", + node, + backup_id=delta_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + delta_result_new = node.table_checksum("t_heap") + self.assertEqual(delta_result, delta_result_new) + + def test_compression_archive_lz4(self): + """ + make archive node, make full, page and delta backups with LZ4, + check data correctness in restored instance + """ + self.maxDiff = None + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # FULL BACKUP + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(i::text)::tsvector as tsvector from generate_series(0,1) i", + ) + full_result = node.table_checksum("t_heap") + full_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--compress-algorithm=lz4"], + ) + + # PAGE BACKUP + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(i::text)::tsvector as tsvector " + "from generate_series(0,2) i", + ) + page_result = node.table_checksum("t_heap") + page_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="page", + options=["--compress-algorithm=lz4"], + ) + + # DELTA BACKUP + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(i::text)::tsvector as tsvector from generate_series(0,3) i", + ) + delta_result = node.table_checksum("t_heap") + delta_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--compress-algorithm=lz4"], + ) + + # Drop Node + node.cleanup() + + # Check full backup + self.assertIn( + "INFO: Restore of backup {0} completed.".format(full_backup_id), + self.restore_node( + backup_dir, + "node", + node, + backup_id=full_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + full_result_new = node.table_checksum("t_heap") + self.assertEqual(full_result, full_result_new) + node.cleanup() + + # Check page backup + self.assertIn( + "INFO: Restore of backup {0} completed.".format(page_backup_id), + self.restore_node( + backup_dir, + "node", + node, + backup_id=page_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + page_result_new = node.table_checksum("t_heap") + self.assertEqual(page_result, page_result_new) + node.cleanup() + + # Check delta backup + self.assertIn( + "INFO: Restore of backup {0} completed.".format(delta_backup_id), + self.restore_node( + backup_dir, + "node", + node, + backup_id=delta_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + delta_result_new = node.table_checksum("t_heap") + self.assertEqual(delta_result, delta_result_new) + node.cleanup() + + def test_lz4_compression_levels(self): + """ + test LZ4 compression with different levels (0=fast, 1-12=HC) + """ + self.maxDiff = None + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # Create test data + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,1000) i", + ) + + # Test level 0 (fast compression) + self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=lz4", "--compress-level=0"], + ) + + # Test level 6 (medium HC compression) + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(1000,2000) i", + ) + self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4", "--compress-level=6"], + ) + + # Test level 12 (max HC compression) + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(2000,3000) i", + ) + result_level12 = node.table_checksum("t_heap") + backup_id_level12 = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4", "--compress-level=12"], + ) + + # Drop Node + node.cleanup() + + # Check level 12 backup restore + self.assertIn( + "INFO: Restore of backup {0} completed.".format(backup_id_level12), + self.restore_node( + backup_dir, + "node", + node, + backup_id=backup_id_level12, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ), + "\n Unexpected Error Message: {0}\n CMD: {1}".format(repr(self.output), self.cmd), + ) + node.slow_start() + + restored_result = node.table_checksum("t_heap") + self.assertEqual(result_level12, restored_result) + + def test_merge_lz4_backups(self): + """ + Test MERGE command with LZ4 compressed backups: + FULL(lz4) -> PAGE(lz4) -> DELTA(lz4) -> MERGE + """ + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # FULL backup with LZ4 + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,1000) i", + ) + + self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # PAGE backup with LZ4 + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(1000,2000) i", + ) + + self.backup_node( + backup_dir, + "node", + node, + backup_type="page", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # DELTA backup with LZ4 + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(2000,3000) i", + ) + + final_result = node.table_checksum("t_heap") + delta_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Save pgdata for comparison + if self.paranoia: + pgdata = self.pgdata_content(node.data_dir) + + # Merge all backups + self.merge_backup(backup_dir, "node", delta_backup_id, options=["-j", "4"]) + + # Check merge result + show_backups = self.show_pb(backup_dir, "node") + self.assertEqual(len(show_backups), 1) + self.assertEqual(show_backups[0]["status"], "OK") + self.assertEqual(show_backups[0]["backup-mode"], "FULL") + + # Restore and verify + node.cleanup() + self.restore_node( + backup_dir, + "node", + node, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ) + node.slow_start() + + restored_result = node.table_checksum("t_heap") + self.assertEqual(final_result, restored_result) + + # Physical comparison + if self.paranoia: + pgdata_restored = self.pgdata_content(node.data_dir) + self.compare_pgdata(pgdata, pgdata_restored) + + def test_validate_lz4_backup(self): + """ + Test VALIDATE command with LZ4 compressed backups + """ + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # Create data and make FULL backup with LZ4 + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text from generate_series(0,10000) i", + ) + + full_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Validate FULL backup + self.validate_pb(backup_dir, "node", full_backup_id, options=["-j", "4"]) + + # Make incremental backup + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text from generate_series(10000,20000) i", + ) + + delta_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Validate DELTA backup + self.validate_pb(backup_dir, "node", delta_backup_id, options=["-j", "4"]) + + # Validate entire instance + self.validate_pb(backup_dir, "node", options=["-j", "4"]) + + # Check backup statuses + full_status = self.show_pb(backup_dir, "node", full_backup_id)["status"] + delta_status = self.show_pb(backup_dir, "node", delta_backup_id)["status"] + + self.assertEqual(full_status, "OK") + self.assertEqual(delta_status, "OK") + + def test_mixed_compression_chain_zlib_lz4(self): + """ + Test backup chain with mixed compression algorithms: + FULL(zlib) -> PAGE(lz4) -> DELTA(lz4) -> restore + """ + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # FULL backup with ZLIB + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(0,500) i", + ) + + self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=zlib"], + ) + + # PAGE backup with LZ4 + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(500,1000) i", + ) + page_result = node.table_checksum("t_heap") + + page_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="page", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # DELTA backup with LZ4 + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text, " + "md5(repeat(i::text,10))::tsvector as tsvector " + "from generate_series(1000,1500) i", + ) + delta_result = node.table_checksum("t_heap") + + delta_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Validate the mixed chain + self.validate_pb(backup_dir, "node", options=["-j", "4"]) + + # Restore from DELTA (should use entire chain) + node.cleanup() + self.restore_node( + backup_dir, + "node", + node, + backup_id=delta_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ) + node.slow_start() + + restored_result = node.table_checksum("t_heap") + self.assertEqual(delta_result, restored_result) + + # Restore from PAGE backup + node.cleanup() + self.restore_node( + backup_dir, + "node", + node, + backup_id=page_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ) + node.slow_start() + + restored_result = node.table_checksum("t_heap") + self.assertEqual(page_result, restored_result) + + def test_merge_mixed_compression_zlib_lz4(self): + """ + Test MERGE with mixed compression: FULL(zlib) + DELTA(lz4) -> MERGE + """ + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # FULL backup with ZLIB + node.safe_psql( + "postgres", + "create table t_heap as select i as id, md5(i::text) as text from generate_series(0,1000) i", + ) + + self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=zlib"], + ) + + # DELTA backup with LZ4 + node.safe_psql( + "postgres", + "insert into t_heap select i as id, md5(i::text) as text from generate_series(1000,2000) i", + ) + + node.safe_psql("postgres", "delete from t_heap where id < 500") + + node.safe_psql("postgres", "vacuum t_heap") + + final_result = node.table_checksum("t_heap") + + delta_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4"], + ) + + if self.paranoia: + pgdata = self.pgdata_content(node.data_dir) + + # Merge + self.merge_backup(backup_dir, "node", delta_backup_id, options=["-j", "4"]) + + # Check merge result + show_backups = self.show_pb(backup_dir, "node") + self.assertEqual(len(show_backups), 1) + self.assertEqual(show_backups[0]["status"], "OK") + + # Restore and verify + node.cleanup() + self.restore_node( + backup_dir, + "node", + node, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ) + node.slow_start() + + restored_result = node.table_checksum("t_heap") + self.assertEqual(final_result, restored_result) + + if self.paranoia: + pgdata_restored = self.pgdata_content(node.data_dir) + self.compare_pgdata(pgdata, pgdata_restored) + + def test_lz4_incompressible_data(self): + """ + Test LZ4 compression with incompressible (random) data. + When compression doesn't help, pages should be stored uncompressed. + """ + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # Create table with random (incompressible) data + # Using md5() with random() to generate pseudo-random data without pgcrypto extension + # Concatenating multiple md5 hashes creates ~500 bytes of poorly compressible data + node.safe_psql( + "postgres", + "create table t_random as select i as id, " + "decode(" + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text) || " + "md5(random()::text || i::text) || md5(random()::text || i::text), " + "'hex') as random_data " + "from generate_series(0,5000) i", + ) + + original_result = node.table_checksum("t_random") + + # Backup with LZ4 + backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Validate backup + self.validate_pb(backup_dir, "node", backup_id, options=["-j", "4"]) + + # Check backup is OK + backup_info = self.show_pb(backup_dir, "node", backup_id) + self.assertEqual(backup_info["status"], "OK") + + # Restore and verify data integrity + node.cleanup() + self.restore_node( + backup_dir, + "node", + node, + backup_id=backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ) + node.slow_start() + + restored_result = node.table_checksum("t_random") + self.assertEqual(original_result, restored_result) + + def test_lz4_empty_and_sparse_tables(self): + """ + Test LZ4 compression with edge cases: empty tables, sparse data + """ + backup_dir = os.path.join(self.tmp_path, self.module_name, self.fname, "backup") + node = self.make_simple_node( + base_dir=os.path.join(self.module_name, self.fname, "node"), + set_replication=True, + initdb_params=["--data-checksums"], + ) + + self.init_pb(backup_dir) + self.add_instance(backup_dir, "node", node) + self.set_archiving(backup_dir, "node", node) + node.slow_start() + + # Create empty table + node.safe_psql("postgres", "create table t_empty (id int, data text)") + + # Create table with NULLs (sparse data) + node.safe_psql( + "postgres", + "create table t_sparse as select i as id, " + "case when i % 100 = 0 then md5(i::text) else null end as data " + "from generate_series(0,10000) i", + ) + + # Create table with repetitive data (highly compressible) + node.safe_psql( + "postgres", + "create table t_repetitive as select i as id, repeat('A', 1000) as data from generate_series(0,1000) i", + ) + + empty_result = node.execute("postgres", "select count(*) from t_empty")[0][0] + sparse_result = node.table_checksum("t_sparse") + repetitive_result = node.table_checksum("t_repetitive") + + # FULL backup with LZ4 + full_backup_id = self.backup_node( + backup_dir, + "node", + node, + backup_type="full", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Make changes and DELTA backup + node.safe_psql("postgres", "insert into t_empty values (1, 'test')") + node.safe_psql("postgres", "update t_sparse set data = 'updated' where id % 1000 = 0") + + self.backup_node( + backup_dir, + "node", + node, + backup_type="delta", + options=["--stream", "--compress-algorithm=lz4"], + ) + + # Validate + self.validate_pb(backup_dir, "node", options=["-j", "4"]) + + # Restore from FULL and verify + node.cleanup() + self.restore_node( + backup_dir, + "node", + node, + backup_id=full_backup_id, + options=["-j", "4", "--immediate", "--recovery-target-action=promote"], + ) + node.slow_start() + + self.assertEqual(empty_result, node.execute("postgres", "select count(*) from t_empty")[0][0]) + self.assertEqual(sparse_result, node.table_checksum("t_sparse")) + self.assertEqual(repetitive_result, node.table_checksum("t_repetitive"))