Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions next-plaid-api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
# RATE_LIMIT_ENABLED: Enable rate limiting (default: false)
# RATE_LIMIT_PER_SECOND: Max requests/s when rate limiting is enabled (default: 50)
# RATE_LIMIT_BURST_SIZE: Burst size when rate limiting is enabled (default: 100)
# INDEX_DEFAULT_START_FROM_SCRATCH: Default rebuild-from-scratch threshold for new indexes (default: 999)
# =============================================================================

# =============================================================================
Expand Down Expand Up @@ -225,6 +226,10 @@ ENV INDEX_DIR=/data/indices
# x86_64: MKL statically linked (sequential, no thread explosion)
# aarch64: OpenBLAS with single thread to avoid contention with rayon
ENV OPENBLAS_NUM_THREADS=1
# Default start_from_scratch threshold for new indexes: indexes holding fewer documents
# are rebuilt from scratch on update (higher-quality centroids), larger ones update
# incrementally. Override per-deployment, or per-index via the create-index config.
ENV INDEX_DEFAULT_START_FROM_SCRATCH=999
# Memory-mapped indices are always enabled for efficient memory usage
# Set HF_MODEL_ID to auto-download model from HuggingFace Hub
# Example: ENV HF_MODEL_ID=lightonai/GTE-ModernColBERT-v1
Expand Down Expand Up @@ -286,6 +291,10 @@ ENV RUST_LOG=info
ENV INDEX_DIR=/data/indices
# Prevent OpenBLAS thread explosion when used alongside rayon parallelism
ENV OPENBLAS_NUM_THREADS=1
# Default start_from_scratch threshold for new indexes: indexes holding fewer documents
# are rebuilt from scratch on update (higher-quality centroids), larger ones update
# incrementally. Override per-deployment, or per-index via the create-index config.
ENV INDEX_DEFAULT_START_FROM_SCRATCH=999
# Memory-mapped indices are always enabled for efficient memory usage
# Set HF_MODEL_ID to auto-download model from HuggingFace Hub
# Example: ENV HF_MODEL_ID=lightonai/GTE-ModernColBERT-v1
Expand Down
13 changes: 13 additions & 0 deletions next-plaid-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,19 @@ Rate limiting is **optional and disabled by default**. Enable it by setting `RAT

## Environment Variables

### Indexing

| Variable | Default | Description |
| --------------------------------- | ------- | --------------------------------------------------------------------------- |
| `INDEX_DEFAULT_START_FROM_SCRATCH` | `999` | Default `start_from_scratch` threshold for new indexes (see below) |

When an index update runs, indexes holding **fewer** documents than `start_from_scratch`
are rebuilt from scratch (higher-quality centroids) instead of being updated incrementally;
larger indexes are updated incrementally (faster). This env var sets the **default**
threshold applied when an index is created without an explicit `config.start_from_scratch`.
A per-index value supplied at creation time always takes precedence. The value is read once
at startup.

### Rate Limiting & Concurrency

| Variable | Default | Description |
Expand Down
117 changes: 103 additions & 14 deletions next-plaid-api/src/handlers/documents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,38 @@ fn max_queued_tasks_per_index() -> usize {
})
}

/// Derives the `UpdateConfig` for an index update from the index's stored configuration.
///
/// `batch_size` and `start_from_scratch` come straight from `stored_config`. `seed` comes
/// from `stored_config` when it is set and otherwise falls back to the seed of
/// `UpdateConfig::default()`. Every remaining field keeps its `UpdateConfig::default()` value.
fn build_update_config(stored_config: &IndexConfigStored) -> UpdateConfig {
    // Destructure the defaults field by field (no `..`) so that a field added to
    // `UpdateConfig` later is a compile error here rather than a silently defaulted value.
    let UpdateConfig {
        batch_size: _,
        kmeans_niters,
        max_points_per_centroid,
        n_samples_kmeans,
        seed: fallback_seed,
        start_from_scratch: _,
        buffer_size,
        force_cpu,
    } = UpdateConfig::default();

    UpdateConfig {
        batch_size: stored_config.batch_size,
        kmeans_niters,
        max_points_per_centroid,
        n_samples_kmeans,
        seed: stored_config.seed.unwrap_or(fallback_seed),
        start_from_scratch: stored_config.start_from_scratch,
        buffer_size,
        force_cpu,
    }
}

/// Resolves a create-index request into the `IndexConfigStored` that is kept for the index.
///
/// Options the request leaves unset are filled in as follows:
/// - `nbits` -> `4`
/// - `batch_size` -> `50_000`
/// - `start_from_scratch` -> `next_plaid::default_start_from_scratch()`
/// - `fts_tokenizer` -> `"unicode61"`
///
/// `seed` and `max_documents` are copied through unchanged.
fn build_stored_index_config(req: &CreateIndexRequest) -> IndexConfigStored {
    let requested = &req.config;

    let nbits = requested.nbits.unwrap_or(4);
    let batch_size = requested.batch_size.unwrap_or(50_000);
    // The fallback is evaluated unconditionally, even when the request carries a value.
    let start_from_scratch = requested
        .start_from_scratch
        .unwrap_or(next_plaid::default_start_from_scratch());
    let fts_tokenizer = requested
        .fts_tokenizer
        .as_deref()
        .unwrap_or("unicode61")
        .to_string();

    IndexConfigStored {
        nbits,
        batch_size,
        seed: requested.seed,
        start_from_scratch,
        max_documents: requested.max_documents,
        fts_tokenizer,
    }
}

// --- Index/DB Sync Repair ---

/// Global registry of per-index repair locks to prevent concurrent repair operations.
Expand Down Expand Up @@ -405,7 +437,7 @@ async fn process_batch(
fts_tokenizer: fts_tokenizer.clone(),
..Default::default()
};
let update_config = UpdateConfig::default();
let update_config = build_update_config(&stored_config);

// STEP 1: Update vector index FIRST
let index_update_start = std::time::Instant::now();
Expand Down Expand Up @@ -965,18 +997,7 @@ pub async fn create_index(
}

// Build stored config
let stored_config = IndexConfigStored {
nbits: req.config.nbits.unwrap_or(4),
batch_size: req.config.batch_size.unwrap_or(50_000),
seed: req.config.seed,
start_from_scratch: req.config.start_from_scratch.unwrap_or(999),
max_documents: req.config.max_documents,
fts_tokenizer: req
.config
.fts_tokenizer
.clone()
.unwrap_or_else(|| "unicode61".to_string()),
};
let stored_config = build_stored_index_config(&req);

// Create index directory
std::fs::create_dir_all(&index_path)
Expand Down Expand Up @@ -1222,7 +1243,10 @@ pub async fn add_documents(
let mut index = MmapIndex::load(&path_str)?;

// Update with metadata (metadata is required)
let update_config = UpdateConfig::default();
let stored_config = state_clone
.get_index_config(&name_inner)
.ok_or_else(|| ApiError::IndexNotFound(name_inner.clone()))?;
let update_config = build_update_config(&stored_config);
let index_update_start = std::time::Instant::now();
let progress_state = state_clone.clone();
let progress_name = name_inner.clone();
Expand Down Expand Up @@ -1745,3 +1769,68 @@ pub async fn update_index_with_encoding(
// Immediate Response
Ok((StatusCode::ACCEPTED, Json("Update queued for batching")))
}

#[cfg(test)]
mod tests {
    use super::{build_stored_index_config, build_update_config};
    use crate::models::{CreateIndexRequest, IndexConfigRequest, IndexConfigStored};

    /// Builds a create-index request with every option set explicitly except the
    /// `start_from_scratch` threshold, which the individual test chooses.
    fn request_with_threshold(start_from_scratch: Option<usize>) -> CreateIndexRequest {
        CreateIndexRequest {
            name: "idx".to_string(),
            config: IndexConfigRequest {
                nbits: Some(4),
                batch_size: Some(50_000),
                seed: Some(42),
                start_from_scratch,
                max_documents: None,
                fts_tokenizer: Some("unicode61".to_string()),
            },
        }
    }

    /// The stored batch size, seed and threshold must all reach the update config.
    #[test]
    fn build_update_config_uses_stored_start_from_scratch() {
        let stored = IndexConfigStored {
            nbits: 4,
            batch_size: 12_345,
            seed: Some(7),
            start_from_scratch: 17,
            max_documents: None,
            fts_tokenizer: "unicode61".to_string(),
        };

        let derived = build_update_config(&stored);

        assert_eq!(derived.batch_size, 12_345);
        assert_eq!(derived.seed, 7);
        assert_eq!(derived.start_from_scratch, 17);
    }

    /// An explicit threshold in the request wins over any default.
    #[test]
    fn build_stored_index_config_uses_request_start_from_scratch_when_present() {
        let stored = build_stored_index_config(&request_with_threshold(Some(23)));

        assert_eq!(stored.start_from_scratch, 23);
    }

    /// Without a threshold in the request, the library-provided default is stored.
    #[test]
    fn build_stored_index_config_uses_runtime_default_when_request_omits_threshold() {
        let stored = build_stored_index_config(&request_with_threshold(None));

        assert_eq!(
            stored.start_from_scratch,
            next_plaid::default_start_from_scratch()
        );
    }
}
2 changes: 1 addition & 1 deletion next-plaid-api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct IndexConfigStored {
}

/// Returns the default `start_from_scratch` threshold as a `usize`.
///
/// NOTE(review): this hunk is reported as "1 addition & 1 deletion", so the literal `999`
/// is presumably the removed line and the call to `next_plaid::default_start_from_scratch()`
/// its replacement — confirm against the merged file.
fn default_start_from_scratch() -> usize {
999
next_plaid::default_start_from_scratch()
}

fn default_fts_tokenizer() -> String {
Expand Down
147 changes: 147 additions & 0 deletions next-plaid-api/tests/start_from_scratch_env_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use std::sync::Arc;
use std::time::Duration;

use axum::{
extract::DefaultBodyLimit,
http::StatusCode,
routing::{get, post, put},
Json, Router,
};
use next_plaid_api::{
handlers,
models::CreateIndexResponse,
state::{ApiConfig, AppState},
};
use serde_json::json;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tower_http::{
cors::{Any, CorsLayer},
timeout::TimeoutLayer,
};

fn build_test_router(state: Arc<AppState>) -> Router {
let index_routes = Router::new()
.route(
"/",
get(handlers::list_indices).post(handlers::create_index),
)
.route(
"/{name}",
get(handlers::get_index_info).delete(handlers::delete_index),
);

let document_routes = Router::new()
.route(
"/{name}/documents",
post(handlers::add_documents).delete(handlers::delete_documents),
)
.route("/{name}/update", post(handlers::update_index))
.route("/{name}/config", put(handlers::update_index_config));

let metadata_routes = Router::new()
.route("/{name}/metadata", get(handlers::get_all_metadata))
.route("/{name}/metadata/count", get(handlers::get_metadata_count))
.route("/{name}/metadata/check", post(handlers::check_metadata))
.route("/{name}/metadata/query", post(handlers::query_metadata))
.route("/{name}/metadata/get", post(handlers::get_metadata));

let search_routes = Router::new()
.route("/{name}/search", post(handlers::search))
.route("/{name}/search/filtered", post(handlers::search_filtered));

let rerank_route = Router::new().route("/rerank", post(handlers::rerank));

let indices_router = Router::new()
.merge(index_routes)
.merge(document_routes)
.merge(metadata_routes)
.merge(search_routes);

let health_handler = |state: axum::extract::State<Arc<AppState>>| async move {
Json(json!({
"status": "healthy",
"loaded_indices": state.loaded_count()
}))
};

Router::new()
.route("/health", get(health_handler))
.nest("/indices", indices_router)
.merge(rerank_route)
.layer(DefaultBodyLimit::max(100 * 1024 * 1024))
.layer(TimeoutLayer::with_status_code(
StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(30),
))
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.with_state(state)
}

/// End-to-end check that `INDEX_DEFAULT_START_FROM_SCRATCH` supplies the threshold stored
/// for an index whose create request does not specify `start_from_scratch`.
///
/// The test sets the variable to `159`, serves the API on an ephemeral local port, creates
/// an index with only `nbits` configured, and expects the response to echo `159`.
#[tokio::test]
async fn create_index_uses_env_default_start_from_scratch_when_request_omits_it() {
    // The env var is set before any application state is built; the project README states
    // the value is read once at startup, so it must be in place before the first read.
    //
    // SAFETY: this is the only test in this file and the variable is written first thing,
    // before the server task or the HTTP client are created — presumably no other thread
    // is accessing the process environment at this point.
    unsafe {
        std::env::set_var("INDEX_DEFAULT_START_FROM_SCRATCH", "159");
    }
    assert_eq!(
        std::env::var("INDEX_DEFAULT_START_FROM_SCRATCH")
            .ok()
            .as_deref(),
        Some("159")
    );

    let temp_dir = TempDir::new().expect("temp dir should exist");
    let config = ApiConfig {
        index_dir: temp_dir.path().to_path_buf(),
        default_top_k: 10,
    };

    #[cfg(feature = "model")]
    let state = Arc::new(AppState::with_model(config, None, None, None));
    #[cfg(not(feature = "model"))]
    let state = Arc::new(AppState::new(config));

    let app = build_test_router(state);
    // Port 0 lets the OS pick a free port, avoiding clashes with other processes.
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("listener should bind");
    let addr = listener.local_addr().expect("local addr should exist");
    let base_url = format!("http://{}", addr);

    // No readiness sleep is needed: the socket is already bound and listening, so a
    // connection made before the server task first polls `accept` simply waits in the
    // OS backlog until it is picked up.
    tokio::spawn(async move {
        axum::serve(listener, app).await.expect("server should run");
    });

    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(30))
        .build()
        .expect("client should build");

    // Only `nbits` is supplied, so `start_from_scratch` has to come from the default.
    let response = client
        .post(format!("{}/indices", base_url))
        .json(&json!({
            "name": "env-default-index",
            "config": {
                "nbits": 4
            }
        }))
        .send()
        .await
        .expect("request should succeed");

    assert_eq!(response.status(), reqwest::StatusCode::OK);

    let body: CreateIndexResponse = response
        .json()
        .await
        .expect("response body should deserialize");

    assert_eq!(body.config.start_from_scratch, 159);
}
8 changes: 6 additions & 2 deletions next-plaid/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct IndexConfig {
}

/// Returns the default `start_from_scratch` threshold as a `usize`.
///
/// NOTE(review): this diff view shows the old and new bodies side by side; the literal
/// `999` is presumably the removed line and `crate::default_start_from_scratch()` its
/// replacement — confirm against the merged file.
fn default_start_from_scratch() -> usize {
999
crate::default_start_from_scratch()
}

fn default_kmeans_niters() -> usize {
Expand All @@ -94,7 +94,7 @@ impl Default for IndexConfig {
kmeans_niters: 4,
max_points_per_centroid: 256,
n_samples_kmeans: None,
start_from_scratch: 999,
start_from_scratch: crate::default_start_from_scratch(),
force_cpu: false,
fts_tokenizer: crate::text_search::FtsTokenizer::default(),
}
Expand Down Expand Up @@ -1804,6 +1804,10 @@ mod tests {
assert_eq!(config.nbits, 4);
assert_eq!(config.batch_size, 50_000);
assert_eq!(config.seed, Some(42));
assert_eq!(
config.start_from_scratch,
crate::default_start_from_scratch()
);
}

#[test]
Expand Down
Loading
Loading