37 changes: 30 additions & 7 deletions crates/commitlog/src/lib.rs
@@ -332,16 +332,39 @@ impl<T> Commitlog<T> {
     }
 
     /// Compress the segments at the offsets provided, marking them as immutable.
+    ///
+    /// `offsets` must contain the exact segment offsets; no rounding to the
+    /// nearest offset is performed. If a segment is not found on disk, an error
+    /// is returned and no further segments from the list are processed.
+    ///
+    /// The latest, writable segment will not be compressed. If `offsets`
+    /// contains its offset, an error is returned.
+    ///
+    /// This method acquires a read lock on this `Commitlog` instance, but
+    /// releases it once the compression work starts. Concurrent compression
+    /// tasks on the same segment are safe, but external coordination is
+    /// required to avoid duplicate work.
+    ///
+    /// Attempting to compress a segment that is already compressed incurs a
+    /// small overhead to open the file and determine its format, but
+    /// otherwise does nothing.
     pub fn compress_segments(&self, offsets: &[u64]) -> io::Result<CompressionStats> {
-        // even though `compress_segment` takes &self, we take an
-        // exclusive lock to avoid any weirdness happening.
-        #[allow(clippy::readonly_write_lock)]
-        let inner = self.inner.write().unwrap();
-        assert!(!offsets.contains(&inner.head.min_tx_offset()));
-        // TODO: parallelize, maybe
+        let (repo, head_offset) = {
+            let inner = self.inner.read().unwrap();
+            let repo = inner.repo.clone();
+            let head_offset = inner.head.min_tx_offset();
+
+            (repo, head_offset)
+        };
+        if offsets.contains(&head_offset) {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!("refusing to compress mutable segment {head_offset}"),
+            ));
+        }
         let mut stats = <_>::default();
         for offset in offsets {
-            stats += inner.repo.compress_segment(*offset)?;
+            stats += repo.compress_segment(*offset)?;
         }
         Ok(stats)
     }
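The change replaces an exclusive write lock (plus a panicking `assert!`) with a briefly-held read lock: the shared repository handle is cloned out, the lock is dropped, a recoverable `InvalidInput` error guards the mutable head segment, and compression then runs without blocking writers. A minimal self-contained sketch of that lock-narrowing pattern, using hypothetical stand-ins (`Repo`, `Inner`, `head_offset`) rather than the crate's real types:

```rust
use std::io;
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for the commitlog's segment repository:
// a cheaply cloneable shared handle (think Arc-backed).
#[derive(Clone)]
struct Repo;

impl Repo {
    // Pretend compression: report one unit of work per segment.
    fn compress_segment(&self, _offset: u64) -> io::Result<u64> {
        Ok(1)
    }
}

struct Inner {
    repo: Repo,
    head_offset: u64, // offset of the latest, writable segment
}

struct Commitlog {
    inner: Arc<RwLock<Inner>>,
}

impl Commitlog {
    fn compress_segments(&self, offsets: &[u64]) -> io::Result<u64> {
        // Hold the read lock only long enough to copy out what we need;
        // the guard is dropped at the end of this block.
        let (repo, head_offset) = {
            let inner = self.inner.read().unwrap();
            (inner.repo.clone(), inner.head_offset)
        };
        // Refuse the mutable head segment with an error, not a panic.
        if offsets.contains(&head_offset) {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("refusing to compress mutable segment {head_offset}"),
            ));
        }
        // No lock is held here, so writers proceed while we compress.
        let mut total = 0;
        for offset in offsets {
            total += repo.compress_segment(*offset)?;
        }
        Ok(total)
    }
}

fn main() {
    let log = Commitlog {
        inner: Arc::new(RwLock::new(Inner { repo: Repo, head_offset: 100 })),
    };
    assert_eq!(log.compress_segments(&[10, 20]).unwrap(), 2);
    // Including the head offset yields InvalidInput instead of a panic.
    let err = log.compress_segments(&[100]).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
    println!("ok");
}
```

Cloning the handle out of the lock is what makes the "concurrent compression tasks are safe" note in the doc comment hold: each caller works against its own handle once the guard is released.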