Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove zstd in anticipation for it being re-added through marble #1426

Merged
merged 6 commits into from
Oct 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
default.sled
crash_*
*db
*conf
*snap.*
Expand Down
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ default = []
# test-only configurations that cause performance to drop significantly.
# It will cause your tests to take much more time, and possibly time out etc...
testing = ["event_log", "lock_free_delays", "light_testing"]
light_testing = ["compression", "failpoints", "backtrace", "memshred"]
compression = ["zstd"]
light_testing = ["failpoints", "backtrace", "memshred"]
lock_free_delays = []
failpoints = []
event_log = []
Expand All @@ -41,20 +40,20 @@ no_logs = ["log/max_level_off"]
no_inline = []
pretty_backtrace = ["color-backtrace"]
docs = []
no_zstd = []
miri_optimizations = []
mutex = []
memshred = []

[dependencies]
libc = "0.2.96"
zstd = { version = "0.11.2", optional = true }
crc32fast = "1.2.1"
log = "0.4.14"
parking_lot = "0.12.0"
parking_lot = "0.12.1"
color-backtrace = { version = "0.5.1", optional = true }
num-format = { version = "0.4.0", optional = true }
backtrace = { version = "0.3.60", optional = true }
im = "15.0.0"
im = "15.1.0"

[target.'cfg(any(target_os = "linux", target_os = "macos", target_os="windows"))'.dependencies]
fs2 = "0.4.3"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ extreme::run(async move {

# minimum supported Rust version (MSRV)

We support Rust 1.57.0 and up.
We support Rust 1.62 and up.

# architecture

Expand Down
1 change: 0 additions & 1 deletion benchmarks/stress2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ overflow-checks = true
default = []
lock_free_delays = ["sled/lock_free_delays"]
event_log = ["sled/event_log"]
compression = ["sled/compression"]
no_logs = ["sled/no_logs"]
metrics = ["sled/metrics"]
jemalloc = ["jemallocator"]
Expand Down
4 changes: 2 additions & 2 deletions scripts/cross_compile.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ rustup update --no-self-update

RUSTFLAGS="--cfg miri" cargo check

rustup toolchain install 1.57.0 --no-self-update
rustup toolchain install 1.62 --no-self-update
cargo clean
rm Cargo.lock
cargo +1.57.0 check
cargo +1.62 check

for target in $targets; do
echo "setting up $target..."
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ impl Config {
);
if self.use_compression {
supported!(
cfg!(feature = "compression"),
"the 'compression' feature must be enabled"
!cfg!(feature = "no_zstd"),
"the 'no_zstd' feature is set, but Config.use_compression is also set to true"
);
}
supported!(
Expand Down
3 changes: 1 addition & 2 deletions src/doc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
//! * forward and reverse iterators
//! * a monotonic ID generator capable of giving out 75-125+ million unique IDs
//! per second, never double allocating even in the presence of crashes
//! * [zstd](https://github.com/facebook/zstd) compression (use the zstd build
//! feature)
//! * [zstd](https://github.com/facebook/zstd) compression
//! * cpu-scalable lock-free implementation
//! * SSD-optimized log-structured storage
//!
Expand Down
4 changes: 2 additions & 2 deletions src/ebr/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl CompareAndSetOrdering for (Ordering, Ordering) {

/// Returns a bitmask containing the unused least significant bits of an aligned pointer to `T`.
#[inline]
fn low_bits<T: ?Sized + Pointable>() -> usize {
const fn low_bits<T: ?Sized + Pointable>() -> usize {
(1 << T::ALIGN.trailing_zeros()) - 1
}

Expand Down Expand Up @@ -720,7 +720,7 @@ impl<'g, T> Shared<'g, T> {

impl<'g, T: ?Sized + Pointable> Shared<'g, T> {
/// Returns a new null pointer.
pub(crate) fn null() -> Shared<'g, T> {
pub(crate) const fn null() -> Shared<'g, T> {
Shared { data: 0, _marker: PhantomData }
}

Expand Down
2 changes: 1 addition & 1 deletion src/ebr/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl Global {

for _ in 0..steps {
match self.queue.try_pop_if(
&|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
guard,
) {
None => break,
Expand Down
2 changes: 1 addition & 1 deletion src/histogram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ fn multithreaded() {
}));
}

for t in threads.into_iter() {
for t in threads {
t.join().unwrap();
}

Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ mod compile_time_assertions {
use crate::*;

#[allow(unreachable_code)]
fn _assert_public_types_send_sync() {
const fn _assert_public_types_send_sync() {
_assert_send::<Subscriber>();

_assert_send_sync::<Iter>();
Expand All @@ -513,9 +513,9 @@ mod compile_time_assertions {
_assert_send_sync::<Mode>();
}

fn _assert_send<S: Send>() {}
const fn _assert_send<S: Send>() {}

fn _assert_send_sync<S: Send + Sync>() {}
const fn _assert_send_sync<S: Send + Sync>() {}
}

#[cfg(all(unix, not(miri)))]
Expand Down
6 changes: 1 addition & 5 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,14 +320,10 @@ impl Metrics {
ret.push_str(&format!("hit ratio: {}%\n", hit_ratio));

ret.push_str(&format!("{}\n", "-".repeat(134)));
ret.push_str("serialization and compression:\n");
ret.push_str("serialization:\n");
ret.push_str(&p(vec![
lat("serialize", &self.serialize),
lat("deserialize", &self.deserialize),
#[cfg(feature = "compression")]
lat("compress", &self.compress),
#[cfg(feature = "compression")]
lat("decompress", &self.decompress),
]));

ret.push_str(&format!("{}\n", "-".repeat(134)));
Expand Down
2 changes: 1 addition & 1 deletion src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2629,7 +2629,7 @@ mod test {
}

let key_ref = KeyRef::Computed { base: &[2, 253], distance: 8 };
let mut buf = &mut [0, 0][..];
let buf = &mut [0, 0][..];
key_ref.write_into(buf);
assert_eq!(buf, &[3, 5]);
}
Expand Down
20 changes: 3 additions & 17 deletions src/pagecache/heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,10 @@ impl Heap {
}
}

pub fn read(
&self,
heap_id: HeapId,
use_compression: bool,
) -> Result<(MessageKind, Vec<u8>)> {
pub fn read(&self, heap_id: HeapId) -> Result<(MessageKind, Vec<u8>)> {
log::trace!("Heap::read({:?})", heap_id);
let (slab_id, slab_idx, original_lsn) = heap_id.decompose();
self.slabs[slab_id as usize].read(
slab_idx,
original_lsn,
use_compression,
)
self.slabs[slab_id as usize].read(slab_idx, original_lsn)
}

pub fn free(&self, heap_id: HeapId) {
Expand Down Expand Up @@ -300,7 +292,6 @@ impl Slab {
&self,
slab_idx: SlabIdx,
original_lsn: Lsn,
use_compression: bool,
) -> Result<(MessageKind, Vec<u8>)> {
let bs = slab_id_to_size(self.slab_id);
let offset = u64::from(slab_idx) * bs;
Expand Down Expand Up @@ -332,12 +323,7 @@ impl Slab {
return Err(Error::corruption(None));
}
let buf = heap_buf[13..].to_vec();
let buf2 = if use_compression {
crate::pagecache::decompress(buf)
} else {
buf
};
Ok((MessageKind::from(heap_buf[0]), buf2))
Ok((MessageKind::from(heap_buf[0]), buf))
} else {
log::debug!(
"heap message CRC does not match contents. stored: {} actual: {}",
Expand Down
69 changes: 17 additions & 52 deletions src/pagecache/logger.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fs::File;

use super::{
arr_to_lsn, arr_to_u32, assert_usize, decompress, header, iobuf,
lsn_to_arr, pread_exact, pread_exact_or_eof, roll_iobuf, u32_to_arr, Arc,
BasedBuf, DiskPtr, HeapId, IoBuf, IoBufs, LogKind, LogOffset, Lsn,
MessageKind, Reservation, Serialize, Snapshot, BATCH_MANIFEST_PID,
COUNTER_PID, MAX_MSG_HEADER_LEN, META_PID, SEG_HEADER_LEN,
arr_to_lsn, arr_to_u32, assert_usize, header, iobuf, lsn_to_arr,
pread_exact, pread_exact_or_eof, roll_iobuf, u32_to_arr, Arc, BasedBuf,
DiskPtr, HeapId, IoBuf, IoBufs, LogKind, LogOffset, Lsn, MessageKind,
Reservation, Serialize, Snapshot, BATCH_MANIFEST_PID, COUNTER_PID,
MAX_MSG_HEADER_LEN, META_PID, SEG_HEADER_LEN,
};

use crate::*;
Expand Down Expand Up @@ -70,18 +70,16 @@ impl Log {
// here because it might not still
// exist in the inline log.
let heap_id = ptr.heap_id().unwrap();
self.config.heap.read(heap_id, self.config.use_compression).map(
|(kind, buf)| {
let header = MessageHeader {
kind,
pid,
segment_number: expected_segment_number,
crc32: 0,
len: 0,
};
LogRead::Heap(header, buf, heap_id, 0)
},
)
self.config.heap.read(heap_id).map(|(kind, buf)| {
let header = MessageHeader {
kind,
pid,
segment_number: expected_segment_number,
crc32: 0,
len: 0,
};
LogRead::Heap(header, buf, heap_id, 0)
})
}
}

Expand Down Expand Up @@ -129,43 +127,13 @@ impl Log {
/// completed or aborted later. Useful for maintaining
/// linearizability across CAS operations that may need to
/// persist part of their operation.
#[allow(unused)]
pub fn reserve<T: Serialize + Debug>(
&self,
log_kind: LogKind,
pid: PageId,
item: &T,
guard: &Guard,
) -> Result<Reservation<'_>> {
#[cfg(feature = "compression")]
{
if self.config.use_compression && pid != BATCH_MANIFEST_PID {
use zstd::bulk::compress;

let buf = item.serialize();

#[cfg(feature = "metrics")]
let _measure = Measure::new(&M.compress);

let compressed_buf =
compress(&buf, self.config.compression_factor).unwrap();

let ret = self.reserve_inner(
log_kind,
pid,
&IVec::from(compressed_buf),
None,
guard,
);

if let Err(e) = &ret {
self.iobufs.set_global_error(*e);
}

return ret;
}
}

let ret = self.reserve_inner(log_kind, pid, item, None, guard);

if let Err(e) = &ret {
Expand Down Expand Up @@ -860,7 +828,7 @@ pub(crate) fn read_message<R: ReadAt>(
assert_eq!(buf.len(), 16);
let heap_id = HeapId::deserialize(&mut &buf[..]).unwrap();

match config.heap.read(heap_id, config.use_compression) {
match config.heap.read(heap_id) {
Ok((kind, buf2)) => {
assert_eq!(header.kind, kind);
trace!(
Expand All @@ -883,10 +851,7 @@ pub(crate) fn read_message<R: ReadAt>(
| MessageKind::Free
| MessageKind::Counter => {
trace!("read a successful inline message");
let buf2 =
if config.use_compression { decompress(buf) } else { buf };

Ok(LogRead::Inline(header, buf2, inline_len))
Ok(LogRead::Inline(header, buf, inline_len))
}
MessageKind::BatchManifest => {
assert_eq!(buf.len(), std::mem::size_of::<Lsn>());
Expand Down
26 changes: 0 additions & 26 deletions src/pagecache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,32 +230,6 @@ pub(crate) const fn u32_to_arr(number: u32) -> [u8; 4] {
number.to_le_bytes()
}

#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::needless_return)]
pub(in crate::pagecache) fn decompress(in_buf: Vec<u8>) -> Vec<u8> {
#[cfg(feature = "compression")]
{
use zstd::stream::decode_all;

let scootable_in_buf = &mut &*in_buf;
let raw: IVec = IVec::deserialize(scootable_in_buf)
.expect("this had to be serialized with an extra length frame");
#[cfg(feature = "metrics")]
let _measure = Measure::new(&M.decompress);
let out_buf = decode_all(&raw[..]).expect(
"failed to decompress data. \
This is not expected, please open an issue on \
https://github.com/spacejam/sled so we can \
fix this critical issue ASAP. Thank you :)",
);

return out_buf;
}

#[cfg(not(feature = "compression"))]
in_buf
}

#[derive(Debug, Clone, Copy)]
pub struct NodeView<'g>(pub(crate) PageView<'g>);

Expand Down
3 changes: 1 addition & 2 deletions src/pagecache/reservation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl<'a> Reservation<'a> {

/// Refills the reservation buffer with new data.
/// Must supply a buffer of an identical length
/// as the one initially provided. Don't use this
/// on messages subject to compression etc...
/// as the one initially provided.
///
/// # Panics
///
Expand Down
Loading