diff --git a/package.json b/package.json index 3998f61..d850689 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ }, "devDependencies": { "@vitest/coverage-v8": "^3.1.0", - "openclaw": "2026.3.23", + "openclaw": "^2026.4.9", "silk-wasm": "^3.7.1", "typescript": "^5.8.0", "vitest": "^3.1.0" diff --git a/src/auth/pairing.test.ts b/src/auth/pairing.test.ts index c49e25a..217f1fa 100644 --- a/src/auth/pairing.test.ts +++ b/src/auth/pairing.test.ts @@ -16,9 +16,16 @@ const mockWithFileLock = vi.hoisted(() => vi.fn(async (_path: string, _opts: unknown, fn: () => Promise) => fn()), ); +// Mock both the legacy root barrel and the subpath that pairing.ts actually +// imports from. Starting with openclaw 2026.4.x the infra-runtime subpath is +// a distinct module graph node, so mocking only the root barrel no longer +// intercepts `import { withFileLock } from "openclaw/plugin-sdk/infra-runtime"`. vi.mock("openclaw/plugin-sdk", () => ({ withFileLock: mockWithFileLock, })); +vi.mock("openclaw/plugin-sdk/infra-runtime", () => ({ + withFileLock: mockWithFileLock, +})); let tmpDir: string; diff --git a/src/channel.ts b/src/channel.ts index 3640b3a..01a3622 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -28,6 +28,7 @@ import type { WeixinQrStartResult, WeixinQrWaitResult } from "./auth/login-qr.js // command-auth chain during plugin registration, which can re-enter plugin/provider registry // resolution before the account actually starts. import { sendWeixinMediaFile } from "./messaging/send-media.js"; +import { createWeixinThreadBindingManager } from "./thread-bindings.js"; import { sendMessageWeixin, StreamingMarkdownFilter } from "./messaging/send.js"; import { downloadRemoteImageToTemp } from "./cdn/upload.js"; @@ -167,6 +168,26 @@ export const weixinPlugin: ChannelPlugin = { looksLikeId: (raw) => raw.endsWith("@im.wechat"), }, }, + // Expose ACP / subagent thread binding support. WeChat is 1:1 direct chat + // today (no group / forum topic semantics), so we only offer the `current` + // placement and treat the inbound conversationId (the sender's WeChat id) + // as the canonical conversation ref with no parent. The actual binding + // lifetime is managed by createWeixinThreadBindingManager; the framework + // calls the hook below to dedupe to that per-account manager. + conversationBindings: { + supportsCurrentConversationBinding: true, + defaultTopLevelPlacement: "current", + resolveConversationRef: ({ conversationId }) => { + const trimmed = conversationId?.trim() ?? ""; + return trimmed ? { conversationId: trimmed } : null; + }, + createManager: ({ accountId }) => + createWeixinThreadBindingManager({ + accountId: accountId ?? undefined, + persist: false, + enableSweeper: false, + }), + }, agentPrompt: { messageToolHints: () => [ "To send an image or file to the current user, use the message tool with action='send' and set 'media' to a local file path or a remote URL. You do not need to specify 'to' — the current conversation recipient is used automatically.", @@ -365,6 +386,16 @@ export const weixinPlugin: ChannelPlugin = { const aLog = logger.withAccount(account.accountId); aLog.debug(`about to call monitorWeixinProvider`); restoreContextTokens(account.accountId); + // Eagerly initialise the ACP thread binding manager for this account so + // it loads the persistent store and starts its idle sweeper before any + // inbound traffic arrives. Subsequent framework-driven calls to + // createManager() (via conversationBindings below) will be deduped and + // return this same instance. + try { + createWeixinThreadBindingManager({ accountId: account.accountId }); + } catch (err) { + aLog.warn(`thread binding manager init failed: ${String(err)}`); + } aLog.info(`starting weixin webhook`); ctx.setStatus?.({ diff --git a/src/runtime.test.ts b/src/runtime.test.ts new file mode 100644 index 0000000..77c8795 --- /dev/null +++ b/src/runtime.test.ts @@ -0,0 +1,68 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; + +// runtime.ts pulls in logger which in turn reaches for fs/stream paths. Stub +// the logger so these thin unit tests don't race with real log writes. +vi.mock("./util/logger.js", () => ({ + logger: { + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +// biome-ignore lint/suspicious/noExplicitAny: tests treat PluginRuntime shape as opaque +type AnyRuntime = any; + +const MOCK_RUNTIME: AnyRuntime = { + channel: { id: "openclaw-weixin" }, +}; + +async function loadRuntime() { + vi.resetModules(); + return import("./runtime.js"); +} + +describe("weixin runtime singleton", () => { + afterEach(() => { + vi.resetModules(); + }); + + it("setWeixinRuntime + getWeixinRuntime round-trip", async () => { + const mod = await loadRuntime(); + mod.setWeixinRuntime(MOCK_RUNTIME); + expect(mod.getWeixinRuntime()).toBe(MOCK_RUNTIME); + }); + + it("getWeixinRuntime throws before initialization", async () => { + const mod = await loadRuntime(); + expect(() => mod.getWeixinRuntime()).toThrow(/not initialized/); + }); + + it("waitForWeixinRuntime returns immediately when already set", async () => { + const mod = await loadRuntime(); + mod.setWeixinRuntime(MOCK_RUNTIME); + const result = await mod.waitForWeixinRuntime(100); + expect(result).toBe(MOCK_RUNTIME); + }); + + it("waitForWeixinRuntime times out when never set", async () => { + const mod = await loadRuntime(); + await expect(mod.waitForWeixinRuntime(50)).rejects.toThrow(/timeout/); + }); + + it("resolveWeixinChannelRuntime prefers the channelRuntime param when provided", async () => { + const mod = await loadRuntime(); + const injected = { id: "from-ctx" }; + // biome-ignore lint/suspicious/noExplicitAny: test fixture + const result = await mod.resolveWeixinChannelRuntime({ channelRuntime: injected as any }); + expect(result).toBe(injected); + }); + + it("resolveWeixinChannelRuntime falls back to the module-global when no ctx is passed", async () => { + const mod = await loadRuntime(); + mod.setWeixinRuntime(MOCK_RUNTIME); + const result = await mod.resolveWeixinChannelRuntime({}); + expect(result).toBe(MOCK_RUNTIME.channel); + }); +}); diff --git a/src/thread-bindings.test.ts b/src/thread-bindings.test.ts new file mode 100644 index 0000000..e661a70 --- /dev/null +++ b/src/thread-bindings.test.ts @@ -0,0 +1,690 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// Prevent the sweeper from firing during tests and avoid touching the real +// conversation-runtime state shared with a running gateway. vi.hoisted is +// required because vi.mock factories are hoisted above top-level locals. +const { registerSessionBindingAdapterMock, unregisterSessionBindingAdapterMock } = vi.hoisted( + () => ({ + registerSessionBindingAdapterMock: vi.fn(), + unregisterSessionBindingAdapterMock: vi.fn(), + }), +); + +vi.mock("openclaw/plugin-sdk/conversation-runtime", () => ({ + registerSessionBindingAdapter: registerSessionBindingAdapterMock, + unregisterSessionBindingAdapter: unregisterSessionBindingAdapterMock, + resolveThreadBindingConversationIdFromBindingId: vi.fn( + ({ accountId, bindingId }: { accountId: string; bindingId: string }) => { + const prefix = `${accountId}:`; + return bindingId.startsWith(prefix) ? bindingId.slice(prefix.length) : null; + }, + ), + resolveThreadBindingEffectiveExpiresAt: vi.fn(() => 0), + formatThreadBindingDurationLabel: vi.fn((ms: number) => `${ms}ms`), +})); + +vi.mock("openclaw/plugin-sdk/json-store", () => ({ + writeJsonFileAtomically: vi.fn(async (filePath: string, data: unknown) => { + await fs.promises.mkdir(path.dirname(filePath), { recursive: true }); + await fs.promises.writeFile(filePath, JSON.stringify(data, null, 2), "utf-8"); + }), +})); + +vi.mock("openclaw/plugin-sdk/routing", () => ({ + normalizeAccountId: (raw?: string | null) => (raw ?? "").trim() || "default", +})); + +vi.mock("openclaw/plugin-sdk/runtime-env", () => ({ + logVerbose: vi.fn(), +})); + +const { stateDirRef } = vi.hoisted(() => ({ stateDirRef: { current: "" } })); + +vi.mock("openclaw/plugin-sdk/state-paths", () => ({ + resolveStateDir: (_env: NodeJS.ProcessEnv, _homedir: typeof os.homedir) => stateDirRef.current, +})); + +vi.mock("openclaw/plugin-sdk/text-runtime", () => ({ + normalizeOptionalString: (raw: unknown) => { + if (typeof raw !== "string") { + return undefined; + } + const trimmed = raw.trim(); + return trimmed ? trimmed : undefined; + }, +})); + +import { + __testing, + createWeixinThreadBindingManager, + getWeixinThreadBindingManager, + registerWeixinThreadBindings, +} from "./thread-bindings.js"; + +/** + * Pull the SessionBindingAdapter that was registered with the mocked + * registerSessionBindingAdapter. Lets tests exercise bind/resolve/touch/unbind + * through the same entry point the ACP runtime uses. + */ +// biome-ignore lint/suspicious/noExplicitAny: test seam +function lastRegisteredAdapter(): any { + const calls = registerSessionBindingAdapterMock.mock.calls; + expect(calls.length).toBeGreaterThan(0); + return calls[calls.length - 1]?.[0]; +} + +const CHANNEL = "openclaw-weixin"; + +describe("weixin thread bindings", () => { + beforeEach(async () => { + stateDirRef.current = await fs.promises.mkdtemp( + path.join(os.tmpdir(), "openclaw-weixin-bindings-"), + ); + await __testing.stopAllWeixinThreadBindings(); + registerSessionBindingAdapterMock.mockClear(); + unregisterSessionBindingAdapterMock.mockClear(); + }); + + afterEach(async () => { + await __testing.stopAllWeixinThreadBindings(); + if (stateDirRef.current) { + await fs.promises.rm(stateDirRef.current, { recursive: true, force: true }); + stateDirRef.current = ""; + } + }); + + it("dedupes manager instances per accountId", () => { + const a = createWeixinThreadBindingManager({ + accountId: "acc1", + enableSweeper: false, + persist: false, + }); + const b = createWeixinThreadBindingManager({ + accountId: "acc1", + enableSweeper: false, + persist: false, + }); + expect(a).toBe(b); + expect(getWeixinThreadBindingManager("acc1")).toBe(a); + expect(registerSessionBindingAdapterMock).toHaveBeenCalledTimes(1); + }); + + it("isolates bindings across accounts", () => { + const mgrA = createWeixinThreadBindingManager({ + accountId: "accA", + enableSweeper: false, + persist: false, + }); + const mgrB = createWeixinThreadBindingManager({ + accountId: "accB", + enableSweeper: false, + persist: false, + }); + expect(mgrA).not.toBe(mgrB); + expect(mgrA.listBindings()).toEqual([]); + expect(mgrB.listBindings()).toEqual([]); + expect(registerSessionBindingAdapterMock).toHaveBeenCalledTimes(2); + }); + + it("resolves the bindings store path under the state dir", () => { + const bindingsPath = __testing.resolveBindingsPath("accX"); + expect(bindingsPath).toBe( + path.join(stateDirRef.current, "openclaw-weixin", "thread-bindings-accX.json"), + ); + }); + + it("returns null when touchConversation is called on an unknown id", () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accTouch", + enableSweeper: false, + persist: false, + }); + expect(mgr.touchConversation("not-bound@im.wechat")).toBeNull(); + }); + + it("binds a current-placement ACP conversation and makes it resolvable", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accBind", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + const result = await adapter.bind({ + targetSessionKey: "agent:claude:acp:test-session", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accBind", + conversationId: "user-1@im.wechat", + }, + metadata: { label: "Test bind", agentId: "main" }, + }); + + expect(result).not.toBeNull(); + expect(result.targetSessionKey).toBe("agent:claude:acp:test-session"); + expect(result.conversation).toEqual({ + channel: CHANNEL, + accountId: "accBind", + conversationId: "user-1@im.wechat", + }); + + expect(mgr.listBindings()).toHaveLength(1); + expect(mgr.getByConversationId("user-1@im.wechat")?.targetSessionKey).toBe( + "agent:claude:acp:test-session", + ); + + const lookup = adapter.resolveByConversation({ + channel: CHANNEL, + accountId: "accBind", + conversationId: "user-1@im.wechat", + }); + expect(lookup?.targetSessionKey).toBe("agent:claude:acp:test-session"); + }); + + it("rejects child placement (no group/forum topic support)", async () => { + createWeixinThreadBindingManager({ + accountId: "accChild", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + const result = await adapter.bind({ + targetSessionKey: "agent:claude:acp:x", + targetKind: "session", + placement: "child", + conversation: { + channel: CHANNEL, + accountId: "accChild", + conversationId: "user-1@im.wechat", + }, + }); + + expect(result).toBeNull(); + }); + + it("rejects bind from a foreign channel", async () => { + createWeixinThreadBindingManager({ + accountId: "accChan", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + const result = await adapter.bind({ + targetSessionKey: "agent:claude:acp:x", + targetKind: "session", + placement: "current", + conversation: { + channel: "telegram", + accountId: "accChan", + conversationId: "user-1@im.wechat", + }, + }); + + expect(result).toBeNull(); + }); + + it("rejects bind when targetSessionKey is blank", async () => { + createWeixinThreadBindingManager({ + accountId: "accBlank", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + const result = await adapter.bind({ + targetSessionKey: " ", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accBlank", + conversationId: "user-1@im.wechat", + }, + }); + + expect(result).toBeNull(); + }); + + it("touchConversation bumps lastActivityAt", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accTouch2", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:tt", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accTouch2", + conversationId: "user-touch@im.wechat", + }, + }); + + const beforeTouch = mgr.getByConversationId("user-touch@im.wechat"); + expect(beforeTouch).toBeDefined(); + const touchedAt = (beforeTouch?.lastActivityAt ?? 0) + 10_000; + const touched = mgr.touchConversation("user-touch@im.wechat", touchedAt); + expect(touched?.lastActivityAt).toBe(touchedAt); + }); + + it("unbinds by bindingId", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accUnbind", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:u1", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accUnbind", + conversationId: "user-unbind@im.wechat", + }, + }); + expect(mgr.listBindings()).toHaveLength(1); + + const removed = await adapter.unbind({ + bindingId: "accUnbind:user-unbind@im.wechat", + }); + expect(removed).toHaveLength(1); + expect(mgr.listBindings()).toHaveLength(0); + }); + + it("unbinds all bindings sharing a targetSessionKey", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accUnbindKey", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + for (const conv of ["a@im.wechat", "b@im.wechat"]) { + await adapter.bind({ + targetSessionKey: "agent:claude:acp:shared", + targetKind: "session", + placement: "current", + conversation: { channel: CHANNEL, accountId: "accUnbindKey", conversationId: conv }, + }); + } + expect(mgr.listBindings()).toHaveLength(2); + + const removed = await adapter.unbind({ + targetSessionKey: "agent:claude:acp:shared", + }); + expect(removed).toHaveLength(2); + expect(mgr.listBindings()).toHaveLength(0); + }); + + it("listBySession returns only bindings with matching targetSessionKey", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accListBySession", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:first", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accListBySession", + conversationId: "u-a@im.wechat", + }, + }); + await adapter.bind({ + targetSessionKey: "agent:claude:acp:second", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accListBySession", + conversationId: "u-b@im.wechat", + }, + }); + + const first = adapter.listBySession("agent:claude:acp:first"); + expect(first).toHaveLength(1); + expect(first[0].conversation.conversationId).toBe("u-a@im.wechat"); + + const missing = adapter.listBySession("agent:claude:acp:nope"); + expect(missing).toEqual([]); + + expect(mgr.listBindings()).toHaveLength(2); + }); + + it("persists bindings to disk when persist=true and reloads them on restart", async () => { + const mgrA = createWeixinThreadBindingManager({ + accountId: "accPersist", + enableSweeper: false, + persist: true, + }); + const adapterA = lastRegisteredAdapter(); + + await adapterA.bind({ + targetSessionKey: "agent:claude:acp:persist", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accPersist", + conversationId: "user-p@im.wechat", + }, + }); + // Allow the fire-and-forget persist enqueue to drain. + await new Promise((resolve) => setImmediate(resolve)); + + const bindingsFile = __testing.resolveBindingsPath("accPersist"); + const rawAfterBind = await fs.promises.readFile(bindingsFile, "utf-8"); + expect(rawAfterBind).toContain("user-p@im.wechat"); + + // Drop the in-memory state and recreate; the manager must rehydrate + // from the persisted file. + await __testing.stopAllWeixinThreadBindings(); + registerSessionBindingAdapterMock.mockClear(); + + const mgrB = createWeixinThreadBindingManager({ + accountId: "accPersist", + enableSweeper: false, + persist: true, + }); + expect(mgrB).not.toBe(mgrA); + const reloaded = mgrB.getByConversationId("user-p@im.wechat"); + expect(reloaded?.targetSessionKey).toBe("agent:claude:acp:persist"); + }); + + it("registerWeixinThreadBindings is a thin wrapper over createWeixinThreadBindingManager", () => { + const mgr = registerWeixinThreadBindings({ + accountId: "accRegister", + idleTimeoutMs: 5_000, + }); + expect(getWeixinThreadBindingManager("accRegister")).toBe(mgr); + expect(mgr.getIdleTimeoutMs()).toBe(5_000); + }); + + it("stops the manager and clears registry on stopAllWeixinThreadBindings", async () => { + createWeixinThreadBindingManager({ + accountId: "accStop", + enableSweeper: false, + persist: false, + }); + expect(getWeixinThreadBindingManager("accStop")).not.toBeNull(); + await __testing.stopAllWeixinThreadBindings(); + expect(getWeixinThreadBindingManager("accStop")).toBeNull(); + expect(unregisterSessionBindingAdapterMock).toHaveBeenCalled(); + }); + + it("merges metadata into an existing binding when rebinding the same conversationId", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accRebind", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:v1", + targetKind: "session", + placement: "current", + conversation: { channel: CHANNEL, accountId: "accRebind", conversationId: "u@im.wechat" }, + metadata: { agentId: "main", label: "first", boundBy: "user" }, + }); + const first = mgr.getByConversationId("u@im.wechat"); + expect(first?.agentId).toBe("main"); + expect(first?.label).toBe("first"); + + // Rebind without agentId — should preserve existing one via the merge path. + await adapter.bind({ + targetSessionKey: "agent:claude:acp:v2", + targetKind: "session", + placement: "current", + conversation: { channel: CHANNEL, accountId: "accRebind", conversationId: "u@im.wechat" }, + metadata: { label: "second" }, + }); + const second = mgr.getByConversationId("u@im.wechat"); + expect(second?.targetSessionKey).toBe("agent:claude:acp:v2"); + expect(second?.agentId).toBe("main"); // carried over from the first bind + expect(second?.label).toBe("second"); // overwritten by the second bind + }); + + it("reads idleTimeoutMs and maxAgeMs overrides from metadata", async () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accTimeouts", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:t", + targetKind: "session", + placement: "current", + conversation: { channel: CHANNEL, accountId: "accTimeouts", conversationId: "ttl@im.wechat" }, + metadata: { idleTimeoutMs: 2_000, maxAgeMs: 10_000 }, + }); + const record = mgr.getByConversationId("ttl@im.wechat"); + expect(record?.idleTimeoutMs).toBe(2_000); + expect(record?.maxAgeMs).toBe(10_000); + }); + + it("returns null when resolveByConversation is called with a foreign channel or empty id", () => { + createWeixinThreadBindingManager({ + accountId: "accResolve", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + + expect( + adapter.resolveByConversation({ + channel: "telegram", + accountId: "accResolve", + conversationId: "x@im.wechat", + }), + ).toBeNull(); + expect( + adapter.resolveByConversation({ + channel: CHANNEL, + accountId: "accResolve", + conversationId: "", + }), + ).toBeNull(); + }); + + it("handles loadBindingsFromDisk for missing file (ENOENT) gracefully", () => { + // Point at an accountId whose store file has never been written. + const mgr = createWeixinThreadBindingManager({ + accountId: "accMissing", + enableSweeper: false, + persist: true, + }); + expect(mgr.listBindings()).toEqual([]); + }); + + it("ignores on-disk bindings with wrong store version", async () => { + // Hand-write a bogus store file, then spin up a manager and expect it + // to reject the entire file (version mismatch) rather than crash. + const storePath = __testing.resolveBindingsPath("accVersion"); + await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); + await fs.promises.writeFile( + storePath, + JSON.stringify({ + version: 999, + bindings: [ + { + conversationId: "u@im.wechat", + targetSessionKey: "agent:claude:acp:x", + boundAt: Date.now(), + lastActivityAt: Date.now(), + }, + ], + }), + "utf-8", + ); + + const mgr = createWeixinThreadBindingManager({ + accountId: "accVersion", + enableSweeper: false, + persist: true, + }); + expect(mgr.listBindings()).toEqual([]); + }); + + it("ignores on-disk bindings with missing conversationId or targetSessionKey", async () => { + const storePath = __testing.resolveBindingsPath("accBadEntries"); + await fs.promises.mkdir(path.dirname(storePath), { recursive: true }); + await fs.promises.writeFile( + storePath, + JSON.stringify({ + version: 1, + bindings: [ + { targetSessionKey: "agent:claude:acp:x", boundAt: 1, lastActivityAt: 1 }, // no conversationId + { conversationId: "u@im.wechat", boundAt: 1, lastActivityAt: 1 }, // no targetSessionKey + { + conversationId: "good@im.wechat", + targetSessionKey: "agent:claude:acp:ok", + boundAt: 1, + lastActivityAt: 1, + idleTimeoutMs: 500, + maxAgeMs: 1_000, + agentId: "main", + label: "ok", + boundBy: "user", + metadata: { hint: "survive" }, + }, + ], + }), + "utf-8", + ); + + const mgr = createWeixinThreadBindingManager({ + accountId: "accBadEntries", + enableSweeper: false, + persist: true, + }); + const bindings = mgr.listBindings(); + expect(bindings).toHaveLength(1); + expect(bindings[0]?.conversationId).toBe("good@im.wechat"); + expect(bindings[0]?.idleTimeoutMs).toBe(500); + expect(bindings[0]?.agentId).toBe("main"); + }); + + it("sweeper removes idle-expired bindings", async () => { + vi.useFakeTimers(); + try { + const mgr = createWeixinThreadBindingManager({ + accountId: "accSweep", + enableSweeper: true, + persist: false, + idleTimeoutMs: 100, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:sweep", + targetKind: "session", + placement: "current", + conversation: { channel: CHANNEL, accountId: "accSweep", conversationId: "sw@im.wechat" }, + }); + expect(mgr.listBindings()).toHaveLength(1); + + // Advance wall clock past idleTimeoutMs, then tick the sweeper interval. + vi.advanceTimersByTime(61_000); + expect(mgr.listBindings()).toHaveLength(0); + } finally { + vi.useRealTimers(); + } + }); + + it("normalizes bogus durations to the fallback", () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accNorm", + enableSweeper: false, + persist: false, + // Intentionally non-numeric to hit the normalizeDurationMs fallback. + idleTimeoutMs: Number.NaN, + maxAgeMs: -5, + }); + expect(mgr.getIdleTimeoutMs()).toBe(24 * 60 * 60 * 1000); + expect(mgr.getMaxAgeMs()).toBe(0); + }); + + it("unbind returns [] when the bindingId cannot be resolved", async () => { + createWeixinThreadBindingManager({ + accountId: "accBadBinding", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + const result = await adapter.unbind({ bindingId: "garbage" }); + expect(result).toEqual([]); + }); + + it("touch is a no-op when the bindingId cannot be resolved", () => { + const mgr = createWeixinThreadBindingManager({ + accountId: "accTouchBad", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + adapter.touch("garbage"); + expect(mgr.listBindings()).toEqual([]); + }); + + it("adapter listBySession returns [] for a blank targetSessionKey", () => { + createWeixinThreadBindingManager({ + accountId: "accListBlank", + enableSweeper: false, + persist: false, + }); + const adapter = lastRegisteredAdapter(); + expect(adapter.listBySession(" ")).toEqual([]); + }); + + it("sweeper leaves non-expired bindings alone between ticks", async () => { + vi.useFakeTimers(); + try { + const mgr = createWeixinThreadBindingManager({ + accountId: "accSweepKeep", + enableSweeper: true, + persist: false, + // Very long idle timeout so a single sweep tick never expires. + idleTimeoutMs: 60 * 60 * 1000, + }); + const adapter = lastRegisteredAdapter(); + + await adapter.bind({ + targetSessionKey: "agent:claude:acp:keep", + targetKind: "session", + placement: "current", + conversation: { + channel: CHANNEL, + accountId: "accSweepKeep", + conversationId: "keep@im.wechat", + }, + }); + + vi.advanceTimersByTime(61_000); // one sweep interval tick + expect(mgr.listBindings()).toHaveLength(1); + } finally { + vi.useRealTimers(); + } + }); + +}); diff --git a/src/thread-bindings.ts b/src/thread-bindings.ts new file mode 100644 index 0000000..0d0cb8b --- /dev/null +++ b/src/thread-bindings.ts @@ -0,0 +1,743 @@ +// WeChat thread bindings adapter. +// +// Lets the openclaw ACP runtime keep a persistent session bound to a WeChat +// 1:1 conversation, so `sessions_spawn(runtime="acp", mode="session", +// thread=true)` can actually route follow-up prompts back to the same +// Claude Code / Codex / Gemini CLI session instead of failing with +// `thread_binding_invalid: Thread bindings are unavailable for openclaw-weixin`. +// +// Structurally this is a slimmed-down port of the Telegram adapter in the +// openclaw monorepo (extensions/telegram/src/thread-bindings.ts), with the +// forum-topic / group handling removed because the WeChat plugin currently +// treats all inbound traffic as 1:1 direct chat. Conversation identifiers are +// therefore just the sender's WeChat id (e.g. `xxx@im.wechat`), scoped per +// WeChat bot accountId. + +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { + formatThreadBindingDurationLabel, + registerSessionBindingAdapter, + resolveThreadBindingConversationIdFromBindingId, + resolveThreadBindingEffectiveExpiresAt, + unregisterSessionBindingAdapter, + type BindingTargetKind, + type SessionBindingAdapter, + type SessionBindingRecord, +} from "openclaw/plugin-sdk/conversation-runtime"; +import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; +import { normalizeAccountId } from "openclaw/plugin-sdk/routing"; +import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; +import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; + +const WEIXIN_CHANNEL_ID = "openclaw-weixin"; +const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000; +const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0; +const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000; +const STORE_VERSION = 1; + +type WeixinBindingTargetKind = "subagent" | "acp"; + +export type WeixinThreadBindingRecord = { + accountId: string; + conversationId: string; + targetKind: WeixinBindingTargetKind; + targetSessionKey: string; + agentId?: string; + label?: string; + boundBy?: string; + boundAt: number; + lastActivityAt: number; + idleTimeoutMs?: number; + maxAgeMs?: number; + metadata?: Record; +}; + +type StoredWeixinBindingState = { + version: number; + bindings: WeixinThreadBindingRecord[]; +}; + +export type WeixinThreadBindingManager = { + accountId: string; + shouldPersistMutations: () => boolean; + getIdleTimeoutMs: () => number; + getMaxAgeMs: () => number; + getByConversationId: (conversationId: string) => WeixinThreadBindingRecord | undefined; + listBySessionKey: (targetSessionKey: string) => WeixinThreadBindingRecord[]; + listBindings: () => WeixinThreadBindingRecord[]; + touchConversation: (conversationId: string, at?: number) => WeixinThreadBindingRecord | null; + unbindConversation: (params: { conversationId: string; reason?: string }) => WeixinThreadBindingRecord | null; + unbindBySessionKey: (params: { targetSessionKey: string; reason?: string }) => WeixinThreadBindingRecord[]; + stop: () => void; +}; + +type WeixinThreadBindingsState = { + managersByAccountId: Map; + bindingsByAccountConversation: Map; + persistQueueByAccountId: Map>; +}; + +/** + * Keep WeChat thread binding state shared across bundled chunks so routing, + * binding lookups, and binding mutations all observe the same live registry. + */ +const WEIXIN_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.weixinThreadBindingsState"); +let threadBindingsState: WeixinThreadBindingsState | undefined; + +function getThreadBindingsState(): WeixinThreadBindingsState { + if (!threadBindingsState) { + const globalStore = globalThis as Record; + threadBindingsState = (globalStore[WEIXIN_THREAD_BINDINGS_STATE_KEY] as + | WeixinThreadBindingsState + | undefined) ?? { + managersByAccountId: new Map(), + bindingsByAccountConversation: new Map(), + persistQueueByAccountId: new Map>(), + }; + globalStore[WEIXIN_THREAD_BINDINGS_STATE_KEY] = threadBindingsState; + } + return threadBindingsState; +} + +function normalizeDurationMs(raw: unknown, fallback: number): number { + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return fallback; + } + return Math.max(0, Math.floor(raw)); +} + +function resolveBindingKey(params: { accountId: string; conversationId: string }): string { + return `${params.accountId}:${params.conversationId}`; +} + +function toSessionBindingTargetKind(raw: WeixinBindingTargetKind): BindingTargetKind { + return raw === "subagent" ? "subagent" : "session"; +} + +function toWeixinTargetKind(raw: BindingTargetKind): WeixinBindingTargetKind { + return raw === "subagent" ? "subagent" : "acp"; +} + +function toSessionBindingRecord( + record: WeixinThreadBindingRecord, + defaults: { idleTimeoutMs: number; maxAgeMs: number }, +): SessionBindingRecord { + return { + bindingId: resolveBindingKey({ + accountId: record.accountId, + conversationId: record.conversationId, + }), + targetSessionKey: record.targetSessionKey, + targetKind: toSessionBindingTargetKind(record.targetKind), + conversation: { + channel: WEIXIN_CHANNEL_ID, + accountId: record.accountId, + conversationId: record.conversationId, + }, + status: "active", + boundAt: record.boundAt, + expiresAt: resolveThreadBindingEffectiveExpiresAt({ + record, + defaultIdleTimeoutMs: defaults.idleTimeoutMs, + defaultMaxAgeMs: defaults.maxAgeMs, + }), + metadata: { + agentId: record.agentId, + label: record.label, + boundBy: record.boundBy, + lastActivityAt: record.lastActivityAt, + idleTimeoutMs: + typeof record.idleTimeoutMs === "number" + ? Math.max(0, Math.floor(record.idleTimeoutMs)) + : defaults.idleTimeoutMs, + maxAgeMs: + typeof record.maxAgeMs === "number" + ? Math.max(0, Math.floor(record.maxAgeMs)) + : defaults.maxAgeMs, + ...record.metadata, + }, + }; +} + +function fromSessionBindingInput(params: { + accountId: string; + input: { + targetSessionKey: string; + targetKind: BindingTargetKind; + conversationId: string; + metadata?: Record; + }; +}): WeixinThreadBindingRecord { + const now = Date.now(); + const metadata = params.input.metadata ?? {}; + const existing = getThreadBindingsState().bindingsByAccountConversation.get( + resolveBindingKey({ + accountId: params.accountId, + conversationId: params.input.conversationId, + }), + ); + + const record: WeixinThreadBindingRecord = { + accountId: params.accountId, + conversationId: params.input.conversationId, + targetKind: toWeixinTargetKind(params.input.targetKind), + targetSessionKey: params.input.targetSessionKey, + agentId: + typeof metadata.agentId === "string" && metadata.agentId.trim() + ? metadata.agentId.trim() + : existing?.agentId, + label: + typeof metadata.label === "string" && metadata.label.trim() + ? metadata.label.trim() + : existing?.label, + boundBy: + typeof metadata.boundBy === "string" && metadata.boundBy.trim() + ? metadata.boundBy.trim() + : existing?.boundBy, + boundAt: now, + lastActivityAt: now, + metadata: { + ...existing?.metadata, + ...metadata, + }, + }; + + if (typeof metadata.idleTimeoutMs === "number" && Number.isFinite(metadata.idleTimeoutMs)) { + record.idleTimeoutMs = Math.max(0, Math.floor(metadata.idleTimeoutMs)); + } else if (typeof existing?.idleTimeoutMs === "number") { + record.idleTimeoutMs = existing.idleTimeoutMs; + } + + if (typeof metadata.maxAgeMs === "number" && Number.isFinite(metadata.maxAgeMs)) { + record.maxAgeMs = Math.max(0, Math.floor(metadata.maxAgeMs)); + } else if (typeof existing?.maxAgeMs === "number") { + record.maxAgeMs = existing.maxAgeMs; + } + + return record; +} + +function resolveBindingsPath(accountId: string, env: NodeJS.ProcessEnv = process.env): string { + const stateDir = resolveStateDir(env, os.homedir); + return path.join(stateDir, "openclaw-weixin", `thread-bindings-${accountId}.json`); +} + +function summarizeLifecycleForLog( + record: WeixinThreadBindingRecord, + defaults: { idleTimeoutMs: number; maxAgeMs: number }, +): string { + const idleTimeoutMs = + typeof record.idleTimeoutMs === "number" ? record.idleTimeoutMs : defaults.idleTimeoutMs; + const maxAgeMs = typeof record.maxAgeMs === "number" ? record.maxAgeMs : defaults.maxAgeMs; + const idleLabel = formatThreadBindingDurationLabel(Math.max(0, Math.floor(idleTimeoutMs))); + const maxAgeLabel = formatThreadBindingDurationLabel(Math.max(0, Math.floor(maxAgeMs))); + return `idle=${idleLabel} maxAge=${maxAgeLabel}`; +} + +function loadBindingsFromDisk(accountId: string): WeixinThreadBindingRecord[] { + const filePath = resolveBindingsPath(accountId); + try { + const raw = fs.readFileSync(filePath, "utf-8"); + const parsed = JSON.parse(raw) as StoredWeixinBindingState; + if (parsed?.version !== STORE_VERSION || !Array.isArray(parsed.bindings)) { + return []; + } + const bindings: WeixinThreadBindingRecord[] = []; + for (const entry of parsed.bindings) { + const conversationId = normalizeOptionalString(entry?.conversationId); + const targetSessionKey = normalizeOptionalString(entry?.targetSessionKey) ?? ""; + const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp"; + if (!conversationId || !targetSessionKey) { + continue; + } + const boundAt = + typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt) + ? Math.floor(entry.boundAt) + : Date.now(); + const lastActivityAt = + typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt) + ? Math.floor(entry.lastActivityAt) + : boundAt; + const record: WeixinThreadBindingRecord = { + accountId, + conversationId, + targetSessionKey, + targetKind, + boundAt, + lastActivityAt, + }; + if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) { + record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs)); + } + if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) { + record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs)); + } + if (typeof entry?.agentId === "string" && entry.agentId.trim()) { + record.agentId = entry.agentId.trim(); + } + if (typeof entry?.label === "string" && entry.label.trim()) { + record.label = entry.label.trim(); + } + if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) { + record.boundBy = entry.boundBy.trim(); + } + if (entry?.metadata && typeof entry.metadata === "object") { + record.metadata = { ...entry.metadata }; + } + bindings.push(record); + } + return bindings; + } catch (err) { + const code = (err as { code?: string }).code; + if (code !== "ENOENT") { + logVerbose(`weixin thread bindings load failed (${accountId}): ${String(err)}`); + } + return []; + } +} + +async function persistBindingsToDisk(params: { + accountId: string; + persist: boolean; + bindings?: WeixinThreadBindingRecord[]; +}): Promise { + if (!params.persist) { + return; + } + const payload: StoredWeixinBindingState = { + version: STORE_VERSION, + bindings: + params.bindings ?? + [...getThreadBindingsState().bindingsByAccountConversation.values()].filter( + (entry) => entry.accountId === params.accountId, + ), + }; + await writeJsonFileAtomically(resolveBindingsPath(params.accountId), payload); +} + +function listBindingsForAccount(accountId: string): WeixinThreadBindingRecord[] { + return [...getThreadBindingsState().bindingsByAccountConversation.values()].filter( + (entry) => entry.accountId === accountId, + ); +} + +function enqueuePersistBindings(params: { + accountId: string; + persist: boolean; + bindings?: WeixinThreadBindingRecord[]; +}): Promise { + if (!params.persist) { + return Promise.resolve(); + } + const previous = + getThreadBindingsState().persistQueueByAccountId.get(params.accountId) ?? Promise.resolve(); + const next = previous + .catch(() => undefined) + .then(async () => { + await persistBindingsToDisk(params); + }); + getThreadBindingsState().persistQueueByAccountId.set(params.accountId, next); + void next.finally(() => { + if (getThreadBindingsState().persistQueueByAccountId.get(params.accountId) === next) { + getThreadBindingsState().persistQueueByAccountId.delete(params.accountId); + } + }); + return next; +} + +function persistBindingsSafely(params: { + accountId: string; + persist: boolean; + bindings?: WeixinThreadBindingRecord[]; + reason: string; +}): void { + void enqueuePersistBindings(params).catch((err) => { + logVerbose( + `weixin thread bindings persist failed (${params.accountId}, ${params.reason}): ${String(err)}`, + ); + }); +} + +function normalizeTimestampMs(raw: unknown): number { + if (typeof raw !== "number" || !Number.isFinite(raw)) { + return Date.now(); + } + return Math.max(0, Math.floor(raw)); +} + +function shouldExpireByIdle(params: { + now: number; + record: WeixinThreadBindingRecord; + defaultIdleTimeoutMs: number; +}): boolean { + const idleTimeoutMs = + typeof params.record.idleTimeoutMs === "number" + ? Math.max(0, Math.floor(params.record.idleTimeoutMs)) + : params.defaultIdleTimeoutMs; + if (idleTimeoutMs <= 0) { + return false; + } + return params.now >= Math.max(params.record.lastActivityAt, params.record.boundAt) + idleTimeoutMs; +} + +function shouldExpireByMaxAge(params: { + now: number; + record: WeixinThreadBindingRecord; + defaultMaxAgeMs: number; +}): boolean { + const maxAgeMs = + typeof params.record.maxAgeMs === "number" + ? Math.max(0, Math.floor(params.record.maxAgeMs)) + : params.defaultMaxAgeMs; + if (maxAgeMs <= 0) { + return false; + } + return params.now >= params.record.boundAt + maxAgeMs; +} + +export function createWeixinThreadBindingManager( + params: { + accountId?: string; + persist?: boolean; + idleTimeoutMs?: number; + maxAgeMs?: number; + enableSweeper?: boolean; + } = {}, +): WeixinThreadBindingManager { + const accountId = normalizeAccountId(params.accountId); + const existing = getThreadBindingsState().managersByAccountId.get(accountId); + if (existing) { + return existing; + } + + const persist = params.persist ?? true; + const idleTimeoutMs = normalizeDurationMs( + params.idleTimeoutMs, + DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS, + ); + const maxAgeMs = normalizeDurationMs(params.maxAgeMs, DEFAULT_THREAD_BINDING_MAX_AGE_MS); + + const loaded = loadBindingsFromDisk(accountId); + for (const entry of loaded) { + const key = resolveBindingKey({ accountId, conversationId: entry.conversationId }); + getThreadBindingsState().bindingsByAccountConversation.set(key, { + ...entry, + accountId, + }); + } + + let sweepTimer: NodeJS.Timeout | null = null; + + const manager: WeixinThreadBindingManager = { + accountId, + shouldPersistMutations: () => persist, + getIdleTimeoutMs: () => idleTimeoutMs, + getMaxAgeMs: () => maxAgeMs, + getByConversationId: (conversationIdRaw) => { + const conversationId = normalizeOptionalString(conversationIdRaw); + if (!conversationId) { + return undefined; + } + return getThreadBindingsState().bindingsByAccountConversation.get( + resolveBindingKey({ accountId, conversationId }), + ); + }, + listBySessionKey: (targetSessionKeyRaw) => { + const targetSessionKey = targetSessionKeyRaw.trim(); + if (!targetSessionKey) { + return []; + } + return listBindingsForAccount(accountId).filter( + (entry) => entry.targetSessionKey === targetSessionKey, + ); + }, + listBindings: () => listBindingsForAccount(accountId), + touchConversation: (conversationIdRaw, at) => { + const conversationId = normalizeOptionalString(conversationIdRaw); + if (!conversationId) { + return null; + } + const key = resolveBindingKey({ accountId, conversationId }); + const existingEntry = getThreadBindingsState().bindingsByAccountConversation.get(key); + if (!existingEntry) { + return null; + } + const nextRecord: WeixinThreadBindingRecord = { + ...existingEntry, + lastActivityAt: normalizeTimestampMs(at ?? Date.now()), + }; + getThreadBindingsState().bindingsByAccountConversation.set(key, nextRecord); + persistBindingsSafely({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + reason: "touch", + }); + return nextRecord; + }, + unbindConversation: (unbindParams) => { + const conversationId = normalizeOptionalString(unbindParams.conversationId); + if (!conversationId) { + return null; + } + const key = resolveBindingKey({ accountId, conversationId }); + const removed = getThreadBindingsState().bindingsByAccountConversation.get(key) ?? null; + if (!removed) { + return null; + } + getThreadBindingsState().bindingsByAccountConversation.delete(key); + persistBindingsSafely({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + reason: "unbind-conversation", + }); + return removed; + }, + unbindBySessionKey: (unbindParams) => { + const targetSessionKey = unbindParams.targetSessionKey.trim(); + if (!targetSessionKey) { + return []; + } + const removed: WeixinThreadBindingRecord[] = []; + for (const entry of listBindingsForAccount(accountId)) { + if (entry.targetSessionKey !== targetSessionKey) { + continue; + } + const key = resolveBindingKey({ + accountId, + conversationId: entry.conversationId, + }); + getThreadBindingsState().bindingsByAccountConversation.delete(key); + removed.push(entry); + } + if (removed.length > 0) { + persistBindingsSafely({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + reason: "unbind-session", + }); + } + return removed; + }, + stop: () => { + if (sweepTimer) { + clearInterval(sweepTimer); + sweepTimer = null; + } + unregisterSessionBindingAdapter({ + channel: WEIXIN_CHANNEL_ID, + accountId, + adapter: sessionBindingAdapter, + }); + const existingManager = getThreadBindingsState().managersByAccountId.get(accountId); + if (existingManager === manager) { + getThreadBindingsState().managersByAccountId.delete(accountId); + } + }, + }; + + // WeChat currently only supports 1:1 direct conversations (the channel code + // always routes inbound traffic as direct chat; group_id is ignored). That + // means we only expose the `current` placement — no `child` (forum topic / + // group thread creation). If group support lands upstream this is the place + // to add a `child` branch similar to Telegram's forum-topic handling. + const sessionBindingAdapter: SessionBindingAdapter = { + channel: WEIXIN_CHANNEL_ID, + accountId, + capabilities: { + placements: ["current"], + }, + bind: async (input) => { + if (input.conversation.channel !== WEIXIN_CHANNEL_ID) { + return null; + } + const targetSessionKey = input.targetSessionKey.trim(); + if (!targetSessionKey) { + return null; + } + if (input.placement === "child") { + logVerbose( + "weixin: child placement is not supported (no group/forum topic semantics); bind rejected.", + ); + return null; + } + const conversationId = normalizeOptionalString(input.conversation.conversationId); + if (!conversationId) { + return null; + } + const record = fromSessionBindingInput({ + accountId, + input: { + targetSessionKey, + targetKind: input.targetKind, + conversationId, + metadata: input.metadata, + }, + }); + getThreadBindingsState().bindingsByAccountConversation.set( + resolveBindingKey({ accountId, conversationId }), + record, + ); + await enqueuePersistBindings({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + }); + logVerbose( + `weixin: bound conversation ${conversationId} -> ${targetSessionKey} (${summarizeLifecycleForLog( + record, + { idleTimeoutMs, maxAgeMs }, + )})`, + ); + return toSessionBindingRecord(record, { idleTimeoutMs, maxAgeMs }); + }, + listBySession: (targetSessionKeyRaw) => { + const targetSessionKey = targetSessionKeyRaw.trim(); + if (!targetSessionKey) { + return []; + } + return manager + .listBySessionKey(targetSessionKey) + .map((entry) => toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs })); + }, + resolveByConversation: (ref) => { + if (ref.channel !== WEIXIN_CHANNEL_ID) { + return null; + } + const conversationId = normalizeOptionalString(ref.conversationId); + if (!conversationId) { + return null; + } + const record = manager.getByConversationId(conversationId); + return record ? toSessionBindingRecord(record, { idleTimeoutMs, maxAgeMs }) : null; + }, + touch: (bindingId, at) => { + const conversationId = resolveThreadBindingConversationIdFromBindingId({ + accountId, + bindingId, + }); + if (!conversationId) { + return; + } + manager.touchConversation(conversationId, at); + }, + unbind: async (input) => { + if (input.targetSessionKey?.trim()) { + const removed = manager.unbindBySessionKey({ + targetSessionKey: input.targetSessionKey, + reason: input.reason, + }); + if (removed.length > 0) { + await enqueuePersistBindings({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + }); + } + return removed.map((entry) => + toSessionBindingRecord(entry, { idleTimeoutMs, maxAgeMs }), + ); + } + const conversationId = resolveThreadBindingConversationIdFromBindingId({ + accountId, + bindingId: input.bindingId, + }); + if (!conversationId) { + return []; + } + const removed = manager.unbindConversation({ + conversationId, + reason: input.reason, + }); + if (removed) { + await enqueuePersistBindings({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + }); + } + return removed ? [toSessionBindingRecord(removed, { idleTimeoutMs, maxAgeMs })] : []; + }, + }; + + registerSessionBindingAdapter(sessionBindingAdapter); + + const sweeperEnabled = params.enableSweeper !== false; + if (sweeperEnabled) { + sweepTimer = setInterval(() => { + const now = Date.now(); + for (const record of listBindingsForAccount(accountId)) { + const idleExpired = shouldExpireByIdle({ + now, + record, + defaultIdleTimeoutMs: idleTimeoutMs, + }); + const maxAgeExpired = shouldExpireByMaxAge({ + now, + record, + defaultMaxAgeMs: maxAgeMs, + }); + if (!idleExpired && !maxAgeExpired) { + continue; + } + manager.unbindConversation({ + conversationId: record.conversationId, + reason: idleExpired ? "idle-expired" : "max-age-expired", + }); + } + }, THREAD_BINDINGS_SWEEP_INTERVAL_MS); + sweepTimer.unref?.(); + } + + getThreadBindingsState().managersByAccountId.set(accountId, manager); + return manager; +} + +export function getWeixinThreadBindingManager( + accountId?: string, +): WeixinThreadBindingManager | null { + return getThreadBindingsState().managersByAccountId.get(normalizeAccountId(accountId)) ?? null; +} + +/** + * Register the shared WeChat thread binding adapter. Called from plugin + * register(). Safe to call multiple times — subsequent calls return the + * already-registered manager for the same accountId. + */ +export function registerWeixinThreadBindings( + params: { + accountId?: string; + idleTimeoutMs?: number; + maxAgeMs?: number; + } = {}, +): WeixinThreadBindingManager { + return createWeixinThreadBindingManager(params); +} + +/** + * Stop every active WeChat thread binding manager and unregister the shared + * session binding adapter. Primarily used by tests; plugin shutdown paths + * can also call it to release resources. + */ +export async function stopAllWeixinThreadBindings(): Promise { + for (const manager of [...getThreadBindingsState().managersByAccountId.values()]) { + manager.stop(); + } + // Always await the queue — Promise.allSettled([]) resolves synchronously, + // so there's no reason to gate this on queue length (removing the branch + // also keeps branch coverage clean). + await Promise.allSettled([...getThreadBindingsState().persistQueueByAccountId.values()]); + getThreadBindingsState().persistQueueByAccountId.clear(); + getThreadBindingsState().managersByAccountId.clear(); + getThreadBindingsState().bindingsByAccountConversation.clear(); +} + +export const __testing = { + stopAllWeixinThreadBindings, + resolveBindingsPath, +};