From 981eddf15e4f99aa574a3500b4170a07f552abbd Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 29 Jul 2025 02:28:59 +0300 Subject: [PATCH 1/4] corrections + tests --- .../src/worktable/generator/queries/update.rs | 6 + .../worktable/generator/table/index_fns.rs | 4 +- tests/worktable/unsized_.rs | 335 ++++++++++++++++++ 3 files changed, 344 insertions(+), 1 deletion(-) diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index f75178d4..912af22a 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -246,6 +246,12 @@ impl Generator { #full_row_lock }; + let link = self.0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound)?; + let row_old = self.0.data.select(link)?; let mut row_new = row_old.clone(); let pk = row_old.get_primary_key().clone(); diff --git a/codegen/src/worktable/generator/table/index_fns.rs b/codegen/src/worktable/generator/table/index_fns.rs index be638e8b..b316e738 100644 --- a/codegen/src/worktable/generator/table/index_fns.rs +++ b/codegen/src/worktable/generator/table/index_fns.rs @@ -84,6 +84,7 @@ impl Generator { .ok_or(syn::Error::new(i.span(), "Row not found"))?; let fn_name = Ident::new(format!("select_by_{i}").as_str(), Span::mixed_site()); let field_ident = &idx.name; + let row_field_ident = &idx.field; let by = if is_float(type_.to_string().as_str()) { quote! { &OrderedFloat(by) @@ -103,7 +104,8 @@ impl Generator { let rows = self.0.indexes.#field_ident .get(#by) .into_iter() - .filter_map(|(_, link)| self.0.data.select(*link).ok()); + .filter_map(|(_, link)| self.0.data.select(*link).ok()) + .filter(move |r| &r.#row_field_ident == &by); SelectQueryBuilder::new(rows) } diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs index 3683aee7..f0e01c2c 100644 --- a/tests/worktable/unsized_.rs +++ b/tests/worktable/unsized_.rs @@ -305,6 +305,9 @@ worktable! ( update: { ExchangeAndSomeByTest(exchange, some_string) by test, ExchangeAndSomeById(exchange, some_string) by id, + ExchangeAgainById(exchange) by id, + SomeById(some_string) by id, + AnotherById(another) by id, ExchangeAndSomeByAnother(exchange, some_string) by another, SomeOtherByExchange(some_string, other_srting) by exchange, } @@ -513,3 +516,335 @@ async fn test_update_many_strings_by_string() { assert!(empty_links.contains(&first_link)); assert!(empty_links.contains(&second_link)) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn update_parallel_more_strings() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let e_state = Arc::new(Mutex::new(HashMap::new())); + let s_state = Arc::new(Mutex::new(HashMap::new())); + for i in 0..100 { + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{i}"), + some_string: format!("some_{i}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let shared_e_state = e_state.clone(); + let h = tokio::spawn(async move { + for _ in 0..2000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..=99); + shared + .update_exchange_again_by_id( + ExchangeAgainByIdQuery { + exchange: format!("test_{val}"), + }, + id_to_update.into(), + ) + .await + .unwrap(); + { + let mut guard = shared_e_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("test_{val}")) + .or_insert(format!("test_{val}")); + } + } + }); + for _ in 0..2000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..=99); + table + .update_some_by_id( + SomeByIdQuery { + some_string: format!("some_{val}"), + }, + id_to_update.into(), + ) + .await + .unwrap(); + { + let mut guard = s_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("some_{val}")) + .or_insert(format!("some_{val}")); + } + } + h.await.unwrap(); + + for (id, e) in e_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.exchange, e) + } + for (id, s) in s_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.some_string, s) + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn update_parallel_more_strings_more_threads() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let e_state = Arc::new(Mutex::new(HashMap::new())); + let s_state = Arc::new(Mutex::new(HashMap::new())); + let a_state = Arc::new(Mutex::new(HashMap::new())); + for i in 0..100 { + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{i}"), + some_string: format!("some_{i}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let shared_e_state = e_state.clone(); + let h1 = tokio::spawn(async move { + for _ in 0..2000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..=99); + shared + .update_exchange_again_by_id( + ExchangeAgainByIdQuery { + exchange: format!("test_{val}"), + }, + id_to_update.into(), + ) + .await + .unwrap(); + { + let mut guard = shared_e_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("test_{val}")) + .or_insert(format!("test_{val}")); + } + } + }); + let shared = table.clone(); + let shared_t_state = a_state.clone(); + let h2 = tokio::spawn(async move { + for _ in 0..5000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..=99); + shared + .update_another_by_id(AnotherByIdQuery { another: val }, id_to_update.into()) + .await + .unwrap(); + { + let mut guard = shared_t_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = val) + .or_insert(val); + } + } + }); + for _ in 0..2000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..=99); + table + .update_some_by_id( + SomeByIdQuery { + some_string: format!("some_{val}"), + }, + id_to_update.into(), + ) + .await + .unwrap(); + { + let mut guard = s_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("some_{val}")) + .or_insert(format!("some_{val}")); + } + } + h1.await.unwrap(); + h2.await.unwrap(); + + for (id, e) in e_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.exchange, e) + } + for (id, s) in s_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.some_string, s) + } + for (id, a) in a_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.another, a) + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn update_parallel_more_strings_with_select_non_unique() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let e_state = Arc::new(Mutex::new(HashMap::new())); + let a_state = Arc::new(Mutex::new(HashMap::new())); + for i in 0..1000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let shared_e_state = e_state.clone(); + let h1 = tokio::spawn(async move { + for _ in 0..5_000 { + let val = fastrand::u8(0..100); + let id_to_update = fastrand::u64(0..1000); + shared + .update_exchange_again_by_id( + ExchangeAgainByIdQuery { + exchange: format!("test_{val}"), + }, + id_to_update.into(), + ) + .await + .unwrap(); + { + let mut guard = shared_e_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("test_{val}")) + .or_insert(format!("test_{val}")); + } + } + }); + let shared = table.clone(); + let shared_t_state = a_state.clone(); + let h2 = tokio::spawn(async move { + for _ in 0..10_000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..1000); + shared + .update_another_by_id(AnotherByIdQuery { another: val }, id_to_update.into()) + .await + .unwrap(); + { + let mut guard = shared_t_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = val) + .or_insert(val); + } + } + }); + for _ in 0..20_000 { + let val = fastrand::u8(0..100); + let vals = table + .select_by_exchange(format!("test_{val}")) + .execute() + .unwrap(); + for v in vals { + assert_eq!(v.exchange, format!("test_{val}")) + } + } + h1.await.unwrap(); + h2.await.unwrap(); + + for (id, e) in e_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.exchange, e) + } + for (id, a) in a_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.another, a) + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn update_parallel_more_strings_with_select_unique() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let e_state = Arc::new(Mutex::new(HashMap::new())); + let a_state = Arc::new(Mutex::new(HashMap::new())); + for i in 0..1000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let shared_e_state = e_state.clone(); + let h1 = tokio::spawn(async move { + for _ in 0..5_000 { + let val = fastrand::u8(0..100); + let id_to_update = fastrand::u64(0..1000); + shared + .update_exchange_again_by_id( + ExchangeAgainByIdQuery { + exchange: format!("test_{val}"), + }, + id_to_update.into(), + ) + .await + .unwrap(); + { + let mut guard = shared_e_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("test_{val}")) + .or_insert(format!("test_{val}")); + } + } + }); + let shared = table.clone(); + let shared_t_state = a_state.clone(); + let h2 = tokio::spawn(async move { + for _ in 0..10_000 { + let val = fastrand::u64(..); + let id_to_update = fastrand::u64(0..1000); + shared + .update_another_by_id(AnotherByIdQuery { another: val }, id_to_update.into()) + .await + .unwrap(); + { + let mut guard = shared_t_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = val) + .or_insert(val); + } + } + }); + for _ in 0..20_000 { + let val = fastrand::i64(0..1000); + let res = table.select_by_test(val); + assert!(res.is_some()) + } + h1.await.unwrap(); + h2.await.unwrap(); + + for (id, e) in e_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.exchange, e) + } + for (id, a) in a_state.lock_arc().iter() { + let row = table.select((*id).into()).unwrap(); + assert_eq!(&row.another, a) + } +} From 091ec3a8ae220862eda50246a7c2e3df47f1154b Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 29 Jul 2025 22:51:51 +0300 Subject: [PATCH 2/4] corrections --- codegen/src/worktable/generator/queries/delete.rs | 6 ------ codegen/src/worktable/generator/queries/update.rs | 8 +------- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index eeb954da..dc77b13a 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -45,12 +45,6 @@ impl Generator { #full_row_lock }; - let link = self.0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - #delete_logic lock.unlock(); // Releases locks diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 912af22a..2caf8126 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -246,13 +246,7 @@ impl Generator { #full_row_lock }; - let link = self.0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - - let row_old = self.0.data.select(link)?; + let row_old = self.select(pk.clone()).expect("should not be deleted by other thread"); let mut row_new = row_old.clone(); let pk = row_old.get_primary_key().clone(); #(#row_updates)* From 41ed957771641379db2e34bf1c1d85d1c071643c Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 29 Jul 2025 22:53:08 +0300 Subject: [PATCH 3/4] fix clippy for codegen --- codegen/src/worktable/parser/attribute.rs | 11 ++++---- codegen/src/worktable/parser/columns.rs | 31 +++++++++++------------ codegen/src/worktable/parser/name.rs | 9 +++---- 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/codegen/src/worktable/parser/attribute.rs b/codegen/src/worktable/parser/attribute.rs index a722cf80..47b10d0c 100644 --- a/codegen/src/worktable/parser/attribute.rs +++ b/codegen/src/worktable/parser/attribute.rs @@ -43,14 +43,13 @@ impl Parser { #[cfg(test)] mod tests { - use proc_macro2::TokenStream; use quote::quote; use crate::worktable::Parser; #[test] fn test_empty() { - let tokens = TokenStream::from(quote! {}); + let tokens = quote! {}; let mut parser = Parser::new(tokens); let empty = parser.parse_persist(); assert!(empty.is_ok()); @@ -59,7 +58,7 @@ mod tests { #[test] fn test_literal_field() { - let tokens = TokenStream::from(quote! {"nme": TestName,}); + let tokens = quote! {"nme": TestName,}; let mut parser = Parser::new(tokens); let name = parser.parse_persist(); assert!(name.is_err()); @@ -67,7 +66,7 @@ mod tests { #[test] fn test_persistence() { - let tokens = TokenStream::from(quote! {persist: true,}); + let tokens = quote! {persist: true,}; let mut parser = Parser::new(tokens); let name = parser.parse_persist(); assert!(name.is_ok()); @@ -76,7 +75,7 @@ mod tests { #[test] fn test_wrong_field() { - let tokens = TokenStream::from(quote! {nme: TestName,}); + let tokens = quote! {nme: TestName,}; let mut parser = Parser::new(tokens); let name = parser.parse_persist(); assert!(name.is_ok()); @@ -85,7 +84,7 @@ mod tests { #[test] fn test_no_comma() { - let tokens = TokenStream::from(quote! {name: TestName}); + let tokens = quote! {name: TestName}; let mut parser = Parser::new(tokens); let name = parser.parse_persist(); assert!(name.is_ok()); diff --git a/codegen/src/worktable/parser/columns.rs b/codegen/src/worktable/parser/columns.rs index ca8d2316..2ec55e24 100644 --- a/codegen/src/worktable/parser/columns.rs +++ b/codegen/src/worktable/parser/columns.rs @@ -130,17 +130,16 @@ impl Parser { mod tests { use std::collections::HashMap; - use proc_macro2::TokenStream; use quote::quote; use crate::worktable::Parser; #[test] fn test_columns_parse() { - let tokens = TokenStream::from(quote! {columns: { + let tokens = quote! {columns: { id: i64 primary_key, test: u64, - }}); + }}; let mut parser = Parser::new(tokens); let columns = parser.parse_columns(); @@ -160,10 +159,10 @@ mod tests { #[test] fn test_columns_parse_no_last_comma() { - let tokens = TokenStream::from(quote! {columns: { + let tokens = quote! {columns: { id: i64 primary_key, test: u64 - }}); + }}; let mut parser = Parser::new(tokens); let columns = parser.parse_columns(); @@ -183,10 +182,10 @@ mod tests { #[test] fn test_columns_parse_optional() { - let tokens = TokenStream::from(quote! {columns: { + let tokens = quote! {columns: { id: i64 primary_key, test: u64 optional, - }}); + }}; let mut parser = Parser::new(tokens); let columns = parser.parse_columns(); @@ -208,11 +207,11 @@ mod tests { #[test] fn test_columns_parse_three() { - let tokens = TokenStream::from(quote! {columns: { + let tokens = quote! {columns: { id: i64 primary_key, test: u64, a: u64 - }}); + }}; let mut parser = Parser::new(tokens); let columns = parser.parse_columns(); @@ -232,10 +231,10 @@ mod tests { #[test] fn test_columns_parse_no_primary_key() { - let tokens = TokenStream::from(quote! {columns: { + let tokens = quote! {columns: { id: i64, test: u64 - }}); + }}; let mut parser = Parser::new(tokens); let columns = parser.parse_columns(); @@ -247,7 +246,7 @@ mod tests { #[test] fn test_row_parse() { - let row_tokens = TokenStream::from(quote! {id: i64 primary_key,}); + let row_tokens = quote! {id: i64 primary_key,}; let mut parser = Parser::new(row_tokens); let row = parser.parse_row(); @@ -262,7 +261,7 @@ mod tests { #[test] fn test_row_parse_no_comma() { - let row_tokens = TokenStream::from(quote! {id: i64 primary_key TreeIndex}); + let row_tokens = quote! {id: i64 primary_key TreeIndex}; let mut parser = Parser::new(row_tokens); let row = parser.parse_row(); @@ -277,7 +276,7 @@ mod tests { #[test] fn test_row_parse_no_primary_key() { - let row_tokens = TokenStream::from(quote! {id: i64,}); + let row_tokens = quote! {id: i64,}; let mut parser = Parser::new(row_tokens); let row = parser.parse_row(); @@ -292,7 +291,7 @@ mod tests { #[test] fn test_row_parse_no_primary_key_no_comma() { - let row_tokens = TokenStream::from(quote! {id: i64}); + let row_tokens = quote! {id: i64}; let mut parser = Parser::new(row_tokens); let row = parser.parse_row(); @@ -307,7 +306,7 @@ mod tests { #[test] fn test_row_parse_optional() { - let row_tokens = TokenStream::from(quote! {id: i64 optional}); + let row_tokens = quote! {id: i64 optional}; let mut parser = Parser::new(row_tokens); let row = parser.parse_row(); diff --git a/codegen/src/worktable/parser/name.rs b/codegen/src/worktable/parser/name.rs index 6f9484bf..9cc449ba 100644 --- a/codegen/src/worktable/parser/name.rs +++ b/codegen/src/worktable/parser/name.rs @@ -44,14 +44,13 @@ impl Parser { #[cfg(test)] mod tests { - use proc_macro2::TokenStream; use quote::quote; use crate::worktable::Parser; #[test] fn test_name_parse() { - let tokens = TokenStream::from(quote! {name: TestName,}); + let tokens = quote! {name: TestName,}; let mut parser = Parser::new(tokens); let name = parser.parse_name(); @@ -64,7 +63,7 @@ mod tests { #[test] fn test_empty() { - let tokens = TokenStream::from(quote! {}); + let tokens = quote! {}; let mut parser = Parser::new(tokens); let name = parser.parse_name(); @@ -74,7 +73,7 @@ mod tests { #[test] fn test_literal_field() { - let tokens = TokenStream::from(quote! {"nme": TestName,}); + let tokens = quote! {"nme": TestName,}; let mut parser = Parser::new(tokens); let name = parser.parse_name(); @@ -84,7 +83,7 @@ mod tests { #[test] fn test_wrong_field() { - let tokens = TokenStream::from(quote! {nme: TestName,}); + let tokens = quote! {nme: TestName,}; let mut parser = Parser::new(tokens); let name = parser.parse_name(); From 202a637e41ff94138c116ecc0f542360b19888b7 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 29 Jul 2025 22:57:36 +0300 Subject: [PATCH 4/4] bump --- Cargo.toml | 4 ++-- codegen/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9658ec1c..be463780 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.6.12" +version = "0.6.13" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -16,7 +16,7 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -worktable_codegen = { path = "codegen", version = "0.6.12" } +worktable_codegen = { path = "codegen", version = "0.6.13" } eyre = "0.6.12" derive_more = { version = "1.0.0", features = ["from", "error", "display", "into"] } diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 6c80721c..be0b8f39 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.6.12" +version = "0.6.13" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate"