Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.6.13"
version = "0.6.14"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -16,7 +16,7 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
worktable_codegen = { path = "codegen", version = "0.6.13" }
worktable_codegen = { path = "codegen", version = "0.6.14" }

eyre = "0.6.12"
derive_more = { version = "1.0.0", features = ["from", "error", "display", "into"] }
Expand Down
2 changes: 1 addition & 1 deletion codegen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "worktable_codegen"
version = "0.6.13"
version = "0.6.14"
edition = "2024"
license = "MIT"
description = "WorkTable codegeneration crate"
Expand Down
7 changes: 7 additions & 0 deletions codegen/src/name_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ impl WorktableNameGenerator {
Ident::new(format!("{}Wrapper", self.name).as_str(), Span::mixed_site())
}

pub fn get_archived_wrapper_type_ident(&self) -> Ident {
Ident::new(
format!("Archived{}Wrapper", self.name).as_str(),
Span::mixed_site(),
)
}

pub fn get_lock_type_ident(&self) -> Ident {
Ident::new(format!("{}Lock", self.name).as_str(), Span::mixed_site())
}
Expand Down
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/queries/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl Generator {
pub async fn #name(&self, by: #type_) -> core::result::Result<(), WorkTableError> {
let rows_to_update = self.0.indexes.#index.get(#by).map(|kv| kv.1).collect::<Vec<_>>();
for link in rows_to_update {
let row = self.0.data.select(*link).map_err(WorkTableError::PagesError)?;
let row = self.0.data.select_non_ghosted(*link).map_err(WorkTableError::PagesError)?;
self.delete(row.id.into()).await?;
}
core::result::Result::Ok(())
Expand All @@ -201,7 +201,7 @@ impl Generator {
pub async fn #name(&self, by: #type_) -> core::result::Result<(), WorkTableError> {
let row_to_update = self.0.indexes.#index.get(#by).map(|v| v.get().value);
if let Some(link) = row_to_update {
let row = self.0.data.select(link).map_err(WorkTableError::PagesError)?;
let row = self.0.data.select_non_ghosted(link).map_err(WorkTableError::PagesError)?;
self.delete(row.id.into()).await?;
}
core::result::Result::Ok(())
Expand Down
2 changes: 1 addition & 1 deletion codegen/src/worktable/generator/queries/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Generator {
{
let iter = self.0.pk_map
.iter()
.filter_map(|(_, link)| self.0.data.select(*link).ok());
.filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok());

SelectQueryBuilder::new(iter)
}
Expand Down
10 changes: 5 additions & 5 deletions codegen/src/worktable/generator/queries/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Generator {
let lock = {
#full_row_lock
};
let row_old = self.0.data.select(link)?;
let row_old = self.0.data.select_non_ghosted(link)?;
self.reinsert(row_old, row)?;

lock.unlock();
Expand Down Expand Up @@ -295,7 +295,7 @@ impl Generator {
let avt_type_ident = name_generator.get_available_type_ident();
let diff_container = if idx_idents.is_some() {
quote! {
let row_old = self.0.data.select(link)?;
let row_old = self.0.data.select_non_ghosted(link)?;
let row_new = row.clone();
let updated_bytes: Vec<u8> = vec![];
let mut diffs: std::collections::HashMap<&str, Difference<#avt_type_ident>> = std::collections::HashMap::new();
Expand Down Expand Up @@ -514,7 +514,7 @@ impl Generator {

let mut locks = std::collections::HashMap::new();
for link in links.iter() {
let pk = self.0.data.select(*link)?.get_primary_key().clone();
let pk = self.0.data.select_non_ghosted(*link)?.get_primary_key().clone();
let op_lock = {
#custom_lock
};
Expand All @@ -525,7 +525,7 @@ impl Generator {
let mut pk_to_unlock: std::collections::HashMap<_, std::sync::Arc<Lock>> = std::collections::HashMap::new();
let op_id = OperationId::Multi(uuid::Uuid::now_v7());
for link in links.into_iter() {
let pk = self.0.data.select(link)?.get_primary_key().clone();
let pk = self.0.data.select_non_ghosted(link)?.get_primary_key().clone();
let mut bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&row)
.map_err(|_| WorkTableError::SerializeError)?;

Expand Down Expand Up @@ -610,7 +610,7 @@ impl Generator {
.get(#by)
.map(|kv| kv.get().value)
.ok_or(WorkTableError::NotFound)?;
let pk = self.0.data.select(link)?.get_primary_key().clone();
let pk = self.0.data.select_non_ghosted(link)?.get_primary_key().clone();

let lock = {
#custom_lock
Expand Down
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/table/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl Generator {
return Ok(())
};

let data = self.0.data.select(link).map_err(WorkTableError::PagesError)?;
let data = self.0.data.select_non_ghosted(link).map_err(WorkTableError::PagesError)?;
#func

let mut ind = false;
Expand All @@ -258,7 +258,7 @@ impl Generator {
}
};
if let Some((key, link)) = next {
let data = self.0.data.select(link).map_err(WorkTableError::PagesError)?;
let data = self.0.data.select_non_ghosted(link).map_err(WorkTableError::PagesError)?;
#func
k = key
} else {
Expand Down
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/table/index_fns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl Generator {
Ok(quote! {
pub fn #fn_name(&self, by: #type_) -> Option<#row_ident> {
let link = self.0.indexes.#field_ident.get(#by).map(|kv| kv.get().value)?;
self.0.data.select(link).ok()
self.0.data.select_non_ghosted(link).ok()
}
})
}
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Generator {
let rows = self.0.indexes.#field_ident
.get(#by)
.into_iter()
.filter_map(|(_, link)| self.0.data.select(*link).ok())
.filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok())
.filter(move |r| &r.#row_field_ident == &by);

SelectQueryBuilder::new(rows)
Expand Down
23 changes: 22 additions & 1 deletion codegen/src/worktable/generator/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ impl Generator {
let type_ = self.gen_wrapper_type();
let impl_ = self.gen_wrapper_impl();
let storable_impl = self.get_wrapper_storable_impl();
let ghost_wrapper_impl = self.get_wrapper_ghost_impl();

quote! {
#type_
#impl_
#storable_impl
#ghost_wrapper_impl
}
}

Expand All @@ -26,6 +28,7 @@ impl Generator {
#[repr(C)]
pub struct #wrapper_ident {
inner: #row_ident,
is_ghosted: bool,
is_deleted: bool,
}
}
Expand All @@ -43,10 +46,15 @@ impl Generator {
self.inner
}

fn is_ghosted(&self) -> bool {
self.is_ghosted
}

fn from_inner(inner: #row_ident) -> Self {
Self {
inner,
is_deleted: Default::default(),
is_ghosted: true,
is_deleted: false,
}
}
}
Expand All @@ -64,4 +72,17 @@ impl Generator {
}
}
}

fn get_wrapper_ghost_impl(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let row_ident = name_generator.get_archived_wrapper_type_ident();

quote! {
impl GhostWrapper for #row_ident {
fn unghost(&mut self) {
self.is_ghosted = false;
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/in_memory/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl<Row, const DATA_LENGTH: usize> Data<Row, DATA_LENGTH> {
}

/// Error that can appear on [`Data`] page operations.
#[derive(Copy, Clone, Debug, Display, Error)]
#[derive(Copy, Clone, Debug, Display, Error, PartialEq)]
pub enum ExecutionError {
/// Error of trying to save row in [`Data`] page with not enough space left.
#[display("need {}, but {} left", need, left)]
Expand Down
2 changes: 1 addition & 1 deletion src/in_memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ mod row;

pub use data::{Data, ExecutionError as DataExecutionError, DATA_INNER_LENGTH};
pub use pages::{DataPages, ExecutionError as PagesExecutionError};
pub use row::{RowWrapper, StorableRow};
pub use row::{GhostWrapper, RowWrapper, StorableRow};
80 changes: 75 additions & 5 deletions src/in_memory/pages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,27 @@ where
Ok(gen_row.get_inner())
}

pub fn select_non_ghosted(&self, link: Link) -> Result<Row, ExecutionError>
where
Row: Archive
+ for<'a> Serialize<
Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
>,
<<Row as StorableRow>::WrappedRow as Archive>::Archived: Portable
+ Deserialize<<Row as StorableRow>::WrappedRow, HighDeserializer<rkyv::rancor::Error>>,
{
let pages = self.pages.read().unwrap();
let page = pages
// - 1 is used because page ids are starting from 1.
.get(page_id_mapper(link.page_id.into()))
.ok_or(ExecutionError::PageNotFound(link.page_id))?;
let gen_row = page.get_row(link).map_err(ExecutionError::DataPageError)?;
if gen_row.is_ghosted() {
return Err(ExecutionError::Ghosted);
}
Ok(gen_row.get_inner())
}

#[cfg_attr(
feature = "perf_measurements",
performance_measurement(prefix_name = "DataPages")
Expand Down Expand Up @@ -332,28 +353,31 @@ where
}
}

#[derive(Debug, Display, Error, From)]
#[derive(Debug, Display, Error, From, PartialEq)]
pub enum ExecutionError {
DataPageError(DataExecutionError),

PageNotFound(#[error(not(source))] PageId),

Locked,

Ghosted,
}

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;

use crate::in_memory::pages::DataPages;
use crate::in_memory::row::GeneralRow;
use crate::in_memory::StorableRow;
use rkyv::with::{AtomicLoad, Relaxed};
use rkyv::{Archive, Deserialize, Serialize};

use crate::in_memory::pages::DataPages;
use crate::in_memory::{PagesExecutionError, RowWrapper, StorableRow};

#[derive(
Archive, Copy, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize,
)]
Expand All @@ -362,6 +386,41 @@ mod tests {
b: u64,
}

/// General `Row` wrapper that is used to append general data for every `Inner`
/// `Row`.
#[derive(Archive, Deserialize, Debug, Serialize)]
pub struct GeneralRow<Inner> {
/// Inner generic `Row`.
pub inner: Inner,

/// Indicator for ghosted rows.
#[rkyv(with = AtomicLoad<Relaxed>)]
pub is_ghosted: AtomicBool,

/// Indicator for deleted rows.
#[rkyv(with = AtomicLoad<Relaxed>)]
pub deleted: AtomicBool,
}

impl<Inner> RowWrapper<Inner> for GeneralRow<Inner> {
fn get_inner(self) -> Inner {
self.inner
}

fn is_ghosted(&self) -> bool {
self.is_ghosted.load(Ordering::Relaxed)
}

/// Creates new [`GeneralRow`] from `Inner`.
fn from_inner(inner: Inner) -> Self {
Self {
inner,
is_ghosted: AtomicBool::new(true),
deleted: AtomicBool::new(false),
}
}
}

impl StorableRow for TestRow {
type WrappedRow = GeneralRow<TestRow>;
}
Expand Down Expand Up @@ -404,6 +463,17 @@ mod tests {
assert_eq!(res, row)
}

#[test]
fn select_non_ghosted() {
let pages = DataPages::<TestRow>::new();

let row = TestRow { a: 10, b: 20 };
let link = pages.insert(row).unwrap();
let res = pages.select_non_ghosted(link);
assert!(res.is_err());
assert_eq!(res.err(), Some(PagesExecutionError::Ghosted))
}

#[test]
fn update() {
let pages = DataPages::<TestRow>::new();
Expand Down
32 changes: 4 additions & 28 deletions src/in_memory/row.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::fmt::Debug;
use std::sync::atomic::AtomicBool;

use rkyv::with::{AtomicLoad, Relaxed};
use rkyv::{Archive, Deserialize, Serialize};
use rkyv::Archive;

/// Common trait for the `Row`s that can be stored on the [`Data`] page.
///
Expand All @@ -13,32 +11,10 @@ pub trait StorableRow {

pub trait RowWrapper<Inner> {
fn get_inner(self) -> Inner;

fn is_ghosted(&self) -> bool;
fn from_inner(inner: Inner) -> Self;
}

/// General `Row` wrapper that is used to append general data for every `Inner`
/// `Row`.
#[derive(Archive, Deserialize, Debug, Serialize)]
pub struct GeneralRow<Inner> {
/// Inner generic `Row`.
pub inner: Inner,

/// Indicator for deleted rows.
#[rkyv(with = AtomicLoad<Relaxed>)]
pub deleted: AtomicBool,
}

impl<Inner> RowWrapper<Inner> for GeneralRow<Inner> {
fn get_inner(self) -> Inner {
self.inner
}

/// Creates new [`GeneralRow`] from `Inner`.
fn from_inner(inner: Inner) -> Self {
Self {
inner,
deleted: AtomicBool::new(false),
}
}
pub trait GhostWrapper {
fn unghost(&mut self);
}
2 changes: 1 addition & 1 deletion src/index/unsized_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ where
}

fn need_to_split(&self, _: usize) -> bool {
self.length >= self.length_capacity
self.length >= self.length_capacity && self.inner.len() > 1
}

fn len(&self) -> usize {
Expand Down
Loading
Loading