From 38520a4eb1a775fa18fd51de1d46073133ffcabd Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 20 Oct 2025 12:48:42 +0200 Subject: [PATCH 01/10] commitlog,durability: Support preallocation of disk space When a new commitlog segment is created, allocate disk space for it up to the maximum segment size. Also do this when resuming writes to an existing segment, such that segments created without preallocation will allocate as well when the database is opened. Preallocation is gated behind the feature "fallocate", because it is not always desirable to preallocate, e.g. for local `standalone` users. The feature can only be enabled on Linux targets, because allocation is done using the Linux-specific `fallocate(2)` system call. Unlike `ftruncate(2)` or the portable `posix_fallocate(3)`, `fallocate(2)` supports allocating disk space without zeroing. This is currently required, because the commitlog format does not handle padding bytes. The commitlog refuses writes if not enough space can be allocated. For commitlogs that were created without preallocation, this means that the commitlog cannot even be opened. The local durability impl will crash if it detects that the commitlog is unable to allocate enough space. This means that a database will eventually crash and be unable to start in an out-of-space situation. Allocated space is not included in the reported size of the commitlog. Instead, allocated blocks are reported separately. 
--- Cargo.lock | 3 + crates/commitlog/Cargo.toml | 4 + crates/commitlog/src/commitlog.rs | 10 +- crates/commitlog/src/lib.rs | 3 +- crates/commitlog/src/repo/fs.rs | 75 +++- crates/commitlog/src/repo/mem.rs | 410 ++++++++++++++---- crates/commitlog/src/repo/mod.rs | 17 +- crates/commitlog/src/segment.rs | 82 +++- crates/commitlog/src/stream/writer.rs | 16 +- crates/commitlog/src/tests/bitflip.rs | 7 +- crates/commitlog/src/tests/helpers.rs | 2 +- crates/commitlog/src/tests/partial.rs | 22 +- crates/commitlog/tests/random_payload/mod.rs | 14 +- crates/core/src/db/persistence.rs | 5 +- crates/core/src/db/relational_db.rs | 6 +- crates/core/src/host/host_controller.rs | 14 +- crates/core/src/replica_context.rs | 10 +- .../subscription/module_subscription_actor.rs | 4 +- crates/datastore/src/db_metrics/mod.rs | 5 + crates/durability/Cargo.toml | 7 + crates/durability/src/imp/local.rs | 29 +- crates/durability/tests/io/fallocate.rs | 271 ++++++++++++ crates/durability/tests/io/mod.rs | 2 + crates/durability/tests/main.rs | 1 + crates/snapshot/tests/remote.rs | 2 +- 25 files changed, 861 insertions(+), 160 deletions(-) create mode 100644 crates/durability/tests/io/fallocate.rs create mode 100644 crates/durability/tests/io/mod.rs create mode 100644 crates/durability/tests/main.rs diff --git a/Cargo.lock b/Cargo.lock index 01ab9aebeee..4ee261ae1f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5947,6 +5947,7 @@ dependencies = [ "itertools 0.12.1", "log", "memmap2", + "nix 0.30.1", "once_cell", "pretty_assertions", "proptest", @@ -6144,9 +6145,11 @@ dependencies = [ "anyhow", "itertools 0.12.1", "log", + "scopeguard", "spacetimedb-commitlog", "spacetimedb-paths", "spacetimedb-sats 1.6.0", + "tempfile", "thiserror 1.0.69", "tokio", "tracing", diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml index 09c8f359c94..1f538453311 100644 --- a/crates/commitlog/Cargo.toml +++ b/crates/commitlog/Cargo.toml @@ -9,9 +9,12 @@ description = "Implementation of the 
SpacetimeDB commitlog." [features] default = ["serde"] +# Enable streaming reads + writes streaming = ["dep:async-stream", "dep:bytes", "dep:futures", "dep:tokio", "dep:tokio-util"] # Enable types + impls useful for testing test = ["dep:env_logger"] +# Enable `fallocate` of segments +fallocate = ["dep:nix"] [dependencies] async-stream = { workspace = true, optional = true } @@ -22,6 +25,7 @@ futures = { workspace = true, optional = true } itertools.workspace = true log.workspace = true memmap2 = "0.9.4" +nix = { workspace = true, optional = true, features = ["fs"] } serde = { workspace = true, optional = true } spacetimedb-fs-utils.workspace = true spacetimedb-paths.workspace = true diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index c4ab243766d..65e9c609017 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -909,7 +909,7 @@ mod tests { use super::*; use crate::{ payload::{ArrayDecodeError, ArrayDecoder}, - tests::helpers::{fill_log, mem_log}, + tests::helpers::{enable_logging, fill_log, mem_log}, }; #[test] @@ -1143,6 +1143,8 @@ mod tests { #[test] fn reset_to_offset() { + enable_logging(); + let mut log = mem_log::<[u8; 32]>(128); let total_txs = fill_log(&mut log, 50, repeat(1)) as u64; @@ -1225,7 +1227,7 @@ mod tests { #[test] fn set_same_epoch_does_nothing() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(4096), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); assert_eq!(committed, None); @@ -1233,7 +1235,7 @@ mod tests { #[test] fn set_new_epoch_commits() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(4096), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); 
log.append(<_>::default()).unwrap(); let committed = log @@ -1246,7 +1248,7 @@ mod tests { #[test] fn set_lower_epoch_returns_error() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(4096), <_>::default()).unwrap(); log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 784ced88dcd..52d7576813d 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -20,6 +20,7 @@ mod varint; pub use crate::{ commit::{Commit, StoredCommit}, payload::{Decoder, Encode}, + repo::fs::SizeOnDisk, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, }; @@ -356,7 +357,7 @@ impl Commitlog { } /// Determine the size on disk of this commitlog. - pub fn size_on_disk(&self) -> io::Result { + pub fn size_on_disk(&self) -> io::Result { let inner = self.inner.read().unwrap(); inner.repo.size_on_disk() } diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 3e45a1f2a5c..546fa8095bc 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -1,5 +1,6 @@ use std::fs::{self, File}; use std::io; +use std::os::unix::fs::MetadataExt; use std::sync::Arc; use log::{debug, warn}; @@ -20,12 +21,52 @@ const SEGMENT_FILE_EXT: &str = ".stdb.log"; // Experiment: // // - O_DIRECT | O_DSYNC -// - preallocation of disk space // - io_uring // pub type OnNewSegmentFn = dyn Fn() + Send + Sync + 'static; +/// Size on disk of a [Fs] repo. +/// +/// Created by [Fs::size_on_disk]. +#[derive(Clone, Copy, Default)] +pub struct SizeOnDisk { + /// The total size in bytes of all segments and offset indexes in the repo. + pub total_bytes: u64, + /// The total number of 512-bytes blocks allocated by all segments and + /// offset indexes in the repo. 
+ /// + /// Only available on unix platforms. + /// + /// For other platforms, the number is computed from the number of 4096-byte + /// pages that would be needed to store `total_bytes`. This may or may not + /// reflect the actual storage allocation. + /// + /// The number of allocated blocks is typically larger than the number of + /// actually written bytes. + /// + /// When the `fallocate` feature is enabled, the number can diverge + /// substantially. Use `total_blocks` in this case to monitor disk space. + pub total_blocks: u64, +} + +impl SizeOnDisk { + fn add(&mut self, stat: std::fs::Metadata) { + self.total_bytes += stat.size(); + #[cfg(unix)] + { + self.total_blocks += std::os::unix::fs::MetadataExt::blocks(&stat); + } + #[cfg(not(unix))] + { + let imaginary_blocks = (self.total_bytes > 0) + .then(|| 8 * self.total_bytes.div_ceil(4096)) + .unwrap_or_default(); + self.total_blocks = imaginary_blocks; + } + } +} + /// A commitlog repository [`Repo`] which stores commits in ordinary files on /// disk. #[derive(Clone)] @@ -61,18 +102,31 @@ impl Fs { self.root.segment(offset) } - /// Determine the size on disk as the sum of the sizes of all segments. + /// Determine the size on disk as the sum of the sizes of all segments, as + /// well as offset indexes. /// /// Note that the actively written-to segment (if any) is included. - pub fn size_on_disk(&self) -> io::Result { - let mut sz = 0; + pub fn size_on_disk(&self) -> io::Result { + let mut size = SizeOnDisk::default(); + for offset in self.existing_offsets()? { - sz += self.segment_path(offset).metadata()?.len(); - // Add the size of the offset index file if present - sz += self.root.index(offset).metadata().map(|m| m.len()).unwrap_or(0); + let segment = self.segment_path(offset); + let stat = segment.metadata()?; + size.add(stat); + + // Add the size of the offset index file if present. 
+ let index = self.root.index(offset); + let Some(stat) = index.metadata().map(Some).or_else(|e| match e.kind() { + io::ErrorKind::NotFound => Ok(None), + _ => Err(e), + })? + else { + continue; + }; + size.add(stat); } - Ok(sz) + Ok(size) } } @@ -86,6 +140,10 @@ impl FileLike for NamedTempFile { fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> { self.as_file_mut().ftruncate(tx_offset, size) } + + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.as_file_mut().fallocate(size) + } } impl Repo for Fs { @@ -149,6 +207,7 @@ impl Repo for Fs { let max_frame_size = 0x1000; compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?; dst.persist(self.segment_path(offset))?; + Ok(()) } diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index 9a301e750b8..54c52e1af71 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -1,17 +1,50 @@ use std::{ collections::{btree_map, BTreeMap}, io, - sync::{Arc, RwLock, RwLockWriteGuard}, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, RwLock, RwLockWriteGuard, + }, }; -use crate::segment::FileLike; - use super::Repo; +use crate::{repo::SegmentLen, segment::FileLike}; type SharedLock = Arc>; -type SharedBytes = SharedLock>; +type SharedPages = SharedLock>; + +const PAGE_SIZE: usize = 4096; + +#[derive(Debug)] +pub struct Page { + filled: usize, + buf: [u8; PAGE_SIZE], +} + +impl Page { + pub fn remaining(&self) -> usize { + PAGE_SIZE - self.filled + } +} -/// A log segment backed by a `Vec`. +impl Default for Page { + fn default() -> Self { + Self { + filled: 0, + buf: [0; PAGE_SIZE], + } + } +} + +/// The total capacity of the imaginary storage device. +/// +/// [Segment]s are allocated from [Memory], which tracks the total space it +/// has available. [SpaceOnDevice] is shared by each [Segment]. When a [Segment] +/// allocates a [Page], it deducts the page's size from the space, returning +/// an error if [SpaceOnDevice] goes below zero. 
+pub type SpaceOnDevice = Arc; + +/// A log segment backed by a [Vec]. /// /// Writing to the segment behaves like a file opened with `O_APPEND`: /// [`io::Write::write`] always appends to the segment, regardless of the @@ -21,61 +54,90 @@ type SharedBytes = SharedLock>; /// Note that this is not a faithful model of a file, as safe Rust requires to /// protect the buffer with a lock. This means that pathological situations /// arising from concurrent read/write access of a file are impossible to occur. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct Segment { pos: u64, - buf: SharedBytes, + pages: SharedPages, + device: SpaceOnDevice, } impl Segment { + pub fn new(device: SpaceOnDevice) -> Self { + Self::with_pages(device, <_>::default()) + } + + pub fn with_pages(device: SpaceOnDevice, pages: SharedPages) -> Self { + Self { pos: 0, pages, device } + } + pub fn len(&self) -> usize { - self.buf.read().unwrap().len() + self.pages + .read() + .unwrap() + .iter() + .fold(0, |size, page| size + page.filled) } pub fn is_empty(&self) -> bool { self.len() == 0 } - /// Obtain mutable access to the underlying buffer. - /// - /// This is intended for tests which deliberately corrupt the segment data. 
- pub fn buf_mut(&self) -> RwLockWriteGuard<'_, Vec> { - self.buf.write().unwrap() - } -} + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + let mut pages = self.pages.write().unwrap(); -impl From for Segment { - fn from(buf: SharedBytes) -> Self { - Self { pos: 0, buf } + let page_idx = pos / PAGE_SIZE; + let page = pages.get_mut(page_idx).expect("pos out of bounds"); + let page_ofs = pos % PAGE_SIZE; + page.buf[page_ofs] = f(page.buf[page_ofs]); } -} -impl super::SegmentLen for Segment { - fn segment_len(&mut self) -> io::Result { - Ok(self.len() as u64) - } -} + fn allocate(&self, pages: &mut RwLockWriteGuard<'_, Vec>, n: usize) -> io::Result<()> { + let mut allocated = 0; + pages.resize_with(n, || { + allocated += 1; + Page::default() + }); + if self.device.fetch_sub((allocated * PAGE_SIZE) as i64, Ordering::Relaxed) <= 0 { + return Err(io::Error::new(io::ErrorKind::StorageFull, "no space left on device")); + } -impl FileLike for Segment { - fn fsync(&mut self) -> io::Result<()> { Ok(()) } +} - fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { - let mut inner = self.buf.write().unwrap(); - inner.resize(size as usize, 0); - // NOTE: As per `ftruncate(2)`, the offset is not changed. 
- Ok(()) +impl SegmentLen for Segment { + fn segment_len(&mut self) -> io::Result { + Ok(self.len() as u64) } } impl io::Write for Segment { fn write(&mut self, buf: &[u8]) -> io::Result { - let mut inner = self.buf.write().unwrap(); - inner.extend(buf); - self.pos += buf.len() as u64; + let mut written = 0; + while written < buf.len() { + let mut pages = self.pages.write().unwrap(); + let page = { + let page_idx = self.pos as usize / PAGE_SIZE; + if page_idx >= pages.len() { + self.allocate(&mut pages, page_idx + 1)?; + } + &mut pages[page_idx] + }; + let remaining = buf.len() - written; + let to_copy = page.remaining().min(remaining); + + let range_in_page = page.filled..page.filled + to_copy; + let range_in_buf = written..written + to_copy; - Ok(buf.len()) + page.buf[range_in_page].copy_from_slice(&buf[range_in_buf]); + page.filled += to_copy; + drop(pages); + + written += to_copy; + self.pos += to_copy as u64; + } + + Ok(written) } fn flush(&mut self) -> io::Result<()> { @@ -85,16 +147,26 @@ impl io::Write for Segment { impl io::Read for Segment { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let inner = self.buf.read().unwrap(); - let pos = self.pos as usize; - if pos > inner.len() { - // Bad file descriptor - return Err(io::Error::from_raw_os_error(9)); + let mut read = 0; + while read < buf.len() { + let pages = self.pages.read().unwrap(); + let Some(page) = pages.get(self.pos as usize / PAGE_SIZE) else { + break; + }; + let offset_in_page = (self.pos % PAGE_SIZE as u64) as usize; + if offset_in_page >= page.filled { + break; + } + let available_in_page = page.filled - offset_in_page; + let to_copy = (buf.len() - read).min(available_in_page); + + buf[read..read + to_copy].copy_from_slice(&page.buf[offset_in_page..offset_in_page + to_copy]); + + read += to_copy; + self.pos += to_copy as u64; } - let n = io::Read::read(&mut &inner[pos..], buf)?; - self.pos += n as u64; - Ok(n) + Ok(read) } } @@ -121,12 +193,61 @@ impl io::Seek for Segment { } } +impl 
FileLike for Segment { + fn fsync(&mut self) -> io::Result<()> { + Ok(()) + } + + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { + use std::cmp::Ordering::*; + + let mut pages = self.pages.write().unwrap(); + let old_page_count = pages.len() as u64; + let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; + + let zero_tail = |maybe_last_page: Option<&mut Page>| { + if let Some(last_page) = maybe_last_page { + let tail_start = (size as usize) % PAGE_SIZE; + last_page.filled = tail_start; + last_page.buf[tail_start..].fill(0); + } + }; + match new_page_count.cmp(&old_page_count) { + Greater => self.allocate(&mut pages, new_page_count as usize)?, + ordering => { + if matches!(ordering, Less) { + pages.truncate(new_page_count as usize); + } + zero_tail(pages.last_mut()); + } + }; + + if self.pos > size { + self.pos = size; + } + + Ok(()) + } + + fn fallocate(&mut self, size: u64) -> io::Result<()> { + let mut pages = self.pages.write().unwrap(); + let old_page_count = pages.len() as u64; + let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; + + if new_page_count > old_page_count { + self.allocate(&mut pages, new_page_count as usize)?; + } + + Ok(()) + } +} + #[cfg(feature = "streaming")] mod async_impls { use super::*; use std::{ - io::{Seek as _, Write as _}, + io::{Read as _, Seek as _, Write as _}, pin::Pin, task::{Context, Poll}, }; @@ -153,45 +274,35 @@ mod async_impls { } impl AsyncRead for Segment { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let this = self.get_mut(); - let inner = this.buf.read().unwrap(); - let pos = this.pos as usize; - if pos > inner.len() { - // Bad file descriptor - return Poll::Ready(Err(io::Error::from_raw_os_error(9))); - } - let filled = buf.filled().len(); - AsyncRead::poll_read(Pin::new(&mut &inner[pos..]), cx, buf).map_ok(|()| { - this.pos += (buf.filled().len() - filled) as u64; - }) - } - } - - impl 
AsyncSeek for Segment { - fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { - self.get_mut().seek(position).map(drop) - } - - fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().stream_position()) + fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + Poll::Ready(self.get_mut().read(buf.initialize_unfilled()).map(drop)) } } impl AsyncWrite for Segment { - fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { Poll::Ready(self.get_mut().write(buf)) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } + impl AsyncSeek for Segment { + fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { + self.get_mut().seek(position).map(drop) + } + + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.get_mut().stream_position()) + } + } + impl AsyncFsync for Segment { async fn fsync(&self) {} } @@ -204,12 +315,18 @@ mod async_impls { } /// In-memory implementation of [`Repo`]. 
-#[derive(Clone, Debug, Default)] -pub struct Memory(SharedLock>); +#[derive(Clone, Debug)] +pub struct Memory { + space: SpaceOnDevice, + segments: SharedLock>, +} impl Memory { - pub fn new() -> Self { - Self::default() + pub fn new(total_space: u64) -> Self { + Self { + space: Arc::new(AtomicI64::new(total_space as _)), + segments: <_>::default(), + } } } @@ -218,13 +335,13 @@ impl Repo for Memory { type SegmentReader = io::BufReader; fn create_segment(&self, offset: u64) -> io::Result { - let mut inner = self.0.write().unwrap(); + let mut inner = self.segments.write().unwrap(); match inner.entry(offset) { btree_map::Entry::Occupied(entry) => { let entry = entry.get(); let read_guard = entry.read().unwrap(); if read_guard.is_empty() { - Ok(Segment::from(Arc::clone(entry))) + Ok(Segment::with_pages(self.space.clone(), entry.clone())) } else { Err(io::Error::new( io::ErrorKind::AlreadyExists, @@ -233,21 +350,21 @@ impl Repo for Memory { } } btree_map::Entry::Vacant(entry) => { - let segment = entry.insert(Default::default()); - Ok(Segment::from(Arc::clone(segment))) + let segment = entry.insert(SharedPages::default()); + Ok(Segment::with_pages(self.space.clone(), segment.clone())) } } } fn open_segment_writer(&self, offset: u64) -> io::Result { - let inner = self.0.read().unwrap(); + let inner = self.segments.read().unwrap(); let Some(buf) = inner.get(&offset) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("segment {offset} does not exist"), )); }; - Ok(Segment::from(Arc::clone(buf))) + Ok(Segment::with_pages(self.space.clone(), buf.clone())) } fn open_segment_reader(&self, offset: u64) -> io::Result { @@ -255,7 +372,7 @@ impl Repo for Memory { } fn remove_segment(&self, offset: u64) -> io::Result<()> { - let mut inner = self.0.write().unwrap(); + let mut inner = self.segments.write().unwrap(); if inner.remove(&offset).is_none() { return Err(io::Error::new( io::ErrorKind::NotFound, @@ -271,38 +388,145 @@ impl Repo for Memory { } fn 
existing_offsets(&self) -> io::Result> { - Ok(self.0.read().unwrap().keys().copied().collect()) + Ok(self.segments.read().unwrap().keys().copied().collect()) } } #[cfg(test)] mod tests { + use pretty_assertions::assert_matches; + use tempfile::tempfile; + use super::*; use std::io::{Read, Seek, Write}; - #[test] - fn segment_read_write_seek() { - let mut segment = Segment::default(); - segment.write_all(b"alonso").unwrap(); + fn read_write_seek(f: &mut (impl Read + Seek + Write)) { + f.write_all(b"alonso").unwrap(); - segment.seek(io::SeekFrom::Start(0)).unwrap(); + f.seek(io::SeekFrom::Start(0)).unwrap(); let mut buf = [0; 6]; - segment.read_exact(&mut buf).unwrap(); + f.read_exact(&mut buf).unwrap(); assert_eq!(&buf, b"alonso"); - segment.seek(io::SeekFrom::Start(2)).unwrap(); - let n = segment.read(&mut buf).unwrap(); + f.seek(io::SeekFrom::Start(2)).unwrap(); + let n = f.read(&mut buf).unwrap(); assert_eq!(n, 4); assert_eq!(&buf[..4], b"onso"); - segment.seek(io::SeekFrom::Current(-4)).unwrap(); - let n = segment.read(&mut buf).unwrap(); + f.seek(io::SeekFrom::Current(-4)).unwrap(); + let n = f.read(&mut buf).unwrap(); assert_eq!(n, 4); assert_eq!(&buf[..4], b"onso"); - segment.seek(io::SeekFrom::End(-3)).unwrap(); - let n = segment.read(&mut buf).unwrap(); + f.seek(io::SeekFrom::End(-3)).unwrap(); + let n = f.read(&mut buf).unwrap(); assert_eq!(n, 3); assert_eq!(&buf[0..3], b"nso"); + + f.seek(io::SeekFrom::End(4096)).unwrap(); + let n = f.read(&mut buf).unwrap(); + assert_eq!(n, 0); + } + + #[test] + fn segment_read_write_seek() { + let space_on_device = Arc::new(AtomicI64::new(4096)); + read_write_seek(&mut Segment::new(space_on_device)); + } + + #[test] + fn std_file_read_write_seek() { + read_write_seek(&mut tempfile().unwrap()); + } + + #[test] + fn ftruncate() { + let space_on_device = Arc::new(AtomicI64::new(8192)); + let mut segment = Segment::new(space_on_device); + + let data = [b'z'; 512]; + let mut buf = Vec::with_capacity(4096); + + 
segment.write_all(&data).unwrap(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(&buf, &data); + + // Extend adds zeroes. + segment.ftruncate(42, 1024).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(&buf[..512], &data); + assert_eq!(&buf[512..], &[0; 512]); + + // Extend beyond existing page allocates zeroed page. + segment.ftruncate(42, 5120).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(&buf[..512], &data); + assert_eq!(&buf[512..], &[0; 512]); + assert_eq!(segment.pages.read().unwrap().len(), 2); + + // Extends beyond available space returns `StorageFull`. + assert_matches!( + segment.ftruncate(42, 9216), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + + // Shrink deallocates pages. + segment.ftruncate(42, 512).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.pages.read().unwrap().len(), 1); + + segment.ftruncate(42, 256).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, &data[..256]); + } + + #[test] + fn fallocate() { + let space_on_device = Arc::new(AtomicI64::new(8192)); + let mut segment = Segment::new(space_on_device); + + let data = [b'z'; 512]; + let mut buf = Vec::with_capacity(4096); + + segment.write_all(&data).unwrap(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + + // Extend within existing page doesn't allocate. + segment.fallocate(1024).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.pages.read().unwrap().len(), 1); + + // Extend beyond page allocates new page. 
+ segment.fallocate(5120).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.pages.read().unwrap().len(), 2); + + // Extend beyond available space returns `StorageFull`. + assert_matches!( + segment.fallocate(9216), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + + // Shrink does nothing. + segment.fallocate(256).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.pages.read().unwrap().len(), 3); + } + + fn read_from_start_to_end(f: &mut (impl Read + Seek), buf: &mut Vec) -> io::Result { + f.rewind()?; + f.read_to_end(buf) } } diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index a6c45ed029f..adf7334eba3 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -14,8 +14,7 @@ pub(crate) mod fs; #[cfg(any(test, feature = "test"))] pub mod mem; -pub use fs::Fs; -pub use fs::OnNewSegmentFn; +pub use fs::{Fs, OnNewSegmentFn, SizeOnDisk}; #[cfg(any(test, feature = "test"))] pub use mem::Memory; @@ -32,6 +31,10 @@ pub trait SegmentLen: io::Seek { /// If the method returns successfully, the seek position before the call is /// restored. However, if it returns an error, the seek position is /// unspecified. + /// + /// The returned length will be the bytes actually written to the segment, + /// not the allocated size of the segment (if the `fallocate` feature is + /// enabled). 
// // TODO: Remove trait and replace with `Seek::stream_len` if / when stabilized: // https://github.com/rust-lang/rust/issues/59359 @@ -192,6 +195,7 @@ pub fn create_segment_writer( offset: u64, ) -> io::Result> { let mut storage = repo.create_segment(offset)?; + fallocate(&mut storage, opts.max_segment_size)?; Header { log_format_version: opts.log_format_version, checksum_algorithm: Commit::CHECKSUM_ALGORITHM, @@ -238,6 +242,7 @@ pub fn resume_segment_writer( offset: u64, ) -> io::Result, Metadata>> { let mut storage = repo.open_segment_writer(offset)?; + fallocate(&mut storage, opts.max_segment_size)?; let offset_index = repo.get_offset_index(offset).ok(); let Metadata { header, @@ -300,3 +305,11 @@ pub fn open_segment_reader( let storage = repo.open_segment_reader(offset)?; Reader::new(max_log_format_version, offset, storage) } + +#[inline] +fn fallocate(_f: &mut impl FileLike, _size: u64) -> io::Result<()> { + #[cfg(feature = "fallocate")] + _f.fallocate(_size)?; + + Ok(()) +} diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 163adeafe79..cb60ab405c8 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -209,6 +209,19 @@ impl Writer { pub trait FileLike { fn fsync(&mut self) -> io::Result<()>; fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>; + /// Allocate space for at least `size` bytes in the [FileLike]. + /// + /// The allocated space shall not contain zero bytes, and shall not modify + /// the apparent size of the file (as reported by `stat`). + /// + /// No-op if `size` is smaller than the already allocated space. 
+ /// + /// In other words, the method shall behave like: + /// + /// ```ignore + /// fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, size) + /// ``` + fn fallocate(&mut self, size: u64) -> io::Result<()>; } impl FileLike for File { @@ -216,9 +229,54 @@ impl FileLike for File { self.sync_data() } + // `ftruncate` deallocates any extra `fallocate`'d blocks, + // so if the `fallocate` feature is enabled, we need to + // restore the allocation after truncation. + // + // TODO: Make truncate (shrinking) a [Segment] method, so we can implement + // `ftruncate` just like `ftruncate(2)`. + #[cfg(all(feature = "fallocate", target_os = "linux"))] + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { + let stat = self.metadata()?; + let allocated_size = std::os::unix::fs::MetadataExt::blocks(&stat) * 512; + self.set_len(size)?; + self.fallocate(allocated_size)?; + + Ok(()) + } + + #[cfg(not(feature = "fallocate"))] fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { self.set_len(size) } + + #[cfg(all(feature = "fallocate", target_os = "linux"))] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + use nix::fcntl::{fallocate, FallocateFlags}; + + fallocate(self, FallocateFlags::FALLOC_FL_KEEP_SIZE, 0, size as _)?; + Ok(()) + } + + // Fail compilation if `fallocate` is enabled but not supported. + #[cfg(all(feature = "fallocate", not(all(target_os = "linux", any(test, feature = "test")))))] + compile_error!("`fallocate(2)` is not available on this platform"); + + // No-op if either: + // + // - `fallocate` is not enabled + // - it is enabled, but not supported, and this is a test build + // + // If it's a test build, we may want to run `fallocate` semantics against + // an in-memory backend (on any platform). Hence, we need the method to be + // present. 
+ #[cfg(any( + not(feature = "fallocate"), + all(feature = "fallocate", any(test, feature = "test"), not(target_os = "linux")) + ))] + fn fallocate(&mut self, _: u64) -> io::Result<()> { + Ok(()) + } } impl FileLike for BufWriter { @@ -229,6 +287,10 @@ impl FileLike for BufWriter { fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> { self.get_mut().ftruncate(tx_offset, size) } + + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.get_mut().fallocate(size) + } } impl FileLike for Writer { @@ -245,6 +307,10 @@ impl FileLike for Writer { .map(|index| index.ftruncate(tx_offset, size)); Ok(()) } + + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.inner.fallocate(size) + } } #[derive(Debug)] @@ -340,6 +406,10 @@ impl FileLike for OffsetIndexWriter { .ok(); Ok(()) } + + fn fallocate(&mut self, _: u64) -> io::Result<()> { + Ok(()) + } } impl FileLike for IndexFileMut { @@ -351,6 +421,10 @@ impl FileLike for IndexFileMut { self.truncate(tx_offset) .map_err(|e| io::Error::other(format!("failed to truncate offset index at {tx_offset}: {e:?}"))) } + + fn fallocate(&mut self, _: u64) -> io::Result<()> { + Ok(()) + } } #[derive(Debug)] @@ -714,7 +788,7 @@ mod tests { #[test] fn write_read_roundtrip() { - let repo = repo::Memory::default(); + let repo = repo::Memory::new(4096); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); writer.append([0; 32]).unwrap(); @@ -743,7 +817,7 @@ mod tests { #[test] fn metadata() { - let repo = repo::Memory::default(); + let repo = repo::Memory::new(4096); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 @@ -776,7 +850,7 @@ mod tests { #[test] fn commits() { - let repo = repo::Memory::default(); + let repo = repo::Memory::new(4096); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, 
Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); @@ -809,7 +883,7 @@ mod tests { #[test] fn transactions() { - let repo = repo::Memory::default(); + let repo = repo::Memory::new(4096); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); diff --git a/crates/commitlog/src/stream/writer.rs b/crates/commitlog/src/stream/writer.rs index a7ff156fdb3..eb1a44ea612 100644 --- a/crates/commitlog/src/stream/writer.rs +++ b/crates/commitlog/src/stream/writer.rs @@ -14,7 +14,7 @@ use crate::{ commit, error, index::IndexFile, repo::{Repo, SegmentLen as _}, - segment::{self, FileLike as _, OffsetIndexWriter, CHECKSUM_LEN, DEFAULT_CHECKSUM_ALGORITHM}, + segment::{self, FileLike, OffsetIndexWriter, CHECKSUM_LEN, DEFAULT_CHECKSUM_ALGORITHM}, stream::common::{read_exact, AsyncFsync}, Options, StoredCommit, DEFAULT_LOG_FORMAT_VERSION, }; @@ -108,6 +108,8 @@ where }; let mut segment = repo.open_segment_writer(last)?; + fallocate(&mut segment, commitlog_options.max_segment_size)?; + let mut offset_index = repo .get_offset_index(last) .inspect_err(|e| { @@ -445,7 +447,7 @@ fn create_segment( .as_ref() .map(|range| range.end) .unwrap_or_default(); - let segment = repo.create_segment(segment_offset).or_else(|e| { + let mut segment = repo.create_segment(segment_offset).or_else(|e| { if e.kind() == io::ErrorKind::AlreadyExists { trace!("segment already exists"); let mut s = repo.open_segment_writer(segment_offset)?; @@ -460,6 +462,8 @@ fn create_segment( Err(e) })?; + fallocate(&mut segment, commitlog_options.max_segment_size)?; + let index_writer = repo .create_offset_index(segment_offset, commitlog_options.offset_index_len()) .inspect_err(|e| warn!("unable to create offset index segment={segment_offset} err={e:?}")) @@ -468,3 +472,11 @@ fn create_segment( Ok((segment, index_writer)) } + +#[inline] +fn fallocate(_f: &mut impl FileLike, _size: u64) 
-> io::Result<()> { + #[cfg(feature = "fallocate")] + _f.fallocate(_size)?; + + Ok(()) +} diff --git a/crates/commitlog/src/tests/bitflip.rs b/crates/commitlog/src/tests/bitflip.rs index 13eb133bb98..53b8e4fff09 100644 --- a/crates/commitlog/src/tests/bitflip.rs +++ b/crates/commitlog/src/tests/bitflip.rs @@ -118,17 +118,14 @@ proptest! { let Inputs { log, - segment, + mut segment, byte_pos, bit_mask, segment_offset:_ , } = inputs; - { - let mut data = segment.buf_mut(); - data[byte_pos] ^= bit_mask; - } + segment.modify_byte_at(byte_pos, |b| b ^ bit_mask); let first_err = log .transactions_from(0, &payload::ArrayDecoder) diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index 72666c816d0..2904e6ac70a 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -10,7 +10,7 @@ use crate::{ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic { commitlog::Generic::open( - repo::Memory::new(), + repo::Memory::new(max_segment_size * 4096), Options { max_segment_size, ..Options::default() diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index cb0eacf3a69..231c765a28b 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -1,9 +1,8 @@ use std::{ cmp, fmt::Debug, - io::{self, Seek as _, SeekFrom}, + io::{self, Seek, SeekFrom}, iter::repeat, - sync::RwLockWriteGuard, }; use log::debug; @@ -100,9 +99,8 @@ fn overwrite_reopen() { { let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); - let mut data = last_segment.buf_mut(); - let pos = data.len() - last_commit.encoded_len() + 1; - data[pos] = 255; + let pos = last_segment.len() - last_commit.encoded_len() + 1; + last_segment.modify_byte_at(pos, |_| 255); } let mut log = open_log::<[u8; 32]>(repo.clone()); @@ -162,8 +160,12 @@ struct ShortSegment { } impl ShortSegment { - fn buf_mut(&mut self) -> RwLockWriteGuard<'_, Vec> { - 
self.inner.buf_mut() + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + self.inner.modify_byte_at(pos, f); } } @@ -181,6 +183,10 @@ impl FileLike for ShortSegment { fn ftruncate(&mut self, tx_offset: u64, size: u64) -> std::io::Result<()> { self.inner.ftruncate(tx_offset, size) } + + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.inner.fallocate(size) + } } impl io::Write for ShortSegment { @@ -223,7 +229,7 @@ struct ShortMem { impl ShortMem { pub fn new(max_len: u64) -> Self { Self { - inner: repo::Memory::new(), + inner: repo::Memory::new(max_len * 1024), max_len, } } diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index b2249cfb018..85ab653480d 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,5 +1,7 @@ use std::num::NonZeroU16; +use log::info; +use spacetimedb_commitlog::tests::helpers::enable_logging; use spacetimedb_commitlog::{payload, Commitlog, Options}; use spacetimedb_paths::server::CommitLogDir; use spacetimedb_paths::FromPathUnchecked; @@ -79,6 +81,8 @@ fn resets() { #[test] fn compression() { + enable_logging(); + let root = tempdir().unwrap(); let clog = Commitlog::open( CommitLogDir::from_path_unchecked(root.path()), @@ -101,11 +105,13 @@ fn compression() { let uncompressed_size = clog.size_on_disk().unwrap(); - let mut segments_to_compress = clog.existing_segment_offsets().unwrap(); - segments_to_compress.retain(|&off| off < 20); - clog.compress_segments(&segments_to_compress).unwrap(); + let segments = clog.existing_segment_offsets().unwrap(); + let segments_to_compress = &segments[..segments.len() / 2]; + info!("segments: {segments:?} compressing: {segments_to_compress:?}"); + clog.compress_segments(segments_to_compress).unwrap(); - assert!(clog.size_on_disk().unwrap() < uncompressed_size); + let compressed_size = 
clog.size_on_disk().unwrap(); + assert!(compressed_size.total_bytes < uncompressed_size.total_bytes); assert!(clog .transactions(&payload::ArrayDecoder) diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index 1f9b91d3bc3..fcfead4a4ec 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -1,6 +1,7 @@ use std::{io, sync::Arc}; use async_trait::async_trait; +use spacetimedb_commitlog::SizeOnDisk; use spacetimedb_durability::{DurabilityExited, TxOffset}; use spacetimedb_paths::server::ServerDataDir; use spacetimedb_snapshot::SnapshotRepository; @@ -23,7 +24,7 @@ pub type Durability = dyn spacetimedb_durability::Durability; /// It is not part of the [`Durability`] trait because it must report disk /// usage of the local instance only, even if exclusively remote durability is /// configured or the database is in follower state. -pub type DiskSizeFn = Arc io::Result + Send + Sync>; +pub type DiskSizeFn = Arc io::Result + Send + Sync>; /// Persistence services for a database. pub struct Persistence { @@ -46,7 +47,7 @@ impl Persistence { /// Convenience constructor of a [Persistence] that handles boxing. 
pub fn new( durability: impl spacetimedb_durability::Durability + 'static, - disk_size: impl Fn() -> io::Result + Send + Sync + 'static, + disk_size: impl Fn() -> io::Result + Send + Sync + 'static, snapshots: Option, ) -> Self { Self { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index c7b05a7bbfb..ab329b7b4f0 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -7,8 +7,8 @@ use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use enum_map::EnumMap; use fs2::FileExt; -use spacetimedb_commitlog as commitlog; use spacetimedb_commitlog::repo::OnNewSegmentFn; +use spacetimedb_commitlog::{self as commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError}; @@ -627,8 +627,8 @@ impl RelationalDB { /// The number of bytes on disk occupied by the durability layer. /// /// If this is an in-memory instance, `Ok(0)` is returned. - pub fn size_on_disk(&self) -> io::Result { - self.disk_size_fn.as_ref().map_or(Ok(0), |f| f()) + pub fn size_on_disk(&self) -> io::Result { + self.disk_size_fn.as_ref().map_or(Ok(<_>::default()), |f| f()) } /// The size in bytes of all of the in-memory data in this database. 
diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 6519d738c81..15f12b5168f 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -23,6 +23,7 @@ use async_trait::async_trait; use durability::{Durability, EmptyHistory}; use log::{info, trace, warn}; use parking_lot::Mutex; +use spacetimedb_commitlog::SizeOnDisk; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS; @@ -1061,6 +1062,9 @@ async fn metric_reporter(replica_ctx: Arc) { let message_log_size = DB_METRICS .message_log_size .with_label_values(&replica_ctx.database_identity); + let message_log_blocks = DB_METRICS + .message_log_blocks + .with_label_values(&replica_ctx.database_identity); let module_log_file_size = DB_METRICS .module_log_file_size .with_label_values(&replica_ctx.database_identity); @@ -1073,9 +1077,15 @@ async fn metric_reporter(replica_ctx: Arc) { ctx.total_disk_usage() }); if let Ok(disk_usage) = disk_usage_future.await { - if let Some(num_bytes) = disk_usage.durability { - message_log_size.set(num_bytes as i64); + if let Some(SizeOnDisk { + total_bytes, + total_blocks, + }) = disk_usage.durability + { + message_log_size.set(total_bytes as i64); + message_log_blocks.set(total_blocks as i64); } + if let Some(num_bytes) = disk_usage.logs { module_log_file_size.set(num_bytes as i64); } diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 523db69458d..267b0924ea8 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -1,3 +1,5 @@ +use spacetimedb_commitlog::SizeOnDisk; + use super::database_logger::DatabaseLogger; use crate::db::relational_db::RelationalDB; use crate::error::DBError; @@ -23,7 +25,7 @@ impl ReplicaContext { /// The number of bytes on disk occupied by the database's durability layer. 
/// /// An in-memory database will return `Ok(0)`. - pub fn durability_size_on_disk(&self) -> io::Result { + pub fn durability_size_on_disk(&self) -> io::Result { self.relational_db.size_on_disk() } @@ -64,7 +66,7 @@ impl Deref for ReplicaContext { #[derive(Copy, Clone, Default)] pub struct TotalDiskUsage { - pub durability: Option, + pub durability: Option, pub logs: Option, } @@ -76,8 +78,4 @@ impl TotalDiskUsage { logs: self.logs.or(fallback.logs), } } - - pub fn sum(&self) -> u64 { - self.durability.unwrap_or(0) + self.logs.unwrap_or(0) - } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 41eb62e60ba..c282dc68436 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1133,7 +1133,7 @@ mod tests { let (durable_offset, ..) = watch::channel(None); Self { commitlog: Arc::new(RwLock::new( - commitlog::Generic::open(repo::Memory::new(), <_>::default()).unwrap(), + commitlog::Generic::open(repo::Memory::new(4096), <_>::default()).unwrap(), )), durable_offset, } @@ -1155,7 +1155,7 @@ mod tests { EmptyHistory::new(), Some(Persistence { durability: durability.clone(), - disk_size: Arc::new(|| Ok(0)), + disk_size: Arc::new(|| Ok(<_>::default())), snapshots: None, }), None, diff --git a/crates/datastore/src/db_metrics/mod.rs b/crates/datastore/src/db_metrics/mod.rs index b10d474e09d..21d2f628f99 100644 --- a/crates/datastore/src/db_metrics/mod.rs +++ b/crates/datastore/src/db_metrics/mod.rs @@ -91,6 +91,11 @@ metrics_group!( #[labels(db: Identity)] pub message_log_size: IntGaugeVec, + #[name = spacetime_message_log_size_blocks] + #[help = "For a given database, the number of 512-byte blocks allocated by its message log"] + #[labels(db: Identity)] + pub message_log_blocks: IntGaugeVec, + #[name = spacetime_module_log_file_size_bytes] #[help = "For a given module, the size of its log file (in bytes)"] 
#[labels(db: Identity)] diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index beaf4be2c4b..13d20209e56 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -9,11 +9,13 @@ description = "Traits and single-node implementation of durability for Spacetime [features] test = [] +fallocate = ["spacetimedb-commitlog/fallocate"] [dependencies] anyhow.workspace = true itertools.workspace = true log.workspace = true +scopeguard.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-paths.workspace = true spacetimedb-sats.workspace = true @@ -21,5 +23,10 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true +[dev-dependencies] +spacetimedb-commitlog = { workspace = true, features = ["test"] } +tempfile.workspace = true +tokio.workspace = true + [lints] workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 485566d5ff1..c6eb3ff693c 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -12,6 +12,7 @@ use std::{ use anyhow::Context as _; use itertools::Itertools as _; use log::{info, trace, warn}; +use scopeguard::defer_on_unwind; use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; use spacetimedb_paths::server::CommitLogDir; use tokio::{ @@ -23,7 +24,7 @@ use tracing::instrument; use crate::{Durability, DurableOffset, History, TxOffset}; -pub use spacetimedb_commitlog::repo::OnNewSegmentFn; +pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; /// [`Local`] configuration. #[derive(Clone, Copy, Debug)] @@ -168,7 +169,7 @@ impl Local { } /// Get the size on disk of the underlying [`Commitlog`]. 
- pub fn size_on_disk(&self) -> io::Result { + pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() } } @@ -213,13 +214,13 @@ impl PersisterTask { let mut retry = Some(txdata); while let Some(txdata) = retry.take() { if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { - flush_error(source); + flush_error("persister", source); retry = Some(txdata); } } if flush_after { - clog.flush().map(drop).unwrap_or_else(flush_error); + clog.flush().map(drop).unwrap_or_else(|e| flush_error("persister", e)); } trace!("flush-append succeeded"); @@ -240,10 +241,11 @@ impl PersisterTask { /// /// Panics if the error indicates that the log may be permanently unwritable. #[inline] -fn flush_error(e: io::Error) { - warn!("error flushing commitlog: {e:?}"); - if e.kind() == io::ErrorKind::AlreadyExists { - panic!("commitlog unwritable!"); +#[track_caller] +fn flush_error(task: &str, e: io::Error) { + warn!("error flushing commitlog ({task}): {e:?}"); + if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { + panic!("{e}"); } } @@ -263,6 +265,8 @@ impl FlushAndSyncTask { let mut interval = interval(self.period); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + defer_on_unwind!(self.abort.abort()); + loop { interval.tick().await; @@ -280,13 +284,12 @@ impl FlushAndSyncTask { match task { Err(e) => { if e.is_panic() { - self.abort.abort(); - panic::resume_unwind(e.into_panic()) + panic::resume_unwind(e.into_panic()); } break; } Ok(Err(e)) => { - warn!("flush failed: {e}"); + flush_error("flush-and-sync", e); } Ok(Ok(Some(new_offset))) => { trace!("synced to offset {new_offset}"); @@ -307,7 +310,9 @@ impl Durability for Local { type TxData = Txdata; fn append_tx(&self, tx: Self::TxData) { - self.queue.send(tx).expect("commitlog persister task vanished"); + if self.queue.send(tx).is_err() { + panic!("durability actor crashed"); + } self.queue_depth.fetch_add(1, Relaxed); } diff --git 
a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs new file mode 100644 index 00000000000..15f5d73f471 --- /dev/null +++ b/crates/durability/tests/io/fallocate.rs @@ -0,0 +1,271 @@ +//! Demonstrates the crash behaviour of `spacetimedb_durability::Local` +//! if the `fallocate` feature is enabled and when there is not enough disk +//! space to pre-allocate commitlog segments. +//! +//! Requires `target_os = "linux"`. +//! +//! The setup involves mounting a file as a loop device. For this, it invokes +//! the `mount`, `umount` and `chmod` commands via `sudo`. The caller must +//! ensure that they have the appropriate entries in `sudoers(5)` to do that +//! without `sudo` prompting for a password. For example: +//! +//! ```ignore +//! %sudo ALL=(ALL) NOPASSWD: /usr/bin/mount, /usr/bin/umount, /usr/bin/chmod +//! ``` +//! +//! The `fallocate` feature is not enabled by default. To run, use: +//! +//! ```ignore +//! cargo test --features fallocate +//! ``` +use std::{ + fs::File, + io, + path::{Path, PathBuf}, + process, + sync::Arc, + time::Duration, +}; + +use anyhow::{anyhow, Context as _}; +use log::{error, info}; +use scopeguard::ScopeGuard; +use spacetimedb_commitlog::{ + payload::txdata::{Mutations, Ops}, + repo::{self, OnNewSegmentFn, Repo}, + segment, + tests::helpers::enable_logging, +}; +use spacetimedb_durability::{Durability, Txdata}; +use spacetimedb_paths::{ + server::{CommitLogDir, ReplicaDir}, + FromPathUnchecked, +}; +use tempfile::{NamedTempFile, TempDir}; +use tokio::{sync::watch, time::sleep}; + +const MB: u64 = 1024 * 1024; + +#[tokio::test] +async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Result<()> { + enable_logging(); + + let Tmp { + device_file, + mountpoint, + } = Tmp::create()?; + { + let file_path = device_file.path(); + let mountpoint = mountpoint.path(); + + let _guard = mount(file_path, mountpoint, 512 * MB)?; + let replica_dir = ReplicaDir::from_path_unchecked(mountpoint); + + 
match local_durability(replica_dir.commit_log(), 1024 * MB, None).await { + Err(e) if e.kind() == io::ErrorKind::StorageFull => Ok(()), + Err(e) => Err(e).context("unexpected error"), + Ok(durability) => { + durability.close().await?; + Err(anyhow!("unexpected success")) + } + } + } +} + +// NOTE: This test is set up to proceed more or less sequentially. +// In reality, `append_tx` will fail at some point in the future. +// I.e. transactions can be lost when the host runs out of disk space. +#[tokio::test] +#[should_panic = "durability actor crashed"] +async fn local_durability_crashes_on_new_segment_if_not_enough_space() { + enable_logging(); + + // Inner run fn to allow the use of `?`, + // `should_panic` tests must return unit. + async fn run() -> anyhow::Result<()> { + let Tmp { + device_file, + mountpoint, + } = Tmp::create()?; + { + let _guard = mount(device_file.path(), mountpoint.path(), 512 * MB)?; + let replica_dir = ReplicaDir::from_path_unchecked(mountpoint.path()); + + let (new_segment_tx, mut new_segment_rx) = watch::channel(()); + let on_new_segment = Arc::new(move || { + new_segment_tx.send_replace(()); + }); + let durability = local_durability(replica_dir.commit_log(), 256 * MB, Some(on_new_segment)).await?; + let txdata = txdata(); + + // Mark initial segment as seen. + new_segment_rx.borrow_and_update(); + // Write past available space. + for _ in 0..256 { + durability.append_tx(txdata.clone()); + } + // Ensure new segment is created. + new_segment_rx.changed().await?; + // Yield to give fallocate a chance to run (and fail). + sleep(Duration::from_millis(5)).await; + // Durability actor should have crashed, so this should panic. + info!("trying append on crashed durability"); + durability.append_tx(txdata.clone()); + } + + Ok(()) + } + + run().await.unwrap() +} + +/// Approximates the case where a commitlog has segments that were created +/// without `fallocate`. +/// +/// Resuming a segment when there is insufficient space should fail. 
+#[tokio::test] +async fn local_durability_crashes_on_resume_with_insuffient_space() -> anyhow::Result<()> { + enable_logging(); + + let Tmp { + device_file, + mountpoint, + } = Tmp::create()?; + { + let _guard = mount(device_file.path(), mountpoint.path(), 512 * MB)?; + let replica_dir = ReplicaDir::from_path_unchecked(mountpoint.path()); + + // Write a segment with only a header and no `fallocate` reservation. + { + let repo = repo::Fs::new(replica_dir.commit_log(), None)?; + let mut segment = repo.create_segment(0)?; + segment::Header::default().write(&mut segment)?; + segment.sync_data()?; + } + + // Try to open local durability with a 1GiB segment size, + // which is larger than the available disk space. + match local_durability(replica_dir.commit_log(), 1024 * MB, None).await { + Err(e) if e.kind() == io::ErrorKind::StorageFull => Ok(()), + Err(e) => Err(e).context("unexpected error"), + Ok(durability) => { + durability.close().await?; + Err(anyhow!("unexpected success")) + } + } + } +} + +async fn local_durability( + dir: CommitLogDir, + max_segment_size: u64, + on_new_segment: Option>, +) -> io::Result> { + spacetimedb_durability::Local::open( + dir, + tokio::runtime::Handle::current(), + spacetimedb_durability::local::Options { + commitlog: spacetimedb_commitlog::Options { + max_segment_size, + max_records_in_commit: 1.try_into().unwrap(), + ..<_>::default() + }, + ..<_>::default() + }, + on_new_segment, + ) +} + +fn txdata() -> Txdata<[u8; 1024 * 1024]> { + Txdata { + inputs: None, + outputs: None, + mutations: Some(Mutations { + inserts: [Ops { + table_id: 8000.into(), + rowdata: Arc::new([[42u8; 1024 * 1024]]), + }] + .into(), + deletes: [].into(), + truncates: [].into(), + }), + } +} + +struct Tmp { + device_file: NamedTempFile, + mountpoint: TempDir, +} + +impl Tmp { + fn create() -> io::Result { + let device_file = tempfile::Builder::new().prefix("disk-").tempfile()?; + let mountpoint = tempfile::Builder::new().prefix("mnt-").tempdir()?; + + Ok(Self 
{ + device_file, + mountpoint, + }) + } +} + +fn mount(device_file: &Path, mountpoint: &Path, len: u64) -> anyhow::Result> { + info!("creating empty file at {} with len {}", device_file.display(), len); + { + let file = File::options() + .create(true) + .write(true) + .truncate(true) + .open(device_file)?; + file.set_len(len)?; + file.sync_data()?; + } + + info!("creating filesystem"); + process::Command::new("mkfs") + .args(["-t", "ext4"]) + .arg(device_file) + .status() + .success()?; + + info!("mounting {} at {}", device_file.display(), mountpoint.display()); + sudo(|cmd| { + cmd.args(["mount", "-t", "ext4", "-o", "loop"]) + .arg(device_file) + .arg(mountpoint) + .status() + }) + .success()?; + + let guard = scopeguard::guard(mountpoint.to_path_buf(), |mountpoint| { + if let Err(e) = umount(&mountpoint) { + error!("failed to umount {}: {}", mountpoint.display(), e) + } + }); + + sudo(|cmd| cmd.args(["chmod", "-R", "777"]).arg(mountpoint).status()).success()?; + + Ok(guard) +} + +fn umount(mountpoint: &Path) -> io::Result<()> { + sudo(|cmd| cmd.arg("umount").arg(mountpoint).status()).success() +} + +fn sudo(f: impl FnOnce(&mut process::Command) -> T) -> T { + f(process::Command::new("sudo").arg("--non-interactive")) +} + +trait ExitStatusExt { + fn success(self) -> io::Result<()>; +} + +impl ExitStatusExt for io::Result { + fn success(self) -> io::Result<()> { + let status = self?; + match status.success() { + true => Ok(()), + false => Err(io::Error::from_raw_os_error(status.code().unwrap())), + } + } +} diff --git a/crates/durability/tests/io/mod.rs b/crates/durability/tests/io/mod.rs new file mode 100644 index 00000000000..e48b954fcfa --- /dev/null +++ b/crates/durability/tests/io/mod.rs @@ -0,0 +1,2 @@ +#[cfg(all(target_os = "linux", feature = "fallocate"))] +mod fallocate; diff --git a/crates/durability/tests/main.rs b/crates/durability/tests/main.rs new file mode 100644 index 00000000000..6352663e2a7 --- /dev/null +++ b/crates/durability/tests/main.rs @@ 
-0,0 +1 @@ +mod io; diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs index 27d346cd9a0..9a80663461e 100644 --- a/crates/snapshot/tests/remote.rs +++ b/crates/snapshot/tests/remote.rs @@ -237,7 +237,7 @@ async fn create_snapshot(repo: Arc) -> anyhow::Result::default())), snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled)), }; let db = TestDB::open_db(&tmp, EmptyHistory::new(), Some(persistence), None, 0)?; From 413b53935ce1416a1e0fb32ad719355615b3d2ab Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 20 Oct 2025 13:47:28 +0200 Subject: [PATCH 02/10] fixup! commitlog,durability: Support preallocation of disk space --- crates/commitlog/src/repo/mem.rs | 8 ++++++++ crates/commitlog/src/segment.rs | 2 +- crates/core/src/replica_context.rs | 24 ++++++++++++++++++++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index 54c52e1af71..df860560a43 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -82,6 +82,14 @@ impl Segment { self.len() == 0 } + /// Modify the byte at `pos` in the segment. + /// + /// Convenience to let tests inject corruptions into the segment, without + /// [io::Seek] ceremony. + /// + /// # Panics + /// + /// Panics if `pos` is not within the segment's bounds. pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { let mut pages = self.pages.write().unwrap(); diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index cb60ab405c8..14e642188c2 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -230,7 +230,7 @@ impl FileLike for File { } // `ftruncate` deallocates any extra `fallocate`'d blocks, - // so if the `fallocate` feature is enable, we need + // so if the `fallocate` feature is enabled, we need // restore the allocation after truncation. 
// // TODO: Make truncate (shrinking) a [Segment] method, so we can implement diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 267b0924ea8..92d2019da0f 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -39,8 +39,28 @@ impl ReplicaContext { /// Some sources of size-on-disk may error, in which case the corresponding array element will be None. pub fn total_disk_usage(&self) -> TotalDiskUsage { TotalDiskUsage { - durability: self.durability_size_on_disk().ok(), - logs: self.log_file_size().ok(), + durability: self + .durability_size_on_disk() + .inspect_err(|e| { + log::error!( + "database={} replica={}: failed to obtain durability size on disk: {:#}", + self.database.database_identity, + self.replica_id, + e + ); + }) + .ok(), + logs: self + .log_file_size() + .inspect_err(|e| { + log::error!( + "database={} replica={}: failed to obtain log file size: {:#}", + self.database.database_identity, + self.replica_id, + e + ); + }) + .ok(), } } From 6572a05841f026f10e31e1c832194ffe9a8dd8f4 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 20 Oct 2025 14:20:08 +0200 Subject: [PATCH 03/10] fixup! fixup! 
commitlog,durability: Support preallocation of disk space --- crates/commitlog/src/repo/fs.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 546fa8095bc..22b7b9424f9 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -1,6 +1,5 @@ use std::fs::{self, File}; use std::io; -use std::os::unix::fs::MetadataExt; use std::sync::Arc; use log::{debug, warn}; @@ -51,19 +50,18 @@ pub struct SizeOnDisk { } impl SizeOnDisk { + #[cfg(unix)] fn add(&mut self, stat: std::fs::Metadata) { - self.total_bytes += stat.size(); - #[cfg(unix)] - { - self.total_blocks += std::os::unix::fs::MetadataExt::blocks(&stat); - } - #[cfg(not(unix))] - { - let imaginary_blocks = (self.total_bytes > 0) - .then(|| 8 * self.total_bytes.div_ceil(4096)) - .unwrap_or_default(); - self.total_blocks = imaginary_blocks; - } + self.total_bytes += stat.len(); + self.total_blocks += std::os::unix::fs::MetadataExt::blocks(&stat); + } + + #[cfg(not(unix))] + fn add(&mut self, stat: std::fs::Metadata) { + let imaginary_blocks = (self.total_bytes > 0) + .then(|| 8 * self.total_bytes.div_ceil(4096)) + .unwrap_or_default(); + self.total_blocks = imaginary_blocks; } } From 12c8787400a68dab587a4b039bca89555c518b48 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Tue, 21 Oct 2025 21:07:30 +0200 Subject: [PATCH 04/10] fixup! fixup! fixup! 
commitlog,durability: Support preallocation of disk space --- crates/commitlog/src/commitlog.rs | 6 +- crates/commitlog/src/repo/mem.rs | 397 ++++++----------------- crates/commitlog/src/repo/mem/page.rs | 52 +++ crates/commitlog/src/repo/mem/segment.rs | 297 +++++++++++++++++ crates/commitlog/src/segment.rs | 8 +- 5 files changed, 454 insertions(+), 306 deletions(-) create mode 100644 crates/commitlog/src/repo/mem/page.rs create mode 100644 crates/commitlog/src/repo/mem/segment.rs diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 65e9c609017..1337f3debab 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -1227,7 +1227,7 @@ mod tests { #[test] fn set_same_epoch_does_nothing() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(4096), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(u64::MAX), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); assert_eq!(committed, None); @@ -1235,7 +1235,7 @@ mod tests { #[test] fn set_new_epoch_commits() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(4096), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(u64::MAX), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); log.append(<_>::default()).unwrap(); let committed = log @@ -1248,7 +1248,7 @@ mod tests { #[test] fn set_lower_epoch_returns_error() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(4096), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(u64::MAX), <_>::default()).unwrap(); log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput) diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index 
df860560a43..f107e97f1e1 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -3,38 +3,22 @@ use std::{ io, sync::{ atomic::{AtomicI64, Ordering}, - Arc, RwLock, RwLockWriteGuard, + Arc, RwLock, }, }; -use super::Repo; -use crate::{repo::SegmentLen, segment::FileLike}; +use log::debug; -type SharedLock = Arc>; -type SharedPages = SharedLock>; - -const PAGE_SIZE: usize = 4096; +use super::Repo; -#[derive(Debug)] -pub struct Page { - filled: usize, - buf: [u8; PAGE_SIZE], -} +mod page; +pub use page::{Page, PAGE_SIZE}; -impl Page { - pub fn remaining(&self) -> usize { - PAGE_SIZE - self.filled - } -} +mod segment; +pub use segment::Segment; -impl Default for Page { - fn default() -> Self { - Self { - filled: 0, - buf: [0; PAGE_SIZE], - } - } -} +type SharedLock = Arc>; +type SharedPages = SharedLock>; /// The total capacity of the imaginary storage device. /// @@ -44,280 +28,18 @@ impl Default for Page { /// an error if [SpaceOnDevice] goes below zero. pub type SpaceOnDevice = Arc; -/// A log segment backed by a [Vec]. -/// -/// Writing to the segment behaves like a file opened with `O_APPEND`: -/// [`io::Write::write`] always appends to the segment, regardless of the -/// current position, and updates the position to the new length of the segment. -/// The initial position is zero. -/// -/// Note that this is not a faithful model of a file, as safe Rust requires to -/// protect the buffer with a lock. This means that pathological situations -/// arising from concurrent read/write access of a file are impossible to occur. 
-#[derive(Clone, Debug)] -pub struct Segment { - pos: u64, - pages: SharedPages, - device: SpaceOnDevice, -} - -impl Segment { - pub fn new(device: SpaceOnDevice) -> Self { - Self::with_pages(device, <_>::default()) - } - - pub fn with_pages(device: SpaceOnDevice, pages: SharedPages) -> Self { - Self { pos: 0, pages, device } - } - - pub fn len(&self) -> usize { - self.pages - .read() - .unwrap() - .iter() - .fold(0, |size, page| size + page.filled) - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Modify the byte at `pos` in the segment. - /// - /// Convenience to let tests inject corruptions into the segment, without - /// [io::Seek] ceremony. - /// - /// # Panics - /// - /// Panics if `pos` is not within the segment's bounds. - pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { - let mut pages = self.pages.write().unwrap(); - - let page_idx = pos / PAGE_SIZE; - let page = pages.get_mut(page_idx).expect("pos out of bounds"); - let page_ofs = pos % PAGE_SIZE; - page.buf[page_ofs] = f(page.buf[page_ofs]); - } - - fn allocate(&self, pages: &mut RwLockWriteGuard<'_, Vec>, n: usize) -> io::Result<()> { - let mut allocated = 0; - pages.resize_with(n, || { - allocated += 1; - Page::default() - }); - if self.device.fetch_sub((allocated * PAGE_SIZE) as i64, Ordering::Relaxed) <= 0 { - return Err(io::Error::new(io::ErrorKind::StorageFull, "no space left on device")); - } - - Ok(()) - } -} - -impl SegmentLen for Segment { - fn segment_len(&mut self) -> io::Result { - Ok(self.len() as u64) - } -} - -impl io::Write for Segment { - fn write(&mut self, buf: &[u8]) -> io::Result { - let mut written = 0; - while written < buf.len() { - let mut pages = self.pages.write().unwrap(); - let page = { - let page_idx = self.pos as usize / PAGE_SIZE; - if page_idx >= pages.len() { - self.allocate(&mut pages, page_idx + 1)?; - } - &mut pages[page_idx] - }; - let remaining = buf.len() - written; - let to_copy = page.remaining().min(remaining); - - 
let range_in_page = page.filled..page.filled + to_copy; - let range_in_buf = written..written + to_copy; - - page.buf[range_in_page].copy_from_slice(&buf[range_in_buf]); - page.filled += to_copy; - drop(pages); - - written += to_copy; - self.pos += to_copy as u64; - } - - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl io::Read for Segment { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let mut read = 0; - while read < buf.len() { - let pages = self.pages.read().unwrap(); - let Some(page) = pages.get(self.pos as usize / PAGE_SIZE) else { - break; - }; - let offset_in_page = (self.pos % PAGE_SIZE as u64) as usize; - if offset_in_page >= page.filled { - break; - } - let available_in_page = page.filled - offset_in_page; - let to_copy = (buf.len() - read).min(available_in_page); - - buf[read..read + to_copy].copy_from_slice(&page.buf[offset_in_page..offset_in_page + to_copy]); - - read += to_copy; - self.pos += to_copy as u64; - } - - Ok(read) - } -} - -impl io::Seek for Segment { - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - let (base_pos, offset) = match pos { - io::SeekFrom::Start(n) => { - self.pos = n; - return Ok(n); - } - io::SeekFrom::End(n) => (self.len() as u64, n), - io::SeekFrom::Current(n) => (self.pos, n), - }; - match base_pos.checked_add_signed(offset) { - Some(n) => { - self.pos = n; - Ok(n) - } - None => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - } - } -} - -impl FileLike for Segment { - fn fsync(&mut self) -> io::Result<()> { - Ok(()) - } - - fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { - use std::cmp::Ordering::*; - - let mut pages = self.pages.write().unwrap(); - let old_page_count = pages.len() as u64; - let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; - - let zero_tail = |maybe_last_page: Option<&mut Page>| { - if let Some(last_page) = maybe_last_page { - let 
tail_start = (size as usize) % PAGE_SIZE; - last_page.filled = tail_start; - last_page.buf[tail_start..].fill(0); - } - }; - match new_page_count.cmp(&old_page_count) { - Greater => self.allocate(&mut pages, new_page_count as usize)?, - ordering => { - if matches!(ordering, Less) { - pages.truncate(new_page_count as usize); - } - zero_tail(pages.last_mut()); - } - }; - - if self.pos > size { - self.pos = size; - } - - Ok(()) - } - - fn fallocate(&mut self, size: u64) -> io::Result<()> { - let mut pages = self.pages.write().unwrap(); - let old_page_count = pages.len() as u64; - let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; - - if new_page_count > old_page_count { - self.allocate(&mut pages, new_page_count as usize)?; - } - - Ok(()) - } -} - #[cfg(feature = "streaming")] mod async_impls { use super::*; - use std::{ - io::{Read as _, Seek as _, Write as _}, - pin::Pin, - task::{Context, Poll}, - }; - - use tokio::io::{self, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; - - use crate::stream::{AsyncFsync, AsyncLen, AsyncRepo, IntoAsyncWriter}; + use crate::stream::AsyncRepo; impl AsyncRepo for Memory { - type AsyncSegmentWriter = io::BufWriter; - type AsyncSegmentReader = io::BufReader; + type AsyncSegmentWriter = tokio::io::BufWriter; + type AsyncSegmentReader = tokio::io::BufReader; async fn open_segment_reader_async(&self, offset: u64) -> io::Result { - self.open_segment_writer(offset).map(io::BufReader::new) - } - } - - impl IntoAsyncWriter for Segment { - type AsyncWriter = tokio::io::BufWriter; - - fn into_async_writer(self) -> Self::AsyncWriter { - tokio::io::BufWriter::new(self) - } - } - - impl AsyncRead for Segment { - fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - Poll::Ready(self.get_mut().read(buf.initialize_unfilled()).map(drop)) - } - } - - impl AsyncWrite for Segment { - fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { - 
Poll::Ready(self.get_mut().write(buf)) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - } - - impl AsyncSeek for Segment { - fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { - self.get_mut().seek(position).map(drop) - } - - fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().stream_position()) - } - } - - impl AsyncFsync for Segment { - async fn fsync(&self) {} - } - - impl AsyncLen for Segment { - async fn segment_len(&mut self) -> io::Result { - Ok(self.len() as u64) + self.open_segment_writer(offset).map(tokio::io::BufReader::new) } } } @@ -332,7 +54,7 @@ pub struct Memory { impl Memory { pub fn new(total_space: u64) -> Self { Self { - space: Arc::new(AtomicI64::new(total_space as _)), + space: Arc::new(AtomicI64::new(total_space.min(i64::MAX as u64) as i64)), segments: <_>::default(), } } @@ -343,6 +65,7 @@ impl Repo for Memory { type SegmentReader = io::BufReader; fn create_segment(&self, offset: u64) -> io::Result { + debug!("create_segment: space={}", self.space.load(Ordering::Relaxed)); let mut inner = self.segments.write().unwrap(); match inner.entry(offset) { btree_map::Entry::Occupied(entry) => { @@ -402,11 +125,14 @@ impl Repo for Memory { #[cfg(test)] mod tests { + use std::io::{Read, Seek, Write}; + use pretty_assertions::assert_matches; use tempfile::tempfile; + use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; use super::*; - use std::io::{Read, Seek, Write}; + use crate::{segment::FileLike as _, tests::helpers::enable_logging}; fn read_write_seek(f: &mut (impl Read + Seek + Write)) { f.write_all(b"alonso").unwrap(); @@ -447,6 +173,50 @@ mod tests { read_write_seek(&mut tempfile().unwrap()); } + async fn async_read_write_seek(f: &mut (impl AsyncRead + AsyncSeek + AsyncWrite + Unpin)) { + use tokio::io::{AsyncReadExt 
as _, AsyncSeekExt as _, AsyncWriteExt as _}; + + enable_logging(); + + f.write_all(b"alonso").await.unwrap(); + + f.seek(io::SeekFrom::Start(0)).await.unwrap(); + let mut buf = [0; 6]; + f.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"alonso"); + + f.seek(io::SeekFrom::Start(2)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"onso"); + + f.seek(io::SeekFrom::Current(-4)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"onso"); + + f.seek(io::SeekFrom::End(-3)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 3); + assert_eq!(&buf[0..3], b"nso"); + + f.seek(io::SeekFrom::End(4096)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 0); + } + + #[tokio::test] + async fn std_file_async_read_write_seek() { + let tmp = tempfile().unwrap(); + async_read_write_seek(&mut tokio::fs::File::from_std(tmp)).await + } + + #[tokio::test] + async fn segment_async_read_write_seek() { + let space_on_device = Arc::new(AtomicI64::new(4096)); + async_read_write_seek(&mut Segment::new(space_on_device)).await + } + #[test] fn ftruncate() { let space_on_device = Arc::new(AtomicI64::new(8192)); @@ -472,9 +242,9 @@ mod tests { read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(&buf[..512], &data); assert_eq!(&buf[512..], &[0; 512]); - assert_eq!(segment.pages.read().unwrap().len(), 2); + assert_eq!(segment.page_count(), 2); - // Extends beyond available space returns `StorageFull`. + // Extend beyond available space returns `StorageFull`. 
assert_matches!( segment.ftruncate(42, 9216), Err(e) if e.kind() == io::ErrorKind::StorageFull @@ -485,7 +255,7 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.pages.read().unwrap().len(), 1); + assert_eq!(segment.page_count(), 1); segment.ftruncate(42, 256).unwrap(); buf.clear(); @@ -510,14 +280,14 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.pages.read().unwrap().len(), 1); + assert_eq!(segment.page_count(), 1); // Extend beyond page allocates new page. segment.fallocate(5120).unwrap(); buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.pages.read().unwrap().len(), 2); + assert_eq!(segment.page_count(), 2); // Extend beyond available space returns `StorageFull`. assert_matches!( @@ -530,7 +300,36 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.pages.read().unwrap().len(), 3); + assert_eq!(segment.page_count(), 2); + } + + #[test] + fn write_many_pages() { + enable_logging(); + + let space_on_device = Arc::new(AtomicI64::new(4096 * 4)); + let mut segment = Segment::new(space_on_device); + + let data = [b'y'; 4096]; + for _ in 0..4 { + segment.write_all(&data[..2048]).unwrap(); + segment.write_all(&data[2048..]).unwrap(); + } + assert_matches!( + segment.write_all(&data[..2048]), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + segment.rewind().unwrap(); + + let mut buf = [0; 4096]; + for _ in 0..4 { + segment.read_exact(&mut buf).unwrap(); + assert!(buf.iter().all(|&x| x == b'y')); + } + assert_matches!( + segment.read_exact(&mut buf), + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof + ); } fn read_from_start_to_end(f: &mut (impl Read + Seek), buf: &mut Vec) -> io::Result { diff --git a/crates/commitlog/src/repo/mem/page.rs 
b/crates/commitlog/src/repo/mem/page.rs new file mode 100644 index 00000000000..219ec55c011 --- /dev/null +++ b/crates/commitlog/src/repo/mem/page.rs @@ -0,0 +1,52 @@ +use std::slice::SliceIndex; + +pub const PAGE_SIZE: usize = 4096; + +#[derive(Debug)] +pub struct Page { + filled: usize, + buf: [u8; PAGE_SIZE], +} + +impl Page { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + filled: 0, + buf: [0; PAGE_SIZE], + } + } + + pub fn remaining(&self) -> usize { + PAGE_SIZE - self.filled + } + + pub fn len(&self) -> usize { + self.filled + } + + pub fn is_empty(&self) -> bool { + self.filled == 0 + } + + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + self.buf[pos] = f(self.buf[pos]) + } + + pub fn copy_from_slice(&mut self, buf: &[u8]) { + self.buf[self.filled..self.filled + buf.len()].copy_from_slice(buf); + self.filled += buf.len(); + } + + pub fn slice(&self, range: I) -> &I::Output + where + I: SliceIndex<[u8]>, + { + self.buf.get(range).expect("range out of bounds") + } + + pub fn zeroize(&mut self, pos: usize) { + self.buf[pos..].fill(0); + self.filled = pos; + } +} diff --git a/crates/commitlog/src/repo/mem/segment.rs b/crates/commitlog/src/repo/mem/segment.rs new file mode 100644 index 00000000000..d8a4379a702 --- /dev/null +++ b/crates/commitlog/src/repo/mem/segment.rs @@ -0,0 +1,297 @@ +use std::{ + io, + sync::{atomic::Ordering, Arc, RwLock, RwLockWriteGuard}, +}; + +use log::{debug, trace}; + +use crate::{ + repo::{ + mem::{Page, SpaceOnDevice, PAGE_SIZE}, + SegmentLen, + }, + segment::FileLike, +}; + +type SharedLock = Arc>; +type SharedPages = SharedLock>; + +/// A log segment backed by a [Vec]. +/// +/// Writing to the segment behaves like a file opened with `O_APPEND`: +/// [`io::Write::write`] always appends to the segment, regardless of the +/// current position, and updates the position to the new length of the segment. +/// The initial position is zero. 
+/// +/// Note that this is not a faithful model of a file, as safe Rust requires to +/// protect the buffer with a lock. This means that pathological situations +/// arising from concurrent read/write access of a file are impossible to occur. +#[derive(Clone, Debug)] +pub struct Segment { + pos: u64, + pages: SharedPages, + device: SpaceOnDevice, +} + +impl Segment { + pub fn new(device: SpaceOnDevice) -> Self { + Self::with_pages(device, <_>::default()) + } + + pub(super) fn with_pages(device: SpaceOnDevice, pages: SharedPages) -> Self { + Self { pos: 0, pages, device } + } + + pub fn len(&self) -> usize { + self.pages + .read() + .unwrap() + .iter() + .fold(0, |size, page| size + page.len()) + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn page_count(&self) -> usize { + self.pages.read().unwrap().len() + } + + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + let mut pages = self.pages.write().unwrap(); + + let page_idx = pos / PAGE_SIZE; + let page = pages.get_mut(page_idx).expect("pos out of bounds"); + let page_ofs = pos % PAGE_SIZE; + page.modify_byte_at(page_ofs, f); + } + + fn allocate(&self, pages: &mut RwLockWriteGuard<'_, Vec>, n: usize) -> io::Result<()> { + assert!(n > pages.len()); + let page_size = PAGE_SIZE as i64; + for _ in pages.len()..n { + if self.device.load(Ordering::Relaxed) - page_size < 0 { + return Err(io::Error::new( + io::ErrorKind::StorageFull, + "not enough space left on device", + )); + } + pages.push(Page::new()); + if self.device.fetch_sub(page_size, Ordering::Relaxed) < 0 { + return Err(io::Error::new(io::ErrorKind::StorageFull, "no space left on device")); + } + } + + Ok(()) + } +} + +impl SegmentLen for Segment { + fn segment_len(&mut self) -> io::Result { + Ok(self.len() as u64) + } +} + +impl io::Write for Segment { + fn write(&mut self, buf: &[u8]) -> io::Result { + let mut written = 0; + while written < buf.len() { + let mut pages = self.pages.write().unwrap(); + let page = 
{ + let page_idx = self.pos as usize / PAGE_SIZE; + if page_idx >= pages.len() { + self.allocate(&mut pages, page_idx + 1)?; + } + &mut pages[page_idx] + }; + let remaining = buf.len() - written; + let to_copy = page.remaining().min(remaining); + + page.copy_from_slice(&buf[written..written + to_copy]); + drop(pages); + + written += to_copy; + self.pos += to_copy as u64; + } + + Ok(written) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl io::Read for Segment { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let mut read = 0; + while read < buf.len() { + trace!("read {} from {}", buf.len(), self.pos); + let pages = self.pages.read().unwrap(); + let Some(page) = pages.get(self.pos as usize / PAGE_SIZE) else { + trace!("no page at pos"); + break; + }; + let offset_in_page = (self.pos % PAGE_SIZE as u64) as usize; + if offset_in_page >= page.len() { + trace!("offset after initialized bytes in page"); + break; + } + let available_in_page = page.len() - offset_in_page; + let to_copy = (buf.len() - read).min(available_in_page); + trace!("available_in_page={available_in_page} to_copy={to_copy}"); + + buf[read..read + to_copy].copy_from_slice(page.slice(offset_in_page..offset_in_page + to_copy)); + trace!("buf={buf:?}"); + + read += to_copy; + self.pos += to_copy as u64; + } + + Ok(read) + } +} + +impl io::Seek for Segment { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let (base_pos, offset) = match pos { + io::SeekFrom::Start(n) => { + self.pos = n; + return Ok(n); + } + io::SeekFrom::End(n) => (self.len() as u64, n), + io::SeekFrom::Current(n) => (self.pos, n), + }; + match base_pos.checked_add_signed(offset) { + Some(n) => { + self.pos = n; + Ok(n) + } + None => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + } + } +} + +impl FileLike for Segment { + fn fsync(&mut self) -> io::Result<()> { + Ok(()) + } + + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> 
io::Result<()> { + use std::cmp::Ordering::*; + + let mut pages = self.pages.write().unwrap(); + let old_page_count = pages.len() as u64; + let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; + + let zero_tail = |maybe_last_page: Option<&mut Page>| { + if let Some(last_page) = maybe_last_page { + let tail_start = (size as usize) % PAGE_SIZE; + last_page.zeroize(tail_start); + } + }; + match new_page_count.cmp(&old_page_count) { + Greater => self.allocate(&mut pages, new_page_count as usize)?, + ordering => { + if matches!(ordering, Less) { + pages.truncate(new_page_count as usize); + } + zero_tail(pages.last_mut()); + } + }; + + if self.pos > size { + self.pos = size; + } + + Ok(()) + } + + fn fallocate(&mut self, size: u64) -> io::Result<()> { + let mut pages = self.pages.write().unwrap(); + let old_page_count = pages.len() as u64; + let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; + + debug!( + "fallocate {}: old_page_count={} new_page_count={}", + size, old_page_count, new_page_count + ); + + if new_page_count > old_page_count { + self.allocate(&mut pages, new_page_count as usize)?; + } + + Ok(()) + } +} + +#[cfg(feature = "streaming")] +mod async_impls { + use super::*; + + use std::{ + io::{Read as _, Seek as _, Write as _}, + pin::Pin, + task::{Context, Poll}, + }; + + use tokio::io::{self, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; + + use crate::stream::{AsyncFsync, AsyncLen, IntoAsyncWriter}; + + impl IntoAsyncWriter for Segment { + type AsyncWriter = tokio::io::BufWriter; + + fn into_async_writer(self) -> Self::AsyncWriter { + tokio::io::BufWriter::new(self) + } + } + + impl AsyncRead for Segment { + fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let res = self.get_mut().read(buf.initialize_unfilled()); + if let Ok(read) = &res { + buf.advance(*read); + } + Poll::Ready(res.map(drop)) + } + } + + impl AsyncWrite for Segment { + fn poll_write(self: 
Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { + Poll::Ready(self.get_mut().write(buf)) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + impl AsyncSeek for Segment { + fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { + self.get_mut().seek(position).map(drop) + } + + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.get_mut().stream_position()) + } + } + + impl AsyncFsync for Segment { + async fn fsync(&self) {} + } + + impl AsyncLen for Segment { + async fn segment_len(&mut self) -> io::Result { + Ok(self.len() as u64) + } + } +} diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 14e642188c2..df273c1bb30 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -788,7 +788,7 @@ mod tests { #[test] fn write_read_roundtrip() { - let repo = repo::Memory::new(4096); + let repo = repo::Memory::new(u64::MAX); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); writer.append([0; 32]).unwrap(); @@ -817,7 +817,7 @@ mod tests { #[test] fn metadata() { - let repo = repo::Memory::new(4096); + let repo = repo::Memory::new(u64::MAX); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 @@ -850,7 +850,7 @@ mod tests { #[test] fn commits() { - let repo = repo::Memory::new(4096); + let repo = repo::Memory::new(u64::MAX); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); @@ -883,7 +883,7 @@ mod tests { #[test] fn transactions() { - let repo = repo::Memory::new(4096); + let repo = repo::Memory::new(u64::MAX); let 
commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); From a65d03137c22fd0912b7710122636076d9ff74bc Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 23 Oct 2025 13:22:49 +0200 Subject: [PATCH 05/10] Make allocation in the Memory repo virtual, for performance reasons --- crates/commitlog/src/commitlog.rs | 6 +- crates/commitlog/src/repo/fs.rs | 1 + crates/commitlog/src/repo/mem.rs | 241 +++++++++------- crates/commitlog/src/repo/mem/page.rs | 52 ---- crates/commitlog/src/repo/mem/segment.rs | 267 ++++++++++-------- crates/commitlog/src/repo/mod.rs | 6 + crates/commitlog/src/segment.rs | 42 +-- crates/commitlog/src/tests/helpers.rs | 2 +- crates/commitlog/src/tests/partial.rs | 3 +- .../subscription/module_subscription_actor.rs | 2 +- 10 files changed, 311 insertions(+), 311 deletions(-) delete mode 100644 crates/commitlog/src/repo/mem/page.rs diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index 1337f3debab..aac1ad9d6c6 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -1227,7 +1227,7 @@ mod tests { #[test] fn set_same_epoch_does_nothing() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(u64::MAX), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); assert_eq!(committed, None); @@ -1235,7 +1235,7 @@ mod tests { #[test] fn set_new_epoch_commits() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(u64::MAX), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); log.append(<_>::default()).unwrap(); let committed = log @@ -1248,7 
+1248,7 @@ mod tests { #[test] fn set_lower_epoch_returns_error() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(u64::MAX), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput) diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 22b7b9424f9..6e5de959d0c 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -139,6 +139,7 @@ impl FileLike for NamedTempFile { self.as_file_mut().ftruncate(tx_offset, size) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, size: u64) -> io::Result<()> { self.as_file_mut().fallocate(size) } diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index f107e97f1e1..ff4f03caadb 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -1,63 +1,47 @@ use std::{ collections::{btree_map, BTreeMap}, io, - sync::{ - atomic::{AtomicI64, Ordering}, - Arc, RwLock, - }, + sync::{Arc, Mutex, RwLock}, }; -use log::debug; - -use super::Repo; - -mod page; -pub use page::{Page, PAGE_SIZE}; +use crate::repo::{ + mem::segment::{SharedLock, Storage}, + Repo, +}; mod segment; pub use segment::Segment; -type SharedLock = Arc>; -type SharedPages = SharedLock>; +pub const PAGE_SIZE: usize = 4096; /// The total capacity of the imaginary storage device. /// /// [Segment]s are allocated from [Memory], which tracks the total space it -/// has available. [SpaceOnDevice] is shared by each [Segment]. When a [Segment] -/// allocates a [Page], it deducts the page's size from the space, returning -/// an error if [SpaceOnDevice] goes below zero. 
-pub type SpaceOnDevice = Arc; - -#[cfg(feature = "streaming")] -mod async_impls { - use super::*; - - use crate::stream::AsyncRepo; - - impl AsyncRepo for Memory { - type AsyncSegmentWriter = tokio::io::BufWriter; - type AsyncSegmentReader = tokio::io::BufReader; - - async fn open_segment_reader_async(&self, offset: u64) -> io::Result { - self.open_segment_writer(offset).map(tokio::io::BufReader::new) - } - } -} +/// has available. [SpaceOnDevice] is shared by each [Segment]. +/// +/// [Segment]s allocate space in [PAGE_SIZE] increments. When space is allocated, +/// it is deducted from [SpaceOnDevice]. When there is not enough [SpaceOnDevice], +/// allocating operations will return [io::ErrorKind::StorageFull]. +pub type SpaceOnDevice = Arc>; /// In-memory implementation of [`Repo`]. #[derive(Clone, Debug)] pub struct Memory { space: SpaceOnDevice, - segments: SharedLock>, + segments: SharedLock>>, } impl Memory { pub fn new(total_space: u64) -> Self { Self { - space: Arc::new(AtomicI64::new(total_space.min(i64::MAX as u64) as i64)), + space: Arc::new(Mutex::new(total_space)), segments: <_>::default(), } } + + pub fn unlimited() -> Self { + Self::new(u64::MAX) + } } impl Repo for Memory { @@ -65,14 +49,13 @@ impl Repo for Memory { type SegmentReader = io::BufReader; fn create_segment(&self, offset: u64) -> io::Result { - debug!("create_segment: space={}", self.space.load(Ordering::Relaxed)); let mut inner = self.segments.write().unwrap(); match inner.entry(offset) { btree_map::Entry::Occupied(entry) => { let entry = entry.get(); let read_guard = entry.read().unwrap(); if read_guard.is_empty() { - Ok(Segment::with_pages(self.space.clone(), entry.clone())) + Ok(Segment::from_shared(self.space.clone(), entry.clone())) } else { Err(io::Error::new( io::ErrorKind::AlreadyExists, @@ -81,8 +64,8 @@ impl Repo for Memory { } } btree_map::Entry::Vacant(entry) => { - let segment = entry.insert(SharedPages::default()); - Ok(Segment::with_pages(self.space.clone(), 
segment.clone())) + let segment = entry.insert(Arc::new(RwLock::new(Storage::new()))); + Ok(Segment::from_shared(self.space.clone(), segment.clone())) } } } @@ -95,7 +78,7 @@ impl Repo for Memory { format!("segment {offset} does not exist"), )); }; - Ok(Segment::with_pages(self.space.clone(), buf.clone())) + Ok(Segment::from_shared(self.space.clone(), buf.clone())) } fn open_segment_reader(&self, offset: u64) -> io::Result { @@ -123,13 +106,116 @@ impl Repo for Memory { } } +#[cfg(feature = "streaming")] +mod async_impls { + use std::io; + + use crate::{ + repo::{ + mem::{Memory, Segment}, + Repo as _, + }, + stream::AsyncRepo, + }; + + impl AsyncRepo for Memory { + type AsyncSegmentWriter = tokio::io::BufWriter; + type AsyncSegmentReader = tokio::io::BufReader; + + async fn open_segment_reader_async(&self, offset: u64) -> io::Result { + self.open_segment_writer(offset).map(tokio::io::BufReader::new) + } + } + + #[cfg(test)] + mod tests { + use std::io; + + use pretty_assertions::assert_matches; + use tempfile::tempfile; + use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, AsyncWrite, AsyncWriteExt as _}; + + use crate::{repo::mem::Segment, tests::helpers::enable_logging}; + + async fn read_write_seek(f: &mut (impl AsyncRead + AsyncSeek + AsyncWrite + Unpin)) { + enable_logging(); + + f.write_all(b"alonso").await.unwrap(); + + f.seek(io::SeekFrom::Start(0)).await.unwrap(); + let mut buf = [0; 6]; + f.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"alonso"); + + f.seek(io::SeekFrom::Start(2)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"onso"); + + f.seek(io::SeekFrom::Current(-4)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"onso"); + + f.seek(io::SeekFrom::End(-3)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 3); + assert_eq!(&buf[0..3], b"nso"); + + 
f.seek(io::SeekFrom::End(4096)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 0); + } + + #[tokio::test] + async fn std_file_read_write_seek() { + let tmp = tempfile().unwrap(); + read_write_seek(&mut tokio::fs::File::from_std(tmp)).await + } + + #[tokio::test] + async fn segment_read_write_seek() { + read_write_seek(&mut Segment::new(4096)).await + } + + #[tokio::test] + async fn write_many_pages() { + use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _}; + + enable_logging(); + + let mut segment = Segment::new(4 * 4096); + + let data = [b'y'; 4096]; + for _ in 0..4 { + segment.write_all(&data[..2048]).await.unwrap(); + segment.write_all(&data[2048..]).await.unwrap(); + } + assert_matches!( + segment.write_all(&data[..2048]).await, + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + segment.rewind().await.unwrap(); + + let mut buf = [0; 4096]; + for _ in 0..4 { + segment.read_exact(&mut buf).await.unwrap(); + assert!(buf.iter().all(|&x| x == b'y')); + } + assert_matches!( + segment.read_exact(&mut buf).await, + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof + ); + } + } +} + #[cfg(test)] mod tests { use std::io::{Read, Seek, Write}; use pretty_assertions::assert_matches; use tempfile::tempfile; - use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; use super::*; use crate::{segment::FileLike as _, tests::helpers::enable_logging}; @@ -164,8 +250,7 @@ mod tests { #[test] fn segment_read_write_seek() { - let space_on_device = Arc::new(AtomicI64::new(4096)); - read_write_seek(&mut Segment::new(space_on_device)); + read_write_seek(&mut Segment::new(4096)); } #[test] @@ -173,54 +258,11 @@ mod tests { read_write_seek(&mut tempfile().unwrap()); } - async fn async_read_write_seek(f: &mut (impl AsyncRead + AsyncSeek + AsyncWrite + Unpin)) { - use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _}; - - enable_logging(); - - f.write_all(b"alonso").await.unwrap(); - - 
f.seek(io::SeekFrom::Start(0)).await.unwrap(); - let mut buf = [0; 6]; - f.read_exact(&mut buf).await.unwrap(); - assert_eq!(&buf, b"alonso"); - - f.seek(io::SeekFrom::Start(2)).await.unwrap(); - let n = f.read(&mut buf).await.unwrap(); - assert_eq!(n, 4); - assert_eq!(&buf[..4], b"onso"); - - f.seek(io::SeekFrom::Current(-4)).await.unwrap(); - let n = f.read(&mut buf).await.unwrap(); - assert_eq!(n, 4); - assert_eq!(&buf[..4], b"onso"); - - f.seek(io::SeekFrom::End(-3)).await.unwrap(); - let n = f.read(&mut buf).await.unwrap(); - assert_eq!(n, 3); - assert_eq!(&buf[0..3], b"nso"); - - f.seek(io::SeekFrom::End(4096)).await.unwrap(); - let n = f.read(&mut buf).await.unwrap(); - assert_eq!(n, 0); - } - - #[tokio::test] - async fn std_file_async_read_write_seek() { - let tmp = tempfile().unwrap(); - async_read_write_seek(&mut tokio::fs::File::from_std(tmp)).await - } - - #[tokio::test] - async fn segment_async_read_write_seek() { - let space_on_device = Arc::new(AtomicI64::new(4096)); - async_read_write_seek(&mut Segment::new(space_on_device)).await - } - #[test] fn ftruncate() { - let space_on_device = Arc::new(AtomicI64::new(8192)); - let mut segment = Segment::new(space_on_device); + enable_logging(); + + let mut segment = Segment::new(2 * 4096); let data = [b'z'; 512]; let mut buf = Vec::with_capacity(4096); @@ -241,8 +283,10 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(&buf[..512], &data); - assert_eq!(&buf[512..], &[0; 512]); - assert_eq!(segment.page_count(), 2); + let rest = &buf[512..]; + assert_eq!(rest.len(), 5120 - 512); + assert!(rest.iter().all(|&b| b == 0)); + assert_eq!(segment.allocated_space(), 8192); // Extend beyond available space returns `StorageFull`. 
assert_matches!( @@ -255,7 +299,7 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.page_count(), 1); + assert_eq!(segment.allocated_space(), 4096); segment.ftruncate(42, 256).unwrap(); buf.clear(); @@ -263,10 +307,12 @@ mod tests { assert_eq!(buf, &data[..256]); } + #[cfg(feature = "fallocate")] #[test] fn fallocate() { - let space_on_device = Arc::new(AtomicI64::new(8192)); - let mut segment = Segment::new(space_on_device); + enable_logging(); + + let mut segment = Segment::new(8192); let data = [b'z'; 512]; let mut buf = Vec::with_capacity(4096); @@ -280,14 +326,14 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.page_count(), 1); + assert_eq!(segment.allocated_space(), 4096); // Extend beyond page allocates new page. segment.fallocate(5120).unwrap(); buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.page_count(), 2); + assert_eq!(segment.allocated_space(), 2 * 4096); // Extend beyond available space returns `StorageFull`. 
assert_matches!( @@ -300,15 +346,14 @@ mod tests { buf.clear(); read_from_start_to_end(&mut segment, &mut buf).unwrap(); assert_eq!(buf, data); - assert_eq!(segment.page_count(), 2); + assert_eq!(segment.allocated_space(), 2 * 4096); } #[test] fn write_many_pages() { enable_logging(); - let space_on_device = Arc::new(AtomicI64::new(4096 * 4)); - let mut segment = Segment::new(space_on_device); + let mut segment = Segment::new(4 * 4096); let data = [b'y'; 4096]; for _ in 0..4 { diff --git a/crates/commitlog/src/repo/mem/page.rs b/crates/commitlog/src/repo/mem/page.rs deleted file mode 100644 index 219ec55c011..00000000000 --- a/crates/commitlog/src/repo/mem/page.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::slice::SliceIndex; - -pub const PAGE_SIZE: usize = 4096; - -#[derive(Debug)] -pub struct Page { - filled: usize, - buf: [u8; PAGE_SIZE], -} - -impl Page { - #[allow(clippy::new_without_default)] - pub fn new() -> Self { - Self { - filled: 0, - buf: [0; PAGE_SIZE], - } - } - - pub fn remaining(&self) -> usize { - PAGE_SIZE - self.filled - } - - pub fn len(&self) -> usize { - self.filled - } - - pub fn is_empty(&self) -> bool { - self.filled == 0 - } - - pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { - self.buf[pos] = f(self.buf[pos]) - } - - pub fn copy_from_slice(&mut self, buf: &[u8]) { - self.buf[self.filled..self.filled + buf.len()].copy_from_slice(buf); - self.filled += buf.len(); - } - - pub fn slice(&self, range: I) -> &I::Output - where - I: SliceIndex<[u8]>, - { - self.buf.get(range).expect("range out of bounds") - } - - pub fn zeroize(&mut self, pos: usize) { - self.buf[pos..].fill(0); - self.filled = pos; - } -} diff --git a/crates/commitlog/src/repo/mem/segment.rs b/crates/commitlog/src/repo/mem/segment.rs index d8a4379a702..eb806dacf6d 100644 --- a/crates/commitlog/src/repo/mem/segment.rs +++ b/crates/commitlog/src/repo/mem/segment.rs @@ -1,22 +1,54 @@ use std::{ io, - sync::{atomic::Ordering, Arc, RwLock, RwLockWriteGuard}, + 
sync::{Arc, Mutex, RwLock}, }; -use log::{debug, trace}; - use crate::{ repo::{ - mem::{Page, SpaceOnDevice, PAGE_SIZE}, + mem::{SpaceOnDevice, PAGE_SIZE}, SegmentLen, }, segment::FileLike, }; -type SharedLock = Arc>; -type SharedPages = SharedLock>; +pub type SharedLock = Arc>; + +/// Backing storage for a [Segment]. +/// +/// Morally, this consists of [PAGE_SIZE] chunks. Actually allocating the +/// memory is, however, prohibitively expensive (in particular in property +/// test). Thus, the underlying [Vec] buffer allocates as necessary, but +/// [Storage] tracks the logical amount of allocated space (in [PAGE_SIZE] +/// increments). +/// +/// The data of a [Storage] is fully managed by its frontend [Segment]. +/// The type is exported to allow sharing the storage between different +/// segments, each tracking a different read/write position. +#[derive(Debug)] +pub(super) struct Storage { + alloc: u64, + buf: Vec, +} + +impl Storage { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + alloc: 0, + buf: Vec::with_capacity(PAGE_SIZE), + } + } + + pub const fn len(&self) -> usize { + self.buf.len() + } -/// A log segment backed by a [Vec]. + pub const fn is_empty(&self) -> bool { + self.buf.is_empty() + } +} + +/// A log segment backed by a [Vec]. 
/// /// Writing to the segment behaves like a file opened with `O_APPEND`: /// [`io::Write::write`] always appends to the segment, regardless of the @@ -29,93 +61,65 @@ type SharedPages = SharedLock>; #[derive(Clone, Debug)] pub struct Segment { pos: u64, - pages: SharedPages, - device: SpaceOnDevice, + storage: SharedLock, + space: SpaceOnDevice, } impl Segment { - pub fn new(device: SpaceOnDevice) -> Self { - Self::with_pages(device, <_>::default()) + pub fn new(space: u64) -> Self { + Self::from_shared(Arc::new(Mutex::new(space)), Arc::new(RwLock::new(Storage::new()))) } - pub(super) fn with_pages(device: SpaceOnDevice, pages: SharedPages) -> Self { - Self { pos: 0, pages, device } + pub(super) fn from_shared(space: SpaceOnDevice, storage: SharedLock) -> Self { + Self { pos: 0, space, storage } } pub fn len(&self) -> usize { - self.pages - .read() - .unwrap() - .iter() - .fold(0, |size, page| size + page.len()) + self.storage.read().unwrap().len() } pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn page_count(&self) -> usize { - self.pages.read().unwrap().len() + self.storage.read().unwrap().is_empty() } pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { - let mut pages = self.pages.write().unwrap(); - - let page_idx = pos / PAGE_SIZE; - let page = pages.get_mut(page_idx).expect("pos out of bounds"); - let page_ofs = pos % PAGE_SIZE; - page.modify_byte_at(page_ofs, f); + let mut storage = self.storage.write().unwrap(); + storage.buf[pos] = f(storage.buf[pos]) } - fn allocate(&self, pages: &mut RwLockWriteGuard<'_, Vec>, n: usize) -> io::Result<()> { - assert!(n > pages.len()); - let page_size = PAGE_SIZE as i64; - for _ in pages.len()..n { - if self.device.load(Ordering::Relaxed) - page_size < 0 { - return Err(io::Error::new( - io::ErrorKind::StorageFull, - "not enough space left on device", - )); - } - pages.push(Page::new()); - if self.device.fetch_sub(page_size, Ordering::Relaxed) < 0 { - return 
Err(io::Error::new(io::ErrorKind::StorageFull, "no space left on device")); - } - } - - Ok(()) - } -} - -impl SegmentLen for Segment { - fn segment_len(&mut self) -> io::Result { - Ok(self.len() as u64) + pub fn allocated_space(&self) -> u64 { + self.storage.read().unwrap().alloc } } impl io::Write for Segment { fn write(&mut self, buf: &[u8]) -> io::Result { - let mut written = 0; - while written < buf.len() { - let mut pages = self.pages.write().unwrap(); - let page = { - let page_idx = self.pos as usize / PAGE_SIZE; - if page_idx >= pages.len() { - self.allocate(&mut pages, page_idx + 1)?; - } - &mut pages[page_idx] - }; - let remaining = buf.len() - written; - let to_copy = page.remaining().min(remaining); - - page.copy_from_slice(&buf[written..written + to_copy]); - drop(pages); - - written += to_copy; - self.pos += to_copy as u64; + let mut storage = self.storage.write().unwrap(); + + let mut remaining = (storage.alloc - self.pos) as usize; + // If we don't have enough space, allocate some. + // If not enough space to write all of `buf` can be allocated, + // just write as much as we can. The next `write` call will return + // ENOSPC then. 
+ if remaining == 0 { + let mut avail = self.space.lock().unwrap(); + if *avail == 0 { + return Err(enospc()); + } + + let want = (buf.len() - remaining).next_multiple_of(PAGE_SIZE); + let have = want.min(*avail as usize); + + storage.alloc += have as u64; + *avail -= have as u64; + remaining = (storage.alloc - self.pos) as usize; } - Ok(written) + let read = buf.len().min(remaining); + storage.buf.extend(&buf[..read]); + self.pos += read as u64; + + Ok(read) } fn flush(&mut self) -> io::Result<()> { @@ -125,31 +129,17 @@ impl io::Write for Segment { impl io::Read for Segment { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let mut read = 0; - while read < buf.len() { - trace!("read {} from {}", buf.len(), self.pos); - let pages = self.pages.read().unwrap(); - let Some(page) = pages.get(self.pos as usize / PAGE_SIZE) else { - trace!("no page at pos"); - break; - }; - let offset_in_page = (self.pos % PAGE_SIZE as u64) as usize; - if offset_in_page >= page.len() { - trace!("offset after initialized bytes in page"); - break; - } - let available_in_page = page.len() - offset_in_page; - let to_copy = (buf.len() - read).min(available_in_page); - trace!("available_in_page={available_in_page} to_copy={to_copy}"); - - buf[read..read + to_copy].copy_from_slice(page.slice(offset_in_page..offset_in_page + to_copy)); - trace!("buf={buf:?}"); + let storage = self.storage.read().unwrap(); - read += to_copy; - self.pos += to_copy as u64; - } + let Some(remaining) = storage.len().checked_sub(self.pos as usize) else { + return Ok(0); + }; + let want = remaining.min(buf.len()); + let pos = self.pos as usize; + buf[..want].copy_from_slice(&storage.buf[pos..pos + want]); + self.pos += want as u64; - Ok(read) + Ok(want) } } @@ -176,59 +166,86 @@ impl io::Seek for Segment { } } +impl SegmentLen for Segment { + fn segment_len(&mut self) -> io::Result { + Ok(self.len() as _) + } +} + impl FileLike for Segment { fn fsync(&mut self) -> io::Result<()> { Ok(()) } fn ftruncate(&mut self, 
_tx_offset: u64, size: u64) -> io::Result<()> { - use std::cmp::Ordering::*; - - let mut pages = self.pages.write().unwrap(); - let old_page_count = pages.len() as u64; - let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; - - let zero_tail = |maybe_last_page: Option<&mut Page>| { - if let Some(last_page) = maybe_last_page { - let tail_start = (size as usize) % PAGE_SIZE; - last_page.zeroize(tail_start); + let mut storage = self.storage.write().unwrap(); + let mut avail = self.space.lock().unwrap(); + + // NOTE: We don't modify `self.pos`, which is how `ftruncate(2)` behaves. + // This means the position can be invalid after calling this. + if size > storage.alloc { + if *avail == 0 { + return Err(enospc()); } - }; - match new_page_count.cmp(&old_page_count) { - Greater => self.allocate(&mut pages, new_page_count as usize)?, - ordering => { - if matches!(ordering, Less) { - pages.truncate(new_page_count as usize); - } - zero_tail(pages.last_mut()); - } - }; - if self.pos > size { - self.pos = size; + let want = size.next_multiple_of(PAGE_SIZE as u64) - storage.alloc; + let have = want.min(*avail); + + storage.alloc += have; + *avail -= have; + storage.buf.resize(size as usize, 0); + + // NOTE: `ftruncate(2)` is a bit ambiguous as to what should happen + // if the requested size exceeds the available space. + // + // [std::fs::File::set_len] will succeed, but all subsequent + // operations return EBADF. + // + // That's not super helpful, so instead we zero out as much space as + // possible, and return ENOSPC if more than that was requested. 
+ if want > have { + return Err(enospc()); + } + } else { + let alloc = size.next_multiple_of(PAGE_SIZE as u64); + *avail += storage.alloc - alloc; + storage.alloc = alloc; + storage.buf.resize(size as usize, 0); } Ok(()) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, size: u64) -> io::Result<()> { - let mut pages = self.pages.write().unwrap(); - let old_page_count = pages.len() as u64; - let new_page_count = size.next_multiple_of(PAGE_SIZE as u64) / PAGE_SIZE as u64; + let mut storage = self.storage.write().unwrap(); - debug!( - "fallocate {}: old_page_count={} new_page_count={}", - size, old_page_count, new_page_count - ); + if size <= storage.alloc { + return Ok(()); + } + + let mut avail = self.space.lock().unwrap(); + if *avail == 0 { + return Err(enospc()); + } - if new_page_count > old_page_count { - self.allocate(&mut pages, new_page_count as usize)?; + let want = size.next_multiple_of(PAGE_SIZE as u64) - storage.alloc; + let have = want.min(*avail); + storage.alloc += have; + *avail -= have; + + if want > have { + return Err(enospc()); } Ok(()) } } +fn enospc() -> io::Error { + io::Error::new(io::ErrorKind::StorageFull, "no space left on device") +} + #[cfg(feature = "streaming")] mod async_impls { use super::*; diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index adf7334eba3..7de7665a4c5 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -195,6 +195,7 @@ pub fn create_segment_writer( offset: u64, ) -> io::Result> { let mut storage = repo.create_segment(offset)?; + // Ensure we have enough space for this segment. fallocate(&mut storage, opts.max_segment_size)?; Header { log_format_version: opts.log_format_version, @@ -242,6 +243,10 @@ pub fn resume_segment_writer( offset: u64, ) -> io::Result, Metadata>> { let mut storage = repo.open_segment_writer(offset)?; + // Ensure we have enough space for this segment. 
+ // The segment could have been created without the `fallocate` feature + // enabled, so we call this here again to ensure writes can't fail due to + // ENOSPC. fallocate(&mut storage, opts.max_segment_size)?; let offset_index = repo.get_offset_index(offset).ok(); let Metadata { @@ -306,6 +311,7 @@ pub fn open_segment_reader( Reader::new(max_log_format_version, offset, storage) } +/// Allocate space if the `fallocate` feature is enabled. No-op otherwise. #[inline] fn fallocate(_f: &mut impl FileLike, _size: u64) -> io::Result<()> { #[cfg(feature = "fallocate")] diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index df273c1bb30..bf07bebab4c 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -221,6 +221,7 @@ pub trait FileLike { /// ```ignore /// fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, size) /// ``` + #[cfg(feature = "fallocate")] fn fallocate(&mut self, size: u64) -> io::Result<()>; } @@ -229,23 +230,6 @@ impl FileLike for File { self.sync_data() } - // `ftruncate` deallocates any extra `fallocate`'d blocks, - // so if the `fallocate` feature is enabled, we need - // restore the allocation after truncation. - // - // TODO: Make truncate (shrinking) a [Segment] method, so we can implement - // `ftruncate` just like `ftruncate(2)`. - #[cfg(all(feature = "fallocate", target_os = "linux"))] - fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { - let stat = self.metadata()?; - let allocated_size = std::os::unix::fs::MetadataExt::blocks(&stat) * 512; - self.set_len(size)?; - self.fallocate(allocated_size)?; - - Ok(()) - } - - #[cfg(not(feature = "fallocate"))] fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { self.set_len(size) } @@ -259,21 +243,15 @@ impl FileLike for File { } // Fail compilation if `fallocate` is enabled but not supported. 
- #[cfg(all(feature = "fallocate", not(all(target_os = "linux", any(test, feature = "test")))))] + #[cfg(all(feature = "fallocate", not(target_os = "linux"), not(any(test, feature = "test"))))] compile_error!("`fallocate(2)` is not available on this platform"); - // No-op if either: - // - // - `fallocate` is not enabled - // - it is enabled, but not supported, and this is a test build + // No-op if `fallocate` is enabled, unsupported, but this is a test build. // // If it's a test build, we may want to run `fallocate` semantics against // an in-memory backend (on any platform). Hence, we need the method to be // present. - #[cfg(any( - not(feature = "fallocate"), - all(feature = "fallocate", any(test, feature = "test"), not(target_os = "linux")) - ))] + #[cfg(all(feature = "fallocate", not(target_os = "linux"), any(test, feature = "test")))] fn fallocate(&mut self, _: u64) -> io::Result<()> { Ok(()) } @@ -288,6 +266,7 @@ impl FileLike for BufWriter { self.get_mut().ftruncate(tx_offset, size) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, size: u64) -> io::Result<()> { self.get_mut().fallocate(size) } @@ -308,6 +287,7 @@ impl FileLike for Writer { Ok(()) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, size: u64) -> io::Result<()> { self.inner.fallocate(size) } @@ -407,6 +387,7 @@ impl FileLike for OffsetIndexWriter { Ok(()) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, _: u64) -> io::Result<()> { Ok(()) } @@ -422,6 +403,7 @@ impl FileLike for IndexFileMut { .map_err(|e| io::Error::other(format!("failed to truncate offset index at {tx_offset}: {e:?}"))) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, _: u64) -> io::Result<()> { Ok(()) } @@ -788,7 +770,7 @@ mod tests { #[test] fn write_read_roundtrip() { - let repo = repo::Memory::new(u64::MAX); + let repo = repo::Memory::unlimited(); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); writer.append([0; 32]).unwrap(); 
@@ -817,7 +799,7 @@ mod tests { #[test] fn metadata() { - let repo = repo::Memory::new(u64::MAX); + let repo = repo::Memory::unlimited(); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 @@ -850,7 +832,7 @@ mod tests { #[test] fn commits() { - let repo = repo::Memory::new(u64::MAX); + let repo = repo::Memory::unlimited(); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); @@ -883,7 +865,7 @@ mod tests { #[test] fn transactions() { - let repo = repo::Memory::new(u64::MAX); + let repo = repo::Memory::unlimited(); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index 2904e6ac70a..e7129c78caa 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -10,7 +10,7 @@ use crate::{ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic { commitlog::Generic::open( - repo::Memory::new(max_segment_size * 4096), + repo::Memory::unlimited(), Options { max_segment_size, ..Options::default() diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index 231c765a28b..4404cb948cb 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -184,6 +184,7 @@ impl FileLike for ShortSegment { self.inner.ftruncate(tx_offset, size) } + #[cfg(feature = "fallocate")] fn fallocate(&mut self, size: u64) -> io::Result<()> { self.inner.fallocate(size) } @@ -229,7 +230,7 @@ struct ShortMem { impl ShortMem { pub fn new(max_len: u64) -> Self { Self { - inner: repo::Memory::new(max_len * 1024), + inner: repo::Memory::new(max_len * 4096), 
max_len, } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index c282dc68436..12ea0c16059 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1133,7 +1133,7 @@ mod tests { let (durable_offset, ..) = watch::channel(None); Self { commitlog: Arc::new(RwLock::new( - commitlog::Generic::open(repo::Memory::new(4096), <_>::default()).unwrap(), + commitlog::Generic::open(repo::Memory::unlimited(), <_>::default()).unwrap(), )), durable_offset, } From 988f06440cd32d02cf4ae0b7cc7964a753ae7f9b Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 23 Oct 2025 13:43:04 +0200 Subject: [PATCH 06/10] See if we can run the durability tests with --features fallocate --- .github/workflows/ci.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9fae3383572..57d41be10d4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -125,6 +125,9 @@ jobs: #Note: Unreal tests will be run separately run: cargo test --all -- --skip unreal + - name: Run fallocate tests + run: cargo test -p spacetimedb-durability --features fallocate + - name: Check that the test outputs are up-to-date run: bash tools/check-diff.sh From 595103ef233ce883ca56080ddd78ba1d8f34ea9c Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 23 Oct 2025 17:32:50 +0200 Subject: [PATCH 07/10] Allow runtime configuration of whether or not to preallocate segments --- crates/commitlog/src/lib.rs | 12 ++++++++++++ crates/commitlog/src/repo/mod.rs | 16 +++++++++++----- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 52d7576813d..8ed7ba72e06 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -90,6 +90,12 @@ pub struct Options { serde(default = 
"Options::default_offset_index_require_segment_fsync") )] pub offset_index_require_segment_fsync: bool, + /// If `true`, preallocate the disk space for commitlog segments, up to the + /// `max_segment_size`. + /// + /// Has no effect if the `fallocate` feature is not enabled. + #[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))] + pub preallocate_segments: bool, } impl Default for Options { @@ -103,6 +109,7 @@ impl Options { pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::MAX; pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; + pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, @@ -110,6 +117,7 @@ impl Options { max_records_in_commit: Self::default_max_records_in_commit(), offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), + preallocate_segments: Self::default_preallocate_segments(), }; pub const fn default_log_format_version() -> u8 { @@ -132,6 +140,10 @@ impl Options { Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC } + pub const fn default_preallocate_segments() -> bool { + Self::DEFAULT_PREALLOCATE_SEGMENTS + } + /// Compute the length in bytes of an offset index based on the settings in /// `self`. pub fn offset_index_len(&self) -> u64 { diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index 7de7665a4c5..ae4fdcb66f8 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -196,7 +196,7 @@ pub fn create_segment_writer( ) -> io::Result> { let mut storage = repo.create_segment(offset)?; // Ensure we have enough space for this segment. 
- fallocate(&mut storage, opts.max_segment_size)?; + fallocate(&mut storage, &opts)?; Header { log_format_version: opts.log_format_version, checksum_algorithm: Commit::CHECKSUM_ALGORITHM, @@ -247,7 +247,7 @@ pub fn resume_segment_writer( // The segment could have been created without the `fallocate` feature // enabled, so we call this here again to ensure writes can't fail due to // ENOSPC. - fallocate(&mut storage, opts.max_segment_size)?; + fallocate(&mut storage, &opts)?; let offset_index = repo.get_offset_index(offset).ok(); let Metadata { header, @@ -311,11 +311,17 @@ pub fn open_segment_reader( Reader::new(max_log_format_version, offset, storage) } -/// Allocate space if the `fallocate` feature is enabled. No-op otherwise. +/// Allocate [Options::max_segment_size] of space for [FileLike] +/// if the `fallocate` feature is enabled, +/// and [Options::preallocate_segments] is `true`. +/// +/// No-op otherwise. #[inline] -fn fallocate(_f: &mut impl FileLike, _size: u64) -> io::Result<()> { +fn fallocate(_f: &mut impl FileLike, _opts: &Options) -> io::Result<()> { #[cfg(feature = "fallocate")] - _f.fallocate(_size)?; + if _opts.preallocate_segments { + _f.fallocate(_opts.max_segment_size)?; + } Ok(()) } From cf4d6aafc7e63618598ec6c03616cd10447ab6f6 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Thu, 23 Oct 2025 17:37:08 +0200 Subject: [PATCH 08/10] fixup! Allow runtime configuration of whether or not to preallocate segments --- crates/commitlog/src/repo/mod.rs | 2 +- crates/commitlog/src/stream/writer.rs | 14 +++----------- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index ae4fdcb66f8..2f5eed1d3eb 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -317,7 +317,7 @@ pub fn open_segment_reader( /// /// No-op otherwise. 
#[inline] -fn fallocate(_f: &mut impl FileLike, _opts: &Options) -> io::Result<()> { +pub(crate) fn fallocate(_f: &mut impl FileLike, _opts: &Options) -> io::Result<()> { #[cfg(feature = "fallocate")] if _opts.preallocate_segments { _f.fallocate(_opts.max_segment_size)?; diff --git a/crates/commitlog/src/stream/writer.rs b/crates/commitlog/src/stream/writer.rs index eb1a44ea612..83f37a9f1d6 100644 --- a/crates/commitlog/src/stream/writer.rs +++ b/crates/commitlog/src/stream/writer.rs @@ -13,7 +13,7 @@ use tokio::{ use crate::{ commit, error, index::IndexFile, - repo::{Repo, SegmentLen as _}, + repo::{fallocate, Repo, SegmentLen as _}, segment::{self, FileLike, OffsetIndexWriter, CHECKSUM_LEN, DEFAULT_CHECKSUM_ALGORITHM}, stream::common::{read_exact, AsyncFsync}, Options, StoredCommit, DEFAULT_LOG_FORMAT_VERSION, @@ -108,7 +108,7 @@ where }; let mut segment = repo.open_segment_writer(last)?; - fallocate(&mut segment, commitlog_options.max_segment_size)?; + fallocate(&mut segment, &commitlog_options)?; let mut offset_index = repo .get_offset_index(last) @@ -462,7 +462,7 @@ fn create_segment( Err(e) })?; - fallocate(&mut segment, commitlog_options.max_segment_size)?; + fallocate(&mut segment, &commitlog_options)?; let index_writer = repo .create_offset_index(segment_offset, commitlog_options.offset_index_len()) @@ -472,11 +472,3 @@ fn create_segment( Ok((segment, index_writer)) } - -#[inline] -fn fallocate(_f: &mut impl FileLike, _size: u64) -> io::Result<()> { - #[cfg(feature = "fallocate")] - _f.fallocate(_size)?; - - Ok(()) -} From 586db7b17911d8664b1616fe86548bb818cebb6e Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Fri, 24 Oct 2025 16:46:16 +0200 Subject: [PATCH 09/10] Warn if preallocate_segments && not(feature = "fallocate") --- crates/commitlog/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 8ed7ba72e06..fbdf22b60e5 100644 --- a/crates/commitlog/src/lib.rs +++ 
b/crates/commitlog/src/lib.rs @@ -171,6 +171,13 @@ impl Commitlog { /// free-standing functions in this module for how to traverse a read-only /// commitlog. pub fn open(root: CommitLogDir, opts: Options, on_new_segment: Option>) -> io::Result { + #[cfg(not(feature = "fallocate"))] + if opts.preallocate_segments { + log::warn!( + "`preallocate_segments` enabled but not supported by this build. commitlog-dir={}", + root.display() + ); + } let inner = commitlog::Generic::open(repo::Fs::new(root, on_new_segment)?, opts)?; Ok(Self { From 21f12bef79be8bf527d62ac0147b03f654980ac7 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Mon, 10 Nov 2025 17:11:41 +0100 Subject: [PATCH 10/10] Fix defaults for fallocate tests --- crates/durability/tests/io/fallocate.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs index 15f5d73f471..533d1fdeb2b 100644 --- a/crates/durability/tests/io/fallocate.rs +++ b/crates/durability/tests/io/fallocate.rs @@ -168,6 +168,7 @@ async fn local_durability( commitlog: spacetimedb_commitlog::Options { max_segment_size, max_records_in_commit: 1.try_into().unwrap(), + preallocate_segments: true, ..<_>::default() }, ..<_>::default()