From 7754d5027345079f969c25e9bdc1b54aa5412b51 Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Mon, 7 Aug 2023 21:22:28 +0000 Subject: [PATCH 01/12] bottomless: stream gzip snapshot --- bottomless/src/replicator.rs | 75 ++++++++++++++++++++++++++++++++---- 1 file changed, 67 insertions(+), 8 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index c3e511db..c8d321e0 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -8,7 +8,8 @@ use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder; use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder; use aws_sdk_s3::operation::list_objects::ListObjectsOutput; -use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::primitives::{ByteStream, SdkBody}; +use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use aws_sdk_s3::{Client, Config}; use bytes::{Buf, Bytes, BytesMut}; use chrono::{DateTime, LocalResult, TimeZone, Utc}; @@ -628,19 +629,77 @@ impl Replicator { Self::read_change_counter(&mut reader).await? } CompressionKind::Gzip => { - // TODO: find a way to compress ByteStream on the fly instead of creating - // an intermediary file. - let (compressed_db_path, change_counter) = self.compress_main_db_file().await?; + let mut reader = tokio::fs::File::open(&self.db_path).await?; + + let stream = tokio::io::BufReader::new(reader.try_clone().await?); + let mut gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(stream); + let key = format!("{}-{}/db.gz", self.db_name, self.generation); + let upload_id = self + .client + .create_multipart_upload() + .bucket(&self.bucket) + .key(key.clone()) + .send() + .await? + .upload_id + .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?; + + const CHUNK_SIZE: usize = 5 * 1024 * 1024; + let mut parts = Vec::new(); + // S3 takes an 1-based index + for part in 1..=10000 { + let mut buffer = bytes::BytesMut::with_capacity(CHUNK_SIZE); + loop { + let bytes_written = gzip_reader.read_buf(&mut buffer).await?; + // EOF or buffer is full + if bytes_written == 0 { + break; + } + } + + // EOF + if buffer.is_empty() { + break; + } + + let part_out = self + .client + .upload_part() + .bucket(&self.bucket) + .key(key.clone()) + .upload_id(upload_id.clone()) + .body(ByteStream::from(buffer.freeze())) + .part_number(part) + .send() + .await?; + + parts.push( + CompletedPart::builder() + .part_number(part) + .e_tag( + part_out.e_tag.ok_or_else(|| { + anyhow::anyhow!("e_tag missing from part upload") + })?, + ) + .build(), + ); + } + self.client - .put_object() + .complete_multipart_upload() + .upload_id(upload_id) .bucket(&self.bucket) .key(key) - .body(ByteStream::from_path(compressed_db_path).await?) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(parts)) + .build(), + ) .send() .await?; - let _ = tokio::fs::remove_file(compressed_db_path).await; - change_counter + + Self::read_change_counter(&mut reader).await? 
} }; From fbeeef11236f066aef30350d369267c07accb5d8 Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Mon, 7 Aug 2023 23:22:12 +0000 Subject: [PATCH 02/12] send gzip in chunks of known size --- bottomless/src/replicator.rs | 226 +++++++++++++++++++++++------------ 1 file changed, 150 insertions(+), 76 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index c8d321e0..48b31811 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -8,7 +8,7 @@ use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder; use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder; use aws_sdk_s3::operation::list_objects::ListObjectsOutput; -use aws_sdk_s3::primitives::{ByteStream, SdkBody}; +use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use aws_sdk_s3::{Client, Config}; use bytes::{Buf, Bytes, BytesMut}; @@ -616,92 +616,166 @@ impl Replicator { } tracing::debug!("Snapshotting {}", self.db_path); let start = Instant::now(); - let change_counter = match self.use_compression { - CompressionKind::None => { - self.client - .put_object() - .bucket(&self.bucket) - .key(format!("{}-{}/db.db", self.db_name, self.generation)) - .body(ByteStream::from_path(&self.db_path).await?) - .send() - .await?; - let mut reader = tokio::fs::File::open(&self.db_path).await?; - Self::read_change_counter(&mut reader).await? - } - CompressionKind::Gzip => { - let mut reader = tokio::fs::File::open(&self.db_path).await?; - - let stream = tokio::io::BufReader::new(reader.try_clone().await?); - let mut gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(stream); - - let key = format!("{}-{}/db.gz", self.db_name, self.generation); - let upload_id = self - .client - .create_multipart_upload() - .bucket(&self.bucket) - .key(key.clone()) - .send() - .await? - .upload_id - .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?; - - const CHUNK_SIZE: usize = 5 * 1024 * 1024; - let mut parts = Vec::new(); - // S3 takes an 1-based index - for part in 1..=10000 { - let mut buffer = bytes::BytesMut::with_capacity(CHUNK_SIZE); - loop { - let bytes_written = gzip_reader.read_buf(&mut buffer).await?; - // EOF or buffer is full - if bytes_written == 0 { + let change_counter = + match self.use_compression { + CompressionKind::None => { + self.client + .put_object() + .bucket(&self.bucket) + .key(format!("{}-{}/db.db", self.db_name, self.generation)) + .body(ByteStream::from_path(&self.db_path).await?) + .send() + .await?; + let mut reader = tokio::fs::File::open(&self.db_path).await?; + Self::read_change_counter(&mut reader).await? + } + CompressionKind::Gzip => { + let mut reader = tokio::fs::File::open(&self.db_path).await?; + let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?); + let mut gzip_reader = + async_compression::tokio::bufread::GzipEncoder::new(buf_reader); + + let key = format!("{}-{}/db.gz", self.db_name, self.generation); + + // Unfortunally we can send the gzip output in a single call without buffering + // the whole snapshot in memory because S3 requires the `Content-Length` header + // to be set. + let upload_id = self + .client + .create_multipart_upload() + .bucket(&self.bucket) + .key(key.clone()) + .send() + .await? 
+ .upload_id + .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?; + + let chunk_sizes = &[ + 5 * 1024 * 1024, + 10 * 1024 * 1024, + 25 * 1024 * 1024, + 50 * 1024 * 1024, + 100 * 1024 * 1024, + ]; + + const LAST_PART: i32 = 10_000; + let mut parts = Vec::new(); + let mut has_reached_eof = false; + + // S3 allows a maximum of 10_000 parts and each part can size from 5 MiB to + // 5 GiB, except for the last one that has no limits. + // + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + for part in 0..LAST_PART - 1 { + // Progressively increase the chunk size every 16 chunks up to the last + // chunk_size. This allows smaller allocate for small databases. + // + // Here's a table of how much data we can chunk: + // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐ + // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 5 MiB │ 16 │ 80 MiB │ 80 MiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 10 MiB │ 16 │ 160 MiB │ 240 MiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 25 MiB │ 16 │ 400 MiB │ 560 MiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 50 MiB │ 16 │ 800 MiB │ 1.172 GiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 100 MiB │ 9935 │ 970.215 GiB │ 971.387 GiB │ + // └────────────┴──────────────────┴───────────────────────┴──────────────────┘ + // + // We can send up to 971 GiB in chunks, which is more than enough for the + // majority of use cases. + // + // The last chunk is reserved for the remaining of the `gzip_reader` + let chunk_size = + chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)]; + + let mut buffer = bytes::BytesMut::with_capacity(chunk_size); + loop { + let bytes_written = gzip_reader.read_buf(&mut buffer).await?; + // EOF or buffer is full + if bytes_written == 0 { + break; + } + } + + // EOF + if buffer.is_empty() { + has_reached_eof = true; break; } + + let part_out = self + .client + .upload_part() + .bucket(&self.bucket) + .key(key.clone()) + .upload_id(upload_id.clone()) + .body(ByteStream::from(buffer.freeze())) + .part_number(part + 1) + .send() + .await?; + + parts.push( + CompletedPart::builder() + .part_number(part + 1) + .e_tag(part_out.e_tag.ok_or_else(|| { + anyhow::anyhow!("e_tag missing from part upload") + })?) + .build(), + ); } - // EOF - if buffer.is_empty() { - break; + // If the gzip stream has not reached EOF we need to send the last part to S3. + // Since we don't know the size of the stream and we can't be sure if it fits in + // memory, we save it into a file to allow streaming. + // + // This would only happen to databases that are around ~1 TiB. + if !has_reached_eof { + let last_chunk_path = + format!("{}-{}/db.last-chunk.gz", self.db_name, self.generation); + let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?; + tokio::io::copy(&mut gzip_reader, &mut last_chunk_file).await?; + + let part_out = self + .client + .upload_part() + .bucket(&self.bucket) + .key(key.clone()) + .upload_id(upload_id.clone()) + .body(ByteStream::from_path(last_chunk_path).await?) 
+ .part_number(LAST_PART) // last chunk + .send() + .await?; + + parts.push( + CompletedPart::builder() + .part_number(LAST_PART) + .e_tag(part_out.e_tag.ok_or_else(|| { + anyhow::anyhow!("e_tag missing from part upload") + })?) + .build(), + ); } - let part_out = self - .client - .upload_part() + self.client + .complete_multipart_upload() + .upload_id(upload_id) .bucket(&self.bucket) - .key(key.clone()) - .upload_id(upload_id.clone()) - .body(ByteStream::from(buffer.freeze())) - .part_number(part) + .key(key) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(parts)) + .build(), + ) .send() .await?; - parts.push( - CompletedPart::builder() - .part_number(part) - .e_tag( - part_out.e_tag.ok_or_else(|| { - anyhow::anyhow!("e_tag missing from part upload") - })?, - ) - .build(), - ); + Self::read_change_counter(&mut reader).await? } - - self.client - .complete_multipart_upload() - .upload_id(upload_id) - .bucket(&self.bucket) - .key(key) - .multipart_upload( - CompletedMultipartUpload::builder() - .set_parts(Some(parts)) - .build(), - ) - .send() - .await?; - - Self::read_change_counter(&mut reader).await? - } - }; + }; /* FIXME: we can't rely on the change counter in WAL mode: ** "In WAL mode, changes to the database are detected using the wal-index and From 3e11b5c6df3cbd3701f306516ccdca56d5e0146a Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Tue, 8 Aug 2023 00:29:57 +0000 Subject: [PATCH 03/12] delete last-chunk.gz --- bottomless/src/replicator.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 48b31811..8509e197 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -745,7 +745,7 @@ impl Replicator { .bucket(&self.bucket) .key(key.clone()) .upload_id(upload_id.clone()) - .body(ByteStream::from_path(last_chunk_path).await?) + .body(ByteStream::from_path(&last_chunk_path).await?) .part_number(LAST_PART) // last chunk .send() .await?; @@ -758,6 +758,8 @@ impl Replicator { })?) .build(), ); + + let _ = tokio::fs::remove_file(last_chunk_path).await; } self.client From ca9573861c92c29f2c454d9aa8389d50d801087b Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Tue, 8 Aug 2023 00:30:25 +0000 Subject: [PATCH 04/12] fix typos --- bottomless/src/replicator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 8509e197..61f284fc 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -637,7 +637,7 @@ impl Replicator { let key = format!("{}-{}/db.gz", self.db_name, self.generation); - // Unfortunally we can send the gzip output in a single call without buffering + // Unfortunally we can't send the gzip output in a single call without buffering // the whole snapshot in memory because S3 requires the `Content-Length` header // to be set. let upload_id = self @@ -668,7 +668,7 @@ impl Replicator { // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html for part in 0..LAST_PART - 1 { // Progressively increase the chunk size every 16 chunks up to the last - // chunk_size. This allows smaller allocate for small databases. + // chunk_size. This allows smaller allocations for small databases. 
// // Here's a table of how much data we can chunk: // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐ From 4ba8bc93ff2646acb7e2c0aeb1915df5baa9bc4a Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Tue, 8 Aug 2023 05:02:43 +0000 Subject: [PATCH 05/12] fix table values --- bottomless/src/replicator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 61f284fc..54b8d417 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -678,11 +678,11 @@ impl Replicator { // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ // │ 10 MiB │ 16 │ 160 MiB │ 240 MiB │ // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 25 MiB │ 16 │ 400 MiB │ 560 MiB │ + // │ 25 MiB │ 16 │ 400 MiB │ 640 MiB │ // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 50 MiB │ 16 │ 800 MiB │ 1.172 GiB │ + // │ 50 MiB │ 16 │ 800 MiB │ 1.406 GiB │ // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 100 MiB │ 9935 │ 970.215 GiB │ 971.387 GiB │ + // │ 100 MiB │ 9935 │ 970.215 GiB │ 971.621 GiB │ // └────────────┴──────────────────┴───────────────────────┴──────────────────┘ // // We can send up to 971 GiB in chunks, which is more than enough for the From 3c953d58ae02446aa6982558a38b8d23b3eb472a Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Tue, 8 Aug 2023 05:41:49 +0000 Subject: [PATCH 06/12] extract upload_s3_multipart --- bottomless/src/read.rs | 150 +++++++++++++++++++++++++++- bottomless/src/replicator.rs | 186 +++++------------------------------ 2 files changed, 174 insertions(+), 162 deletions(-) diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs index 1177f60b..3869d258 100644 --- a/bottomless/src/read.rs +++ b/bottomless/src/read.rs @@ -2,7 +2,10 @@ use crate::replicator::CompressionKind; use crate::wal::WalFrameHeader; use anyhow::Result; use async_compression::tokio::bufread::GzipDecoder; -use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::{ + primitives::ByteStream, + types::{CompletedMultipartUpload, CompletedPart}, +}; use std::io::ErrorKind; use std::pin::Pin; use tokio::io::{AsyncRead, AsyncReadExt, BufReader}; @@ -55,3 +58,148 @@ impl BatchReader { Ok(()) } } + +pub async fn upload_s3_multipart( + client: &aws_sdk_s3::Client, + key: &str, + bucket: &str, + mut reader: impl AsyncRead + Unpin, +) -> Result<()> { + let upload_id = client + .create_multipart_upload() + .bucket(bucket) + .key(key) + .send() + .await? + .upload_id + .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?; + + let chunk_sizes = &[ + 5 * 1024 * 1024, + 10 * 1024 * 1024, + 25 * 1024 * 1024, + 50 * 1024 * 1024, + 100 * 1024 * 1024, + ]; + + const LAST_PART: i32 = 10_000; + let mut parts = Vec::new(); + let mut has_reached_eof = false; + + // S3 allows a maximum of 10_000 parts and each part can size from 5 MiB to + // 5 GiB, except for the last one that has no limits. + // + // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + for part in 0..LAST_PART - 1 { + // Progressively increase the chunk size every 16 chunks up to the last + // chunk_size. This allows smaller allocations for small databases. 
+ // + // Here's a table of how much data we can chunk: + // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐ + // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 5 MiB │ 16 │ 80 MiB │ 80 MiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 10 MiB │ 16 │ 160 MiB │ 240 MiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 25 MiB │ 16 │ 400 MiB │ 640 MiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 50 MiB │ 16 │ 800 MiB │ 1.406 GiB │ + // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ + // │ 100 MiB │ 9935 │ 970.215 GiB │ 971.621 GiB │ + // └────────────┴──────────────────┴───────────────────────┴──────────────────┘ + // + // We can send up to 971 GiB in chunks, which is more than enough for the + // majority of use cases. + // + // The last chunk is reserved for the remaining of the `gzip_reader` + let chunk_size = chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)]; + + let mut buffer = bytes::BytesMut::with_capacity(chunk_size); + loop { + let bytes_written = reader.read_buf(&mut buffer).await?; + // EOF or buffer is full + if bytes_written == 0 { + break; + } + } + + // EOF + if buffer.is_empty() { + has_reached_eof = true; + break; + } + + let part_out = client + .upload_part() + .bucket(bucket) + .key(key) + .upload_id(upload_id.clone()) + .body(ByteStream::from(buffer.freeze())) + .part_number(part + 1) + .send() + .await?; + + parts.push( + CompletedPart::builder() + .part_number(part + 1) + .e_tag( + part_out + .e_tag + .ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?, + ) + .build(), + ); + } + + // If the gzip stream has not reached EOF we need to send the last part to S3. + // Since we don't know the size of the stream and we can't be sure if it fits in + // memory, we save it into a file to allow streaming. + // + // This would only happen to databases that are around ~1 TiB. + if !has_reached_eof { + let mut last_chunk_path = std::env::temp_dir(); + last_chunk_path.push(rand::random::().to_string()); + + let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?; + tokio::io::copy(&mut reader, &mut last_chunk_file).await?; + + let part_out = client + .upload_part() + .bucket(bucket) + .key(key) + .upload_id(upload_id.clone()) + .body(ByteStream::from_path(&last_chunk_path).await?) 
+ .part_number(LAST_PART) + .send() + .await?; + + parts.push( + CompletedPart::builder() + .part_number(LAST_PART) + .e_tag( + part_out + .e_tag + .ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?, + ) + .build(), + ); + + let _ = tokio::fs::remove_file(last_chunk_path).await; + } + + client + .complete_multipart_upload() + .upload_id(upload_id) + .bucket(bucket) + .key(key) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(parts)) + .build(), + ) + .send() + .await?; + + Ok(()) +} diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 54b8d417..fba49223 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1,5 +1,5 @@ use crate::backup::WalCopier; -use crate::read::BatchReader; +use crate::read::{upload_s3_multipart, BatchReader}; use crate::transaction_cache::TransactionPageCache; use crate::wal::WalFileReader; use anyhow::anyhow; @@ -9,7 +9,6 @@ use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder; use aws_sdk_s3::operation::list_objects::builders::ListObjectsFluentBuilder; use aws_sdk_s3::operation::list_objects::ListObjectsOutput; use aws_sdk_s3::primitives::ByteStream; -use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart}; use aws_sdk_s3::{Client, Config}; use bytes::{Buf, Bytes, BytesMut}; use chrono::{DateTime, LocalResult, TimeZone, Utc}; @@ -616,168 +615,33 @@ impl Replicator { } tracing::debug!("Snapshotting {}", self.db_path); let start = Instant::now(); - let change_counter = - match self.use_compression { - CompressionKind::None => { - self.client - .put_object() - .bucket(&self.bucket) - .key(format!("{}-{}/db.db", self.db_name, self.generation)) - .body(ByteStream::from_path(&self.db_path).await?) - .send() - .await?; - let mut reader = tokio::fs::File::open(&self.db_path).await?; - Self::read_change_counter(&mut reader).await? - } - CompressionKind::Gzip => { - let mut reader = tokio::fs::File::open(&self.db_path).await?; - let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?); - let mut gzip_reader = - async_compression::tokio::bufread::GzipEncoder::new(buf_reader); - - let key = format!("{}-{}/db.gz", self.db_name, self.generation); - - // Unfortunally we can't send the gzip output in a single call without buffering - // the whole snapshot in memory because S3 requires the `Content-Length` header - // to be set. - let upload_id = self - .client - .create_multipart_upload() - .bucket(&self.bucket) - .key(key.clone()) - .send() - .await? - .upload_id - .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?; - - let chunk_sizes = &[ - 5 * 1024 * 1024, - 10 * 1024 * 1024, - 25 * 1024 * 1024, - 50 * 1024 * 1024, - 100 * 1024 * 1024, - ]; - - const LAST_PART: i32 = 10_000; - let mut parts = Vec::new(); - let mut has_reached_eof = false; - - // S3 allows a maximum of 10_000 parts and each part can size from 5 MiB to - // 5 GiB, except for the last one that has no limits. - // - // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html - for part in 0..LAST_PART - 1 { - // Progressively increase the chunk size every 16 chunks up to the last - // chunk_size. This allows smaller allocations for small databases. 
- // - // Here's a table of how much data we can chunk: - // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐ - // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │ - // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 5 MiB │ 16 │ 80 MiB │ 80 MiB │ - // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 10 MiB │ 16 │ 160 MiB │ 240 MiB │ - // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 25 MiB │ 16 │ 400 MiB │ 640 MiB │ - // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 50 MiB │ 16 │ 800 MiB │ 1.406 GiB │ - // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤ - // │ 100 MiB │ 9935 │ 970.215 GiB │ 971.621 GiB │ - // └────────────┴──────────────────┴───────────────────────┴──────────────────┘ - // - // We can send up to 971 GiB in chunks, which is more than enough for the - // majority of use cases. - // - // The last chunk is reserved for the remaining of the `gzip_reader` - let chunk_size = - chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)]; - - let mut buffer = bytes::BytesMut::with_capacity(chunk_size); - loop { - let bytes_written = gzip_reader.read_buf(&mut buffer).await?; - // EOF or buffer is full - if bytes_written == 0 { - break; - } - } - - // EOF - if buffer.is_empty() { - has_reached_eof = true; - break; - } - - let part_out = self - .client - .upload_part() - .bucket(&self.bucket) - .key(key.clone()) - .upload_id(upload_id.clone()) - .body(ByteStream::from(buffer.freeze())) - .part_number(part + 1) - .send() - .await?; - - parts.push( - CompletedPart::builder() - .part_number(part + 1) - .e_tag(part_out.e_tag.ok_or_else(|| { - anyhow::anyhow!("e_tag missing from part upload") - })?) - .build(), - ); - } + let change_counter = match self.use_compression { + CompressionKind::None => { + self.client + .put_object() + .bucket(&self.bucket) + .key(format!("{}-{}/db.db", self.db_name, self.generation)) + .body(ByteStream::from_path(&self.db_path).await?) + .send() + .await?; + let mut reader = tokio::fs::File::open(&self.db_path).await?; + Self::read_change_counter(&mut reader).await? + } + CompressionKind::Gzip => { + let mut reader = tokio::fs::File::open(&self.db_path).await?; + let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?); + let gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(buf_reader); - // If the gzip stream has not reached EOF we need to send the last part to S3. - // Since we don't know the size of the stream and we can't be sure if it fits in - // memory, we save it into a file to allow streaming. - // - // This would only happen to databases that are around ~1 TiB. - if !has_reached_eof { - let last_chunk_path = - format!("{}-{}/db.last-chunk.gz", self.db_name, self.generation); - let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?; - tokio::io::copy(&mut gzip_reader, &mut last_chunk_file).await?; - - let part_out = self - .client - .upload_part() - .bucket(&self.bucket) - .key(key.clone()) - .upload_id(upload_id.clone()) - .body(ByteStream::from_path(&last_chunk_path).await?) - .part_number(LAST_PART) // last chunk - .send() - .await?; - - parts.push( - CompletedPart::builder() - .part_number(LAST_PART) - .e_tag(part_out.e_tag.ok_or_else(|| { - anyhow::anyhow!("e_tag missing from part upload") - })?) 
- .build(), - ); + let key = format!("{}-{}/db.gz", self.db_name, self.generation); - let _ = tokio::fs::remove_file(last_chunk_path).await; - } + // Unfortunally we can't send the gzip output in a single call without buffering + // the whole snapshot in memory because S3 requires the `Content-Length` header + // to be set. + upload_s3_multipart(&self.client, &key, &self.bucket, gzip_reader).await?; - self.client - .complete_multipart_upload() - .upload_id(upload_id) - .bucket(&self.bucket) - .key(key) - .multipart_upload( - CompletedMultipartUpload::builder() - .set_parts(Some(parts)) - .build(), - ) - .send() - .await?; - - Self::read_change_counter(&mut reader).await? - } - }; + Self::read_change_counter(&mut reader).await? + } + }; /* FIXME: we can't rely on the change counter in WAL mode: ** "In WAL mode, changes to the database are detected using the wal-index and From 2f35f9fc9833c66769e07714a4b39fd82b6fcd8d Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Wed, 9 Aug 2023 13:39:11 +0000 Subject: [PATCH 07/12] deduplicate read_change_counter --- bottomless/src/replicator.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index fba49223..cd47ece4 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -614,8 +614,11 @@ impl Replicator { return Ok(false); } tracing::debug!("Snapshotting {}", self.db_path); + let start = Instant::now(); - let change_counter = match self.use_compression { + + let mut reader = tokio::fs::File::open(&self.db_path).await?; + match self.use_compression { CompressionKind::None => { self.client .put_object() @@ -624,11 +627,8 @@ impl Replicator { .body(ByteStream::from_path(&self.db_path).await?) .send() .await?; - let mut reader = tokio::fs::File::open(&self.db_path).await?; - Self::read_change_counter(&mut reader).await? } CompressionKind::Gzip => { - let mut reader = tokio::fs::File::open(&self.db_path).await?; let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?); let gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(buf_reader); @@ -638,8 +638,6 @@ impl Replicator { // the whole snapshot in memory because S3 requires the `Content-Length` header // to be set. upload_s3_multipart(&self.client, &key, &self.bucket, gzip_reader).await?; - - Self::read_change_counter(&mut reader).await? } }; @@ -649,6 +647,7 @@ impl Replicator { ** incremented on each transaction in WAL mode." ** Instead, we need to consult WAL checksums. 
*/ + let change_counter = Self::read_change_counter(&mut reader).await?; let change_counter_key = format!("{}-{}/.changecounter", self.db_name, self.generation); self.client .put_object() From 270f5e562613477396766a688eb485bf8fb7555f Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Wed, 9 Aug 2023 22:33:18 +0000 Subject: [PATCH 08/12] use tempfile to write last chunk --- Cargo.lock | 1 + bottomless/Cargo.toml | 1 + bottomless/src/read.rs | 12 +++++------- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 48bf5bde..511d8bc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,7 @@ dependencies = [ "futures", "rand", "sqld-libsql-bindings", + "tempfile", "tokio", "tokio-util", "tracing", diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml index 312eda77..cbe30ffd 100644 --- a/bottomless/Cargo.toml +++ b/bottomless/Cargo.toml @@ -25,6 +25,7 @@ arc-swap = "1.6" chrono = "0.4.23" uuid = "1.4.1" rand = "0.8.5" +tempfile = "3.3.0" [features] libsql_linked_statically = [] diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs index 3869d258..917cf399 100644 --- a/bottomless/src/read.rs +++ b/bottomless/src/read.rs @@ -158,18 +158,18 @@ pub async fn upload_s3_multipart( // // This would only happen to databases that are around ~1 TiB. if !has_reached_eof { - let mut last_chunk_path = std::env::temp_dir(); - last_chunk_path.push(rand::random::().to_string()); + let last_chunk_file = tempfile::NamedTempFile::new()?; + let mut last_chunk_tokio_file = + tokio::fs::File::from_std(last_chunk_file.as_file().try_clone()?); - let mut last_chunk_file = tokio::fs::File::create(&last_chunk_path).await?; - tokio::io::copy(&mut reader, &mut last_chunk_file).await?; + tokio::io::copy(&mut reader, &mut last_chunk_tokio_file).await?; let part_out = client .upload_part() .bucket(bucket) .key(key) .upload_id(upload_id.clone()) - .body(ByteStream::from_path(&last_chunk_path).await?) + .body(ByteStream::from_path(last_chunk_file.path()).await?) 
.part_number(LAST_PART) .send() .await?; @@ -184,8 +184,6 @@ pub async fn upload_s3_multipart( ) .build(), ); - - let _ = tokio::fs::remove_file(last_chunk_path).await; } client From a9dfc280092b6fcecf3ae4a5aae264f44d7863ab Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Wed, 9 Aug 2023 22:56:14 +0000 Subject: [PATCH 09/12] abort multipart form in the presence of errors --- bottomless/src/read.rs | 57 +++++++++++++++++++++++++++++++----------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs index 917cf399..980b9367 100644 --- a/bottomless/src/read.rs +++ b/bottomless/src/read.rs @@ -63,7 +63,7 @@ pub async fn upload_s3_multipart( client: &aws_sdk_s3::Client, key: &str, bucket: &str, - mut reader: impl AsyncRead + Unpin, + reader: impl AsyncRead + Send + Unpin, ) -> Result<()> { let upload_id = client .create_multipart_upload() @@ -74,6 +74,46 @@ pub async fn upload_s3_multipart( .upload_id .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?; + let parts = upload_s3_parts(client, upload_id.as_str(), key, bucket, reader).await; + + match parts { + Ok(parts) => { + client + .complete_multipart_upload() + .upload_id(upload_id) + .bucket(bucket) + .key(key) + .multipart_upload( + CompletedMultipartUpload::builder() + .set_parts(Some(parts)) + .build(), + ) + .send() + .await?; + + Ok(()) + } + Err(err) => { + client + .abort_multipart_upload() + .upload_id(upload_id) + .bucket(bucket) + .key(key) + .send() + .await?; + + Err(err) + } + } +} + +async fn upload_s3_parts( + client: &aws_sdk_s3::Client, + upload_id: &str, + key: &str, + bucket: &str, + mut reader: impl AsyncRead + Unpin, +) -> Result> { let chunk_sizes = &[ 5 * 1024 * 1024, 10 * 1024 * 1024, @@ -186,18 +226,5 @@ pub async fn upload_s3_multipart( ); } - client - .complete_multipart_upload() - .upload_id(upload_id) - .bucket(bucket) - .key(key) - .multipart_upload( - CompletedMultipartUpload::builder() - .set_parts(Some(parts)) - .build(), - ) - .send() - .await?; - - Ok(()) + Ok(parts) } From 109b3f68d2059dc4eff3ceaafedb73719eea33d9 Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Wed, 9 Aug 2023 23:04:16 +0000 Subject: [PATCH 10/12] remove Send bound on reader --- bottomless/src/read.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs index 980b9367..73c9fdc7 100644 --- a/bottomless/src/read.rs +++ b/bottomless/src/read.rs @@ -63,7 +63,7 @@ pub async fn upload_s3_multipart( client: &aws_sdk_s3::Client, key: &str, bucket: &str, - reader: impl AsyncRead + Send + Unpin, + reader: impl AsyncRead + Unpin, ) -> Result<()> { let upload_id = client .create_multipart_upload() From fedc6ac0c3452426cb184b7c8d2f0a1d06a1f6f2 Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Wed, 9 Aug 2023 23:27:24 +0000 Subject: [PATCH 11/12] remove unnecessary clones --- bottomless/src/read.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs index 73c9fdc7..ca8e3b0e 100644 --- a/bottomless/src/read.rs +++ b/bottomless/src/read.rs @@ -174,7 +174,7 @@ async fn upload_s3_parts( .upload_part() .bucket(bucket) .key(key) - .upload_id(upload_id.clone()) + .upload_id(upload_id) .body(ByteStream::from(buffer.freeze())) .part_number(part + 1) .send() @@ -208,7 +208,7 @@ async fn upload_s3_parts( .upload_part() .bucket(bucket) .key(key) - .upload_id(upload_id.clone()) + .upload_id(upload_id) .body(ByteStream::from_path(last_chunk_file.path()).await?) 
.part_number(LAST_PART) .send() From 99db5823161258d2c4c6684e8843fab7542d3cbf Mon Sep 17 00:00:00 2001 From: Guilherme Neubaner Date: Thu, 24 Aug 2023 16:01:10 +0000 Subject: [PATCH 12/12] clarify why multipart upload is required --- bottomless/src/replicator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index cd47ece4..906a3a08 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -634,9 +634,9 @@ impl Replicator { let key = format!("{}-{}/db.gz", self.db_name, self.generation); - // Unfortunally we can't send the gzip output in a single call without buffering - // the whole snapshot in memory because S3 requires the `Content-Length` header - // to be set. + // Since it's not possible to know the exact size of a gzip stream and the + // PutObject operation requires the Content-Length header to be set, we need to + // send the content in chunks of known size. upload_s3_multipart(&self.client, &key, &self.bucket, gzip_reader).await?; } };
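
The series above ultimately exposes `upload_s3_multipart` in `bottomless/src/read.rs` (extracted in PATCH 06, with the `Send` bound dropped in PATCH 10 and the abort-on-error path added in PATCH 09). As a reading aid only — not part of the patches — here is a minimal, hypothetical usage sketch of that helper from inside the bottomless crate. The function name `snapshot_to_s3` and the `client`, `bucket`, `key`, and `db_path` parameters are placeholders invented for illustration; only `crate::read::upload_s3_multipart` and the gzip/BufReader wiring come from the diffs themselves.

    use anyhow::Result;
    use async_compression::tokio::bufread::GzipEncoder;
    use aws_sdk_s3::Client;
    use tokio::io::BufReader;

    use crate::read::upload_s3_multipart;

    // Sketch only: stream a gzip-compressed copy of a local SQLite file to S3.
    async fn snapshot_to_s3(client: &Client, bucket: &str, key: &str, db_path: &str) -> Result<()> {
        // Wrap the file in a streaming gzip encoder. The compressed size is not
        // known up front, which is why the helper uses a multipart upload rather
        // than a single PutObject (PutObject requires Content-Length).
        let file = tokio::fs::File::open(db_path).await?;
        let gzip_reader = GzipEncoder::new(BufReader::new(file));

        // Uploads the stream in progressively larger parts (5 MiB up to 100 MiB),
        // spilling an oversized final part to a temp file, completing the
        // multipart upload on success and aborting it on error.
        upload_s3_multipart(client, key, bucket, gzip_reader).await
    }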