-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcapability_negotiation.rs
More file actions
178 lines (163 loc) · 5.46 KB
/
Copy pathcapability_negotiation.rs
File metadata and controls
178 lines (163 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
//! Capability-driven peer routing with ordered fallback + cost rollup.
//!
//! Walks an ordered chain of peer runtimes per request class. Marketplace
//! fields ride on the negotiated capabilities (RFC §21 namespace) so no
//! extra round trip is needed to learn cost / latency / class. Retryable
//! errors fall through to the next peer; everything else surfaces.
//!
//! Demonstrates RFC §7, §17.3.1, §18.3.
// Examples are illustrative, not runnable: setup is elided with `todo!()` and
// the protocol shape is what the reader sees. Suppress the lints that would
// otherwise force unidiomatic skeleton code.
#![allow(
clippy::todo,
clippy::unimplemented,
clippy::panic,
clippy::unwrap_used,
clippy::expect_used,
clippy::missing_errors_doc,
clippy::missing_panics_doc,
clippy::doc_markdown,
clippy::needless_pass_by_value,
clippy::too_many_arguments,
clippy::unused_async,
clippy::diverging_sub_expression,
clippy::no_effect_underscore_binding,
clippy::let_unit_value,
clippy::used_underscore_binding,
clippy::let_underscore_untyped,
clippy::struct_field_names,
clippy::manual_let_else,
clippy::map_unwrap_or,
clippy::redundant_pub_crate,
dead_code,
unreachable_code,
unused_assignments,
unused_mut,
unused_imports,
unused_variables
)]
use std::collections::HashMap;
use arcp::error::ARCPError;
use arcp::messages::Capabilities;
use arcp::transport::MemoryTransport;
use arcp::{ARCPClient, Envelope, ErrorCode};
use serde_json::json;
/// Concrete transport pinned for the snippet — production uses `WebSocketTransport`
/// per peer URL.
///
/// This is the client type stored per peer name in the `clients` map that
/// `main` builds and `invoke_with_fallback` receives.
type Client = ARCPClient<MemoryTransport>;
/// Peer names that `main` iterates over to populate its `clients` and
/// `profiles` maps. Every name used by `fallback_chain` appears here.
const PEERS: &[&str] = &[
"anthropic-haiku",
"anthropic-sonnet",
"openai-4o",
"groq-llama",
];
/// Inclusive upper bound on `Profile::cost_per_mtok`; `candidate_chain` drops
/// any peer whose cost is above this value.
const COST_CEILING_USD_PER_MTOK: f64 = 8.0;
/// Inclusive upper bound on `Profile::p50_latency_ms`; `candidate_chain` drops
/// any peer whose latency is above this value.
const LATENCY_CEILING_MS: u32 = 800;
/// Ordered peer preference list for a request class.
///
/// Recognised classes are `"cheap_fast"`, `"balanced"`, and `"deep"`. The
/// comparison is exact (case-sensitive); any other class yields an empty slice.
fn fallback_chain(class: &str) -> &'static [&'static str] {
    // Each entry pairs a class label with its peers, listed in the order
    // they should be attempted.
    const ROUTES: &[(&str, &[&str])] = &[
        ("cheap_fast", &["groq-llama", "anthropic-haiku", "openai-4o"]),
        ("balanced", &["anthropic-sonnet", "openai-4o", "anthropic-haiku"]),
        ("deep", &["anthropic-sonnet"]),
    ];

    for &(label, peers) in ROUTES {
        if label == class {
            return peers;
        }
    }
    &[]
}
/// Marketplace figures for a single peer, as returned by `profile_from`.
///
/// `candidate_chain` compares both fields against the file-level ceilings to
/// decide whether a peer stays in the chain.
#[derive(Debug, Clone, Copy)]
struct Profile {
/// Compared against `COST_CEILING_USD_PER_MTOK`; presumably USD per million
/// tokens — confirm against the capability schema.
cost_per_mtok: f64,
/// Compared against `LATENCY_CEILING_MS`; presumably the peer's median (p50)
/// latency in milliseconds — confirm against the capability schema.
p50_latency_ms: u32,
}
/// Derive a `Profile` from a peer's negotiated capabilities.
///
/// Capabilities is `extra="allow"` at the wire — namespaced fields ride
/// alongside the core booleans (RFC §7 / §21).
///
/// Not implemented in this example: the body is `todo!()`, so any call panics.
fn profile_from(_caps: &Capabilities) -> Profile {
// Intended implementation (elided): read `arcpx.market.cost_per_mtok.v1`,
// `arcpx.market.p50_latency_ms.v1`, and `arcpx.market.model_class.v1` from
// the extension namespace.
todo!()
}
/// Filter the fallback chain for `class` down to peers that are usable now.
///
/// A peer is kept only if it has an entry in `profiles` and that entry is at
/// or below both `COST_CEILING_USD_PER_MTOK` and `LATENCY_CEILING_MS`. The
/// relative order from `fallback_chain` is preserved. Peers without a profile
/// are dropped.
fn candidate_chain(profiles: &HashMap<&str, Profile>, class: &str) -> Vec<&'static str> {
    let mut eligible = Vec::new();
    for &peer in fallback_chain(class) {
        // No negotiated profile means the peer cannot be judged; skip it.
        let Some(profile) = profiles.get(peer) else {
            continue;
        };
        let affordable = profile.cost_per_mtok <= COST_CEILING_USD_PER_MTOK;
        let fast_enough = profile.p50_latency_ms <= LATENCY_CEILING_MS;
        if affordable && fast_enough {
            eligible.push(peer);
        }
    }
    eligible
}
/// Returns `true` when `code` is one of `ResourceExhausted`, `Unavailable`,
/// `DeadlineExceeded`, or `Aborted`, and `false` for every other code.
///
/// `invoke_with_fallback` uses this to decide whether a failed peer should be
/// skipped in favour of the next one in the chain.
const fn is_retryable(code: ErrorCode) -> bool {
matches!(
code,
ErrorCode::ResourceExhausted
| ErrorCode::Unavailable
| ErrorCode::DeadlineExceeded
| ErrorCode::Aborted
)
}
/// Try each peer in `chain` in order and return the first successful reply.
///
/// A failure whose code satisfies `is_retryable` moves on to the next peer;
/// any other failure is returned immediately. If every peer fails with a
/// retryable error, the last such error is returned. An empty `chain` yields
/// `ARCPError::Unavailable` with the detail `"no peers available"`.
///
/// The per-peer invoke is elided with `todo!()`, so as written, awaiting this
/// function with a non-empty chain panics at the first peer. The intended
/// call treats a retryable wire error and a `tool.error` reply carrying a
/// retryable code alike, and attaches an
/// `extensions={"arcpx.market.peer.v1": <name>}` block so downstream observers
/// can tell which peer ultimately answered.
async fn invoke_with_fallback(
    _clients: &HashMap<&str, Client>,
    chain: &[&str],
    _tool: &str,
    _arguments: serde_json::Value,
    _trace_id: &str,
) -> Result<Envelope, ARCPError> {
    // Most recent retryable failure; surfaced only if the chain is exhausted.
    let mut last_retryable: Option<ARCPError> = None;

    for _name in chain {
        let reply: Result<Envelope, ARCPError> = todo!();

        let failure = match reply {
            Ok(env) => return Ok(env),
            Err(exc) => exc,
        };

        // Hard errors surface right away; only retryable ones fall through.
        if !is_retryable(failure.code()) {
            return Err(failure);
        }
        last_retryable = Some(failure);
    }

    match last_retryable {
        Some(exc) => Err(exc),
        None => Err(ARCPError::Unavailable {
            detail: "no peers available".into(),
        }),
    }
}
/// Running usage totals. `consume_metric` receives a `HashMap<String, Usage>`
/// of these to update; `Default` gives an all-zero, empty starting value.
#[derive(Debug, Default)]
struct Usage {
/// Presumably the running total of input tokens (`tokens.used` with
/// `kind=input`) — confirm once `consume_metric` is implemented.
tokens_in: u64,
/// Presumably the running total of output tokens (`tokens.used` with
/// `kind=output`) — confirm once `consume_metric` is implemented.
tokens_out: u64,
/// Presumably the summed `cost.usd` across all peers — confirm once
/// `consume_metric` is implemented.
cost_usd: f64,
/// Presumably `cost.usd` broken down by the metric's `peer=<name>` label —
/// confirm once `consume_metric` is implemented.
by_peer: HashMap<String, f64>,
}
/// Subscribe to each peer's `metric` envelopes; aggregate `tokens.used`
/// (with `kind=input|output`) and `cost.usd` (with `peer=<name>`) into a
/// per-tenant rollup. Standard names from RFC §17.3.1.
///
/// Not implemented in this example: the body is `todo!()`, so any call
/// panics. `main` does not call it.
fn consume_metric(_env: &Envelope, _totals: &mut HashMap<String, Usage>) {
todo!()
}
/// Example driver: build a client and a profile for every entry in `PEERS`,
/// narrow the `"balanced"` chain with `candidate_chain`, then send one
/// `chat.completion` request through `invoke_with_fallback`.
///
/// Client construction and capability negotiation are elided with `todo!()`,
/// so running this as written panics on the first peer.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut clients: HashMap<&str, Client> = HashMap::new();
    let mut profiles: HashMap<&str, Profile> = HashMap::new();

    for peer in PEERS.iter().copied() {
        // Transport per peer URL, identity, and auth are elided.
        let client: Client = todo!();
        // Stands in for `client.negotiated_capabilities()`.
        let caps: Capabilities = todo!();
        profiles.insert(peer, profile_from(&caps));
        clients.insert(peer, client);
    }

    let trace_id = "trace_<uuid>";
    let chain = candidate_chain(&profiles, "balanced");
    let arguments = json!({"prompt": "Hello", "tenant": "acme-corp"});

    let _reply =
        invoke_with_fallback(&clients, &chain, "chat.completion", arguments, trace_id).await?;

    println!("invoked balanced chain across {} peers", chain.len());
    Ok(())
}