From a69222c033541a6ea9e756519b115515dc950826 Mon Sep 17 00:00:00 2001 From: ljhh-0611 Date: Wed, 6 May 2026 16:08:25 +0900 Subject: [PATCH 1/4] feat: batch document ingest + bulk purge with partial success - Add `Store::ingest_many` partial success (IngestResult/IngestFailure) with batch index optimization and best-effort cleanup on failure - Add `Store::purge_many` (PurgeResult/PurgeFailure) - POST /documents: multi-file multipart upload with per-file validation - DELETE /documents: bulk purge via JSON body { ids: [...] } - GET /documents/{id}: single document retrieval - Response DTOs: BatchIngestResponse, BatchPurgeResponse, FailedItem - 14 new document tests + e2e test rewritten for multi-doc HTTP flow Co-Authored-By: Claude Opus 4.6 --- backend-v2/src/main.rs | 2 +- backend-v2/src/model/document.rs | 59 ++++++++ backend-v2/src/model/mod.rs | 2 + backend-v2/src/router.rs | 192 ++++++++++++++++++++++- backend-v2/tests/common/mod.rs | 144 +++++++++++++++++- backend-v2/tests/document_test.rs | 244 ++++++++++++++++++++++++++++++ backend-v2/tests/e2e_test.rs | 227 +++++++++++++++++++++------ speedwagon/src/store/mod.rs | 167 ++++++++++++++++---- speedwagon/src/store/preset.rs | 8 +- 9 files changed, 963 insertions(+), 82 deletions(-) create mode 100644 backend-v2/src/model/document.rs create mode 100644 backend-v2/tests/document_test.rs diff --git a/backend-v2/src/main.rs b/backend-v2/src/main.rs index 84814ca..5fe261f 100644 --- a/backend-v2/src/main.rs +++ b/backend-v2/src/main.rs @@ -52,7 +52,7 @@ async fn main() -> std::io::Result<()> { let store_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(".speedwagon"); let store = Arc::new(RwLock::new( - Store::new(store_path).expect("speedwagon store init"), + Store::new(&store_path).expect("speedwagon store init"), )); // Register API keys with the global provider (needed by Agent::try_with_tools) diff --git a/backend-v2/src/model/document.rs b/backend-v2/src/model/document.rs new file mode 100644 index 0000000..b0b7f52 --- /dev/null +++ b/backend-v2/src/model/document.rs @@ -0,0 +1,59 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use speedwagon::Document; +use uuid::Uuid; + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct DocumentResponse { + pub id: String, + pub title: String, + pub len: usize, +} + +impl From<&Document> for DocumentResponse { + fn from(doc: &Document) -> Self { + Self { + id: doc.id.clone(), + title: doc.title.clone(), + len: doc.len, + } + } +} + +impl From for DocumentResponse { + fn from(doc: Document) -> Self { + Self { + id: doc.id, + title: doc.title, + len: doc.len, + } + } +} + +// --- Response DTOs --- + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct FailedItem { + pub name: String, + pub error: String, +} + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct BatchIngestResponse { + pub succeeded: Vec, + pub failed: Vec, +} + +#[derive(Clone, Debug, Serialize, JsonSchema)] +pub struct BatchPurgeResponse { + pub purged: Vec, + pub failed: Vec, +} + +// --- Request DTOs --- + +#[derive(Clone, Debug, Deserialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct BulkPurgeRequest { + pub ids: Vec, +} diff --git a/backend-v2/src/model/mod.rs b/backend-v2/src/model/mod.rs index fa2c7b8..5a4e9de 100644 --- a/backend-v2/src/model/mod.rs +++ b/backend-v2/src/model/mod.rs @@ -1,5 +1,7 @@ +mod document; mod message; mod session; +pub use document::*; pub use message::*; pub use session::*; diff --git a/backend-v2/src/router.rs b/backend-v2/src/router.rs index 1c9e28c..7bb8f79 100644 --- a/backend-v2/src/router.rs +++ b/backend-v2/src/router.rs @@ -12,18 +12,22 @@ use ailoy::{ }; use axum::{ Json, - extract::{Path, State}, + extract::{Multipart, Path, Query, State}, http::StatusCode, response::sse::{Event, KeepAlive, Sse}, }; use chrono::Utc; use futures_util::StreamExt; -use speedwagon::SpeedwagonSpec; +use serde::Deserialize; +use speedwagon::{FileType, SpeedwagonSpec}; use uuid::Uuid; use crate::{ error::{ApiResult, AppError}, - model::{CreateSessionRequest, SendMessageRequest, SendMessageResponse, SessionResponse}, + model::{ + BatchIngestResponse, BatchPurgeResponse, BulkPurgeRequest, CreateSessionRequest, + DocumentResponse, FailedItem, SendMessageRequest, SendMessageResponse, SessionResponse, + }, state::AppState, }; @@ -47,11 +51,20 @@ pub fn get_router(state: Arc) -> ApiRouter { "/sessions/{id}/messages", axum::routing::get(get_message_history).delete(clear_message_history), ) + .route( + "/documents", + axum::routing::get(list_documents) + .post(ingest_document) + .delete(purge_documents), + ) + .route( + "/documents/{id}", + axum::routing::get(get_document).delete(purge_document), + ) .with_state(state) } async fn build_agent(sandbox: Sandbox) -> Result { - // Speedwagon RAG subagent let sw_card = AgentCard { name: "speedwagon".into(), description: "Search the knowledge base for answers. \ @@ -373,3 +386,174 @@ async fn send_message_stream( Ok(Sse::new(stream).keep_alive(KeepAlive::default())) } + +// ── Document endpoints ─────────────────────────────────────────────────────── + +#[derive(Debug, Deserialize)] +struct ListDocumentsQuery { + #[serde(default)] + page: Option, + #[serde(default)] + page_size: Option, +} + +async fn list_documents( + State(state): State>, + Query(query): Query, +) -> ApiResult>> { + let page = query.page.unwrap_or(0); + let page_size = query.page_size.unwrap_or(50); + + let store = state.store.read().await; + let docs = store + .list(false, page, page_size) + .map_err(|e| AppError::internal(e.to_string()))?; + + Ok(Json(docs.into_iter().map(DocumentResponse::from).collect())) +} + +async fn get_document( + State(state): State>, + Path(id): Path, +) -> ApiResult> { + let store = state.store.read().await; + match store.get(id) { + Some(doc) => Ok(Json(DocumentResponse::from(doc))), + None => Err(AppError::not_found("document not found")), + } +} + +fn parse_filetype(filename: &str) -> Result { + let ext = filename.rsplit('.').next().unwrap_or("").to_lowercase(); + match ext.as_str() { + "pdf" => Ok(FileType::PDF), + "md" | "markdown" | "txt" => Ok(FileType::MD), + _ => Err(format!( + "unsupported file type '.{ext}' — supported: pdf, md, txt" + )), + } +} + +async fn ingest_document( + State(state): State>, + mut multipart: Multipart, +) -> ApiResult<(StatusCode, Json)> { + let mut valid_items: Vec<(Vec, FileType)> = Vec::new(); + let mut filenames: Vec = Vec::new(); + let mut failed: Vec = Vec::new(); + + while let Ok(Some(field)) = multipart.next_field().await { + if field.name() != Some("file") { + continue; + } + let filename = field.file_name().unwrap_or("upload").to_string(); + let bytes = field + .bytes() + .await + .map_err(|e| AppError::internal(format!("failed to read upload: {e}")))?; + + match parse_filetype(&filename) { + Ok(filetype) => { + valid_items.push((bytes.to_vec(), filetype)); + filenames.push(filename); + } + Err(e) => { + failed.push(FailedItem { + name: filename, + error: e, + }); + } + } + } + + if valid_items.is_empty() && failed.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(AppError::new("missing 'file' field in multipart body")), + )); + } + + let mut store = state.store.write().await; + let result = store + .ingest_many(valid_items) + .await + .map_err(|e| AppError::internal(e.to_string()))?; + + let docs = store + .get_many(&result.succeeded) + .map_err(|e| AppError::internal(e.to_string()))?; + drop(store); + + for f in result.failed { + let name = filenames + .get(f.index) + .cloned() + .unwrap_or_else(|| format!("file[{}]", f.index)); + failed.push(FailedItem { + name, + error: f.error, + }); + } + + for doc in &docs { + tracing::info!(id = %doc.id, title = %doc.title, "document ingested"); + } + + let succeeded: Vec = docs.into_iter().map(DocumentResponse::from).collect(); + let status = if failed.is_empty() { + StatusCode::CREATED + } else { + StatusCode::OK + }; + + Ok((status, Json(BatchIngestResponse { succeeded, failed }))) +} + +async fn purge_document( + State(state): State>, + Path(id): Path, +) -> ApiResult { + let mut store = state.store.write().await; + match store + .purge(id) + .map_err(|e| AppError::internal(e.to_string()))? + { + Some(doc) => { + tracing::info!(%id, title = %doc.title, "document purged"); + Ok(StatusCode::NO_CONTENT) + } + None => Err(AppError::not_found("document not found")), + } +} + +async fn purge_documents( + State(state): State>, + Json(payload): Json, +) -> ApiResult<(StatusCode, Json)> { + if payload.ids.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + Json(AppError::new("ids must not be empty")), + )); + } + + let mut store = state.store.write().await; + let result = store.purge_many(payload.ids); + drop(store); + + let purged: Vec = result.purged.iter().map(|id| id.to_string()).collect(); + let failed: Vec = result + .failed + .into_iter() + .map(|f| FailedItem { + name: f.id.to_string(), + error: f.error, + }) + .collect(); + + for id in &purged { + tracing::info!(%id, "document purged"); + } + + Ok((StatusCode::OK, Json(BatchPurgeResponse { purged, failed }))) +} diff --git a/backend-v2/tests/common/mod.rs b/backend-v2/tests/common/mod.rs index 1bb88bc..fbd1205 100644 --- a/backend-v2/tests/common/mod.rs +++ b/backend-v2/tests/common/mod.rs @@ -49,7 +49,7 @@ pub async fn make_repo() -> repository::AppRepository { pub fn make_test_store() -> speedwagon::SharedStore { let store_path = std::env::temp_dir().join(format!("speedwagon-test-{}", uuid::Uuid::new_v4())); let store = Arc::new(RwLock::new( - Store::new(store_path).expect("test store init"), + Store::new(&store_path).expect("test store init"), )); store } @@ -264,6 +264,148 @@ pub async fn clear_message_history_status( app.clone().oneshot(req).await.unwrap().status() } +// ── Document helpers ───────────────────────────────────────────────────────── + +pub async fn list_documents(app: &axum::Router) -> Vec { + let req = Request::builder() + .method("GET") + .uri("/documents") + .body(Body::empty()) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + assert_eq!(resp.status(), axum::http::StatusCode::OK); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + serde_json::from_slice(&bytes).unwrap() +} + +fn build_multipart_body(files: &[(&str, &[u8])]) -> (String, Vec) { + let boundary = "----testboundary"; + let mut body = Vec::new(); + for (filename, content) in files { + body.extend_from_slice(format!("--{boundary}\r\n").as_bytes()); + body.extend_from_slice( + format!( + "Content-Disposition: form-data; name=\"file\"; filename=\"{filename}\"\r\n\ + Content-Type: application/octet-stream\r\n\r\n" + ) + .as_bytes(), + ); + body.extend_from_slice(content); + body.extend_from_slice(b"\r\n"); + } + body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes()); + (boundary.to_string(), body) +} + +/// Ingest a single file and return the first succeeded document. +pub async fn ingest_document( + app: &axum::Router, + filename: &str, + content: &[u8], +) -> serde_json::Value { + let batch = ingest_documents(app, &[(filename, content)]).await; + let succeeded = batch["succeeded"] + .as_array() + .expect("succeeded should be array"); + assert!( + !succeeded.is_empty(), + "ingest_document: no succeeded items — failed: {:?}", + batch["failed"] + ); + succeeded[0].clone() +} + +/// Ingest multiple files and return the full BatchIngestResponse. +pub async fn ingest_documents(app: &axum::Router, files: &[(&str, &[u8])]) -> serde_json::Value { + let (boundary, body) = build_multipart_body(files); + + let req = Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(Body::from(body)) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + serde_json::from_slice(&bytes).unwrap() +} + +/// Ingest files and also return the HTTP status code. +pub async fn ingest_documents_with_status( + app: &axum::Router, + files: &[(&str, &[u8])], +) -> (axum::http::StatusCode, serde_json::Value) { + let (boundary, body) = build_multipart_body(files); + + let req = Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(Body::from(body)) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + (status, serde_json::from_slice(&bytes).unwrap()) +} + +pub async fn purge_document(app: &axum::Router, id: &str) -> axum::http::StatusCode { + let req = Request::builder() + .method("DELETE") + .uri(format!("/documents/{id}")) + .body(Body::empty()) + .unwrap(); + + app.clone().oneshot(req).await.unwrap().status() +} + +pub async fn bulk_purge_documents( + app: &axum::Router, + ids: &[&str], +) -> (axum::http::StatusCode, serde_json::Value) { + let payload = serde_json::json!({ "ids": ids }); + let req = Request::builder() + .method("DELETE") + .uri("/documents") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&payload).unwrap())) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + ( + status, + serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null), + ) +} + +pub async fn get_document( + app: &axum::Router, + id: &str, +) -> (axum::http::StatusCode, serde_json::Value) { + let req = Request::builder() + .method("GET") + .uri(format!("/documents/{id}")) + .body(Body::empty()) + .unwrap(); + + let resp = app.clone().oneshot(req).await.unwrap(); + let status = resp.status(); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + let body = serde_json::from_slice(&bytes).unwrap_or(serde_json::Value::Null); + (status, body) +} + // ── Text extraction ─────────────────────────────────────────────────────────── /// Concatenate all text parts from depth-0 assistant messages in a slice. diff --git a/backend-v2/tests/document_test.rs b/backend-v2/tests/document_test.rs new file mode 100644 index 0000000..7486b64 --- /dev/null +++ b/backend-v2/tests/document_test.rs @@ -0,0 +1,244 @@ +#[path = "common/mod.rs"] +mod common; + +use axum::http::StatusCode; +use common::{ + bulk_purge_documents, get_document, ingest_document, ingest_documents_with_status, + list_documents, make_app_with_repo, make_repo, purge_document, +}; + +#[tokio::test] +async fn list_documents_empty_initially() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let docs = list_documents(&app).await; + assert!(docs.is_empty()); +} + +#[tokio::test] +async fn ingest_and_list_document() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# Test Document\n\nThis is test content for indexing."; + let doc = ingest_document(&app, "test.md", content).await; + + assert!(doc.get("id").is_some(), "response should contain id"); + assert!(doc.get("title").is_some(), "response should contain title"); + assert!(doc.get("len").is_some(), "response should contain len"); + + let docs = list_documents(&app).await; + assert_eq!(docs.len(), 1); + assert_eq!(docs[0]["id"], doc["id"]); +} + +#[tokio::test] +async fn get_document_by_id() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# Getting by ID\n\nSome content here."; + let created = ingest_document(&app, "get-test.md", content).await; + let id = created["id"].as_str().unwrap(); + + let (status, fetched) = get_document(&app, id).await; + assert_eq!(status, StatusCode::OK); + assert_eq!(fetched["id"].as_str().unwrap(), id); + assert_eq!(fetched["title"].as_str(), created["title"].as_str()); +} + +#[tokio::test] +async fn get_nonexistent_document_returns_404() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let fake_id = uuid::Uuid::new_v4(); + let (status, _) = get_document(&app, &fake_id.to_string()).await; + assert_eq!(status, StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn purge_document_removes_it() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# To Be Purged\n\nThis document will be deleted."; + let doc = ingest_document(&app, "purge-me.md", content).await; + let id = doc["id"].as_str().unwrap(); + + let status = purge_document(&app, id).await; + assert_eq!(status, StatusCode::NO_CONTENT); + + let docs = list_documents(&app).await; + assert!(docs.is_empty(), "document list should be empty after purge"); + + let (status, _) = get_document(&app, id).await; + assert_eq!(status, StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn purge_nonexistent_returns_404() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let fake_id = uuid::Uuid::new_v4(); + let status = purge_document(&app, &fake_id.to_string()).await; + assert_eq!(status, StatusCode::NOT_FOUND); +} + +#[tokio::test] +async fn ingest_duplicate_returns_same_id() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let content = b"# Duplicate Test\n\nSame content, same ID."; + let doc1 = ingest_document(&app, "dup1.md", content).await; + let doc2 = ingest_document(&app, "dup2.md", content).await; + + assert_eq!( + doc1["id"].as_str().unwrap(), + doc2["id"].as_str().unwrap(), + "same content should produce same UUID (UUIDv5)" + ); + + let docs = list_documents(&app).await; + assert_eq!( + docs.len(), + 1, + "duplicate ingest should not create extra document" + ); +} + +// ── Multi-file ingest tests ────────────────────────────────────────────────── + +#[tokio::test] +async fn ingest_multiple_documents() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let files: &[(&str, &[u8])] = &[ + ("doc1.md", b"# Document One\n\nFirst document."), + ("doc2.txt", b"# Document Two\n\nSecond document."), + ]; + + let (status, batch) = ingest_documents_with_status(&app, files).await; + assert_eq!(status, StatusCode::CREATED); + + let succeeded = batch["succeeded"].as_array().unwrap(); + assert_eq!(succeeded.len(), 2); + assert!(batch["failed"].as_array().unwrap().is_empty()); + + let docs = list_documents(&app).await; + assert_eq!(docs.len(), 2); +} + +#[tokio::test] +async fn ingest_partial_failure_mixed_filetypes() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let files: &[(&str, &[u8])] = &[ + ("good.md", b"# Good Document\n\nValid markdown."), + ("bad.csv", b"a,b,c\n1,2,3"), + ("also-good.txt", b"# Another Good\n\nAlso valid."), + ]; + + let (status, batch) = ingest_documents_with_status(&app, files).await; + assert_eq!(status, StatusCode::OK, "partial success should return 200"); + + let succeeded = batch["succeeded"].as_array().unwrap(); + let failed = batch["failed"].as_array().unwrap(); + assert_eq!(succeeded.len(), 2); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["name"].as_str().unwrap(), "bad.csv"); +} + +#[tokio::test] +async fn ingest_all_unsupported_returns_empty_succeeded() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let files: &[(&str, &[u8])] = &[("data.csv", b"a,b,c")]; + + let (status, batch) = ingest_documents_with_status(&app, files).await; + assert_eq!(status, StatusCode::OK); + + let succeeded = batch["succeeded"].as_array().unwrap(); + let failed = batch["failed"].as_array().unwrap(); + assert!(succeeded.is_empty()); + assert_eq!(failed.len(), 1); +} + +#[tokio::test] +async fn ingest_no_file_field_returns_400() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let boundary = "----testboundary"; + let body = format!("--{boundary}--\r\n"); + + let req = axum::http::Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(axum::body::Body::from(body)) + .unwrap(); + + let resp = tower::ServiceExt::oneshot(app, req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); +} + +// ── Bulk purge tests ───────────────────────────────────────────────────────── + +#[tokio::test] +async fn bulk_purge_multiple_documents() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let doc1 = ingest_document(&app, "a.md", b"# Doc A\n\nContent A.").await; + let doc2 = ingest_document(&app, "b.md", b"# Doc B\n\nContent B.").await; + let id1 = doc1["id"].as_str().unwrap(); + let id2 = doc2["id"].as_str().unwrap(); + + let (status, resp) = bulk_purge_documents(&app, &[id1, id2]).await; + assert_eq!(status, StatusCode::OK); + + let purged = resp["purged"].as_array().unwrap(); + assert_eq!(purged.len(), 2); + assert!(resp["failed"].as_array().unwrap().is_empty()); + + let docs = list_documents(&app).await; + assert!(docs.is_empty()); +} + +#[tokio::test] +async fn bulk_purge_partial_failure() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let doc = ingest_document(&app, "c.md", b"# Doc C\n\nContent C.").await; + let real_id = doc["id"].as_str().unwrap(); + let fake_id = uuid::Uuid::new_v4().to_string(); + + let (status, resp) = bulk_purge_documents(&app, &[real_id, &fake_id]).await; + assert_eq!(status, StatusCode::OK); + + let purged = resp["purged"].as_array().unwrap(); + let failed = resp["failed"].as_array().unwrap(); + assert_eq!(purged.len(), 1); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["name"].as_str().unwrap(), fake_id); +} + +#[tokio::test] +async fn bulk_purge_empty_ids_returns_400() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let (status, _) = bulk_purge_documents(&app, &[]).await; + assert_eq!(status, StatusCode::BAD_REQUEST); +} diff --git a/backend-v2/tests/e2e_test.rs b/backend-v2/tests/e2e_test.rs index 3a446a6..efde368 100644 --- a/backend-v2/tests/e2e_test.rs +++ b/backend-v2/tests/e2e_test.rs @@ -1,19 +1,92 @@ -#[path = "common/mod.rs"] -mod common; - use std::sync::Arc; use aide::openapi::OpenApi; use ailoy::lang_model::LangModelProvider; +use axum::body::Body; +use axum::http::{Request, StatusCode}; +use http_body_util::BodyExt; +use tower::ServiceExt; use agent_k_backend::repository; use agent_k_backend::router::get_router; use agent_k_backend::state::AppState; use ailoy::agent::default_provider_mut; -use common::{extract_text, post_session, send_message}; -use speedwagon::{FileType, Store, build_tools}; +use speedwagon::{Store, build_tools}; use tokio::sync::RwLock; +fn json_request(method: &str, uri: &str, body: Option<&str>) -> Request { + let builder = Request::builder() + .method(method) + .uri(uri) + .header("content-type", "application/json"); + match body { + Some(b) => builder.body(Body::from(b.to_string())).unwrap(), + None => builder.body(Body::from("{}")).unwrap(), + } +} + +fn multipart_request(files: &[(&str, &[u8])]) -> Request { + let boundary = "----e2e-test-boundary"; + let mut body = Vec::new(); + for (filename, content) in files { + body.extend_from_slice(format!("--{boundary}\r\n").as_bytes()); + body.extend_from_slice( + format!( + "Content-Disposition: form-data; name=\"file\"; filename=\"{filename}\"\r\n\ + Content-Type: application/octet-stream\r\n\r\n" + ) + .as_bytes(), + ); + body.extend_from_slice(content); + body.extend_from_slice(b"\r\n"); + } + body.extend_from_slice(format!("--{boundary}--\r\n").as_bytes()); + + Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(Body::from(body)) + .unwrap() +} + +fn extract_assistant_text(outputs: &serde_json::Value) -> String { + outputs + .as_array() + .map(|arr| { + arr.iter() + .filter_map(|o| { + let depth = o.get("depth").and_then(|d| d.as_u64()).unwrap_or(0); + if depth != 0 { + return None; + } + o.get("message")? + .get("contents")? + .as_array()? + .iter() + .filter_map(|p| p.get("text")?.as_str()) + .map(str::to_string) + .reduce(|a, b| a + &b) + }) + .collect::>() + .join("") + }) + .unwrap_or_default() +} + +fn assert_send_ok(status: StatusCode, body: &[u8]) -> serde_json::Value { + if status != StatusCode::OK { + panic!( + "send_message returned {status}: {}", + String::from_utf8_lossy(body) + ); + } + serde_json::from_slice(body).unwrap() +} + #[tokio::test] #[ignore = "requires OPENAI_API_KEY"] async fn test_ingest_message_purge_cycle() { @@ -21,7 +94,7 @@ async fn test_ingest_message_purge_cycle() { let store_path = std::env::temp_dir().join(format!("speedwagon-e2e-{}", uuid::Uuid::new_v4())); let store = Arc::new(RwLock::new( - Store::new(store_path).expect("test store init"), + Store::new(&store_path).expect("test store init"), )); { @@ -34,58 +107,126 @@ async fn test_ingest_message_purge_cycle() { provider.tools = build_tools(store.clone()); } - let test_content = b"The capital of Freedonia is Glorkville. This is a unique fact."; - let doc_id = store - .write() - .await - .ingest(test_content.iter().copied(), FileType::MD) - .await - .expect("ingest failed"); - let repo = repository::create_repository("sqlite::memory:") .await .expect("test repo init"); - let state = Arc::new(AppState::new(repo, store.clone())); + let state = Arc::new(AppState::new(repo, store)); let app = get_router(state).finish_api(&mut OpenApi::default()); - let session_id = post_session(&app).await; + // ── Ingest two documents via HTTP multipart ────────────────────────────── + let resp = app + .clone() + .oneshot(multipart_request(&[ + ( + "freedonia.md", + b"The capital of Freedonia is Glorkville. This is a unique fact.", + ), + ( + "zorbax.md", + b"The largest ocean on planet Zorbax is the Shimmer Sea. It covers 40% of the surface.", + ), + ])) + .await + .unwrap(); + let status = resp.status(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let batch: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!( + status, + StatusCode::CREATED, + "batch ingest should succeed: {batch}" + ); - let outputs = send_message(&app, session_id, "What is the capital of Freedonia?").await; - let arr = outputs.as_array().expect("response must be an array"); + let succeeded = batch["succeeded"].as_array().unwrap(); + assert_eq!(succeeded.len(), 2, "both documents should ingest"); + let doc_ids: Vec<&str> = succeeded + .iter() + .map(|d| d["id"].as_str().unwrap()) + .collect(); - assert!(!arr.is_empty(), "messages should not be empty"); + // ── Create session ─────────────────────────────────────────────────────── + let resp = app + .clone() + .oneshot(json_request("POST", "/sessions", None)) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::CREATED); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let session: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let session_id = session["id"].as_str().unwrap(); + let msg_uri = format!("/sessions/{session_id}/messages"); - let has_assistant = arr.iter().any(|o| { - o.get("message") - .and_then(|m| m.get("role")) - .and_then(|r| r.as_str()) - == Some("assistant") - }); + // ── Question about document 1 (Freedonia) ──────────────────────────────── + let q1 = serde_json::json!({ "content": "What is the capital of Freedonia?" }).to_string(); + let resp = app + .clone() + .oneshot(json_request("POST", &msg_uri, Some(&q1))) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let outputs = assert_send_ok(StatusCode::OK, &body); + let text = extract_assistant_text(&outputs); assert!( - has_assistant, - "should contain at least one assistant message" + text.contains("Glorkville"), + "response should mention 'Glorkville', got: {text}", ); - let text = extract_text(&outputs); - assert!(!text.is_empty(), "assistant text should not be empty"); + // ── Question about document 2 (Zorbax) ─────────────────────────────────── + let q2 = + serde_json::json!({ "content": "What is the largest ocean on planet Zorbax?" }).to_string(); + let resp = app + .clone() + .oneshot(json_request("POST", &msg_uri, Some(&q2))) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let outputs = assert_send_ok(StatusCode::OK, &body); + let text = extract_assistant_text(&outputs); assert!( - text.contains("Glorkville"), - "response should mention 'Glorkville' from the ingested document, got: {text}", + text.contains("Shimmer Sea"), + "response should mention 'Shimmer Sea', got: {text}", ); - // Purge the document - store.write().await.purge(doc_id).expect("purge failed"); - - // Send same message after purge - let outputs = send_message( - &app, - session_id, - "What is the capital of Freedonia?", - ) - .await; - let post_purge_text = extract_text(&outputs); + // ── Bulk purge both documents via HTTP ─────────────────────────────────── + let purge_body = serde_json::json!({ "ids": doc_ids }).to_string(); + let resp = app + .clone() + .oneshot(json_request("DELETE", "/documents", Some(&purge_body))) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let purge_resp: serde_json::Value = serde_json::from_slice(&body).unwrap(); + let purged = purge_resp["purged"].as_array().unwrap(); + assert_eq!(purged.len(), 2, "both documents should be purged"); + + // ── Verify documents are gone ──────────────────────────────────────────── + let resp = app + .clone() + .oneshot( + Request::builder() + .method("GET") + .uri("/documents") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let docs: Vec = serde_json::from_slice(&body).unwrap(); + assert!(docs.is_empty(), "document list should be empty after purge"); + + // ── Post-purge question (agent should still respond, just without KB) ──── + let resp = app + .clone() + .oneshot(json_request("POST", &msg_uri, Some(&q1))) + .await + .unwrap(); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let outputs = assert_send_ok(StatusCode::OK, &body); + let post_purge_text = extract_assistant_text(&outputs); assert!( !post_purge_text.is_empty(), - "post-purge response should not be empty" + "post-purge response should not be empty", ); } diff --git a/speedwagon/src/store/mod.rs b/speedwagon/src/store/mod.rs index acc6636..72698f7 100644 --- a/speedwagon/src/store/mod.rs +++ b/speedwagon/src/store/mod.rs @@ -21,6 +21,30 @@ use uuid::Uuid; pub use document::{Document, FindResult}; pub use searcher::{SearchPage, SearchResult}; +#[derive(Debug, Clone)] +pub struct IngestResult { + pub succeeded: Vec, + pub failed: Vec, +} + +#[derive(Debug, Clone)] +pub struct IngestFailure { + pub index: usize, + pub error: String, +} + +#[derive(Debug, Clone)] +pub struct PurgeResult { + pub purged: Vec, + pub failed: Vec, +} + +#[derive(Debug, Clone)] +pub struct PurgeFailure { + pub id: Uuid, + pub error: String, +} + pub type SharedStore = Arc>; /// Speedwagon store layout: @@ -92,64 +116,123 @@ impl Store { Ok(id) } - /// Adds multiple files in one batch: translates each to corpus, resolves titles, then - /// commits them all to the index in a single write. + /// Adds multiple files in one batch with partial-success semantics. + /// Successfully processed files are batched into a single index write. + /// Files that fail at any stage (translation, reading, title extraction) are + /// recorded in `IngestResult::failed` and their intermediate files are cleaned up. pub async fn ingest_many( &mut self, items: impl IntoIterator, FileType)>, - ) -> Result> { - let items = items + ) -> Result { + let items: Vec<(Vec, FileType)> = items .into_iter() .map(|v| (v.0.into_iter().collect::>(), v.1)) - .collect::>(); - let mut all_ids = Vec::with_capacity(items.len()); + .collect(); + + let mut succeeded = Vec::with_capacity(items.len()); + let mut failed = Vec::new(); let mut to_index: Vec<(Uuid, String)> = Vec::new(); // (id, content) - for (bytes, filetype) in &items { + for (idx, (bytes, filetype)) in items.iter().enumerate() { let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); - all_ids.push(id); - let corpus_path = self.root.join("corpus").join(format!("{id}.md")); + if !corpus_path.exists() { - match filetype { - FileType::MD => { - fs::write(&corpus_path, bytes)?; - } + let mut new_origin: Option = None; + + let ok = match filetype { + FileType::MD => fs::write(&corpus_path, bytes).map_err(|e| e.to_string()), _ => { let ext = filetype.to_string(); let origin_path = self.root.join("origin").join(format!("{id}.{ext}")); if !origin_path.exists() { - fs::write(&origin_path, bytes)?; + if let Err(e) = fs::write(&origin_path, bytes) { + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + continue; + } + new_origin = Some(origin_path.clone()); } - translator::translate(&origin_path, &corpus_path)?; + translator::translate(&origin_path, &corpus_path) + .map_err(|e| e.to_string()) } + }; + + if let Err(e) = ok { + let _ = fs::remove_file(&corpus_path); + if let Some(origin) = &new_origin { + let _ = fs::remove_file(origin); + } + failed.push(IngestFailure { + index: idx, + error: e, + }); + continue; } } - if !indexer::document_exists(&self.index, &id.to_string())? { - let content = fs::read_to_string(&corpus_path) - .with_context(|| format!("failed to read corpus: {corpus_path:?}"))?; - to_index.push((id, content)); + match indexer::document_exists(&self.index, &id.to_string()) { + Ok(true) => { + succeeded.push(id); + } + Ok(false) => match fs::read_to_string(&corpus_path) { + Ok(content) => { + to_index.push((id, content)); + } + Err(e) => { + let _ = fs::remove_file(&corpus_path); + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + } + }, + Err(e) => { + failed.push(IngestFailure { + index: idx, + error: e.to_string(), + }); + } } } - if to_index.is_empty() { - return Ok(all_ids); - } - let mut docs: Vec<(String, String, String)> = Vec::with_capacity(to_index.len()); + let mut title_failed_ids: Vec = Vec::new(); for (id, content) in to_index { - let title = parser::get_title(&content).await?; - docs.push((id.to_string(), title, content)); + match parser::get_title(&content).await { + Ok(title) => docs.push((id.to_string(), title, content)), + Err(e) => { + let corpus_path = self.root.join("corpus").join(format!("{id}.md")); + let _ = fs::remove_file(&corpus_path); + title_failed_ids.push(id); + failed.push(IngestFailure { + index: items + .iter() + .position(|(b, _)| Uuid::new_v5(&Uuid::NAMESPACE_OID, b) == id) + .unwrap_or(0), + error: e.to_string(), + }); + } + } } - let refs: Vec<(&str, &str, &str)> = docs - .iter() - .map(|(id, title, content)| (id.as_str(), title.as_str(), content.as_str())) - .collect(); - indexer::add_documents(&self.index, &refs)?; + if !docs.is_empty() { + let refs: Vec<(&str, &str, &str)> = docs + .iter() + .map(|(id, title, content)| (id.as_str(), title.as_str(), content.as_str())) + .collect(); + indexer::add_documents(&self.index, &refs)?; - Ok(all_ids) + for (id_str, _, _) in &docs { + if let Ok(id) = id_str.parse::() { + succeeded.push(id); + } + } + } + + Ok(IngestResult { succeeded, failed }) } /// Removes a document from the index and deletes its origin and corpus files. @@ -179,6 +262,28 @@ impl Store { Ok(doc) } + /// Removes multiple documents with partial-success semantics. + pub fn purge_many(&mut self, ids: impl IntoIterator) -> PurgeResult { + let mut purged = Vec::new(); + let mut failed = Vec::new(); + + for id in ids { + match self.purge(id) { + Ok(Some(_)) => purged.push(id), + Ok(None) => failed.push(PurgeFailure { + id, + error: "document not found".into(), + }), + Err(e) => failed.push(PurgeFailure { + id, + error: e.to_string(), + }), + } + } + + PurgeResult { purged, failed } + } + /// Get # of documents pub fn count(&self) -> u32 { indexer::num_documents(&self.index).unwrap_or(0) as u32 diff --git a/speedwagon/src/store/preset.rs b/speedwagon/src/store/preset.rs index 13cd4b4..3259f5e 100644 --- a/speedwagon/src/store/preset.rs +++ b/speedwagon/src/store/preset.rs @@ -33,8 +33,12 @@ pub async fn setup_docset(store: &mut Store, preset: &PresetKind) -> Result<()> } println!("Ingesting corpus FinanceBench..."); - let ids = store.ingest_many(items).await?; - println!("Done. {} documents ingested.", ids.len()); + let result = store.ingest_many(items).await?; + println!( + "Done. {} documents ingested, {} failed.", + result.succeeded.len(), + result.failed.len(), + ); } } Ok(()) From 8bb858ce882c129afabde86694e6759cbabb7dce Mon Sep 17 00:00:00 2001 From: ljhh-0611 Date: Thu, 7 May 2026 15:32:15 +0900 Subject: [PATCH 2/4] Preserve batch-ingest failure evidence while reducing helper duplication Keep the document batch API behavior unchanged while removing a silent fallback in failure indexing and trimming duplicated test HTTP setup. Constraint: Cleanup is scoped to PR #57 / a69222c document-batch changes only.\nRejected: Rewriting broader Speedwagon parser/tool clippy warnings | outside the requested commit scope.\nConfidence: high\nScope-risk: narrow\nDirective: Keep ingest_many response semantics provisional until the Store contract is hardened.\nTested: cargo fmt --check -p agent-k-backend -p speedwagon; cargo check -p agent-k-backend; cargo test -p agent-k-backend --test document_test; cargo test -p speedwagon --no-default-features --lib; cargo clippy -p agent-k-backend --tests\nNot-tested: live ignored e2e RAG test requiring OPENAI_API_KEY --- backend-v2/tests/common/mod.rs | 28 ++++++++++------------------ speedwagon/src/store/mod.rs | 34 ++++++++++++++++++---------------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/backend-v2/tests/common/mod.rs b/backend-v2/tests/common/mod.rs index fbd1205..89e046a 100644 --- a/backend-v2/tests/common/mod.rs +++ b/backend-v2/tests/common/mod.rs @@ -48,10 +48,9 @@ pub async fn make_repo() -> repository::AppRepository { /// Create a SharedStore + ToolSet backed by a temporary directory. pub fn make_test_store() -> speedwagon::SharedStore { let store_path = std::env::temp_dir().join(format!("speedwagon-test-{}", uuid::Uuid::new_v4())); - let store = Arc::new(RwLock::new( + Arc::new(RwLock::new( Store::new(&store_path).expect("test store init"), - )); - store + )) } /// Build an app from an already-constructed repository. @@ -318,27 +317,20 @@ pub async fn ingest_document( /// Ingest multiple files and return the full BatchIngestResponse. pub async fn ingest_documents(app: &axum::Router, files: &[(&str, &[u8])]) -> serde_json::Value { - let (boundary, body) = build_multipart_body(files); - - let req = Request::builder() - .method("POST") - .uri("/documents") - .header( - "content-type", - format!("multipart/form-data; boundary={boundary}"), - ) - .body(Body::from(body)) - .unwrap(); - - let resp = app.clone().oneshot(req).await.unwrap(); - let bytes = resp.into_body().collect().await.unwrap().to_bytes(); - serde_json::from_slice(&bytes).unwrap() + post_documents(app, files).await.1 } /// Ingest files and also return the HTTP status code. pub async fn ingest_documents_with_status( app: &axum::Router, files: &[(&str, &[u8])], +) -> (axum::http::StatusCode, serde_json::Value) { + post_documents(app, files).await +} + +async fn post_documents( + app: &axum::Router, + files: &[(&str, &[u8])], ) -> (axum::http::StatusCode, serde_json::Value) { let (boundary, body) = build_multipart_body(files); diff --git a/speedwagon/src/store/mod.rs b/speedwagon/src/store/mod.rs index 72698f7..56e7c0d 100644 --- a/speedwagon/src/store/mod.rs +++ b/speedwagon/src/store/mod.rs @@ -7,7 +7,7 @@ mod searcher; mod translator; use std::{ - fs, + fs, io, path::{Path, PathBuf}, sync::Arc, }; @@ -47,6 +47,14 @@ pub struct PurgeFailure { pub type SharedStore = Arc>; +fn remove_ingest_artifact(path: &Path) { + match fs::remove_file(path) { + Ok(()) => {} + Err(e) if e.kind() == io::ErrorKind::NotFound => {} + Err(e) => log::warn!("failed to clean up ingest artifact {:?}: {e}", path), + } +} + /// Speedwagon store layout: /// /// ```text @@ -131,7 +139,7 @@ impl Store { let mut succeeded = Vec::with_capacity(items.len()); let mut failed = Vec::new(); - let mut to_index: Vec<(Uuid, String)> = Vec::new(); // (id, content) + let mut to_index: Vec<(usize, Uuid, String)> = Vec::new(); // (input index, id, content) for (idx, (bytes, filetype)) in items.iter().enumerate() { let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); @@ -155,15 +163,14 @@ impl Store { } new_origin = Some(origin_path.clone()); } - translator::translate(&origin_path, &corpus_path) - .map_err(|e| e.to_string()) + translator::translate(&origin_path, &corpus_path).map_err(|e| e.to_string()) } }; if let Err(e) = ok { - let _ = fs::remove_file(&corpus_path); + remove_ingest_artifact(&corpus_path); if let Some(origin) = &new_origin { - let _ = fs::remove_file(origin); + remove_ingest_artifact(origin); } failed.push(IngestFailure { index: idx, @@ -179,10 +186,10 @@ impl Store { } Ok(false) => match fs::read_to_string(&corpus_path) { Ok(content) => { - to_index.push((id, content)); + to_index.push((idx, id, content)); } Err(e) => { - let _ = fs::remove_file(&corpus_path); + remove_ingest_artifact(&corpus_path); failed.push(IngestFailure { index: idx, error: e.to_string(), @@ -199,19 +206,14 @@ impl Store { } let mut docs: Vec<(String, String, String)> = Vec::with_capacity(to_index.len()); - let mut title_failed_ids: Vec = Vec::new(); - for (id, content) in to_index { + for (idx, id, content) in to_index { match parser::get_title(&content).await { Ok(title) => docs.push((id.to_string(), title, content)), Err(e) => { let corpus_path = self.root.join("corpus").join(format!("{id}.md")); - let _ = fs::remove_file(&corpus_path); - title_failed_ids.push(id); + remove_ingest_artifact(&corpus_path); failed.push(IngestFailure { - index: items - .iter() - .position(|(b, _)| Uuid::new_v5(&Uuid::NAMESPACE_OID, b) == id) - .unwrap_or(0), + index: idx, error: e.to_string(), }); } From d74d1e79b7119e7a5e2c79c13a9acb291e3cd121 Mon Sep 17 00:00:00 2001 From: khj809 Date: Thu, 7 May 2026 21:19:30 +0900 Subject: [PATCH 3/4] use aide::axum::routing --- backend-v2/src/handlers/document.rs | 6 ++++-- backend-v2/src/router.rs | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/backend-v2/src/handlers/document.rs b/backend-v2/src/handlers/document.rs index 1fdddc1..3a00886 100644 --- a/backend-v2/src/handlers/document.rs +++ b/backend-v2/src/handlers/document.rs @@ -1,10 +1,12 @@ use std::sync::Arc; +use aide::NoApi; use axum::{ Json, extract::{Multipart, Path, Query, State}, http::StatusCode, }; +use schemars::JsonSchema; use serde::Deserialize; use speedwagon::FileType; use uuid::Uuid; @@ -17,7 +19,7 @@ use crate::{ state::AppState, }; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, JsonSchema)] pub struct ListDocumentsQuery { #[serde(default)] pub page: Option, @@ -64,7 +66,7 @@ fn parse_filetype(filename: &str) -> Result { pub async fn ingest_document( State(state): State>, - mut multipart: Multipart, + NoApi(mut multipart): NoApi, ) -> ApiResult<(StatusCode, Json)> { let mut valid_items: Vec<(Vec, FileType)> = Vec::new(); let mut filenames: Vec = Vec::new(); diff --git a/backend-v2/src/router.rs b/backend-v2/src/router.rs index 9be1e85..cd32a1c 100644 --- a/backend-v2/src/router.rs +++ b/backend-v2/src/router.rs @@ -55,15 +55,15 @@ pub fn get_router(state: Arc) -> ApiRouter { ); let document_routes = ApiRouter::new() - .route( + .api_route( "/documents", - axum::routing::get(handlers::list_documents) + get(handlers::list_documents) .post(handlers::ingest_document) .delete(handlers::purge_documents), ) - .route( + .api_route( "/documents/{id}", - axum::routing::get(handlers::get_document).delete(handlers::purge_document), + get(handlers::get_document).delete(handlers::purge_document), ); ApiRouter::new() From 2704fb04271e58edd81e8c8d246b1d9432275c55 Mon Sep 17 00:00:00 2001 From: ljhh-0611 Date: Fri, 8 May 2026 14:24:40 +0900 Subject: [PATCH 4/4] Keep document batch failures item-scoped Review feedback showed batch ingest and purge paths could hide item-level failures or over-clean existing artifacts. This keeps API ids string-shaped at the boundary while parsing per item before store operations. Constraint: PR #57 review requested String id consistency and explicit multipart/corpus failure handling Rejected: Converting speedwagon document ids to Uuid | would broaden index/tool/CLI scope beyond PR Confidence: high Scope-risk: narrow Directive: Keep speedwagon index/tool IDs string-shaped unless a broader migration is planned Tested: cargo fmt --check -p speedwagon -p agent-k-backend; cargo test -p speedwagon --lib; cargo test -p agent-k-backend --test document_test; cargo check -p speedwagon -p agent-k-backend; git diff --check Not-tested: clippy -D warnings; blocked by preexisting warnings outside this change --- backend-v2/src/handlers/document.rs | 34 ++++++++++----- backend-v2/src/model/document.rs | 3 +- backend-v2/tests/document_test.rs | 68 +++++++++++++++++++++++++++++ speedwagon/src/store/mod.rs | 45 ++++++++++++++++--- 4 files changed, 130 insertions(+), 20 deletions(-) diff --git a/backend-v2/src/handlers/document.rs b/backend-v2/src/handlers/document.rs index 3a00886..08b3d2d 100644 --- a/backend-v2/src/handlers/document.rs +++ b/backend-v2/src/handlers/document.rs @@ -72,7 +72,11 @@ pub async fn ingest_document( let mut filenames: Vec = Vec::new(); let mut failed: Vec = Vec::new(); - while let Ok(Some(field)) = multipart.next_field().await { + while let Some(field) = multipart + .next_field() + .await + .map_err(|e| AppError::bad_request(format!("multipart error: {e}")))? + { if field.name() != Some("file") { continue; } @@ -80,7 +84,7 @@ pub async fn ingest_document( let bytes = field .bytes() .await - .map_err(|e| AppError::internal(format!("failed to read upload: {e}")))?; + .map_err(|e| AppError::bad_request(format!("multipart error: {e}")))?; match parse_filetype(&filename) { Ok(filetype) => { @@ -167,19 +171,27 @@ pub async fn purge_documents( )); } + let mut ids = Vec::with_capacity(payload.ids.len()); + let mut failed = Vec::new(); + for raw_id in payload.ids { + match Uuid::parse_str(&raw_id) { + Ok(id) => ids.push(id), + Err(_) => failed.push(FailedItem { + name: raw_id, + error: "invalid document id".into(), + }), + } + } + let mut store = state.store.write().await; - let result = store.purge_many(payload.ids); + let result = store.purge_many(ids); drop(store); let purged: Vec = result.purged.iter().map(|id| id.to_string()).collect(); - let failed: Vec = result - .failed - .into_iter() - .map(|f| FailedItem { - name: f.id.to_string(), - error: f.error, - }) - .collect(); + failed.extend(result.failed.into_iter().map(|f| FailedItem { + name: f.id.to_string(), + error: f.error, + })); for id in &purged { tracing::info!(%id, "document purged"); diff --git a/backend-v2/src/model/document.rs b/backend-v2/src/model/document.rs index b0b7f52..48cf107 100644 --- a/backend-v2/src/model/document.rs +++ b/backend-v2/src/model/document.rs @@ -1,7 +1,6 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use speedwagon::Document; -use uuid::Uuid; #[derive(Clone, Debug, Serialize, JsonSchema)] pub struct DocumentResponse { @@ -55,5 +54,5 @@ pub struct BatchPurgeResponse { #[derive(Clone, Debug, Deserialize, JsonSchema)] #[serde(deny_unknown_fields)] pub struct BulkPurgeRequest { - pub ids: Vec, + pub ids: Vec, } diff --git a/backend-v2/tests/document_test.rs b/backend-v2/tests/document_test.rs index 7486b64..71ae245 100644 --- a/backend-v2/tests/document_test.rs +++ b/backend-v2/tests/document_test.rs @@ -6,6 +6,8 @@ use common::{ bulk_purge_documents, get_document, ingest_document, ingest_documents_with_status, list_documents, make_app_with_repo, make_repo, purge_document, }; +use http_body_util::BodyExt; +use tower::ServiceExt; #[tokio::test] async fn list_documents_empty_initially() { @@ -192,6 +194,46 @@ async fn ingest_no_file_field_returns_400() { assert_eq!(resp.status(), StatusCode::BAD_REQUEST); } +#[tokio::test] +async fn ingest_malformed_multipart_after_valid_file_returns_400() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let boundary = "----testboundary"; + let body = format!( + "--{boundary}\r\n\ + Content-Disposition: form-data; name=\"file\"; filename=\"good.md\"\r\n\ + Content-Type: text/markdown\r\n\r\n\ + # Good Document\r\n\ + --{boundary}\r\n\ + Content-Disposition: form-data; name=\"file\"; filename=\"bad.md\"\r\n\ + Content-Type: text/markdown\r\n\r\n\ + # Missing closing boundary" + ); + + let req = axum::http::Request::builder() + .method("POST") + .uri("/documents") + .header( + "content-type", + format!("multipart/form-data; boundary={boundary}"), + ) + .body(axum::body::Body::from(body)) + .unwrap(); + + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let bytes = resp.into_body().collect().await.unwrap().to_bytes(); + let body: serde_json::Value = serde_json::from_slice(&bytes).unwrap(); + assert!( + body["error"] + .as_str() + .unwrap_or_default() + .contains("multipart error"), + "unexpected error body: {body}" + ); +} + // ── Bulk purge tests ───────────────────────────────────────────────────────── #[tokio::test] @@ -234,6 +276,32 @@ async fn bulk_purge_partial_failure() { assert_eq!(failed[0]["name"].as_str().unwrap(), fake_id); } +#[tokio::test] +async fn bulk_purge_invalid_id_is_item_failure() { + let repo = make_repo().await; + let app = make_app_with_repo(repo); + + let doc = ingest_document(&app, "valid.md", b"# Valid\n\nContent.").await; + let real_id = doc["id"].as_str().unwrap(); + let invalid_id = "not-a-uuid"; + + let (status, resp) = bulk_purge_documents(&app, &[real_id, invalid_id]).await; + assert_eq!(status, StatusCode::OK); + + let purged = resp["purged"].as_array().unwrap(); + let failed = resp["failed"].as_array().unwrap(); + assert_eq!(purged.len(), 1); + assert_eq!(purged[0].as_str().unwrap(), real_id); + assert_eq!(failed.len(), 1); + assert_eq!(failed[0]["name"].as_str().unwrap(), invalid_id); + assert!( + failed[0]["error"] + .as_str() + .unwrap_or_default() + .contains("invalid document id") + ); +} + #[tokio::test] async fn bulk_purge_empty_ids_returns_400() { let repo = make_repo().await; diff --git a/speedwagon/src/store/mod.rs b/speedwagon/src/store/mod.rs index 56e7c0d..80b3cab 100644 --- a/speedwagon/src/store/mod.rs +++ b/speedwagon/src/store/mod.rs @@ -139,13 +139,14 @@ impl Store { let mut succeeded = Vec::with_capacity(items.len()); let mut failed = Vec::new(); - let mut to_index: Vec<(usize, Uuid, String)> = Vec::new(); // (input index, id, content) + let mut to_index: Vec<(usize, Uuid, String, bool)> = Vec::new(); // (input index, id, content, new_corpus) for (idx, (bytes, filetype)) in items.iter().enumerate() { let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); let corpus_path = self.root.join("corpus").join(format!("{id}.md")); + let new_corpus = !corpus_path.exists(); - if !corpus_path.exists() { + if new_corpus { let mut new_origin: Option = None; let ok = match filetype { @@ -168,7 +169,9 @@ impl Store { }; if let Err(e) = ok { - remove_ingest_artifact(&corpus_path); + if new_corpus { + remove_ingest_artifact(&corpus_path); + } if let Some(origin) = &new_origin { remove_ingest_artifact(origin); } @@ -186,10 +189,12 @@ impl Store { } Ok(false) => match fs::read_to_string(&corpus_path) { Ok(content) => { - to_index.push((idx, id, content)); + to_index.push((idx, id, content, new_corpus)); } Err(e) => { - remove_ingest_artifact(&corpus_path); + if new_corpus { + remove_ingest_artifact(&corpus_path); + } failed.push(IngestFailure { index: idx, error: e.to_string(), @@ -206,12 +211,14 @@ impl Store { } let mut docs: Vec<(String, String, String)> = Vec::with_capacity(to_index.len()); - for (idx, id, content) in to_index { + for (idx, id, content, new_corpus) in to_index { match parser::get_title(&content).await { Ok(title) => docs.push((id.to_string(), title, content)), Err(e) => { let corpus_path = self.root.join("corpus").join(format!("{id}.md")); - remove_ingest_artifact(&corpus_path); + if new_corpus { + remove_ingest_artifact(&corpus_path); + } failed.push(IngestFailure { index: idx, error: e.to_string(), @@ -361,6 +368,30 @@ mod tests { use super::*; + #[tokio::test] + async fn ingest_many_preserves_existing_corpus_when_corpus_read_fails() { + let tempdir = tempfile::tempdir().expect("failed to create tempdir"); + let mut store = Store::new(tempdir.path()).expect("failed to create store"); + + let bytes = b"same input bytes"; + let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, bytes); + let corpus_path = tempdir.path().join("corpus").join(format!("{id}.md")); + let invalid_utf8 = [0xff, 0xfe, 0xfd]; + fs::write(&corpus_path, invalid_utf8).expect("failed to seed existing corpus"); + + let result = store + .ingest_many([(bytes.to_vec(), FileType::MD)]) + .await + .expect("ingest_many should report per-item failure"); + + assert!(result.succeeded.is_empty()); + assert_eq!(result.failed.len(), 1); + assert_eq!( + fs::read(&corpus_path).expect("existing corpus should remain"), + invalid_utf8 + ); + } + #[tokio::test] #[ignore = "requires network access & docling"] async fn test_ingest_financebench_samples() {