Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.5.5"
version = "0.5.6"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -19,20 +19,24 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement
eyre = "0.6.12"
derive_more = { version = "1.0.0", features = ["from", "error", "display", "into"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1.40"
tracing = "0.1"
rkyv = { version = "0.8.9", features = ["uuid-1"] }
lockfree = { version = "0.5.1" }
worktable_codegen = { path = "codegen", version = "0.5.3" }
worktable_codegen = { path = "codegen", version = "0.5.5" }
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4"] }
data_bucket = "0.2.2"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
data_bucket = "0.2.3"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "indexset-version-update" }
# data_bucket = { path = "../DataBucket", version = "0.2.2" }
performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
indexset = { version = "0.12.0", features = ["concurrent", "cdc", "multimap"] }
# indexset = { path = "../indexset", version = "0.11.3", features = ["concurrent", "cdc", "multimap"] }
indexset = { version = "0.12.2", features = ["concurrent", "cdc", "multimap"] }
# indexset = { path = "../indexset", version = "0.12.0", features = ["concurrent", "cdc", "multimap"] }
# indexset = { git = "https://github.com/Handy-caT/indexset", branch = "multimap-range-fix", version = "0.12.0", features = ["concurrent", "cdc", "multimap"] }
convert_case = "0.6.0"
ordered-float = "5.0.0"
serde = { version = "1.0.215", features = ["derive"] }
prettytable-rs = "^0.10"

[dev-dependencies]
rand = "0.9.1"
2 changes: 1 addition & 1 deletion codegen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "worktable_codegen"
version = "0.5.4"
version = "0.5.5"
edition = "2024"
license = "MIT"
description = "WorkTable codegeneration crate"
Expand Down
1 change: 0 additions & 1 deletion codegen/src/persist_table/generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::name_generator::WorktableNameGenerator;

pub use space_file::WT_INDEX_EXTENSION;

mod size_measurable;
mod space;
mod space_file;

Expand Down
18 changes: 0 additions & 18 deletions codegen/src/persist_table/generator/size_measurable.rs

This file was deleted.

2 changes: 0 additions & 2 deletions codegen/src/persist_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ pub fn expand(input: TokenStream) -> syn::Result<TokenStream> {
let space_file_def = generator.gen_space_file_def();
let persistence_engine = generator.get_persistence_engine_type();
let persistence_task = generator.get_persistence_task_type();
let size_measurable_impl = generator.gen_size_measurable_impl()?;

Ok(quote! {
#size_measurable_impl
#space_file_def
#persistence_engine
#persistence_task
Expand Down
1 change: 1 addition & 0 deletions codegen/src/worktable/generator/primary_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl Generator {
PartialEq,
PartialOrd,
Ord,
SizeMeasure,
#unsized_derive
)]
#[rkyv(derive(PartialEq, Eq, PartialOrd, Ord, Debug))]
Expand Down
32 changes: 5 additions & 27 deletions codegen/src/worktable/generator/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,33 +67,11 @@ impl Generator {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let ident = name_generator.get_row_type_ident();

let rows: Vec<_> = self
.columns
.columns_map
.iter()
.map(|(name, type_)| {
if type_.to_string().contains("OrderedFloat") {
let inner_type = type_.to_string();
let mut split = inner_type.split("<");
let _ = split.next();
let inner_type = split
.next()
.expect("OrderedFloat def contains inner type")
.to_uppercase()
.replace(">", "");
let ident = Ident::new(
format!("Ordered{}Def", inner_type.trim()).as_str(),
Span::call_site(),
);
quote! {
#[rkyv(with = #ident)]
pub #name: #type_,
}
} else {
quote! {pub #name: #type_,}
}
})
.collect();
let mut rows = vec![quote! {}; self.columns.field_positions.len()];
for (i, pos) in &self.columns.field_positions {
let type_ = self.columns.columns_map.get(i).unwrap();
rows[*pos] = quote! {pub #i: #type_,}
}

quote! {
#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)]
Expand Down
34 changes: 16 additions & 18 deletions codegen/src/worktable/generator/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,6 @@ impl Generator {
let persistence_task = name_generator.get_persistence_task_ident();
let lock_ident = name_generator.get_lock_type_ident();

let derive = if self.is_persist {
quote! {
#[derive(Debug, PersistTable)]
}
} else {
quote! {
#[derive(Debug)]
}
};
let persist_type_part = if self.is_persist {
quote! {
, PersistenceConfig
Expand All @@ -91,27 +82,35 @@ impl Generator {
})
.collect::<Vec<_>>();
let pk_types_unsized = is_unsized_vec(pk_types);
let node_type = if pk_types_unsized {
quote! {
UnsizedNode<IndexPair<#primary_key_type, Link>>
let derive = if self.is_persist {
if pk_types_unsized {
quote! {
#[derive(Debug, PersistTable)]
#[table(pk_unsized)]
}
} else {
quote! {
#[derive(Debug, PersistTable)]
}
}
} else {
quote! {
Vec<IndexPair<#primary_key_type, Link>>
#[derive(Debug)]
}
};
let derive_attrs = if pk_types_unsized {
let node_type = if pk_types_unsized {
quote! {
#[table(pk_unsized)]
UnsizedNode<IndexPair<#primary_key_type, Link>>
}
} else {
quote! {}
quote! {
Vec<IndexPair<#primary_key_type, Link>>
}
};

if self.config.as_ref().and_then(|c| c.page_size).is_some() {
quote! {
#derive
#derive_attrs
pub struct #ident(
WorkTable<
#row_type,
Expand All @@ -129,7 +128,6 @@ impl Generator {
} else {
quote! {
#derive
#derive_attrs
pub struct #ident(
WorkTable<
#row_type,
Expand Down
26 changes: 17 additions & 9 deletions codegen/src/worktable/generator/table/select_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,22 @@ impl Generator {
quote! {
#(#variants)*
}
});
}).collect::<Vec<_>>();

let range = if range_matches.is_empty() {
quote! {}
} else {
quote! {
if !self.params.range.is_empty() {
for (range, column) in &self.params.range {
iter = match (column, range.clone().into()) {
#(#range_matches)*
_ => unreachable!(),
};
}
}
}
};

quote! {
impl<I> SelectQueryExecutor<#row_type, I, #column_range_type, #row_fields_ident>
Expand All @@ -174,14 +189,7 @@ impl Generator {
fn execute(self) -> Result<Vec<#row_type>, WorkTableError> {
let mut iter: Box<dyn DoubleEndedIterator<Item = #row_type>> = Box::new(self.iter);

if !self.params.range.is_empty() {
for (range, column) in &self.params.range {
iter = match (column, range.clone().into()) {
#(#range_matches)*
_ => unreachable!(),
};
}
}
#range

if !self.params.order.is_empty() {
let mut items: Vec<#row_type> = iter.collect();
Expand Down
6 changes: 5 additions & 1 deletion codegen/src/worktable/model/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ fn is_sized(ident: &Ident) -> bool {
pub struct Columns {
pub is_sized: bool,
pub columns_map: HashMap<Ident, TokenStream>,
pub field_positions: HashMap<Ident, usize>,
pub indexes: HashMap<Ident, Index>,
pub primary_keys: Vec<Ident>,
pub generator_type: GeneratorType,
Expand All @@ -31,11 +32,12 @@ pub struct Row {
impl Columns {
pub fn try_from_rows(rows: Vec<Row>, input: &TokenStream) -> syn::Result<Self> {
let mut columns_map = HashMap::new();
let mut field_positions = HashMap::new();
let mut sized = true;
let mut pk = vec![];
let mut gen_type = None;

for row in rows {
for (pos, row) in rows.into_iter().enumerate() {
let type_ = &row.type_;
if sized {
sized = is_sized(type_)
Expand All @@ -46,6 +48,7 @@ impl Columns {
quote! { #type_ }
};
columns_map.insert(row.name.clone(), type_);
field_positions.insert(row.name.clone(), pos);

if row.is_primary_key {
if let Some(t) = gen_type {
Expand All @@ -69,6 +72,7 @@ impl Columns {
indexes: Default::default(),
primary_keys: pk,
generator_type: gen_type.expect("set"),
field_positions,
})
}
}
19 changes: 18 additions & 1 deletion src/persistence/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::fmt::Debug;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;

use tokio::sync::Notify;
Expand All @@ -10,6 +11,7 @@ use crate::prelude::Operation;
pub struct Queue<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
queue: lockfree::queue::Queue<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>,
notify: Notify,
len: AtomicU16,
}

impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
Expand All @@ -19,18 +21,21 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
Self {
queue: lockfree::queue::Queue::new(),
notify: Notify::new(),
len: AtomicU16::new(0),
}
}

pub fn push(&self, value: Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>) {
self.queue.push(value);
self.len.fetch_add(1, Ordering::Relaxed);
self.notify.notify_one();
}

pub async fn pop(&self) -> Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
loop {
// Drain values
if let Some(value) = self.queue.pop() {
self.len.fetch_sub(1, Ordering::Relaxed);
return value;
}

Expand All @@ -42,7 +47,16 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
pub fn immediate_pop(
&self,
) -> Option<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>> {
self.queue.pop()
if let Some(v) = self.queue.pop() {
self.len.fetch_sub(1, Ordering::Relaxed);
Some(v)
} else {
None
}
}

pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed) as usize
}
}

Expand Down Expand Up @@ -81,6 +95,7 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
engine_progress_notify.notify_waiters();
engine_queue.pop().await
};
tracing::debug!("Applying operation {:?}", next_op);
let res = engine.apply_operation(next_op).await;
if let Err(err) = res {
tracing::warn!("{}", err);
Expand All @@ -96,6 +111,8 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
}

pub async fn wait_for_ops(&self) {
let count = self.queue.len();
tracing::info!("Waiting for {} operations", count);
self.progress_notify.notified().await
}
}
Loading
Loading