Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.6.1"
version = "0.6.2"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -25,9 +25,9 @@ lockfree = { version = "0.5.1" }
worktable_codegen = { path = "codegen", version = "0.6.0" }
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4", "v7"] }
data_bucket = "0.2.4"
data_bucket = "0.2.5"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "main" }
# data_bucket = { path = "../DataBucket", version = "0.2.3" }
# data_bucket = { path = "../DataBucket", version = "0.2.4" }
performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
indexset = { version = "0.12.2", features = ["concurrent", "cdc", "multimap"] }
Expand Down
62 changes: 43 additions & 19 deletions src/persistence/operation.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};

use crate::persistence::space::{BatchChangeEvent, BatchData};
use crate::persistence::task::QueueInnerRow;
use crate::prelude::*;
use crate::prelude::{From, Order, SelectQueryExecutor};
use data_bucket::page::PageId;
use data_bucket::{Link, SizeMeasurable};
use derive_more::Display;
Expand All @@ -14,29 +12,55 @@ use rkyv::{Archive, Deserialize, Serialize};
use uuid::Uuid;
use worktable_codegen::{worktable, MemStat};

use crate::persistence::space::{BatchChangeEvent, BatchData};
use crate::persistence::task::QueueInnerRow;
use crate::prelude::*;
use crate::prelude::{From, Order, SelectQueryExecutor};

/// Represents operation's identifier. Is unique within the table bounds
#[derive(
Archive,
Copy,
Clone,
Deserialize,
Debug,
Display,
Eq,
From,
Hash,
Ord,
PartialEq,
PartialOrd,
Serialize,
)]
#[derive(Archive, Copy, Clone, Deserialize, Debug, Display, From, Serialize)]
#[rkyv(derive(Debug, PartialOrd, PartialEq, Eq, Ord))]
pub enum OperationId {
#[from]
Single(Uuid),
Multi(Uuid),
}

impl OperationId {
    /// Returns the [`Uuid`] wrapped by this identifier, regardless of whether
    /// it is the `Single` or the `Multi` variant.
    fn get_id(&self) -> Uuid {
        // Both variants carry exactly one `Uuid`, so one or-pattern arm covers them.
        match *self {
            OperationId::Single(id) | OperationId::Multi(id) => id,
        }
    }
}

impl Hash for OperationId {
    /// Feeds only the wrapped `Uuid` into `state`; the variant
    /// (`Single` / `Multi`) does not contribute to the hash.
    fn hash<H: Hasher>(&self, state: &mut H) {
        let id = self.get_id();
        id.hash(state);
    }
}

impl PartialEq for OperationId {
    /// Two identifiers are equal when their wrapped `Uuid`s are equal.
    /// The variant is not compared, so `Single(x)` equals `Multi(x)`.
    fn eq(&self, other: &Self) -> bool {
        self.get_id() == other.get_id()
    }
}

// Marker impl: `PartialEq` for `OperationId` compares only the ids returned by
// `get_id()`, and this impl declares that comparison to be a full equivalence.
impl Eq for OperationId {}

impl PartialOrd for OperationId {
    /// Always returns `Some`: the result is taken from the total order
    /// defined by the `Ord` implementation for `OperationId`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for OperationId {
    /// Orders identifiers by their wrapped `Uuid` only; the variant
    /// (`Single` / `Multi`) plays no part in the comparison.
    fn cmp(&self, other: &Self) -> Ordering {
        let (lhs, rhs) = (self.get_id(), other.get_id());
        lhs.cmp(&rhs)
    }
}

