Skip to content

Commit f2ef11e

Browse files
nficanoclaude
andcommitted
feat(execution): implement job.result_chunk event kind per ARCP v1.1 §8.4
Adds a new top-level `job.result_chunk` message variant carrying an ordered fragment of a streamed final result (`result_id`, `chunk_seq`, `data`, `encoding`, `more`). Per §8.4 the terminating `job.completed` references the streamed result by `result_id` rather than carrying it inline; the rust-sdk surfaces `result_id` / `result_size` / `summary` as new fields on `JobCompletedPayload`. Implementation: - New `JobResultChunkPayload` and `ResultChunkEncoding` (`utf8` / `base64`) types in `messages::execution`. A minimal in-crate base64 decoder avoids adding a runtime dependency. - `ResultChunkAssembler` helper accumulates chunks in order, validating monotonic `chunk_seq`, consistent `result_id` and `encoding`, and surfaces a clean `Result<Vec<u8>, ResultChunkError>` on `finish` (or `String` via `finish_utf8`). - `ToolContext::emit_result_chunk` emits one chunk envelope from inside a tool handler. - An agent signals "I streamed my result" by returning a sentinel shape `{ "$arcp_streamed_result": { result_id, result_size?, summary? } }`; the runtime promotes the inner fields onto `job.completed` and drops `value`, matching §8.4's "do not mix inline and chunks" rule. - `is_forwardable_job_event` (the §7.6 forwarder filter) is extended to forward `job.result_chunk` to cross-session subscribers. Closes #35 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ecb8a94 commit f2ef11e

7 files changed

Lines changed: 835 additions & 11 deletions

File tree

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ path = "examples/agent_versions/main.rs"
142142
name = "job_subscribe"
143143
path = "examples/job_subscribe/main.rs"
144144

145+
[[example]]
146+
name = "result_chunk"
147+
path = "examples/result_chunk/main.rs"
148+
145149
[lints.rust]
146150
unsafe_code = "deny"
147151
missing_docs = "deny"

