Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions slack-bridge/follower-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
resolvePinetMeshAuth,
resolveRuntimeAgentIdentity,
syncFollowerInboxEntries,
syncFollowerTransferredThreadContext,
} from "./helpers.js";
import {
type FollowerDeliveryState,
Expand Down Expand Up @@ -322,6 +323,18 @@ export function createFollowerRuntime(deps: FollowerRuntimeDeps): FollowerRuntim
}

if (agentMessages.length > 0) {
const transferSync = syncFollowerTransferredThreadContext(
agentMessages,
deps.getThreads(),
deps.getAgentOwnerToken(),
);
if (transferSync.threadUpdates.length > 0) {
mergeFollowerThreadUpdates(deps.getThreads(), transferSync.threadUpdates);
if (transferSync.changed) {
deps.persistState();
}
}

const pinetPrompt = formatPinetInboxMessages(agentMessages);
if (deps.deliverFollowUpMessage(pinetPrompt)) {
markFollowerInboxIdsDelivered(deps.deliveryState, getInboxIds(agentMessages));
Expand Down
110 changes: 110 additions & 0 deletions slack-bridge/helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import {
alignAgentIdentityToRole,
trackBrokerInboundThread,
syncFollowerInboxEntries,
syncFollowerTransferredThreadContext,
resolveFollowerThreadChannel,
isDirectMessageChannel,
buildFollowerRuntimeDiagnostic,
Expand Down Expand Up @@ -3509,6 +3510,115 @@ describe("syncFollowerInboxEntries", () => {
});
});

// ─── syncFollowerTransferredThreadContext ─────────────────

describe("syncFollowerTransferredThreadContext", () => {
it("extracts Slack channel context from explicit broker transfer metadata", () => {
const threads = new Map<string, FollowerThreadState>();
const result = syncFollowerTransferredThreadContext(
[
{
inboxId: 41,
message: {
threadId: "a2a:broker-1:worker-1",
source: "agent",
sender: "broker-1",
body: "please take over",
createdAt: "2026-05-18T22:00:00.000Z",
metadata: {
a2a: true,
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1777798507.674009",
source: "slack",
channel: "C123",
},
},
},
},
],
threads,
"worker-owner-token",
);

expect(result).toEqual({
changed: true,
threadUpdates: [
{
channelId: "C123",
threadTs: "1777798507.674009",
userId: "broker-1",
owner: "worker-owner-token",
source: "slack",
},
],
});
});

it("ignores transfers without channel context so callers fall back to diagnostics", () => {
const result = syncFollowerTransferredThreadContext(
[
{
message: {
threadId: "a2a:broker-1:worker-1",
sender: "broker-1",
body: "please take over",
metadata: {
a2a: true,
threadOwnershipTransfer: { mode: "transfer", threadId: "1777798507.674009" },
},
},
},
],
new Map(),
"worker-owner-token",
);

expect(result).toEqual({ changed: false, threadUpdates: [] });
});

it("returns changed=false when the transferred thread context is already cached", () => {
const threads = new Map<string, FollowerThreadState>([
[
"1777798507.674009",
{
channelId: "C123",
threadTs: "1777798507.674009",
userId: "broker-1",
owner: "worker-owner-token",
source: "slack",
},
],
]);

const result = syncFollowerTransferredThreadContext(
[
{
message: {
threadId: "a2a:broker-1:worker-1",
sender: "broker-1",
body: "please take over",
metadata: {
a2a: true,
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1777798507.674009",
source: "slack",
channel: "C123",
},
},
},
},
],
threads,
"worker-owner-token",
);

expect(result.changed).toBe(false);
expect(result.threadUpdates).toHaveLength(1);
});
});

// ─── resolveFollowerThreadChannel ─────────────────────────

