diff --git a/Cargo.lock b/Cargo.lock
index 48bf5bde..511d8bc2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -779,6 +779,7 @@ dependencies = [
  "futures",
  "rand",
  "sqld-libsql-bindings",
+ "tempfile",
  "tokio",
  "tokio-util",
  "tracing",
diff --git a/bottomless/Cargo.toml b/bottomless/Cargo.toml
index 312eda77..cbe30ffd 100644
--- a/bottomless/Cargo.toml
+++ b/bottomless/Cargo.toml
@@ -25,6 +25,7 @@ arc-swap = "1.6"
 chrono = "0.4.23"
 uuid = "1.4.1"
 rand = "0.8.5"
+tempfile = "3.3.0"
 
 [features]
 libsql_linked_statically = []
diff --git a/bottomless/src/read.rs b/bottomless/src/read.rs
index 1177f60b..ca8e3b0e 100644
--- a/bottomless/src/read.rs
+++ b/bottomless/src/read.rs
@@ -2,7 +2,10 @@ use crate::replicator::CompressionKind;
 use crate::wal::WalFrameHeader;
 use anyhow::Result;
 use async_compression::tokio::bufread::GzipDecoder;
-use aws_sdk_s3::primitives::ByteStream;
+use aws_sdk_s3::{
+    primitives::ByteStream,
+    types::{CompletedMultipartUpload, CompletedPart},
+};
 use std::io::ErrorKind;
 use std::pin::Pin;
 use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
@@ -55,3 +58,173 @@ impl BatchReader {
         Ok(())
     }
 }
+
+pub async fn upload_s3_multipart(
+    client: &aws_sdk_s3::Client,
+    key: &str,
+    bucket: &str,
+    reader: impl AsyncRead + Unpin,
+) -> Result<()> {
+    let upload_id = client
+        .create_multipart_upload()
+        .bucket(bucket)
+        .key(key)
+        .send()
+        .await?
+        .upload_id
+        .ok_or_else(|| anyhow::anyhow!("missing upload_id"))?;
+
+    let parts = upload_s3_parts(client, upload_id.as_str(), key, bucket, reader).await;
+
+    match parts {
+        Ok(parts) => {
+            client
+                .complete_multipart_upload()
+                .upload_id(upload_id)
+                .bucket(bucket)
+                .key(key)
+                .multipart_upload(
+                    CompletedMultipartUpload::builder()
+                        .set_parts(Some(parts))
+                        .build(),
+                )
+                .send()
+                .await?;
+
+            Ok(())
+        }
+        Err(err) => {
+            client
+                .abort_multipart_upload()
+                .upload_id(upload_id)
+                .bucket(bucket)
+                .key(key)
+                .send()
+                .await?;
+
+            Err(err)
+        }
+    }
+}
+
+async fn upload_s3_parts(
+    client: &aws_sdk_s3::Client,
+    upload_id: &str,
+    key: &str,
+    bucket: &str,
+    mut reader: impl AsyncRead + Unpin,
+) -> Result<Vec<CompletedPart>> {
+    let chunk_sizes = &[
+        5 * 1024 * 1024,
+        10 * 1024 * 1024,
+        25 * 1024 * 1024,
+        50 * 1024 * 1024,
+        100 * 1024 * 1024,
+    ];
+
+    const LAST_PART: i32 = 10_000;
+    let mut parts = Vec::new();
+    let mut has_reached_eof = false;
+
+    // S3 allows a maximum of 10_000 parts, and each part can be between 5 MiB
+    // and 5 GiB in size, except for the last one, which has no minimum size.
+    //
+    // See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+    for part in 0..LAST_PART - 1 {
+        // Progressively increase the chunk size every 16 chunks up to the last
+        // chunk_size. This allows smaller allocations for small databases.
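+        //
+        // For example, parts 0-15 use chunk_sizes[0] (5 MiB), parts 16-31 use
+        // chunk_sizes[1] (10 MiB), and so on; from part 64 onwards the index
+        // saturates at chunk_sizes[4] (100 MiB).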
+        //
+        // Here's a table of how much data we can chunk:
+        // ┌────────────┬──────────────────┬───────────────────────┬──────────────────┐
+        // │ Chunk size │ Number of chunks │ Amount for chunk size │ Cumulative total │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 5 MiB      │ 16               │ 80 MiB                │ 80 MiB           │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 10 MiB     │ 16               │ 160 MiB               │ 240 MiB          │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 25 MiB     │ 16               │ 400 MiB               │ 640 MiB          │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 50 MiB     │ 16               │ 800 MiB               │ 1.406 GiB        │
+        // ├────────────┼──────────────────┼───────────────────────┼──────────────────┤
+        // │ 100 MiB    │ 9935             │ 970.215 GiB           │ 971.621 GiB      │
+        // └────────────┴──────────────────┴───────────────────────┴──────────────────┘
+        //
+        // We can send up to 971 GiB in chunks, which is more than enough for the
+        // majority of use cases.
+        //
+        // The last chunk is reserved for the remainder of the `gzip_reader`.
+        let chunk_size = chunk_sizes[((part / 16) as usize).min(chunk_sizes.len() - 1)];
+
+        let mut buffer = bytes::BytesMut::with_capacity(chunk_size);
+        loop {
+            let bytes_written = reader.read_buf(&mut buffer).await?;
+            // EOF or buffer is full
+            if bytes_written == 0 || buffer.len() == chunk_size {
+                break;
+            }
+        }
+
+        // EOF
+        if buffer.is_empty() {
+            has_reached_eof = true;
+            break;
+        }
+
+        let part_out = client
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .upload_id(upload_id)
+            .body(ByteStream::from(buffer.freeze()))
+            .part_number(part + 1)
+            .send()
+            .await?;
+
+        parts.push(
+            CompletedPart::builder()
+                .part_number(part + 1)
+                .e_tag(
+                    part_out
+                        .e_tag
+                        .ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?,
+                )
+                .build(),
+        );
+    }
+
+    // If the gzip stream has not reached EOF, we need to send the last part to S3.
+    // Since we don't know the size of the stream and we can't be sure it fits in
+    // memory, we save it into a file to allow streaming.
+    //
+    // This would only happen for databases that are around 1 TiB in size.
+    if !has_reached_eof {
+        let last_chunk_file = tempfile::NamedTempFile::new()?;
+        let mut last_chunk_tokio_file =
+            tokio::fs::File::from_std(last_chunk_file.as_file().try_clone()?);
+
+        tokio::io::copy(&mut reader, &mut last_chunk_tokio_file).await?;
+
+        let part_out = client
+            .upload_part()
+            .bucket(bucket)
+            .key(key)
+            .upload_id(upload_id)
+            .body(ByteStream::from_path(last_chunk_file.path()).await?)
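+            // `ByteStream::from_path` streams the part from disk, so the tail never
+            // has to be held in memory even though its size is only known once the
+            // copy above has finished.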
+            .part_number(LAST_PART)
+            .send()
+            .await?;
+
+        parts.push(
+            CompletedPart::builder()
+                .part_number(LAST_PART)
+                .e_tag(
+                    part_out
+                        .e_tag
+                        .ok_or_else(|| anyhow::anyhow!("e_tag missing from part upload"))?,
+                )
+                .build(),
+        );
+    }
+
+    Ok(parts)
+}
diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs
index c3e511db..906a3a08 100644
--- a/bottomless/src/replicator.rs
+++ b/bottomless/src/replicator.rs
@@ -1,5 +1,5 @@
 use crate::backup::WalCopier;
-use crate::read::BatchReader;
+use crate::read::{upload_s3_multipart, BatchReader};
 use crate::transaction_cache::TransactionPageCache;
 use crate::wal::WalFileReader;
 use anyhow::anyhow;
@@ -614,8 +614,11 @@ impl Replicator {
             return Ok(false);
         }
         tracing::debug!("Snapshotting {}", self.db_path);
+
         let start = Instant::now();
-        let change_counter = match self.use_compression {
+
+        let mut reader = tokio::fs::File::open(&self.db_path).await?;
+        match self.use_compression {
             CompressionKind::None => {
                 self.client
                     .put_object()
@@ -624,23 +627,17 @@ impl Replicator {
                     .body(ByteStream::from_path(&self.db_path).await?)
                     .send()
                     .await?;
-                let mut reader = tokio::fs::File::open(&self.db_path).await?;
-                Self::read_change_counter(&mut reader).await?
             }
             CompressionKind::Gzip => {
-                // TODO: find a way to compress ByteStream on the fly instead of creating
-                // an intermediary file.
-                let (compressed_db_path, change_counter) = self.compress_main_db_file().await?;
+                let buf_reader = tokio::io::BufReader::new(reader.try_clone().await?);
+                let gzip_reader = async_compression::tokio::bufread::GzipEncoder::new(buf_reader);
+
                 let key = format!("{}-{}/db.gz", self.db_name, self.generation);
-                self.client
-                    .put_object()
-                    .bucket(&self.bucket)
-                    .key(key)
-                    .body(ByteStream::from_path(compressed_db_path).await?)
-                    .send()
-                    .await?;
-                let _ = tokio::fs::remove_file(compressed_db_path).await;
-                change_counter
+
+                // Since it's not possible to know the exact size of a gzip stream and the
+                // PutObject operation requires the Content-Length header to be set, we need to
+                // send the content in chunks of known size.
+                upload_s3_multipart(&self.client, &key, &self.bucket, gzip_reader).await?;
             }
         };
 
@@ -650,6 +647,7 @@ impl Replicator {
         ** incremented on each transaction in WAL mode."
         ** Instead, we need to consult WAL checksums.
         */
+        let change_counter = Self::read_change_counter(&mut reader).await?;
         let change_counter_key = format!("{}-{}/.changecounter", self.db_name, self.generation);
         self.client
             .put_object()
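+            // The freshly read change counter is stored as its own small object
+            // under the generation's `.changecounter` key.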