Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions slack-bridge/follower-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
resolvePinetMeshAuth,
resolveRuntimeAgentIdentity,
syncFollowerInboxEntries,
syncTransferredSlackThreadContexts,
} from "./helpers.js";
import {
type FollowerDeliveryState,
Expand Down Expand Up @@ -322,6 +323,18 @@ export function createFollowerRuntime(deps: FollowerRuntimeDeps): FollowerRuntim
}

if (agentMessages.length > 0) {
const transferredThreads = syncTransferredSlackThreadContexts(
agentMessages,
deps.getThreads(),
deps.getAgentOwnerToken(),
);
if (transferredThreads.threadUpdates.length > 0) {
mergeFollowerThreadUpdates(deps.getThreads(), transferredThreads.threadUpdates);
if (transferredThreads.changed) {
deps.persistState();
}
}

const pinetPrompt = formatPinetInboxMessages(agentMessages);
if (deps.deliverFollowUpMessage(pinetPrompt)) {
markFollowerInboxIdsDelivered(deps.deliveryState, getInboxIds(agentMessages));
Expand Down
140 changes: 140 additions & 0 deletions slack-bridge/helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import {
isAgentToAgentEntry,
partitionFollowerInboxEntries,
syncBrokerInboxEntries,
syncTransferredSlackThreadContexts,
buildBrokerProtocolGuardrailsPrompt,
buildWorkerPromptGuidelines,
buildIdentityReplyGuidelines,
Expand Down Expand Up @@ -765,6 +766,36 @@ describe("formatPinetInboxMessages", () => {
expect(result).not.toContain("ACK briefly after reading, do the work");
});

it("surfaces transferred Slack thread context with slack_send guidance", () => {
const result = formatPinetInboxMessages([
{
inboxId: 18,
message: {
threadId: "a2a:broker:worker",
sender: "broker-id",
body: "Take issue #756",
metadata: {
senderAgent: "Broker Bunny",
a2a: true,
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1779139556.450249",
source: "slack",
channel: "C0APL58LB1R",
},
},
},
},
]);

expect(result).toContain(
"transferred_slack_thread thread_ts=1779139556.450249 channel=C0APL58LB1R reply=slack_send",
);
expect(result).toContain(
"transferred Slack threads can be replied to with slack_send using the shown thread_ts.",
);
});

it("falls back to the sender id when no senderAgent metadata exists", () => {
const result = formatPinetInboxMessages([
{
Expand Down Expand Up @@ -3393,6 +3424,115 @@ describe("isDirectMessageChannel", () => {
});
});

// ─── syncTransferredSlackThreadContexts ───────────────────

describe("syncTransferredSlackThreadContexts", () => {
it("hydrates follower thread state from transferred Slack thread metadata", () => {
const threads = new Map<string, FollowerThreadState>();
const result = syncTransferredSlackThreadContexts(
[
{
inboxId: 18,
message: {
threadId: "a2a:broker:worker",
sender: "broker-id",
body: "Take issue #756",
metadata: {
a2a: true,
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1779139556.450249",
source: "slack",
channel: "C0APL58LB1R",
},
},
},
},
],
threads,
"AgentOwner",
);

expect(result.changed).toBe(true);
expect(result.threadUpdates).toEqual([
{
channelId: "C0APL58LB1R",
threadTs: "1779139556.450249",
userId: "",
owner: "AgentOwner",
source: "slack",
},
]);
});

it("overwrites stale cached owner on explicit transfer", () => {
const threads = new Map<string, FollowerThreadState>([
[
"1779139556.450249",
{
channelId: "C_OLD",
threadTs: "1779139556.450249",
userId: "U_ORIGINAL",
owner: "PreviousOwner",
source: "slack",
},
],
]);

const result = syncTransferredSlackThreadContexts(
[
{
message: {
threadId: "a2a:broker:worker",
sender: "broker-id",
metadata: {
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1779139556.450249",
source: "slack",
channel: "C0APL58LB1R",
},
},
},
},
],
threads,
"AgentOwner",
);

expect(result.changed).toBe(true);
expect(result.threadUpdates).toEqual([
{
channelId: "C0APL58LB1R",
threadTs: "1779139556.450249",
userId: "U_ORIGINAL",
owner: "AgentOwner",
source: "slack",
},
]);
});

it("ignores transfer metadata without channel context", () => {
const result = syncTransferredSlackThreadContexts(
[
{
message: {
threadId: "a2a:broker:worker",
sender: "broker-id",
metadata: {
threadOwnershipTransfer: { mode: "transfer", threadId: "1779139556.450249" },
},
},
},
],
new Map<string, FollowerThreadState>(),
"AgentOwner",
);

expect(result).toEqual({ threadUpdates: [], changed: false });
});
});

// ─── syncFollowerInboxEntries ─────────────────────────────

describe("syncFollowerInboxEntries", () => {
Expand Down
98 changes: 90 additions & 8 deletions slack-bridge/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,38 @@ function formatPinetInboxPointer(entry: FollowerInboxEntry): string {
return buildPinetReadPointer(entry.message.threadId ?? "");
}

function getTransferredSlackThreadContext(
metadata: Record<string, unknown> | null,
): { threadId: string; channel?: string } | null {
const transfer = metadata?.threadOwnershipTransfer;
if (!transfer || typeof transfer !== "object" || Array.isArray(transfer)) {
return null;
}

const raw = transfer as Record<string, unknown>;
const threadId = typeof raw.threadId === "string" ? raw.threadId.trim() : "";
if (!threadId) {
return null;
}

const source = typeof raw.source === "string" ? raw.source.trim() : "";
if (source && source !== "slack") {
return null;
}

const channel = typeof raw.channel === "string" ? raw.channel.trim() : "";
return { threadId, ...(channel ? { channel } : {}) };
}

function formatPinetThreadTransferSuffix(entry: FollowerInboxEntry): string {
const transfer = getTransferredSlackThreadContext(entry.message.metadata);
if (!transfer) {
return "";
}

return ` transferred_slack_thread thread_ts=${transfer.threadId}${transfer.channel ? ` channel=${transfer.channel}` : ""} reply=slack_send`;
}

export function formatPinetInboxMessages(entries: FollowerInboxEntry[]): string {
const annotatedEntries = entries.map((entry) => {
const classification = classifyPinetMail({
Expand All @@ -451,7 +483,8 @@ export function formatPinetInboxMessages(entries: FollowerInboxEntry[]): string
const sender = getPinetSenderLabel(entry.message);
const label = formatPinetMailClassLabel(classification.class);
const inboxSuffix = entry.inboxId != null ? ` inbox_id=${entry.inboxId}` : "";
return `[thread ${threadTs}] [${label}] ${sender}:${inboxSuffix} ${formatPinetInboxPointer(entry)}`;
const transferSuffix = formatPinetThreadTransferSuffix(entry);
return `[thread ${threadTs}] [${label}] ${sender}:${inboxSuffix}${transferSuffix} ${formatPinetInboxPointer(entry)}`;
});

const hasMaintenanceOnly = annotatedEntries.some(
Expand All @@ -461,14 +494,19 @@ export function formatPinetInboxMessages(entries: FollowerInboxEntry[]): string
(entry) => entry.classification.class === "steering",
);
const hasFollowUp = annotatedEntries.some((entry) => entry.classification.class === "fwup");
const hasTransferredSlackThread = annotatedEntries.some(({ entry }) =>
Boolean(getTransferredSlackThreadContext(entry.message.metadata)),
);

const guidance = hasMaintenanceOnly
? hasActionableWork || hasFollowUp
? "Read pointer(s) before acting; reply via pinet action=send for steering/follow-up."
: "Context-only pointer(s); read only if needed."
: hasActionableWork
? "Read pointer(s) before acting; reply via pinet action=send."
: "Read pointer(s) if follow-up is needed; reply via pinet action=send when needed.";
const guidance = hasTransferredSlackThread
? "Read pointer(s) before acting; transferred Slack threads can be replied to with slack_send using the shown thread_ts."
: hasMaintenanceOnly
? hasActionableWork || hasFollowUp
? "Read pointer(s) before acting; reply via pinet action=send for steering/follow-up."
: "Context-only pointer(s); read only if needed."
: hasActionableWork
? "Read pointer(s) before acting; reply via pinet action=send."
: "Read pointer(s) if follow-up is needed; reply via pinet action=send when needed.";

return `New Pinet messages:\n${lines.join("\n")}\n\n${guidance}`;
}
Expand Down Expand Up @@ -2133,6 +2171,11 @@ export interface FollowerInboxSyncResult {
changed: boolean;
}

export interface TransferredSlackThreadSyncResult {
threadUpdates: FollowerThreadState[];
changed: boolean;
}

export interface BrokerInboxControlEntry {
inboxId: number;
command: PinetControlCommand;
Expand All @@ -2147,6 +2190,45 @@ export function isDirectMessageChannel(channel: string): boolean {
return /^D[A-Z0-9]+$/.test(channel);
}

export function syncTransferredSlackThreadContexts(
entries: FollowerInboxEntry[],
existingThreads: ReadonlyMap<string, FollowerThreadState>,
agentOwner: string,
): TransferredSlackThreadSyncResult {
let changed = false;
const threadUpdates: FollowerThreadState[] = [];

for (const entry of entries) {
const transfer = getTransferredSlackThreadContext(entry.message.metadata);
if (!transfer?.channel) {
continue;
}

const existing = existingThreads.get(transfer.threadId);
const nextThread: FollowerThreadState = {
channelId: transfer.channel,
threadTs: transfer.threadId,
userId: existing?.userId ?? "",
owner: agentOwner,
source: "slack",
};

threadUpdates.push(nextThread);
if (
!existing ||
existing.channelId !== nextThread.channelId ||
existing.threadTs !== nextThread.threadTs ||
existing.userId !== nextThread.userId ||
existing.owner !== nextThread.owner ||
existing.source !== nextThread.source
) {
changed = true;
}
}

return { threadUpdates, changed };
}

export function syncFollowerInboxEntries(
entries: FollowerInboxEntry[],
existingThreads: ReadonlyMap<string, FollowerThreadState>,
Expand Down
11 changes: 10 additions & 1 deletion slack-bridge/pinet-mesh-ops.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,19 @@ describe("createPinetMeshOps", () => {
messageId: 1,
target: "Worker One",
transferredThreadId: "1777798507.674009",
transferredThreadChannel: "C123",
});
expect(transferThreadOwnership).toHaveBeenCalledWith("1777798507.674009", "worker-1");
expect(insertedMessages[0]?.body).toContain("Transferred Slack thread context:");
expect(insertedMessages[0]?.body).toContain("thread_ts: 1777798507.674009");
expect(insertedMessages[0]?.body).toContain("channel: C123");
expect(insertedMessages[0]?.metadata).toMatchObject({
threadOwnershipTransfer: { mode: "transfer", threadId: "1777798507.674009" },
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1777798507.674009",
source: "slack",
channel: "C123",
},
senderAgent: "Broker Crane",
a2a: true,
});
Expand Down
Loading
Loading