diff --git a/colgrep/src/index/mod.rs b/colgrep/src/index/mod.rs index 95ee2fb..7104d1f 100644 --- a/colgrep/src/index/mod.rs +++ b/colgrep/src/index/mod.rs @@ -45,6 +45,17 @@ const MAX_FILE_SIZE: u64 = 512 * 1024; /// Larger values reduce I/O overhead but use more memory. const INDEX_CHUNK_SIZE: usize = 1024; +/// Marker file (in the index dir, next to state.json) that records an in-progress +/// resumable build. Its presence routes the next run back into `build_resumable` so an +/// interrupted initial build (e.g. an agent's command timeout) resumes from where it left +/// off instead of restarting from scratch. +const BUILDING_MARKER: &str = ".building"; + +/// Approximate number of code units to embed per resumable-build batch before committing +/// state to disk. Each committed batch survives an interruption, so smaller values checkpoint +/// more often (more resumable) at the cost of more index-append overhead. +const BUILD_CHECKPOINT_UNITS: usize = 4096; + /// Threshold for switching to higher pool factor (fewer embeddings per doc). /// When encoding more than this many units, use LARGE_BATCH_POOL_FACTOR. const LARGE_BATCH_THRESHOLD: usize = 10_000; @@ -1259,7 +1270,18 @@ impl IndexBuilder { /// - Full rebuild if CLI version changed (clears outdated index) pub fn index(&mut self, languages: Option<&[Language]>, force: bool) -> Result { let _lock = acquire_index_lock(&self.index_dir)?; + self.run_indexing(languages, force) + } + /// Shared indexing dispatch, run while the index lock is held by the caller. + /// + /// Routing: + /// - forced or CLI-version change → atomic `full_rebuild` (protects a working index); + /// - a fresh build, or resuming an interrupted resumable build → `build_resumable` + /// (checkpoints per batch so interruptions keep their progress); + /// - corrupted filtering DB → atomic `full_rebuild`; + /// - otherwise → `incremental_update`. + fn run_indexing(&mut self, languages: Option<&[Language]>, force: bool) -> Result { // Clean up any leftover temp/old dirs from previous failed full rebuilds let _ = std::fs::remove_dir_all(self.index_dir.join("index.tmp")); let _ = std::fs::remove_dir_all(self.index_dir.join("index.old")); @@ -1273,20 +1295,29 @@ impl IndexBuilder { let index_path = index_dir.to_str().unwrap(); let index_exists = index_dir.join("metadata.json").exists(); let filtering_exists = filtering::exists(index_path); + let building = self.index_dir.join(BUILDING_MARKER).exists(); // Check if CLI version changed - if so, clear and rebuild the index let current_version = env!("CARGO_PKG_VERSION"); let version_mismatch = index_exists && !state.cli_version.is_empty() && state.cli_version != current_version; - // Need full rebuild if forced, index doesn't exist, filtering DB is missing, - // or CLI version changed - if force || !index_exists || !filtering_exists || version_mismatch { + // Forced or CLI-version change: clean atomic rebuild. Drop any in-progress + // resumable-build marker so we don't try to resume an index we're discarding. + if force || version_mismatch { + let _ = std::fs::remove_file(self.index_dir.join(BUILDING_MARKER)); return self.full_rebuild(languages); } - // Validate filtering DB is not corrupted (can be read) - if filtering::count(index_path).is_err() { + // Fresh first build, or resume of an interrupted resumable build: build directly + // into the real index dir and checkpoint state per batch, so an interrupted run + // (e.g. an agent command timeout) keeps its progress instead of restarting. + if building || !index_exists { + return self.build_resumable(languages); + } + + // metadata.json exists but the filtering DB is missing or unreadable → corrupted. + if !filtering_exists || filtering::count(index_path).is_err() { eprintln!("⚠️ Filtering database corrupted, rebuilding index..."); return self.full_rebuild(languages); } @@ -1353,74 +1384,7 @@ impl IndexBuilder { return Ok(None); }; - // Clean up any leftover temp/old dirs from previous failed full rebuilds - let _ = std::fs::remove_dir_all(self.index_dir.join("index.tmp")); - let _ = std::fs::remove_dir_all(self.index_dir.join("index.old")); - - // Fresh git worktree? Seed from a sibling's index so we re-embed only the diff - // instead of rebuilding from scratch. No-op for non-worktree projects. - self.maybe_seed_from_worktree(force); - - let state = IndexState::load(&self.index_dir)?; - let index_dir = get_vector_index_path(&self.index_dir); - let index_path = index_dir.to_str().unwrap(); - let index_exists = index_dir.join("metadata.json").exists(); - let filtering_exists = filtering::exists(index_path); - - let current_version = env!("CARGO_PKG_VERSION"); - let version_mismatch = - index_exists && !state.cli_version.is_empty() && state.cli_version != current_version; - - if force || !index_exists || !filtering_exists || version_mismatch { - return self.full_rebuild(languages).map(Some); - } - - if filtering::count(index_path).is_err() { - eprintln!("⚠️ Filtering database corrupted, rebuilding index..."); - return self.full_rebuild(languages).map(Some); - } - - let state = if state.files.is_empty() { - match self.reconstruct_state_from_filtering_db(index_path) { - Ok(reconstructed) => { - eprintln!( - "📋 Reconstructed state from index ({} files)", - reconstructed.files.len() - ); - reconstructed.save(&self.index_dir)?; - reconstructed - } - Err(_) => { - return self.full_rebuild(languages).map(Some); - } - } - } else { - state - }; - - if let Ok(metadata_count) = filtering::count(index_path) { - if let Ok(index_metadata) = Metadata::load_from_path(&index_dir) { - if metadata_count != index_metadata.num_documents { - match self.reconcile_document_counts( - index_path, - metadata_count, - index_metadata.num_documents, - ) { - Ok(()) => { - eprintln!( - "🔧 Reconciled index (filtering: {}, vector: {})", - metadata_count, index_metadata.num_documents - ); - } - Err(_) => { - return self.full_rebuild(languages).map(Some); - } - } - } - } - } - - self.incremental_update(&state, languages).map(Some) + self.run_indexing(languages, force).map(Some) } /// Index only specific files (for filtered search). @@ -1720,7 +1684,268 @@ impl IndexBuilder { Ok(false) } + /// Build (or resume building) the index directly into the real index directory, + /// committing `state.json` after every batch of ~[`BUILD_CHECKPOINT_UNITS`] units. + /// + /// Unlike [`full_rebuild`], which encodes into a throwaway `index.tmp` and only persists + /// on success, this commits incrementally — so an interrupted run (an agent's command + /// timeout, Ctrl-C) keeps the batches it already finished. The presence of the + /// [`BUILDING_MARKER`] file routes the next run back here to resume; it's removed once the + /// build completes. Used for the first build and its resumes, never to rebuild over an + /// already-complete index (that path stays on the atomic [`full_rebuild`]). + fn build_resumable(&mut self, languages: Option<&[Language]>) -> Result { + let index_dir = get_vector_index_path(&self.index_dir); + std::fs::create_dir_all(&index_dir)?; + let index_path = index_dir.to_str().unwrap().to_string(); + + // Mark the build in progress so the next run resumes here even after some chunks + // (and thus metadata.json) have been written. + let marker = self.index_dir.join(BUILDING_MARKER); + std::fs::write(&marker, "")?; + + // Resume: keep whatever a previous run already committed. + let mut state = IndexState::load(&self.index_dir)?; + + // A prior run interrupted mid-batch may have left the vector index and filtering DB + // slightly out of sync (a partially written chunk). Trim that before re-embedding so + // we don't accumulate orphan documents across resumes. + if index_dir.join("metadata.json").exists() && filtering::exists(&index_path) { + let _ = self.repair_index_db_sync(&index_dir); + } + + let (scanned, skipped) = self.scan_files(languages)?; + let scanned_set: HashSet = scanned.iter().cloned().collect(); + + // Drop files that disappeared since a prior partial run. + let stale: Vec = state + .files + .keys() + .filter(|p| !scanned_set.contains(*p)) + .cloned() + .collect(); + for path in &stale { + self.delete_file_from_index(&index_path, path)?; + state.files.remove(path); + } + + // Files still needing work: scanned, not already committed, not known-unparseable. + let todo: Vec = scanned + .iter() + .filter(|p| !state.files.contains_key(*p) && !state.ignored_files.contains(*p)) + .cloned() + .collect(); + + // Parse the remaining files (cheap relative to embedding) and build the call graph + // over them so `called_by` is populated for this build's units. + let pb = ProgressBar::new(todo.len() as u64); + pb.set_style( + ProgressStyle::default_bar() + .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} ({eta}) {msg}") + .unwrap() + .progress_chars("█▓░"), + ); + pb.enable_steady_tick(std::time::Duration::from_millis(100)); + pb.set_message("Parsing files..."); + + let mut all_units: Vec = Vec::new(); + let mut file_info: HashMap = HashMap::new(); + for parsed in parse_files_parallel(&self.project_root, &todo, Some(&pb)) { + if let Some(reason) = parsed.skip_reason { + eprintln!("⚠️ {}", reason); + state.ignored_files.insert(parsed.path); + continue; + } + if let Some(fi) = parsed.file_info { + file_info.insert(parsed.path.clone(), fi); + all_units.extend(parsed.units); + } + } + let parsing_interrupted = is_interrupted(); + pb.finish_and_clear(); + if parsing_interrupted { + // Nothing was embedded this run; committed batches (if any) already persisted. + state.dirty = false; + state.save(&self.index_dir)?; + anyhow::bail!("Indexing interrupted by user"); + } + + build_call_graph(&mut all_units); + + // Confirm once up front for very large codebases (auto-confirmed on non-TTY/agents). + if !self.auto_confirm + && all_units.len() > CONFIRMATION_THRESHOLD + && !prompt_large_index_confirmation(all_units.len()) + { + let _ = std::fs::remove_file(&marker); + anyhow::bail!("Indexing cancelled by user"); + } + + // Regroup units per file so each batch contains whole files — that lets us checkpoint + // `state` at file granularity after each committed batch. + let mut units_by_file: HashMap> = HashMap::new(); + for unit in all_units { + units_by_file + .entry(unit.file.clone()) + .or_default() + .push(unit); + } + + let encode_batch_size = self.encode_batch_size.unwrap_or(DEFAULT_ENCODE_BATCH_SIZE); + let index_chunk_size = self + .index_chunk_size + .unwrap_or(INDEX_CHUNK_SIZE) + .max(encode_batch_size); + + let total_units: usize = units_by_file.values().map(|u| u.len()).sum(); + let already = state.files.len(); + let mut added = 0usize; + + // Files that parsed but yielded no code units (empty or import-only files) need no + // embedding. Record them as done now — and persist — so they're not re-parsed on every + // resume round (parity with full_rebuild, which also stores them). `file_info` only + // holds this run's freshly-parsed files, none of which are in `state` yet. + let mut recorded_empty = false; + for (path, fi) in &file_info { + if !units_by_file.contains_key(path) { + state.files.insert(path.clone(), fi.clone()); + added += 1; + recorded_empty = true; + } + } + if recorded_empty { + state.dirty = false; + state.save(&self.index_dir)?; + } + + // Encode in file-coherent batches of ~BUILD_CHECKPOINT_UNITS units, committing state + // after each batch so interruptions keep finished work. + let encode_pb = ProgressBar::new(total_units as u64); + encode_pb.set_style( + ProgressStyle::default_bar() + .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} {msg}") + .unwrap() + .progress_chars("█▓░"), + ); + encode_pb.enable_steady_tick(std::time::Duration::from_millis(100)); + encode_pb.set_message("Encoding..."); + + let mut batch_files: Vec = Vec::new(); + let mut batch_units: Vec = Vec::new(); + let mut interrupted = false; + + // Deterministic order for stable checkpoints/resumes. + let mut ordered: Vec = units_by_file.keys().cloned().collect(); + ordered.sort(); + + for file in ordered { + let units = units_by_file.remove(&file).unwrap_or_default(); + batch_units.extend(units); + batch_files.push(file); + + if batch_units.len() >= BUILD_CHECKPOINT_UNITS { + if self.flush_build_batch( + &index_path, + &batch_files, + &batch_units, + index_chunk_size, + Some(&encode_pb), + )? { + interrupted = true; + break; + } + for f in batch_files.drain(..) { + if let Some(fi) = file_info.get(&f) { + state.files.insert(f, fi.clone()); + added += 1; + } + } + batch_units.clear(); + state.dirty = false; + state.save(&self.index_dir)?; // checkpoint + } + } + + // Final partial batch. + if !interrupted && !batch_units.is_empty() { + if self.flush_build_batch( + &index_path, + &batch_files, + &batch_units, + index_chunk_size, + Some(&encode_pb), + )? { + interrupted = true; + } else { + for f in batch_files.drain(..) { + if let Some(fi) = file_info.get(&f) { + state.files.insert(f, fi.clone()); + added += 1; + } + } + state.dirty = false; + state.save(&self.index_dir)?; + } + } + + encode_pb.finish_and_clear(); + + if interrupted { + // Persist the committed batches; keep the marker so the next run resumes. + state.dirty = false; + state.save(&self.index_dir)?; + anyhow::bail!("Indexing interrupted by user"); + } + + // Build complete: drop the marker and persist final metadata. + state.dirty = false; + state.save(&self.index_dir)?; + ProjectMetadata::new(&self.project_root, &self.model_id).save(&self.index_dir)?; + let _ = std::fs::remove_file(&marker); + + Ok(UpdateStats { + added, + changed: 0, + deleted: stale.len(), + unchanged: already, + skipped, + }) + } + + /// Encode one resumable-build batch into the index. Each file's stale entries are deleted + /// first so re-running after a mid-batch interruption is idempotent (no duplicate docs). + /// Returns `Ok(true)` if encoding was interrupted. + fn flush_build_batch( + &mut self, + index_path: &str, + batch_files: &[PathBuf], + batch_units: &[CodeUnit], + index_chunk_size: usize, + pb: Option<&ProgressBar>, + ) -> Result { + if batch_units.is_empty() { + return Ok(false); + } + // Idempotent resume: clear any partial docs a prior interrupted run wrote for these files. + for file in batch_files { + self.delete_file_from_index(index_path, file)?; + } + self.ensure_model_created(batch_units.len())?; + let pool_factor = self.resolve_pool_factor(batch_units.len()); + let sorted_units = prepare_units_for_encoding(batch_units, index_chunk_size); + let was_interrupted = self.run_encoding_pipeline( + &sorted_units, + index_chunk_size, + pool_factor, + index_path, + pb, + )?; + Ok(was_interrupted || is_interrupted()) + } + fn full_rebuild(&mut self, languages: Option<&[Language]>) -> Result { + // A clean atomic rebuild supersedes any in-progress resumable build. + let _ = std::fs::remove_file(self.index_dir.join(BUILDING_MARKER)); + let index_path = get_vector_index_path(&self.index_dir); let temp_path = self.index_dir.join("index.tmp"); let old_path = self.index_dir.join("index.old");