Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codegen/src/persist_index/space.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Generator {
.collect();

quote! {
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct #ident {
#(#fields)*
}
Expand Down
230 changes: 138 additions & 92 deletions codegen/src/worktable/generator/queries/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ impl Generator {
fn gen_full_row_update(&mut self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let row_ident = name_generator.get_row_type_ident();
let avt_type_ident = name_generator.get_available_type_ident();
let lock_ident = name_generator.get_lock_type_ident();

let row_updates = self
Expand All @@ -55,25 +54,9 @@ impl Generator {
.map(|idx| idx.field.clone())
.collect();

let diff_container = quote! {
let row_old = self.select(pk.clone()).unwrap();
let row_new = row.clone();
let updated_bytes: Vec<u8> = vec![];
let mut diffs: std::collections::HashMap<&str, Difference<#avt_type_ident>> = std::collections::HashMap::new();
};
let diff_process = self.gen_process_diffs_on_index(idents.as_slice(), idents.as_slice());
let persist_call = if self.is_persist {
quote! {
if let Operation::Update(op) = &mut op {
op.bytes = self.0.data.select_raw(link)?;
} else {
unreachable!("")
};
self.2.apply_operation(op);
}
} else {
quote! {}
};
let diff_process = self.gen_process_diffs_on_index(idents.as_slice(), Some(&idents));
let persist_call = self.gen_persist_call();
let persist_op = self.gen_persist_op();

quote! {
pub async fn update(&self, row: #row_ident) -> core::result::Result<(), WorkTableError> {
Expand All @@ -94,8 +77,8 @@ impl Generator {
.map(|v| v.get().value)
.ok_or(WorkTableError::NotFound)?;

#diff_container
#diff_process
#persist_op

unsafe { self.0.data.with_mut_ref(link, move |archived| {
#(#row_updates)*
Expand Down Expand Up @@ -142,9 +125,21 @@ impl Generator {
let index_name = &index.name;

if index.is_unique {
self.gen_unique_update(snake_case_name, name, index_name, idents)
self.gen_unique_update(
snake_case_name,
name,
index_name,
idents,
indexes_columns.as_ref(),
)
} else {
self.gen_non_unique_update(snake_case_name, name, index_name, idents)
self.gen_non_unique_update(
snake_case_name,
name,
index_name,
idents,
indexes_columns.as_ref(),
)
}
} else if self.columns.primary_keys.len() == 1 {
if *self.columns.primary_keys.first().unwrap() == op.by {
Expand All @@ -163,36 +158,28 @@ impl Generator {
}
}

fn gen_process_diffs_on_index(&self, idents: &[Ident], idx_idents: &[Ident]) -> TokenStream {
fn gen_persist_call(&self) -> TokenStream {
if self.is_persist {
quote! {
if let Operation::Update(op) = &mut op {
op.bytes = self.0.data.select_raw(link)?;
} else {
unreachable!("")
};
self.2.apply_operation(op);
}
} else {
quote! {}
}
}

fn gen_persist_op(&self) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let avt_type_ident = name_generator.get_available_type_ident();
let secondary_events_ident = name_generator.get_space_secondary_index_events_ident();
let primary_key_ident = name_generator.get_primary_key_type_ident();

let diff = idents
.iter()
.filter(|i| idx_idents.contains(i))
.map(|i| {
let diff_key = Literal::string(i.to_string().as_str());
quote! {
let old = &row_old.#i;
let new = &row_new.#i;

if old != new {
let diff = Difference::<#avt_type_ident> {
old: old.clone().into(),
new: new.clone().into(),
};

diffs.insert(#diff_key, diff);
}
}
})
.collect::<Vec<_>>();

let process_difference = if self.is_persist {
if self.is_persist {
quote! {
let secondary_keys_events = self.0.indexes.process_difference_cdc(link, diffs)?;
let mut op: Operation<
<<#primary_key_ident as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State,
#primary_key_ident,
Expand All @@ -205,12 +192,75 @@ impl Generator {
});
}
} else {
quote! {}
}
}

fn gen_process_diffs_on_index(
&self,
idents: &[Ident],
idx_idents: Option<&Vec<Ident>>,
) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let avt_type_ident = name_generator.get_available_type_ident();
let diff_container = if idx_idents.is_some() {
quote! {
let row_old = self.select(pk.clone()).unwrap();
let row_new = row.clone();
let updated_bytes: Vec<u8> = vec![];
let mut diffs: std::collections::HashMap<&str, Difference<#avt_type_ident>> = std::collections::HashMap::new();
}
} else {
quote! {
let updated_bytes: Vec<u8> = vec![];
}
};

let diff = if let Some(idx_idents) = idx_idents {
idents
.iter()
.filter(|i| idx_idents.contains(i))
.map(|i| {
let diff_key = Literal::string(i.to_string().as_str());
quote! {
let old = &row_old.#i;
let new = &row_new.#i;

if old != new {
let diff = Difference::<#avt_type_ident> {
old: old.clone().into(),
new: new.clone().into(),
};

diffs.insert(#diff_key, diff);
}
}
})
.collect::<Vec<_>>()
} else {
vec![]
};

let process_difference = if self.is_persist {
if idx_idents.is_some() {
quote! {
let secondary_keys_events = self.0.indexes.process_difference_cdc(link, diffs)?;
}
} else {
quote! {
let secondary_keys_events = core::default::Default::default();
}
}
} else if idx_idents.is_some() {
quote! {
self.0.indexes.process_difference(link, diffs)?;
}
} else {
quote! {}
};

quote! {
#diff_container
#(#diff)*
#process_difference
}
Expand Down Expand Up @@ -255,63 +305,38 @@ impl Generator {
})
.collect::<Vec<_>>();

let diff_process = if let Some(idx_idents) = idx_idents {
let avt_type_ident = name_generator.get_available_type_ident();
let diff_container = quote! {
let row_old = self.select(by.clone()).unwrap();
let row_new = row.clone();
let updated_bytes: Vec<u8> = vec![];
let mut diffs: std::collections::HashMap<&str, Difference<#avt_type_ident>> = std::collections::HashMap::new();
};

let process = self.gen_process_diffs_on_index(idents, idx_idents.as_slice());
quote! {
#diff_container
#process
}
} else {
quote! {}
};
let persist_call = if self.is_persist {
quote! {
if let Operation::Update(op) = &mut op {
op.bytes = self.0.data.select_raw(link)?;
} else {
unreachable!("")
};
self.2.apply_operation(op);
}
} else {
quote! {}
};
let diff_process = self.gen_process_diffs_on_index(idents, idx_idents);
let persist_call = self.gen_persist_call();
let persist_op = self.gen_persist_op();

quote! {
pub async fn #method_ident(&self, row: #query_ident, by: #pk_ident) -> core::result::Result<(), WorkTableError> {
if let Some(lock) = self.0.lock_map.get(&by) {
pub async fn #method_ident(&self, row: #query_ident, pk: #pk_ident) -> core::result::Result<(), WorkTableError> {
if let Some(lock) = self.0.lock_map.get(&pk) {
lock.#lock_await_ident().await; // Waiting for all locks released
}
let lock_id = self.0.lock_map.next_id();
let mut lock = #lock_type_ident::new(lock_id.into()); //Creates new LockType with None
lock.#lock_ident();

self.0.lock_map.insert(by.clone(), std::sync::Arc::new(lock.clone()));
self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone()));

let mut bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&row).map_err(|_| WorkTableError::SerializeError)?;
let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() };
let link = self.0
.pk_map
.get(&by)
.get(&pk)
.map(|v| v.get().value)
.ok_or(WorkTableError::NotFound)?;

#diff_process
#persist_op

unsafe { self.0.data.with_mut_ref(link, |archived| {
#(#row_updates)*
}).map_err(WorkTableError::PagesError)? };

lock.#unlock_ident();
self.0.lock_map.remove(&by);
self.0.lock_map.remove(&pk);

#persist_call

Expand All @@ -326,6 +351,7 @@ impl Generator {
name: &Ident,
index: &Ident,
idents: &[Ident],
idx_idents: Option<&Vec<Ident>>,
) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let lock_type_ident = name_generator.get_lock_type_ident();
Expand Down Expand Up @@ -355,11 +381,15 @@ impl Generator {
.iter()
.map(|i| {
quote! {
std::mem::swap(&mut archived.inner.#i, &mut row.#i);
std::mem::swap(&mut archived.inner.#i, &mut archived_row.#i);
}
})
.collect::<Vec<_>>();

let diff_process = self.gen_process_diffs_on_index(idents, idx_idents);
let persist_call = self.gen_persist_call();
let persist_op = self.gen_persist_op();

quote! {
pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> {
for (_, link) in self.0.indexes.#index.get(&by) {
Expand All @@ -378,18 +408,26 @@ impl Generator {
}

for (_, link) in self.0.indexes.#index.get(&by) {
let link = *link;
let pk = self.0.data.select(link)?.get_primary_key().clone();
let mut bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&row)
.map_err(|_| WorkTableError::SerializeError)?;
.map_err(|_| WorkTableError::SerializeError)?;

let mut archived_row = unsafe {
rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..])
.unseal_unchecked()
};

#diff_process
#persist_op

let mut row = unsafe {
rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..])
.unseal_unchecked()
};
unsafe {
self.0.data.with_mut_ref(*link, |archived| {
self.0.data.with_mut_ref(link, |archived| {
#(#row_updates)*
}).map_err(WorkTableError::PagesError)?;
}

#persist_call
}
for (_, link) in self.0.indexes.#index.get(&by) {
let pk = self.0.data.select(*link)?.get_primary_key();
Expand All @@ -409,6 +447,7 @@ impl Generator {
name: &Ident,
index: &Ident,
idents: &[Ident],
idx_idents: Option<&Vec<Ident>>,
) -> TokenStream {
let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string());
let lock_type_ident = name_generator.get_lock_type_ident();
Expand Down Expand Up @@ -438,17 +477,20 @@ impl Generator {
.iter()
.map(|i| {
quote! {
std::mem::swap(&mut archived.inner.#i, &mut row.#i);
std::mem::swap(&mut archived.inner.#i, &mut archived_new_row.#i);
}
})
.collect::<Vec<_>>();
let diff_process = self.gen_process_diffs_on_index(idents, idx_idents);
let persist_call = self.gen_persist_call();
let persist_op = self.gen_persist_op();

quote! {
pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> {
let mut bytes = rkyv::to_bytes::<rkyv::rancor::Error>(&row)
.map_err(|_| WorkTableError::SerializeError)?;

let mut row = unsafe {
let mut archived_new_row = unsafe {
rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..])
.unseal_unchecked()
};
Expand All @@ -457,7 +499,6 @@ impl Generator {
.get(&by)
.map(|kv| kv.get().value)
.ok_or(WorkTableError::NotFound)?;

let pk = self.0.data.select(link)?.get_primary_key();

if let Some(lock) = self.0.lock_map.get(&pk) {
Expand All @@ -468,6 +509,9 @@ impl Generator {
lock.#lock_ident();
self.0.lock_map.insert(pk.clone(), std::sync::Arc::new(lock.clone()));

#diff_process
#persist_op

unsafe {
self.0.data.with_mut_ref(link, |archived| {
#(#row_updates)*
Expand All @@ -477,6 +521,8 @@ impl Generator {
lock.#unlock_ident();
self.0.lock_map.remove(&pk);

#persist_call

core::result::Result::Ok(())
}
}
Expand Down
Loading
Loading