diff --git a/src/api/session-guard.test.ts b/src/api/session-guard.test.ts index 22b6558..4da8558 100644 --- a/src/api/session-guard.test.ts +++ b/src/api/session-guard.test.ts @@ -3,6 +3,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; import { SESSION_EXPIRED_ERRCODE, pauseSession, + clearSessionPause, isSessionPaused, getRemainingPauseMs, assertSessionActive, @@ -94,4 +95,36 @@ describe("session-guard", () => { vi.advanceTimersByTime(10 * 60 * 1000); expect(isSessionPaused("acc1")).toBe(false); }); + + it("pauseSession honors a custom durationMs (#155 backoff)", () => { + pauseSession("acc1", 5_000); + expect(isSessionPaused("acc1")).toBe(true); + expect(getRemainingPauseMs("acc1")).toBeLessThanOrEqual(5_000); + + vi.advanceTimersByTime(5_000); + expect(isSessionPaused("acc1")).toBe(false); + }); + + it("clearSessionPause removes an active pause (#155 recovery)", () => { + pauseSession("acc1"); + expect(isSessionPaused("acc1")).toBe(true); + + clearSessionPause("acc1"); + expect(isSessionPaused("acc1")).toBe(false); + expect(getRemainingPauseMs("acc1")).toBe(0); + expect(() => assertSessionActive("acc1")).not.toThrow(); + }); + + it("clearSessionPause is a no-op when no pause is set", () => { + expect(() => clearSessionPause("acc1")).not.toThrow(); + expect(isSessionPaused("acc1")).toBe(false); + }); + + it("clearSessionPause is per-account", () => { + pauseSession("acc1"); + pauseSession("acc2"); + clearSessionPause("acc1"); + expect(isSessionPaused("acc1")).toBe(false); + expect(isSessionPaused("acc2")).toBe(true); + }); }); diff --git a/src/api/session-guard.ts b/src/api/session-guard.ts index e31094c..8a955ab 100644 --- a/src/api/session-guard.ts +++ b/src/api/session-guard.ts @@ -7,16 +7,39 @@ export const SESSION_EXPIRED_ERRCODE = -14; const pauseUntilMap = new Map(); -/** Pause all inbound/outbound API calls for `accountId` for one hour. 
*/ -export function pauseSession(accountId: string): void { - const until = Date.now() + SESSION_PAUSE_DURATION_MS; +/** + * Pause the long-poll loop for `accountId` (default: one hour). + * + * This pause is intentionally scoped to the `getUpdates` long-poll path only. + * Outbound REST calls (`sendMessage`, `sendTyping`, `sendMedia`, ...) are + * independent of the long-poll session and must not be blocked here — see + * #155 for the death-loop bug that resulted from gating them on this state. + * + * Callers may pass a shorter `durationMs` to implement exponential backoff + * when `notifyStart`-based recovery has just failed. + */ +export function pauseSession(accountId: string, durationMs?: number): void { + const dur = durationMs ?? SESSION_PAUSE_DURATION_MS; + const until = Date.now() + dur; pauseUntilMap.set(accountId, until); logger.info( - `session-guard: paused accountId=${accountId} until=${new Date(until).toISOString()} (${SESSION_PAUSE_DURATION_MS / 1000}s)`, + `session-guard: paused accountId=${accountId} until=${new Date(until).toISOString()} (${Math.round(dur / 1000)}s)`, ); } -/** Returns `true` when the bot is still within its one-hour cooldown window. */ +/** + * Clear a previously-set pause for `accountId`. + * + * Used by the monitor after a successful `notifyStart` recovery so the + * long-poll can resume immediately instead of waiting out the cooldown. + */ +export function clearSessionPause(accountId: string): void { + if (pauseUntilMap.delete(accountId)) { + logger.info(`session-guard: cleared pause for accountId=${accountId}`); + } +} + +/** Returns `true` when the long-poll is still within its cooldown window. */ export function isSessionPaused(accountId: string): boolean { const until = pauseUntilMap.get(accountId); if (until === undefined) return false; @@ -39,7 +62,13 @@ export function getRemainingPauseMs(accountId: string): number { return remaining; } -/** Throw if the session is currently paused. Call before any API request. 
*/ +/** + * Throw if the long-poll session is currently paused. + * + * NOTE: This guard is intended for callers that are part of the inbound + * long-poll lifecycle. Outbound REST send paths must NOT call this — see + * the doc comment on `pauseSession` and #155. + */ export function assertSessionActive(accountId: string): void { if (isSessionPaused(accountId)) { const remainingMin = Math.ceil(getRemainingPauseMs(accountId) / 60_000); diff --git a/src/channel.ts b/src/channel.ts index f36c8db..a74a46c 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -16,7 +16,12 @@ import { } from "./auth/accounts.js"; import type { ResolvedWeixinAccount } from "./auth/accounts.js"; import { notifyStop, notifyStart } from "./api/api.js"; -import { assertSessionActive } from "./api/session-guard.js"; +// NOTE: outbound REST send paths (`sendMessage`, `sendMedia`, `sendTyping`) +// intentionally do NOT call `assertSessionActive`. The session-pause state in +// session-guard.ts is scoped to the long-poll `getUpdates` loop; gating REST +// outbound traffic on it caused the 60-minute death loop described in #155. +// If a REST call hits an expired session, the server itself will return an +// error which the API layer surfaces — it does not need a client-side gate. 
import { getContextToken, findAccountIdsByContextToken, restoreContextTokens, clearContextTokensForAccount } from "./messaging/inbound.js"; import { logger } from "./util/logger.js"; import { @@ -115,7 +120,6 @@ async function sendWeixinOutbound(params: { }): Promise<{ channel: string; messageId: string }> { const account = resolveWeixinAccount(params.cfg, params.accountId); const aLog = logger.withAccount(account.accountId); - assertSessionActive(account.accountId); if (!account.configured) { aLog.error(`sendWeixinOutbound: account not configured`); throw new Error("weixin not configured: please run `openclaw channels login --channel openclaw-weixin`"); @@ -226,7 +230,6 @@ export const weixinPlugin: ChannelPlugin = { const accountId = ctx.accountId || resolveOutboundAccountId(ctx.cfg, ctx.to); const account = resolveWeixinAccount(ctx.cfg, accountId); const aLog = logger.withAccount(account.accountId); - assertSessionActive(account.accountId); if (!account.configured) { aLog.error(`sendMedia: account not configured`); throw new Error( diff --git a/src/messaging/outbound-bypass.test.ts b/src/messaging/outbound-bypass.test.ts new file mode 100644 index 0000000..50482da --- /dev/null +++ b/src/messaging/outbound-bypass.test.ts @@ -0,0 +1,107 @@ +/** + * Regression test for #155: outbound REST calls (sendMessage / sendMedia / + * sendTyping) must NOT be gated on the long-poll session-pause state. + * + * The bug was that `assertSessionActive` was called from the outbound send + * paths in channel.ts, so a long-poll cooldown set by `pauseSession` would + * block REST traffic that has nothing to do with the long-poll session. + * + * The fix is to leave the session-pause map untouched by outbound paths and + * let the REST API surface its own server-side errors. These tests pin that + * contract at the `sendMessageWeixin` boundary. 
+ */ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +vi.mock("../util/logger.js", () => ({ + logger: { + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +const { mockSendMessageApi } = vi.hoisted(() => ({ + mockSendMessageApi: vi.fn(), +})); + +vi.mock("../api/api.js", () => ({ + sendMessage: mockSendMessageApi, +})); + +vi.mock("node:crypto", () => ({ + default: { + randomBytes: vi.fn(() => Buffer.from("deadbeef", "hex")), + }, +})); + +vi.mock("openclaw/plugin-sdk", () => ({ + stripMarkdown: (text: string) => text, +})); + +import { sendMessageWeixin } from "./send.js"; +import { pauseSession, _resetForTest } from "../api/session-guard.js"; + +beforeEach(() => { + _resetForTest(); + mockSendMessageApi.mockReset(); + mockSendMessageApi.mockResolvedValue(undefined); +}); + +describe("outbound REST is independent of long-poll pause (#155)", () => { + it("sendMessageWeixin succeeds while the session is paused", async () => { + // Simulate the monitor having just hit -14 and set a long pause. + pauseSession("acc-1"); + + await expect( + sendMessageWeixin({ + to: "user-1", + text: "hello while paused", + opts: { + baseUrl: "https://test.example/", + token: "tok", + contextToken: "ctx", + }, + }), + ).resolves.toMatchObject({ messageId: expect.any(String) }); + + expect(mockSendMessageApi).toHaveBeenCalledTimes(1); + }); + + it("sendMessageWeixin does not consult or mutate the pause map", async () => { + pauseSession("acc-1"); + const before = (await import("../api/session-guard.js")).getRemainingPauseMs( + "acc-1", + ); + + await sendMessageWeixin({ + to: "user-2", + text: "still working", + opts: { baseUrl: "https://test.example/", token: "tok", contextToken: "ctx" }, + }); + + const after = (await import("../api/session-guard.js")).getRemainingPauseMs( + "acc-1", + ); + // The outbound path must neither clear nor extend the pause window. + // Allow a tiny delta for elapsed wall time during the test. 
+ expect(Math.abs(before - after)).toBeLessThan(1_000); + }); + + it("propagates the underlying REST error without touching pause state", async () => { + pauseSession("acc-1"); + mockSendMessageApi.mockRejectedValueOnce(new Error("server -14: re-login")); + + await expect( + sendMessageWeixin({ + to: "user-3", + text: "boom", + opts: { baseUrl: "https://test.example/", token: "tok", contextToken: "ctx" }, + }), + ).rejects.toThrow(/server -14/); + + // Pause was not implicitly cleared by an outbound failure either. + const { isSessionPaused } = await import("../api/session-guard.js"); + expect(isSessionPaused("acc-1")).toBe(true); + }); +}); diff --git a/src/monitor/monitor.test.ts b/src/monitor/monitor.test.ts new file mode 100644 index 0000000..fe2538c --- /dev/null +++ b/src/monitor/monitor.test.ts @@ -0,0 +1,262 @@ +/** + * Tests for the long-poll monitor's errcode -14 (session expired) handling. + * + * Regression coverage for #155: the original code unconditionally paused the + * session for 60 minutes on `-14`, then retried `getUpdates` without first + * calling `notifyStart`, which immediately returned `-14` again. The new + * behavior calls `notifyStart` to rebuild the server session and falls back + * to exponential backoff (not a 60-minute wall) if recovery itself fails. + * + * Test pacing: we run with real timers. To keep loop iterations cheap, every + * mocked `getUpdates` response is delayed by a few ms so the loop yields to + * the event loop between calls (mirrors how long-poll actually behaves). 
+ */ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +const { mockGetUpdates, mockNotifyStart } = vi.hoisted(() => ({ + mockGetUpdates: vi.fn(), + mockNotifyStart: vi.fn(), +})); + +vi.mock("../api/api.js", () => ({ + getUpdates: mockGetUpdates, + notifyStart: mockNotifyStart, +})); + +vi.mock("../api/config-cache.js", () => ({ + WeixinConfigManager: vi.fn().mockImplementation(() => ({ + getForUser: vi.fn().mockResolvedValue({ typingTicket: undefined }), + })), +})); + +vi.mock("../messaging/process-message.js", () => ({ + processOneMessage: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock("../storage/sync-buf.js", () => ({ + getSyncBufFilePath: vi.fn(() => "/tmp/sync-buf-test"), + loadGetUpdatesBuf: vi.fn(() => undefined), + saveGetUpdatesBuf: vi.fn(), +})); + +vi.mock("../util/logger.js", () => ({ + logger: { + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + withAccount: vi.fn(() => ({ + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + })), + }, +})); + +vi.mock("../util/redact.js", () => ({ + redactBody: (s: string) => s, +})); + +import { monitorWeixinProvider } from "./monitor.js"; +import { + isSessionPaused, + getRemainingPauseMs, + _resetForTest, +} from "../api/session-guard.js"; + +const SUCCESS_RESP = { ret: 0, msgs: [], get_updates_buf: "" } as const; +const ERR_14_RESP = { errcode: -14, errmsg: "session expired" } as const; + +function delayed(value: T, ms = 5): () => Promise { + return () => new Promise((r) => setTimeout(() => r(value), ms)); +} + +function delayedReject(err: Error, ms = 5): () => Promise { + return () => new Promise((_, rej) => setTimeout(() => rej(err), ms)); +} + +/** + * Build a minimal opts object that satisfies the monitor's runtime contract. + * `channelRuntime` is a stub — none of the tested branches exercise inbound + * dispatch, since we drive `getUpdates` straight into the `-14` path. 
+ */ +function buildOpts(accountId: string, abortSignal: AbortSignal) { + return { + baseUrl: "https://test.example/", + cdnBaseUrl: "https://cdn.example/", + token: "test-token", + accountId, + config: {} as never, + channelRuntime: {} as never, + abortSignal, + longPollTimeoutMs: 50, + }; +} + +async function waitFor( + predicate: () => boolean, + opts: { timeoutMs?: number; intervalMs?: number } = {}, +): Promise { + const timeoutMs = opts.timeoutMs ?? 5_000; + const intervalMs = opts.intervalMs ?? 10; + const start = Date.now(); + while (!predicate()) { + if (Date.now() - start > timeoutMs) { + throw new Error(`waitFor: predicate timed out after ${timeoutMs}ms`); + } + await new Promise((r) => setTimeout(r, intervalMs)); + } +} + +async function stop( + ac: AbortController, + p: Promise, +): Promise { + ac.abort(); + try { + await p; + } catch { + // expected — sleep() throws on abort + } +} + +describe("monitor.ts — errcode -14 recovery (#155)", () => { + beforeEach(() => { + _resetForTest(); + mockGetUpdates.mockReset(); + mockNotifyStart.mockReset(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("calls notifyStart BEFORE entering a long pause when getUpdates returns -14", async () => { + mockGetUpdates + .mockImplementationOnce(delayed(ERR_14_RESP)) + .mockImplementation(delayed(SUCCESS_RESP, 50)); + mockNotifyStart.mockImplementation(delayed({ ret: 0 } as never, 2)); + + const ac = new AbortController(); + const p = monitorWeixinProvider(buildOpts("acc-a", ac.signal)); + + await waitFor(() => mockNotifyStart.mock.calls.length >= 1, { + timeoutMs: 10_000, + }); + await stop(ac, p); + + expect(mockNotifyStart).toHaveBeenCalledTimes(1); + expect(mockNotifyStart).toHaveBeenCalledWith( + expect.objectContaining({ + baseUrl: "https://test.example/", + token: "test-token", + }), + ); + }, 15_000); + + it("clears the pause after a successful notifyStart so getUpdates resumes within seconds", async () => { + mockGetUpdates + 
.mockImplementationOnce(delayed(ERR_14_RESP)) + .mockImplementation(delayed(SUCCESS_RESP, 50)); + mockNotifyStart.mockImplementation(delayed({ ret: 0 } as never, 2)); + + const ac = new AbortController(); + const p = monitorWeixinProvider(buildOpts("acc-b", ac.signal)); + + // Wait for notifyStart to be called and at least one more getUpdates. + await waitFor( + () => + mockNotifyStart.mock.calls.length >= 1 && + mockGetUpdates.mock.calls.length >= 2, + { timeoutMs: 15_000 }, + ); + + // Right after recovery + retry, the pause must be clear. + expect(isSessionPaused("acc-b")).toBe(false); + expect(getRemainingPauseMs("acc-b")).toBe(0); + expect(mockGetUpdates.mock.calls.length).toBeGreaterThanOrEqual(2); + + await stop(ac, p); + }, 20_000); + + it("falls back to a SHORT pause (<60min) when notifyStart fails — no 60-minute wall", async () => { + mockGetUpdates.mockImplementation(delayed(ERR_14_RESP)); + mockNotifyStart.mockImplementation(delayedReject(new Error("network down"))); + + const ac = new AbortController(); + const p = monitorWeixinProvider(buildOpts("acc-c", ac.signal)); + + // Wait both for notifyStart to be observed AND the pause map to reflect + // the fallback backoff window (set just before the awaited notifyStart). + await waitFor( + () => + mockNotifyStart.mock.calls.length >= 1 && + getRemainingPauseMs("acc-c") > 0, + { timeoutMs: 10_000 }, + ); + + // Pause window after a failed recovery attempt must be strictly less + // than the original 60-minute wall — that's the whole point of #155. 
+ const remaining = getRemainingPauseMs("acc-c"); + expect(remaining).toBeGreaterThan(0); + expect(remaining).toBeLessThan(60 * 60 * 1000); + + await stop(ac, p); + }, 15_000); + + it("on persistent -14, retries notifyStart again instead of dying for 60 minutes", async () => { + mockGetUpdates.mockImplementation(delayed(ERR_14_RESP)); + mockNotifyStart.mockImplementation(delayedReject(new Error("upstream 500"))); + + const ac = new AbortController(); + const p = monitorWeixinProvider(buildOpts("acc-d", ac.signal)); + + // Two notifyStart attempts must happen well inside 60 minutes + // (we cap the wait at 30 seconds and assert the call count). + await waitFor(() => mockNotifyStart.mock.calls.length >= 2, { + timeoutMs: 30_000, + }); + + expect(mockNotifyStart.mock.calls.length).toBeGreaterThanOrEqual(2); + await stop(ac, p); + }, 35_000); + + it("behaves identically on normal traffic — no notifyStart, no pause", async () => { + mockGetUpdates.mockImplementation(delayed(SUCCESS_RESP, 50)); + + const ac = new AbortController(); + const p = monitorWeixinProvider(buildOpts("acc-e", ac.signal)); + + await waitFor(() => mockGetUpdates.mock.calls.length >= 3); + + expect(mockNotifyStart).not.toHaveBeenCalled(); + expect(isSessionPaused("acc-e")).toBe(false); + + await stop(ac, p); + }); + + it("recovers a second time when -14 reappears after a prior recovery (no death loop)", async () => { + mockGetUpdates + .mockImplementationOnce(delayed(ERR_14_RESP)) + .mockImplementationOnce(delayed(SUCCESS_RESP, 20)) + .mockImplementationOnce(delayed(ERR_14_RESP)) + .mockImplementation(delayed(SUCCESS_RESP, 50)); + mockNotifyStart.mockImplementation(delayed({ ret: 0 } as never, 2)); + + const ac = new AbortController(); + const p = monitorWeixinProvider(buildOpts("acc-f", ac.signal)); + + // Wait for the second notifyStart to be observed, then give the loop a + // tick to run its post-await `clearSessionPause` so we don't race on it. 
+ await waitFor(() => mockNotifyStart.mock.calls.length >= 2, { + timeoutMs: 30_000, + }); + await waitFor(() => !isSessionPaused("acc-f"), { timeoutMs: 2_000 }); + + expect(mockNotifyStart).toHaveBeenCalledTimes(2); + expect(isSessionPaused("acc-f")).toBe(false); + + await stop(ac, p); + }, 35_000); +}); diff --git a/src/monitor/monitor.ts b/src/monitor/monitor.ts index 65e1c72..1a65535 100644 --- a/src/monitor/monitor.ts +++ b/src/monitor/monitor.ts @@ -1,9 +1,14 @@ import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; import type { PluginRuntime } from "openclaw/plugin-sdk/core"; -import { getUpdates } from "../api/api.js"; +import { getUpdates, notifyStart } from "../api/api.js"; import { WeixinConfigManager } from "../api/config-cache.js"; -import { SESSION_EXPIRED_ERRCODE, pauseSession, getRemainingPauseMs } from "../api/session-guard.js"; +import { + SESSION_EXPIRED_ERRCODE, + pauseSession, + clearSessionPause, + getRemainingPauseMs, +} from "../api/session-guard.js"; import { processOneMessage } from "../messaging/process-message.js"; import { getSyncBufFilePath, loadGetUpdatesBuf, saveGetUpdatesBuf } from "../storage/sync-buf.js"; import { logger } from "../util/logger.js"; @@ -15,6 +20,19 @@ const MAX_CONSECUTIVE_FAILURES = 3; const BACKOFF_DELAY_MS = 30_000; const RETRY_DELAY_MS = 2_000; +/** + * Recovery tuning for the `errcode: -14` (session expired) branch. + * + * Strategy: call `notifyStart` to rebuild the server-side session, then + * retry `getUpdates` within seconds. On `notifyStart` failure, fall back + * to exponential backoff (NOT the original 60-minute wall) so we keep + * trying without entering a death loop (#155). 
+ */ +const SESSION_RECOVERY_RETRY_DELAY_MS = 5_000; +const SESSION_RECOVERY_BACKOFF_INITIAL_MS = 5_000; +const SESSION_RECOVERY_BACKOFF_MAX_MS = 5 * 60_000; +const NOTIFY_START_TIMEOUT_MS = 10_000; + export type MonitorWeixinOpts = { baseUrl: string; cdnBaseUrl: string; @@ -85,6 +103,7 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise 0 ? waitMs : SESSION_RECOVERY_RETRY_DELAY_MS, abortSignal); + } continue; } @@ -148,6 +192,7 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise