diff --git a/colgrep/src/index/mod.rs b/colgrep/src/index/mod.rs
index 7104d1f2..72f6c8d9 100644
--- a/colgrep/src/index/mod.rs
+++ b/colgrep/src/index/mod.rs
@@ -56,6 +56,94 @@ const BUILDING_MARKER: &str = ".building";
 /// more often (more resumable) at the cost of more index-append overhead.
 const BUILD_CHECKPOINT_UNITS: usize = 4096;
 
+/// Test-only counter of expensive `delete_from_index` invocations.
+///
+/// Issue #116: deleting many files used to call the full-index-rewrite primitive once per
+/// file, making an incremental update O(changed_files × index_size). Batching collapses that
+/// to a single call; this counter lets regression tests assert the batching holds.
+#[cfg(test)]
+static DELETE_FROM_INDEX_CALLS: std::sync::atomic::AtomicUsize =
+    std::sync::atomic::AtomicUsize::new(0);
+
+/// Wrapper over [`next_plaid::delete_from_index`] that counts calls under `cfg(test)`.
+/// Zero overhead in release builds.
+fn delete_from_index_counted(ids: &[i64], index_path: &str) -> Result<usize> {
+    #[cfg(test)]
+    DELETE_FROM_INDEX_CALLS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+    Ok(delete_from_index(ids, index_path)?)
+}
+
+/// Remove every index entry belonging to any of `files`, in a single index rewrite.
+///
+/// Both deletion primitives are full-index operations: `delete_from_index` rewrites every
+/// chunk and rebuilds the IVF, and `filtering::delete` rewrites the whole metadata table.
+/// Calling them once per file makes an incremental update O(changed_files × index_size) — the
+/// cause of issue #116, where ~276 deleted files hung for minutes on a ~1.2 GB index.
+/// Collecting every doc ID up front and deleting once collapses that to a single O(index_size)
+/// rewrite, regardless of how many files changed.
+///
+/// The doc IDs for ALL files must be read before any deletion: both primitives renumber the
+/// surviving documents, so interleaving reads and deletes would invalidate the IDs that haven't
+/// been deleted yet. Returns the number of documents removed.
+fn delete_files_from_index(index_path: &str, files: &[PathBuf]) -> Result<usize> {
+    if files.is_empty() {
+        return Ok(0);
+    }
+
+    // Gather the doc IDs for every file first (dedup in case a path appears twice), then
+    // delete the whole set in one pass.
+    let mut ids: Vec<i64> = Vec::new();
+    let mut seen: HashSet<i64> = HashSet::new();
+    for file_path in files {
+        let file_str = file_path.to_string_lossy().to_string();
+        let file_ids =
+            filtering::where_condition(index_path, "file = ?", &[serde_json::json!(file_str)])
+                .unwrap_or_default();
+        for id in file_ids {
+            if seen.insert(id) {
+                ids.push(id);
+            }
+        }
+    }
+
+    if ids.is_empty() {
+        return Ok(0);
+    }
+
+    delete_from_index_counted(&ids, index_path)?;
+    filtering::delete(index_path, &ids)?;
+    Ok(ids.len())
+}
+
+/// Decide whether a sibling worktree's index directory is a usable seed source, returning its
+/// loaded [`IndexState`] when so (and `None` otherwise).
+///
+/// A source is usable only if it is a **complete**, current-version, non-dirty index:
+/// - No `.building` marker. A resumable build writes `metadata.json` and checkpoints state with
+///   `dirty = false` after each committed batch, so an interrupted build looks complete by every
+///   other check while only holding a fraction of its documents. The marker is the sole signal
+///   that the build hasn't finished; seeding from it would copy a partial store and treat it as
+///   whole. See issue #115.
+/// - `metadata.json` present and a filtering DB present (rules out absent/stale-format stores
+///   that `index()` would discard anyway).
+/// - State loads, is non-empty, matches the current CLI version, and isn't dirty.
+fn seed_source_state(src_dir: &Path, current_version: &str) -> Option<IndexState> {
+    if src_dir.join(BUILDING_MARKER).exists() {
+        return None;
+    }
+
+    let src_vector = get_vector_index_path(src_dir);
+    let src_vector_str = src_vector.to_str()?;
+    if !src_vector.join("metadata.json").exists() || !filtering::exists(src_vector_str) {
+        return None;
+    }
+
+    match IndexState::load(src_dir) {
+        Ok(s) if !s.files.is_empty() && s.cli_version == current_version && !s.dirty => Some(s),
+        _ => None,
+    }
+}
+
 /// Threshold for switching to higher pool factor (fewer embeddings per doc).
 /// When encoding more than this many units, use LARGE_BATCH_POOL_FACTOR.
 const LARGE_BATCH_THRESHOLD: usize = 10_000;
