diff --git a/Cargo.toml b/Cargo.toml index c92c3ff..0326684 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "src/sacp-test", "src/yopo", "src/sacp-trace-viewer", + "src/decaf", ] resolver = "2" diff --git a/src/decaf/Cargo.toml b/src/decaf/Cargo.toml new file mode 100644 index 0000000..dddd10d --- /dev/null +++ b/src/decaf/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "decaf" +version = "1.0.0-alpha.1" +edition = "2024" +description = "Debouncing proxy for ACP - coalesces agent message chunks" +license = "MIT OR Apache-2.0" +repository = "https://github.com/symposium-dev/symposium-acp" +keywords = ["acp", "agent", "proxy", "debounce"] +categories = ["development-tools"] + +[dependencies] +sacp = { version = "11.0.0-alpha.1", path = "../sacp" } +tokio.workspace = true +tracing.workspace = true + +[dev-dependencies] +futures.workspace = true +sacp-conductor = { path = "../sacp-conductor" } +sacp-test = { path = "../sacp-test" } +tokio-util.workspace = true +tracing-subscriber.workspace = true diff --git a/src/decaf/src/lib.rs b/src/decaf/src/lib.rs new file mode 100644 index 0000000..87279da --- /dev/null +++ b/src/decaf/src/lib.rs @@ -0,0 +1,198 @@ +//! Debouncing proxy for ACP. +//! +//! Agents often send `AgentMessageChunk` notifications word-by-word, +//! creating a flood of tiny messages. Decaf coalesces these chunks, +//! forwarding a single combined chunk every N milliseconds instead. +//! +//! # Usage +//! +//! ```no_run +//! # use decaf::Decaf; +//! # use sacp::{Proxy, ConnectTo}; +//! # use std::time::Duration; +//! # async fn example(transport: impl ConnectTo + 'static) -> Result<(), sacp::Error> { +//! Decaf::new(Duration::from_millis(100)) +//! .run(transport) +//! .await?; +//! # Ok(()) +//! # } +//! ``` + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use sacp::schema::{ + ContentBlock, ContentChunk, PromptRequest, SessionId, SessionNotification, SessionUpdate, +}; +use sacp::util::MatchDispatch; +use sacp::{Agent, Client, Conductor, ConnectTo, Dispatch, Proxy}; +use tokio::sync::Mutex; + +/// A debouncing proxy that coalesces `AgentMessageChunk` notifications. +/// +/// Instead of forwarding every individual chunk, Decaf buffers text +/// and flushes it at a configurable interval. +pub struct Decaf { + interval: Duration, +} + +struct BufferedSession { + /// Accumulated text chunks. + text: String, + + /// The most recent notification, used as a template when flushing + /// (preserves session_id, meta, annotations, etc). + template: SessionNotification, +} + +type State = Arc>>; + +impl Decaf { + pub fn new(interval: Duration) -> Self { + Decaf { interval } + } + + pub async fn run(self, transport: impl ConnectTo + 'static) -> Result<(), sacp::Error> { + let state: State = Arc::new(Mutex::new(HashMap::new())); + let interval = self.interval; + + Proxy + .builder() + .name("decaf") + .on_receive_dispatch_from( + Agent, + { + let state = state.clone(); + async move |dispatch: Dispatch, cx| { + MatchDispatch::new(dispatch) + .if_notification(async |notification: SessionNotification| { + let is_text_chunk = matches!( + ¬ification.update, + SessionUpdate::AgentMessageChunk(ContentChunk { + content: ContentBlock::Text(_), + .. + }) + ); + + if is_text_chunk { + // Buffer the text chunk + let mut sessions = state.lock().await; + let text = match ¬ification.update { + SessionUpdate::AgentMessageChunk(ContentChunk { + content: ContentBlock::Text(tc), + .. + }) => tc.text.clone(), + _ => unreachable!(), + }; + + match sessions.get_mut(¬ification.session_id) { + Some(buffered) => { + buffered.text.push_str(&text); + buffered.template = notification; + } + None => { + sessions.insert( + notification.session_id.clone(), + BufferedSession { + text, + template: notification, + }, + ); + } + } + } else { + // Non-chunk message: flush buffer first, then forward + flush_session(&state, ¬ification.session_id, &cx).await?; + cx.send_notification_to(Client, notification)?; + } + + Ok(()) + }) + .await + .if_response_to::(async |result, router| { + // Flush any remaining buffered text before + // the prompt response reaches the client. + flush_all(&state, &cx).await?; + router.respond_with_result(result) + }) + .await + .done() + } + }, + sacp::on_receive_dispatch!(), + ) + .with_spawned({ + let state = state.clone(); + move |cx| async move { + let mut ticker = tokio::time::interval(interval); + loop { + ticker.tick().await; + flush_all(&state, &cx).await?; + } + } + }) + .connect_to(transport) + .await + } +} + +impl ConnectTo for Decaf { + async fn connect_to(self, transport: impl ConnectTo) -> Result<(), sacp::Error> { + self.run(transport).await + } +} + +/// Flush a single session's buffer, sending a coalesced chunk to the client. +async fn flush_session( + state: &State, + session_id: &SessionId, + cx: &sacp::ConnectionTo, +) -> Result<(), sacp::Error> { + let flushed = { + let mut sessions = state.lock().await; + match sessions.get_mut(session_id) { + Some(buffered) if !buffered.text.is_empty() => { + let text = std::mem::take(&mut buffered.text); + let mut notification = buffered.template.clone(); + + // Replace the text content with the coalesced text + if let SessionUpdate::AgentMessageChunk(ContentChunk { + content: ContentBlock::Text(tc), + .. + }) = &mut notification.update + { + tc.text = text; + } + + Some(notification) + } + _ => None, + } + }; + + if let Some(notification) = flushed { + cx.send_notification_to(Client, notification)?; + } + + Ok(()) +} + +/// Flush all sessions that have buffered data. +async fn flush_all(state: &State, cx: &sacp::ConnectionTo) -> Result<(), sacp::Error> { + // Collect session IDs that need flushing while holding the lock briefly + let session_ids: Vec = { + let sessions = state.lock().await; + sessions + .iter() + .filter(|(_, b)| !b.text.is_empty()) + .map(|(id, _)| id.clone()) + .collect() + }; + + for session_id in session_ids { + flush_session(state, &session_id, cx).await?; + } + + Ok(()) +} diff --git a/src/decaf/tests/debounce.rs b/src/decaf/tests/debounce.rs new file mode 100644 index 0000000..d99a0a7 --- /dev/null +++ b/src/decaf/tests/debounce.rs @@ -0,0 +1,240 @@ +//! Integration test for the decaf debouncing proxy. +//! +//! Creates a fast word-dumping agent that sends 20 words as individual +//! `AgentMessageChunk` notifications with no delay, runs them through +//! decaf, and verifies the client receives fewer (coalesced) notifications +//! containing all the original text. + +use std::path::PathBuf; +use std::time::Duration; + +use decaf::Decaf; +use futures::{SinkExt, StreamExt, channel::mpsc}; +use sacp::schema::{ + AgentCapabilities, ContentBlock, ContentChunk, InitializeRequest, InitializeResponse, + NewSessionRequest, NewSessionResponse, PromptRequest, PromptResponse, ProtocolVersion, + SessionId, SessionNotification, SessionUpdate, StopReason, TextContent, +}; +use sacp::{Agent, Client, ConnectTo, ConnectionTo, Responder}; +use sacp_conductor::{ConductorImpl, ProxiesAndAgent}; +use tokio::io::duplex; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +const WORDS: &[&str] = &[ + "The ", + "quick ", + "brown ", + "fox ", + "jumps ", + "over ", + "the ", + "lazy ", + "dog. ", + "Pack ", + "my ", + "box ", + "with ", + "five ", + "dozen ", + "liquor ", + "jugs. ", + "How ", + "vexingly ", + "quick ", +]; + +// --------------------------------------------------------------------------- +// FastWordAgent — sends each word as an individual AgentMessageChunk +// --------------------------------------------------------------------------- + +#[derive(Clone)] +struct FastWordAgent; + +impl ConnectTo for FastWordAgent { + async fn connect_to(self, client: impl ConnectTo) -> Result<(), sacp::Error> { + Agent + .builder() + .name("fast-word-agent") + .on_receive_request( + async |init: InitializeRequest, responder: Responder, _cx| { + responder.respond( + InitializeResponse::new(init.protocol_version) + .agent_capabilities(AgentCapabilities::new()), + ) + }, + sacp::on_receive_request!(), + ) + .on_receive_request( + async |_req: NewSessionRequest, responder: Responder, _cx| { + responder.respond(NewSessionResponse::new(SessionId::new("test-session-1"))) + }, + sacp::on_receive_request!(), + ) + .on_receive_request( + async |request: PromptRequest, + responder: Responder, + cx: ConnectionTo| { + let cx2 = cx.clone(); + cx.spawn(async move { + let session_id = request.session_id.clone(); + + // Dump all words as fast as possible — no delay + for word in WORDS { + cx2.send_notification(SessionNotification::new( + session_id.clone(), + SessionUpdate::AgentMessageChunk(ContentChunk::new( + ContentBlock::Text(TextContent::new(word.to_string())), + )), + ))?; + } + + responder.respond(PromptResponse::new(StopReason::EndTurn)) + }) + }, + sacp::on_receive_request!(), + ) + .connect_to(client) + .await + } +} + +// --------------------------------------------------------------------------- +// Test helper +// --------------------------------------------------------------------------- + +async fn recv( + response: sacp::SentRequest, +) -> Result { + let (tx, rx) = tokio::sync::oneshot::channel(); + response.on_receiving_result(async move |result| { + tx.send(result).map_err(|_| sacp::Error::internal_error()) + })?; + rx.await.map_err(|_| sacp::Error::internal_error())? +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +/// Verifies that decaf coalesces rapid word-by-word chunks into fewer +/// notifications while preserving all the text content. +#[tokio::test] +async fn test_decaf_coalesces_chunks() -> Result<(), sacp::Error> { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_test_writer() + .try_init(); + + // Channel to collect notifications arriving at the client + let (notif_tx, mut notif_rx) = mpsc::unbounded::(); + + let (client_write, conductor_read) = duplex(8192); + let (conductor_write, client_read) = duplex(8192); + + // Spawn conductor: FastWordAgent -> Decaf (100ms) -> client + let conductor_handle = tokio::spawn(async move { + ConductorImpl::new_agent( + "decaf-test-conductor".to_string(), + ProxiesAndAgent::new(FastWordAgent).proxy(Decaf::new(Duration::from_millis(100))), + Default::default(), + ) + .run(sacp::ByteStreams::new( + conductor_write.compat_write(), + conductor_read.compat(), + )) + .await + }); + + // Run the client, capturing every SessionNotification + let result = tokio::time::timeout(Duration::from_secs(10), async { + sacp::Client + .builder() + .name("decaf-test-client") + .on_receive_notification( + { + let mut notif_tx = notif_tx.clone(); + async move |notification: SessionNotification, + _cx: sacp::ConnectionTo| { + notif_tx + .send(notification) + .await + .map_err(|_| sacp::Error::internal_error()) + } + }, + sacp::on_receive_notification!(), + ) + .connect_with( + sacp::ByteStreams::new(client_write.compat_write(), client_read.compat()), + async |cx| { + recv(cx.send_request(InitializeRequest::new(ProtocolVersion::LATEST))).await?; + + let session = + recv(cx.send_request(NewSessionRequest::new(PathBuf::from("/")))).await?; + + let _prompt_response = recv(cx.send_request(PromptRequest::new( + session.session_id, + vec![ContentBlock::Text(TextContent::new("go".to_string()))], + ))) + .await?; + + Ok(()) + }, + ) + .await + }) + .await + .expect("Test timed out"); + + conductor_handle.abort(); + result?; + + // Collect all captured notifications + drop(notif_tx); + let mut notifications = Vec::new(); + while let Some(n) = notif_rx.next().await { + notifications.push(n); + } + + // Extract text from each notification + let mut texts: Vec = Vec::new(); + for notif in ¬ifications { + if let SessionUpdate::AgentMessageChunk(ContentChunk { + content: ContentBlock::Text(tc), + .. + }) = ¬if.update + { + texts.push(tc.text.clone()); + } + } + + let all_text: String = texts.concat(); + let expected: String = WORDS.concat(); + + // All text must arrive intact + assert_eq!( + all_text, expected, + "Debounced text should contain all original words" + ); + + // Debouncing should coalesce — fewer notifications than words sent. + // The agent sends 20 words with no delay, so they should all land + // within a single 100ms tick (or at most a few ticks). + assert!( + texts.len() < WORDS.len(), + "Expected fewer notifications ({}) than words sent ({}), \ + meaning debouncing coalesced chunks. Individual chunks: {:?}", + texts.len(), + WORDS.len(), + texts, + ); + + tracing::info!( + words_sent = WORDS.len(), + notifications_received = texts.len(), + "Debouncing verified: {} words coalesced into {} notifications", + WORDS.len(), + texts.len(), + ); + + Ok(()) +}