diff --git a/backend-v2/src/handlers/document.rs b/backend-v2/src/handlers/document.rs new file mode 100644 index 0000000..08b3d2d --- /dev/null +++ b/backend-v2/src/handlers/document.rs @@ -0,0 +1,201 @@ +use std::sync::Arc; + +use aide::NoApi; +use axum::{ + Json, + extract::{Multipart, Path, Query, State}, + http::StatusCode, +}; +use schemars::JsonSchema; +use serde::Deserialize; +use speedwagon::FileType; +use uuid::Uuid; + +use crate::{ + error::{ApiResult, AppError}, + model::{ + BatchIngestResponse, BatchPurgeResponse, BulkPurgeRequest, DocumentResponse, FailedItem, + }, + state::AppState, +}; + +#[derive(Debug, Deserialize, JsonSchema)] +pub struct ListDocumentsQuery { + #[serde(default)] + pub page: Option, + #[serde(default)] + pub page_size: Option, +} + +pub async fn list_documents( + State(state): State>, + Query(query): Query, +) -> ApiResult>> { + let page = query.page.unwrap_or(0); + let page_size = query.page_size.unwrap_or(50); + + let store = state.store.read().await; + let docs = store + .list(false, page, page_size) + .map_err(|e| AppError::internal(e.to_string()))?; + + Ok(Json(docs.into_iter().map(DocumentResponse::from).collect())) +} + +pub async fn get_document( + State(state): State>, + Path(id): Path, +) -> ApiResult> { + let store = state.store.read().await; + match store.get(id) { + Some(doc) => Ok(Json(DocumentResponse::from(doc))), + None => Err(AppError::not_found("document not found")), + } +} + +fn parse_filetype(filename: &str) -> Result { + let ext = filename.rsplit('.').next().unwrap_or("").to_lowercase(); + match ext.as_str() { + "pdf" => Ok(FileType::PDF), + "md" | "markdown" | "txt" => Ok(FileType::MD), + _ => Err(format!( + "unsupported file type '.{ext}' — supported: pdf, md, txt" + )), + } +} + +pub async fn ingest_document( + State(state): State>, + NoApi(mut multipart): NoApi, +) -> ApiResult<(StatusCode, Json)> { + let mut valid_items: Vec<(Vec, FileType)> = Vec::new(); + let mut filenames: Vec = Vec::new(); + let mut failed: Vec = Vec::new(); + + while let Some(field) = multipart + .next_field() + .await + .map_err(|e| AppError::bad_request(format!("multipart error: {e}")))? + { + if field.name() != Some("file") { + continue; + } + let filename = field.file_name().unwrap_or("upload").to_string(); + let bytes = field + .bytes() + .await + .map_err(|e| AppError::bad_request(format!("multipart error: {e}")))?; + + match parse_filetype(&filename) { + Ok(filetype) => { + valid_items.push((bytes.to_vec(), filetype)); + filenames.push(filename); + } + Err(e) => { + failed.push(FailedItem { + name: filename, + error: e, + }); + } + } + } + + if valid_items.is_empty() && failed.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(AppError::new("missing 'file' field in multipart body")), + )); + } + + let mut store = state.store.write().await; + let result = store + .ingest_many(valid_items) + .await + .map_err(|e| AppError::internal(e.to_string()))?; + + let docs = store + .get_many(&result.succeeded) + .map_err(|e| AppError::internal(e.to_string()))?; + drop(store); + + for f in result.failed { + let name = filenames + .get(f.index) + .cloned() + .unwrap_or_else(|| format!("file[{}]", f.index)); + failed.push(FailedItem { + name, + error: f.error, + }); + } + + for doc in &docs { + tracing::info!(id = %doc.id, title = %doc.title, "document ingested"); + } + + let succeeded: Vec = docs.into_iter().map(DocumentResponse::from).collect(); + let status = if failed.is_empty() { + StatusCode::CREATED + } else { + StatusCode::OK + }; + + Ok((status, Json(BatchIngestResponse { succeeded, failed }))) +} + +pub async fn purge_document( + State(state): State>, + Path(id): Path, +) -> ApiResult { + let mut store = state.store.write().await; + match store + .purge(id) + .map_err(|e| AppError::internal(e.to_string()))? + { + Some(doc) => { + tracing::info!(%id, title = %doc.title, "document purged"); + Ok(StatusCode::NO_CONTENT) + } + None => Err(AppError::not_found("document not found")), + } +} + +pub async fn purge_documents( + State(state): State>, + Json(payload): Json, +) -> ApiResult<(StatusCode, Json)> { + if payload.ids.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(AppError::new("ids must not be empty")), + )); + } + + let mut ids = Vec::with_capacity(payload.ids.len()); + let mut failed = Vec::new(); + for raw_id in payload.ids { + match Uuid::parse_str(&raw_id) { + Ok(id) => ids.push(id), + Err(_) => failed.push(FailedItem { + name: raw_id, + error: "invalid document id".into(), + }), + } + } + + let mut store = state.store.write().await; + let result = store.purge_many(ids); + drop(store); + + let purged: Vec = result.purged.iter().map(|id| id.to_string()).collect(); + failed.extend(result.failed.into_iter().map(|f| FailedItem { + name: f.id.to_string(), + error: f.error, + })); + + for id in &purged { + tracing::info!(%id, "document purged"); + } + + Ok((StatusCode::OK, Json(BatchPurgeResponse { purged, failed }))) +} diff --git a/backend-v2/src/handlers/mod.rs b/backend-v2/src/handlers/mod.rs index b10b7ed..70d4baa 100644 --- a/backend-v2/src/handlers/mod.rs +++ b/backend-v2/src/handlers/mod.rs @@ -1,7 +1,9 @@ mod auth; +mod document; mod session; mod user; pub use auth::*; +pub use document::*; pub use session::*; pub use user::*; diff --git a/backend-v2/src/handlers/session.rs b/backend-v2/src/handlers/session.rs index 415d68e..6953cb8 100644 --- a/backend-v2/src/handlers/session.rs +++ b/backend-v2/src/handlers/session.rs @@ -23,7 +23,7 @@ use crate::{ state::AppState, }; -const DEFAULT_MODEL: &str = "openai/gpt-4o-mini"; +const DEFAULT_MODEL: &str = "openai/gpt-5.4-mini"; fn sandbox_name_for(id: &Uuid) -> String { let s = id.simple().to_string(); diff --git a/backend-v2/src/main.rs b/backend-v2/src/main.rs index 7a2f1ad..4298f47 100644 --- a/backend-v2/src/main.rs +++ b/backend-v2/src/main.rs @@ -88,7 +88,7 @@ async fn run_server() -> std::io::Result<()> { let store_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(".speedwagon"); let store = Arc::new(RwLock::new( - Store::new(store_path).expect("speedwagon store init"), + Store::new(&store_path).expect("speedwagon store init"), )); { diff --git a/backend-v2/src/model/document.rs b/backend-v2/src/model/document.rs new file mode 100644 index 0000000..48cf107 --- /dev/null +++ b/backend-v2/src/model/document.rs @@ -0,0 +1,58 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use speedwagon::Document; + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct DocumentResponse { + pub id: String, + pub title: String, + pub len: usize, +} + +impl From<&Document> for DocumentResponse { + fn from(doc: &Document) -> Self { + Self { + id: doc.id.clone(), + title: doc.title.clone(), + len: doc.len, + } + } +} + +impl From for DocumentResponse { + fn from(doc: Document) -> Self { + Self { + id: doc.id, + title: doc.title, + len: doc.len, + } + } +} + +// --- Response DTOs --- + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct FailedItem { + pub name: String, + pub error: String, +} + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct BatchIngestResponse { + pub succeeded: Vec, + pub failed: Vec, +} + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct BatchPurgeResponse { + pub purged: Vec, + pub failed: Vec, +} + +// --- Request DTOs --- + +#[derive(Clone, Debug, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct BulkPurgeRequest { + pub ids: Vec, +} diff --git a/backend-v2/src/model/mod.rs b/backend-v2/src/model/mod.rs index 1be5775..0308305 100644 --- a/backend-v2/src/model/mod.rs +++ b/backend-v2/src/model/mod.rs @@ -1,7 +1,9 @@ +mod document; mod message; mod session; mod user; +pub use document::*; pub use message::*; pub use session::*; pub use user::*; diff --git a/backend-v2/src/router.rs b/backend-v2/src/router.rs index a79d38d..cd32a1c 100644 --- a/backend-v2/src/router.rs +++ b/backend-v2/src/router.rs @@ -54,10 +54,23 @@ pub fn get_router(state: Arc) -> ApiRouter { post(handlers::send_message_stream), ); + let document_routes = ApiRouter::new() + .api_route( + "/documents", + get(handlers::list_documents) + .post(handlers::ingest_document) + .delete(handlers::purge_documents), + ) + .api_route( + "/documents/{id}", + get(handlers::get_document).delete(handlers::purge_document), + ); + ApiRouter::new() .merge(auth_routes) .merge(me_routes) .merge(admin_routes) .merge(session_routes) + .merge(document_routes) .with_state(state) } diff --git a/backend-v2/tests/common/mod.rs b/backend-v2/tests/common/mod.rs index 4ee58b5..4bb47ac 100644 --- a/backend-v2/tests/common/mod.rs +++ b/backend-v2/tests/common/mod.rs @@ -341,6 +341,141 @@ pub async fn clear_message_history_status( app.clone().oneshot(req).await.unwrap().status() } +// ── Document helpers ───────────────────────────────────────────────────────── + +pub async fn list_documents(app: &axum::Router) -> Vec { + let req = Request::builder() + .method("GET") + .uri("/documents") + .body(Body::empty()) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + assert_eq!(resp.status(), axum::http::StatusCode::OK); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + serde_json::from_slice(&bytes).unwrap() +} + +fn build_multipart_body(files: &[(&str, &[u8])]) -> (String, Vec) { + let boundary = "----testboundary"; + let mut body = Vec::new(); + for (filename, content) in files { + body.extend_from_slice(format!("--{boundary}\r\n").as_bytes()); + body.extend_from_slice( + format!( + "Content-Disposition: form-data; name=\"file\"; filename=\"{filename}\"\r\n\ + Content-Type: application/octet-stream\r\n\r\n" + ) + .as_bytes(), + ); + body.extend_from_slice(content); + body.extend_from_slice(b"\r\n"); + } + body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes()); + (boundary.to_string(), body) +} + +/// Ingest a single file and return the first succeeded document. +pub async fn ingest_document( + app: &axum::Router, + filename: &str, + content: &[u8], +) -> serde_json::Value { + let batch = ingest_documents(app, &[(filename, content)]).await; + let succeeded = batch["succeeded"] + .as_array() + .expect("succeeded should be array"); + assert!( + !succeeded.is_empty(), + "ingest_document: no succeeded items — failed: {:?}", + batch["failed"] + ); + succeeded[0].clone() +} + +/// Ingest multiple files and return the full BatchIngestResponse. +pub async fn ingest_documents(app: &axum::Router, files: &[(&str, &[u8])]) -> serde_json::Value { + post_documents(app, files).await.1 +} + +/// Ingest files and also return the HTTP status code. +pub async fn ingest_documents_with_status( + app: &axum::Router, + files: &[(&str, &[u8])], +) -> (axum::http::StatusCode, serde_json::Value) { + post_documents(app, files).await +} + +async fn post_documents( + app: &axum::Router, + files: &[(&str, &[u8])], +) -> (axum::http::StatusCode, serde_json::Value) { + let (boundary, body) = build_multipart_body(files); + + let req = Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(Body::from(body)) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + (status, serde_json::from_slice(&bytes).unwrap()) +} + +pub async fn purge_document(app: &axum::Router, id: &str) -> axum::http::StatusCode { + let req = Request::builder() + .method("DELETE") + .uri(format!("/documents/{id}")) + .body(Body::empty()) + .unwrap(); + + app.clone().oneshot(req).await.unwrap().status() +} + +pub async fn bulk_purge_documents( + app: &axum::Router, + ids: &[&str], +) -> (axum::http::StatusCode, serde_json::Value) { + let payload = serde_json::json!({ "ids": ids }); + let req = Request::builder() + .method("DELETE") + .uri("/documents") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&payload).unwrap())) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + ( + status, + serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null), + ) +} + +pub async fn get_document( + app: &axum::Router, + id: &str, +) -> (axum::http::StatusCode, serde_json::Value) { + let req = Request::builder() + .method("GET") + .uri(format!("/documents/{id}")) + .body(Body::empty()) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + let body = serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null); + (status, body) +} + // ── Text extraction ─────────────────────────────────────────────────────────── pub fn extract_text_from_slice(outputs: &[serde_json::Value]) -> String { diff --git a/backend-v2/tests/document_test.rs b/backend-v2/tests/document_test.rs new file mode 100644 index 0000000..71ae245 --- /dev/null +++ b/backend-v2/tests/document_test.rs @@ -0,0 +1,312 @@ +#[path = "common/mod.rs"] +mod common; + +use axum::http::StatusCode; +use common::{ + bulk_purge_documents, get_document, ingest_document, ingest_documents_with_status, + list_documents, make_app_with_repo, make_repo, purge_document, +}; +use http_body_util::BodyExt; +use tower::ServiceExt; + +#[tokio::test] +async fn list_documents_empty_initially() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let docs = list_documents(&app).await; + assert!(docs.is_empty()); +} + +#[tokio::test] +async fn ingest_and_list_document() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# Test Document\n\nThis is test content for indexing."; + let doc = ingest_document(&app, "test.md", content).await; + + assert!(doc.get("id").is_some(), "response should contain id"); + assert!(doc.get("title").is_some(), "response should contain title"); + assert!(doc.get("len").is_some(), "response should contain len"); + + let docs = list_documents(&app).await; + assert_eq!(docs.len(), 1); + assert_eq!(docs[0]["id"], doc["id"]); +} + +#[tokio::test] +async fn get_document_by_id() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# Getting by ID\n\nSome content here."; + let created = ingest_document(&app, "get-test.md", content).await; + let id = created["id"].as_str().unwrap(); + + let (status, fetched) = get_document(&app, id).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(fetched["id"].as_str().unwrap(), id); + assert_eq!(fetched["title"].as_str(), created["title"].as_str()); +} + +#[tokio::test] +async fn get_nonexistent_document_returns_404() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let fake_id = uuid::Uuid::new_v4(); + let (status, _) = get_document(&app, &fake_id.to_string()).await; + assert_eq!(status, StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn purge_document_removes_it() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# To Be Purged\n\nThis document will be deleted."; + let doc = ingest_document(&app, "purge-me.md", content).await; + let id = doc["id"].as_str().unwrap(); + + let status = purge_document(&app, id).await; + assert_eq!(status, StatusCode::NO_CONTENT); + + let docs = list_documents(&app).await; + assert!(docs.is_empty(), "document list should be empty after purge"); + + let (status, _) = get_document(&app, id).await; + assert_eq!(status, StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn purge_nonexistent_returns_404() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let fake_id = uuid::Uuid::new_v4(); + let status = purge_document(&app, &fake_id.to_string()).await; + assert_eq!(status, StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn ingest_duplicate_returns_same_id() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# Duplicate Test\n\nSame content, same ID."; + let doc1 = ingest_document(&app, "dup1.md", content).await; + let doc2 = ingest_document(&app, "dup2.md", content).await; + + assert_eq!( + doc1["id"].as_str().unwrap(), + doc2["id"].as_str().unwrap(), + "same content should produce same UUID (UUIDv5)" + ); + + let docs = list_documents(&app).await; + assert_eq!( + docs.len(), + 1, + "duplicate ingest should not create extra document" + ); +} + +// ── Multi-file ingest tests ────────────────────────────────────────────────── + +#[tokio::test] +async fn ingest_multiple_documents() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let files: &[(&str, &[u8])] = &[ + ("doc1.md", b"# Document One\n\nFirst document."), + ("doc2.txt", b"# Document Two\n\nSecond document."), + ]; + + let (status, batch) = ingest_documents_with_status(&app, files).await; + assert_eq!(status, StatusCode::CREATED); + + let succeeded = batch["succeeded"].as_array().unwrap(); + assert_eq!(succeeded.len(), 2); + assert!(batch["failed"].as_array().unwrap().is_empty()); + + let docs = list_documents(&app).await; + assert_eq!(docs.len(), 2); +} + +#[tokio::test] +async fn ingest_partial_failure_mixed_filetypes() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let files: &[(&str, &[u8])] = &[ + ("good.md", b"# Good Document\n\nValid markdown."), + ("bad.csv", b"a,b,c\n1,2,3"), + ("also-good.txt", b"# Another Good\n\nAlso valid."), + ]; + + let (status, batch) = ingest_documents_with_status(&app, files).await; + assert_eq!(status, StatusCode::OK, "partial success should return 200"); + + let succeeded = batch["succeeded"].as_array().unwrap(); + let failed = batch["failed"].as_array().unwrap(); + assert_eq!(succeeded.len(), 2); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["name"].as_str().unwrap(), "bad.csv"); +} + +#[tokio::test] +async fn ingest_all_unsupported_returns_empty_succeeded() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let files: &[(&str, &[u8])] = &[("data.csv", b"a,b,c")]; + + let (status, batch) = ingest_documents_with_status(&app, files).await; + assert_eq!(status, StatusCode::OK); + + let succeeded = batch["succeeded"].as_array().unwrap(); + let failed = batch["failed"].as_array().unwrap(); + assert!(succeeded.is_empty()); + assert_eq!(failed.len(), 1); +} + +#[tokio::test] +async fn ingest_no_file_field_returns_400() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let boundary = "----testboundary"; + let body = format!("--{boundary}--\r\n"); + + let req = axum::http::Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(axum::body::Body::from(body)) + .unwrap(); + + let resp = tower::ServiceExt::oneshot(app, req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); +} + +#[tokio::test] +async fn ingest_malformed_multipart_after_valid_file_returns_400() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let boundary = "----testboundary"; + let body = format!( + "--{boundary}\r\n\ + Content-Disposition: form-data; name=\"file\"; filename=\"good.md\"\r\n\ + Content-Type: text/markdown\r\n\r\n\ + # Good Document\r\n\ + --{boundary}\r\n\ + Content-Disposition: form-data; name=\"file\"; filename=\"bad.md\"\r\n\ + Content-Type: text/markdown\r\n\r\n\ + # Missing closing boundary" + ); + + let req = axum::http::Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(axum::body::Body::from(body)) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + let body: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + assert!( + body["error"] + .as_str() + .unwrap_or_default() + .contains("multipart error"), + "unexpected error body: {body}" + ); +} + +// ── Bulk purge tests ───────────────────────────────────────────────────────── + +#[tokio::test] +async fn bulk_purge_multiple_documents() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let doc1 = ingest_document(&app, "a.md", b"# Doc A\n\nContent A.").await; + let doc2 = ingest_document(&app, "b.md", b"# Doc B\n\nContent B.").await; + let id1 = doc1["id"].as_str().unwrap(); + let id2 = doc2["id"].as_str().unwrap(); + + let (status, resp) = bulk_purge_documents(&app, &[id1, id2]).await; + assert_eq!(status, StatusCode::OK); + + let purged = resp["purged"].as_array().unwrap(); + assert_eq!(purged.len(), 2); + assert!(resp["failed"].as_array().unwrap().is_empty()); + + let docs = list_documents(&app).await; + assert!(docs.is_empty()); +} + +#[tokio::test] +async fn bulk_purge_partial_failure() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let doc = ingest_document(&app, "c.md", b"# Doc C\n\nContent C.").await; + let real_id = doc["id"].as_str().unwrap(); + let fake_id = uuid::Uuid::new_v4().to_string(); + + let (status, resp) = bulk_purge_documents(&app, &[real_id, &fake_id]).await; + assert_eq!(status, StatusCode::OK); + + let purged = resp["purged"].as_array().unwrap(); + let failed = resp["failed"].as_array().unwrap(); + assert_eq!(purged.len(), 1); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["name"].as_str().unwrap(), fake_id); +} + +#[tokio::test] +async fn bulk_purge_invalid_id_is_item_failure() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let doc = ingest_document(&app, "valid.md", b"# Valid\n\nContent.").await; + let real_id = doc["id"].as_str().unwrap(); + let invalid_id = "not-a-uuid"; + + let (status, resp) = bulk_purge_documents(&app, &[real_id, invalid_id]).await; + assert_eq!(status, StatusCode::OK); + + let purged = resp["purged"].as_array().unwrap(); + let failed = resp["failed"].as_array().unwrap(); + assert_eq!(purged.len(), 1); + assert_eq!(purged[0].as_str().unwrap(), real_id); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["name"].as_str().unwrap(), invalid_id); + assert!( + failed[0]["error"] + .as_str() + .unwrap_or_default() + .contains("invalid document id") + ); +} + +#[tokio::test] +async fn bulk_purge_empty_ids_returns_400() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let (status, _) = bulk_purge_documents(&app, &[]).await; + assert_eq!(status, StatusCode::BAD_REQUEST); +} diff --git a/backend-v2/tests/e2e_test.rs b/backend-v2/tests/e2e_test.rs index 043c697..5651deb 100644 --- a/backend-v2/tests/e2e_test.rs +++ b/backend-v2/tests/e2e_test.rs @@ -6,9 +6,86 @@ use std::sync::Arc; use agent_k_backend::{repository, router::get_router, state::AppState}; use aide::openapi::OpenApi; use ailoy::{agent::default_provider_mut, lang_model::LangModelProvider}; -use common::{extract_text, post_session, send_message, test_jwt_config}; -use speedwagon::{FileType, Store, build_tools}; +use axum::body::Body; +use axum::http::{Request, StatusCode}; +use common::test_jwt_config; +use http_body_util::BodyExt; +use speedwagon::{Store, build_tools}; use tokio::sync::RwLock; +use tower::ServiceExt; + +fn json_request(method: &str, uri: &str, body: Option<&str>) -> Request { + let builder = Request::builder() + .method(method) + .uri(uri) + .header("content-type", "application/json"); + match body { + Some(b) => builder.body(Body::from(b.to_string())).unwrap(), + None => builder.body(Body::from("{}")).unwrap(), + } +} + +fn multipart_request(files: &[(&str, &[u8])]) -> Request { + let boundary = "----e2e-test-boundary"; + let mut body = Vec::new(); + for (filename, content) in files { + body.extend_from_slice(format!("--{boundary}\r\n").as_bytes()); + body.extend_from_slice( + format!( + "Content-Disposition: form-data; name=\"file\"; filename=\"{filename}\"\r\n\ + Content-Type: application/octet-stream\r\n\r\n" + ) + .as_bytes(), + ); + body.extend_from_slice(content); + body.extend_from_slice(b"\r\n"); + } + body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes()); + + Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(Body::from(body)) + .unwrap() +} + +fn extract_assistant_text(outputs: &serde_json::Value) -> String { + outputs + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|o| { + let depth = o.get("depth").and_then(|d| d.as_u64()).unwrap_or(0); + if depth != 0 { + return None; + } + o.get("message")? + .get("contents")? + .as_array()? + .iter() + .filter_map(|p| p.get("text")?.as_str()) + .map(str::to_string) + .reduce(|a, b| a + &b) + }) + .collect::>() + .join("") + }) + .unwrap_or_default() +} + +fn assert_send_ok(status: StatusCode, body: &[u8]) -> serde_json::Value { + if status != StatusCode::OK { + panic!( + "send_message returned {status}: {}", + String::from_utf8_lossy(body) + ); + } + serde_json::from_slice(body).unwrap() +} #[tokio::test] #[ignore = "requires OPENAI_API_KEY"] @@ -30,53 +107,126 @@ async fn test_ingest_message_purge_cycle() { provider.tools = build_tools(store.clone()); } - let test_content = b"The capital of Freedonia is Glorkville. This is a unique fact."; - let doc_id = store - .write() - .await - .ingest(test_content.iter().copied(), FileType::MD) - .await - .expect("ingest failed"); - let repo = repository::create_repository("sqlite::memory:") .await .expect("test repo init"); - let state = Arc::new(AppState::new(repo, store.clone(), test_jwt_config())); + let state = Arc::new(AppState::new(repo, store, test_jwt_config())); let app = get_router(state).finish_api(&mut OpenApi::default()); - let session_id = post_session(&app).await; + // ── Ingest two documents via HTTP multipart ────────────────────────────── + let resp = app + .clone() + .oneshot(multipart_request(&[ + ( + "freedonia.md", + b"The capital of Freedonia is Glorkville. This is a unique fact.", + ), + ( + "zorbax.md", + b"The largest ocean on planet Zorbax is the Shimmer Sea. It covers 40% of the surface.", + ), + ])) + .await + .unwrap(); + let status = resp.status(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let batch: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!( + status, + StatusCode::CREATED, + "batch ingest should succeed: {batch}" + ); - let outputs = send_message(&app, session_id, "What is the capital of Freedonia?").await; - let arr = outputs.as_array().expect("response must be an array"); + let succeeded = batch["succeeded"].as_array().unwrap(); + assert_eq!(succeeded.len(), 2, "both documents should ingest"); + let doc_ids: Vec<&str> = succeeded + .iter() + .map(|d| d["id"].as_str().unwrap()) + .collect(); - assert!(!arr.is_empty(), "messages should not be empty"); + // ── Create session ─────────────────────────────────────────────────────── + let resp = app + .clone() + .oneshot(json_request("POST", "/sessions", None)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let session: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let session_id = session["id"].as_str().unwrap(); + let msg_uri = format!("/sessions/{session_id}/messages"); - let has_assistant = arr.iter().any(|o| { - o.get("message") - .and_then(|m| m.get("role")) - .and_then(|r| r.as_str()) - == Some("assistant") - }); + // ── Question about document 1 (Freedonia) ──────────────────────────────── + let q1 = serde_json::json!({ "content": "What is the capital of Freedonia?" }).to_string(); + let resp = app + .clone() + .oneshot(json_request("POST", &msg_uri, Some(&q1))) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let outputs = assert_send_ok(StatusCode::OK, &body); + let text = extract_assistant_text(&outputs); assert!( - has_assistant, - "should contain at least one assistant message" + text.contains("Glorkville"), + "response should mention 'Glorkville', got: {text}", ); - let text = extract_text(&outputs); - assert!(!text.is_empty(), "assistant text should not be empty"); + // ── Question about document 2 (Zorbax) ─────────────────────────────────── + let q2 = + serde_json::json!({ "content": "What is the largest ocean on planet Zorbax?" }).to_string(); + let resp = app + .clone() + .oneshot(json_request("POST", &msg_uri, Some(&q2))) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let outputs = assert_send_ok(StatusCode::OK, &body); + let text = extract_assistant_text(&outputs); assert!( - text.contains("Glorkville"), - "response should mention 'Glorkville' from the ingested document, got: {text}", + text.contains("Shimmer Sea"), + "response should mention 'Shimmer Sea', got: {text}", ); - // Purge the document - store.write().await.purge(doc_id).expect("purge failed"); + // ── Bulk purge both documents via HTTP ─────────────────────────────────── + let purge_body = serde_json::json!({ "ids": doc_ids }).to_string(); + let resp = app + .clone() + .oneshot(json_request("DELETE", "/documents", Some(&purge_body))) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let purge_resp: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let purged = purge_resp["purged"].as_array().unwrap(); + assert_eq!(purged.len(), 2, "both documents should be purged"); + + // ── Verify documents are gone ──────────────────────────────────────────── + let resp = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri("/documents") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let docs: Vec = serde_json::from_slice(&body).unwrap(); + assert!(docs.is_empty(), "document list should be empty after purge"); - // Send same message after purge - let outputs = send_message(&app, session_id, "What is the capital of Freedonia?").await; - let post_purge_text = extract_text(&outputs); + // ── Post-purge question (agent should still respond, just without KB) ──── + let resp = app + .clone() + .oneshot(json_request("POST", &msg_uri, Some(&q1))) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let outputs = assert_send_ok(StatusCode::OK, &body); + let post_purge_text = extract_assistant_text(&outputs); assert!( !post_purge_text.is_empty(), - "post-purge response should not be empty" + "post-purge response should not be empty", ); } diff --git a/speedwagon/src/store/mod.rs b/speedwagon/src/store/mod.rs index acc6636..80b3cab 100644 --- a/speedwagon/src/store/mod.rs +++ b/speedwagon/src/store/mod.rs @@ -7,7 +7,7 @@ mod searcher; mod translator; use std::{ - fs, + fs, io, path::{Path, PathBuf}, sync::Arc, }; @@ -21,8 +21,40 @@ use uuid::Uuid; pub use document::{Document, FindResult}; pub use searcher::{SearchPage, SearchResult}; +#[derive(Debug, Clone)] +pub struct IngestResult { + pub succeeded: Vec, + pub failed: Vec, +} + +#[derive(Debug, Clone)] +pub struct IngestFailure { + pub index: usize, + pub error: String, +} + +#[derive(Debug, Clone)] +pub struct PurgeResult { + pub purged: Vec, + pub failed: Vec, +} + +#[derive(Debug, Clone)] +pub struct PurgeFailure { + pub id: Uuid, + pub error: String, +} + pub type SharedStore = Arc>; +fn remove_ingest_artifact(path: &Path) { + match fs::remove_file(path) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + Err(e) => log::warn!("failed to clean up ingest artifact {:?}: {e}", path), + } +} + /// Speedwagon store layout: /// /// ```text @@ -92,64 +124,124 @@ impl Store { Ok(id) } - /// Adds multiple files in one batch: translates each to corpus, resolves titles, then - /// commits them all to the index in a single write. + /// Adds multiple files in one batch with partial-success semantics. + /// Successfully processed files are batched into a single index write. + /// Files that fail at any stage (translation, reading, title extraction) are + /// recorded in `IngestResult::failed` and their intermediate files are cleaned up. pub async fn ingest_many( &mut self, items: impl IntoIterator, FileType)>, - ) -> Result> { - let items = items + ) -> Result { + let items: Vec<(Vec, FileType)> = items .into_iter() .map(|v| (v.0.into_iter().collect::>(), v.1)) - .collect::>(); - let mut all_ids = Vec::with_capacity(items.len()); - let mut to_index: Vec<(Uuid, String)> = Vec::new(); // (id, content) + .collect(); - for (bytes, filetype) in &items { - let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); - all_ids.push(id); + let mut succeeded = Vec::with_capacity(items.len()); + let mut failed = Vec::new(); + let mut to_index: Vec<(usize, Uuid, String, bool)> = Vec::new(); // (input index, id, content, new_corpus) + for (idx, (bytes, filetype)) in items.iter().enumerate() { + let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); let corpus_path = self.root.join("corpus").join(format!("{id}.md")); - if !corpus_path.exists() { - match filetype { - FileType::MD => { - fs::write(&corpus_path, bytes)?; - } + let new_corpus = !corpus_path.exists(); + + if new_corpus { + let mut new_origin: Option = None; + + let ok = match filetype { + FileType::MD => fs::write(&corpus_path, bytes).map_err(|e| e.to_string()), _ => { let ext = filetype.to_string(); let origin_path = self.root.join("origin").join(format!("{id}.{ext}")); if !origin_path.exists() { - fs::write(&origin_path, bytes)?; + if let Err(e) = fs::write(&origin_path, bytes) { + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + continue; + } + new_origin = Some(origin_path.clone()); } - translator::translate(&origin_path, &corpus_path)?; + translator::translate(&origin_path, &corpus_path).map_err(|e| e.to_string()) } + }; + + if let Err(e) = ok { + if new_corpus { + remove_ingest_artifact(&corpus_path); + } + if let Some(origin) = &new_origin { + remove_ingest_artifact(origin); + } + failed.push(IngestFailure { + index: idx, + error: e, + }); + continue; } } - if !indexer::document_exists(&self.index, &id.to_string())? { - let content = fs::read_to_string(&corpus_path) - .with_context(|| format!("failed to read corpus: {corpus_path:?}"))?; - to_index.push((id, content)); + match indexer::document_exists(&self.index, &id.to_string()) { + Ok(true) => { + succeeded.push(id); + } + Ok(false) => match fs::read_to_string(&corpus_path) { + Ok(content) => { + to_index.push((idx, id, content, new_corpus)); + } + Err(e) => { + if new_corpus { + remove_ingest_artifact(&corpus_path); + } + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + } + }, + Err(e) => { + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + } } } - if to_index.is_empty() { - return Ok(all_ids); - } - let mut docs: Vec<(String, String, String)> = Vec::with_capacity(to_index.len()); - for (id, content) in to_index { - let title = parser::get_title(&content).await?; - docs.push((id.to_string(), title, content)); + for (idx, id, content, new_corpus) in to_index { + match parser::get_title(&content).await { + Ok(title) => docs.push((id.to_string(), title, content)), + Err(e) => { + let corpus_path = self.root.join("corpus").join(format!("{id}.md")); + if new_corpus { + remove_ingest_artifact(&corpus_path); + } + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + } + } } - let refs: Vec<(&str, &str, &str)> = docs - .iter() - .map(|(id, title, content)| (id.as_str(), title.as_str(), content.as_str())) - .collect(); - indexer::add_documents(&self.index, &refs)?; + if !docs.is_empty() { + let refs: Vec<(&str, &str, &str)> = docs + .iter() + .map(|(id, title, content)| (id.as_str(), title.as_str(), content.as_str())) + .collect(); + indexer::add_documents(&self.index, &refs)?; + + for (id_str, _, _) in &docs { + if let Ok(id) = id_str.parse::() { + succeeded.push(id); + } + } + } - Ok(all_ids) + Ok(IngestResult { succeeded, failed }) } /// Removes a document from the index and deletes its origin and corpus files. @@ -179,6 +271,28 @@ impl Store { Ok(doc) } + /// Removes multiple documents with partial-success semantics. + pub fn purge_many(&mut self, ids: impl IntoIterator) -> PurgeResult { + let mut purged = Vec::new(); + let mut failed = Vec::new(); + + for id in ids { + match self.purge(id) { + Ok(Some(_)) => purged.push(id), + Ok(None) => failed.push(PurgeFailure { + id, + error: "document not found".into(), + }), + Err(e) => failed.push(PurgeFailure { + id, + error: e.to_string(), + }), + } + } + + PurgeResult { purged, failed } + } + /// Get # of documents pub fn count(&self) -> u32 { indexer::num_documents(&self.index).unwrap_or(0) as u32 @@ -254,6 +368,30 @@ mod tests { use super::*; + #[tokio::test] + async fn ingest_many_preserves_existing_corpus_when_corpus_read_fails() { + let tempdir = tempfile::tempdir().expect("failed to create tempdir"); + let mut store = Store::new(tempdir.path()).expect("failed to create store"); + + let bytes = b"same input bytes"; + let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); + let corpus_path = tempdir.path().join("corpus").join(format!("{id}.md")); + let invalid_utf8 = [0xff, 0xfe, 0xfd]; + fs::write(&corpus_path, invalid_utf8).expect("failed to seed existing corpus"); + + let result = store + .ingest_many([(bytes.to_vec(), FileType::MD)]) + .await + .expect("ingest_many should report per-item failure"); + + assert!(result.succeeded.is_empty()); + assert_eq!(result.failed.len(), 1); + assert_eq!( + fs::read(&corpus_path).expect("existing corpus should remain"), + invalid_utf8 + ); + } + #[tokio::test] #[ignore = "requires network access & docling"] async fn test_ingest_financebench_samples() { diff --git a/speedwagon/src/store/preset.rs b/speedwagon/src/store/preset.rs index 13cd4b4..3259f5e 100644 --- a/speedwagon/src/store/preset.rs +++ b/speedwagon/src/store/preset.rs @@ -33,8 +33,12 @@ pub async fn setup_docset(store: &mut Store, preset: &PresetKind) -> Result<()> } println!("Ingesting corpus FinanceBench..."); - let ids = store.ingest_many(items).await?; - println!("Done. {} documents ingested.", ids.len()); + let result = store.ingest_many(items).await?; + println!( + "Done. {} documents ingested, {} failed.", + result.succeeded.len(), + result.failed.len(), + ); } } Ok(())