@@ -1559,10 +1647,9 @@ impl IndexBuilder {
 
             // Delete changed files from index right before writing new data.
             // Deferred from earlier to minimize the window where data is missing
-            // from the index (for concurrent readers and interrupt safety).
-            for file_path in &files_changed {
-                self.delete_file_from_index(index_path, file_path)?;
-            }
+            // from the index (for concurrent readers and interrupt safety). Batched
+            // into a single index rewrite — see delete_files_from_index / issue #116.
+            delete_files_from_index(index_path, &files_changed)?;
 
             let sorted_units = prepare_units_for_encoding(&new_units, index_chunk_size);
             let was_interrupted = self.run_encoding_pipeline(
@@ -1639,20 +1726,12 @@ impl IndexBuilder {
 
         for candidate in candidates {
             let src_dir = &candidate.index_dir;
-            let src_vector = get_vector_index_path(src_dir);
-            let Some(src_vector_str) = src_vector.to_str() else {
+            // Validate the sibling holds a complete, current-version, non-dirty index that
+            // isn't mid-build. Skip otherwise so we never seed from a half-built or stale store.
+            let Some(src_state) = seed_source_state(src_dir, current_version) else {
                 continue;
             };
-
-            // The sibling must have a complete, current-version index. Skip otherwise so we
-            // never copy a half-built or stale-format index that index() would discard anyway.
-            if !src_vector.join("metadata.json").exists() || !filtering::exists(src_vector_str) {
-                continue;
-            }
-            let src_state = match IndexState::load(src_dir) {
-                Ok(s) if !s.files.is_empty() && s.cli_version == current_version && !s.dirty => s,
-                _ => continue,
-            };
+            let src_vector = get_vector_index_path(src_dir);
 
             // Copy the vector/filtering store via a temp dir, then rename into place so an
             // interrupted copy never leaves a half-written index/ behind.
@@ -1723,8 +1802,8 @@ impl IndexBuilder {
             .filter(|p| !scanned_set.contains(*p))
             .cloned()
             .collect();
+        delete_files_from_index(&index_path, &stale)?;
         for path in &stale {
-            self.delete_file_from_index(&index_path, path)?;
             state.files.remove(path);
         }
 
@@ -1925,10 +2004,9 @@ impl IndexBuilder {
         if batch_units.is_empty() {
             return Ok(false);
         }
-        // Idempotent resume: clear any partial docs a prior interrupted run wrote for these files.
-        for file in batch_files {
-            self.delete_file_from_index(index_path, file)?;
-        }
+        // Idempotent resume: clear any partial docs a prior interrupted run wrote for these
+        // files, in a single batched index rewrite (issue #116).
+        delete_files_from_index(index_path, batch_files)?;
         self.ensure_model_created(batch_units.len())?;
         let pool_factor = self.resolve_pool_factor(batch_units.len());
         let sorted_units = prepare_units_for_encoding(batch_units, index_chunk_size);
@@ -2096,6 +2174,15 @@ impl IndexBuilder {
             && plan.deleted.is_empty()
             && orphaned_deleted == 0
         {
+            // If the previous run left the index dirty, the repair above already brought the
+            // store back in sync — so clear the flag now. Returning without persisting would
+            // leave the index permanently dirty, forcing a (costly) repair on every future
+            // run even though nothing is wrong. See issue #115.
+            if old_state.dirty {
+                let mut state = old_state.clone();
+                state.dirty = false;
+                state.save(&self.index_dir)?;
+            }
             return Ok(UpdateStats {
                 added: 0,
                 changed: 0,
@@ -2112,10 +2199,9 @@ impl IndexBuilder {
             state.save(&self.index_dir)?;
         }
 
-        // 1. Delete chunks for deleted files (safe — not re-adding these)
-        for file_path in &plan.deleted {
-            self.delete_file_from_index(index_path, file_path)?;
-        }
+        // 1. Delete chunks for deleted files (safe — not re-adding these). Batched into a
+        // single index rewrite — see delete_files_from_index / issue #116.
+        delete_files_from_index(index_path, &plan.deleted)?;
 
         // Remove deleted files from state
         for path in &plan.deleted {
@@ -2189,12 +2275,13 @@ impl IndexBuilder {
         }
 
         // Delete stale index entries for skipped files that were previously indexed
-        // (e.g., files that became unreadable due to invalid UTF-8)
-        for file_path in &skipped_files {
-            if plan.changed.contains(file_path) {
-                let _ = self.delete_file_from_index(index_path, file_path);
-            }
-        }
+        // (e.g., files that became unreadable due to invalid UTF-8). Batched into one rewrite.
+        let stale_skipped: Vec<PathBuf> = skipped_files
+            .iter()
+            .filter(|p| plan.changed.contains(p))
+            .cloned()
+            .collect();
+        let _ = delete_files_from_index(index_path, &stale_skipped);
 
         // 3. Add new units to index
         let mut was_interrupted = false;
@@ -2235,10 +2322,9 @@ impl IndexBuilder {
 
             // Delete changed files from index right before writing new data.
             // Deferred from earlier to minimize the window where data is missing
-            // from the index (for concurrent readers and interrupt safety).
-            for file_path in &plan.changed {
-                self.delete_file_from_index(index_path, file_path)?;
-            }
+            // from the index (for concurrent readers and interrupt safety). Batched
+            // into a single index rewrite — see delete_files_from_index / issue #116.
+            delete_files_from_index(index_path, &plan.changed)?;
 
             let sorted_units = prepare_units_for_encoding(&new_units, index_chunk_size);
             let pipeline_interrupted = self.run_encoding_pipeline(
@@ -2581,22 +2667,6 @@ impl IndexBuilder {
         Ok(plan)
     }
 
-    /// Delete all chunks for a file from vector index and filtering DB.
-    fn delete_file_from_index(&self, index_path: &str, file_path: &Path) -> Result<()> {
-        let file_str = file_path.to_string_lossy().to_string();
-
-        // Get doc IDs directly from filtering DB
-        let ids =
-            filtering::where_condition(index_path, "file = ?", &[serde_json::json!(file_str)])
-                .unwrap_or_default();
-
-        if !ids.is_empty() {
-            delete_from_index(&ids, index_path)?;
-            filtering::delete(index_path, &ids)?;
-        }
-        Ok(())
-    }
-
     /// Clean up orphaned entries: files in index but not on disk
     /// This handles directory deletion/rename and any state inconsistencies
     fn cleanup_orphaned_entries(&self, index_path: &str) -> Result<usize> {
@@ -2606,27 +2676,15 @@ impl IndexBuilder {
         // tens-of-megabytes JSON deserialize on large indexes.
         let files = filtering::get_distinct_strings(index_path, "file").unwrap_or_default();
 
-        let mut deleted_count = 0;
-        for file_str in files {
-            let full_path = self.project_root.join(&file_str);
-            if !full_path.exists() {
-                // File no longer exists on disk - delete from index
-                let ids = filtering::where_condition(
-                    index_path,
-                    "file = ?",
-                    &[serde_json::json!(file_str)],
-                )
-                .unwrap_or_default();
-
-                if !ids.is_empty() {
-                    delete_from_index(&ids, index_path)?;
-                    filtering::delete(index_path, &ids)?;
-                    deleted_count += ids.len();
-                }
-            }
-        }
+        // Collect every orphaned file, then remove them all in one batched index rewrite
+        // rather than rewriting the whole index once per orphan (issue #116).
+        let orphaned: Vec<PathBuf> = files
+            .into_iter()
+            .map(PathBuf::from)
+            .filter(|rel| !self.project_root.join(rel).exists())
+            .collect();
 
-        Ok(deleted_count)
+        delete_files_from_index(index_path, &orphaned)
     }
 
     #[allow(dead_code)]
@@ -4687,4 +4745,198 @@ mod tests {
             &force
         ));
     }
+
+    /// Build a minimal `IndexBuilder` pointing at the given (project, index) directories,
+    /// without a model. Suitable for exercising index-maintenance paths that don't encode.
+    fn test_builder(project_root: &Path, index_dir: &Path) -> IndexBuilder {
+        IndexBuilder {
+            model: None,
+            model_path: PathBuf::from("/nonexistent-model"),
+            quantized: false,
+            parallel_sessions: None,
+            batch_size: None,
+            project_root: project_root.to_path_buf(),
+            index_dir: index_dir.to_path_buf(),
+            pool_factor: None,
+            encode_batch_size: None,
+            index_chunk_size: None,
+            dynamic_batch: true,
+            auto_confirm: true,
+            model_id: "test-model".to_string(),
+        }
+    }
+
+    /// Build a small vector index + filtering DB at `index_path`, distributing `n` documents
+    /// evenly across `files` (each doc tagged with its file in the filtering DB). Model-free.
+    fn build_fixture_index(index_path: &str, files: &[&str], docs_per_file: usize) {
+        use ndarray::Array2;
+        use next_plaid::IndexConfig;
+
+        let n = files.len() * docs_per_file;
+        let mut embeddings: Vec<Array2<f32>> = Vec::new();
+        for i in 0..n {
+            let mut doc = Array2::<f32>::zeros((5, 32));
+            for j in 0..5 {
+                for k in 0..32 {
+                    doc[[j, k]] = (i as f32 * 0.1) + (j as f32 * 0.01) + (k as f32 * 0.001);
+                }
+            }
+            for mut row in doc.rows_mut() {
+                let norm: f32 = row.iter().map(|x| x * x).sum::<f32>().sqrt();
+                if norm > 0.0 {
+                    row.iter_mut().for_each(|x| *x /= norm);
+                }
+            }
+            embeddings.push(doc);
+        }
+
+        let config = IndexConfig {
+            nbits: 2,
+            batch_size: 50,
+            seed: Some(42),
+            kmeans_niters: 2,
+            max_points_per_centroid: 256,
+            n_samples_kmeans: None,
+            start_from_scratch: 999,
+            force_cpu: false,
+            ..Default::default()
+        };
+        MmapIndex::create_with_kmeans(&embeddings, index_path, &config).unwrap();
+
+        let metadata: Vec<serde_json::Value> = (0..n)
+            .map(|i| serde_json::json!({ "file": files[i / docs_per_file] }))
+            .collect();
+        let doc_ids: Vec<i64> = (0..n as i64).collect();
+        filtering::create(index_path, &metadata, &doc_ids).unwrap();
+    }
+
+    /// Issue #116: deleting many files must collapse into a *single* full-index rewrite, not one
+    /// rewrite per file (which made incremental updates O(changed_files × index_size) and hung
+    /// for minutes). Asserts both the call count and that exactly the right documents survive.
+    #[test]
+    fn test_delete_files_from_index_is_a_single_rewrite() {
+        use std::sync::atomic::Ordering;
+
+        let tmp = tempfile::tempdir().unwrap();
+        let index_path = tmp.path().to_str().unwrap();
+
+        let files = ["a.rs", "b.rs", "c.rs", "d.rs"];
+        let docs_per_file = 3;
+        build_fixture_index(index_path, &files, docs_per_file);
+
+        // Delete three of the four files in one batched call.
+        let to_delete: Vec<PathBuf> = ["a.rs", "b.rs", "c.rs"].iter().map(PathBuf::from).collect();
+
+        let before = DELETE_FROM_INDEX_CALLS.load(Ordering::Relaxed);
+        let removed = delete_files_from_index(index_path, &to_delete).unwrap();
+        let calls = DELETE_FROM_INDEX_CALLS.load(Ordering::Relaxed) - before;
+
+        assert_eq!(removed, 9, "should remove 3 files × 3 docs");
+        assert_eq!(
+            calls, 1,
+            "issue #116: deleting N files must be ONE index rewrite, not one per file"
+        );
+
+        // Only d.rs survives, with its 3 documents.
+        let remaining = filtering::get_distinct_strings(index_path, "file").unwrap();
+        assert_eq!(remaining, vec!["d.rs".to_string()]);
+        let idx = MmapIndex::load(index_path).unwrap();
+        assert_eq!(idx.metadata.num_documents, 3);
+    }
+
+    /// Deleting nothing (or only unknown files) must not rewrite the index at all.
+    #[test]
+    fn test_delete_files_from_index_noop_does_not_rewrite() {
+        use std::sync::atomic::Ordering;
+
+        let tmp = tempfile::tempdir().unwrap();
+        let index_path = tmp.path().to_str().unwrap();
+        build_fixture_index(index_path, &["a.rs"], 2);
+
+        let before = DELETE_FROM_INDEX_CALLS.load(Ordering::Relaxed);
+        assert_eq!(delete_files_from_index(index_path, &[]).unwrap(), 0);
+        assert_eq!(
+            delete_files_from_index(index_path, &[PathBuf::from("missing.rs")]).unwrap(),
+            0
+        );
+        assert_eq!(DELETE_FROM_INDEX_CALLS.load(Ordering::Relaxed) - before, 0);
+    }
+
+    /// Issue #115 (bug 1): an index left dirty by an interrupted run must be cleaned once the
+    /// repair has reconciled it, even when there are no file changes. Otherwise the dirty flag
+    /// is never cleared and every future run pays for a needless repair — "permanently dirty".
+    #[test]
+    fn test_incremental_update_clears_dirty_with_no_changes() {
+        let proj = tempfile::tempdir().unwrap();
+        let idx = tempfile::tempdir().unwrap();
+
+        // Persist a dirty state with no tracked files; the project tree is empty, so the update
+        // plan is empty and we hit the "nothing to do" path.
+        let dirty_state = IndexState {
+            dirty: true,
+            ..Default::default()
+        };
+        dirty_state.save(idx.path()).unwrap();
+        assert!(IndexState::load(idx.path()).unwrap().dirty);
+
+        let mut builder = test_builder(proj.path(), idx.path());
+        builder.incremental_update(&dirty_state, None).unwrap();
+
+        assert!(
+            !IndexState::load(idx.path()).unwrap().dirty,
+            "issue #115: dirty flag must be cleared after a no-op update, not left set forever"
+        );
+    }
+
+    /// Issue #115 (bug 2): a sibling worktree whose resumable build is in progress / was
+    /// interrupted (a `.building` marker is present) must be rejected as a seed source. Such an
+    /// index passes every other completeness check — metadata.json, filtering DB, non-empty
+    /// non-dirty current-version state — yet only holds a fraction of its documents.
+    #[test]
+    fn test_seed_source_rejects_in_progress_build() {
+        let tmp = tempfile::tempdir().unwrap();
+        let src_dir = tmp.path();
+        let vector = get_vector_index_path(src_dir);
+        std::fs::create_dir_all(&vector).unwrap();
+        std::fs::write(vector.join("metadata.json"), "{}").unwrap();
+        filtering::create(
+            vector.to_str().unwrap(),
+            &[serde_json::json!({ "file": "a.rs" })],
+            &[0],
+        )
+        .unwrap();
+
+        let mut state = IndexState::default();
+        state.files.insert(
+            PathBuf::from("a.rs"),
+            FileInfo {
+                content_hash: 1,
+                mtime: 1,
+            },
+        );
+        state.save(src_dir).unwrap(); // save() stamps cli_version to the current version
+        let version = env!("CARGO_PKG_VERSION");
+
+        // Complete index, no marker → usable seed source.
+        assert!(seed_source_state(src_dir, version).is_some());
+
+        // Interrupted/in-progress build → rejected.
+        std::fs::write(src_dir.join(BUILDING_MARKER), "").unwrap();
+        assert!(
+            seed_source_state(src_dir, version).is_none(),
+            "issue #115: an in-progress (.building) sibling index must not be used as a seed"
+        );
+        std::fs::remove_file(src_dir.join(BUILDING_MARKER)).unwrap();
+
+        // Sanity: the other rejection reasons still hold.
+        assert!(
+            seed_source_state(src_dir, "0.0.0-other").is_none(),
+            "version mismatch"
+        );
+        let mut dirty = IndexState::load(src_dir).unwrap();
+        dirty.dirty = true;
+        // Re-save with dirty set (save() preserves the flag, only rewrites cli_version).
+        dirty.save(src_dir).unwrap();
+        assert!(seed_source_state(src_dir, version).is_none(), "dirty index");
+    }
 }