diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 0ac1f43f57e..82be228e4a2 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -332,16 +332,39 @@ impl Commitlog { } /// Compress the segments at the offsets provided, marking them as immutable. + /// + /// `offsets` must contain the exact segment offsets, no rounding to the + /// nearest offset is performed. If a segment is not found on disk, an error + /// is returned and no further segments from the list are processed. + /// + /// The latest, writable segment will not be compressed. If `offsets` + /// contains its offset, an error is returned. + /// + /// This method acquires a read lock on this `Commitlog` instance, but + /// releases it once the compression work starts. Concurrent compression + /// tasks on the same segment are safe, but external coordination is + /// required to avoid duplicate work. + /// + /// Attempting to compress a segment that is already compressed incurs a + /// small overhead to open the file and determining its format, but + /// otherwise does nothing. pub fn compress_segments(&self, offsets: &[u64]) -> io::Result { - // even though `compress_segment` takes &self, we take an - // exclusive lock to avoid any weirdness happening. - #[allow(clippy::readonly_write_lock)] - let inner = self.inner.write().unwrap(); - assert!(!offsets.contains(&inner.head.min_tx_offset())); - // TODO: parallelize, maybe + let (repo, head_offset) = { + let inner = self.inner.read().unwrap(); + let repo = inner.repo.clone(); + let head_offset = inner.head.min_tx_offset(); + + (repo, head_offset) + }; + if offsets.contains(&head_offset) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("refusing to compress mutable segment {head_offset}"), + )); + } let mut stats = <_>::default(); for offset in offsets { - stats += inner.repo.compress_segment(*offset)?; + stats += repo.compress_segment(*offset)?; } Ok(stats) }