describe("resolveFollowerThreadChannel", () => {
Expand Down
62 changes: 62 additions & 0 deletions slack-bridge/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2133,6 +2133,68 @@ export interface FollowerInboxSyncResult {
changed: boolean;
}

export interface FollowerTransferredThreadContextSyncResult {
threadUpdates: FollowerThreadState[];
changed: boolean;
}

function getFollowerTransferredThreadContext(entry: FollowerInboxEntry): {
threadId: string;
channel: string;
source: string;
} | null {
const metadata = entry.message.metadata ?? {};
const transfer = asRecord(metadata.threadOwnershipTransfer);
if (!transfer) return null;

const mode = asString(transfer.mode);
if (mode && mode !== "transfer") return null;

const threadId = asString(transfer.threadId);
const channel = asString(transfer.channel);
const source = asString(transfer.source) ?? "slack";
if (!threadId || !channel || source !== "slack") return null;

return { threadId, channel, source };
}

export function syncFollowerTransferredThreadContext(
entries: FollowerInboxEntry[],
existingThreads: ReadonlyMap<string, FollowerThreadState>,
agentOwner: string,
): FollowerTransferredThreadContextSyncResult {
let changed = false;
const threadUpdates: FollowerThreadState[] = [];

for (const entry of entries) {
const transferred = getFollowerTransferredThreadContext(entry);
if (!transferred) continue;

const existing = existingThreads.get(transferred.threadId);
const nextThread: FollowerThreadState = {
channelId: transferred.channel,
threadTs: transferred.threadId,
userId: existing?.userId || entry.message.sender || "",
owner: agentOwner,
source: transferred.source,
};

if (
!existing ||
existing.channelId !== nextThread.channelId ||
existing.userId !== nextThread.userId ||
existing.owner !== nextThread.owner ||
existing.source !== nextThread.source
) {
changed = true;
}

threadUpdates.push(nextThread);
}

return { threadUpdates, changed };
}

export interface BrokerInboxControlEntry {
inboxId: number;
command: PinetControlCommand;
Expand Down
7 changes: 6 additions & 1 deletion slack-bridge/pinet-mesh-ops.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,12 @@ describe("createPinetMeshOps", () => {
});
expect(transferThreadOwnership).toHaveBeenCalledWith("1777798507.674009", "worker-1");
expect(insertedMessages[0]?.metadata).toMatchObject({
threadOwnershipTransfer: { mode: "transfer", threadId: "1777798507.674009" },
threadOwnershipTransfer: {
mode: "transfer",
threadId: "1777798507.674009",
source: "slack",
channel: "C123",
},
senderAgent: "Broker Crane",
a2a: true,
});
Expand Down
28 changes: 27 additions & 1 deletion slack-bridge/pinet-mesh-ops.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,28 @@ function getThreadOwnershipTransferId(metadata?: Record<string, unknown>): strin
return typeof threadId === "string" && threadId.trim().length > 0 ? threadId.trim() : null;
}

function enrichThreadOwnershipTransferMetadata(
metadata: Record<string, unknown> | undefined,
thread: PinetMeshOpsTransferableThread,
): Record<string, unknown> {
const transfer = metadata?.threadOwnershipTransfer;
const transferRecord =
transfer && typeof transfer === "object" && !Array.isArray(transfer)
? (transfer as Record<string, unknown>)
: {};

return {
...(metadata ?? {}),
threadOwnershipTransfer: {
...transferRecord,
mode: "transfer",
threadId: thread.threadId,
...(thread.source ? { source: thread.source } : {}),
...(thread.channel ? { channel: thread.channel } : {}),
},
};
}

function parseGitHubRemoteRepo(remoteUrl: string): { repoOwner: string; repoName: string } | null {
const match = remoteUrl.match(/github\.com[:/]([A-Za-z0-9_.-]+)\/([A-Za-z0-9_.-]+?)(?:\.git)?$/i);
if (!match?.[1] || !match[2]) {
Expand Down Expand Up @@ -204,12 +226,16 @@ export function createPinetMeshOps(deps: PinetMeshOpsDeps): PinetMeshOps {
throw new Error(`Thread ${transferThreadId} is not a transferable Slack thread.`);
}

const dispatchMetadata = transferThread
? enrichThreadOwnershipTransferMetadata(finalMetadata, transferThread)
: finalMetadata;

const result = dispatchDirectAgentMessage(db, {
senderAgentId: selfId,
senderAgentName: deps.getAgentName(),
target: targetRef,
body: finalBody,
metadata: finalMetadata,
metadata: dispatchMetadata,
});

if (transferThreadId) {
Expand Down
Loading