Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions slack-bridge/bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,41 @@ const threadLookup = new Map(); // "channel:thread_ts" β†’ thread-N
let threadCounter = 0;
const MAX_THREADS = 10_000;

// Track inbound message timestamps pending a βœ… reaction.
// Key: "channel:thread_ts" (the thread root), Value: { channel, messageTs, receivedAt }
// When the agent replies via /send with a matching thread_ts, we react with βœ…
// on the original inbound message and remove the entry.
const pendingAckReactions = new Map();
const PENDING_ACK_TTL_MS = 10 * 60 * 1000; // 10 minutes

/**
* When the agent sends a reply in a thread, resolve the pending ack by
* adding a βœ… reaction to the original inbound message and removing the entry.
* Also prunes expired entries.
*/
function resolveAckReaction(channel, threadTs) {
const now = Date.now();
for (const [key, entry] of pendingAckReactions) {
if (now - entry.receivedAt > PENDING_ACK_TTL_MS) {
pendingAckReactions.delete(key);
}
}

const threadKey = `${channel}:${threadTs}`;
const pending = pendingAckReactions.get(threadKey);
if (!pending) return;

pendingAckReactions.delete(threadKey);
app.client.reactions.add({
token: process.env.SLACK_BOT_TOKEN,
channel: pending.channel,
timestamp: pending.messageTs,
name: "white_check_mark",
}).catch((err) => {
console.warn(`βœ… check reaction failed: ${err.message}`);
});
}

/**
* Evict the oldest entries when the registry exceeds MAX_THREADS.
* Maps iterate in insertion order, so the first entries are the oldest.
Expand Down Expand Up @@ -259,6 +294,24 @@ async function handleMessage(userMessage, event, say) {

console.log(`πŸ’¬ from <@${event.user}>: ${userMessage}`);

// React with πŸ‘€ immediately so the user knows we saw their message.
app.client.reactions.add({
token: process.env.SLACK_BOT_TOKEN,
channel: event.channel,
timestamp: event.ts,
name: "eyes",
}).catch((err) => {
console.warn(`πŸ‘€ eyes reaction failed: ${err.message}`);
});

// Track this message so we can add βœ… when the agent replies.
const threadKey = `${event.channel}:${event.thread_ts || event.ts}`;
pendingAckReactions.set(threadKey, {
channel: event.channel,
messageTs: event.ts,
receivedAt: Date.now(),
});
Comment on lines +308 to +313
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same threading behavior as broker-bridge: multiple messages in one thread before agent reply will only mark the last one with βœ…

Prompt To Fix With AI
This is a comment left during a code review.
Path: slack-bridge/bridge.mjs
Line: 308-313

Comment:
same threading behavior as broker-bridge: multiple messages in one thread before agent reply will only mark the last one with βœ…

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer as the broker-bridge comment β€” this is intentional. πŸ‘€ marks each individual message, βœ… marks the thread as responded-to. Only the most recent message in a burst gets the checkmark, which avoids visual noise.

Responded by pi using anthropic/claude-sonnet-4-20250514.


try {
// Always re-resolve the socket before sending (handles agent restarts).
// Capture into a local to avoid TOCTOU with concurrent handleMessage calls.
Expand Down Expand Up @@ -422,6 +475,12 @@ function startApiServer() {
});

console.log(`πŸ“€ Sent to ${channel}: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);

// If this is a threaded reply, check for a pending βœ… ack reaction.
if (thread_ts) {
resolveAckReaction(channel, thread_ts);
}

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel }));

Expand Down Expand Up @@ -460,6 +519,10 @@ function startApiServer() {
});

console.log(`πŸ“€ Reply to ${thread_id} (${thread.channel}): ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);

// Check for a pending βœ… ack reaction on the /reply path too.
resolveAckReaction(thread.channel, thread.thread_ts);

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel }));

Expand Down
54 changes: 54 additions & 0 deletions slack-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,37 @@ const threadLookup = new Map();
let threadCounter = 0;
const MAX_THREADS = 10_000;

// Track inbound message timestamps pending a βœ… reaction.
// Key: "channel:thread_ts" (the thread root), Value: { channel, messageTs, receivedAt }
// When the agent replies via /send with a matching thread_ts, we react with βœ…
// on the original inbound message and remove the entry.
const pendingAckReactions = new Map();
const PENDING_ACK_TTL_MS = 10 * 60 * 1000; // 10 minutes

/**
* When the agent sends a reply in a thread, resolve the pending ack by
* adding a βœ… reaction to the original inbound message and removing the entry.
* Also prunes expired entries.
*/
function resolveAckReaction(channel, threadTs) {
const now = Date.now();
// Prune expired entries while we're here
for (const [key, entry] of pendingAckReactions) {
if (now - entry.receivedAt > PENDING_ACK_TTL_MS) {
pendingAckReactions.delete(key);
}
}

const threadKey = `${channel}:${threadTs}`;
const pending = pendingAckReactions.get(threadKey);
if (!pending) return;

pendingAckReactions.delete(threadKey);
_react(pending.channel, pending.messageTs, "white_check_mark").catch((err) => {
logWarn(`βœ… check reaction failed: ${err.message}`);
});
}

