diff --git a/crates/openfang-api/src/server.rs b/crates/openfang-api/src/server.rs index ed773fcb7b..09118af2c7 100644 --- a/crates/openfang-api/src/server.rs +++ b/crates/openfang-api/src/server.rs @@ -54,6 +54,10 @@ pub async fn build_router( budget_config: Arc::new(tokio::sync::RwLock::new(kernel.config.budget.clone())), }); + // Start WS cron broadcaster — subscribes to kernel event bus and pushes + // cron job results to all connected WebSocket clients in real-time. + ws::start_ws_cron_broadcaster(kernel.clone()); + // CORS: allow localhost origins by default. If API key is set, the API // is protected anyway. For development, permissive CORS is convenient. let cors = if state.kernel.config.api_key.trim().is_empty() { diff --git a/crates/openfang-api/src/ws.rs b/crates/openfang-api/src/ws.rs index e57eb28d6e..1500b29f8c 100644 --- a/crates/openfang-api/src/ws.rs +++ b/crates/openfang-api/src/ws.rs @@ -19,6 +19,7 @@ use axum::response::IntoResponse; use dashmap::DashMap; use futures::stream::SplitSink; use futures::{SinkExt, StreamExt}; +use openfang_kernel::OpenFangKernel; use openfang_runtime::kernel_handle::KernelHandle; use openfang_runtime::llm_driver::StreamEvent; use openfang_runtime::llm_errors; @@ -30,7 +31,7 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, warn}; /// Per-IP WebSocket connection tracker. @@ -98,6 +99,62 @@ fn ws_tracker() -> &'static DashMap { TRACKER.get_or_init(DashMap::new) } +/// Per-agent WebSocket sender entry. +struct WsSender { + sender: Arc>>, +} + +/// Global registry: agent_id → active WebSocket senders. +/// Uses RwLock for fine-grained read/write access to the sender list. +fn ws_agent_connections() -> &'static DashMap>> { + static REGISTRY: std::sync::OnceLock>>> = + std::sync::OnceLock::new(); + REGISTRY.get_or_init(DashMap::new) +} + +/// Register a WebSocket connection for an agent (async). +pub async fn register_ws_connection( + agent_id: AgentId, + sender: Arc>>, +) { + let entry = ws_agent_connections().entry(agent_id).or_default(); + let mut senders = entry.value().write().await; + senders.push(WsSender { sender }); +} + +/// Deregister a WebSocket connection for an agent. +/// Returns the number of remaining connections for this agent. +pub async fn deregister_ws_connection( + agent_id: AgentId, + sender: &Arc>>, +) -> usize { + let entry = match ws_agent_connections().get(&agent_id) { + Some(e) => e, + None => return 0, + }; + let mut senders = entry.value().write().await; + senders.retain(|s| !Arc::ptr_eq(&s.sender, sender)); + senders.len() +} + +/// Broadcast a JSON message to all active WebSocket connections for an agent. +/// Returns the number of connections the message was sent to. +pub async fn broadcast_to_ws(agent_id: AgentId, msg: serde_json::Value) -> usize { + let entry = match ws_agent_connections().get(&agent_id) { + Some(e) => e, + None => return 0, + }; + let senders = entry.value().read().await; + let mut success_count = 0; + for ws_sender in senders.iter() { + let sender = &ws_sender.sender; + if send_json(sender, &msg).await.is_ok() { + success_count += 1; + } + } + success_count +} + /// RAII guard that decrements the connection count on drop. struct WsConnectionGuard { ip: IpAddr, @@ -264,6 +321,9 @@ async fn handle_agent_ws( let (sender, mut receiver) = socket.split(); let sender = Arc::new(Mutex::new(sender)); + // Register this connection in the global agent-WS registry + register_ws_connection(agent_id, Arc::clone(&sender)).await; + // Per-connection verbose level (default: Full) let verbose = Arc::new(AtomicU8::new(VerboseLevel::Full as u8)); @@ -416,7 +476,8 @@ async fn handle_agent_ws( } } - // Cleanup + // Cleanup: deregister from agent-WS registry and abort background tasks + deregister_ws_connection(agent_id, &sender).await; update_handle.abort(); info!(agent_id = %id_str, "WebSocket disconnected"); } @@ -1333,6 +1394,110 @@ pub fn strip_think_tags(text: &str) -> String { result } +// --------------------------------------------------------------------------- +// Cron Job WS Broadcasting +// --------------------------------------------------------------------------- + +/// Start a background task that subscribes to the kernel's event bus and +/// broadcasts cron job results to all connected WebSocket clients for the +/// relevant agent. +/// +/// This runs independently of the channel bridge — it uses the kernel's +/// event bus to receive `CronJobExecuted` events and pushes them to WS. +pub fn start_ws_cron_broadcaster(kernel: Arc) { + tokio::spawn(async move { + let mut rx = kernel.event_bus.subscribe_all(); + loop { + let event = rx.recv().await; + match event { + Ok(event) => { + if let openfang_types::event::EventPayload::System( + openfang_types::event::SystemEvent::CronJobExecuted { + agent_id, + job_id, + job_name, + trigger_message, + response, + delivered_to_channel: _, + }, + ) = event.payload + { + // Build the trigger message (synthetic user message from cron) + let trigger_msg = serde_json::json!({ + "type": "message", + "content": trigger_message, + "source": "cron", + "job_id": job_id, + "job_name": job_name + }); + let _ = broadcast_to_ws(agent_id, trigger_msg).await; + + // Send typing start + let _ = broadcast_to_ws( + agent_id, + serde_json::json!({"state": "start", "type": "typing"}), + ) + .await; + + // Send streaming phase + let _ = broadcast_to_ws( + agent_id, + serde_json::json!({"detail": null, "phase": "streaming", "type": "phase"}), + ) + .await; + + // Send text delta (full response since we don't have streaming chunks) + let text_delta = serde_json::json!({ + "content": response, + "type": "text_delta" + }); + let _ = broadcast_to_ws(agent_id, text_delta).await; + + // Send done phase + let _ = broadcast_to_ws( + agent_id, + serde_json::json!({"detail": null, "phase": "done", "type": "phase"}), + ) + .await; + + // Send typing stop + let _ = broadcast_to_ws( + agent_id, + serde_json::json!({"state": "stop", "type": "typing"}), + ) + .await; + + // Send final response (mimics the format from agent_loop) + let response_msg = serde_json::json!({ + "type": "response", + "content": response, + "context_pressure": "low", + "cost_usd": null, + "input_tokens": 0, + "iterations": 0, + "output_tokens": 0 + }); + let _ = broadcast_to_ws(agent_id, response_msg).await; + + info!( + agent_id = %agent_id, + job_id = %job_id, + "Cron job result broadcast to WS" + ); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!(lagged_messages = n, "WS cron broadcaster lagged, skipping"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + info!("WS cron broadcaster channel closed, stopping"); + break; + } + } + } + }); +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- diff --git a/crates/openfang-api/static/js/pages/chat.js b/crates/openfang-api/static/js/pages/chat.js index fd47a7fa29..9bd185c6b6 100644 --- a/crates/openfang-api/static/js/pages/chat.js +++ b/crates/openfang-api/static/js/pages/chat.js @@ -695,6 +695,15 @@ function chatPage() { switch (data.type) { case 'connected': break; + // Incoming message from server (e.g., cron trigger) — display as user message + case 'message': + if (data.content) { + var meta = data.source === 'cron' ? '[Scheduled: ' + (data.job_name || data.job_id || '') + ']' : ''; + this.messages.push({ id: ++msgId, role: 'user', text: data.content, meta: meta, tools: [], images: [], ts: Date.now() }); + this.scrollToBottom(); + } + break; + // Legacy thinking event (backward compat) case 'thinking': if (!this.messages.length || !this.messages[this.messages.length - 1].thinking) { diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index 7a39e7bafb..6ee6ed9101 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -5966,17 +5966,31 @@ impl OpenFangKernel { // Multi-destination fan-out (never aborts the job on delivery error). cron_fan_out_targets(self, job_name, &result.response, &delivery_targets) .await; - match cron_deliver_response(self, agent_id, &result.response, &delivery) + let delivered_to_channel = cron_deliver_response(self, agent_id, &result.response, &delivery) .await - { - Ok(()) => { - self.cron_scheduler.record_success(job_id); - Ok(result.response) - } - Err(e) => { - self.cron_scheduler.record_failure(job_id, &e); - Err(e) - } + .is_ok(); + // Publish event for WS broadcast (API layer subscribes and pushes to WebSocket connections). + let cron_event = Event::new( + AgentId::new(), + EventTarget::System, + EventPayload::System(SystemEvent::CronJobExecuted { + agent_id, + job_id: job_id.to_string(), + job_name: job_name.clone(), + trigger_message: message.clone(), + response: result.response.clone(), + delivered_to_channel, + }), + ); + self.publish_event(cron_event).await; + // Note: WS broadcast happens regardless of channel delivery success/failure. + // Channel delivery failure is recorded as a job failure. + if delivered_to_channel { + self.cron_scheduler.record_success(job_id); + Ok(result.response) + } else { + self.cron_scheduler.record_failure(job_id, "channel delivery failed"); + Err("channel delivery failed".to_string()) } } Ok(Err(e)) => { @@ -6020,15 +6034,29 @@ impl OpenFangKernel { Ok(Ok((_run_id, output))) => { // Multi-destination fan-out (never aborts the job on delivery error). cron_fan_out_targets(self, job_name, &output, &delivery_targets).await; - match cron_deliver_response(self, agent_id, &output, &delivery).await { - Ok(()) => { - self.cron_scheduler.record_success(job_id); - Ok(output) - } - Err(e) => { - self.cron_scheduler.record_failure(job_id, &e); - Err(e) - } + let delivered_to_channel = cron_deliver_response(self, agent_id, &output, &delivery) + .await + .is_ok(); + // Publish event for WS broadcast (API layer subscribes and pushes to WebSocket connections). + let cron_event = Event::new( + AgentId::new(), + EventTarget::System, + EventPayload::System(SystemEvent::CronJobExecuted { + agent_id, + job_id: job_id.to_string(), + job_name: job_name.clone(), + trigger_message: format!("workflow: {}", workflow_id), + response: output.clone(), + delivered_to_channel, + }), + ); + self.publish_event(cron_event).await; + if delivered_to_channel { + self.cron_scheduler.record_success(job_id); + Ok(output) + } else { + self.cron_scheduler.record_failure(job_id, "channel delivery failed"); + Err("channel delivery failed".to_string()) } } Ok(Err(e)) => { diff --git a/crates/openfang-kernel/src/triggers.rs b/crates/openfang-kernel/src/triggers.rs index 9fe28ee5a9..05283c9a05 100644 --- a/crates/openfang-kernel/src/triggers.rs +++ b/crates/openfang-kernel/src/triggers.rs @@ -449,6 +449,14 @@ fn describe_event(event: &Event) -> String { "Health check failed: agent {agent_id}, unresponsive for {unresponsive_secs}s" ) } + SystemEvent::CronJobExecuted { + agent_id, + job_id, + job_name, + .. + } => { + format!("Cron job executed: {job_name} ({job_id}) for agent {agent_id}") + } }, EventPayload::Custom(data) => { format!("Custom event ({} bytes)", data.len()) diff --git a/crates/openfang-migrate/src/openclaw.rs b/crates/openfang-migrate/src/openclaw.rs index f79657899a..e0b5ea1e53 100644 --- a/crates/openfang-migrate/src/openclaw.rs +++ b/crates/openfang-migrate/src/openclaw.rs @@ -896,10 +896,10 @@ fn derive_capabilities(tools: &[String]) -> AgentCapabilities { "shell_exec" => { caps.shell = vec!["*".to_string()]; } - "web_fetch" | "web_search" | "browser_navigate" => { - if caps.network.is_empty() { - caps.network = vec!["*".to_string()]; - } + "web_fetch" | "web_search" | "browser_navigate" + if caps.network.is_empty() => + { + caps.network = vec!["*".to_string()]; } "agent_send" | "agent_list" => { if caps.agent_message.is_empty() { diff --git a/crates/openfang-types/src/event.rs b/crates/openfang-types/src/event.rs index 69b1ab107c..c3ab13d8d6 100644 --- a/crates/openfang-types/src/event.rs +++ b/crates/openfang-types/src/event.rs @@ -277,6 +277,21 @@ pub enum SystemEvent { /// How long the agent has been unresponsive. unresponsive_secs: u64, }, + /// A scheduled cron job was executed and produced a result. + CronJobExecuted { + /// The agent that ran the job. + agent_id: AgentId, + /// The job's unique ID. + job_id: String, + /// The job's display name. + job_name: String, + /// The trigger message sent to the agent. + trigger_message: String, + /// The agent's text response. + response: String, + /// Whether the result was delivered to a channel (e.g. Telegram). + delivered_to_channel: bool, + }, } /// A complete event in the OpenFang event system.