impl SizeMeasurable for OperationId {
fn aligned_size(&self) -> usize {
Uuid::default().aligned_size()
Expand Down
1 change: 1 addition & 0 deletions src/persistence/space/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ where
let page_to_update = if let Some(page) = page {
page
} else {
//println!("Trying to parse page {}", page_index);
let page = parse_page::<IndexPage<T>, INNER_PAGE_SIZE>(
&mut self.index_file,
page_index.into(),
Expand Down
21 changes: 16 additions & 5 deletions src/persistence/space/index/table_of_contents.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

Expand All @@ -24,7 +25,7 @@ pub struct IndexTableOfContents<T: Ord + Eq, const DATA_LENGTH: u32> {

impl<T, const DATA_LENGTH: u32> IndexTableOfContents<T, DATA_LENGTH>
where
T: SizeMeasurable + Ord + Eq,
T: Debug + SizeMeasurable + Ord + Eq,
{
pub fn new(space_id: SpaceId, next_page_id: Arc<AtomicU32>) -> Self {
let page_id = next_page_id.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -114,9 +115,19 @@ where
self.pages.iter().flat_map(|v| v.inner.iter())
}

pub fn update_key(&mut self, old_key: &T, new_key: T) {
pub fn update_key(&mut self, old_key: &T, new_key: T)
where
T: Clone + Debug,
{
let page = self.get_current_page_mut();
page.inner.update_key(old_key, new_key);
if page.inner.update_key(old_key, new_key.clone()).is_none() {
for page in self.pages.iter_mut() {
if page.inner.update_key(old_key, new_key.clone()).is_some() {
return;
}
}
panic!("Page with key {:?} not found", old_key);
}
}

pub fn pop_empty_page_id(&mut self) -> Option<PageId> {
Expand Down Expand Up @@ -166,15 +177,15 @@ where
})
} else {
let mut table_of_contents_pages = vec![page];
let mut index = 2;
let mut index = table_of_contents_pages[0].header.next_id.into();
let mut ind = false;

while !ind {
let page =
parse_page::<TableOfContentsPage<T>, DATA_LENGTH>(file, index).await?;
ind = page.header.next_id.is_empty();
index = page.header.next_id.into();
table_of_contents_pages.push(page);
index += 1;
}

Ok(Self {
Expand Down
5 changes: 3 additions & 2 deletions src/persistence/space/index/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use data_bucket::{
GeneralHeader, GeneralPage, IndexPage, Link, PageType, SizeMeasurable, UnsizedIndexPage,
VariableSizeMeasurable,
};
use std::fmt::Debug;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

Expand All @@ -14,7 +15,7 @@ pub fn map_index_pages_to_toc_and_general<T, const DATA_LENGTH: u32>(
Vec<GeneralPage<IndexPage<T>>>,
)
where
T: Clone + Default + Ord + Eq + SizeMeasurable,
T: Debug + Clone + Default + Ord + Eq + SizeMeasurable,
{
let mut general_index_pages = vec![];
let next_page_id = Arc::new(AtomicU32::new(1));
Expand Down Expand Up @@ -44,7 +45,7 @@ pub fn map_unsized_index_pages_to_toc_and_general<T, const DATA_LENGTH: u32>(
Vec<GeneralPage<UnsizedIndexPage<T, DATA_LENGTH>>>,
)
where
T: Clone + Default + Ord + Eq + SizeMeasurable + VariableSizeMeasurable,
T: Clone + Debug + Default + Ord + Eq + SizeMeasurable + VariableSizeMeasurable,
{
let mut general_index_pages = vec![];
let next_page_id = Arc::new(AtomicU32::new(1));
Expand Down
123 changes: 79 additions & 44 deletions src/persistence/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ worktable! (
},
);

const MAX_PAGE_AMOUNT: usize = 16;

pub struct QueueAnalyzer<PrimaryKeyGenState, PrimaryKey, SecondaryKeys> {
operations: OptimizedVec<Operation<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>>,
queue_inner_wt: QueueInnerWorkTable,
Expand Down Expand Up @@ -96,54 +98,87 @@ where
PrimaryKey: Clone,
SecondaryKeys: Clone,
{
let ops_rows = self
.queue_inner_wt
.select_by_operation_id(op_id)
.execute()?;

let mut ops_set = HashSet::new();
let mut used_page_ids = HashSet::new();

let used_page_ids = ops_rows.iter().map(|r| r.page_id).collect::<HashSet<_>>();
// We collect all ops available for pages that are used in our current op_id
for page_id in used_page_ids.iter() {
let page_ops = self.queue_inner_wt.select_by_page_id(*page_id).execute()?;
ops_set.extend(page_ops.into_iter().map(|r| r.operation_id));
}
// After we need to find out if multi ops are using same pages, and if not,
// we need to find the first multi op that blocks batch update by using
// another page.
let mut block_op_id = None;
for op_id in ops_set.iter().filter(|op_id| match op_id {
OperationId::Single(_) => false,
OperationId::Multi(_) => true,
}) {
let rows = self
let mut next_op_id = op_id;
let mut no_more_ops = false;
while used_page_ids.len() < MAX_PAGE_AMOUNT && !no_more_ops {
let ops_rows = self
.queue_inner_wt
.select_by_operation_id(*op_id)
.select_by_operation_id(next_op_id)
.execute()?;
let pages = rows.iter().map(|r| r.page_id).collect::<HashSet<_>>();
// if pages used by multi op are not available in used_page_ids set, it's blocker op.
for page in pages.iter() {
if !used_page_ids.contains(page) {
if let Some(block_op_id) = block_op_id.as_mut() {
if *block_op_id > *op_id {
*block_op_id = *op_id
match next_op_id {
OperationId::Single(_) => {
let page_id = ops_rows
.first()
.expect("at least one row should be available as operation exists")
.page_id;
used_page_ids.insert(page_id);
let page_ops = self.queue_inner_wt.select_by_page_id(page_id).execute()?;
let max_op_id = &mut next_op_id;
ops_set.extend(page_ops.into_iter().map(move |r| {
if r.operation_id > *max_op_id {
*max_op_id = r.operation_id
}
} else {
block_op_id = Some(*op_id)
r.operation_id
}));
}
OperationId::Multi(_) => {
let mut ops_set_to_extend = HashSet::new();
used_page_ids.extend(ops_rows.iter().map(|r| r.page_id));
for page_id in ops_rows.iter().map(|r| r.page_id) {
let page_ops = self.queue_inner_wt.select_by_page_id(page_id).execute()?;
ops_set_to_extend.extend(page_ops.into_iter().map(|r| r.operation_id));
}
let mut block_op_id = None;
for op_id in ops_set_to_extend.iter().filter(|op_id| match op_id {
OperationId::Single(_) => false,
OperationId::Multi(_) => true,
}) {
let rows = self
.queue_inner_wt
.select_by_operation_id(*op_id)
.execute()?;
let pages = rows.iter().map(|r| r.page_id).collect::<HashSet<_>>();
// if pages used by multi op are not available in used_page_ids set, it's blocker op
for page in pages.iter() {
if !used_page_ids.contains(page) {
if let Some(block_op_id) = block_op_id.as_mut() {
if *block_op_id > *op_id {
*block_op_id = *op_id
}
} else {
block_op_id = Some(*op_id)
}
}
}
}
// And if we found some blocker, we need to remove all ops after blocking op.
let ops_set_to_extend = if let Some(block_op_id) = block_op_id {
ops_set_to_extend
.into_iter()
.filter(|op_id| *op_id >= block_op_id)
.collect()
} else {
ops_set_to_extend
};
ops_set.extend(ops_set_to_extend);
no_more_ops = true;
}
};
let mut range = self
.queue_inner_wt
.0
.indexes
.operation_id_idx
.range(next_op_id..);
if let Some((id, _)) = range.nth(1) {
next_op_id = *id;
} else {
no_more_ops = true
}
}
// And if we found some blocker, we need to remove all ops after blocking op.
let ops_set = if let Some(block_op_id) = block_op_id {
ops_set
.into_iter()
.filter(|op_id| *op_id >= block_op_id)
.collect()
} else {
ops_set
};
// After this point, we have ops set ready for batch generation.
let mut ops_pos_set = HashSet::new();
for op_id in ops_set {
Expand Down Expand Up @@ -307,11 +342,11 @@ impl<PrimaryKeyGenState, PrimaryKey, SecondaryKeys>
tracing::warn!("Error collecting batch operation: {}", e);
} else {
let batch_op = batch_op.unwrap();
// println!(
// "Batch len is {}, queue len is {}",
// batch_op.ops.len(),
// analyzer.len()
// );
println!(
"Batch len is {}, queue len is {}",
batch_op.ops.len(),
analyzer.len()
);
let res = engine.apply_batch_operation(batch_op).await;
if let Err(e) = res {
tracing::warn!(
Expand Down
Binary file modified tests/data/expected/persist_index_table_of_contents.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/expected/space_index/process_create_node.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/expected/space_index/process_insert_at.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/expected/space_index/process_remove_at_node_id.wt.idx
Binary file not shown.
Binary file modified tests/data/expected/space_index/process_remove_node.wt.idx
Binary file not shown.
Binary file modified tests/data/expected/space_index/process_split_node.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/expected/space_index_unsized/process_insert_at.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/expected/space_index_unsized/process_remove_at.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/expected/test_persist/another_idx.wt.idx
Binary file not shown.
Binary file modified tests/data/expected/test_persist/primary.wt.idx
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/persist_index_table_of_contents.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/indexset/process_create_node.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/indexset/process_insert_at.wt.idx
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/space_index/process_create_node.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_create_node_after_remove.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_create_second_node.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_insert_at.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_insert_at_big_amount.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_insert_at_removed_place.wt.idx
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/space_index/process_remove_at.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_remove_at_node_id.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_remove_node.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index/process_split_node.wt.idx
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/space_index_unsized/indexset/process_insert_at.wt.idx
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_create_node.wt.idx
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_create_second_node.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_insert_at.wt.idx
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_remove_at.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_remove_at_node_id.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_remove_node.wt.idx
Binary file not shown.
Binary file modified tests/data/space_index_unsized/process_split_node.wt.idx
Binary file not shown.
Binary file modified tests/data/test_persist/another_idx.wt.idx
Binary file not shown.
Binary file modified tests/data/test_persist/primary.wt.idx
Binary file not shown.
Binary file modified tests/data/test_without_secondary_indexes/primary.wt.idx
Binary file not shown.
2 changes: 1 addition & 1 deletion tests/persistence/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn test_space_insert_many_sync() {
let table = TestSyncWorkTable::load_from_file(config.clone())
.await
.unwrap();
for i in 0..20 {
for i in 0..5_000 {
let pk = {
let row = TestSyncRow {
another: i,
Expand Down
Loading