let socketPath = null;

let cryptoState = null;
Expand Down Expand Up @@ -695,6 +726,21 @@ async function handleUserMessage(userMessage, event) {
logWarn(`⚠️ Suspicious patterns from <@${event.user}>: ${suspicious.join(", ")}`);
}

// React with πŸ‘€ immediately so the user knows we saw their message.
const ackChannel = event.channel;
const ackMessageTs = event.ts;
_react(ackChannel, ackMessageTs, "eyes").catch((err) => {
logWarn(`πŸ‘€ eyes reaction failed: ${err.message}`);
});

// Track this message so we can add βœ… when the agent replies.
const threadKey = `${ackChannel}:${event.thread_ts || ackMessageTs}`;
pendingAckReactions.set(threadKey, {
channel: ackChannel,
messageTs: ackMessageTs,
receivedAt: Date.now(),
});
Comment on lines +737 to +742
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if a user sends multiple messages in the same thread before the agent replies, only the last message gets βœ… (each message overwrites the previous entry since they share the same channel:thread_ts key). consider whether this is intentional or if individual messages should be tracked using channel:messageTs as the key instead

Prompt To Fix With AI
This is a comment left during a code review.
Path: slack-bridge/broker-bridge.mjs
Line: 737-742

Comment:
if a user sends multiple messages in the same thread before the agent replies, only the last message gets βœ… (each message overwrites the previous entry since they share the same `channel:thread_ts` key). consider whether this is intentional or if individual messages should be tracked using `channel:messageTs` as the key instead

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional β€” the βœ… signals "the agent replied to this thread", not "the agent processed this specific message." Each individual message already gets its own πŸ‘€ on arrival, so you can tell which ones were seen. Putting βœ… on every queued message in a burst would look noisy.

Responded by pi using anthropic/claude-sonnet-4-20250514.


refreshSocket();
const currentSocket = socketPath;
if (!currentSocket) {
Expand Down Expand Up @@ -988,6 +1034,11 @@ function startApiServer() {
actionRequestBody: { text: safeText },
});

// If this is a threaded reply, check for a pending βœ… ack reaction.
if (thread_ts) {
resolveAckReaction(channel, thread_ts);
}

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts }));
return;
Expand Down Expand Up @@ -1020,6 +1071,9 @@ function startApiServer() {
actionRequestBody: { text: safeText },
});

// Check for a pending βœ… ack reaction on the /reply path too.
resolveAckReaction(thread.channel, thread.thread_ts);

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts }));
return;
Expand Down
8 changes: 7 additions & 1 deletion test/broker-bridge.integration.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,13 @@ describe("broker pull bridge semi-integration", () => {
expect(receivedCommands.some((cmd) => cmd.type === "get_message")).toBe(false);

expect(sendPayloads.some((payload) => payload.action === "chat.postMessage")).toBe(false);
expect(sendPayloads.some((payload) => payload.action === "reactions.add")).toBe(false);

// Bridge now sends an πŸ‘€ reaction on inbound messages (fire-and-forget)
const reactionPayloads = sendPayloads.filter((payload) => payload.action === "reactions.add");
expect(reactionPayloads.length).toBe(1);
expect(reactionPayloads[0].routing.channel).toBe("C123");
expect(reactionPayloads[0].routing.timestamp).toBe("1730000000.000100");
expect(reactionPayloads[0].routing.emoji).toBe("eyes");
});

it("uses protocol-versioned inbox.pull signatures with wait_seconds by default", async () => {
Expand Down