[ENH]: More concurrent blockfilewriter #4889

Merged · 3 commits · Jun 26, 2025
247 changes: 148 additions & 99 deletions rust/blockstore/src/arrow/blockfile.rs
@@ -11,6 +11,7 @@ use crate::arrow::root::CURRENT_VERSION;
use crate::arrow::sparse_index::SparseIndexWriter;
use crate::key::CompositeKey;
use crate::key::KeyWrapper;
use chroma_cache::AysncPartitionedMutex;
Contributor

[BestPractice]

There's a typo in the import name: AysncPartitionedMutex should be AsyncPartitionedMutex (the 'y' and 's' are transposed). This typo is present in several places in your code.

use chroma_error::ChromaError;
use chroma_error::ErrorCodes;
use chroma_storage::admissioncontrolleds3::StorageRequestPriority;
@@ -31,7 +32,7 @@ pub struct ArrowUnorderedBlockfileWriter {
block_deltas: Arc<Mutex<HashMap<Uuid, UnorderedBlockDelta>>>,
root: RootWriter,
id: Uuid,
write_mutex: Arc<tokio::sync::Mutex<()>>,
deltas_mutex: Arc<AysncPartitionedMutex<Uuid>>,
Contributor

[BestPractice]

Great work implementing the partitioned mutex approach! This will significantly improve concurrency by allowing multiple operations on different blocks to proceed in parallel. One minor suggestion: consider adding a brief comment explaining the OCC/PCC approach near where the mutex is declared to help future developers understand the implementation.
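
For context, here is a minimal sketch of what a partitioned async mutex does: keys hash to one of N underlying mutexes, so writers touching different blocks rarely contend, while writers targeting the same block still serialize. This is an illustrative stand-in, not the chroma_cache implementation, and all names in it are made up for the sketch.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use tokio::sync::{Mutex, MutexGuard};

/// Illustrative partitioned mutex: a key hashes to one of N shards, so
/// lock(&a) and lock(&b) only contend when a and b land on the same shard.
struct PartitionedMutexSketch {
    shards: Vec<Mutex<()>>,
}

impl PartitionedMutexSketch {
    fn new(num_shards: usize) -> Self {
        Self {
            shards: (0..num_shards).map(|_| Mutex::new(())).collect(),
        }
    }

    async fn lock<K: Hash>(&self, key: &K) -> MutexGuard<'_, ()> {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let shard = (hasher.finish() as usize) % self.shards.len();
        self.shards[shard].lock().await
    }
}
```

The OCC/PCC wording above presumably refers to the combination used in this PR: pessimistic per-block locking through the partitioned mutex, plus an optimistic recheck of the sparse index after the lock is acquired, with a retry if a concurrent split moved the key.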

}
// TODO: method visibility should not be pub(crate)

@@ -79,7 +80,7 @@ impl ArrowUnorderedBlockfileWriter {
block_deltas,
root: root_writer,
id,
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
deltas_mutex: Arc::new(AysncPartitionedMutex::new(())),
}
}

@@ -98,7 +99,7 @@ impl ArrowUnorderedBlockfileWriter {
block_deltas,
root: new_root,
id,
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
deltas_mutex: Arc::new(AysncPartitionedMutex::new(())),
}
}

@@ -158,118 +159,155 @@ impl ArrowUnorderedBlockfileWriter {
Ok(flusher)
}

// TODO: value must be smaller than the block size except for position lists, which are a special case
// where we split the value across multiple blocks
pub(crate) async fn set<K: ArrowWriteableKey, V: ArrowWriteableValue>(
&self,
prefix: &str,
key: K,
value: V,
) -> Result<(), Box<dyn ChromaError>> {
// TODO: for now the BF writer locks the entire write operation
let _guard = self.write_mutex.lock().await;

// TODO: value must be smaller than the block size except for position lists, which are a special case
// where we split the value across multiple blocks

// Get the target block id for the key
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// See if a delta for the target block already exists, if not create a new one and add it to the transaction state
// Creating a delta loads the block entirely into memory
let (_guard, target_block_id) = loop {
// Get the target block id for the key
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// Lock the delta for the target block id
let delta_guard = self.deltas_mutex.lock(&target_block_id).await;

// Recheck if the target block id is still the same. Otherwise, someone concurrently
// modified the root and we need to restart.
let new_target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
// Someone concurrently converted the block to delta and/or split it so restart all over.
if new_target_block_id != target_block_id {
continue;
}
break (delta_guard, target_block_id);
};

let delta = {
let deltas = self.block_deltas.lock();
deltas.get(&target_block_id).cloned()
};

let delta = match delta {
None => {
let block = match self
.block_manager
.get(
&self.root.prefix_path,
&target_block_id,
StorageRequestPriority::P0,
)
.await
{
Ok(Some(block)) => block,
Ok(None) => {
return Err(Box::new(ArrowBlockfileError::BlockNotFound));
}
Err(e) => {
return Err(Box::new(e));
}
};
let new_delta = match self
.block_manager
.fork::<K, V, UnorderedBlockDelta>(&block.id, &self.root.prefix_path)
.await
{
Ok(delta) => delta,
Err(e) => {
return Err(Box::new(e));
if let Some(delta) = delta {
// Add the key, value pair to delta.
// Then check if its over size and split as needed
delta.add(prefix, key, value);

if delta.get_size::<K, V>() > self.block_manager.max_block_size_bytes() {
let new_blocks = delta.split::<K, V>(self.block_manager.max_block_size_bytes());
// First add to deltas before making it visible through the sparse index.
// This prevents dangling references.
let blocks_to_add = {
let mut blocks_to_add = Vec::with_capacity(new_blocks.len());
let mut deltas = self.block_deltas.lock();
for (split_key, new_delta) in new_blocks {
blocks_to_add.push((split_key, new_delta.id));
deltas.insert(new_delta.id, new_delta);
}
blocks_to_add
};
let new_id = new_delta.id;
// Blocks can be empty.
// Update the sparse index atomically in one batch.
self.root
.sparse_index
.replace_block(target_block_id, new_delta.id);
{
let mut deltas = self.block_deltas.lock();
deltas.insert(new_id, new_delta.clone());
}
new_delta
.apply_updates(vec![], blocks_to_add)
.map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;
}
Some(delta) => delta,
};
} else {
// Fetch the block and convert to delta.
let block = match self
.block_manager
.get(
&self.root.prefix_path,
&target_block_id,
StorageRequestPriority::P0,
)
.await
{
Ok(Some(block)) => block,
Ok(None) => {
return Err(Box::new(ArrowBlockfileError::BlockNotFound));
}
Err(e) => {
return Err(Box::new(e));
}
};
let new_delta = match self
.block_manager
.fork::<K, V, UnorderedBlockDelta>(&block.id, &self.root.prefix_path)
.await
{
Ok(delta) => delta,
Err(e) => {
return Err(Box::new(e));
}
};

// Add the key, value pair to delta.
// Then check if its over size and split as needed
delta.add(prefix, key, value);
// Add the key, value pair to delta.
// Then check if its over size and split as needed
new_delta.add(prefix, key, value);

if delta.get_size::<K, V>() > self.block_manager.max_block_size_bytes() {
let new_blocks = delta.split::<K, V>(self.block_manager.max_block_size_bytes());
for (split_key, new_delta) in new_blocks {
if new_delta.get_size::<K, V>() > self.block_manager.max_block_size_bytes() {
// First add to deltas before making it visible through the sparse index.
// This prevents dangling references.
let new_blocks = new_delta.split::<K, V>(self.block_manager.max_block_size_bytes());
let (blocks_to_add, blocks_to_replace) = {
let mut blocks_to_add = Vec::with_capacity(new_blocks.len());
let mut deltas = self.block_deltas.lock();
for (split_key, new_delta) in new_blocks {
blocks_to_add.push((split_key, new_delta.id));
deltas.insert(new_delta.id, new_delta);
}
deltas.insert(new_delta.id, new_delta.clone());
(blocks_to_add, vec![(target_block_id, new_delta.id)])
};
// Update the sparse index atomically in one batch.
self.root
.sparse_index
.add_block(split_key, new_delta.id)
.apply_updates(blocks_to_replace, blocks_to_add)
.map_err(|e| Box::new(e) as Box<dyn ChromaError>)?;

} else {
let mut deltas = self.block_deltas.lock();
deltas.insert(new_delta.id, new_delta);
deltas.insert(new_delta.id, new_delta.clone());
self.root
.sparse_index
.replace_block(target_block_id, new_delta.id);
}
}

Ok(())
}

#[allow(dead_code)]
pub async fn get_owned<K: ArrowWriteableKey, V: ArrowWriteableValue>(
&self,
prefix: &str,
key: K,
) -> Result<Option<V::PreparedValue>, Box<dyn ChromaError>> {
// TODO: for now the BF writer locks the entire write operation
let _guard = self.write_mutex.lock().await;

// TODO: value must be smaller than the block size except for position lists, which are a special case
// where we split the value across multiple blocks

// Get the target block id for the key
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// See if a delta for the target block already exists, if not create a new one and add it to the transaction state
// Creating a delta loads the block entirely into memory
let (_guard, target_block_id) = loop {
// Get the target block id for the key
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// Lock the delta for the target block id
let delta_guard = self.deltas_mutex.lock(&target_block_id).await;

// Recheck if the target block id is still the same. Otherwise, someone concurrently
// modified the root and we need to restart.
let new_target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
// Someone concurrently converted the block to delta and/or split it so restart all over.
if new_target_block_id != target_block_id {
continue;
}
break (delta_guard, target_block_id);
};

let delta = {
let deltas = self.block_deltas.lock();
deltas.get(&target_block_id).cloned()
};

let delta = match delta {
Ok(match delta {
None => {
let block = match self
.block_manager
@@ -298,40 +336,50 @@ impl ArrowUnorderedBlockfileWriter {
return Err(Box::new(e));
}
};
let new_id = new_delta.id;
// Blocks can be empty.
// Read the value before making the delta visible through the sparse index.
let value = V::get_owned_value_from_delta(prefix, key.into(), &new_delta);
// Insert to delta first and then make it visible through the sparse index to
// prevent dangling references.
let mut deltas = self.block_deltas.lock();
deltas.insert(new_delta.id, new_delta.clone());
self.root
.sparse_index
.replace_block(target_block_id, new_delta.id);
{
let mut deltas = self.block_deltas.lock();
deltas.insert(new_id, new_delta.clone());
}
new_delta
value
}
Some(delta) => delta,
};

Ok(V::get_owned_value_from_delta(prefix, key.into(), &delta))
Some(delta) => V::get_owned_value_from_delta(prefix, key.into(), &delta),
})
}

pub(crate) async fn delete<K: ArrowWriteableKey, V: ArrowWriteableValue>(
&self,
prefix: &str,
key: K,
) -> Result<(), Box<dyn ChromaError>> {
let _guard = self.write_mutex.lock().await;
// Get the target block id for the key
let search_key = CompositeKey::new(prefix.to_string(), key.clone());
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
let (_guard, target_block_id) = loop {
// Get the target block id for the key
let target_block_id = self.root.sparse_index.get_target_block_id(&search_key);

// Lock the delta for the target block id
let delta_guard = self.deltas_mutex.lock(&target_block_id).await;

// Recheck if the target block id is still the same. Otherwise, someone concurrently
// modified the root and we need to restart.
let new_target_block_id = self.root.sparse_index.get_target_block_id(&search_key);
// Someone concurrently converted the block to delta and/or split it so restart all over.
if new_target_block_id != target_block_id {
continue;
}
break (delta_guard, target_block_id);
};

// TODO: clean this up as its redudant with the set method
let delta = {
let deltas = self.block_deltas.lock();
deltas.get(&target_block_id).cloned()
};

let delta = match delta {
match delta {
None => {
let block = match self
.block_manager
@@ -360,19 +408,20 @@ impl ArrowUnorderedBlockfileWriter {
return Err(Box::new(e));
}
};
let new_id = new_delta.id;
// Delete the key before making the delta visible through the sparse index.
new_delta.delete::<K, V>(prefix, key);
// Insert to delta first and then make it visible through the sparse index to
// prevent dangling references.
let mut deltas = self.block_deltas.lock();
deltas.insert(new_delta.id, new_delta.clone());
self.root
.sparse_index
.replace_block(target_block_id, new_delta.id);
{
let mut deltas = self.block_deltas.lock();
deltas.insert(new_id, new_delta.clone());
}
new_delta
}
Some(delta) => delta,
Some(delta) => {
delta.delete::<K, V>(prefix, key);
}
};
delta.delete::<K, V>(prefix, key);
Ok(())
}

@@ -759,7 +808,7 @@ mod tests {
arrow::config::TEST_MAX_BLOCK_SIZE_BYTES, arrow::provider::ArrowBlockfileProvider,
};
use crate::{BlockfileReader, BlockfileWriter, BlockfileWriterOptions};
use chroma_cache::new_cache_for_test;
use chroma_cache::{new_cache_for_test, AysncPartitionedMutex};
use chroma_storage::{local::LocalStorage, Storage};
use chroma_types::{CollectionUuid, DataRecord, DatabaseUuid, MetadataValue, SegmentUuid};
use futures::{StreamExt, TryStreamExt};
@@ -2025,7 +2074,7 @@ mod tests {
block_deltas,
root: root_writer,
id: Uuid::new_v4(),
write_mutex: Arc::new(tokio::sync::Mutex::new(())),
deltas_mutex: Arc::new(AysncPartitionedMutex::new(())),
};

let n = 2000;
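The same lock-then-revalidate loop appears in set, get_owned, and delete above. Here is a condensed, self-contained restatement of that pattern; the types are stand-ins rather than the real SparseIndexWriter or AsyncPartitionedMutex, and PartitionedMutexSketch is the illustrative type from the earlier review comment.

```rust
use std::collections::HashMap;
use std::sync::Mutex as StdMutex;
use tokio::sync::MutexGuard;

/// Stand-in for the sparse index: maps a key to the block that currently owns it.
struct SparseIndexStandIn(StdMutex<HashMap<String, u64>>);

impl SparseIndexStandIn {
    fn get_target_block_id(&self, key: &str) -> u64 {
        *self
            .0
            .lock()
            .unwrap()
            .get(key)
            .expect("every key routes to some block")
    }
}

/// Lock-then-revalidate, condensed from set/get_owned/delete above.
async fn lock_owning_block<'a>(
    sparse_index: &SparseIndexStandIn,
    deltas_mutex: &'a PartitionedMutexSketch,
    search_key: &str,
) -> (MutexGuard<'a, ()>, u64) {
    loop {
        // 1. Look up which block currently owns the key.
        let target = sparse_index.get_target_block_id(search_key);
        // 2. Lock only that block's partition.
        let guard = deltas_mutex.lock(&target).await;
        // 3. Revalidate: a concurrent writer may have split or replaced the
        //    block between the lookup and the lock acquisition.
        if sparse_index.get_target_block_id(search_key) == target {
            return (guard, target);
        }
        // Ownership moved; the guard drops here and the loop retries.
    }
}
```

Holding the returned per-block guard, the caller can then create or mutate the delta for that block id, exactly as the three methods above do after their loops.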
4 changes: 3 additions & 1 deletion rust/blockstore/src/arrow/concurrency_test.rs
@@ -27,9 +27,10 @@ mod tests {
// race condition outside the cache.
let block_cache = new_cache_for_test();
let sparse_index_cache = new_cache_for_test();
let max_block_size_bytes = 500;
let blockfile_provider = ArrowBlockfileProvider::new(
storage,
TEST_MAX_BLOCK_SIZE_BYTES,
max_block_size_bytes,
block_cache,
sparse_index_cache,
);
@@ -46,6 +47,7 @@ mod tests {
// Make the max threads the number of cores * 2
let max_threads = num_cpus::get() * 2;
let t = shuttle::rand::thread_rng().gen_range(2..max_threads);
println!("Writing {} keys with {} threads", n, t);
let mut join_handles = Vec::with_capacity(t);
for i in 0..t {
let range_start = i * n / t;
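For readers who want the shape of this test without shuttle, here is a rough plain-tokio sketch of the fan-out pattern it uses: t tasks each write a disjoint slice of the n keys through one shared, internally synchronized writer. SharedWriter is a hypothetical stand-in for the blockfile writer and is not part of this PR.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the blockfile writer; the real writer
// synchronizes internally via the partitioned mutex.
#[derive(Default)]
struct SharedWriter;

impl SharedWriter {
    async fn set(&self, _prefix: &str, _key: String, _value: Vec<u32>) {
        // The real writer would lock the owning block's partition here.
    }
}

#[tokio::main]
async fn main() {
    let writer = Arc::new(SharedWriter::default());
    // The test draws t at random from 2..(num_cpus * 2); fixed here for the sketch.
    let (n, t) = (2000usize, 8usize);
    let mut handles = Vec::with_capacity(t);
    for i in 0..t {
        let writer = Arc::clone(&writer);
        let (start, end) = (i * n / t, (i + 1) * n / t);
        handles.push(tokio::spawn(async move {
            for k in start..end {
                writer.set("prefix", format!("key_{k}"), vec![k as u32]).await;
            }
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
```

In the actual test, shuttle drives the thread interleavings, and the reduced 500-byte max block size makes blocks split frequently, which exercises the revalidation path in the writer.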