diff --git a/next-plaid-api/Dockerfile b/next-plaid-api/Dockerfile index 7577a57b..e0f775f6 100644 --- a/next-plaid-api/Dockerfile +++ b/next-plaid-api/Dockerfile @@ -37,6 +37,7 @@ # RATE_LIMIT_ENABLED: Enable rate limiting (default: false) # RATE_LIMIT_PER_SECOND: Max requests/s when rate limiting is enabled (default: 50) # RATE_LIMIT_BURST_SIZE: Burst size when rate limiting is enabled (default: 100) +# INDEX_DEFAULT_START_FROM_SCRATCH: Default rebuild-from-scratch threshold for new indexes (default: 999) # ============================================================================= # ============================================================================= @@ -225,6 +226,10 @@ ENV INDEX_DIR=/data/indices # x86_64: MKL statically linked (sequential, no thread explosion) # aarch64: OpenBLAS with single thread to avoid contention with rayon ENV OPENBLAS_NUM_THREADS=1 +# Default start_from_scratch threshold for new indexes: indexes holding fewer documents +# are rebuilt from scratch on update (higher-quality centroids), larger ones update +# incrementally. Override per-deployment, or per-index via the create-index config. +ENV INDEX_DEFAULT_START_FROM_SCRATCH=999 # Memory-mapped indices are always enabled for efficient memory usage # Set HF_MODEL_ID to auto-download model from HuggingFace Hub # Example: ENV HF_MODEL_ID=lightonai/GTE-ModernColBERT-v1 @@ -286,6 +291,10 @@ ENV RUST_LOG=info ENV INDEX_DIR=/data/indices # Prevent OpenBLAS thread explosion when used alongside rayon parallelism ENV OPENBLAS_NUM_THREADS=1 +# Default start_from_scratch threshold for new indexes: indexes holding fewer documents +# are rebuilt from scratch on update (higher-quality centroids), larger ones update +# incrementally. Override per-deployment, or per-index via the create-index config. +ENV INDEX_DEFAULT_START_FROM_SCRATCH=999 # Memory-mapped indices are always enabled for efficient memory usage # Set HF_MODEL_ID to auto-download model from HuggingFace Hub # Example: ENV HF_MODEL_ID=lightonai/GTE-ModernColBERT-v1 diff --git a/next-plaid-api/README.md b/next-plaid-api/README.md index 303e2681..b47d000d 100644 --- a/next-plaid-api/README.md +++ b/next-plaid-api/README.md @@ -805,6 +805,19 @@ Rate limiting is **optional and disabled by default**. Enable it by setting `RAT ## Environment Variables +### Indexing + +| Variable | Default | Description | +| --------------------------------- | ------- | --------------------------------------------------------------------------- | +| `INDEX_DEFAULT_START_FROM_SCRATCH` | `999` | Default `start_from_scratch` threshold for new indexes (see below) | + +When an index update runs, indexes holding **fewer** documents than `start_from_scratch` +are rebuilt from scratch (higher quality centroids) instead of updated incrementally; +larger indexes are updated incrementally (faster). This env var sets the **default** +threshold applied when an index is created without an explicit `config.start_from_scratch`. +A per-index value supplied at creation time always takes precedence. The value is read once +at startup. + ### Rate Limiting & Concurrency | Variable | Default | Description | diff --git a/next-plaid-api/src/handlers/documents.rs b/next-plaid-api/src/handlers/documents.rs index c5901854..c679b42b 100644 --- a/next-plaid-api/src/handlers/documents.rs +++ b/next-plaid-api/src/handlers/documents.rs @@ -45,6 +45,38 @@ fn max_queued_tasks_per_index() -> usize { }) } +fn build_update_config(stored_config: &IndexConfigStored) -> UpdateConfig { + let default_update_config = UpdateConfig::default(); + UpdateConfig { + batch_size: stored_config.batch_size, + kmeans_niters: default_update_config.kmeans_niters, + max_points_per_centroid: default_update_config.max_points_per_centroid, + n_samples_kmeans: default_update_config.n_samples_kmeans, + seed: stored_config.seed.unwrap_or(default_update_config.seed), + start_from_scratch: stored_config.start_from_scratch, + buffer_size: default_update_config.buffer_size, + force_cpu: default_update_config.force_cpu, + } +} + +fn build_stored_index_config(req: &CreateIndexRequest) -> IndexConfigStored { + IndexConfigStored { + nbits: req.config.nbits.unwrap_or(4), + batch_size: req.config.batch_size.unwrap_or(50_000), + seed: req.config.seed, + start_from_scratch: req + .config + .start_from_scratch + .unwrap_or(next_plaid::default_start_from_scratch()), + max_documents: req.config.max_documents, + fts_tokenizer: req + .config + .fts_tokenizer + .clone() + .unwrap_or_else(|| "unicode61".to_string()), + } +} + // --- Index/DB Sync Repair --- /// Global registry of per-index repair locks to prevent concurrent repair operations. @@ -405,7 +437,7 @@ async fn process_batch( fts_tokenizer: fts_tokenizer.clone(), ..Default::default() }; - let update_config = UpdateConfig::default(); + let update_config = build_update_config(&stored_config); // STEP 1: Update vector index FIRST let index_update_start = std::time::Instant::now(); @@ -965,18 +997,7 @@ pub async fn create_index( } // Build stored config - let stored_config = IndexConfigStored { - nbits: req.config.nbits.unwrap_or(4), - batch_size: req.config.batch_size.unwrap_or(50_000), - seed: req.config.seed, - start_from_scratch: req.config.start_from_scratch.unwrap_or(999), - max_documents: req.config.max_documents, - fts_tokenizer: req - .config - .fts_tokenizer - .clone() - .unwrap_or_else(|| "unicode61".to_string()), - }; + let stored_config = build_stored_index_config(&req); // Create index directory std::fs::create_dir_all(&index_path) @@ -1222,7 +1243,10 @@ pub async fn add_documents( let mut index = MmapIndex::load(&path_str)?; // Update with metadata (metadata is required) - let update_config = UpdateConfig::default(); + let stored_config = state_clone + .get_index_config(&name_inner) + .ok_or_else(|| ApiError::IndexNotFound(name_inner.clone()))?; + let update_config = build_update_config(&stored_config); let index_update_start = std::time::Instant::now(); let progress_state = state_clone.clone(); let progress_name = name_inner.clone(); @@ -1745,3 +1769,68 @@ pub async fn update_index_with_encoding( // Immediate Response Ok((StatusCode::ACCEPTED, Json("Update queued for batching"))) } + +#[cfg(test)] +mod tests { + use super::{build_stored_index_config, build_update_config}; + use crate::models::{CreateIndexRequest, IndexConfigRequest, IndexConfigStored}; + + #[test] + fn build_update_config_uses_stored_start_from_scratch() { + let stored_config = IndexConfigStored { + nbits: 4, + batch_size: 12_345, + seed: Some(7), + start_from_scratch: 17, + max_documents: None, + fts_tokenizer: "unicode61".to_string(), + }; + + let update_config = build_update_config(&stored_config); + + assert_eq!(update_config.batch_size, 12_345); + assert_eq!(update_config.seed, 7); + assert_eq!(update_config.start_from_scratch, 17); + } + + #[test] + fn build_stored_index_config_uses_request_start_from_scratch_when_present() { + let req = CreateIndexRequest { + name: "idx".to_string(), + config: IndexConfigRequest { + nbits: Some(4), + batch_size: Some(50_000), + seed: Some(42), + start_from_scratch: Some(23), + max_documents: None, + fts_tokenizer: Some("unicode61".to_string()), + }, + }; + + let stored_config = build_stored_index_config(&req); + + assert_eq!(stored_config.start_from_scratch, 23); + } + + #[test] + fn build_stored_index_config_uses_runtime_default_when_request_omits_threshold() { + let req = CreateIndexRequest { + name: "idx".to_string(), + config: IndexConfigRequest { + nbits: Some(4), + batch_size: Some(50_000), + seed: Some(42), + start_from_scratch: None, + max_documents: None, + fts_tokenizer: Some("unicode61".to_string()), + }, + }; + + let stored_config = build_stored_index_config(&req); + + assert_eq!( + stored_config.start_from_scratch, + next_plaid::default_start_from_scratch() + ); + } +} diff --git a/next-plaid-api/src/models.rs b/next-plaid-api/src/models.rs index b2336a71..155f1503 100644 --- a/next-plaid-api/src/models.rs +++ b/next-plaid-api/src/models.rs @@ -98,7 +98,7 @@ pub struct IndexConfigStored { } fn default_start_from_scratch() -> usize { - 999 + next_plaid::default_start_from_scratch() } fn default_fts_tokenizer() -> String { diff --git a/next-plaid-api/tests/start_from_scratch_env_test.rs b/next-plaid-api/tests/start_from_scratch_env_test.rs new file mode 100644 index 00000000..6579ed82 --- /dev/null +++ b/next-plaid-api/tests/start_from_scratch_env_test.rs @@ -0,0 +1,147 @@ +use std::sync::Arc; +use std::time::Duration; + +use axum::{ + extract::DefaultBodyLimit, + http::StatusCode, + routing::{get, post, put}, + Json, Router, +}; +use next_plaid_api::{ + handlers, + models::CreateIndexResponse, + state::{ApiConfig, AppState}, +}; +use serde_json::json; +use tempfile::TempDir; +use tokio::net::TcpListener; +use tower_http::{ + cors::{Any, CorsLayer}, + timeout::TimeoutLayer, +}; + +fn build_test_router(state: Arc) -> Router { + let index_routes = Router::new() + .route( + "/", + get(handlers::list_indices).post(handlers::create_index), + ) + .route( + "/{name}", + get(handlers::get_index_info).delete(handlers::delete_index), + ); + + let document_routes = Router::new() + .route( + "/{name}/documents", + post(handlers::add_documents).delete(handlers::delete_documents), + ) + .route("/{name}/update", post(handlers::update_index)) + .route("/{name}/config", put(handlers::update_index_config)); + + let metadata_routes = Router::new() + .route("/{name}/metadata", get(handlers::get_all_metadata)) + .route("/{name}/metadata/count", get(handlers::get_metadata_count)) + .route("/{name}/metadata/check", post(handlers::check_metadata)) + .route("/{name}/metadata/query", post(handlers::query_metadata)) + .route("/{name}/metadata/get", post(handlers::get_metadata)); + + let search_routes = Router::new() + .route("/{name}/search", post(handlers::search)) + .route("/{name}/search/filtered", post(handlers::search_filtered)); + + let rerank_route = Router::new().route("/rerank", post(handlers::rerank)); + + let indices_router = Router::new() + .merge(index_routes) + .merge(document_routes) + .merge(metadata_routes) + .merge(search_routes); + + let health_handler = |state: axum::extract::State>| async move { + Json(json!({ + "status": "healthy", + "loaded_indices": state.loaded_count() + })) + }; + + Router::new() + .route("/health", get(health_handler)) + .nest("/indices", indices_router) + .merge(rerank_route) + .layer(DefaultBodyLimit::max(100 * 1024 * 1024)) + .layer(TimeoutLayer::with_status_code( + StatusCode::REQUEST_TIMEOUT, + Duration::from_secs(30), + )) + .layer( + CorsLayer::new() + .allow_origin(Any) + .allow_methods(Any) + .allow_headers(Any), + ) + .with_state(state) +} + +#[tokio::test] +async fn create_index_uses_env_default_start_from_scratch_when_request_omits_it() { + unsafe { + std::env::set_var("INDEX_DEFAULT_START_FROM_SCRATCH", "159"); + } + assert_eq!( + std::env::var("INDEX_DEFAULT_START_FROM_SCRATCH") + .ok() + .as_deref(), + Some("159") + ); + + let temp_dir = TempDir::new().expect("temp dir should exist"); + let config = ApiConfig { + index_dir: temp_dir.path().to_path_buf(), + default_top_k: 10, + }; + + #[cfg(feature = "model")] + let state = Arc::new(AppState::with_model(config, None, None, None)); + #[cfg(not(feature = "model"))] + let state = Arc::new(AppState::new(config)); + + let app = build_test_router(state); + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("listener should bind"); + let addr = listener.local_addr().expect("local addr should exist"); + let base_url = format!("http://{}", addr); + + tokio::spawn(async move { + axum::serve(listener, app).await.expect("server should run"); + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .build() + .expect("client should build"); + + let response = client + .post(format!("{}/indices", base_url)) + .json(&json!({ + "name": "env-default-index", + "config": { + "nbits": 4 + } + })) + .send() + .await + .expect("request should succeed"); + + assert_eq!(response.status(), reqwest::StatusCode::OK); + + let body: CreateIndexResponse = response + .json() + .await + .expect("response body should deserialize"); + + assert_eq!(body.config.start_from_scratch, 159); +} diff --git a/next-plaid/src/index.rs b/next-plaid/src/index.rs index 49277919..9d66d9ba 100644 --- a/next-plaid/src/index.rs +++ b/next-plaid/src/index.rs @@ -74,7 +74,7 @@ pub struct IndexConfig { } fn default_start_from_scratch() -> usize { - 999 + crate::default_start_from_scratch() } fn default_kmeans_niters() -> usize { @@ -94,7 +94,7 @@ impl Default for IndexConfig { kmeans_niters: 4, max_points_per_centroid: 256, n_samples_kmeans: None, - start_from_scratch: 999, + start_from_scratch: crate::default_start_from_scratch(), force_cpu: false, fts_tokenizer: crate::text_search::FtsTokenizer::default(), } @@ -1804,6 +1804,10 @@ mod tests { assert_eq!(config.nbits, 4); assert_eq!(config.batch_size, 50_000); assert_eq!(config.seed, Some(42)); + assert_eq!( + config.start_from_scratch, + crate::default_start_from_scratch() + ); } #[test] diff --git a/next-plaid/src/lib.rs b/next-plaid/src/lib.rs index 706f66a1..f1809191 100644 --- a/next-plaid/src/lib.rs +++ b/next-plaid/src/lib.rs @@ -3,6 +3,8 @@ //! This crate provides a pure Rust, CPU-only implementation of the PLAID algorithm //! for efficient multi-vector search (late interaction retrieval). +use std::sync::OnceLock; + // Link BLAS implementation when feature is enabled #[cfg(feature = "accelerate")] extern crate blas_src; @@ -42,6 +44,23 @@ pub use search::{QueryResult, SearchParameters}; pub use text_search::FtsTokenizer; pub use update::UpdateConfig; +const DEFAULT_START_FROM_SCRATCH: usize = 999; + +fn parse_usize(raw: &str) -> Option { + raw.trim().parse::().ok() +} + +pub fn default_start_from_scratch() -> usize { + static VALUE: OnceLock = OnceLock::new(); + *VALUE.get_or_init(|| { + std::env::var("INDEX_DEFAULT_START_FROM_SCRATCH") + .ok() + .as_deref() + .and_then(parse_usize) + .unwrap_or(DEFAULT_START_FROM_SCRATCH) + }) +} + #[cfg(feature = "cuda")] pub use cuda::{clear_cuda_broken, is_cuda_broken, mark_cuda_broken, CudaContext}; diff --git a/next-plaid/src/update.rs b/next-plaid/src/update.rs index a7f1dc16..1019225d 100644 --- a/next-plaid/src/update.rs +++ b/next-plaid/src/update.rs @@ -100,7 +100,7 @@ impl Default for UpdateConfig { max_points_per_centroid: 256, n_samples_kmeans: None, seed: 42, - start_from_scratch: 999, + start_from_scratch: crate::default_start_from_scratch(), buffer_size: 100, force_cpu: false, } @@ -1132,7 +1132,10 @@ mod tests { let config = UpdateConfig::default(); assert_eq!(config.batch_size, 50_000); assert_eq!(config.buffer_size, 100); - assert_eq!(config.start_from_scratch, 999); + assert_eq!( + config.start_from_scratch, + crate::default_start_from_scratch() + ); } #[test] diff --git a/next-plaid/tests/start_from_scratch_env_test.rs b/next-plaid/tests/start_from_scratch_env_test.rs new file mode 100644 index 00000000..6407a5a6 --- /dev/null +++ b/next-plaid/tests/start_from_scratch_env_test.rs @@ -0,0 +1,17 @@ +use next_plaid::{IndexConfig, UpdateConfig}; + +#[test] +fn defaults_use_index_default_start_from_scratch_env() { + unsafe { + std::env::set_var("INDEX_DEFAULT_START_FROM_SCRATCH", "159"); + } + assert_eq!( + std::env::var("INDEX_DEFAULT_START_FROM_SCRATCH") + .ok() + .as_deref(), + Some("159") + ); + assert_eq!(next_plaid::default_start_from_scratch(), 159); + assert_eq!(IndexConfig::default().start_from_scratch, 159); + assert_eq!(UpdateConfig::default().start_from_scratch, 159); +}