diff --git a/chromadb/test/property/test_add.py b/chromadb/test/property/test_add.py index 9ee3eea7d1d..88795caff63 100644 --- a/chromadb/test/property/test_add.py +++ b/chromadb/test/property/test_add.py @@ -21,6 +21,32 @@ collection_st = st.shared(strategies.collections(with_hnsw_params=True), key="coll") +@given( + collection=collection_st, + record_set=strategies.recordsets(collection_st, min_size=1, max_size=5), +) +@settings( + deadline=None, + parent=override_hypothesis_profile( + normal=hypothesis.settings(max_examples=500), + fast=hypothesis.settings(max_examples=200), + ), + max_examples=2 +) +def test_add_miniscule( + client: ClientAPI, + collection: strategies.Collection, + record_set: strategies.RecordSet, +) -> None: + if ( + client.get_settings().chroma_api_impl + == "chromadb.api.async_fastapi.AsyncFastAPI" + ): + pytest.skip( + "TODO @jai, come back and debug why CI runners fail with async + sync" + ) + _test_add(client, collection, record_set, True, always_compact=True) + # Hypothesis tends to generate smaller values so we explicitly segregate the # the tests into tiers, Small, Medium. Hypothesis struggles to generate large @@ -104,6 +130,7 @@ def _test_add( record_set: strategies.RecordSet, should_compact: bool, batch_ann_accuracy: bool = False, + always_compact: bool = False, ) -> None: create_isolated_database(client) @@ -132,7 +159,7 @@ def _test_add( if ( not NOT_CLUSTER_ONLY and should_compact - and len(normalized_record_set["ids"]) > 10 + and (len(normalized_record_set["ids"]) > 10 or always_compact) ): # Wait for the model to be updated wait_for_version_increase(client, collection.name, initial_version) diff --git a/rust/log-service/src/bin/chroma-inspect-dirty-log.rs b/rust/log-service/src/bin/chroma-inspect-dirty-log.rs index 1b6dc2523e1..7ba9253153f 100644 --- a/rust/log-service/src/bin/chroma-inspect-dirty-log.rs +++ b/rust/log-service/src/bin/chroma-inspect-dirty-log.rs @@ -15,7 +15,7 @@ async fn main() { .connect() .await .expect("could not connect to log service"); - let mut client = LogServiceClient::new(logservice); + let mut client = LogServiceClient::new(logservice).max_decoding_message_size(256 << 20); let dirty = client .inspect_dirty_log(InspectDirtyLogRequest {}) .await diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 3f3ec63b9d8..81511042c31 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -4,6 +4,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::collections::HashSet; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime}; @@ -29,6 +30,7 @@ use chroma_types::chroma_proto::{ use chroma_types::chroma_proto::{ForkLogsRequest, ForkLogsResponse}; use chroma_types::CollectionUuid; use figment::providers::{Env, Format, Yaml}; +use futures::stream::StreamExt; use opentelemetry::metrics::Meter; use parking_lot::Mutex; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -285,6 +287,10 @@ fn cache_key_for_manifest(collection_id: CollectionUuid) -> String { format!("{collection_id}::MANIFEST") } +fn cache_key_for_cursor(collection_id: CollectionUuid, name: &CursorName) -> String { + format!("{collection_id}::cursor::{}", name.path()) +} + ////////////////////////////////////////// CachedFragment ////////////////////////////////////////// #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] @@ -354,20 +360,13 @@ impl RollupPerCollection { // // The consequence of this breaking is that the offset in the log will be behind sysdb. self.start_log_position = witness - .map(|x| x.1.position) + .map(|x| x.cursor.position) .unwrap_or(LogPosition::from_offset(1)); - if self.start_log_position > self.limit_log_position { - tracing::error!( - "invariant violation; will patch: {:?} > {:?}", - self.start_log_position, - self.limit_log_position - ); - } self.limit_log_position = self.limit_log_position.max(self.start_log_position); } fn is_empty(&self) -> bool { - self.start_log_position == self.limit_log_position + self.start_log_position >= self.limit_log_position } fn dirty_marker(&self, collection_id: CollectionUuid) -> DirtyMarker { @@ -391,6 +390,15 @@ impl RollupPerCollection { } } +////////////////////////////////////////////// Rollups ///////////////////////////////////////////// + +struct Rollup { + witness: Option, + cursor: Cursor, + last_record_witnessed: LogPosition, + rollups: HashMap, +} + //////////////////////////////////////////// DirtyMarker /////////////////////////////////////////// #[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] @@ -439,9 +447,9 @@ impl DirtyMarker { fn coalesce_markers( markers: &[(LogPosition, DirtyMarker)], - ) -> Result, wal3::Error> { - let mut rollups = HashMap::new(); - let mut forget = vec![]; + rollups: &mut HashMap, + forget: &mut HashSet, + ) -> Result<(), wal3::Error> { for (_, marker) in markers { match marker { DirtyMarker::MarkDirty { @@ -466,15 +474,15 @@ impl DirtyMarker { ); } DirtyMarker::Purge { collection_id } => { - forget.push(*collection_id); + forget.insert(*collection_id); } DirtyMarker::Cleared => {} } } - for collection_id in forget { - rollups.remove(&collection_id); + for collection_id in forget.iter() { + rollups.remove(collection_id); } - Ok(rollups) + Ok(()) } } @@ -522,6 +530,13 @@ impl wal3::MarkDirty for MarkDirty { ///////////////////////////////////////////// LogServer //////////////////////////////////////////// +#[derive(Default)] +struct RollupTransientState { + rollups: HashMap, + forget: HashSet, + largest_log_position_read: LogPosition, +} + pub struct LogServer { config: LogServerConfig, storage: Arc, @@ -846,20 +861,36 @@ impl LogServer { .as_micros() as u64, writer: "TODO".to_string(), }; - if let Some(witness) = witness { + let witness = if let Some(witness) = witness.as_ref() { cursor_store - .save(cursor_name, &cursor, &witness) + .save(cursor_name, &cursor, witness) .await .map_err(|err| { Status::new(err.code().into(), format!("Failed to save cursor: {}", err)) - })?; + })? } else { cursor_store .init(cursor_name, cursor) .await .map_err(|err| { Status::new(err.code().into(), format!("Failed to init cursor: {}", err)) - })?; + })? + }; + if let Some(cache) = self.cache.as_ref() { + let cache_key = cache_key_for_cursor(collection_id, cursor_name); + match serde_json::to_string(&witness) { + Ok(json_witness) => { + let value = CachedBytes { + bytes: Vec::from(json_witness), + }; + let insert_span = tracing::info_span!("cache insert"); + cache.insert(cache_key, value).instrument(insert_span).await; + } + Err(err) => { + tracing::error!("could not serialize cursor: {err}"); + cache.remove(&cache_key).await; + } + } } let mut need_to_compact = self.need_to_compact.lock(); if let Entry::Occupied(mut entry) = need_to_compact.entry(collection_id) { @@ -868,7 +899,7 @@ impl LogServer { rollup.start_log_position, LogPosition::from_offset(adjusted_log_offset as u64), ); - if rollup.start_log_position >= rollup.limit_log_position { + if rollup.is_empty() { entry.remove(); } } @@ -887,9 +918,16 @@ impl LogServer { { let need_to_compact = self.need_to_compact.lock(); for (collection_id, rollup) in need_to_compact.iter() { - if rollup.limit_log_position >= rollup.start_log_position + if (rollup.limit_log_position >= rollup.start_log_position && rollup.limit_log_position - rollup.start_log_position - >= request.min_compaction_size + >= request.min_compaction_size) + || rollup.reinsert_count >= self.config.reinsert_threshold + || SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("time never moves to before epoch") + .as_micros() + .saturating_sub(rollup.initial_insertion_epoch_us as u128) + >= self.config.timeout_us as u128 { selected_rollups.push((*collection_id, *rollup)); } @@ -897,7 +935,7 @@ impl LogServer { } // Then allocate the collection ID strings outside the lock. let mut all_collection_info = Vec::with_capacity(selected_rollups.len()); - for (collection_id, rollup) in selected_rollups { + for (collection_id, rollup) in selected_rollups.into_iter() { all_collection_info.push(CollectionInfo { collection_id: collection_id.to_string(), first_log_offset: rollup.start_log_position.offset() as i64, @@ -913,15 +951,13 @@ impl LogServer { /// /// This will rewrite the dirty log's coalesced contents at the tail and adjust the cursor to /// said position so that the next read is O(1) if there are no more writes. - #[tracing::instrument(skip(self), err(Display))] + #[tracing::instrument(skip(self))] async fn roll_dirty_log(&self) -> Result<(), Error> { // Ensure at most one request at a time. let _guard = self.rolling_up.lock().await; - let (witness, cursor, dirty_markers) = self.read_dirty_log().await?; - self.metrics - .dirty_log_records_read - .add(dirty_markers.len() as u64, &[]); - let Some((last_record_inserted, _)) = dirty_markers.last() else { + let mut rollup = self.read_and_coalesce_dirty_log().await?; + if rollup.rollups.is_empty() { + tracing::info!("rollups is empty"); let backpressure = vec![]; self.set_backpressure(&backpressure); let mut need_to_compact = self.need_to_compact.lock(); @@ -929,12 +965,20 @@ impl LogServer { std::mem::swap(&mut *need_to_compact, &mut rollups); return Ok(()); }; - let mut rollups = DirtyMarker::coalesce_markers(&dirty_markers)?; - self.enrich_dirty_log(&mut rollups).await?; + let collections = rollup.rollups.len(); + tracing::event!( + tracing::Level::INFO, + collections = ?collections, + ); + self.enrich_dirty_log(&mut rollup.rollups).await?; + self.save_dirty_log(rollup).await + } + + async fn save_dirty_log(&self, mut rollup: Rollup) -> Result<(), Error> { let mut markers = vec![]; let mut backpressure = vec![]; let mut total_uncompacted = 0; - for (collection_id, rollup) in rollups.iter() { + for (collection_id, rollup) in rollup.rollups.iter() { if rollup.is_empty() { continue; } @@ -951,18 +995,18 @@ impl LogServer { if markers.is_empty() { markers.push(serde_json::to_string(&DirtyMarker::Cleared).map(Vec::from)?); } - let mut new_cursor = cursor.clone(); + let mut new_cursor = rollup.cursor.clone(); self.dirty_log.append_many(markers).await?; - new_cursor.position = *last_record_inserted + 1u64; + new_cursor.position = rollup.last_record_witnessed + 1u64; let Some(cursors) = self.dirty_log.cursors(CursorStoreOptions::default()) else { return Err(Error::CouldNotGetDirtyLogCursors); }; tracing::info!( "Advancing dirty log cursor {:?} -> {:?}", - cursor.position, + rollup.cursor.position, new_cursor.position ); - if let Some(witness) = witness { + if let Some(witness) = rollup.witness { cursors.save(&STABLE_PREFIX, &new_cursor, &witness).await?; } else { cursors.init(&STABLE_PREFIX, new_cursor).await?; @@ -972,16 +1016,14 @@ impl LogServer { .record(total_uncompacted as f64, &[]); self.set_backpressure(&backpressure); let mut need_to_compact = self.need_to_compact.lock(); - std::mem::swap(&mut *need_to_compact, &mut rollups); + std::mem::swap(&mut *need_to_compact, &mut rollup.rollups); Ok(()) } /// Read the entirety of a prefix of the dirty log. #[tracing::instrument(skip(self), err(Display))] #[allow(clippy::type_complexity)] - async fn read_dirty_log( - &self, - ) -> Result<(Option, Cursor, Vec<(LogPosition, DirtyMarker)>), Error> { + async fn read_and_coalesce_dirty_log(&self) -> Result { let Some(reader) = self.dirty_log.reader(LogReaderOptions::default()) else { return Err(Error::CouldNotGetDirtyLogReader); }; @@ -1000,41 +1042,84 @@ impl LogServer { .scan( cursor.position, Limits { - max_files: Some(10_000), - max_bytes: Some(1_000_000_000), - max_records: Some(10_000), + max_files: None, + max_bytes: None, + max_records: None, }, ) .await?; if dirty_fragments.is_empty() { - return Ok((witness, cursor, vec![])); + let last_record_witnessed = cursor.position; + let rollups = HashMap::default(); + let rollup = Rollup { + witness, + cursor, + last_record_witnessed, + rollups, + }; + tracing::info!("empty dirty log"); + return Ok(rollup); } - if dirty_fragments.len() >= 1_000 { + if dirty_fragments.len() >= 1000 { tracing::error!("Too many dirty fragments: {}", dirty_fragments.len()); } + let rollup = Mutex::new(RollupTransientState::default()); + let markers_read = AtomicU64::new(0); let dirty_futures = dirty_fragments .iter() - .map(|fragment| reader.read_parquet(fragment)) + .map(|fragment| async { + let (_, records, _) = reader.read_parquet(fragment).await?; + let records = records + .into_iter() + .flat_map(|x| match serde_json::from_slice::(&x.1) { + Ok(marker) => Some((x.0, marker)), + Err(err) => { + tracing::error!( + "could not read marker for {}: {err}", + String::from_utf8_lossy(&x.1) + ); + None + } + }) + .collect::>(); + markers_read.fetch_add(records.len() as u64, Ordering::Relaxed); + let mut rollup = rollup.lock(); + if let Some(max) = records.iter().map(|x| x.0).max() { + rollup.largest_log_position_read = + std::cmp::max(max, rollup.largest_log_position_read); + } + // We create a new hash set for forget because we cannot borrow rollup mutably + // twice. Further, we need to track every forget call to remove down below before + // we return the rollup. + let mut forget = HashSet::default(); + DirtyMarker::coalesce_markers(&records, &mut rollup.rollups, &mut forget)?; + rollup.forget.extend(forget); + Ok::<(), Error>(()) + }) .collect::>(); - let dirty_raw = futures::future::try_join_all(dirty_futures).await?; - let mut dirty_markers = vec![]; - for (_, records, _) in dirty_raw { - let records = records - .into_iter() - .flat_map(|x| match serde_json::from_slice::(&x.1) { - Ok(marker) => Some((x.0, marker)), - Err(err) => { - tracing::error!( - "could not read marker for {}: {err}", - String::from_utf8_lossy(&x.1) - ); - None - } - }) - .collect::>(); - dirty_markers.extend(records); + + let stream = futures::stream::iter(dirty_futures); + let mut buffered = stream.buffer_unordered(50); + while let Some(res) = buffered.next().await { + if let Err(err) = res { + tracing::error!(error = ?err); + } } - Ok((witness, cursor, dirty_markers)) + self.metrics + .dirty_log_records_read + .add(markers_read.load(Ordering::Relaxed), &[]); + let mut transient = rollup.lock(); + let last_record_witnessed = transient.largest_log_position_read; + let mut rollups = std::mem::take(&mut transient.rollups); + for forget in transient.forget.iter() { + rollups.remove(forget); + } + Ok(Rollup { + witness, + cursor, + rollups, + last_record_witnessed, + }) } /// Enrich a rolled up dirty log by reading cursors and manifests to determine what still needs @@ -1045,7 +1130,7 @@ impl LogServer { &self, rollups: &mut HashMap, ) -> Result<(), Error> { - let load_cursor = |storage, collection_id: CollectionUuid| async move { + let load_witness = |storage, collection_id: CollectionUuid| async move { let cursor = &COMPACTION; let cursor_store = CursorStore::new( CursorStoreOptions::default(), @@ -1053,17 +1138,64 @@ impl LogServer { collection_id.storage_prefix_for_log(), "rollup".to_string(), ); - let span = tracing::info_span!("cursor load", collection_id = ?collection_id); - cursor_store.load(cursor).instrument(span).await + let witness = if let Some(cache) = self.cache.as_ref() { + let key = LogKey { collection_id }; + let handle = self.open_logs.get_or_create_state(key); + let mut _active = handle.active.lock().await; + let cache_key = cache_key_for_cursor(collection_id, cursor); + let cache_span = tracing::info_span!("cache get", cache_key = ?cache_key); + if let Ok(Some(json_witness)) = cache.get(&cache_key).instrument(cache_span).await { + let witness: Witness = serde_json::from_slice(&json_witness.bytes)?; + return Ok(Some(witness)); + } + let load_span = tracing::info_span!("cursor load"); + let res = cursor_store.load(cursor).instrument(load_span).await?; + if let Some(witness) = res.as_ref() { + let json_witness = serde_json::to_string(&witness)?; + let value = CachedBytes { + bytes: Vec::from(json_witness), + }; + let insert_span = tracing::info_span!("cache insert"); + cache.insert(cache_key, value).instrument(insert_span).await; + } + res + } else { + let span = tracing::info_span!("cursor load", collection_id = ?collection_id); + cursor_store.load(cursor).instrument(span).await? + }; + Ok::, Error>(witness) }; + let mut futures = Vec::with_capacity(rollups.len()); for (collection_id, mut rollup) in std::mem::take(rollups) { - let cursor = load_cursor(&self.storage, collection_id).await?; - // NOTE(rescrv): There are two spreads that we have. - // `rollup` tracks the minimum and maximum offsets of a record on the dirty log. - // The spread between cursor (if it exists) and manifest.maximum_log_offset tracks the - // data that needs to be compacted. - rollup.witness_cursor(cursor.as_ref()); - if !rollup.is_empty() { + let load_witness = &load_witness; + futures.push(async move { + let witness = match load_witness(&self.storage, collection_id).await { + Ok(witness) => witness, + Err(err) => { + tracing::warn!("could not load cursor: {err}"); + return Some((collection_id, rollup)); + } + }; + // NOTE(rescrv): There are two spreads that we have. + // `rollup` tracks the minimum and maximum offsets of a record on the dirty log. + // The spread between cursor (if it exists) and manifest.maximum_log_offset tracks the + // data that needs to be compacted. + if let Some(witness) = witness { + rollup.witness_cursor(Some(&witness)); + } + if !rollup.is_empty() { + Some((collection_id, rollup)) + } else { + None + } + }); + } + if !futures.is_empty() { + for (collection_id, rollup) in futures::future::join_all(futures) + .await + .into_iter() + .flatten() + { rollups.insert(collection_id, rollup); } } @@ -1428,7 +1560,7 @@ impl LogServer { })?; // This is the existing compaction_offset, which is the next record to compact. let offset = witness - .map(|x| x.1.position) + .map(|x| x.cursor.position) .unwrap_or(LogPosition::from_offset(1)); tracing::event!(Level::INFO, offset = ?offset); wal3::copy( @@ -1457,7 +1589,7 @@ impl LogServer { format!("max_offset={:?} < offset={:?}", max_offset, offset), )); } - if offset != max_offset{ + if offset != max_offset { let mark_dirty = MarkDirty { collection_id: target_collection_id, dirty_log: Arc::clone(&self.dirty_log), @@ -2319,9 +2451,12 @@ mod tests { }, ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - assert_eq!(1, rollup.len()); - let rollup = rollup.get(&collection_id).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + assert_eq!(1, rollups.len()); + let rollup = rollups.get(&collection_id).unwrap(); assert_eq!(LogPosition::from_offset(1), rollup.start_log_position); assert_eq!(LogPosition::from_offset(3), rollup.limit_log_position); assert_eq!(2, rollup.reinsert_count); @@ -2360,9 +2495,12 @@ mod tests { }, ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - assert_eq!(2, rollup.len()); - let rollup_blocking = rollup.get(&collection_id_blocking).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + assert_eq!(2, rollups.len()); + let rollup_blocking = rollups.get(&collection_id_blocking).unwrap(); assert_eq!( LogPosition::from_offset(1), rollup_blocking.start_log_position @@ -2373,7 +2511,7 @@ mod tests { ); assert_eq!(0, rollup_blocking.reinsert_count); assert_eq!(now, rollup_blocking.initial_insertion_epoch_us); - let rollup_acting = rollup.get(&collection_id_acting).unwrap(); + let rollup_acting = rollups.get(&collection_id_acting).unwrap(); assert_eq!( LogPosition::from_offset(1), rollup_acting.start_log_position @@ -2499,9 +2637,16 @@ mod tests { ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); // The purge should remove all markers for the collection, even ones that come after - assert_eq!(0, rollup.len()); + assert_eq!(1, forget.len()); + assert!(forget.contains(&collection_id)); + for collection_id in &forget { + rollups.remove(collection_id); + } + assert_eq!(0, rollups.len()); } #[test] @@ -2553,14 +2698,21 @@ mod tests { ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); // collection_id1 should be completely removed due to purge // collection_id2 should remain - assert_eq!(1, rollup.len()); - assert!(rollup.contains_key(&collection_id2)); - assert!(!rollup.contains_key(&collection_id1)); + assert_eq!(1, forget.len()); + assert!(forget.contains(&collection_id1)); + for collection_id in &forget { + rollups.remove(collection_id); + } + assert_eq!(1, rollups.len()); + assert!(rollups.contains_key(&collection_id2)); + assert!(!rollups.contains_key(&collection_id1)); - let rollup2 = rollup.get(&collection_id2).unwrap(); + let rollup2 = rollups.get(&collection_id2).unwrap(); assert_eq!(LogPosition::from_offset(10), rollup2.start_log_position); assert_eq!(LogPosition::from_offset(15), rollup2.limit_log_position); } @@ -2727,8 +2879,11 @@ mod tests { #[test] fn dirty_marker_coalesce_empty_markers() { - let rollup = DirtyMarker::coalesce_markers(&[]).unwrap(); - assert!(rollup.is_empty()); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&[], &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + assert!(rollups.is_empty()); } #[test] @@ -2773,18 +2928,21 @@ mod tests { ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - assert_eq!(2, rollup.len()); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + assert_eq!(2, rollups.len()); // Check collection_id1 rollup - let rollup1 = rollup.get(&collection_id1).unwrap(); + let rollup1 = rollups.get(&collection_id1).unwrap(); assert_eq!(LogPosition::from_offset(10), rollup1.start_log_position); assert_eq!(LogPosition::from_offset(33), rollup1.limit_log_position); assert_eq!(1, rollup1.reinsert_count); // max of 1 and 0 assert_eq!(now - 1000, rollup1.initial_insertion_epoch_us); // max of now and now-1000 // Check collection_id2 rollup - let rollup2 = rollup.get(&collection_id2).unwrap(); + let rollup2 = rollups.get(&collection_id2).unwrap(); assert_eq!(LogPosition::from_offset(20), rollup2.start_log_position); assert_eq!(LogPosition::from_offset(30), rollup2.limit_log_position); assert_eq!(2, rollup2.reinsert_count); @@ -2852,8 +3010,11 @@ mod tests { }, )]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - let collection_rollup = rollup.get(&collection_id).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + let collection_rollup = rollups.get(&collection_id).unwrap(); assert_eq!( LogPosition::from_offset(u64::MAX - 1), collection_rollup.start_log_position @@ -2883,8 +3044,11 @@ mod tests { }, )]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - let collection_rollup = rollup.get(&collection_id).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + let collection_rollup = rollups.get(&collection_id).unwrap(); assert_eq!( LogPosition::from_offset(10), collection_rollup.start_log_position @@ -2927,8 +3091,11 @@ mod tests { ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - let collection_rollup = rollup.get(&collection_id).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + let collection_rollup = rollups.get(&collection_id).unwrap(); assert_eq!(u64::MAX, collection_rollup.reinsert_count); } @@ -3067,8 +3234,15 @@ mod tests { ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - assert_eq!(0, rollup.len()); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert_eq!(1, forget.len()); + assert!(forget.contains(&collection_id)); + for collection_id in &forget { + rollups.remove(collection_id); + } + assert_eq!(0, rollups.len()); } #[test] @@ -3154,9 +3328,12 @@ mod tests { )); } - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - assert_eq!(1, rollup.len()); - let collection_rollup = rollup.get(&collection_id).unwrap(); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert!(forget.is_empty()); + assert_eq!(1, rollups.len()); + let collection_rollup = rollups.get(&collection_id).unwrap(); assert_eq!( LogPosition::from_offset(0), collection_rollup.start_log_position @@ -3218,8 +3395,15 @@ mod tests { ), ]; - let rollup = DirtyMarker::coalesce_markers(&markers).unwrap(); - assert_eq!(0, rollup.len()); + let mut rollups = HashMap::new(); + let mut forget = HashSet::new(); + DirtyMarker::coalesce_markers(&markers, &mut rollups, &mut forget).unwrap(); + assert_eq!(1, forget.len()); + assert!(forget.contains(&collection_id)); + for collection_id in &forget { + rollups.remove(collection_id); + } + assert_eq!(0, rollups.len()); } #[test] diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs index 8016d36cf60..ce7eb32beaa 100644 --- a/rust/log/src/grpc_log.rs +++ b/rust/log/src/grpc_log.rs @@ -138,7 +138,7 @@ impl ChromaError for GrpcUpdateCollectionLogOffsetError { #[derive(Error, Debug)] pub enum GrpcPurgeDirtyForCollectionError { - #[error("Failed to update collection log offset")] + #[error("Failed to purge dirty: {0}")] FailedToPurgeDirty(#[from] tonic::Status), } diff --git a/rust/log/tests/log-offsets.rs b/rust/log/tests/log-offsets.rs index f6a1630e91f..08cd7648900 100644 --- a/rust/log/tests/log-offsets.rs +++ b/rust/log/tests/log-offsets.rs @@ -169,24 +169,32 @@ async fn test_k8s_integration_log_offsets_empty_log_50054() { let resp = resp.into_inner(); assert_eq!(1, resp.records.len()); assert_eq!(1, resp.records[0].log_offset); - // Wait 15 seconds for the background interval. It's a magic constant in log service code. - tokio::time::sleep(std::time::Duration::from_secs(15)).await; - // "compact" said record. - let resp = rust_log_service - .get_all_collection_info_to_compact(GetAllCollectionInfoToCompactRequest { - min_compaction_size: 1, - }) - .await - .unwrap(); - let resp = resp.into_inner(); - let Some(coll) = resp - .all_collection_info - .iter() - .find(|c| c.collection_id == collection_id) - else { - panic!("collection not found"); - }; - assert_eq!(1, coll.first_log_offset); + let mut found = false; + // Wait 20 seconds for the background interval. 10s is a magic constant in log service code. + for _ in 0..200 { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // "compact" said record. + let resp = rust_log_service + .get_all_collection_info_to_compact(GetAllCollectionInfoToCompactRequest { + min_compaction_size: 1, + }) + .await + .unwrap(); + let resp = resp.into_inner(); + let Some(coll) = resp + .all_collection_info + .iter() + .find(|c| c.collection_id == collection_id) + else { + continue; + }; + assert_eq!(1, coll.first_log_offset); + found = true; + break; + } + if !found { + panic!("never saw collection info"); + } // "finish" the compaction. let _resp = rust_log_service .update_collection_log_offset(UpdateCollectionLogOffsetRequest { @@ -195,8 +203,8 @@ async fn test_k8s_integration_log_offsets_empty_log_50054() { }) .await .unwrap(); - // Wait 15 seconds for the background interval. It's a magic constant in log service code. - tokio::time::sleep(std::time::Duration::from_secs(15)).await; + // Wait 20 seconds for the background interval. It's a magic constant in log service code. + tokio::time::sleep(std::time::Duration::from_secs(20)).await; // said record no longer shows in compaction. let resp = rust_log_service .get_all_collection_info_to_compact(GetAllCollectionInfoToCompactRequest { diff --git a/rust/wal3/src/cursors.rs b/rust/wal3/src/cursors.rs index 8f7c3e7f8f2..a24b5b2cd97 100644 --- a/rust/wal3/src/cursors.rs +++ b/rust/wal3/src/cursors.rs @@ -51,17 +51,23 @@ impl CursorName<'_> { ////////////////////////////////////////////// Witness ///////////////////////////////////////////// #[derive(Clone, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)] -pub struct Witness(ETag, pub Cursor); +pub struct Witness { + e_tag: ETag, + pub cursor: Cursor, +} impl Witness { /// This method constructs a witness that will likely fail, but that contains a new cursor. /// Useful in tests and not much else. pub fn default_etag_with_cursor(cursor: Cursor) -> Self { - Self(ETag("NO MATCH".to_string()), cursor) + Self { + e_tag: ETag("NO MATCH".to_string()), + cursor, + } } pub fn cursor(&self) -> &Cursor { - &self.1 + &self.cursor } } @@ -135,7 +141,7 @@ impl CursorStore { let cursor: Cursor = serde_json::from_slice(&data).map_err(|e| { Error::CorruptCursor(format!("Failed to deserialize cursor {}: {}", name.0, e)) })?; - Ok(Some(Witness(e_tag, cursor))) + Ok(Some(Witness { e_tag, cursor })) } pub async fn init<'a>(&self, name: &CursorName<'a>, cursor: Cursor) -> Result { @@ -151,7 +157,7 @@ impl CursorStore { witness: &Witness, ) -> Result { // Semaphore taken by put. - let options = PutOptions::if_matches(&witness.0, StorageRequestPriority::P0); + let options = PutOptions::if_matches(&witness.e_tag, StorageRequestPriority::P0); self.put(name, cursor.clone(), options).await } @@ -179,7 +185,7 @@ impl CursorStore { name.0 ))); }; - Ok(Witness(e_tag, cursor)) + Ok(Witness { e_tag, cursor }) } pub async fn list(&self) -> Result>, Error> { @@ -271,8 +277,8 @@ mod tests { .await .unwrap() .unwrap(); - assert_eq!(LogPosition::from_offset(99), witness.1.position); - assert_eq!(54321u64, witness.1.epoch_us); - assert_eq!("test-writer", witness.1.writer); + assert_eq!(LogPosition::from_offset(99), witness.cursor.position); + assert_eq!(54321u64, witness.cursor.epoch_us); + assert_eq!("test-writer", witness.cursor.writer); } } diff --git a/rust/wal3/src/writer.rs b/rust/wal3/src/writer.rs index e72390599b1..aa6195dc3eb 100644 --- a/rust/wal3/src/writer.rs +++ b/rust/wal3/src/writer.rs @@ -718,7 +718,7 @@ impl OnceLogWriter { let mut collect_up_to = None; for cursor_name in cursors.list().await? { let witness = cursors.load(&cursor_name).await?; - let Some(cursor) = witness.map(|w| w.1) else { + let Some(cursor) = witness.map(|w| w.cursor) else { return Err(Error::LogContentionFailure); }; if cursor.position <= collect_up_to.unwrap_or(cursor.position) { diff --git a/rust/worker/tilt_config.yaml b/rust/worker/tilt_config.yaml index da39c7e270f..1f67394d245 100644 --- a/rust/worker/tilt_config.yaml +++ b/rust/worker/tilt_config.yaml @@ -127,11 +127,11 @@ compaction_service: num_worker_threads: 4 dispatcher_queue_size: 100 worker_queue_size: 100 - task_queue_limit: 1000 - active_io_tasks: 100 + task_queue_limit: 10000 + active_io_tasks: 10000 compactor: compaction_manager_queue_size: 1000 - max_concurrent_jobs: 100 + max_concurrent_jobs: 50 compaction_interval_sec: 10 min_compaction_size: 10 max_compaction_size: 10000 @@ -170,6 +170,7 @@ compaction_service: log_service: num_records_before_backpressure: 100000 + reinsert_threshold: 0 opentelemetry: service_name: "rust-log-service" endpoint: "http://otel-collector:4317"