From afa92ccc8e07203d4ab3a6326d1bd54432bb3a04 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Sun, 3 Aug 2025 13:16:55 +0300 Subject: [PATCH 1/6] use new checked insert logic to remove gap if primary key is already exists --- Cargo.toml | 8 +++---- src/index/table_index/cdc.rs | 14 ++++++++++++ src/index/table_index/mod.rs | 9 ++++++++ src/lib.rs | 4 ++++ src/table/mod.rs | 23 ++++++++++++------- tests/worktable/index/insert.rs | 40 +++++++++++++++++++++++++++++++++ 6 files changed, 86 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 41623b1..92a5acf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,14 +27,14 @@ lockfree = { version = "0.5.1" } fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } -data_bucket = "0.2.9" +# data_bucket = "0.2.9" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -# data_bucket = { path = "../DataBucket", version = "0.2.7" } +data_bucket = { path = "../DataBucket", version = "0.2.9" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } -# indexset = { path = "../indexset", version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } -indexset = { package = "wt-indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } +indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } +# indexset = { package = "wt-indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } convert_case = "0.6.0" ordered-float = "5.0.0" parking_lot = "0.12.3" diff --git a/src/index/table_index/cdc.rs b/src/index/table_index/cdc.rs index 550ad0d..91551fb 100644 --- a/src/index/table_index/cdc.rs +++ b/src/index/table_index/cdc.rs @@ -11,6 +11,7 @@ use crate::{IndexMap, IndexMultiMap}; pub trait TableIndexCdc { fn insert_cdc(&self, value: T, link: Link) -> (Option, Vec>>); + fn insert_checked_cdc(&self, value: T, link: Link) -> Option>>>; #[allow(clippy::type_complexity)] fn remove_cdc( &self, @@ -29,6 +30,15 @@ where (res, evs.into_iter().map(Into::into).collect()) } + fn insert_checked_cdc(&self, value: T, link: Link) -> Option>>> { + let (res, evs) = self.insert_cdc(value, link); + if res.is_some() { + Some(evs.into_iter().map(Into::into).collect()) + } else { + None + } + } + fn remove_cdc( &self, value: T, @@ -48,6 +58,10 @@ where self.insert_cdc(value, link) } + fn insert_checked_cdc(&self, value: T, link: Link) -> Option>>> { + self.checked_insert_cdc(value, link) + } + fn remove_cdc( &self, value: T, diff --git a/src/index/table_index/mod.rs b/src/index/table_index/mod.rs index b3b4e9f..4ed4d2d 100644 --- a/src/index/table_index/mod.rs +++ b/src/index/table_index/mod.rs @@ -14,6 +14,7 @@ pub use cdc::TableIndexCdc; pub trait TableIndex { fn insert(&self, value: T, link: Link) -> Option; + fn insert_checked(&self, value: T, link: Link) -> Option<()>; fn remove(&self, value: T, link: Link) -> Option<(T, Link)>; } @@ -26,6 +27,10 @@ where self.insert(value, link) } + fn insert_checked(&self, value: T, link: Link) -> Option<()> { + self.insert(value, link).map(|_| ()) + } + fn remove(&self, value: T, link: Link) -> Option<(T, Link)> { self.remove(&value, &link) } @@ -40,6 +45,10 @@ where self.insert(value, link) } + fn insert_checked(&self, value: T, link: Link) -> Option<()> { + self.checked_insert(value, link) + } + fn remove(&self, value: T, _: Link) -> Option<(T, Link)> { self.remove(&value) } diff --git a/src/lib.rs b/src/lib.rs index 024281d..76af12b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,3 +62,7 @@ pub mod prelude { pub const WT_INDEX_EXTENSION: &str = ".wt.idx"; pub const WT_DATA_EXTENSION: &str = ".wt.data"; } + +// TODO: +// 1. add checked inserts to indexset to not insert/remove but just insert with violation error +// 2. Add pre-update state storage to avoid ghost reads of updated data if it will be rolled back diff --git a/src/table/mod.rs b/src/table/mod.rs index 4711a34..c3d5949 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -154,8 +154,17 @@ where <::WrappedRow as Archive>::Archived: Deserialize<::WrappedRow, HighDeserializer>, { - let link = self.pk_map.get(&pk).map(|v| v.get().value)?; - self.data.select(link).ok() + let link = self.pk_map.get(&pk).map(|v| v.get().value); + if let Some(link) = link { + self.data.select(link).ok() + } else { + println!( + "{:?} Unavailable in primary index, vals available {:?}", + pk, + self.pk_map.iter().collect::>() + ); + None + } } #[cfg_attr( @@ -185,8 +194,7 @@ where .data .insert(row.clone()) .map_err(WorkTableError::PagesError)?; - if let Some(existed_link) = self.pk_map.insert(pk.clone(), link) { - self.pk_map.insert(pk.clone(), existed_link); + if self.pk_map.checked_insert(pk.clone(), link).is_none() { self.data.delete(link).map_err(WorkTableError::PagesError)?; return Err(WorkTableError::AlreadyExists("Primary".to_string())); }; @@ -249,9 +257,8 @@ where .data .insert_cdc(row.clone()) .map_err(WorkTableError::PagesError)?; - let (exists, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), link); - if let Some(existed_link) = exists { - self.pk_map.insert(pk.clone(), existed_link); + let primary_key_events = self.pk_map.checked_insert_cdc(pk.clone(), link); + if primary_key_events.is_none() { self.data.delete(link).map_err(WorkTableError::PagesError)?; return Err(WorkTableError::AlreadyExists("Primary".to_string())); } @@ -285,7 +292,7 @@ where let op = Operation::Insert(InsertOperation { id: OperationId::Single(Uuid::now_v7()), pk_gen_state: self.pk_gen.get_state(), - primary_key_events, + primary_key_events: primary_key_events.expect("should be checked before for existence"), secondary_keys_events: indexes_res.expect("was checked before"), bytes, link, diff --git a/tests/worktable/index/insert.rs b/tests/worktable/index/insert.rs index 85a7faf..6fb741a 100644 --- a/tests/worktable/index/insert.rs +++ b/tests/worktable/index/insert.rs @@ -233,3 +233,43 @@ fn insert_when_unique_violated() { h.join().unwrap(); } + +#[test] +fn insert_when_pk_violated() { + let table = Arc::new(TestWorkTable::default()); + + let row = TestRow { + id: table.get_next_pk().into(), + val: 13, + attr1: "Attribute".to_string(), + attr2: -128, + attr3: 123456789, + attr4: "Attribute4".to_string(), + }; + let _ = table.insert(row.clone()).unwrap(); + + let id = row.id; + + let shared = table.clone(); + let h = thread::spawn(move || { + for _ in 0..5_000 { + let row = TestRow { + id, + val: 13, + attr1: "Attribute".to_string(), + attr2: 128, + attr3: 123456789, + attr4: "Attribute__4".to_string(), + }; + assert!(shared.insert(row).is_err()); + } + }); + + for _ in 0..5000 { + let sel_row = table.select(id); + assert!(sel_row.is_some()); + assert_eq!(sel_row, Some(row.clone())) + } + + h.join().unwrap(); +} From 5e28caa4155119638482bea57a5624c1ecb9aa98 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 4 Aug 2025 00:57:04 +0300 Subject: [PATCH 2/6] add stability tests --- codegen/src/persist_table/generator/space_file/mod.rs | 1 + src/table/mod.rs | 9 ++++++--- src/table/system_info.rs | 2 +- tests/worktable/index/insert.rs | 2 ++ 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/codegen/src/persist_table/generator/space_file/mod.rs b/codegen/src/persist_table/generator/space_file/mod.rs index 34876e5..07a8619 100644 --- a/codegen/src/persist_table/generator/space_file/mod.rs +++ b/codegen/src/persist_table/generator/space_file/mod.rs @@ -199,6 +199,7 @@ impl Generator { indexes, pk_gen: PrimaryKeyGeneratorState::from_state(self.data_info.inner.pk_gen_state), lock_map: LockMap::default(), + update_state: IndexMap::default(), table_name: "", pk_phantom: std::marker::PhantomData, }; diff --git a/src/table/mod.rs b/src/table/mod.rs index c3d5949..f0d943e 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -41,7 +41,7 @@ pub struct WorkTable< const DATA_LENGTH: usize = INNER_PAGE_SIZE, > where PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, - Row: StorableRow, + Row: StorableRow + Send + Clone + 'static, PkNodeType: NodeLike> + Send + 'static, { pub data: DataPages, @@ -54,6 +54,8 @@ pub struct WorkTable< pub lock_map: LockMap, + pub update_state: IndexMap, + pub table_name: &'static str, pub pk_phantom: PhantomData<(AvailableTypes, AvailableIndexes)>, @@ -87,7 +89,7 @@ where SecondaryIndexes: Default, PkGen: Default, PkNodeType: NodeLike> + Send + 'static, - Row: StorableRow, + Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, { fn default() -> Self { @@ -97,6 +99,7 @@ where indexes: SecondaryIndexes::default(), pk_gen: Default::default(), lock_map: LockMap::default(), + update_state: IndexMap::default(), table_name: "", pk_phantom: PhantomData, } @@ -129,7 +132,7 @@ where Row: TableRow, PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, PkNodeType: NodeLike> + Send + 'static, - Row: StorableRow, + Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, { pub fn get_next_pk(&self) -> PrimaryKey diff --git a/src/table/system_info.rs b/src/table/system_info.rs index ab43b9b..4238a78 100644 --- a/src/table/system_info.rs +++ b/src/table/system_info.rs @@ -69,7 +69,7 @@ impl< > where PrimaryKey: Debug + Clone + Ord + Send + 'static + std::hash::Hash, - Row: StorableRow, + Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, NodeType: NodeLike> + Send + 'static, SecondaryIndexes: MemStat + TableSecondaryIndexInfo, diff --git a/tests/worktable/index/insert.rs b/tests/worktable/index/insert.rs index 6fb741a..833c05d 100644 --- a/tests/worktable/index/insert.rs +++ b/tests/worktable/index/insert.rs @@ -224,6 +224,8 @@ fn insert_when_unique_violated() { }); for _ in 0..5000 { + let sel_row = table.select(row.id); + assert_eq!(sel_row, Some(row.clone())); let attr_1_rows = table.select_by_attr1(row.attr1.clone()).execute().unwrap(); assert_eq!(attr_1_rows.len(), 1); assert_eq!(attr_1_rows.first().unwrap(), &row); From f8b9a7e52ee5838db315560ac48b1c078fddbeec Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 4 Aug 2025 02:09:26 +0300 Subject: [PATCH 3/6] fix reinsert logic --- codegen/src/worktable/generator/index/cdc.rs | 36 ++++-- .../src/worktable/generator/index/usual.rs | 26 +++- .../src/worktable/generator/queries/update.rs | 27 ++++- src/in_memory/pages.rs | 1 + src/index/table_secondary_index/cdc.rs | 2 +- src/index/table_secondary_index/mod.rs | 10 +- src/table/mod.rs | 114 +++++++++++++----- tests/worktable/index/update_full.rs | 65 ++++++++++ 8 files changed, 230 insertions(+), 51 deletions(-) diff --git a/codegen/src/worktable/generator/index/cdc.rs b/codegen/src/worktable/generator/index/cdc.rs index 92eec50..97da367 100644 --- a/codegen/src/worktable/generator/index/cdc.rs +++ b/codegen/src/worktable/generator/index/cdc.rs @@ -49,15 +49,14 @@ impl Generator { let index_variant: TokenStream = camel_case_name.parse().unwrap(); quote! { - let (exists, events) = self.#index_field_name.insert_cdc(row.#i.clone(), link); - if let Some(link) = exists { - self.#index_field_name.insert_cdc(row.#i, link); + let #index_field_name = if let Some(events) = self.#index_field_name.insert_checked_cdc(row.#i.clone(), link) { + events.into_iter().map(|ev| ev.into()).collect() + } else { return Err(IndexError::AlreadyExists { at: #available_index_ident::#index_variant, inserted_already: inserted_indexes.clone(), }); - } - let #index_field_name = events.into_iter().map(|ev| ev.into()).collect(); + }; } }) .collect::>(); @@ -86,6 +85,7 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_type_ident = name_generator.get_row_type_ident(); let events_ident = name_generator.get_space_secondary_index_events_ident(); + let available_index_ident = name_generator.get_available_indexes_ident(); let reinsert_rows = self .columns @@ -93,6 +93,12 @@ impl Generator { .iter() .map(|(i, idx)| { let index_field_name = &idx.name; + let camel_case_name = index_field_name + .to_string() + .from_case(Case::Snake) + .to_case(Case::Pascal); + let index_variant: TokenStream = camel_case_name.parse().unwrap(); + let remove = if idx.is_unique { quote! { if row_new.#i != row_old.#i { @@ -107,8 +113,14 @@ impl Generator { } }; quote! { - let (_, events) = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new); - let mut #index_field_name: Vec<_> = events.into_iter().map(|ev| ev.into()).collect(); + let mut #index_field_name: Vec<_> = if let Some(events) = self.#index_field_name.insert_checked_cdc(row_new.#i.clone(), link_new) { + events.into_iter().map(|ev| ev.into()).collect() + } else { + return Err(IndexError::AlreadyExists { + at: #available_index_ident::#index_variant, + inserted_already: inserted_indexes.clone(), + }); + }; #remove } }) @@ -121,7 +133,15 @@ impl Generator { .collect::>(); quote! { - fn reinsert_row_cdc(&self, row_old: #row_type_ident, link_old: Link, row_new: #row_type_ident, link_new: Link) -> eyre::Result<#events_ident> { + fn reinsert_row_cdc( + &self, + row_old: #row_type_ident, + link_old: Link, + row_new: #row_type_ident, + link_new: Link + ) -> Result<#events_ident, IndexError<#available_index_ident>> { + let mut inserted_indexes: Vec<#available_index_ident> = vec![]; + #(#reinsert_rows)* core::result::Result::Ok( #events_ident { diff --git a/codegen/src/worktable/generator/index/usual.rs b/codegen/src/worktable/generator/index/usual.rs index 21943eb..6dd713a 100644 --- a/codegen/src/worktable/generator/index/usual.rs +++ b/codegen/src/worktable/generator/index/usual.rs @@ -68,8 +68,7 @@ impl Generator { } }; quote! { - if let Some(link) = self.#index_field_name.insert(#row.clone(), link) { - self.#index_field_name.insert(#row, link); + if self.#index_field_name.insert_checked(#row.clone(), link).is_none() { return Err(IndexError::AlreadyExists { at: #available_index_ident::#index_variant, inserted_already: inserted_indexes.clone(), @@ -92,6 +91,7 @@ impl Generator { fn gen_reinsert_row_index_fn(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_type_ident = name_generator.get_row_type_ident(); + let available_index_ident = name_generator.get_available_indexes_ident(); let reinsert_rows = self .columns @@ -99,6 +99,11 @@ impl Generator { .iter() .map(|(i, idx)| { let index_field_name = &idx.name; + let camel_case_name = index_field_name + .to_string() + .from_case(Case::Snake) + .to_case(Case::Pascal); + let index_variant: TokenStream = camel_case_name.parse().unwrap(); let row = if is_float( self.columns .columns_map @@ -129,7 +134,13 @@ impl Generator { quote! { let row = &row_new; let val_new = #row.clone(); - self.#index_field_name.insert(val_new.clone(), link_new); + if self.#index_field_name.insert_checked(val_new.clone(), link_new).is_none() { + return Err(IndexError::AlreadyExists { + at: #available_index_ident::#index_variant, + inserted_already: inserted_indexes.clone(), + }) + } + inserted_indexes.push(#available_index_ident::#index_variant); let row = &row_old; let val_old = #row.clone(); @@ -139,7 +150,14 @@ impl Generator { .collect::>(); quote! { - fn reinsert_row(&self, row_old: #row_type_ident, link_old: Link, row_new: #row_type_ident, link_new: Link) -> eyre::Result<()> { + fn reinsert_row(&self, + row_old: #row_type_ident, + link_old: Link, + row_new: #row_type_ident, + link_new: Link + ) -> core::result::Result<(), IndexError<#available_index_ident>> + { + let mut inserted_indexes: Vec<#available_index_ident> = vec![]; #(#reinsert_rows)* core::result::Result::Ok(()) } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 5330528..4bfcb7a 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -67,8 +67,14 @@ impl Generator { #full_row_lock }; let row_old = self.0.data.select_non_ghosted(link)?; - self.reinsert(row_old, row)?; + if let Err(e) = self.reinsert(row_old, row) { + self.0.update_state.remove(&pk); + lock.unlock(); + return Err(e); + } + + self.0.update_state.remove(&pk); lock.unlock(); self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks @@ -90,6 +96,9 @@ impl Generator { .map(|v| v.get().value) .ok_or(WorkTableError::NotFound)?; + let row_old = self.0.data.select_non_ghosted(link)?; + self.0.update_state.insert(pk.clone(), row_old); + let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; #size_check @@ -103,6 +112,8 @@ impl Generator { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; + self.0.update_state.remove(&pk); + lock.unlock(); // Releases locks self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks @@ -250,7 +261,12 @@ impl Generator { let mut row_new = row_old.clone(); let pk = row_old.get_primary_key().clone(); #(#row_updates)* - self.reinsert(row_old, row_new)?; + if let Err(e) = self.reinsert(row_old, row_new) { + self.0.update_state.remove(&pk); + lock.unlock(); + + return Err(e); + } lock.unlock(); // Releases locks self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks @@ -483,7 +499,12 @@ impl Generator { let row_old = self.select(pk.clone()).expect("should not be deleted by other thread"); let mut row_new = row_old.clone(); #(#row_updates)* - self.reinsert(row_old, row_new)?; + if let Err(e) = self.reinsert(row_old, row_new) { + self.0.update_state.remove(&pk); + lock.unlock(); + + return Err(e); + } lock.unlock(); // Releases locks self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 894cbd7..d960cf2 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -38,6 +38,7 @@ where pages: RwLock::WrappedRow, DATA_LENGTH>>>>, /// Stack with empty [`Link`]s. It stores [`Link`]s of rows that was deleted. + // TODO: Proper empty links registry + defragmentation empty_links: Stack, /// Count of saved rows. diff --git a/src/index/table_secondary_index/cdc.rs b/src/index/table_secondary_index/cdc.rs index 71b503b..b058089 100644 --- a/src/index/table_secondary_index/cdc.rs +++ b/src/index/table_secondary_index/cdc.rs @@ -16,7 +16,7 @@ pub trait TableSecondaryIndexCdc eyre::Result; + ) -> Result>; fn delete_row_cdc( &self, row: Row, diff --git a/src/index/table_secondary_index/mod.rs b/src/index/table_secondary_index/mod.rs index b9056a9..c6468df 100644 --- a/src/index/table_secondary_index/mod.rs +++ b/src/index/table_secondary_index/mod.rs @@ -20,7 +20,7 @@ pub trait TableSecondaryIndex { link_old: Link, row_new: Row, link_new: Link, - ) -> eyre::Result<()>; + ) -> Result<(), IndexError>; fn delete_row(&self, row: Row, link: Link) -> Result<(), IndexError>; @@ -48,7 +48,13 @@ where Ok(()) } - fn reinsert_row(&self, _: Row, _: Link, _: Row, _: Link) -> eyre::Result<()> { + fn reinsert_row( + &self, + _: Row, + _: Link, + _: Row, + _: Link, + ) -> Result<(), IndexError> { Ok(()) } diff --git a/src/table/mod.rs b/src/table/mod.rs index f0d943e..91f9280 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -252,8 +252,7 @@ where SecondaryIndexes: TableSecondaryIndex + TableSecondaryIndexCdc, PkGen: PrimaryKeyGeneratorState, - AvailableIndexes: Debug, - AvailableIndexes: AvailableIndex, + AvailableIndexes: Debug + AvailableIndex, { let pk = row.get_primary_key().clone(); let (link, _) = self @@ -326,6 +325,7 @@ where <::WrappedRow as Archive>::Archived: GhostWrapper, PrimaryKey: Clone, AvailableTypes: 'static, + AvailableIndexes: Debug + AvailableIndex, SecondaryIndexes: TableSecondaryIndex, LockType: 'static, { @@ -345,15 +345,39 @@ where .map_err(WorkTableError::PagesError)? } // we can not check for existence here. - self.pk_map.insert(pk.clone(), new_link); - self.indexes - .reinsert_row(row_old, old_link, row_new, new_link) - .map_err(|_| WorkTableError::SecondaryIndexError)?; - self.data - .delete(old_link) - .map_err(WorkTableError::PagesError)?; + if self.pk_map.checked_insert(pk.clone(), new_link).is_some() { + let indexes_res = + self.indexes + .reinsert_row(row_old, old_link, row_new.clone(), new_link); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.pk_map.remove(&pk); + self.indexes + .delete_from_indexes(row_new, new_link, inserted_already)?; + self.data + .delete(new_link) + .map_err(WorkTableError::PagesError)?; - Ok(pk) + Err(WorkTableError::AlreadyExists(at.to_string())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + self.data + .delete(old_link) + .map_err(WorkTableError::PagesError)?; + Ok(pk) + } else { + // primary key violated. + self.data + .delete(new_link) + .map_err(WorkTableError::PagesError)?; + Err(WorkTableError::AlreadyExists("Primary".to_string())) + } } #[allow(clippy::type_complexity)] @@ -383,7 +407,7 @@ where SecondaryIndexes: TableSecondaryIndex + TableSecondaryIndexCdc, PkGen: PrimaryKeyGeneratorState, - AvailableIndexes: Debug, + AvailableIndexes: Debug + AvailableIndex, { let pk = row_new.get_primary_key().clone(); let old_link = self @@ -400,30 +424,54 @@ where .with_mut_ref(new_link, |r| r.unghost()) .map_err(WorkTableError::PagesError)? } - // we can not check for existence here. - let (_, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), new_link); - let secondary_keys_events = self - .indexes - .reinsert_row_cdc(row_old, old_link, row_new, new_link) - .map_err(|_| WorkTableError::SecondaryIndexError)?; - self.data - .delete(old_link) - .map_err(WorkTableError::PagesError)?; - let bytes = self - .data - .select_raw(new_link) - .map_err(WorkTableError::PagesError)?; + if let Some(primary_key_events) = self.pk_map.checked_insert_cdc(pk.clone(), new_link) { + let indexes_res = + self.indexes + .reinsert_row_cdc(row_old, old_link, row_new.clone(), new_link); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.pk_map.remove(&pk); + self.indexes + .delete_from_indexes(row_new, new_link, inserted_already)?; + self.data + .delete(new_link) + .map_err(WorkTableError::PagesError)?; - let op = Operation::Insert(InsertOperation { - id: OperationId::Single(Uuid::now_v7()), - pk_gen_state: self.pk_gen.get_state(), - primary_key_events, - secondary_keys_events, - bytes, - link: new_link, - }); + Err(WorkTableError::AlreadyExists(at.to_string())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } - Ok((pk, op)) + self.data + .delete(old_link) + .map_err(WorkTableError::PagesError)?; + let bytes = self + .data + .select_raw(new_link) + .map_err(WorkTableError::PagesError)?; + + let op = Operation::Insert(InsertOperation { + id: OperationId::Single(Uuid::now_v7()), + pk_gen_state: self.pk_gen.get_state(), + primary_key_events, + secondary_keys_events: indexes_res.expect("was checked just before"), + bytes, + link: new_link, + }); + + Ok((pk, op)) + } else { + // primary key violated. + self.data + .delete(new_link) + .map_err(WorkTableError::PagesError)?; + Err(WorkTableError::AlreadyExists("Primary".to_string())) + } } } diff --git a/tests/worktable/index/update_full.rs b/tests/worktable/index/update_full.rs index cd9cc2d..27c5402 100644 --- a/tests/worktable/index/update_full.rs +++ b/tests/worktable/index/update_full.rs @@ -284,3 +284,68 @@ async fn update_by_full_row_non_unique_with_string_update() { .expect("rows"); assert_eq!(updated.first(), None); } + +#[tokio::test] +async fn update_by_full_row_with_reinsert_and_primary_key_violation() { + let test_table = Test3UniqueWorkTable::default(); + + let row1 = Test3UniqueRow { + val: 1, + attr1: "TEST".to_string(), + attr2: 1000, + attr3: 65000, + id: 0, + }; + test_table.insert(row1.clone()).unwrap(); + let row2 = Test3UniqueRow { + val: 1, + attr1: "TEST1".to_string(), + attr2: 1001, + attr3: 65001, + id: 1, + }; + test_table.insert(row2.clone()).unwrap(); + let mut update = row1.clone(); + update.id = row2.id.clone(); + update.attr1 = "TEST_______________________1".to_string(); + assert!(test_table.update(update).await.is_err()); + + assert_eq!( + test_table.select_by_attr1(row2.attr1.clone()).unwrap(), + row2 + ); + assert_eq!(test_table.select_by_attr2(row2.attr2).unwrap(), row2); + assert_eq!(test_table.select_by_attr3(row2.attr3).unwrap(), row2); +} + +#[tokio::test] +async fn update_by_full_row_with_reinsert_and_secondary_unique_violation() { + let test_table = Test3UniqueWorkTable::default(); + + let row1 = Test3UniqueRow { + val: 1, + attr1: "TEST".to_string(), + attr2: 1000, + attr3: 65000, + id: 0, + }; + test_table.insert(row1.clone()).unwrap(); + let row2 = Test3UniqueRow { + val: 1, + attr1: "TEST__________________1".to_string(), + attr2: 1001, + attr3: 65001, + id: 1, + }; + test_table.insert(row2.clone()).unwrap(); + let mut update = row1.clone(); + update.attr1 = row2.attr1.clone(); + assert!(test_table.update(update).await.is_err()); + + assert_eq!( + test_table.select_by_attr1(row2.attr1.clone()).unwrap(), + row2 + ); + assert_eq!(test_table.select_by_attr2(row2.attr2).unwrap(), row2); + assert_eq!(test_table.select_by_attr3(row2.attr3).unwrap(), row2); +} From 251e826d1c3c9522736a38c7e6cc7e8d12d675d5 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 5 Aug 2025 01:01:38 +0300 Subject: [PATCH 4/6] add diffs logic for non reinsert updates --- codegen/src/worktable/generator/index/cdc.rs | 102 +++++++++++++++--- codegen/src/worktable/generator/index/mod.rs | 2 +- .../src/worktable/generator/index/usual.rs | 84 ++++++++++++--- .../src/worktable/generator/queries/type.rs | 23 +++- .../src/worktable/generator/queries/update.rs | 87 +++++++++++++-- codegen/src/worktable/generator/row.rs | 15 +++ src/in_memory/mod.rs | 2 +- src/in_memory/row.rs | 4 + src/index/available_index.rs | 4 +- src/index/mod.rs | 2 +- src/index/table_secondary_index/cdc.rs | 11 +- src/index/table_secondary_index/mod.rs | 24 ++++- src/lib.rs | 2 +- src/persistence/operation/operation.rs | 12 +++ src/table/mod.rs | 8 +- 15 files changed, 324 insertions(+), 58 deletions(-) diff --git a/codegen/src/worktable/generator/index/cdc.rs b/codegen/src/worktable/generator/index/cdc.rs index 97da367..9a903ce 100644 --- a/codegen/src/worktable/generator/index/cdc.rs +++ b/codegen/src/worktable/generator/index/cdc.rs @@ -1,4 +1,4 @@ -use crate::name_generator::WorktableNameGenerator; +use crate::name_generator::{is_float, WorktableNameGenerator}; use crate::worktable::generator::queries::r#type::map_to_uppercase; use crate::worktable::generator::Generator; use convert_case::{Case, Casing}; @@ -17,15 +17,16 @@ impl Generator { let save_row_cdc = self.gen_save_row_cdc_index_fn(); let reinsert_row_cdc = self.gen_reinsert_row_cdc_index_fn(); let delete_row_cdc = self.gen_delete_row_cdc_index_fn(); - let process_diff_cdc = self.gen_process_diff_cdc_index_fn(); + let process_difference_insert_cdc = self.gen_process_difference_insert_cdc_index_fn(); + let process_difference_remove_cdc = self.gen_process_difference_remove_cdc_index_fn(); quote! { impl TableSecondaryIndexCdc<#row_type_ident, #available_types_ident, #events_ident, #available_index_ident> for #index_type_ident { #reinsert_row_cdc #save_row_cdc #delete_row_cdc - - #process_diff_cdc + #process_difference_insert_cdc + #process_difference_remove_cdc } } } @@ -57,6 +58,7 @@ impl Generator { inserted_already: inserted_indexes.clone(), }); }; + inserted_indexes.push(#available_index_ident::#index_variant); } }) .collect::>(); @@ -121,6 +123,7 @@ impl Generator { inserted_already: inserted_indexes.clone(), }); }; + inserted_indexes.push(#available_index_ident::#index_variant); #remove } }) @@ -189,9 +192,10 @@ impl Generator { } } - fn gen_process_diff_cdc_index_fn(&self) -> TokenStream { + fn gen_process_difference_remove_cdc_index_fn(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let avt_type_ident = name_generator.get_available_type_ident(); + let available_index_ident = name_generator.get_available_indexes_ident(); let events_ident = name_generator.get_space_secondary_index_events_ident(); let process_difference_rows = self.columns.indexes.iter().map(|(i, idx)| { @@ -202,10 +206,12 @@ impl Generator { let type_str = t.to_string(); let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site()); - let (new_value_expr, old_value_expr) = if type_str == "String" { - (quote! { new.to_string() }, quote! { old.to_string() }) + let old_value_expr = if type_str == "String" { + quote! { old.to_string() } + } else if is_float(type_str.as_str()) { + quote! { OrderedFloat(*old) } } else { - (quote! { *new }, quote! { *old }) + quote! { *old } }; quote! { @@ -216,11 +222,79 @@ impl Generator { let (_, evs) = TableIndexCdc::remove_cdc(&self.#index_field_name, key_old, link); events.extend_from_slice(evs.as_ref()); } + events + } else { + vec![] + }; + } + } else { + quote! {} + } + }); + let idents = self + .columns + .indexes + .values() + .map(|idx| &idx.name) + .collect::>(); + + quote! { + fn process_difference_remove_cdc( + &self, + link: Link, + difference: std::collections::HashMap<&str, Difference<#avt_type_ident>> + ) -> Result<#events_ident, IndexError<#available_index_ident>> { + #(#process_difference_rows)* + core::result::Result::Ok( + #events_ident { + #(#idents,)* + } + ) + } + } + } + + fn gen_process_difference_insert_cdc_index_fn(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let avt_type_ident = name_generator.get_available_type_ident(); + let available_index_ident = name_generator.get_available_indexes_ident(); + let events_ident = name_generator.get_space_secondary_index_events_ident(); + + let process_difference_insert_rows = self.columns.indexes.iter().map(|(i, idx)| { + let index_field_name = &idx.name; + let diff_key = Literal::string(i.to_string().as_str()); + + if let Some(t) = self.columns.columns_map.get(&idx.field) { + let type_str = t.to_string(); + let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site()); + let camel_case_name = index_field_name + .to_string() + .from_case(Case::Snake) + .to_case(Case::Pascal); + let index_variant: TokenStream = camel_case_name.parse().unwrap(); + let new_value_expr = if type_str == "String" { + quote! { new.to_string() } + } else if is_float(type_str.as_str()) { + quote! { OrderedFloat(*new) } + } else { + quote! { *new } + }; + + quote! { + let #index_field_name = if let Some(diff) = difference.get(#diff_key) { + let mut events = vec![]; if let #avt_type_ident::#variant_ident(new) = &diff.new { let key_new = #new_value_expr; - let (_, evs) = TableIndexCdc::insert_cdc(&self.#index_field_name, key_new, link); - events.extend_from_slice(evs.as_ref()); + if let Some(evs) = TableIndexCdc::insert_checked_cdc(&self.#index_field_name, key_new, link) { + events.extend_from_slice(evs.as_ref()); + } else { + return Err(IndexError::AlreadyExists { + at: #available_index_ident::#index_variant, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(#available_index_ident::#index_variant); } events } else { @@ -239,12 +313,14 @@ impl Generator { .collect::>(); quote! { - fn process_difference_cdc( + fn process_difference_insert_cdc( &self, link: Link, difference: std::collections::HashMap<&str, Difference<#avt_type_ident>> - ) -> core::result::Result<#events_ident, WorkTableError> { - #(#process_difference_rows)* + ) -> Result<#events_ident, IndexError<#available_index_ident>> { + let mut inserted_indexes: Vec<#available_index_ident> = vec![]; + + #(#process_difference_insert_rows)* core::result::Result::Ok( #events_ident { #(#idents,)* diff --git a/codegen/src/worktable/generator/index/mod.rs b/codegen/src/worktable/generator/index/mod.rs index b7de951..29138c4 100644 --- a/codegen/src/worktable/generator/index/mod.rs +++ b/codegen/src/worktable/generator/index/mod.rs @@ -164,7 +164,7 @@ impl Generator { } impl AvailableIndex for #avt_type_ident { - fn to_string(&self) -> String { + fn to_string_value(&self) -> String { ToString::to_string(&self) } } diff --git a/codegen/src/worktable/generator/index/usual.rs b/codegen/src/worktable/generator/index/usual.rs index 6dd713a..c2c6314 100644 --- a/codegen/src/worktable/generator/index/usual.rs +++ b/codegen/src/worktable/generator/index/usual.rs @@ -17,7 +17,8 @@ impl Generator { let save_row_fn = self.gen_save_row_index_fn(); let reinsert_row_fn = self.gen_reinsert_row_index_fn(); let delete_row_fn = self.gen_delete_row_index_fn(); - let process_difference_fn = self.gen_process_difference_index_fn(); + let process_difference_insert_fn = self.gen_process_difference_insert_index_fn(); + let process_difference_remove_fn = self.gen_process_difference_remove_index_fn(); let delete_from_indexes = self.gen_index_delete_from_indexes_fn(); quote! { @@ -25,7 +26,8 @@ impl Generator { #save_row_fn #reinsert_row_fn #delete_row_fn - #process_difference_fn + #process_difference_insert_fn + #process_difference_remove_fn #delete_from_indexes } } @@ -214,13 +216,14 @@ impl Generator { } } - /// Generates `process_difference` function of `TableIndex` trait for index. It updates `Link` for all secondary indexes. + /// Generates `process_difference_remove` function of `TableIndex` trait for index. It updates `Link` for all secondary indexes. /// Uses HashMap<&str, Difference> for storing all changes - fn gen_process_difference_index_fn(&self) -> TokenStream { + fn gen_process_difference_remove_index_fn(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let avt_type_ident = name_generator.get_available_type_ident(); + let avt_index_ident = name_generator.get_available_indexes_ident(); - let process_difference_rows = self.columns.indexes.iter().map(|(i, idx)| { + let process_difference_remove_rows = self.columns.indexes.iter().map(|(i, idx)| { let index_field_name = &idx.name; let diff_key = Literal::string(i.to_string().as_str()); @@ -228,12 +231,12 @@ impl Generator { let type_str = t.to_string(); let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site()); - let (new_value_expr, old_value_expr) = if type_str == "String" { - (quote! { new.to_string() }, quote! { old.to_string() }) + let old_value_expr = if type_str == "String" { + quote! { old.to_string() } } else if is_float(type_str.as_str()) { - (quote! { OrderedFloat(*new) }, quote! { OrderedFloat(*old) }) + quote! { OrderedFloat(*old) } } else { - (quote! { *new }, quote! { *old }) + quote! { *old } }; quote! { @@ -242,10 +245,64 @@ impl Generator { let key_old = #old_value_expr; TableIndex::remove(&self.#index_field_name, key_old, link); } + } + } + } else { + quote! {} + } + }); + quote! { + fn process_difference_remove( + &self, + link: Link, + difference: std::collections::HashMap<&str, Difference<#avt_type_ident>> + ) -> core::result::Result<(), IndexError<#avt_index_ident>> { + #(#process_difference_remove_rows)* + core::result::Result::Ok(()) + } + } + } + + /// Generates `process_difference_insert` function of `TableIndex` trait for index. It updates `Link` for all secondary indexes. + /// Uses HashMap<&str, Difference> for storing all changes + fn gen_process_difference_insert_index_fn(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let avt_type_ident = name_generator.get_available_type_ident(); + let avt_index_ident = name_generator.get_available_indexes_ident(); + + let process_difference_insert_rows = self.columns.indexes.iter().map(|(i, idx)| { + let index_field_name = &idx.name; + let diff_key = Literal::string(i.to_string().as_str()); + + if let Some(t) = self.columns.columns_map.get(&idx.field) { + let type_str = t.to_string(); + let variant_ident = Ident::new(&map_to_uppercase(&type_str), Span::mixed_site()); + let camel_case_name = index_field_name + .to_string() + .from_case(Case::Snake) + .to_case(Case::Pascal); + let index_variant: TokenStream = camel_case_name.parse().unwrap(); + + let new_value_expr = if type_str == "String" { + quote! { new.to_string() } + } else if is_float(type_str.as_str()) { + quote! { OrderedFloat(*new) } + } else { + quote! { *new } + }; + + quote! { + if let Some(diff) = difference.get(#diff_key) { if let #avt_type_ident::#variant_ident(new) = &diff.new { let key_new = #new_value_expr; - TableIndex::insert(&self.#index_field_name, key_new, link); + if TableIndex::insert_checked(&self.#index_field_name, key_new, link).is_none() { + return Err(IndexError::AlreadyExists { + at: #avt_index_ident::#index_variant, + inserted_already: inserted_indexes.clone(), + }) + } + inserted_indexes.push(#avt_index_ident::#index_variant); } } } @@ -255,12 +312,13 @@ impl Generator { }); quote! { - fn process_difference( + fn process_difference_insert( &self, link: Link, difference: std::collections::HashMap<&str, Difference<#avt_type_ident>> - ) -> core::result::Result<(), WorkTableError> { - #(#process_difference_rows)* + ) -> core::result::Result<(), IndexError<#avt_index_ident>> { + let mut inserted_indexes: Vec<#avt_index_ident> = vec![]; + #(#process_difference_insert_rows)* core::result::Result::Ok(()) } } diff --git a/codegen/src/worktable/generator/queries/type.rs b/codegen/src/worktable/generator/queries/type.rs index f2aab0c..3985f0c 100644 --- a/codegen/src/worktable/generator/queries/type.rs +++ b/codegen/src/worktable/generator/queries/type.rs @@ -65,13 +65,16 @@ impl Generator { } pub fn gen_result_types_def(&mut self) -> syn::Result { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let row_ident = name_generator.get_row_type_ident(); + if let Some(queries) = &self.queries { let query_defs = queries .updates .keys() .map(|v| { let ident = Ident::new(format!("{v}Query").as_str(), Span::mixed_site()); - let rows = queries + let (rows, updates): (Vec<_>, Vec<_>) = queries .updates .get(v) .expect("exists") @@ -105,9 +108,15 @@ impl Generator { quote! {pub #i: #type_,} }; - Ok::<_, syn::Error>(def) + let update = quote! { + row.#i = self.#i; + }; + + Ok::<_, syn::Error>((def, update)) }) - .collect::, _>>()?; + .collect::, _>>()? + .into_iter() + .unzip(); Ok::<_, syn::Error>(quote! { @@ -116,6 +125,14 @@ impl Generator { pub struct #ident { #(#rows)* } + + impl Query<#row_ident> for #ident { + fn merge(self, mut row: #row_ident) -> #row_ident { + #(#updates)* + + row + } + } }) }) .collect::, _>>()?; diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 4bfcb7a..8aeda8b 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -53,7 +53,9 @@ impl Generator { .map(|idx| idx.field.clone()) .collect(); - let diff_process = self.gen_process_diffs_on_index(idents.as_slice(), Some(&idents)); + let diff_process_insert = + self.gen_process_diffs_insert_on_index(idents.as_slice(), Some(&idents)); + let diff_process_remove = self.gen_process_diffs_remove_on_index(Some(&idents)); let persist_call = self.gen_persist_call(); let persist_op = self.gen_persist_op(); let full_row_lock = self.gen_full_lock_for_update(); @@ -105,13 +107,15 @@ impl Generator { let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#row_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; let op_id = OperationId::Single(uuid::Uuid::now_v7()); - #diff_process + #diff_process_insert #persist_op unsafe { self.0.data.with_mut_ref(link, move |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; + #diff_process_remove + self.0.update_state.remove(&pk); lock.unlock(); // Releases locks @@ -302,7 +306,7 @@ impl Generator { } } - fn gen_process_diffs_on_index( + fn gen_process_diffs_insert_on_index( &self, idents: &[Ident], idx_idents: Option<&Vec>, @@ -350,7 +354,22 @@ impl Generator { let process_difference = if self.is_persist { if idx_idents.is_some() { quote! { - let secondary_keys_events = self.0.indexes.process_difference_cdc(link, diffs)?; + let indexes_res = self.0.indexes.process_difference_insert_cdc(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes + .delete_from_indexes(row_new.merge(row_old.clone()), link, inserted_already)?; + + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + let mut secondary_keys_events = indexes_res.expect("was just checked for correctness"); } } else { quote! { @@ -359,7 +378,21 @@ impl Generator { } } else if idx_idents.is_some() { quote! { - self.0.indexes.process_difference(link, diffs)?; + let indexes_res = self.0.indexes.process_difference_insert(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes + .delete_from_indexes(row_new.merge(row_old.clone()), link, inserted_already)?; + + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } } } else { quote! {} @@ -372,6 +405,29 @@ impl Generator { } } + fn gen_process_diffs_remove_on_index(&self, idx_idents: Option<&Vec>) -> TokenStream { + let process_difference = if self.is_persist { + if idx_idents.is_some() { + quote! { + let secondary_keys_events_remove = self.0.indexes.process_difference_remove_cdc(link, diffs)?; + op.extend_secondary_key_events(secondary_keys_events_remove); + } + } else { + quote! {} + } + } else if idx_idents.is_some() { + quote! { + self.0.indexes.process_difference_remove(link, diffs)?; + } + } else { + quote! {} + }; + + quote! { + #process_difference + } + } + fn gen_pk_update( &self, snake_case_name: String, @@ -398,7 +454,8 @@ impl Generator { .collect::>(); let size_check = self.gen_size_check(unsized_fields, idents); - let diff_process = self.gen_process_diffs_on_index(idents, idx_idents); + let diff_process_insert = self.gen_process_diffs_insert_on_index(idents, idx_idents); + let diff_process_remove = self.gen_process_diffs_remove_on_index(idx_idents); let persist_call = self.gen_persist_call(); let persist_op = self.gen_persist_op(); let custom_lock = self.gen_custom_lock_for_update(lock_ident); @@ -423,13 +480,15 @@ impl Generator { let op_id = OperationId::Single(uuid::Uuid::now_v7()); #size_check - #diff_process + #diff_process_insert #persist_op unsafe { self.0.data.with_mut_ref(link, |archived| { #(#row_updates)* }).map_err(WorkTableError::PagesError)? }; + #diff_process_remove + lock.unlock(); self.0.lock_map.remove_with_lock_check(&pk).await; @@ -515,7 +574,8 @@ impl Generator { } else { quote! {} }; - let diff_process = self.gen_process_diffs_on_index(idents, idx_idents); + let diff_process_insert = self.gen_process_diffs_insert_on_index(idents, idx_idents); + let diff_process_remove = self.gen_process_diffs_remove_on_index(idx_idents); let persist_call = self.gen_persist_call(); let persist_op = self.gen_persist_op(); let by = if is_float(by_ident.to_string().as_str()) { @@ -556,7 +616,7 @@ impl Generator { }; #size_check - #diff_process + #diff_process_insert #persist_op unsafe { @@ -565,6 +625,8 @@ impl Generator { }).map_err(WorkTableError::PagesError)?; } + #diff_process_remove + #persist_call } for (pk, lock) in pk_to_unlock { @@ -603,7 +665,8 @@ impl Generator { }) .collect::>(); let size_check = self.gen_size_check(unsized_fields, idents); - let diff_process = self.gen_process_diffs_on_index(idents, idx_idents); + let diff_process_insert = self.gen_process_diffs_insert_on_index(idents, idx_idents); + let diff_process_remove = self.gen_process_diffs_remove_on_index(idx_idents); let persist_call = self.gen_persist_call(); let persist_op = self.gen_persist_op(); let by = if is_float(by_ident.to_string().as_str()) { @@ -644,7 +707,7 @@ impl Generator { let op_id = OperationId::Single(uuid::Uuid::now_v7()); #size_check - #diff_process + #diff_process_insert #persist_op unsafe { @@ -653,6 +716,8 @@ impl Generator { }).map_err(WorkTableError::PagesError)?; } + #diff_process_remove + lock.unlock(); self.0.lock_map.remove_with_lock_check(&pk).await; diff --git a/codegen/src/worktable/generator/row.rs b/codegen/src/worktable/generator/row.rs index 142b6db..d2cf2bc 100644 --- a/codegen/src/worktable/generator/row.rs +++ b/codegen/src/worktable/generator/row.rs @@ -10,11 +10,13 @@ impl Generator { let def = self.gen_row_type(); let table_row_impl = self.gen_row_table_row_impl(); let row_fields_enum = self.gen_row_fields_enum(); + let query_impl = self.gen_query_impl(); quote! { #def #table_row_impl #row_fields_enum + #query_impl } } @@ -93,6 +95,19 @@ impl Generator { } } + fn gen_query_impl(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let ident = name_generator.get_row_type_ident(); + + quote! { + impl Query<#ident> for #ident { + fn merge(self, row: #ident) -> #ident { + row + } + } + } + } + /// Generates `RowFields` enum for row. fn gen_row_fields_enum(&self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index 43b59e2..bc9a6b6 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -4,4 +4,4 @@ mod row; pub use data::{Data, ExecutionError as DataExecutionError, DATA_INNER_LENGTH}; pub use pages::{DataPages, ExecutionError as PagesExecutionError}; -pub use row::{GhostWrapper, RowWrapper, StorableRow}; +pub use row::{GhostWrapper, Query, RowWrapper, StorableRow}; diff --git a/src/in_memory/row.rs b/src/in_memory/row.rs index 8404607..2ef9bf0 100644 --- a/src/in_memory/row.rs +++ b/src/in_memory/row.rs @@ -18,3 +18,7 @@ pub trait RowWrapper { pub trait GhostWrapper { fn unghost(&mut self); } + +pub trait Query { + fn merge(self, row: Row) -> Row; +} diff --git a/src/index/available_index.rs b/src/index/available_index.rs index 71e6bd1..32b1899 100644 --- a/src/index/available_index.rs +++ b/src/index/available_index.rs @@ -1,9 +1,9 @@ pub trait AvailableIndex { - fn to_string(&self) -> String; + fn to_string_value(&self) -> String; } impl AvailableIndex for () { - fn to_string(&self) -> String { + fn to_string_value(&self) -> String { "".to_string() } } diff --git a/src/index/mod.rs b/src/index/mod.rs index ce3e09b..a164ecb 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -15,7 +15,7 @@ pub use table_secondary_index::{ }; pub use unsized_node::UnsizedNode; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Difference { pub old: AvailableTypes, pub new: AvailableTypes, diff --git a/src/index/table_secondary_index/cdc.rs b/src/index/table_secondary_index/cdc.rs index b058089..8bea76b 100644 --- a/src/index/table_secondary_index/cdc.rs +++ b/src/index/table_secondary_index/cdc.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use data_bucket::Link; -use crate::{Difference, IndexError, WorkTableError}; +use crate::{Difference, IndexError}; pub trait TableSecondaryIndexCdc { fn save_row_cdc( @@ -22,9 +22,14 @@ pub trait TableSecondaryIndexCdc Result>; - fn process_difference_cdc( + fn process_difference_insert_cdc( &self, link: Link, differences: HashMap<&str, Difference>, - ) -> Result; + ) -> Result>; + fn process_difference_remove_cdc( + &self, + link: Link, + differences: HashMap<&str, Difference>, + ) -> Result>; } diff --git a/src/index/table_secondary_index/mod.rs b/src/index/table_secondary_index/mod.rs index c6468df..2f1ca7b 100644 --- a/src/index/table_secondary_index/mod.rs +++ b/src/index/table_secondary_index/mod.rs @@ -31,11 +31,17 @@ pub trait TableSecondaryIndex { indexes: Vec, ) -> Result<(), IndexError>; - fn process_difference( + fn process_difference_insert( &self, link: Link, differences: HashMap<&str, Difference>, - ) -> Result<(), WorkTableError>; + ) -> Result<(), IndexError>; + + fn process_difference_remove( + &self, + link: Link, + differences: HashMap<&str, Difference>, + ) -> Result<(), IndexError>; } impl @@ -71,11 +77,19 @@ where Ok(()) } - fn process_difference( + fn process_difference_insert( + &self, + _: Link, + _: HashMap<&str, Difference>, + ) -> Result<(), IndexError> { + Ok(()) + } + + fn process_difference_remove( &self, _: Link, _: HashMap<&str, Difference>, - ) -> Result<(), WorkTableError> { + ) -> Result<(), IndexError> { Ok(()) } } @@ -98,7 +112,7 @@ where IndexError::AlreadyExists { at, inserted_already: _, - } => WorkTableError::AlreadyExists(at.to_string()), + } => WorkTableError::AlreadyExists(at.to_string_value()), IndexError::NotFound => WorkTableError::NotFound, } } diff --git a/src/lib.rs b/src/lib.rs index 76af12b..511e853 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,7 @@ pub use table::*; pub use worktable_codegen::worktable; pub mod prelude { - pub use crate::in_memory::{Data, DataPages, GhostWrapper, RowWrapper, StorableRow}; + pub use crate::in_memory::{Data, DataPages, GhostWrapper, Query, RowWrapper, StorableRow}; pub use crate::lock::LockMap; pub use crate::lock::{Lock, RowLock}; pub use crate::mem_stat::MemStat; diff --git a/src/persistence/operation/operation.rs b/src/persistence/operation/operation.rs index dd6b92a..79ea01a 100644 --- a/src/persistence/operation/operation.rs +++ b/src/persistence/operation/operation.rs @@ -6,6 +6,7 @@ use indexset::cdc::change::ChangeEvent; use indexset::core::pair::Pair; use crate::persistence::{OperationId, OperationType}; +use crate::TableSecondaryIndexEventsOps; #[derive(Clone, Debug)] pub enum Operation { @@ -86,6 +87,17 @@ impl } } + pub fn extend_secondary_key_events(&mut self, evs: SecondaryKeys) + where + SecondaryKeys: TableSecondaryIndexEventsOps, + { + match self { + Operation::Insert(insert) => insert.secondary_keys_events.extend(evs), + Operation::Update(update) => update.secondary_keys_events.extend(evs), + Operation::Delete(delete) => delete.secondary_keys_events.extend(evs), + } + } + pub fn pk_gen_state(&self) -> Option<&PrimaryKeyGenState> { match &self { Operation::Insert(insert) => Some(&insert.pk_gen_state), diff --git a/src/table/mod.rs b/src/table/mod.rs index 91f9280..549903c 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -212,7 +212,7 @@ where self.indexes .delete_from_indexes(row, link, inserted_already)?; - Err(WorkTableError::AlreadyExists(at.to_string())) + Err(WorkTableError::AlreadyExists(at.to_string_value())) } IndexError::NotFound => Err(WorkTableError::NotFound), }; @@ -276,7 +276,7 @@ where self.indexes .delete_from_indexes(row, link, inserted_already)?; - Err(WorkTableError::AlreadyExists(at.to_string())) + Err(WorkTableError::AlreadyExists(at.to_string_value())) } IndexError::NotFound => Err(WorkTableError::NotFound), }; @@ -362,7 +362,7 @@ where .delete(new_link) .map_err(WorkTableError::PagesError)?; - Err(WorkTableError::AlreadyExists(at.to_string())) + Err(WorkTableError::AlreadyExists(at.to_string_value())) } IndexError::NotFound => Err(WorkTableError::NotFound), }; @@ -441,7 +441,7 @@ where .delete(new_link) .map_err(WorkTableError::PagesError)?; - Err(WorkTableError::AlreadyExists(at.to_string())) + Err(WorkTableError::AlreadyExists(at.to_string_value())) } IndexError::NotFound => Err(WorkTableError::NotFound), }; From 2e58320ab115f84bf5c2573f558e8d249a6bf5ef Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 5 Aug 2025 02:55:26 +0300 Subject: [PATCH 5/6] finalize --- codegen/src/worktable/generator/index/cdc.rs | 40 +++-- .../src/worktable/generator/index/usual.rs | 38 +++-- .../worktable/generator/queries/unsized_.rs | 2 +- .../src/worktable/generator/queries/update.rs | 10 +- codegen/src/worktable/generator/row.rs | 2 +- src/index/table_index/cdc.rs | 5 +- src/index/table_index/mod.rs | 6 +- src/table/mod.rs | 143 ++++++++---------- tests/worktable/index/update_by_pk.rs | 90 +++++++++++ tests/worktable/index/update_full.rs | 53 +++++++ 10 files changed, 277 insertions(+), 112 deletions(-) diff --git a/codegen/src/worktable/generator/index/cdc.rs b/codegen/src/worktable/generator/index/cdc.rs index 9a903ce..f2239df 100644 --- a/codegen/src/worktable/generator/index/cdc.rs +++ b/codegen/src/worktable/generator/index/cdc.rs @@ -89,7 +89,7 @@ impl Generator { let events_ident = name_generator.get_space_secondary_index_events_ident(); let available_index_ident = name_generator.get_available_indexes_ident(); - let reinsert_rows = self + let (insert_rows, remove_rows): (Vec<_>, Vec<_>) = self .columns .indexes .iter() @@ -103,31 +103,42 @@ impl Generator { let remove = if idx.is_unique { quote! { - if row_new.#i != row_old.#i { + if row_new.#i == row_old.#i { + let events = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new).1; + #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); + } else { let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old); #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); } } } else { quote! { + let events = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new).1; + #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old); #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); } }; - quote! { - let mut #index_field_name: Vec<_> = if let Some(events) = self.#index_field_name.insert_checked_cdc(row_new.#i.clone(), link_new) { - events.into_iter().map(|ev| ev.into()).collect() + let insert = quote! { + let mut #index_field_name = if row_new.#i != row_old.#i { + let #index_field_name: Vec<_> = if let Some(events) = self.#index_field_name.insert_checked_cdc(row_new.#i.clone(), link_new) { + events.into_iter().map(|ev| ev.into()).collect() + } else { + return Err(IndexError::AlreadyExists { + at: #available_index_ident::#index_variant, + inserted_already: inserted_indexes.clone(), + }); + }; + inserted_indexes.push(#available_index_ident::#index_variant); + + #index_field_name } else { - return Err(IndexError::AlreadyExists { - at: #available_index_ident::#index_variant, - inserted_already: inserted_indexes.clone(), - }); + vec![] }; - inserted_indexes.push(#available_index_ident::#index_variant); - #remove - } + }; + (insert, remove) }) - .collect::>(); + .unzip(); let idents = self .columns .indexes @@ -145,7 +156,8 @@ impl Generator { ) -> Result<#events_ident, IndexError<#available_index_ident>> { let mut inserted_indexes: Vec<#available_index_ident> = vec![]; - #(#reinsert_rows)* + #(#insert_rows)* + #(#remove_rows)* core::result::Result::Ok( #events_ident { #(#idents,)* diff --git a/codegen/src/worktable/generator/index/usual.rs b/codegen/src/worktable/generator/index/usual.rs index c2c6314..4f89c98 100644 --- a/codegen/src/worktable/generator/index/usual.rs +++ b/codegen/src/worktable/generator/index/usual.rs @@ -95,7 +95,7 @@ impl Generator { let row_type_ident = name_generator.get_row_type_ident(); let available_index_ident = name_generator.get_available_indexes_ident(); - let reinsert_rows = self + let (insert_rows, remove_rows): (Vec<_>, Vec<_>) = self .columns .indexes .iter() @@ -124,32 +124,43 @@ impl Generator { }; let remove = if idx.is_unique { quote! { - if val_new != val_old { + if val_new == val_old { + self.#index_field_name.insert(val_new.clone(), link_new); + } else { TableIndex::remove(&self.#index_field_name, val_old, link_old); } } } else { quote! { + self.#index_field_name.insert(val_new.clone(), link_new); TableIndex::remove(&self.#index_field_name, val_old, link_old); } }; - quote! { + let insert = quote! { let row = &row_new; let val_new = #row.clone(); - if self.#index_field_name.insert_checked(val_new.clone(), link_new).is_none() { - return Err(IndexError::AlreadyExists { - at: #available_index_ident::#index_variant, - inserted_already: inserted_indexes.clone(), - }) + let row = &row_old; + let val_old = #row.clone(); + if val_new != val_old { + if self.#index_field_name.insert_checked(val_new.clone(), link_new).is_none() { + return Err(IndexError::AlreadyExists { + at: #available_index_ident::#index_variant, + inserted_already: inserted_indexes.clone(), + }) + } + inserted_indexes.push(#available_index_ident::#index_variant); } - inserted_indexes.push(#available_index_ident::#index_variant); - + }; + let remove = quote! { + let row = &row_new; + let val_new = #row.clone(); let row = &row_old; let val_old = #row.clone(); #remove - } + }; + (insert, remove) }) - .collect::>(); + .unzip(); quote! { fn reinsert_row(&self, @@ -160,7 +171,8 @@ impl Generator { ) -> core::result::Result<(), IndexError<#available_index_ident>> { let mut inserted_indexes: Vec<#available_index_ident> = vec![]; - #(#reinsert_rows)* + #(#insert_rows)* + #(#remove_rows)* core::result::Result::Ok(()) } } diff --git a/codegen/src/worktable/generator/queries/unsized_.rs b/codegen/src/worktable/generator/queries/unsized_.rs index 316455f..a1392d4 100644 --- a/codegen/src/worktable/generator/queries/unsized_.rs +++ b/codegen/src/worktable/generator/queries/unsized_.rs @@ -38,7 +38,7 @@ impl Generator { quote! { fn #fn_ident(&self, link: Link) -> core::result::Result { self.0.data - .with_ref(link, |row_ref| row_ref.inner.#f.len()) + .with_ref(link, |row_ref| row_ref.inner.#f.as_str().to_string().aligned_size()) .map_err(WorkTableError::PagesError) } } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 8aeda8b..aefd6fd 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -238,7 +238,7 @@ impl Generator { .map(|f| { let fn_ident = Ident::new(format!("get_{f}_size").as_str(), Span::call_site()); quote! { - need_to_reinsert |= archived_row.#fn_ident() > self.#fn_ident(link)?; + need_to_reinsert |= archived_row.#fn_ident() >= self.#fn_ident(link)?; } }) .collect(); @@ -253,7 +253,7 @@ impl Generator { let full_row_lock = self.gen_full_lock_for_update(); quote! { - let mut need_to_reinsert = false; + let mut need_to_reinsert = true; #(#fields_check)* if need_to_reinsert { lock.unlock(); @@ -532,7 +532,7 @@ impl Generator { .map(|f| { let fn_ident = Ident::new(format!("get_{f}_size").as_str(), Span::call_site()); quote! { - need_to_reinsert |= archived_row.#fn_ident() > self.#fn_ident(link)?; + need_to_reinsert |= archived_row.#fn_ident() >= self.#fn_ident(link)?; } }) .collect(); @@ -547,7 +547,7 @@ impl Generator { let full_row_lock = self.gen_full_lock_for_update(); quote! { - let mut need_to_reinsert = false; + let mut need_to_reinsert = true; #(#fields_check)* if need_to_reinsert { let op_lock = locks.remove(&pk).expect("should not be deleted as links are unique"); @@ -567,6 +567,8 @@ impl Generator { lock.unlock(); // Releases locks self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks + + continue; } else { pk_to_unlock.insert(pk.clone(), locks.remove(&pk).expect("should not be deleted as links are unique")); } diff --git a/codegen/src/worktable/generator/row.rs b/codegen/src/worktable/generator/row.rs index d2cf2bc..5d35368 100644 --- a/codegen/src/worktable/generator/row.rs +++ b/codegen/src/worktable/generator/row.rs @@ -102,7 +102,7 @@ impl Generator { quote! { impl Query<#ident> for #ident { fn merge(self, row: #ident) -> #ident { - row + self } } } diff --git a/src/index/table_index/cdc.rs b/src/index/table_index/cdc.rs index 91551fb..eef930a 100644 --- a/src/index/table_index/cdc.rs +++ b/src/index/table_index/cdc.rs @@ -30,12 +30,13 @@ where (res, evs.into_iter().map(Into::into).collect()) } + // TODO: refactor this to be more straightforward fn insert_checked_cdc(&self, value: T, link: Link) -> Option>>> { let (res, evs) = self.insert_cdc(value, link); if res.is_some() { - Some(evs.into_iter().map(Into::into).collect()) - } else { None + } else { + Some(evs.into_iter().map(Into::into).collect()) } } diff --git a/src/index/table_index/mod.rs b/src/index/table_index/mod.rs index 4ed4d2d..6272cbb 100644 --- a/src/index/table_index/mod.rs +++ b/src/index/table_index/mod.rs @@ -28,7 +28,11 @@ where } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - self.insert(value, link).map(|_| ()) + if self.insert(value, link).is_some() { + None + } else { + Some(()) + } } fn remove(&self, value: T, link: Link) -> Option<(T, Link)> { diff --git a/src/table/mod.rs b/src/table/mod.rs index 549903c..97d172a 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -330,6 +330,9 @@ where LockType: 'static, { let pk = row_new.get_primary_key().clone(); + if pk != row_old.get_primary_key() { + return Err(WorkTableError::PrimaryUpdateTry); + } let old_link = self .pk_map .get(&pk) @@ -344,40 +347,32 @@ where .with_mut_ref(new_link, |r| r.unghost()) .map_err(WorkTableError::PagesError)? } - // we can not check for existence here. - if self.pk_map.checked_insert(pk.clone(), new_link).is_some() { - let indexes_res = - self.indexes - .reinsert_row(row_old, old_link, row_new.clone(), new_link); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.pk_map.remove(&pk); - self.indexes - .delete_from_indexes(row_new, new_link, inserted_already)?; - self.data - .delete(new_link) - .map_err(WorkTableError::PagesError)?; + self.pk_map.insert(pk.clone(), new_link); - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } - self.data - .delete(old_link) - .map_err(WorkTableError::PagesError)?; - Ok(pk) - } else { - // primary key violated. - self.data - .delete(new_link) - .map_err(WorkTableError::PagesError)?; - Err(WorkTableError::AlreadyExists("Primary".to_string())) + let indexes_res = self + .indexes + .reinsert_row(row_old, old_link, row_new.clone(), new_link); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.indexes + .delete_from_indexes(row_new, new_link, inserted_already)?; + self.data + .delete(new_link) + .map_err(WorkTableError::PagesError)?; + + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; } + self.data + .delete(old_link) + .map_err(WorkTableError::PagesError)?; + Ok(pk) } #[allow(clippy::type_complexity)] @@ -410,6 +405,9 @@ where AvailableIndexes: Debug + AvailableIndex, { let pk = row_new.get_primary_key().clone(); + if pk != row_old.get_primary_key() { + return Err(WorkTableError::PrimaryUpdateTry); + } let old_link = self .pk_map .get(&pk) @@ -424,54 +422,46 @@ where .with_mut_ref(new_link, |r| r.unghost()) .map_err(WorkTableError::PagesError)? } - if let Some(primary_key_events) = self.pk_map.checked_insert_cdc(pk.clone(), new_link) { - let indexes_res = - self.indexes - .reinsert_row_cdc(row_old, old_link, row_new.clone(), new_link); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.pk_map.remove(&pk); - self.indexes - .delete_from_indexes(row_new, new_link, inserted_already)?; - self.data - .delete(new_link) - .map_err(WorkTableError::PagesError)?; + let (_, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), new_link); + let indexes_res = + self.indexes + .reinsert_row_cdc(row_old, old_link, row_new.clone(), new_link); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.indexes + .delete_from_indexes(row_new, new_link, inserted_already)?; + self.data + .delete(new_link) + .map_err(WorkTableError::PagesError)?; - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } - self.data - .delete(old_link) - .map_err(WorkTableError::PagesError)?; - let bytes = self - .data - .select_raw(new_link) - .map_err(WorkTableError::PagesError)?; + self.data + .delete(old_link) + .map_err(WorkTableError::PagesError)?; + let bytes = self + .data + .select_raw(new_link) + .map_err(WorkTableError::PagesError)?; - let op = Operation::Insert(InsertOperation { - id: OperationId::Single(Uuid::now_v7()), - pk_gen_state: self.pk_gen.get_state(), - primary_key_events, - secondary_keys_events: indexes_res.expect("was checked just before"), - bytes, - link: new_link, - }); + let op = Operation::Insert(InsertOperation { + id: OperationId::Single(Uuid::now_v7()), + pk_gen_state: self.pk_gen.get_state(), + primary_key_events, + secondary_keys_events: indexes_res.expect("was checked just before"), + bytes, + link: new_link, + }); - Ok((pk, op)) - } else { - // primary key violated. - self.data - .delete(new_link) - .map_err(WorkTableError::PagesError)?; - Err(WorkTableError::AlreadyExists("Primary".to_string())) - } + Ok((pk, op)) } } @@ -482,5 +472,6 @@ pub enum WorkTableError { AlreadyExists(#[error(not(source))] String), SerializeError, SecondaryIndexError, + PrimaryUpdateTry, PagesError(in_memory::PagesExecutionError), } diff --git a/tests/worktable/index/update_by_pk.rs b/tests/worktable/index/update_by_pk.rs index 7230735..e211b81 100644 --- a/tests/worktable/index/update_by_pk.rs +++ b/tests/worktable/index/update_by_pk.rs @@ -122,3 +122,93 @@ async fn update_by_pk_non_unique_indexes() { .expect("rows"); assert_eq!(updated.first(), None); } + +#[tokio::test] +async fn update_by_pk_with_reinsert_and_secondary_unique_violation() { + let test_table = Test3UniqueWorkTable::default(); + + let row1 = Test3UniqueRow { + val: 1, + attr1: "TEST".to_string(), + attr2: 1000, + attr3: 65000, + id: 0, + }; + test_table.insert(row1.clone()).unwrap(); + let row2 = Test3UniqueRow { + val: 1, + attr1: "TEST__________________1".to_string(), + attr2: 1001, + attr3: 65001, + id: 1, + }; + test_table.insert(row2.clone()).unwrap(); + let update = UniqueThreeAttrByIdQuery { + attr1: row2.attr1.clone(), + attr2: 999, + attr3: 0, + }; + assert!(test_table + .update_unique_three_attr_by_id(update, row1.id) + .await + .is_err()); + + assert_eq!( + test_table.select_by_attr1(row1.attr1.clone()).unwrap(), + row1 + ); + assert_eq!(test_table.select_by_attr2(row1.attr2).unwrap(), row1); + assert_eq!(test_table.select_by_attr3(row1.attr3).unwrap(), row1); + + assert_eq!( + test_table.select_by_attr1(row2.attr1.clone()).unwrap(), + row2 + ); + assert_eq!(test_table.select_by_attr2(row2.attr2).unwrap(), row2); + assert_eq!(test_table.select_by_attr3(row2.attr3).unwrap(), row2); +} + +#[tokio::test] +async fn update_by_pk_with_secondary_unique_violation() { + let test_table = Test3UniqueWorkTable::default(); + + let row1 = Test3UniqueRow { + val: 1, + attr1: "TEST".to_string(), + attr2: 1000, + attr3: 65000, + id: 0, + }; + test_table.insert(row1.clone()).unwrap(); + let row2 = Test3UniqueRow { + val: 1, + attr1: "TEST__________________1".to_string(), + attr2: 1001, + attr3: 65001, + id: 1, + }; + test_table.insert(row2.clone()).unwrap(); + let update = UniqueThreeAttrByIdQuery { + attr1: row1.attr1.clone(), + attr2: row2.attr2, + attr3: 0, + }; + assert!(test_table + .update_unique_three_attr_by_id(update, row1.id) + .await + .is_err()); + + assert_eq!( + test_table.select_by_attr1(row1.attr1.clone()).unwrap(), + row1 + ); + assert_eq!(test_table.select_by_attr2(row1.attr2).unwrap(), row1); + assert_eq!(test_table.select_by_attr3(row1.attr3).unwrap(), row1); + + assert_eq!( + test_table.select_by_attr1(row2.attr1.clone()).unwrap(), + row2 + ); + assert_eq!(test_table.select_by_attr2(row2.attr2).unwrap(), row2); + assert_eq!(test_table.select_by_attr3(row2.attr3).unwrap(), row2); +} diff --git a/tests/worktable/index/update_full.rs b/tests/worktable/index/update_full.rs index 27c5402..ceb2661 100644 --- a/tests/worktable/index/update_full.rs +++ b/tests/worktable/index/update_full.rs @@ -310,6 +310,13 @@ async fn update_by_full_row_with_reinsert_and_primary_key_violation() { update.attr1 = "TEST_______________________1".to_string(); assert!(test_table.update(update).await.is_err()); + assert_eq!( + test_table.select_by_attr1(row1.attr1.clone()).unwrap(), + row1 + ); + assert_eq!(test_table.select_by_attr2(row1.attr2).unwrap(), row1); + assert_eq!(test_table.select_by_attr3(row1.attr3).unwrap(), row1); + assert_eq!( test_table.select_by_attr1(row2.attr1.clone()).unwrap(), row2 @@ -342,6 +349,52 @@ async fn update_by_full_row_with_reinsert_and_secondary_unique_violation() { update.attr1 = row2.attr1.clone(); assert!(test_table.update(update).await.is_err()); + assert_eq!( + test_table.select_by_attr1(row1.attr1.clone()).unwrap(), + row1 + ); + assert_eq!(test_table.select_by_attr2(row1.attr2).unwrap(), row1); + assert_eq!(test_table.select_by_attr3(row1.attr3).unwrap(), row1); + + assert_eq!( + test_table.select_by_attr1(row2.attr1.clone()).unwrap(), + row2 + ); + assert_eq!(test_table.select_by_attr2(row2.attr2).unwrap(), row2); + assert_eq!(test_table.select_by_attr3(row2.attr3).unwrap(), row2); +} + +#[tokio::test] +async fn update_by_full_row_with_secondary_unique_violation() { + let test_table = Test3UniqueWorkTable::default(); + + let row1 = Test3UniqueRow { + val: 1, + attr1: "TEST".to_string(), + attr2: 1000, + attr3: 65000, + id: 0, + }; + test_table.insert(row1.clone()).unwrap(); + let row2 = Test3UniqueRow { + val: 1, + attr1: "TEST1".to_string(), + attr2: 1001, + attr3: 65001, + id: 1, + }; + test_table.insert(row2.clone()).unwrap(); + let mut update = row1.clone(); + update.attr2 = row2.attr2.clone(); + assert!(test_table.update(update).await.is_err()); + + assert_eq!( + test_table.select_by_attr1(row1.attr1.clone()).unwrap(), + row1 + ); + assert_eq!(test_table.select_by_attr2(row1.attr2).unwrap(), row1); + assert_eq!(test_table.select_by_attr3(row1.attr3).unwrap(), row1); + assert_eq!( test_table.select_by_attr1(row2.attr1.clone()).unwrap(), row2 From a709147e108be667a5234e8bbd500eb63cc1b0e7 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 5 Aug 2025 03:03:16 +0300 Subject: [PATCH 6/6] bump --- Cargo.toml | 12 ++++++------ codegen/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 92a5acf..86f3d58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.7.0" +version = "0.7.1" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -16,7 +16,7 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -worktable_codegen = { path = "codegen", version = "0.7.0" } +worktable_codegen = { path = "codegen", version = "0.7.1" } eyre = "0.6.12" derive_more = { version = "1.0.0", features = ["from", "error", "display", "into"] } @@ -27,14 +27,14 @@ lockfree = { version = "0.5.1" } fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } -# data_bucket = "0.2.9" +data_bucket = "0.2.10" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -data_bucket = { path = "../DataBucket", version = "0.2.9" } +# data_bucket = { path = "../DataBucket", version = "0.2.9" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] } -indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } -# indexset = { package = "wt-indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } +# indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.5", features = ["concurrent", "cdc", "multimap"] } +indexset = { package = "wt-indexset", version = "0.12.6", features = ["concurrent", "cdc", "multimap"] } convert_case = "0.6.0" ordered-float = "5.0.0" parking_lot = "0.12.3" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 50751a5..18eddf2 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.7.0" +version = "0.7.1" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate"