From df00a0a061355db92e3e075114e28a93af8bd488 Mon Sep 17 00:00:00 2001 From: Ben Vinegar Date: Tue, 24 Feb 2026 14:53:00 -0500 Subject: [PATCH] =?UTF-8?q?bridge:=20add=20=F0=9F=91=80=20on=20receive=20a?= =?UTF-8?q?nd=20=E2=9C=85=20on=20reply=20emoji=20reactions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user message arrives in Slack, the bridge immediately reacts with :eyes: to signal attentiveness. When the agent sends its first reply back to the same thread, the bridge adds :white_check_mark: to the original message. Both bridges (broker-pull and legacy Socket Mode) implement the same behavior. Pending ack reactions are tracked in a map keyed by thread and auto-expire after 10 minutes. Reactions are fire-and-forget — failures are logged but never block message delivery. --- slack-bridge/bridge.mjs | 63 +++++++++++++++++++++++++ slack-bridge/broker-bridge.mjs | 54 +++++++++++++++++++++ test/broker-bridge.integration.test.mjs | 8 +++- 3 files changed, 124 insertions(+), 1 deletion(-) diff --git a/slack-bridge/bridge.mjs b/slack-bridge/bridge.mjs index 645c35a..cb436e6 100644 --- a/slack-bridge/bridge.mjs +++ b/slack-bridge/bridge.mjs @@ -74,6 +74,41 @@ const threadLookup = new Map(); // "channel:thread_ts" → thread-N let threadCounter = 0; const MAX_THREADS = 10_000; +// Track inbound message timestamps pending a ✅ reaction. +// Key: "channel:thread_ts" (the thread root), Value: { channel, messageTs, receivedAt } +// When the agent replies via /send with a matching thread_ts, we react with ✅ +// on the original inbound message and remove the entry. +const pendingAckReactions = new Map(); +const PENDING_ACK_TTL_MS = 10 * 60 * 1000; // 10 minutes + +/** + * When the agent sends a reply in a thread, resolve the pending ack by + * adding a ✅ reaction to the original inbound message and removing the entry. + * Also prunes expired entries. + */ +function resolveAckReaction(channel, threadTs) { + const now = Date.now(); + for (const [key, entry] of pendingAckReactions) { + if (now - entry.receivedAt > PENDING_ACK_TTL_MS) { + pendingAckReactions.delete(key); + } + } + + const threadKey = `${channel}:${threadTs}`; + const pending = pendingAckReactions.get(threadKey); + if (!pending) return; + + pendingAckReactions.delete(threadKey); + app.client.reactions.add({ + token: process.env.SLACK_BOT_TOKEN, + channel: pending.channel, + timestamp: pending.messageTs, + name: "white_check_mark", + }).catch((err) => { + console.warn(`✅ check reaction failed: ${err.message}`); + }); +} + /** * Evict the oldest entries when the registry exceeds MAX_THREADS. * Maps iterate in insertion order, so the first entries are the oldest. @@ -259,6 +294,24 @@ async function handleMessage(userMessage, event, say) { console.log(`💬 from <@${event.user}>: ${userMessage}`); + // React with 👀 immediately so the user knows we saw their message. + app.client.reactions.add({ + token: process.env.SLACK_BOT_TOKEN, + channel: event.channel, + timestamp: event.ts, + name: "eyes", + }).catch((err) => { + console.warn(`👀 eyes reaction failed: ${err.message}`); + }); + + // Track this message so we can add ✅ when the agent replies. + const threadKey = `${event.channel}:${event.thread_ts || event.ts}`; + pendingAckReactions.set(threadKey, { + channel: event.channel, + messageTs: event.ts, + receivedAt: Date.now(), + }); + try { // Always re-resolve the socket before sending (handles agent restarts). // Capture into a local to avoid TOCTOU with concurrent handleMessage calls. @@ -422,6 +475,12 @@ function startApiServer() { }); console.log(`📤 Sent to ${channel}: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`); + + // If this is a threaded reply, check for a pending ✅ ack reaction. + if (thread_ts) { + resolveAckReaction(channel, thread_ts); + } + res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel })); @@ -460,6 +519,10 @@ function startApiServer() { }); console.log(`📤 Reply to ${thread_id} (${thread.channel}): ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`); + + // Check for a pending ✅ ack reaction on the /reply path too. + resolveAckReaction(thread.channel, thread.thread_ts); + res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel })); diff --git a/slack-bridge/broker-bridge.mjs b/slack-bridge/broker-bridge.mjs index 2b42e38..3415754 100755 --- a/slack-bridge/broker-bridge.mjs +++ b/slack-bridge/broker-bridge.mjs @@ -149,6 +149,37 @@ const threadLookup = new Map(); let threadCounter = 0; const MAX_THREADS = 10_000; +// Track inbound message timestamps pending a ✅ reaction. +// Key: "channel:thread_ts" (the thread root), Value: { channel, messageTs, receivedAt } +// When the agent replies via /send with a matching thread_ts, we react with ✅ +// on the original inbound message and remove the entry. +const pendingAckReactions = new Map(); +const PENDING_ACK_TTL_MS = 10 * 60 * 1000; // 10 minutes + +/** + * When the agent sends a reply in a thread, resolve the pending ack by + * adding a ✅ reaction to the original inbound message and removing the entry. + * Also prunes expired entries. + */ +function resolveAckReaction(channel, threadTs) { + const now = Date.now(); + // Prune expired entries while we're here + for (const [key, entry] of pendingAckReactions) { + if (now - entry.receivedAt > PENDING_ACK_TTL_MS) { + pendingAckReactions.delete(key); + } + } + + const threadKey = `${channel}:${threadTs}`; + const pending = pendingAckReactions.get(threadKey); + if (!pending) return; + + pendingAckReactions.delete(threadKey); + _react(pending.channel, pending.messageTs, "white_check_mark").catch((err) => { + logWarn(`✅ check reaction failed: ${err.message}`); + }); +} + let socketPath = null; let cryptoState = null; @@ -695,6 +726,21 @@ async function handleUserMessage(userMessage, event) { logWarn(`⚠️ Suspicious patterns from <@${event.user}>: ${suspicious.join(", ")}`); } + // React with 👀 immediately so the user knows we saw their message. + const ackChannel = event.channel; + const ackMessageTs = event.ts; + _react(ackChannel, ackMessageTs, "eyes").catch((err) => { + logWarn(`👀 eyes reaction failed: ${err.message}`); + }); + + // Track this message so we can add ✅ when the agent replies. + const threadKey = `${ackChannel}:${event.thread_ts || ackMessageTs}`; + pendingAckReactions.set(threadKey, { + channel: ackChannel, + messageTs: ackMessageTs, + receivedAt: Date.now(), + }); + refreshSocket(); const currentSocket = socketPath; if (!currentSocket) { @@ -988,6 +1034,11 @@ function startApiServer() { actionRequestBody: { text: safeText }, }); + // If this is a threaded reply, check for a pending ✅ ack reaction. + if (thread_ts) { + resolveAckReaction(channel, thread_ts); + } + res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, ts: result.ts })); return; @@ -1020,6 +1071,9 @@ function startApiServer() { actionRequestBody: { text: safeText }, }); + // Check for a pending ✅ ack reaction on the /reply path too. + resolveAckReaction(thread.channel, thread.thread_ts); + res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, ts: result.ts })); return; diff --git a/test/broker-bridge.integration.test.mjs b/test/broker-bridge.integration.test.mjs index ca808c3..8d736f5 100644 --- a/test/broker-bridge.integration.test.mjs +++ b/test/broker-bridge.integration.test.mjs @@ -493,7 +493,13 @@ describe("broker pull bridge semi-integration", () => { expect(receivedCommands.some((cmd) => cmd.type === "get_message")).toBe(false); expect(sendPayloads.some((payload) => payload.action === "chat.postMessage")).toBe(false); - expect(sendPayloads.some((payload) => payload.action === "reactions.add")).toBe(false); + + // Bridge now sends an 👀 reaction on inbound messages (fire-and-forget) + const reactionPayloads = sendPayloads.filter((payload) => payload.action === "reactions.add"); + expect(reactionPayloads.length).toBe(1); + expect(reactionPayloads[0].routing.channel).toBe("C123"); + expect(reactionPayloads[0].routing.timestamp).toBe("1730000000.000100"); + expect(reactionPayloads[0].routing.emoji).toBe("eyes"); }); it("uses protocol-versioned inbox.pull signatures with wait_seconds by default", async () => {