From 4db9ff922aa2e6f382c18e2f33136dabb9696cbb Mon Sep 17 00:00:00 2001
From: Piotr Sarna
Date: Mon, 16 Oct 2023 12:13:25 +0200
Subject: [PATCH 1/4] bottomless: add xz compression option

Empirical testing shows that gzip achieves a mere ~2x compression ratio
even with very simple and repetitive data patterns. Since compression is
important for reducing our egress traffic and improving throughput in
general, the xz algorithm is now implemented as well. Run on the same
data set, it achieved a ~50x compression ratio, which is more than an
order of magnitude better than gzip, at the cost of higher CPU usage.

Note: with more algorithms implemented, we should also consider adding
code that detects which compression method was used when restoring a
snapshot, to allow restoring from a gzip file while continuing new
snapshots with xz. Currently, setting the compression method via the
env var assumes that both restore and backup use the same algorithm.
---
 bottomless/Cargo.toml        |  2 +-
 bottomless/src/backup.rs     |  8 ++++++
 bottomless/src/read.rs       |  6 ++++-
 bottomless/src/replicator.rs | 52 +++++++++++++++++++++++++++++-------
 4 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index 312eda77..31fac29d 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"

 [dependencies]
 anyhow = "1.0.66"
-async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
+async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
 aws-config = { version = "0.55" }
 aws-sdk-s3 = { version = "0.28" }
 bytes = "1"
diff --git a/bottomless/src/backup.rs b/bottomless/src/backup.rs
index bb6e9081..1e72395b 100644
--- a/bottomless/src/backup.rs
+++ b/bottomless/src/backup.rs
@@ -116,6 +116,14 @@ impl WalCopier {
                     wal.copy_frames(&mut gzip, len).await?;
                     gzip.shutdown().await?;
                 }
+                CompressionKind::Xz => {
+                    let mut xz = async_compression::tokio::write::XzEncoder::with_quality(
+                        &mut out,
+                        async_compression::Level::Best,
+                    );
+                    wal.copy_frames(&mut xz, len).await?;
+                    xz.shutdown().await?;
+                }
             }
             if tracing::enabled!(tracing::Level::DEBUG) {
                 let elapsed = Instant::now() - period_start;
diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index 1177f60b..f1837c19 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -1,7 +1,7 @@
 use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
-use async_compression::tokio::bufread::GzipDecoder;
+use async_compression::tokio::bufread::{GzipDecoder, XzDecoder};
 use aws_sdk_s3::primitives::ByteStream;
 use std::io::ErrorKind;
 use std::pin::Pin;
@@ -32,6 +32,10 @@ impl BatchReader {
                     let gzip = GzipDecoder::new(reader);
                     Box::pin(gzip)
                 }
+                CompressionKind::Xz => {
+                    let xz = XzDecoder::new(reader);
+                    Box::pin(xz)
+                }
             },
         }
     }
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index f95bc973..557128a9 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
 use crate::wal::WalFileReader;
 use anyhow::{anyhow, bail};
 use arc_swap::ArcSwapOption;
-use async_compression::tokio::write::GzipEncoder;
+use async_compression::tokio::write::{GzipEncoder, XzEncoder};
 use aws_sdk_s3::config::{Credentials, Region};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -653,7 +653,7 @@ impl Replicator {
             CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
             CompressionKind::Gzip => {
                 let mut reader = File::open(db_path).await?;
-                let gzip_path = Self::db_gzip_path(db_path);
+                let gzip_path = Self::db_compressed_path(db_path, "gz");
                 let compressed_file = OpenOptions::new()
                     .create(true)
                     .write(true)
                     .read(true)
@@ -671,13 +671,34 @@ impl Replicator {
                 );
                 Ok(ByteStream::from_path(gzip_path).await?)
             }
+            CompressionKind::Xz => {
+                let mut reader = File::open(db_path).await?;
+                let xz_path = Self::db_compressed_path(db_path, "xz");
+                let compressed_file = OpenOptions::new()
+                    .create(true)
+                    .write(true)
+                    .read(true)
+                    .truncate(true)
+                    .open(&xz_path)
+                    .await?;
+                let mut writer =
+                    XzEncoder::with_quality(compressed_file, async_compression::Level::Best);
+                let size = tokio::io::copy(&mut reader, &mut writer).await?;
+                writer.shutdown().await?;
+                tracing::debug!(
+                    "Compressed database file ({} bytes) into `{}`",
+                    size,
+                    xz_path.display()
+                );
+                Ok(ByteStream::from_path(xz_path).await?)
+            }
         }
     }

-    fn db_gzip_path(db_path: &Path) -> PathBuf {
-        let mut gzip_path = db_path.to_path_buf();
-        gzip_path.pop();
-        gzip_path.join("db.gz")
+    fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
+        let mut compressed_path: PathBuf = db_path.to_path_buf();
+        compressed_path.pop();
+        compressed_path.join(format!("db.{suffix}"))
     }

     fn restore_db_path(&self) -> PathBuf {
@@ -816,9 +837,10 @@ impl Replicator {
             let _ = snapshot_notifier.send(Ok(Some(generation)));
             let elapsed = Instant::now() - start;
             tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
-            // cleanup gzip database snapshot if exists
-            let gzip_path = Self::db_gzip_path(&db_path);
-            let _ = tokio::fs::remove_file(gzip_path).await;
+            // cleanup gzip/xz database snapshot if exists
+            for suffix in &["gz", "xz"] {
+                let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
+            }
         });
         let elapsed = Instant::now() - start_ts;
         tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1163,6 +1185,7 @@ impl Replicator {
         let main_db_path = match self.use_compression {
             CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
             CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+            CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
         };

         if let Ok(db_file) = self.get_object(main_db_path).send().await {
@@ -1175,6 +1198,12 @@ impl Replicator {
                     );
                     tokio::io::copy(&mut decompress_reader, db).await?
                 }
+                CompressionKind::Xz => {
+                    let mut decompress_reader = async_compression::tokio::bufread::XzDecoder::new(
+                        tokio::io::BufReader::new(body_reader),
+                    );
+                    tokio::io::copy(&mut decompress_reader, db).await?
+                }
             };
             db.flush().await?;

@@ -1235,6 +1264,7 @@ impl Replicator {
                     Some(result) => result,
                     None => {
                         if !key.ends_with(".gz")
+                            && !key.ends_with(".xz")
                             && !key.ends_with(".db")
                             && !key.ends_with(".meta")
                             && !key.ends_with(".dep")
@@ -1423,6 +1453,7 @@ impl Replicator {
             let str = fpath.to_str()?;
             if str.ends_with(".db")
                 | str.ends_with(".gz")
+                | str.ends_with(".xz")
                 | str.ends_with(".raw")
                 | str.ends_with(".meta")
                 | str.ends_with(".dep")
@@ -1670,6 +1701,7 @@ pub enum CompressionKind {
     #[default]
     None,
     Gzip,
+    Xz,
 }

 impl CompressionKind {
@@ -1677,6 +1709,7 @@ impl CompressionKind {
         match kind {
             "gz" | "gzip" => Ok(CompressionKind::Gzip),
             "raw" | "" => Ok(CompressionKind::None),
+            "xz" => Ok(CompressionKind::Xz),
             other => Err(other),
         }
     }
@@ -1687,6 +1720,7 @@ impl std::fmt::Display for CompressionKind {
         match self {
             CompressionKind::None => write!(f, "raw"),
             CompressionKind::Gzip => write!(f, "gz"),
+            CompressionKind::Xz => write!(f, "xz"),
         }
     }
 }
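
A quick illustration of the encoder/decoder pairing this patch wires in: bottomless
writes through async_compression::tokio::write::XzEncoder (backup and snapshot paths)
and reads back through async_compression::tokio::bufread::XzDecoder (restore paths).
Below is a minimal, self-contained round-trip sketch, not part of the patch; it assumes
async-compression 0.3 with the "tokio" and "xz" features plus a tokio runtime, and the
payload data is made up for illustration.

    // Round-trip sketch of the xz encoder/decoder pair used by this patch series.
    // Assumed Cargo deps: tokio = { version = "1", features = ["full"] },
    // async-compression = { version = "0.3", features = ["tokio", "xz"] }.
    use async_compression::tokio::bufread::XzDecoder;
    use async_compression::tokio::write::XzEncoder;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Highly repetitive payload, standing in for WAL frames full of similar pages.
        let payload = vec![42u8; 16 * 4096];

        // Write side (as in backup.rs / replicator.rs): wrap an AsyncWrite,
        // then shutdown() to finalize the xz stream before the buffer is uploaded.
        let mut encoder = XzEncoder::new(Vec::new());
        encoder.write_all(&payload).await?;
        encoder.shutdown().await?;
        let compressed = encoder.into_inner();
        println!("{} bytes -> {} bytes", payload.len(), compressed.len());

        // Read side (as in read.rs / the restore path): wrap an AsyncBufRead and copy out.
        let mut decoder = XzDecoder::new(&compressed[..]);
        let mut restored = Vec::new();
        decoder.read_to_end(&mut restored).await?;
        assert_eq!(restored, payload);
        Ok(())
    }

The explicit shutdown().await is what finalizes the xz stream, which is why both call
sites in the patch shut the encoder down before handing the buffer off.
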
From 751231950204389232a8e372a73e3b74bc6b314f Mon Sep 17 00:00:00 2001
From: Piotr Sarna
Date: Mon, 16 Oct 2023 12:18:44 +0200
Subject: [PATCH 2/4] bottomless: increase the max batch size to 10000

The reasoning is as follows: 10000 uncompressed frames weigh 40MiB.
Gzip is expected to create a ~20MiB file from them, while xz can
compress it down to ~800KiB. The previous limit would make xz create a
50KiB file, which is less than the minimum 128KiB that S3-like services
charge for when writing to an object store.
---
 bottomless/src/replicator.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index 557128a9..56f34494 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -171,7 +171,7 @@ impl Options {
         let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
         let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
         let max_frames_per_batch =
-            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
+            env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
         let s3_upload_max_parallelism =
             env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
         let restore_transaction_page_swap_after =
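
The 40MiB figure follows from the WAL frame layout: every frame is a 24-byte frame
header followed by one page, 4096 bytes with the default page size. A quick
back-of-the-envelope check (a standalone sketch, not part of the patch):

    // Back-of-the-envelope check for the batch sizing above.
    // Assumes the default 4096-byte page size; a WAL frame is a 24-byte header plus one page.
    fn main() {
        const FRAME_SIZE: u64 = 24 + 4096; // 4120 bytes per frame
        const MIB: f64 = 1024.0 * 1024.0;

        let old_batch = 500 * FRAME_SIZE; // previous LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES default
        let new_batch = 10_000 * FRAME_SIZE; // new default

        println!("   500 frames ~ {:.1} MiB uncompressed", old_batch as f64 / MIB); // ~2.0 MiB
        println!("10_000 frames ~ {:.1} MiB uncompressed", new_batch as f64 / MIB); // ~39.3 MiB
        // ~2 MiB shrinks to the ~50 KiB object mentioned above, well below the 128 KiB
        // billing minimum of S3-like stores; ~39 MiB compresses to roughly the quoted ~800 KiB.
    }
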
From 1fa6774d0d2a5ec5c9fed616ce9c4c50b7a43834 Mon Sep 17 00:00:00 2001
From: Piotr Sarna
Date: Tue, 17 Oct 2023 13:15:10 +0200
Subject: [PATCH 3/4] bottomless: use default compression level for xz

Best level seems to produce corrupted files.
---
 bottomless/src/backup.rs     | 5 +----
 bottomless/src/replicator.rs | 3 +--
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/bottomless/src/backup.rs b/bottomless/src/backup.rs
index 1e72395b..f3de86af 100644
--- a/bottomless/src/backup.rs
+++ b/bottomless/src/backup.rs
@@ -117,10 +117,7 @@ impl WalCopier {
                     gzip.shutdown().await?;
                 }
                 CompressionKind::Xz => {
-                    let mut xz = async_compression::tokio::write::XzEncoder::with_quality(
-                        &mut out,
-                        async_compression::Level::Best,
-                    );
+                    let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
                     wal.copy_frames(&mut xz, len).await?;
                     xz.shutdown().await?;
                 }
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index 56f34494..049e650b 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -681,8 +681,7 @@ impl Replicator {
                     .truncate(true)
                     .open(&xz_path)
                     .await?;
-                let mut writer =
-                    XzEncoder::with_quality(compressed_file, async_compression::Level::Best);
+                let mut writer = XzEncoder::new(compressed_file);
                 let size = tokio::io::copy(&mut reader, &mut writer).await?;
                 writer.shutdown().await?;
                 tracing::debug!(
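
For context, the change is only about which constructor picks the compression level:
XzEncoder::new uses the crate's default level, while the earlier code pinned Level::Best.
A small sketch against the async-compression 0.3 tokio API (to my understanding, `new`
behaves like passing Level::Default; the Vec<u8> writer is just a stand-in):

    // Constructor forms involved in this patch; nothing here is part of the diff itself.
    use async_compression::tokio::write::XzEncoder;
    use async_compression::Level;

    fn main() {
        let _best = XzEncoder::with_quality(Vec::new(), Level::Best); // what patches 1-2 used
        let _default = XzEncoder::new(Vec::new()); // what the code uses after this patch
        let _explicit = XzEncoder::with_quality(Vec::new(), Level::Default); // equivalent to `new`, as far as I can tell
    }
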
From 5ee66f8d5e21c9c50f258ab02ba12a79fa2b1370 Mon Sep 17 00:00:00 2001
From: Piotr Sarna
Date: Tue, 17 Oct 2023 14:02:19 +0200
Subject: [PATCH 4/4] bottomless: fall back to other compression algos on failure

If the db snapshot is not found under the configured compression algo,
the other choices are checked as well. This path fires when somebody
used to run with Gzip but then restores a database that declares Xz as
its bottomless compression method.
---
 bottomless/src/replicator.rs | 74 +++++++++++++++++++++++-------------
 1 file changed, 47 insertions(+), 27 deletions(-)

diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index 049e650b..5ad70a23 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -1181,38 +1181,58 @@ impl Replicator {
     }

     async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
-        let main_db_path = match self.use_compression {
-            CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
-            CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
-            CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
+        let algos_to_try = match self.use_compression {
+            CompressionKind::None => &[
+                CompressionKind::None,
+                CompressionKind::Xz,
+                CompressionKind::Gzip,
+            ],
+            CompressionKind::Gzip => &[
+                CompressionKind::Gzip,
+                CompressionKind::Xz,
+                CompressionKind::None,
+            ],
+            CompressionKind::Xz => &[
+                CompressionKind::Xz,
+                CompressionKind::Gzip,
+                CompressionKind::None,
+            ],
         };

-        if let Ok(db_file) = self.get_object(main_db_path).send().await {
-            let mut body_reader = db_file.body.into_async_read();
-            let db_size = match self.use_compression {
-                CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
-                CompressionKind::Gzip => {
-                    let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
-                        tokio::io::BufReader::new(body_reader),
-                    );
-                    tokio::io::copy(&mut decompress_reader, db).await?
-                }
-                CompressionKind::Xz => {
-                    let mut decompress_reader = async_compression::tokio::bufread::XzDecoder::new(
-                        tokio::io::BufReader::new(body_reader),
-                    );
-                    tokio::io::copy(&mut decompress_reader, db).await?
-                }
-            };
-            db.flush().await?;
-
-            let page_size = Self::read_page_size(db).await?;
-            self.set_page_size(page_size)?;
-            tracing::info!("Restored the main database file ({} bytes)", db_size);
-            Ok(true)
-        } else {
-            Ok(false)
+        for algo in algos_to_try {
+            let main_db_path = match algo {
+                CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
+                CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
+                CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
+            };
+            if let Ok(db_file) = self.get_object(main_db_path).send().await {
+                let mut body_reader = db_file.body.into_async_read();
+                let db_size = match algo {
+                    CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
+                    CompressionKind::Gzip => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::GzipDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                    CompressionKind::Xz => {
+                        let mut decompress_reader =
+                            async_compression::tokio::bufread::XzDecoder::new(
+                                tokio::io::BufReader::new(body_reader),
+                            );
+                        tokio::io::copy(&mut decompress_reader, db).await?
+                    }
+                };
+                db.flush().await?;
+
+                let page_size = Self::read_page_size(db).await?;
+                self.set_page_size(page_size)?;
+                tracing::info!("Restored the main database file ({} bytes)", db_size);
+                return Ok(true);
+            }
         }
+        Ok(false)
     }

     async fn restore_wal(