Skip to content

Commit

Permalink
Inline a simplified varint implementation (#82)
Browse files Browse the repository at this point in the history
The maintainer of the `varmint` crate we're using is moving on and would
like to deprecate the crate [[1]], so let's inline a simplified version
for now. I think we should revisit our serialization format soon anyway,
but this will unblock the `varmint` maintainer.

[1]: mycorrhiza/varmint-rs#8
  • Loading branch information
jamesbornholt authored Sep 26, 2022
1 parent d4b1c28 commit 5f7b519
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 11 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ rand_pcg = "0.2.1"
scoped-tls = "1.0.0"
smallvec = "1.6.1"
tracing = { version = "0.1.21", default-features = false, features = ["std"] }
varmint = "0.1.3"

[dev-dependencies]
criterion = { version = "0.3.4", features = ["html_reports"] }
Expand Down
89 changes: 81 additions & 8 deletions src/scheduler/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,76 @@
use crate::runtime::task::TaskId;
use crate::scheduler::{Schedule, ScheduleStep};
use bitvec::prelude::*;
use varmint::*;

/// A simplified version of the deprecated [`varmint`](https://github.com/mycorrhiza/varmint-rs)
/// crate, used under the MIT license.
mod varint {
pub fn space_needed(val: u64) -> usize {
let used_bits = u64::min_value().leading_zeros() - val.leading_zeros();
std::cmp::max((used_bits + 6) as usize / 7, 1)
}

pub trait WriteVarInt {
fn write_u64_varint(&mut self, val: u64) -> std::io::Result<()>;
}

impl<R: std::io::Write> WriteVarInt for R {
fn write_u64_varint(&mut self, mut val: u64) -> std::io::Result<()> {
loop {
let current = (val & 0x7F) as u8;
val >>= 7;
if val == 0 {
self.write_all(&[current])?;
return Ok(());
} else {
self.write_all(&[current | 0x80])?;
}
}
}
}

pub trait ReadVarInt {
fn read_u64_varint(&mut self) -> std::io::Result<u64>;
}

fn read_u8<R: std::io::Read>(reader: &mut R) -> std::io::Result<u8> {
let mut buffer = [0u8];
reader.read_exact(&mut buffer)?;
Ok(buffer[0])
}

impl<R: std::io::Read> ReadVarInt for R {
fn read_u64_varint(&mut self) -> std::io::Result<u64> {
let first = read_u8(self)?;
if first & 0x80 == 0 {
return Ok(u64::from(first));
}

let mut result = u64::from(first & 0x7F);
let mut offset = 7;

loop {
let current = read_u8(self)?;
result += u64::from(current & 0x7F) << offset;
if current & 0x80 == 0 {
return Ok(result);
}
offset += 7;
if offset == 63 {
let last = read_u8(self)?;
if last == 0x01 {
return Ok(result + (1 << offset));
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"varint exceeded 64 bits long",
));
}
}
}
}
}
}

// The serialization format is this:
// [task id bitwidth] [number of schedule steps] [seed] [step]*
Expand All @@ -18,6 +87,8 @@ use varmint::*;
const SCHEDULE_MAGIC_V2: u8 = 0x91;

pub(crate) fn serialize_schedule(schedule: &Schedule) -> String {
use self::varint::{space_needed, WriteVarInt};

let &max_task_id = schedule
.steps
.iter()
Expand Down Expand Up @@ -47,20 +118,22 @@ pub(crate) fn serialize_schedule(schedule: &Schedule) -> String {
}

let mut buf = Vec::with_capacity(
1 + len_usize_varint(task_id_bits)
+ len_usize_varint(schedule.len())
+ len_u64_varint(schedule.seed)
1 + space_needed(task_id_bits as u64)
+ space_needed(schedule.len() as u64)
+ space_needed(schedule.seed)
+ encoded.len(),
);
buf.push(SCHEDULE_MAGIC_V2);
buf.write_usize_varint(task_id_bits).unwrap();
buf.write_usize_varint(schedule.len()).unwrap();
buf.write_u64_varint(task_id_bits as u64).unwrap();
buf.write_u64_varint(schedule.len() as u64).unwrap();
buf.write_u64_varint(schedule.seed).unwrap();
buf.extend(encoded.as_raw_slice());
hex::encode(buf)
}

pub(crate) fn deserialize_schedule(str: &str) -> Option<Schedule> {
use self::varint::ReadVarInt;

let bytes = hex::decode(str).ok()?;

let version = bytes[0];
Expand All @@ -69,8 +142,8 @@ pub(crate) fn deserialize_schedule(str: &str) -> Option<Schedule> {
}
let mut bytes = &bytes[1..];

let task_id_bits = bytes.read_usize_varint().ok()?;
let schedule_len = bytes.read_usize_varint().ok()?;
let task_id_bits = bytes.read_u64_varint().ok()? as usize;
let schedule_len = bytes.read_u64_varint().ok()? as usize;
let seed = bytes.read_u64_varint().ok()?;

let encoded = BitSlice::<_, Lsb0>::from_slice(bytes);
Expand Down
2 changes: 1 addition & 1 deletion src/sync/atomic/bool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl AtomicBool {
success: Ordering,
failure: Ordering,
) -> Result<bool, bool> {
self.fetch_update(success, failure, |val| (val == current).then(|| new))
self.fetch_update(success, failure, |val| (val == current).then_some(new))
}

/// Stores a value into the atomic boolean if the current value is the same as the
Expand Down
2 changes: 1 addition & 1 deletion src/sync/atomic/ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<T> AtomicPtr<T> {
success: Ordering,
failure: Ordering,
) -> Result<*mut T, *mut T> {
self.fetch_update(success, failure, |val| (val == current).then(|| new))
self.fetch_update(success, failure, |val| (val == current).then_some(new))
}

/// Stores a value into the atomic pointer if the current value is the same as the
Expand Down

0 comments on commit 5f7b519

Please sign in to comment.