Skip to content

Commit

Permalink
Merge pull request #769 from spacejam/tyler_0.26
Browse files Browse the repository at this point in the history
cut 0.26
  • Loading branch information
spacejam authored Aug 19, 2019
2 parents 2811bd5 + ed5fb80 commit 4e5370f
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 55 deletions.
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# 0.26

## New Features

* Transactions! You may now call `Tree::transaction` and
perform reads, writes, and deletes within a provided
closure with a `TransactionalTree` argument. This
closure may be called multiple times if the transaction
encounters a concurrent update in the process of its
execution. Transactions may also be used on tuples of
`Tree` objects, where the closure will then be
parameterized on `TransactionalTree` instances providing
access to each of the provided `Tree` instances. This
allows you to atomically read and modify multiple
`Tree` instances in a single atomic operation.
These transactions are serializable, fully ACID,
and optimistic.
* `Tree::apply_batch` allows you to apply a `Batch`
* `TransactionalTree::apply_batch` allow you to
apply a `Batch` from within a transaction.

## Breaking Changes

* `Tree::batch` has been removed. Now you can directly
create a `Batch` with `Batch::default()` and then apply
it to a `Tree` with `Tree::apply_batch` or during a
transaction using `TransactionalTree::apply_batch`.
This facilitates multi-`Tree` batches via transactions.
* `Event::Merge` has been removed, and `Tree::merge` will
now send a complete `Event::Set` item to be distributed
to all listening subscribers.
2 changes: 1 addition & 1 deletion crates/pagecache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pagecache"
version = "0.18.0"
version = "0.19.0"
authors = ["Tyler Neely <[email protected]>"]
description = "lock-free pagecache and log for high-performance databases"
license = "MIT/Apache-2.0"
Expand Down
4 changes: 2 additions & 2 deletions crates/sled/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "sled"
version = "0.25.0"
version = "0.26.0"
authors = ["Tyler Neely <[email protected]>"]
description = "a modern embedded database"
license = "MIT/Apache-2.0"
Expand All @@ -25,7 +25,7 @@ measure_allocs = ["pagecache/measure_allocs"]
check_snapshot_integrity = ["pagecache/check_snapshot_integrity"]

[dependencies]
pagecache = { path = "../pagecache", version = "0.18" }
pagecache = { path = "../pagecache", version = "0.19" }
serde_bytes = "0.11"
parking_lot = "0.9.0"

Expand Down
48 changes: 24 additions & 24 deletions crates/sled/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,30 +83,6 @@ fn run(
if !shutdown.is_running() {
break;
}
// we had no dirty data to flush,
// so we can spend a little effort
// cleaning up the file. try not to
// spend more than half of our sleep
// time rewriting pages though.
while before.elapsed() < flush_every / 2 {
match pagecache.attempt_gc() {
Err(e) => {
error!(
"failed to clean file from async flush thread: {}",
e
);

#[cfg(feature = "failpoints")]
pagecache.set_failpoint(e);

*shutdown = ShutdownState::ShutDown;
sc.notify_all();
return;
}
Ok(false) => break,
Ok(true) => {}
}
}
}
Ok(_) => {
wrote_data = true;
Expand All @@ -127,6 +103,30 @@ fn run(
}
}

// so we can spend a little effort
// cleaning up the segments. try not to
// spend more than half of our sleep
// time rewriting pages though.
while shutdown.is_running() && before.elapsed() < flush_every / 2 {
match pagecache.attempt_gc() {
Err(e) => {
error!(
"failed to clean file from periodic flush thread: {}",
e
);

#[cfg(feature = "failpoints")]
pagecache.set_failpoint(e);

*shutdown = ShutdownState::ShutDown;
sc.notify_all();
return;
}
Ok(false) => break,
Ok(true) => {}
}
}

let sleep_duration = flush_every
.checked_sub(before.elapsed())
.unwrap_or(Duration::from_millis(1));
Expand Down
25 changes: 11 additions & 14 deletions crates/sled/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ static ID_GEN: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Event {
/// A new complete (key, value) pair
Set(Vec<u8>, IVec),
/// A new partial (key, merged value) pair
Merge(Vec<u8>, IVec),
Set(Arc<[u8]>, IVec),
/// A deleted key
Del(Vec<u8>),
Del(Arc<[u8]>),
}

