From 40903cceee90207d6977e18bfeb0c484b571eefb Mon Sep 17 00:00:00 2001 From: Scott Turnbull Date: Tue, 21 Apr 2026 18:01:24 -0400 Subject: [PATCH 1/2] channels/telegram: propagate send failures from api_send_* helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The six outbound helpers in the Telegram adapter (sendMessage, sendPhoto, sendDocument, sendDocument_upload, sendVoice, sendLocation) previously logged a `warn!` on HTTP non-success and still returned `Ok(())`. Callers interpreted that as successful delivery and told the agent "Message sent" even when Telegram had rejected the request (e.g. 400 Bad Request from malformed HTML entities with parse_mode=HTML). The agent recorded phantom success in its session history, corrupting subsequent behavior. The fix returns `Err(format!(...).into())` on HTTP non-success in all six helpers, matching the error-handling convention documented in CONTRIBUTING.md. `api_send_message` is slightly different because it splits long messages into chunks via `split_message(4096)`. Naively returning `Err` on any chunk failure would create a partial-delivery-then-error regression — worse than the original silent success. The function now tracks `delivered_any` across chunks: - First-chunk failure (nothing delivered yet) → return `Err` to surface the failure. This is where the motivating HTML-parse-error bug lives, so the fix is fully effective. - Subsequent-chunk failure (user already received preceding chunks) → log `warn!` and continue with best-effort delivery, matching the convention used by every other adapter in the crate that calls `split_message` (Discord, Gitter, Mattermost, Nextcloud, Twitch, Pumble, etc.). Tests: 4 new tests using a small in-crate stub server (axum on an ephemeral port, reached via the existing `api_url` constructor seam — zero new dependencies). 41 telegram tests pass (37 existing + 4 new). --- crates/openfang-channels/src/telegram.rs | 214 +++++++++++++++++++++-- 1 file changed, 203 insertions(+), 11 deletions(-) diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 6435b55836..4b4e5356a7 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -158,8 +158,20 @@ impl TelegramAdapter { // Any other tag (e.g. , ) causes a 400 Bad Request. let sanitized = sanitize_telegram_html(text); - // Telegram has a 4096 character limit per message — split if needed + // Telegram has a 4096 character limit per message — split if needed. + // + // Error semantics for multi-chunk sends match the convention used by + // sibling adapters that also call `split_message` (Discord, Gitter, + // Mattermost, Nextcloud, Twitch, Pumble): fail loudly if NOTHING was + // delivered (first-chunk failure → return Err so the caller knows), but + // treat a mid-stream failure as best-effort — warn and continue — so the + // user isn't told "send failed" after they've already received + // preceding chunks. The motivating bug (HTML parse errors) is always a + // first-chunk failure anyway (sanitization/parse_mode applies to the + // whole text), so this keeps the fix effective while avoiding a + // partial-delivery-then-error regression. let chunks = split_message(&sanitized, 4096); + let mut delivered_any = false; for chunk in chunks { let mut body = serde_json::json!({ "chat_id": chat_id, @@ -175,7 +187,15 @@ impl TelegramAdapter { if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); warn!("Telegram sendMessage failed ({status}): {body_text}"); + if !delivered_any { + return Err( + format!("Telegram sendMessage failed ({status}): {body_text}").into(), + ); + } + // Partial delivery already happened; continue on best-effort. + continue; } + delivered_any = true; } Ok(()) } @@ -201,9 +221,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendPhoto failed: {body_text}"); + warn!("Telegram sendPhoto failed ({status}): {body_text}"); + return Err(format!("Telegram sendPhoto failed ({status}): {body_text}").into()); } Ok(()) } @@ -230,9 +252,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendDocument failed: {body_text}"); + warn!("Telegram sendDocument failed ({status}): {body_text}"); + return Err(format!("Telegram sendDocument failed ({status}): {body_text}").into()); } Ok(()) } @@ -268,9 +292,13 @@ impl TelegramAdapter { } let resp = self.client.post(&url).multipart(form).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendDocument upload failed: {body_text}"); + warn!("Telegram sendDocument upload failed ({status}): {body_text}"); + return Err( + format!("Telegram sendDocument upload failed ({status}): {body_text}").into(), + ); } Ok(()) } @@ -291,9 +319,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendVoice failed: {body_text}"); + warn!("Telegram sendVoice failed ({status}): {body_text}"); + return Err(format!("Telegram sendVoice failed ({status}): {body_text}").into()); } Ok(()) } @@ -320,9 +350,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendLocation failed: {body_text}"); + warn!("Telegram sendLocation failed ({status}): {body_text}"); + return Err(format!("Telegram sendLocation failed ({status}): {body_text}").into()); } Ok(()) } @@ -1909,4 +1941,164 @@ mod tests { } assert!(!msg.metadata.contains_key("reply_to_message_id")); } + + // ----------------------------------------------------------------------- + // Stub Telegram Bot API server for send-path and reaction-cache tests. + // + // Binds an axum app to an ephemeral port, returns a base URL that the + // `TelegramAdapter` can be pointed at via the `api_url` constructor + // parameter, and records per-call response fixtures + hit count. + // ----------------------------------------------------------------------- + + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Default)] + struct StubServer { + hits: AtomicUsize, + responses: std::sync::Mutex>, + } + + impl StubServer { + fn new(responses: Vec<(u16, &str)>) -> Arc { + Arc::new(Self { + hits: AtomicUsize::new(0), + responses: std::sync::Mutex::new( + responses + .into_iter() + .map(|(s, b)| (s, b.to_string())) + .collect(), + ), + }) + } + + fn hit_count(&self) -> usize { + self.hits.load(Ordering::SeqCst) + } + } + + async fn spawn_stub_server(stub: Arc) -> String { + use axum::{http::StatusCode, routing::any, Router}; + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let stub_for_handler = stub.clone(); + let app = Router::new().fallback(any(move || { + let stub = stub_for_handler.clone(); + async move { + let i = stub.hits.fetch_add(1, Ordering::SeqCst); + let responses = stub.responses.lock().unwrap(); + if i < responses.len() { + let (status, body) = responses[i].clone(); + ( + StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + body, + ) + } else { + ( + StatusCode::OK, + r#"{"ok":true,"result":true}"#.to_string(), + ) + } + } + })); + tokio::spawn(async move { + let _ = axum::serve(listener, app).await; + }); + format!("http://{}", addr) + } + + /// Build an adapter pointed at a stub server, bypassing `start()` (which + /// would call `getMe` / `setMyCommands` against the real API). + fn test_adapter(api_url: String) -> TelegramAdapter { + TelegramAdapter::new( + "test:token".to_string(), + vec![], + Duration::from_millis(10), + Some(api_url), + ) + } + + // ----------------------------------------------------------------------- + // send-path error propagation (api_send_message) + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_api_send_message_single_chunk_400_returns_err() { + let stub = StubServer::new(vec![( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: can't parse entities"}"#, + )]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, "hello", None).await; + + assert!(result.is_err(), "expected Err on single-chunk 400"); + let err = result.unwrap_err().to_string(); + assert!(err.contains("400"), "err should include status: {err}"); + assert!( + err.contains("can't parse entities"), + "err should include body: {err}" + ); + assert_eq!(stub.hit_count(), 1, "expected exactly one POST"); + } + + #[tokio::test] + async fn test_api_send_message_single_chunk_200_returns_ok() { + let stub = StubServer::new(vec![(200, r#"{"ok":true,"result":{}}"#)]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, "hello", None).await; + + assert!(result.is_ok(), "expected Ok on 200: {result:?}"); + assert_eq!(stub.hit_count(), 1); + } + + #[tokio::test] + async fn test_api_send_message_first_chunk_fail_returns_err() { + // Two-chunk message; first POST fails. Nothing delivered → Err. + let big = "a".repeat(5000); // > 4096 → split into two chunks + let stub = StubServer::new(vec![ + (500, r#"{"ok":false,"error_code":500,"description":"server"}"#), + (200, r#"{"ok":true,"result":{}}"#), + ]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, &big, None).await; + + assert!( + result.is_err(), + "first-chunk failure must return Err, got Ok" + ); + // Must have stopped after the failing first chunk — no partial send. + assert_eq!( + stub.hit_count(), + 1, + "expected adapter to abort after first-chunk failure" + ); + } + + #[tokio::test] + async fn test_api_send_message_partial_delivery_returns_ok() { + // Two-chunk message; first POST succeeds (user sees chunk 1), second + // fails. Match sibling-adapter best-effort convention: warn + continue, + // return Ok so the agent isn't told total failure after partial success. + let big = "a".repeat(5000); + let stub = StubServer::new(vec![ + (200, r#"{"ok":true,"result":{}}"#), + (400, r#"{"ok":false,"error_code":400,"description":"some err"}"#), + ]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, &big, None).await; + + assert!( + result.is_ok(), + "partial delivery must return Ok (best-effort), got {result:?}" + ); + assert_eq!(stub.hit_count(), 2, "both chunks should have been attempted"); + } + } From 9fee63d58c7a1d0530f7dc1cd25ebbc1b7444478 Mon Sep 17 00:00:00 2001 From: Scott Turnbull Date: Tue, 21 Apr 2026 18:03:23 -0400 Subject: [PATCH 2/2] channels/telegram: cache terminal setMessageReaction errors per (chat, emoji) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `fire_reaction` calls `setMessageReaction` fire-and-forget on every agent lifecycle event. When Telegram returns a terminal error like `REACTION_INVALID` (emoji not in the bot's free-reaction allowlist), `REACTION_NOT_AVAILABLE` (chat admin restricted this emoji), or `REACTION_TOO_MANY` (per-message cap), retrying on every subsequent turn is pointless log spam and wasted API quota. This adds a per-bot-instance `HashSet<(i64, String)>` keyed by `(chat_id, emoji)` that records terminal rejections and short-circuits future calls for the same pair. Keyed by chat, not just emoji, because `Chat.available_reactions` varies across chats and is admin-mutable (https://core.telegram.org/bots/api#setmessagereaction) — an emoji rejected in chat A may still be valid in chat B. Cache is per-process; on restart it rebuilds naturally, which handles any runtime allowlist change without needing persistence. The terminal-error match uses a small private helper `is_terminal_reaction_error` that substring-matches the three permanent errors. Transient errors (429, 5xx, `MESSAGE_NOT_MODIFIED`, unrelated 400s) are deliberately NOT cached. Concurrency: the cache uses `std::sync::Mutex` — critical section is two `HashSet` ops (contains + insert), never held across `.await`. Endorsed by the Tokio shared-state tutorial (https://tokio.rs/tokio/tutorial/shared-state) for exactly this shape. Two concurrent `fire_reaction` calls for the same (chat, emoji) can both pass the cache check before either rejection lands, producing up to N duplicate API calls on the first rejection; the duplicate `insert` is idempotent so this is benign and self-limits on the second turn. Documented in-code. Tests: 6 new tests covering terminal-error matching, cache insertion, per-chat key isolation, and non-caching of transient and successful responses. Total 47 telegram tests pass (41 existing + 6 new). No new clippy warnings. --- crates/openfang-channels/src/telegram.rs | 216 ++++++++++++++++++++++- 1 file changed, 214 insertions(+), 2 deletions(-) diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 4b4e5356a7..515233dfdd 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -11,9 +11,9 @@ use crate::types::{ use async_trait::async_trait; use futures::Stream; use serde::Serialize; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::{mpsc, watch}; use tracing::{debug, info, warn}; @@ -49,6 +49,21 @@ pub struct TelegramAdapter { /// Bot username (without @), populated from `getMe` during `start()`. /// Used for @mention detection in group messages. bot_username: Arc>>, + /// `(chat_id, emoji)` pairs that Telegram has rejected with a terminal + /// `setMessageReaction` error for this bot instance. Checked before + /// issuing the API call so we don't keep retrying reactions that will + /// never succeed in that chat. Keyed by chat so that an emoji restricted + /// in one chat can still be attempted in another (`Chat.available_reactions` + /// can differ per chat and is settable by admins). + /// + /// Cached errors: `REACTION_INVALID` (emoji not in the free-reaction + /// allowlist, or not a valid reaction at all), `REACTION_NOT_AVAILABLE` + /// (chat admin restricted this emoji), and `REACTION_TOO_MANY` (bot hit + /// per-message reaction cap for this chat). Transient errors (429, 5xx, + /// unrelated 400s) are NOT cached. Grows monotonically over process + /// lifetime; cache resets on restart, which is fine because admins can + /// change allowed reactions at any time. + rejected_reactions: Arc>>, shutdown_tx: Arc>, shutdown_rx: watch::Receiver, } @@ -77,6 +92,7 @@ impl TelegramAdapter { poll_interval, api_base_url, bot_username: Arc::new(tokio::sync::RwLock::new(None)), + rejected_reactions: Arc::new(Mutex::new(HashSet::new())), shutdown_tx: Arc::new(shutdown_tx), shutdown_rx, } @@ -388,7 +404,26 @@ impl TelegramAdapter { /// Sets or replaces the bot's emoji reaction on a message. Each new call /// automatically replaces the previous reaction, so there is no need to /// explicitly remove old ones. + /// + /// Telegram restricts non-premium bots to a free-reaction allowlist, and + /// chat admins can further restrict allowed reactions per chat via + /// `Chat.available_reactions`. Terminal errors + /// (`REACTION_INVALID`, `REACTION_NOT_AVAILABLE`, `REACTION_TOO_MANY`) + /// are cached per `(chat_id, emoji)` so we don't keep calling the API + /// with reactions that will never succeed in that chat. Because two + /// concurrent `fire_reaction` calls for the same `(chat_id, emoji)` can + /// both pass the cache check before either rejection lands, the first + /// rejection may produce up to N duplicate API calls where N is the + /// concurrency — this is benign (insert is idempotent) and self-limits + /// on the second turn. fn fire_reaction(&self, chat_id: i64, message_id: i64, emoji: &str) { + // Short-circuit: (chat_id, emoji) previously rejected for this bot. + if let Ok(rejected) = self.rejected_reactions.lock() { + if rejected.contains(&(chat_id, emoji.to_string())) { + return; + } + } + let url = format!( "{}/bot{}/setMessageReaction", self.api_base_url, @@ -400,11 +435,23 @@ impl TelegramAdapter { "reaction": [{"type": "emoji", "emoji": emoji}], }); let client = self.client.clone(); + let rejected_cache = self.rejected_reactions.clone(); + let emoji = emoji.to_string(); tokio::spawn(async move { match client.post(&url).json(&body).send().await { Ok(resp) if !resp.status().is_success() => { let body_text = resp.text().await.unwrap_or_default(); debug!("Telegram setMessageReaction failed: {body_text}"); + if is_terminal_reaction_error(&body_text) { + if let Ok(mut rejected) = rejected_cache.lock() { + if rejected.insert((chat_id, emoji.clone())) { + debug!( + "Telegram: caching rejected reaction (chat={chat_id}, emoji={emoji:?}); \ + further setMessageReaction calls with this pair will be skipped" + ); + } + } + } } Err(e) => { debug!("Telegram setMessageReaction error: {e}"); @@ -415,6 +462,18 @@ impl TelegramAdapter { } } +/// Terminal errors for `setMessageReaction` — retrying with the same +/// `(chat, emoji)` pair will not succeed without an outside change (chat +/// admin updating `Chat.available_reactions`, bot getting Premium, etc.). +/// Callers cache these and stop retrying. Transient errors (429, 5xx, +/// `RETRY_AFTER`, unrelated 400s like `MESSAGE_NOT_MODIFIED`) are NOT +/// included here. +fn is_terminal_reaction_error(body_text: &str) -> bool { + body_text.contains("REACTION_INVALID") + || body_text.contains("REACTION_NOT_AVAILABLE") + || body_text.contains("REACTION_TOO_MANY") +} + impl TelegramAdapter { /// Internal helper: send content with optional forum-topic thread_id. /// @@ -2017,6 +2076,20 @@ mod tests { ) } + async fn wait_for(mut cond: F, timeout_ms: u64) -> bool + where + F: FnMut() -> bool, + { + let deadline = std::time::Instant::now() + Duration::from_millis(timeout_ms); + while std::time::Instant::now() < deadline { + if cond() { + return true; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + cond() + } + // ----------------------------------------------------------------------- // send-path error propagation (api_send_message) // ----------------------------------------------------------------------- @@ -2101,4 +2174,143 @@ mod tests { assert_eq!(stub.hit_count(), 2, "both chunks should have been attempted"); } + // ----------------------------------------------------------------------- + // reaction cache (fire_reaction + is_terminal_reaction_error) + // ----------------------------------------------------------------------- + + #[test] + fn test_is_terminal_reaction_error_matches() { + assert!(is_terminal_reaction_error( + r#"{"ok":false,"description":"Bad Request: REACTION_INVALID"}"# + )); + assert!(is_terminal_reaction_error( + r#"{"description":"Bad Request: REACTION_NOT_AVAILABLE"}"# + )); + assert!(is_terminal_reaction_error( + r#"{"description":"Bad Request: REACTION_TOO_MANY"}"# + )); + } + + #[test] + fn test_is_terminal_reaction_error_rejects_transient() { + assert!(!is_terminal_reaction_error( + r#"{"description":"Too Many Requests: retry after 5"}"# + )); + assert!(!is_terminal_reaction_error( + r#"{"description":"Bad Request: MESSAGE_NOT_MODIFIED"}"# + )); + assert!(!is_terminal_reaction_error(r#"{"ok":true}"#)); + assert!(!is_terminal_reaction_error("")); + } + + #[tokio::test] + async fn test_fire_reaction_caches_on_reaction_invalid() { + let stub = StubServer::new(vec![( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: REACTION_INVALID"}"#, + )]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(999, 1, "⏳"); + + let cached = wait_for( + || { + adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(999_i64, "⏳".to_string()))) + .unwrap_or(false) + }, + 1000, + ) + .await; + assert!(cached, "emoji should be cached after REACTION_INVALID"); + assert_eq!(stub.hit_count(), 1); + + // Second call with same (chat, emoji) must short-circuit. + adapter.fire_reaction(999, 2, "⏳"); + // Give any rogue task time to fire. + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!( + stub.hit_count(), + 1, + "short-circuit should have prevented second POST" + ); + } + + #[tokio::test] + async fn test_fire_reaction_cache_is_per_chat() { + // Same emoji rejected in chat A should NOT short-circuit in chat B. + let stub = StubServer::new(vec![ + ( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: REACTION_INVALID"}"#, + ), + (200, r#"{"ok":true,"result":true}"#), + ]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(111, 1, "⏳"); + wait_for( + || { + adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(111_i64, "⏳".to_string()))) + .unwrap_or(false) + }, + 1000, + ) + .await; + assert_eq!(stub.hit_count(), 1); + + adapter.fire_reaction(222, 1, "⏳"); + wait_for(|| stub.hit_count() >= 2, 1000).await; + assert_eq!( + stub.hit_count(), + 2, + "different chat_id must still fire even when same emoji was cached" + ); + } + + #[tokio::test] + async fn test_fire_reaction_does_not_cache_non_terminal() { + let stub = StubServer::new(vec![( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: MESSAGE_NOT_MODIFIED"}"#, + )]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(999, 1, "⏳"); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(stub.hit_count(), 1); + + let cached = adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(999_i64, "⏳".to_string()))) + .unwrap_or(true); + assert!(!cached, "non-terminal 400 must NOT populate the cache"); + } + + #[tokio::test] + async fn test_fire_reaction_does_not_cache_on_success() { + let stub = StubServer::new(vec![(200, r#"{"ok":true,"result":true}"#)]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(999, 1, "🤔"); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(stub.hit_count(), 1); + + let cached = adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(999_i64, "🤔".to_string()))) + .unwrap_or(true); + assert!(!cached, "successful reaction must NOT populate the cache"); + } }