0.20 (#595)
* rustfmt

* Disable async io on racy test

* Bump sled to 0.20 and pagecache to 0.13
spacejam authored Mar 25, 2019
1 parent c4413aa commit 6ce3e12
Showing 45 changed files with 682 additions and 1,551 deletions.
2 changes: 1 addition & 1 deletion crates/pagecache/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pagecache"
version = "0.12.0"
version = "0.13.0"
authors = ["Tyler Neely <[email protected]>"]
description = "lock-free pagecache and log for high-performance databases"
license = "MIT/Apache-2.0"
27 changes: 6 additions & 21 deletions crates/pagecache/src/blob_io.rs
@@ -2,18 +2,12 @@ use std::io::{Read, Write};

use super::*;

pub(crate) fn read_blob(
blob_ptr: Lsn,
config: &Config,
) -> Result<Vec<u8>> {
pub(crate) fn read_blob(blob_ptr: Lsn, config: &Config) -> Result<Vec<u8>> {
let path = config.blob_path(blob_ptr);
let f_res = std::fs::OpenOptions::new().read(true).open(&path);

if let Err(e) = &f_res {
debug!(
"failed to open file for blob read at {}: {:?}",
blob_ptr, e
);
debug!("failed to open file for blob read at {}: {:?}", blob_ptr, e);
}

let mut f = f_res?;
@@ -23,8 +17,7 @@ pub(crate) fn read_blob(
if let Err(e) = f.read_exact(&mut crc_expected_bytes) {
debug!(
"failed to read the initial CRC bytes in the blob at {}: {:?}",
blob_ptr,
e,
blob_ptr, e,
);
return Err(e.into());
}
@@ -35,8 +28,7 @@ pub(crate) fn read_blob(
if let Err(e) = f.read_to_end(&mut buf) {
debug!(
"failed to read data after the CRC bytes in blob at {}: {:?}",
blob_ptr,
e,
blob_ptr, e,
);
return Err(e.into());
}
@@ -54,11 +46,7 @@ pub(crate) fn read_blob(
}
}

pub(crate) fn write_blob(
config: &Config,
id: Lsn,
data: &[u8],
) -> Result<()> {
pub(crate) fn write_blob(config: &Config, id: Lsn, data: &[u8]) -> Result<()> {
let path = config.blob_path(id);
let mut f = std::fs::OpenOptions::new()
.write(true)
@@ -76,10 +64,7 @@ pub(crate) fn write_blob(
.map_err(|e| e.into())
}

pub(crate) fn gc_blobs(
config: &Config,
stable_lsn: Lsn,
) -> Result<()> {
pub(crate) fn gc_blobs(config: &Config, stable_lsn: Lsn) -> Result<()> {
let stable = config.blob_path(stable_lsn);
let blob_dir = stable.parent().unwrap();
let blobs = std::fs::read_dir(blob_dir)?;
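
The hunks above only reflow signatures and debug! calls onto single lines; the layout they work with is unchanged: a blob file appears to be a CRC prefix followed by the payload, and read_blob checks the stored value against a recomputed one before returning the data. A minimal sketch of that prefix-then-verify pattern, using std only, an assumed 4-byte prefix, and a stand-in checksum() helper rather than pagecache's actual CRC width or algorithm:

```rust
use std::fs::OpenOptions;
use std::io::{self, Read, Write};
use std::path::Path;

// Stand-in checksum; the real crate uses a CRC whose width and algorithm
// are not shown in this diff.
fn checksum(data: &[u8]) -> u32 {
    data.iter()
        .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(u32::from(*b)))
}

// Write a blob as [4-byte checksum][payload].
fn write_blob_sketch(path: &Path, data: &[u8]) -> io::Result<()> {
    let mut f = OpenOptions::new().write(true).create(true).open(path)?;
    f.write_all(&checksum(data).to_le_bytes())?;
    f.write_all(data)
}

// Read the checksum prefix, then the payload, and fail on a mismatch.
fn read_blob_sketch(path: &Path) -> io::Result<Vec<u8>> {
    let mut f = OpenOptions::new().read(true).open(path)?;
    let mut expected = [0_u8; 4];
    f.read_exact(&mut expected)?;
    let mut buf = Vec::new();
    f.read_to_end(&mut buf)?;
    if u32::from_le_bytes(expected) == checksum(&buf) {
        Ok(buf)
    } else {
        Err(io::Error::new(io::ErrorKind::InvalidData, "blob checksum mismatch"))
    }
}
```
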
98 changes: 37 additions & 61 deletions crates/pagecache/src/config.rs
@@ -156,10 +156,7 @@ impl ConfigBuilder {

/// Set the merge operator that can be relied on during merges in
/// the `PageCache`.
pub fn merge_operator(
mut self,
mo: MergeOperator,
) -> ConfigBuilder {
pub fn merge_operator(mut self, mo: MergeOperator) -> ConfigBuilder {
self.merge_operator = Some(mo as usize);
self
}
@@ -176,23 +173,19 @@ impl ConfigBuilder {
// only validate, setup directory, and open file once
self.validate().unwrap();

if self.temporary && self.path == PathBuf::from(DEFAULT_PATH)
{
if self.temporary && self.path == PathBuf::from(DEFAULT_PATH) {
#[cfg(unix)]
let salt = {
static SALT_COUNTER: AtomicUsize =
AtomicUsize::new(0);
static SALT_COUNTER: AtomicUsize = AtomicUsize::new(0);
let pid = unsafe { libc::getpid() };
((pid as u64) << 32)
+ SALT_COUNTER.fetch_add(1, Ordering::SeqCst)
as u64
+ SALT_COUNTER.fetch_add(1, Ordering::SeqCst) as u64
};

#[cfg(not(unix))]
let salt = {
let now = uptime();
(now.as_secs() * 1_000_000_000)
+ u64::from(now.subsec_nanos())
(now.as_secs() * 1_000_000_000) + u64::from(now.subsec_nanos())
};

// use shared memory for temporary linux files
@@ -215,9 +208,7 @@ impl ConfigBuilder {

let tp = rayon::ThreadPoolBuilder::new()
.num_threads(self.async_io_threads)
.thread_name(move |id| {
format!("sled_io_{}_{:?}", id, path)
})
.thread_name(move |id| format!("sled_io_{}_{:?}", id, path))
.start_handler(move |_id| {
start_threads.fetch_add(1, SeqCst);
})
@@ -358,8 +349,7 @@ impl ConfigBuilder {
}

if !dir.exists() {
let res: std::io::Result<()> =
std::fs::create_dir_all(dir);
let res: std::io::Result<()> = std::fs::create_dir_all(dir);
res.map_err(|e: std::io::Error| {
let ret: Error = e.into();
ret
@@ -394,27 +384,29 @@ impl ConfigBuilder {
match self.read_config() {
Ok(Some(old)) => {
if old.merge_operator.is_some() {
supported!(self.merge_operator.is_some(),
supported!(
self.merge_operator.is_some(),
"this system was previously opened with a \
merge operator. must supply one FOREVER after \
choosing to do so once, BWAHAHAHAHAHAHA!!!!");
merge operator. must supply one FOREVER after \
choosing to do so once, BWAHAHAHAHAHAHA!!!!"
);
}

supported!(
self.use_compression == old.use_compression,
format!("cannot change compression values across restarts. \
old value of use_compression loaded from disk: {}, \
currently set value: {}.",
old.use_compression,
self.use_compression,
format!(
"cannot change compression values across restarts. \
old value of use_compression loaded from disk: {}, \
currently set value: {}.",
old.use_compression, self.use_compression,
)
);

supported!(
self.io_buf_size == old.io_buf_size,
format!(
"cannot change the io buffer size across restarts. \
please change it back to {}",
please change it back to {}",
old.io_buf_size
)
);
@@ -449,13 +441,10 @@ impl ConfigBuilder {
fn read_config(&self) -> std::io::Result<Option<ConfigBuilder>> {
let path = self.config_path();

let f_res =
std::fs::OpenOptions::new().read(true).open(&path);
let f_res = std::fs::OpenOptions::new().read(true).open(&path);

let mut f = match f_res {
Err(ref e)
if e.kind() == std::io::ErrorKind::NotFound =>
{
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
return Ok(None);
}
Err(other) => {
@@ -562,9 +551,7 @@ impl Drop for Config {
drop(thread_pool);

while self.threads.load(SeqCst) != 0 {
std::thread::sleep(std::time::Duration::from_millis(
50,
));
std::thread::sleep(std::time::Duration::from_millis(50));
}
debug!("threadpool drained");

@@ -625,40 +612,35 @@ impl Config {

// returns the snapshot file paths for this system
#[doc(hidden)]
pub fn get_snapshot_files(
&self,
) -> std::io::Result<Vec<PathBuf>> {
pub fn get_snapshot_files(&self) -> std::io::Result<Vec<PathBuf>> {
let mut prefix = self.snapshot_prefix();

prefix.push("snap.");

let abs_prefix: PathBuf = if Path::new(&prefix).is_absolute()
{
let abs_prefix: PathBuf = if Path::new(&prefix).is_absolute() {
prefix
} else {
let mut abs_path = std::env::current_dir()?;
abs_path.push(prefix.clone());
abs_path
};

let filter =
|dir_entry: std::io::Result<std::fs::DirEntry>| {
if let Ok(de) = dir_entry {
let path_buf = de.path();
let path = path_buf.as_path();
let path_str = &*path.to_string_lossy();
if path_str
.starts_with(&*abs_prefix.to_string_lossy())
&& !path_str.ends_with(".in___motion")
{
Some(path.to_path_buf())
} else {
None
}
let filter = |dir_entry: std::io::Result<std::fs::DirEntry>| {
if let Ok(de) = dir_entry {
let path_buf = de.path();
let path = path_buf.as_path();
let path_str = &*path.to_string_lossy();
if path_str.starts_with(&*abs_prefix.to_string_lossy())
&& !path_str.ends_with(".in___motion")
{
Some(path.to_path_buf())
} else {
None
}
};
} else {
None
}
};

let snap_dir = Path::new(&abs_prefix).parent().unwrap();

@@ -673,13 +655,7 @@ impl Config {
pub fn verify_snapshot<PM, P>(&self) -> Result<()>
where
PM: Materializer<PageFrag = P>,
P: 'static
+ Debug
+ Clone
+ Serialize
+ DeserializeOwned
+ Send
+ Sync,
P: 'static + Debug + Clone + Serialize + DeserializeOwned + Send + Sync,
{
debug!("generating incremental snapshot");

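
Among the reformatted hunks above, get_snapshot_files builds an absolute path prefix ending in "snap." and keeps only directory entries that start with that prefix and do not end in ".in___motion" (snapshots still being written). A reduced sketch of that filter, using std only; it compares file names rather than full path strings, which is a simplification of what the real method does:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// Return snapshot files under `dir`: names that start with `prefix`
// ("snap." in the diff) and are not still in flight (".in___motion").
fn snapshot_files(dir: &Path, prefix: &str) -> io::Result<Vec<PathBuf>> {
    let mut out = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
        if name.starts_with(prefix) && !name.ends_with(".in___motion") {
            out.push(path);
        }
    }
    Ok(out)
}
```
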
9 changes: 2 additions & 7 deletions crates/pagecache/src/debug_delay.rs
@@ -21,9 +21,7 @@ pub fn debug_delay() {
let mut rng = if let Some(rng) = try_thread_rng() {
rng
} else {
warn!(
"already destroyed TLS when this debug delay was called"
);
warn!("already destroyed TLS when this debug delay was called");
return;
};

@@ -77,10 +75,7 @@ impl RngCore for ThreadRng {
unsafe { (*self.rng).fill_bytes(dest) }
}

fn try_fill_bytes(
&mut self,
dest: &mut [u8],
) -> Result<(), rand::Error> {
fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
self.fill_bytes(dest);
Ok(())
}
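
debug_delay appears to exist purely for testing: it draws from a thread-local RNG and occasionally pauses so that different thread interleavings get exercised, and the hunk above just makes it bail out quietly if thread-local storage has already been torn down. A rough illustration of that kind of randomized test delay, using a tiny xorshift PRNG instead of the rand crate; the probabilities and constants here are invented, not sled's:

```rust
use std::cell::Cell;
use std::thread;
use std::time::Duration;

thread_local! {
    // Per-thread PRNG state; a fixed odd seed keeps the sketch self-contained.
    static RNG_STATE: Cell<u64> = Cell::new(0x9E37_79B9_7F4A_7C15);
}

// xorshift64*: a tiny PRNG standing in for rand::thread_rng().
fn next_u64() -> u64 {
    RNG_STATE.with(|s| {
        let mut x = s.get();
        x ^= x >> 12;
        x ^= x << 25;
        x ^= x >> 27;
        s.set(x);
        x.wrapping_mul(0x2545_F491_4F6C_DD1D)
    })
}

// Occasionally sleep or yield so racy interleavings get shaken out in tests.
fn debug_delay_sketch() {
    match next_u64() % 10 {
        0 => thread::sleep(Duration::from_millis(1)), // rare longer pause
        1..=3 => thread::yield_now(),                 // frequent reschedule
        _ => {}                                       // usually a no-op
    }
}
```
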
14 changes: 2 additions & 12 deletions crates/pagecache/src/diskptr.rs
@@ -5,15 +5,7 @@ use super::LogReader;

/// A pointer to a location on disk or an off-log blob.
#[derive(
Debug,
Clone,
PartialOrd,
Ord,
Copy,
Eq,
PartialEq,
Serialize,
Deserialize,
Debug, Clone, PartialOrd, Ord, Copy, Eq, PartialEq, Serialize, Deserialize,
)]
pub enum DiskPtr {
/// Points to a value stored in the single-file log.
@@ -48,9 +40,7 @@ impl DiskPtr {
pub(crate) fn inline(&self) -> LogId {
match self {
DiskPtr::Inline(l) => *l,
DiskPtr::Blob(_, _) => {
panic!("inline called on Blob disk pointer")
}
DiskPtr::Blob(_, _) => panic!("inline called on Blob disk pointer"),
}
}

5 changes: 1 addition & 4 deletions crates/pagecache/src/ds/dll.rs
@@ -180,10 +180,7 @@ impl Dll {
}
}

pub(crate) unsafe fn pop_ptr(
&mut self,
ptr: *mut Node,
) -> PageId {
pub(crate) unsafe fn pop_ptr(&mut self, ptr: *mut Node) -> PageId {
self.len -= 1;

let mut node = Box::from_raw(ptr);
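
pop_ptr is unsafe because Box::from_raw both frees the node when the Box drops and requires that the pointer really did originate from Box::into_raw and is not used again afterwards. A minimal, self-contained illustration of that ownership round-trip, separate from the actual Dll and Node types:

```rust
// A toy node; the real Dll node also carries prev/next pointers and a PageId.
struct Node {
    value: u64,
}

fn main() {
    // Box::into_raw hands ownership over to a raw pointer; nothing is freed yet.
    let raw: *mut Node = Box::into_raw(Box::new(Node { value: 7 }));

    // SAFETY: `raw` came from Box::into_raw above and is reclaimed exactly once.
    // Box::from_raw takes ownership back, so the node is dropped at end of scope.
    let node = unsafe { Box::from_raw(raw) };
    assert_eq!(node.value, 7);
}
```
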
6 changes: 1 addition & 5 deletions crates/pagecache/src/ds/lru.rs
@@ -82,11 +82,7 @@ impl Shard {
}
}

fn accessed(
&mut self,
rel_idx: PageId,
sz: usize,
) -> Vec<PageId> {
fn accessed(&mut self, rel_idx: PageId, sz: usize) -> Vec<PageId> {
if self.entries.len() <= rel_idx {
self.entries.resize(rel_idx + 1, Entry::default());
}
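
Shard::accessed indexes its entry table by a relative page id and grows the table on demand with resize before touching the slot, so an access to a never-seen id is always in bounds. A small sketch of that grow-on-access pattern; the Entry field here is invented for illustration:

```rust
#[derive(Default, Clone)]
struct Entry {
    size: usize,
}

struct ShardSketch {
    entries: Vec<Entry>,
}

impl ShardSketch {
    // Record an access at `rel_idx`, growing the table if the index is new.
    fn accessed(&mut self, rel_idx: usize, sz: usize) {
        if self.entries.len() <= rel_idx {
            // Fill the gap with default entries so indexing below cannot panic.
            self.entries.resize(rel_idx + 1, Entry::default());
        }
        self.entries[rel_idx].size = sz;
    }
}
```

Calling accessed(10, 4096) on an empty shard grows entries to 11 slots and records the size in the last one; the real method also returns a Vec<PageId>, presumably the pages chosen for eviction.
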
9 changes: 3 additions & 6 deletions crates/pagecache/src/ds/pagetable.rs
@@ -172,12 +172,10 @@ fn traverse<'g, T: 'static + Send>(
let mut l2_ptr = l1[l1k].load(SeqCst, guard);

if l2_ptr.is_null() {
let next_child =
Owned::new(Node2::default()).into_shared(guard);
let next_child = Owned::new(Node2::default()).into_shared(guard);

debug_delay();
let ret = l1[l1k]
.compare_and_set(l2_ptr, next_child, SeqCst, guard);
let ret = l1[l1k].compare_and_set(l2_ptr, next_child, SeqCst, guard);

match ret {
Ok(_) => {
@@ -205,8 +203,7 @@
fn drop(&mut self) {
unsafe {
let head =
self.head.load(Relaxed, &unprotected()).as_raw()
as usize;
self.head.load(Relaxed, &unprotected()).as_raw() as usize;
drop(Box::from_raw(head as *mut Node1<T>));
}
}
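
traverse lazily allocates the second-level node: it loads the child pointer and, if it is null, races to install a freshly allocated Node2 with a compare-and-set, with crossbeam-epoch-style guards handling reclamation. The same install-if-null step can be sketched with plain std atomics; note that this drops the epoch-based reclamation entirely, which is a significant simplification of what the crate does:

```rust
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering::SeqCst};

// Stand-in for the second-level node; the real Node2 holds more atomic slots.
#[derive(Default)]
struct Node2 {
    slots: [u64; 4],
}

// Return the child stored in `slot`, installing a fresh Node2 if it is null.
fn child_or_install(slot: &AtomicPtr<Node2>) -> *mut Node2 {
    let existing = slot.load(SeqCst);
    if !existing.is_null() {
        return existing;
    }

    let fresh = Box::into_raw(Box::new(Node2::default()));
    match slot.compare_exchange(ptr::null_mut(), fresh, SeqCst, SeqCst) {
        // We won the race: our allocation is now the shared child.
        Ok(_) => fresh,
        // Another thread won: free our unused allocation and use theirs.
        // Installed nodes are never freed here; that is what the epoch
        // guards in the real code are for.
        Err(winner) => {
            unsafe { drop(Box::from_raw(fresh)) };
            winner
        }
    }
}
```
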