examples/result_chunk/main.rs

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
//! ARCP v1.1 §8.4 — `job.result_chunk` streamed-result demo.
2+
//!
3+
//! Hosts a `report-builder` agent that emits the final result as a
4+
//! sequence of `job.result_chunk` events, terminated by a `job.completed`
5+
//! that references the streamed `result_id`. The client uses
6+
//! `ResultChunkAssembler` to reassemble the chunks into the original
7+
//! payload.
8+
//!
9+
//! Run with:
10+
//! `cargo run --example result_chunk`
11+
12+
#![allow(clippy::similar_names, clippy::expect_used, clippy::print_stdout)]
13+
14+
use std::sync::Arc;
15+
use std::time::Duration;
16+
17+
use arcp::auth::BearerAuthenticator;
18+
use arcp::envelope::Envelope;
19+
use arcp::error::ARCPError;
20+
use arcp::messages::{
21+
AuthScheme, Capabilities, ClientIdentity, Credentials, MessageType, ResultChunkAssembler,
22+
ResultChunkEncoding, SessionOpenPayload, ToolInvokePayload,
23+
};
24+
use arcp::runtime::context::ToolContext;
25+
use arcp::runtime::server::STREAMED_RESULT_SENTINEL;
26+
use arcp::runtime::tools::{ToolHandler, ToolRegistryBuilder};
27+
use arcp::runtime::ARCPRuntime;
28+
use arcp::transport::{paired, Transport};
29+
use async_trait::async_trait;
30+
31+
struct ReportBuilder;
32+
33+
#[async_trait]
34+
impl ToolHandler for ReportBuilder {
35+
fn name(&self) -> &'static str {
36+
"report-builder"
37+
}
38+
39+
async fn invoke(
40+
&self,
41+
arguments: serde_json::Value,
42+
ctx: ToolContext,
43+
) -> Result<serde_json::Value, ARCPError> {
44+
let total = arguments
45+
.get("chunks")
46+
.and_then(serde_json::Value::as_u64)
47+
.unwrap_or(8);
48+
let result_id = format!("res_{}", ctx.job_id().as_str());
49+
let mut bytes: u64 = 0;
50+
for i in 0..total {
51+
let more = i + 1 < total;
52+
let fragment = format!("Section {}: lorem ipsum dolor sit amet\n", i + 1);
53+
bytes += fragment.len() as u64;
54+
ctx.emit_result_chunk(&result_id, i, fragment, ResultChunkEncoding::Utf8, more)
55+
.await?;
56+
tokio::time::sleep(Duration::from_millis(10)).await;
57+
}
58+
Ok(serde_json::json!({
59+
STREAMED_RESULT_SENTINEL: {
60+
"result_id": result_id,
61+
"result_size": bytes,
62+
"summary": format!("report with {total} chunks"),
63+
}
64+
}))
65+
}
66+
}
67+
68+
#[tokio::main]
69+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
70+
let runtime = ARCPRuntime::builder()
71+
.with_authenticator(Box::new(
72+
BearerAuthenticator::new().with_token("demo-token", "demo"),
73+
))
74+
.with_tools(
75+
ToolRegistryBuilder::new()
76+
.with(Arc::new(ReportBuilder))
77+
.build(),
78+
)
79+
.build()
80+
.await?;
81+
82+
let (server_t, client_t) = paired();
83+
let _h = runtime.serve_connection(server_t);
84+
85+
let mut open = Envelope::new(MessageType::SessionOpen(SessionOpenPayload {
86+
auth: Credentials {
87+
scheme: AuthScheme::Bearer,
88+
token: Some("demo-token".into()),
89+
},
90+
client: ClientIdentity {
91+
kind: "result-chunk-demo".into(),
92+
version: env!("CARGO_PKG_VERSION").into(),
93+
fingerprint: None,
94+
principal: None,
95+
},
96+
capabilities: Capabilities::default(),
97+
}));
98+
open.id = arcp::ids::MessageId::new();
99+
client_t.send(open).await?;
100+
let accepted = client_t.recv().await?.ok_or("no session.accepted")?;
101+
let MessageType::SessionAccepted(payload) = accepted.payload else {
102+
return Err("expected session.accepted".into());
103+
};
104+
let session_id = payload.session_id;
105+
println!("connected; session_id={session_id}");
106+
107+
let mut invoke = Envelope::new(MessageType::ToolInvoke(ToolInvokePayload {
108+
tool: "report-builder".into(),
109+
arguments: serde_json::json!({"chunks": 5}),
110+
}));
111+
invoke.session_id = Some(session_id);
112+
client_t.send(invoke).await?;
113+
114+
let mut assembler = ResultChunkAssembler::new();
115+
let mut chunks = 0u32;
116+
loop {
117+
let env = tokio::time::timeout(Duration::from_secs(2), client_t.recv())
118+
.await??
119+
.ok_or("transport closed")?;
120+
match env.payload {
121+
MessageType::JobAccepted(p) => println!("job_id={}", p.job_id),
122+
MessageType::JobResultChunk(chunk) => {
123+
chunks += 1;
124+
println!(
125+
"result_chunk seq={} more={} len={}B",
126+
chunk.chunk_seq,
127+
chunk.more,
128+
chunk.data.len()
129+
);
130+
let _ = assembler.push(&chunk)?;
131+
}
132+
MessageType::JobCompleted(p) => {
133+
println!(
134+
"job.completed result_id={:?} result_size={:?} summary={:?}",
135+
p.result_id, p.result_size, p.summary
136+
);
137+
break;
138+
}
139+
_ => {}
140+
}
141+
}
142+
143+
let assembled = assembler.finish_utf8()?;
144+
println!(
145+
"assembled {} chunks into {} bytes (head: {:?})",
146+
chunks,
147+
assembled.len(),
148+
&assembled[..assembled.len().min(40)],
149+
);
150+
151+
Ok(())
152+
}

0 commit comments

Comments
 (0)