diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 6435b55836..515233dfdd 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -11,9 +11,9 @@ use crate::types::{ use async_trait::async_trait; use futures::Stream; use serde::Serialize; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::{mpsc, watch}; use tracing::{debug, info, warn}; @@ -49,6 +49,21 @@ pub struct TelegramAdapter { /// Bot username (without @), populated from `getMe` during `start()`. /// Used for @mention detection in group messages. bot_username: Arc>>, + /// `(chat_id, emoji)` pairs that Telegram has rejected with a terminal + /// `setMessageReaction` error for this bot instance. Checked before + /// issuing the API call so we don't keep retrying reactions that will + /// never succeed in that chat. Keyed by chat so that an emoji restricted + /// in one chat can still be attempted in another (`Chat.available_reactions` + /// can differ per chat and is settable by admins). + /// + /// Cached errors: `REACTION_INVALID` (emoji not in the free-reaction + /// allowlist, or not a valid reaction at all), `REACTION_NOT_AVAILABLE` + /// (chat admin restricted this emoji), and `REACTION_TOO_MANY` (bot hit + /// per-message reaction cap for this chat). Transient errors (429, 5xx, + /// unrelated 400s) are NOT cached. Grows monotonically over process + /// lifetime; cache resets on restart, which is fine because admins can + /// change allowed reactions at any time. + rejected_reactions: Arc>>, shutdown_tx: Arc>, shutdown_rx: watch::Receiver, } @@ -77,6 +92,7 @@ impl TelegramAdapter { poll_interval, api_base_url, bot_username: Arc::new(tokio::sync::RwLock::new(None)), + rejected_reactions: Arc::new(Mutex::new(HashSet::new())), shutdown_tx: Arc::new(shutdown_tx), shutdown_rx, } @@ -158,8 +174,20 @@ impl TelegramAdapter { // Any other tag (e.g. , ) causes a 400 Bad Request. let sanitized = sanitize_telegram_html(text); - // Telegram has a 4096 character limit per message — split if needed + // Telegram has a 4096 character limit per message — split if needed. + // + // Error semantics for multi-chunk sends match the convention used by + // sibling adapters that also call `split_message` (Discord, Gitter, + // Mattermost, Nextcloud, Twitch, Pumble): fail loudly if NOTHING was + // delivered (first-chunk failure → return Err so the caller knows), but + // treat a mid-stream failure as best-effort — warn and continue — so the + // user isn't told "send failed" after they've already received + // preceding chunks. The motivating bug (HTML parse errors) is always a + // first-chunk failure anyway (sanitization/parse_mode applies to the + // whole text), so this keeps the fix effective while avoiding a + // partial-delivery-then-error regression. let chunks = split_message(&sanitized, 4096); + let mut delivered_any = false; for chunk in chunks { let mut body = serde_json::json!({ "chat_id": chat_id, @@ -175,7 +203,15 @@ impl TelegramAdapter { if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); warn!("Telegram sendMessage failed ({status}): {body_text}"); + if !delivered_any { + return Err( + format!("Telegram sendMessage failed ({status}): {body_text}").into(), + ); + } + // Partial delivery already happened; continue on best-effort. + continue; } + delivered_any = true; } Ok(()) } @@ -201,9 +237,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendPhoto failed: {body_text}"); + warn!("Telegram sendPhoto failed ({status}): {body_text}"); + return Err(format!("Telegram sendPhoto failed ({status}): {body_text}").into()); } Ok(()) } @@ -230,9 +268,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendDocument failed: {body_text}"); + warn!("Telegram sendDocument failed ({status}): {body_text}"); + return Err(format!("Telegram sendDocument failed ({status}): {body_text}").into()); } Ok(()) } @@ -268,9 +308,13 @@ impl TelegramAdapter { } let resp = self.client.post(&url).multipart(form).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendDocument upload failed: {body_text}"); + warn!("Telegram sendDocument upload failed ({status}): {body_text}"); + return Err( + format!("Telegram sendDocument upload failed ({status}): {body_text}").into(), + ); } Ok(()) } @@ -291,9 +335,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendVoice failed: {body_text}"); + warn!("Telegram sendVoice failed ({status}): {body_text}"); + return Err(format!("Telegram sendVoice failed ({status}): {body_text}").into()); } Ok(()) } @@ -320,9 +366,11 @@ impl TelegramAdapter { body["message_thread_id"] = serde_json::json!(tid); } let resp = self.client.post(&url).json(&body).send().await?; - if !resp.status().is_success() { + let status = resp.status(); + if !status.is_success() { let body_text = resp.text().await.unwrap_or_default(); - warn!("Telegram sendLocation failed: {body_text}"); + warn!("Telegram sendLocation failed ({status}): {body_text}"); + return Err(format!("Telegram sendLocation failed ({status}): {body_text}").into()); } Ok(()) } @@ -356,7 +404,26 @@ impl TelegramAdapter { /// Sets or replaces the bot's emoji reaction on a message. Each new call /// automatically replaces the previous reaction, so there is no need to /// explicitly remove old ones. + /// + /// Telegram restricts non-premium bots to a free-reaction allowlist, and + /// chat admins can further restrict allowed reactions per chat via + /// `Chat.available_reactions`. Terminal errors + /// (`REACTION_INVALID`, `REACTION_NOT_AVAILABLE`, `REACTION_TOO_MANY`) + /// are cached per `(chat_id, emoji)` so we don't keep calling the API + /// with reactions that will never succeed in that chat. Because two + /// concurrent `fire_reaction` calls for the same `(chat_id, emoji)` can + /// both pass the cache check before either rejection lands, the first + /// rejection may produce up to N duplicate API calls where N is the + /// concurrency — this is benign (insert is idempotent) and self-limits + /// on the second turn. fn fire_reaction(&self, chat_id: i64, message_id: i64, emoji: &str) { + // Short-circuit: (chat_id, emoji) previously rejected for this bot. + if let Ok(rejected) = self.rejected_reactions.lock() { + if rejected.contains(&(chat_id, emoji.to_string())) { + return; + } + } + let url = format!( "{}/bot{}/setMessageReaction", self.api_base_url, @@ -368,11 +435,23 @@ impl TelegramAdapter { "reaction": [{"type": "emoji", "emoji": emoji}], }); let client = self.client.clone(); + let rejected_cache = self.rejected_reactions.clone(); + let emoji = emoji.to_string(); tokio::spawn(async move { match client.post(&url).json(&body).send().await { Ok(resp) if !resp.status().is_success() => { let body_text = resp.text().await.unwrap_or_default(); debug!("Telegram setMessageReaction failed: {body_text}"); + if is_terminal_reaction_error(&body_text) { + if let Ok(mut rejected) = rejected_cache.lock() { + if rejected.insert((chat_id, emoji.clone())) { + debug!( + "Telegram: caching rejected reaction (chat={chat_id}, emoji={emoji:?}); \ + further setMessageReaction calls with this pair will be skipped" + ); + } + } + } } Err(e) => { debug!("Telegram setMessageReaction error: {e}"); @@ -383,6 +462,18 @@ impl TelegramAdapter { } } +/// Terminal errors for `setMessageReaction` — retrying with the same +/// `(chat, emoji)` pair will not succeed without an outside change (chat +/// admin updating `Chat.available_reactions`, bot getting Premium, etc.). +/// Callers cache these and stop retrying. Transient errors (429, 5xx, +/// `RETRY_AFTER`, unrelated 400s like `MESSAGE_NOT_MODIFIED`) are NOT +/// included here. +fn is_terminal_reaction_error(body_text: &str) -> bool { + body_text.contains("REACTION_INVALID") + || body_text.contains("REACTION_NOT_AVAILABLE") + || body_text.contains("REACTION_TOO_MANY") +} + impl TelegramAdapter { /// Internal helper: send content with optional forum-topic thread_id. /// @@ -1909,4 +2000,317 @@ mod tests { } assert!(!msg.metadata.contains_key("reply_to_message_id")); } + + // ----------------------------------------------------------------------- + // Stub Telegram Bot API server for send-path and reaction-cache tests. + // + // Binds an axum app to an ephemeral port, returns a base URL that the + // `TelegramAdapter` can be pointed at via the `api_url` constructor + // parameter, and records per-call response fixtures + hit count. + // ----------------------------------------------------------------------- + + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[derive(Default)] + struct StubServer { + hits: AtomicUsize, + responses: std::sync::Mutex>, + } + + impl StubServer { + fn new(responses: Vec<(u16, &str)>) -> Arc { + Arc::new(Self { + hits: AtomicUsize::new(0), + responses: std::sync::Mutex::new( + responses + .into_iter() + .map(|(s, b)| (s, b.to_string())) + .collect(), + ), + }) + } + + fn hit_count(&self) -> usize { + self.hits.load(Ordering::SeqCst) + } + } + + async fn spawn_stub_server(stub: Arc) -> String { + use axum::{http::StatusCode, routing::any, Router}; + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let stub_for_handler = stub.clone(); + let app = Router::new().fallback(any(move || { + let stub = stub_for_handler.clone(); + async move { + let i = stub.hits.fetch_add(1, Ordering::SeqCst); + let responses = stub.responses.lock().unwrap(); + if i < responses.len() { + let (status, body) = responses[i].clone(); + ( + StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + body, + ) + } else { + ( + StatusCode::OK, + r#"{"ok":true,"result":true}"#.to_string(), + ) + } + } + })); + tokio::spawn(async move { + let _ = axum::serve(listener, app).await; + }); + format!("http://{}", addr) + } + + /// Build an adapter pointed at a stub server, bypassing `start()` (which + /// would call `getMe` / `setMyCommands` against the real API). + fn test_adapter(api_url: String) -> TelegramAdapter { + TelegramAdapter::new( + "test:token".to_string(), + vec![], + Duration::from_millis(10), + Some(api_url), + ) + } + + async fn wait_for(mut cond: F, timeout_ms: u64) -> bool + where + F: FnMut() -> bool, + { + let deadline = std::time::Instant::now() + Duration::from_millis(timeout_ms); + while std::time::Instant::now() < deadline { + if cond() { + return true; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + cond() + } + + // ----------------------------------------------------------------------- + // send-path error propagation (api_send_message) + // ----------------------------------------------------------------------- + + #[tokio::test] + async fn test_api_send_message_single_chunk_400_returns_err() { + let stub = StubServer::new(vec![( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: can't parse entities"}"#, + )]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, "hello", None).await; + + assert!(result.is_err(), "expected Err on single-chunk 400"); + let err = result.unwrap_err().to_string(); + assert!(err.contains("400"), "err should include status: {err}"); + assert!( + err.contains("can't parse entities"), + "err should include body: {err}" + ); + assert_eq!(stub.hit_count(), 1, "expected exactly one POST"); + } + + #[tokio::test] + async fn test_api_send_message_single_chunk_200_returns_ok() { + let stub = StubServer::new(vec![(200, r#"{"ok":true,"result":{}}"#)]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, "hello", None).await; + + assert!(result.is_ok(), "expected Ok on 200: {result:?}"); + assert_eq!(stub.hit_count(), 1); + } + + #[tokio::test] + async fn test_api_send_message_first_chunk_fail_returns_err() { + // Two-chunk message; first POST fails. Nothing delivered → Err. + let big = "a".repeat(5000); // > 4096 → split into two chunks + let stub = StubServer::new(vec![ + (500, r#"{"ok":false,"error_code":500,"description":"server"}"#), + (200, r#"{"ok":true,"result":{}}"#), + ]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, &big, None).await; + + assert!( + result.is_err(), + "first-chunk failure must return Err, got Ok" + ); + // Must have stopped after the failing first chunk — no partial send. + assert_eq!( + stub.hit_count(), + 1, + "expected adapter to abort after first-chunk failure" + ); + } + + #[tokio::test] + async fn test_api_send_message_partial_delivery_returns_ok() { + // Two-chunk message; first POST succeeds (user sees chunk 1), second + // fails. Match sibling-adapter best-effort convention: warn + continue, + // return Ok so the agent isn't told total failure after partial success. + let big = "a".repeat(5000); + let stub = StubServer::new(vec![ + (200, r#"{"ok":true,"result":{}}"#), + (400, r#"{"ok":false,"error_code":400,"description":"some err"}"#), + ]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + let result = adapter.api_send_message(12345, &big, None).await; + + assert!( + result.is_ok(), + "partial delivery must return Ok (best-effort), got {result:?}" + ); + assert_eq!(stub.hit_count(), 2, "both chunks should have been attempted"); + } + + // ----------------------------------------------------------------------- + // reaction cache (fire_reaction + is_terminal_reaction_error) + // ----------------------------------------------------------------------- + + #[test] + fn test_is_terminal_reaction_error_matches() { + assert!(is_terminal_reaction_error( + r#"{"ok":false,"description":"Bad Request: REACTION_INVALID"}"# + )); + assert!(is_terminal_reaction_error( + r#"{"description":"Bad Request: REACTION_NOT_AVAILABLE"}"# + )); + assert!(is_terminal_reaction_error( + r#"{"description":"Bad Request: REACTION_TOO_MANY"}"# + )); + } + + #[test] + fn test_is_terminal_reaction_error_rejects_transient() { + assert!(!is_terminal_reaction_error( + r#"{"description":"Too Many Requests: retry after 5"}"# + )); + assert!(!is_terminal_reaction_error( + r#"{"description":"Bad Request: MESSAGE_NOT_MODIFIED"}"# + )); + assert!(!is_terminal_reaction_error(r#"{"ok":true}"#)); + assert!(!is_terminal_reaction_error("")); + } + + #[tokio::test] + async fn test_fire_reaction_caches_on_reaction_invalid() { + let stub = StubServer::new(vec![( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: REACTION_INVALID"}"#, + )]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(999, 1, "⏳"); + + let cached = wait_for( + || { + adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(999_i64, "⏳".to_string()))) + .unwrap_or(false) + }, + 1000, + ) + .await; + assert!(cached, "emoji should be cached after REACTION_INVALID"); + assert_eq!(stub.hit_count(), 1); + + // Second call with same (chat, emoji) must short-circuit. + adapter.fire_reaction(999, 2, "⏳"); + // Give any rogue task time to fire. + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!( + stub.hit_count(), + 1, + "short-circuit should have prevented second POST" + ); + } + + #[tokio::test] + async fn test_fire_reaction_cache_is_per_chat() { + // Same emoji rejected in chat A should NOT short-circuit in chat B. + let stub = StubServer::new(vec![ + ( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: REACTION_INVALID"}"#, + ), + (200, r#"{"ok":true,"result":true}"#), + ]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(111, 1, "⏳"); + wait_for( + || { + adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(111_i64, "⏳".to_string()))) + .unwrap_or(false) + }, + 1000, + ) + .await; + assert_eq!(stub.hit_count(), 1); + + adapter.fire_reaction(222, 1, "⏳"); + wait_for(|| stub.hit_count() >= 2, 1000).await; + assert_eq!( + stub.hit_count(), + 2, + "different chat_id must still fire even when same emoji was cached" + ); + } + + #[tokio::test] + async fn test_fire_reaction_does_not_cache_non_terminal() { + let stub = StubServer::new(vec![( + 400, + r#"{"ok":false,"error_code":400,"description":"Bad Request: MESSAGE_NOT_MODIFIED"}"#, + )]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(999, 1, "⏳"); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(stub.hit_count(), 1); + + let cached = adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(999_i64, "⏳".to_string()))) + .unwrap_or(true); + assert!(!cached, "non-terminal 400 must NOT populate the cache"); + } + + #[tokio::test] + async fn test_fire_reaction_does_not_cache_on_success() { + let stub = StubServer::new(vec![(200, r#"{"ok":true,"result":true}"#)]); + let base = spawn_stub_server(stub.clone()).await; + let adapter = test_adapter(base); + + adapter.fire_reaction(999, 1, "🤔"); + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!(stub.hit_count(), 1); + + let cached = adapter + .rejected_reactions + .lock() + .map(|s| s.contains(&(999_i64, "🤔".to_string()))) + .unwrap_or(true); + assert!(!cached, "successful reaction must NOT populate the cache"); + } }