-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathack_backpressure.rs
More file actions
101 lines (88 loc) · 3.36 KB
/
Copy pathack_backpressure.rs
File metadata and controls
101 lines (88 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//! ARCP v1.1 §6.5 — `backpressure` / slow-consumer flow-control demo.
//!
//! The runtime tracks unacked event lag per session. When the client falls
//! behind (ack watermark far below the latest `event_seq`), the runtime
//! emits a `backpressure` envelope and may throttle event delivery until
//! the client catches up.
//!
//! This example:
//! 1. Connects with a very slow auto-ack cadence so the consumer
//! intentionally falls behind.
//! 2. Submits a `chatty` job that emits ~2 000 metric events.
//! 3. Waits to observe a `backpressure` envelope from the runtime.
//! 4. Catches up by sending a `session.ack` at the current high watermark.
//!
//! Run with:
//! `cargo run --example ack_backpressure`
// This example is a skeleton: the client helpers are `todo!()` stubs with
// unused parameters, and everything after the first stub in `main` is
// unreachable. The lints below are presumably silenced so the sketch
// builds without warning noise — revisit this list once the stubs are
// filled in.
#![allow(
clippy::todo,
clippy::unimplemented,
clippy::panic,
clippy::unwrap_used,
clippy::expect_used,
clippy::missing_errors_doc,
clippy::missing_panics_doc,
clippy::doc_markdown,
clippy::needless_pass_by_value,
clippy::too_many_arguments,
clippy::unused_async,
clippy::diverging_sub_expression,
clippy::no_effect_underscore_binding,
clippy::let_unit_value,
clippy::used_underscore_binding,
clippy::let_underscore_untyped,
clippy::struct_field_names,
clippy::manual_let_else,
clippy::map_unwrap_or,
clippy::redundant_pub_crate,
dead_code,
unreachable_code,
unused_assignments,
unused_mut,
unused_imports,
unused_variables
)]
use arcp::error::ARCPError;
use arcp::transport::MemoryTransport;
use arcp::{ARCPClient, Envelope};
use serde_json::json;
/// Client type used throughout this example: an [`ARCPClient`] running over
/// a [`MemoryTransport`].
type Client = ARCPClient<MemoryTransport>;
/// Number of events the `chatty` job is asked to emit (the `count` argument
/// in the `submit_chatty` sketch). Only referenced from pseudocode for now.
const CHATTY_COUNT: u32 = 2_000;
/// Submit the `chatty` job and return the job ID from `job.accepted`.
///
/// Placeholder: the body is currently `todo!()`, so calling this panics
/// before anything is sent and `_client` is never read. The comment in the
/// body sketches the intended `tool.invoke` request.
async fn submit_chatty(_client: &Client) -> Result<String, ARCPError> {
// Intended request (pseudocode, not yet implemented):
// client.request(envelope("tool.invoke", {tool: "chatty",
// arguments: {count: CHATTY_COUNT}})) -> job_id from job.accepted
todo!()
}
/// Send a `session.ack` advancing the watermark to `last_seq`.
///
/// Placeholder: the body is currently `todo!()`, so calling this panics and
/// neither argument is read. In this file it is only referenced from the
/// pseudocode inside `drain`.
async fn ack(_client: &Client, _last_seq: u64) -> Result<(), ARCPError> {
// Intended send (pseudocode, not yet implemented):
// client.send(envelope("session.ack", {last_processed_seq: last_seq}))
todo!()
}
/// Drain the event stream for one job until it reaches a terminal state.
///
/// Yields the pair `(metric_count, back_pressure_observed)`.
///
/// The receive loop is still only sketched in the comment below, so at
/// present neither argument is read and the result is always `(0, false)`.
async fn drain(_client: &Client, _job_id: &str) -> Result<(u32, bool), ARCPError> {
    // Nothing has been observed yet: no metrics, no `backpressure` envelope.
    let (metric_count, saw_backpressure): (u32, bool) = (0, false);

    // Planned loop (pseudocode, not yet implemented):
    //   for await env in client.events():
    //     match env.payload.type_name():
    //       "metric" => metric_count += 1,
    //       "backpressure" => { saw_backpressure = true; ack(client, env.event_seq).await? }
    //       "job.completed" | "job.failed" | "job.cancelled" => break,
    Ok((metric_count, saw_backpressure))
}
/// Entry point of the demo: submit the `chatty` job, drain its events, and
/// fail unless a `backpressure` envelope was seen along the way.
///
/// Client construction is still a `todo!()`, so running the example panics
/// at the first statement; the remaining steps show the intended flow.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The client is meant to be connected with auto-ack disabled (or on a
    // very slow cadence) so the consumer lags and the runtime emits
    // `backpressure`. Transport, identity, and auth setup are elided.
    let client: Client = todo!();

    let job_id = submit_chatty(&client).await?;
    println!("accepted: job_id={job_id}");

    let (metrics, bp) = drain(&client, &job_id).await?;
    println!("metrics observed={metrics} back_pressure={bp}");

    // The demo only counts as successful when back-pressure actually fired.
    if bp {
        println!("back-pressure observed and acknowledged — done");
        Ok(())
    } else {
        Err("expected a backpressure event but none arrived".into())
    }
}