diff --git a/Cargo.toml b/Cargo.toml index 63b001c4..7f9070d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.5.5" +version = "0.5.6" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -19,20 +19,24 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement eyre = "0.6.12" derive_more = { version = "1.0.0", features = ["from", "error", "display", "into"] } tokio = { version = "1", features = ["full"] } -tracing = "0.1.40" +tracing = "0.1" rkyv = { version = "0.8.9", features = ["uuid-1"] } lockfree = { version = "0.5.1" } -worktable_codegen = { path = "codegen", version = "0.5.3" } +worktable_codegen = { path = "codegen", version = "0.5.5" } futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4"] } -data_bucket = "0.2.2" -# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" } +data_bucket = "0.2.3" +# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "indexset-version-update" } # data_bucket = { path = "../DataBucket", version = "0.2.2" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } -indexset = { version = "0.12.0", features = ["concurrent", "cdc", "multimap"] } -# indexset = { path = "../indexset", version = "0.11.3", features = ["concurrent", "cdc", "multimap"] } +indexset = { version = "0.12.2", features = ["concurrent", "cdc", "multimap"] } +# indexset = { path = "../indexset", version = "0.12.0", features = ["concurrent", "cdc", "multimap"] } +# indexset = { git = "https://github.com/Handy-caT/indexset", branch = "multimap-range-fix", version = "0.12.0", features = ["concurrent", "cdc", "multimap"] } convert_case = "0.6.0" ordered-float = "5.0.0" serde = { version = "1.0.215", features = ["derive"] } prettytable-rs = "^0.10" + +[dev-dependencies] +rand = "0.9.1" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index b2513920..6249881c 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.5.4" +version = "0.5.5" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" diff --git a/codegen/src/persist_table/generator/mod.rs b/codegen/src/persist_table/generator/mod.rs index afcdc80e..bab3211b 100644 --- a/codegen/src/persist_table/generator/mod.rs +++ b/codegen/src/persist_table/generator/mod.rs @@ -6,7 +6,6 @@ use crate::name_generator::WorktableNameGenerator; pub use space_file::WT_INDEX_EXTENSION; -mod size_measurable; mod space; mod space_file; diff --git a/codegen/src/persist_table/generator/size_measurable.rs b/codegen/src/persist_table/generator/size_measurable.rs deleted file mode 100644 index d09d6e0a..00000000 --- a/codegen/src/persist_table/generator/size_measurable.rs +++ /dev/null @@ -1,18 +0,0 @@ -use proc_macro2::TokenStream; -use quote::quote; - -use crate::persist_table::generator::Generator; - -impl Generator { - pub fn gen_size_measurable_impl(&self) -> syn::Result { - let pk_type = &self.pk_ident; - - Ok(quote! { - impl SizeMeasurable for #pk_type { - fn aligned_size(&self) -> usize { - self.0.aligned_size() - } - } - }) - } -} diff --git a/codegen/src/persist_table/mod.rs b/codegen/src/persist_table/mod.rs index 3376db5b..92c7a36a 100644 --- a/codegen/src/persist_table/mod.rs +++ b/codegen/src/persist_table/mod.rs @@ -23,10 +23,8 @@ pub fn expand(input: TokenStream) -> syn::Result { let space_file_def = generator.gen_space_file_def(); let persistence_engine = generator.get_persistence_engine_type(); let persistence_task = generator.get_persistence_task_type(); - let size_measurable_impl = generator.gen_size_measurable_impl()?; Ok(quote! { - #size_measurable_impl #space_file_def #persistence_engine #persistence_task diff --git a/codegen/src/worktable/generator/primary_key.rs b/codegen/src/worktable/generator/primary_key.rs index 308a7a1e..fca26554 100644 --- a/codegen/src/worktable/generator/primary_key.rs +++ b/codegen/src/worktable/generator/primary_key.rs @@ -80,6 +80,7 @@ impl Generator { PartialEq, PartialOrd, Ord, + SizeMeasure, #unsized_derive )] #[rkyv(derive(PartialEq, Eq, PartialOrd, Ord, Debug))] diff --git a/codegen/src/worktable/generator/row.rs b/codegen/src/worktable/generator/row.rs index b1d93c35..a9aa445e 100644 --- a/codegen/src/worktable/generator/row.rs +++ b/codegen/src/worktable/generator/row.rs @@ -67,33 +67,11 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let ident = name_generator.get_row_type_ident(); - let rows: Vec<_> = self - .columns - .columns_map - .iter() - .map(|(name, type_)| { - if type_.to_string().contains("OrderedFloat") { - let inner_type = type_.to_string(); - let mut split = inner_type.split("<"); - let _ = split.next(); - let inner_type = split - .next() - .expect("OrderedFloat def contains inner type") - .to_uppercase() - .replace(">", ""); - let ident = Ident::new( - format!("Ordered{}Def", inner_type.trim()).as_str(), - Span::call_site(), - ); - quote! { - #[rkyv(with = #ident)] - pub #name: #type_, - } - } else { - quote! {pub #name: #type_,} - } - }) - .collect(); + let mut rows = vec![quote! {}; self.columns.field_positions.len()]; + for (i, pos) in &self.columns.field_positions { + let type_ = self.columns.columns_map.get(i).unwrap(); + rows[*pos] = quote! {pub #i: #type_,} + } quote! { #[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)] diff --git a/codegen/src/worktable/generator/table/mod.rs b/codegen/src/worktable/generator/table/mod.rs index d56dea7c..9761b7ff 100644 --- a/codegen/src/worktable/generator/table/mod.rs +++ b/codegen/src/worktable/generator/table/mod.rs @@ -60,15 +60,6 @@ impl Generator { let persistence_task = name_generator.get_persistence_task_ident(); let lock_ident = name_generator.get_lock_type_ident(); - let derive = if self.is_persist { - quote! { - #[derive(Debug, PersistTable)] - } - } else { - quote! { - #[derive(Debug)] - } - }; let persist_type_part = if self.is_persist { quote! { , PersistenceConfig @@ -91,27 +82,35 @@ impl Generator { }) .collect::>(); let pk_types_unsized = is_unsized_vec(pk_types); - let node_type = if pk_types_unsized { - quote! { - UnsizedNode> + let derive = if self.is_persist { + if pk_types_unsized { + quote! { + #[derive(Debug, PersistTable)] + #[table(pk_unsized)] + } + } else { + quote! { + #[derive(Debug, PersistTable)] + } } } else { quote! { - Vec> + #[derive(Debug)] } }; - let derive_attrs = if pk_types_unsized { + let node_type = if pk_types_unsized { quote! { - #[table(pk_unsized)] + UnsizedNode> } } else { - quote! {} + quote! { + Vec> + } }; if self.config.as_ref().and_then(|c| c.page_size).is_some() { quote! { #derive - #derive_attrs pub struct #ident( WorkTable< #row_type, @@ -129,7 +128,6 @@ impl Generator { } else { quote! { #derive - #derive_attrs pub struct #ident( WorkTable< #row_type, diff --git a/codegen/src/worktable/generator/table/select_executor.rs b/codegen/src/worktable/generator/table/select_executor.rs index 71802a5b..ad2e57a9 100644 --- a/codegen/src/worktable/generator/table/select_executor.rs +++ b/codegen/src/worktable/generator/table/select_executor.rs @@ -149,7 +149,22 @@ impl Generator { quote! { #(#variants)* } - }); + }).collect::>(); + + let range = if range_matches.is_empty() { + quote! {} + } else { + quote! { + if !self.params.range.is_empty() { + for (range, column) in &self.params.range { + iter = match (column, range.clone().into()) { + #(#range_matches)* + _ => unreachable!(), + }; + } + } + } + }; quote! { impl SelectQueryExecutor<#row_type, I, #column_range_type, #row_fields_ident> @@ -174,14 +189,7 @@ impl Generator { fn execute(self) -> Result, WorkTableError> { let mut iter: Box> = Box::new(self.iter); - if !self.params.range.is_empty() { - for (range, column) in &self.params.range { - iter = match (column, range.clone().into()) { - #(#range_matches)* - _ => unreachable!(), - }; - } - } + #range if !self.params.order.is_empty() { let mut items: Vec<#row_type> = iter.collect(); diff --git a/codegen/src/worktable/model/column.rs b/codegen/src/worktable/model/column.rs index 988ed8ec..1610e07a 100644 --- a/codegen/src/worktable/model/column.rs +++ b/codegen/src/worktable/model/column.rs @@ -14,6 +14,7 @@ fn is_sized(ident: &Ident) -> bool { pub struct Columns { pub is_sized: bool, pub columns_map: HashMap, + pub field_positions: HashMap, pub indexes: HashMap, pub primary_keys: Vec, pub generator_type: GeneratorType, @@ -31,11 +32,12 @@ pub struct Row { impl Columns { pub fn try_from_rows(rows: Vec, input: &TokenStream) -> syn::Result { let mut columns_map = HashMap::new(); + let mut field_positions = HashMap::new(); let mut sized = true; let mut pk = vec![]; let mut gen_type = None; - for row in rows { + for (pos, row) in rows.into_iter().enumerate() { let type_ = &row.type_; if sized { sized = is_sized(type_) @@ -46,6 +48,7 @@ impl Columns { quote! { #type_ } }; columns_map.insert(row.name.clone(), type_); + field_positions.insert(row.name.clone(), pos); if row.is_primary_key { if let Some(t) = gen_type { @@ -69,6 +72,7 @@ impl Columns { indexes: Default::default(), primary_keys: pk, generator_type: gen_type.expect("set"), + field_positions, }) } } diff --git a/src/persistence/task.rs b/src/persistence/task.rs index 9329f480..809334fd 100644 --- a/src/persistence/task.rs +++ b/src/persistence/task.rs @@ -1,4 +1,5 @@ use std::fmt::Debug; +use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use tokio::sync::Notify; @@ -10,6 +11,7 @@ use crate::prelude::Operation; pub struct Queue { queue: lockfree::queue::Queue>, notify: Notify, + len: AtomicU16, } impl @@ -19,11 +21,13 @@ impl Self { queue: lockfree::queue::Queue::new(), notify: Notify::new(), + len: AtomicU16::new(0), } } pub fn push(&self, value: Operation) { self.queue.push(value); + self.len.fetch_add(1, Ordering::Relaxed); self.notify.notify_one(); } @@ -31,6 +35,7 @@ impl loop { // Drain values if let Some(value) = self.queue.pop() { + self.len.fetch_sub(1, Ordering::Relaxed); return value; } @@ -42,7 +47,16 @@ impl pub fn immediate_pop( &self, ) -> Option> { - self.queue.pop() + if let Some(v) = self.queue.pop() { + self.len.fetch_sub(1, Ordering::Relaxed); + Some(v) + } else { + None + } + } + + pub fn len(&self) -> usize { + self.len.load(Ordering::Relaxed) as usize } } @@ -81,6 +95,7 @@ impl engine_progress_notify.notify_waiters(); engine_queue.pop().await }; + tracing::debug!("Applying operation {:?}", next_op); let res = engine.apply_operation(next_op).await; if let Err(err) = res { tracing::warn!("{}", err); @@ -96,6 +111,8 @@ impl } pub async fn wait_for_ops(&self) { + let count = self.queue.len(); + tracing::info!("Waiting for {} operations", count); self.progress_notify.notified().await } } diff --git a/tests/worktable/bench.rs b/tests/worktable/bench.rs new file mode 100644 index 00000000..3dae7d62 --- /dev/null +++ b/tests/worktable/bench.rs @@ -0,0 +1,84 @@ +use rand::distr::{Alphanumeric, SampleString}; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; +use tokio::sync::RwLock; + +use worktable::prelude::*; +use worktable_codegen::worktable; + +worktable!( + name: Map, + columns: { + id: u64 primary_key, + value: String + }, + queries: { + update: { + ValueById(value) by id, + } + } +); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn rw_lock_hash_map_vs_wt() { + let wt = Arc::new(MapWorkTable::default()); + let hash_map = Arc::new(RwLock::new(HashMap::::default())); + + println!("Inserting..."); + let map_start = Instant::now(); + for i in 0..100u64 { + let mut map = hash_map.write().await; + let s: String = Alphanumeric.sample_string(&mut rand::rng(), 8); + map.insert(i, s); + } + println!("map insert in {} μs", map_start.elapsed().as_micros()); + + let wt_start = Instant::now(); + for i in 0..100 { + let s: String = Alphanumeric.sample_string(&mut rand::rng(), 8); + let row = MapRow { id: i, value: s }; + wt.insert(row).unwrap(); + } + println!("wt insert in {} μs", wt_start.elapsed().as_micros()); + + println!("Updating..."); + let map_start = Instant::now(); + let task_map = hash_map.clone(); + let h = tokio::task::spawn(async move { + for i in 0..100000u64 { + let mut map = task_map.write().await; + let s: String = Alphanumeric.sample_string(&mut rand::rng(), 8); + map.insert((i % 50) * 2, s); + } + }); + for i in 0..100000u64 { + let mut map = hash_map.write().await; + let s: String = Alphanumeric.sample_string(&mut rand::rng(), 8); + map.insert((i % 50) * 2 + 1, s); + } + h.await.unwrap(); + println!("map update in {} μs", map_start.elapsed().as_micros()); + + let wt_start = Instant::now(); + let task_wt = wt.clone(); + let h = tokio::task::spawn(async move { + for i in 0..100000u64 { + let s: String = Alphanumeric.sample_string(&mut rand::rng(), 8); + let q = ValueByIdQuery { value: s }; + task_wt + .update_value_by_id(q, ((i % 50) * 2).into()) + .await + .unwrap(); + } + }); + for i in 0..100000u64 { + let s: String = Alphanumeric.sample_string(&mut rand::rng(), 8); + let q = ValueByIdQuery { value: s }; + wt.update_value_by_id(q, ((i % 50) * 2 + 1).into()) + .await + .unwrap(); + } + h.await.unwrap(); + println!("wt update in {} μs", wt_start.elapsed().as_micros()); +} diff --git a/tests/worktable/mod.rs b/tests/worktable/mod.rs index ce0b1438..827a5934 100644 --- a/tests/worktable/mod.rs +++ b/tests/worktable/mod.rs @@ -1,5 +1,6 @@ mod array; mod base; +mod bench; mod config; mod count; mod custom_pk;