
Handle MCP server-initiated notifications so hosted MCP servers can push events to agents in real time #1096

@Streamweaver

Context

MCP's wire protocol supports server→client push via several notification
types: notifications/resources/updated (driven by the per-URI
resources/subscribe client→server request),
notifications/resources/list_changed,
notifications/tools/list_changed,
notifications/prompts/list_changed, notifications/progress,
notifications/cancelled, and notifications/message (server-side
logging).
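
For concreteness, each of these is an ordinary JSON-RPC notification
pushed over the existing transport (no request id, no response
expected). A minimal sketch of the resources/updated shape, with an
illustrative URI and serde_json used only to show the payload:

use serde_json::json;

fn main() {
    // Wire shape of a server→client resource-update push. It only fires
    // for URIs the client has previously subscribed to via
    // resources/subscribe.
    let notification = json!({
        "jsonrpc": "2.0",
        "method": "notifications/resources/updated",
        "params": { "uri": "mail://inbox/threads/123" } // illustrative URI
    });
    println!("{notification}");
}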

Hosted MCP servers rely on these to deliver events in real time without
client polling. Examples across the ecosystem where push vs. poll is the
difference between a usable and a useless UX:

  • Inbox MCP servers (AgentMail, Gmail MCP variants) — agent replies
    seconds after mail lands rather than minutes later.
  • Chat-bridge MCPs (Slack, Discord, Matrix) — mentions get
    same-conversation responses rather than feeling batched.
  • Filesystem MCP — agent reacts when a watched directory changes.
  • Database MCPs with LISTEN/NOTIFY — agents run on row-level
    events for ops/alerting.
  • Calendar, GitHub, home-automation MCPs — invite changes, issue
    creation, sensor events drive agent workflows.

What OpenFang does today

crates/openfang-runtime/src/mcp.rs:94-97,275,320 initializes every
MCP connection with a bare rmcp::model::ClientInfo. rmcp dispatches
incoming notifications via ClientHandler
(rmcp::handler::client::ClientHandler), but ClientInfo takes the
blanket no-op impl (rmcp-1.3.0/src/handler/client.rs:263).
Notifications are received and typed; they just land in default no-op
callbacks (on_resource_updated, on_tool_list_changed,
on_prompt_list_changed, …). Separately, resources/subscribe
requests are never issued from McpConnection, so
notifications/resources/updated cannot fire for any URI even if a
handler existed. The ClientCapabilities sent at initialize are
::default() — no capability negotiation happens.
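
To make the failure mode concrete, here is a paraphrased sketch of what
those defaults amount to (not the actual rmcp source; just the shape of
the callbacks named above):

// Paraphrase only, not rmcp code: the ClientHandler trait ships empty
// default bodies, so a handler that never overrides them (a bare
// ClientInfo) receives typed notifications and silently discards them.
trait ClientHandlerSketch {
    async fn on_resource_updated(&self, _params: ResourceUpdatedParams) {
        // default body: the push is dropped here
    }
    async fn on_tool_list_changed(&self) {
        // same shape for prompts/list_changed, resources/list_changed, …
    }
}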

Result: hosted MCP endpoints that rely on push cannot drive reactive
behavior in an OpenFang agent. The only available paths are polling
via schedule_create or a per-provider webhook bridge.

Proposed design

Wrap ClientInfo in a handler that routes notifications into the
existing kernel event bus. openfang-kernel/src/event_bus.rs already
provides per-AgentId fan-out via publish(Event) and
subscribe_agent(AgentId) — no new channel-adapter primitive needed.

Sketch:

// crates/openfang-runtime/src/mcp.rs
struct OpenfangMcpClient {
    info: ClientInfo,
    bus: Arc<EventBus>,
    server_name: String,
    subs: Arc<DashMap<Uri, AgentId>>, // (uri → subscribed agent)
}

impl ClientHandler for OpenfangMcpClient {
    async fn on_resource_updated(
        &self,
        _ctx: NotificationContext<RoleClient>,
        params: ResourceUpdatedParams,
    ) {
        if let Some(agent) = self.subs.get(&params.uri) {
            self.bus.publish(Event::McpServerNotification {
                server: self.server_name.clone(),
                agent: *agent,
                kind: McpNotificationKind::ResourceUpdated(params),
            });
        }
    }
    // ... parallel handlers for the other notification types
}

// New method on McpConnection:
async fn subscribe_resource(
    &self,
    agent: AgentId,
    uri: Uri,
) -> Result<(), String> {
    self.subs.insert(uri.clone(), agent);
    self.client.peer()
        .subscribe(SubscribeRequestParams { uri })
        .await
        .map_err(|e| e.to_string())
}

Agents receive events as distinct Event variants — not synthesized
user turns (those corrupt conversation history and break replay/eval).
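
For reference, a minimal sketch of the kernel-side types this implies
and how an agent loop would consume them. The exact Event definition in
openfang-kernel is an assumption here; only the variant names come from
the sketch above and the design questions below:

// Assumed shape of the new event-bus types (openfang-kernel); exact
// field layout will differ in the real Event enum.
enum McpNotificationKind {
    ResourceUpdated(ResourceUpdatedParams),
    ResourceListChanged,
    ToolListChanged,
    PromptListChanged,
}

enum Event {
    McpServerNotification {
        server: String,            // [[mcp_servers]] entry name
        agent: AgentId,            // subscriber recorded in `subs`
        kind: McpNotificationKind,
    },
    McpConnectionReconnected { server: String },
    McpNotificationDropped { server: String, agent: AgentId },
    // ... existing variants elided
}

// Agent-side consumption via the existing per-agent fan-out; the
// receiver shape (mpsc-style recv) is assumed.
async fn react_to_pushes(bus: Arc<EventBus>, me: AgentId) {
    let mut rx = bus.subscribe_agent(me);
    while let Some(event) = rx.recv().await {
        if let Event::McpServerNotification { kind, .. } = event {
            // handle `kind` as a dedicated event, never a synthesized turn
        }
    }
}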

Scope (v1)

In: resources/updated, resources/list_changed,
tools/list_changed, prompts/list_changed. Client-side capability
negotiation via initialize. Per-(agent, server, uri) subscription
tracking with explicit lifecycle.

Out (deferred):

  • notifications/message — server-side logging, would burn agent
    tokens if routed as events.
  • notifications/progress / notifications/cancelled — belong to
    specific in-flight tool invocations; different routing model.

Open design questions (my recommendations, please push back)

  1. Subscription initiation. Agents subscribe explicitly via a new
    mcp_subscribe_resource(uri) tool. Keeps capability grants visible
    in the manifest and auditable. Alternative (auto-subscribe on
    manifest reference) hides side effects and I'd push back on it.
  2. Reconnect semantics. On connection drop, replay all tracked
    resources/subscribe calls, then publish an
    Event::McpConnectionReconnected so agents can catch up
    deliberately if they want to.
  3. Backpressure. Bounded per-connection queue, drop-oldest policy,
    with a McpNotificationDropped event emitted for observability.
    Alternatives considered: coalesce per-URI (harder to reason about),
    block transport (backs up the server). Prefer drop-oldest; a sketch
    follows after this list.
  4. Security gate. Default off: [[mcp_servers]] grows an
    allow_push_events = bool field, default false. A compromised MCP
    server shouldn't silently invoke agents; operator opts in per
    server.
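
On point 3, a self-contained sketch of the drop-oldest policy (names
hypothetical; the real buffer would hang off McpConnection and emit
McpNotificationDropped through the event bus when eviction happens):

use std::collections::VecDeque;

// Hypothetical bounded per-connection buffer with a drop-oldest policy.
struct NotificationQueue<T> {
    buf: VecDeque<T>,
    cap: usize,
}

impl<T> NotificationQueue<T> {
    fn new(cap: usize) -> Self {
        Self { buf: VecDeque::with_capacity(cap), cap }
    }

    /// Enqueue a notification; when full, evict the oldest entry and
    /// return it so the caller can publish McpNotificationDropped.
    fn push(&mut self, item: T) -> Option<T> {
        let dropped = if self.buf.len() == self.cap {
            self.buf.pop_front()
        } else {
            None
        };
        self.buf.push_back(item);
        dropped
    }

    fn pop(&mut self) -> Option<T> {
        self.buf.pop_front()
    }
}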

Ask

Sanity-check from a maintainer on (a) event-bus-based routing vs. a
new channel adapter, and (b) the four recommendations above. Barring
concerns, I'll open a draft PR in ~1 week implementing the sketch —
reviewable as a single unit or split into (1) handler plumbing +
event-bus wiring, (2) subscribe plumbing, (3) capability negotiation,
(4) config/security gate.

Related: #1095 (unrelated drive-by stdio-env-passthrough fix, not a
dependency).
