Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions next-plaid-api/src/handlers/documents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ async fn process_batch(
// Acquire per-index lock using full path for isolation
let lock = get_index_lock_by_path(&path_str);
let _guard = lock.lock().await;
state.record_update_stage(index_name, "batching", "processing queued update batch");

// Run heavy work in blocking thread
let result = task::spawn_blocking(move || -> Result<BatchMetrics, String> {
Expand All @@ -383,6 +384,11 @@ async fn process_batch(
.get_index_config(&name_inner)
.ok_or_else(|| format!("Failed to load config for index '{}'", name_inner))?;

// Windows cannot replace files that are still memory-mapped by the loaded index.
// POSIX keeps the old inode alive for readers, so only unload on Windows.
#[cfg(target_os = "windows")]
state_clone.unload_index(&name_inner);

// Check and automatically repair sync issues between index and DB
if let Err(e) = repair_index_db_sync(&path_str) {
return Err(format!("Index/DB sync repair failed: {}", e));
Expand All @@ -403,8 +409,14 @@ async fn process_batch(

// STEP 1: Update vector index FIRST
let index_update_start = std::time::Instant::now();
let index_result =
MmapIndex::update_or_create(&embeddings, &path_str, &index_config, &update_config);
let progress_state = state_clone.clone();
let progress_name = name_inner.clone();
let index_result = next_plaid::update::with_update_progress(
move |stage, message| {
progress_state.record_update_stage(&progress_name, stage, message);
},
|| MmapIndex::update_or_create(&embeddings, &path_str, &index_config, &update_config),
);

let (mut index, doc_ids) = match index_result {
Ok((idx, ids)) => (idx, ids),
Expand All @@ -418,6 +430,7 @@ async fn process_batch(
let last_doc_id = doc_ids.last().copied();

// STEP 2: Update metadata DB using the ACTUAL doc_ids from the index
state_clone.record_update_stage(&name_inner, "metadata_write", "writing metadata database");
let metadata_update_start = std::time::Instant::now();
let db_existed = filtering::exists(&path_str);
let db_result = if db_existed {
Expand Down Expand Up @@ -471,7 +484,7 @@ async fn process_batch(
};

// Reload State
state_clone.unload_index(&name_inner);
state_clone.record_update_stage(&name_inner, "reload", "loading updated index");
let idx = MmapIndex::load(&path_str).map_err(|e| format!("Failed to load index: {}", e))?;
state_clone.register_index(&name_inner, idx);

Expand All @@ -489,6 +502,7 @@ async fn process_batch(

match result {
Ok(Ok(metrics)) => {
state.record_update_complete(index_name, doc_count, "update complete");
tracing::info!(
index = %index_name,
num_documents = doc_count,
Expand All @@ -512,6 +526,7 @@ async fn process_batch(
}
}
Ok(Err(e)) => {
state.record_update_failed(index_name, &e);
tracing::error!(
index = %index_name,
num_documents = doc_count,
Expand All @@ -521,6 +536,7 @@ async fn process_batch(
);
}
Err(e) => {
state.record_update_failed(index_name, &e.to_string());
tracing::error!(
index = %index_name,
num_documents = doc_count,
Expand Down Expand Up @@ -1159,6 +1175,7 @@ pub async fn add_documents(
})?;

let doc_count = embeddings.len();
state.record_update_queued(&name, doc_count, "document update queued");

// Spawn background task
tokio::spawn(async move {
Expand All @@ -1171,6 +1188,8 @@ pub async fn add_documents(
// Clone name AGAIN for the inner closure, so `name_clone` stays valid for error logging
let name_inner = name_clone.clone();
let start = std::time::Instant::now();
state_clone.record_update_stage(&name_inner, "batching", "processing document update");
let result_state = state_clone.clone();

// 2. Perform heavy IO work in a blocking task
let result = task::spawn_blocking(move || -> ApiResult<u64> {
Expand All @@ -1180,9 +1199,9 @@ pub async fn add_documents(
.to_string_lossy()
.to_string();

// Release the shared mmap-backed index before loading a writable instance.
// On Windows, update/delete paths that rewrite files fail with OS error 1224
// if another mapped copy is still held in the state cache.
// Windows cannot replace files that are still memory-mapped by the loaded index.
// POSIX keeps the old inode alive for readers, so only unload on Windows.
#[cfg(target_os = "windows")]
state_clone.unload_index(&name_inner);

// Check sync before updating: if filtering DB exists, counts must match
Expand All @@ -1205,7 +1224,14 @@ pub async fn add_documents(
// Update with metadata (metadata is required)
let update_config = UpdateConfig::default();
let index_update_start = std::time::Instant::now();
index.update_with_metadata(&embeddings, &update_config, Some(&metadata))?;
let progress_state = state_clone.clone();
let progress_name = name_inner.clone();
next_plaid::update::with_update_progress(
move |stage, message| {
progress_state.record_update_stage(&progress_name, stage, message);
},
|| index.update_with_metadata(&embeddings, &update_config, Some(&metadata)),
)?;
let index_update_ms = index_update_start.elapsed().as_millis() as u64;

// Eviction: consult the cached config instead of racing a fresh file read.
Expand All @@ -1224,6 +1250,7 @@ pub async fn add_documents(
}

// Reload state
state_clone.record_update_stage(&name_inner, "reload", "loading updated index");
state_clone.reload_index(&name_inner)?;
Ok(index_update_ms)
})
Expand All @@ -1234,6 +1261,11 @@ pub async fn add_documents(
// Log result
match result {
Ok(Ok(index_update_ms)) => {
result_state.record_update_complete(
&name_clone,
doc_count,
"document update complete",
);
tracing::info!(
index = %name_clone,
num_documents = doc_count,
Expand All @@ -1243,6 +1275,7 @@ pub async fn add_documents(
);
}
Ok(Err(e)) => {
result_state.record_update_failed(&name_clone, &e.to_string());
tracing::error!(
index = %name_clone,
num_documents = doc_count,
Expand All @@ -1252,6 +1285,7 @@ pub async fn add_documents(
);
}
Err(e) => {
result_state.record_update_failed(&name_clone, &e.to_string());
tracing::error!(
index = %name_clone,
num_documents = doc_count,
Expand Down Expand Up @@ -1526,6 +1560,7 @@ pub async fn update_index(
ApiError::Internal(format!("Batch worker for index '{}' is not running", name))
}
})?;
state.record_update_queued(&name, doc_count, "update queued for batching");

tracing::debug!(
index = %name,
Expand Down Expand Up @@ -1670,15 +1705,24 @@ pub async fn update_index_with_encoding(
ApiError::Internal(format!("Batch worker for index '{}' is not running", name))
}
})?;
state.record_update_queued(&name, req.documents.len(), "encoding queued documents");
state.record_update_stage(&name, "encoding", "encoding documents");

// Now encode - we have a guaranteed slot in the batch queue
let embeddings = encode_texts_internal(
let embeddings = match encode_texts_internal(
state.clone(),
&req.documents,
InputType::Document,
req.pool_factor,
)
.await?;
.await
{
Ok(embeddings) => embeddings,
Err(err) => {
state.record_update_failed(&name, &err.to_string());
return Err(err);
}
};

let doc_count = embeddings.len();

Expand All @@ -1690,6 +1734,7 @@ pub async fn update_index_with_encoding(

// Send using the reserved permit - this is guaranteed to succeed
permit.send(batch_item);
state.record_update_queued(&name, doc_count, "update queued for batching");

tracing::debug!(
index = %name,
Expand Down
2 changes: 2 additions & 0 deletions next-plaid-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ use state::{ApiConfig, AppState};
),
components(schemas(
models::HealthResponse,
models::UpdateHealthStatus,
models::ModelHealthInfo,
models::IndexSummary,
models::ErrorResponse,
Expand Down Expand Up @@ -236,6 +237,7 @@ async fn health(state: axum::extract::State<Arc<AppState>>) -> PrettyJson<Health
index_dir: state.config.index_dir.to_string_lossy().to_string(),
memory_usage_bytes,
indices: state.get_all_index_summaries(),
updates: state.get_update_health_statuses(),
model: model_info,
})
}
Expand Down
41 changes: 41 additions & 0 deletions next-plaid-api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,11 +510,52 @@ pub struct HealthResponse {
pub memory_usage_bytes: u64,
/// List of available indices with their configuration
pub indices: Vec<IndexSummary>,
/// Active and recent update progress
pub updates: Vec<UpdateHealthStatus>,
/// Model information (only present when --model is specified)
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<ModelHealthInfo>,
}

/// In-memory update progress surfaced by the health endpoint.
///
/// Optional fields are omitted from the serialized output when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
pub struct UpdateHealthStatus {
/// Name of the index this update applies to
#[schema(example = "my-index")]
pub index: String,
// NOTE(review): `status` is a plain `String`, so the set of values listed
// below is a documented convention and is not enforced by the type.
/// Update status: queued, running, complete, failed
#[schema(example = "running")]
pub status: String,
/// Current update stage
#[schema(example = "centroid_expansion")]
pub stage: String,
/// Number of documents queued for this update batch (omitted when `None`)
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(example = 300)]
pub queued_documents: Option<usize>,
/// Number of documents processed when known (omitted when `None`)
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(example = 120)]
pub processed_documents: Option<usize>,
/// RFC3339 timestamp for when this update started
#[schema(example = "2026-05-22T01:04:12Z")]
pub started_at: String,
/// RFC3339 timestamp for the last progress change
#[schema(example = "2026-05-22T01:05:01Z")]
pub updated_at: String,
/// Milliseconds elapsed since this update started
#[schema(example = 49000)]
pub elapsed_ms: u64,
/// Human-readable current progress note (omitted when `None`)
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(example = "clustering outlier embeddings")]
pub message: Option<String>,
/// Failure message when status is failed (omitted when `None`)
#[serde(skip_serializing_if = "Option::is_none")]
#[schema(example = "Index update failed: NPY file too small")]
pub error: Option<String>,
}

/// Model information for health endpoint.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct ModelHealthInfo {
Expand Down
Loading
Loading