impl Event {
/// Return a reference to the key that this `Event` refers to
pub fn key(&self) -> &[u8] {
match self {
Event::Set(k, ..) | Event::Merge(k, ..) | Event::Del(k) => &*k,
Event::Set(k, ..) | Event::Del(k) => &*k,
}
}
}
Expand All @@ -41,7 +39,6 @@ impl Clone for Event {

match self {
Set(k, v) => Set(k.clone(), v.clone()),
Merge(k, v) => Merge(k.clone(), v.clone()),
Del(k) => Del(k.clone()),
}
}
Expand Down Expand Up @@ -181,31 +178,31 @@ fn basic_subscription() {

let mut s1 = subs.register([].to_vec());

let k2 = vec![];
let k2: Arc<[u8]> = vec![].into();
let r2 = subs.reserve(&k2).unwrap();
r2.complete(Event::Set(k2.clone(), IVec::from(k2.clone())));

let k3 = vec![0];
let k3: Arc<[u8]> = vec![0].into();
let r3 = subs.reserve(&k3).unwrap();
r3.complete(Event::Set(k3.clone(), IVec::from(k3.clone())));

let k4 = vec![0, 1];
let k4: Arc<[u8]> = vec![0, 1].into();
let r4 = subs.reserve(&k4).unwrap();
r4.complete(Event::Del(k4.clone()));

let k5 = vec![0, 1, 2];
let k5: Arc<[u8]> = vec![0, 1, 2].into();
let r5 = subs.reserve(&k5).unwrap();
r5.complete(Event::Merge(k5.clone(), IVec::from(k5.clone())));
r5.complete(Event::Set(k5.clone(), IVec::from(k5.clone())));

let k6 = vec![1, 1, 2];
let k6: Arc<[u8]> = vec![1, 1, 2].into();
let r6 = subs.reserve(&k6).unwrap();
r6.complete(Event::Del(k6.clone()));

let k7 = vec![1, 1, 2];
let k7: Arc<[u8]> = vec![1, 1, 2].into();
let r7 = subs.reserve(&k7).unwrap();
drop(r7);

let k8 = vec![1, 2, 2];
let k8: Arc<[u8]> = vec![1, 2, 2].into();
let r8 = subs.reserve(&k8).unwrap();
r8.complete(Event::Set(k8.clone(), IVec::from(k8.clone())));

Expand Down
60 changes: 46 additions & 14 deletions crates/sled/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl Tree {
// success
if let Some(res) = subscriber_reservation.take() {
let event =
subscription::Event::Set(key.as_ref().to_vec(), value);
subscription::Event::Set(key.as_ref().into(), value);

res.complete(event);
}
Expand Down Expand Up @@ -409,7 +409,7 @@ impl Tree {
if link.is_ok() {
// success
if let Some(res) = subscriber_reservation.take() {
let event = subscription::Event::Del(key.as_ref().to_vec());
let event = subscription::Event::Del(key.as_ref().into());

res.complete(event);
}
Expand Down Expand Up @@ -503,9 +503,9 @@ impl Tree {
if link.is_ok() {
if let Some(res) = subscriber_reservation.take() {
let event = if let Some(new) = new {
subscription::Event::Set(key.as_ref().to_vec(), new)
subscription::Event::Set(key.as_ref().into(), new)
} else {
subscription::Event::Del(key.as_ref().to_vec())
subscription::Event::Del(key.as_ref().into())
};

res.complete(event);
Expand Down Expand Up @@ -676,8 +676,7 @@ impl Tree {
/// // events is a blocking `Iterator` over `Event`s
/// for event in events.take(1) {
/// match event {
/// Event::Set(key, value) => assert_eq!(key, vec![0]),
/// Event::Merge(key, partial_value) => {}
/// Event::Set(key, value) => assert_eq!(key.as_ref(), &[0]),
/// Event::Del(key) => {}
/// }
/// }
Expand Down Expand Up @@ -881,15 +880,48 @@ impl Tree {

let merge_operator = merge_operator_opt.unwrap();

let key = key.as_ref();
let mut current = self.get(key)?;

loop {
let tmp = current.as_ref().map(AsRef::as_ref);
let next = merge_operator(key, tmp, value.as_ref()).map(IVec::from);
match self.cas::<_, _, IVec>(key, tmp, next.clone())? {
Ok(()) => return Ok(next),
Err(new_current) => current = new_current,
let guard = pin();
let View { ptr, pid, node, .. } =
self.node_for_key(key.as_ref(), &guard)?;

let (encoded_key, current_value) =
if let Some((k, v)) = node.leaf_pair_for_key(key.as_ref()) {
(k.clone(), Some(v.clone()))
} else {
let k = prefix_encode(&node.lo, key.as_ref());
let old_v = None;
(k, old_v)
};

let tmp = current_value.as_ref().map(AsRef::as_ref);
let new = merge_operator(key.as_ref(), tmp, value.as_ref())
.map(IVec::from);

let mut subscriber_reservation = self.subscriptions.reserve(&key);

let frag = if let Some(ref new) = new {
Frag::Set(encoded_key, new.clone())
} else {
Frag::Del(encoded_key)
};
let link = self.context.pagecache.link(pid, ptr, frag, &guard)?;

if link.is_ok() {
if let Some(res) = subscriber_reservation.take() {
let event = if let Some(new) = &new {
subscription::Event::Set(
key.as_ref().into(),
new.clone(),
)
} else {
subscription::Event::Del(key.as_ref().into())
};

res.complete(event);
}

return Ok(new);
}
M.tree_looped();
}
Expand Down

0 comments on commit 4e5370f

Please sign in to comment.