Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions src/api/session-guard.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import {
SESSION_EXPIRED_ERRCODE,
pauseSession,
clearSessionPause,
isSessionPaused,
getRemainingPauseMs,
assertSessionActive,
Expand Down Expand Up @@ -94,4 +95,36 @@ describe("session-guard", () => {
vi.advanceTimersByTime(10 * 60 * 1000);
expect(isSessionPaused("acc1")).toBe(false);
});

it("pauseSession honors a custom durationMs (#155 backoff)", () => {
pauseSession("acc1", 5_000);
expect(isSessionPaused("acc1")).toBe(true);
expect(getRemainingPauseMs("acc1")).toBeLessThanOrEqual(5_000);

vi.advanceTimersByTime(5_000);
expect(isSessionPaused("acc1")).toBe(false);
});

it("clearSessionPause removes an active pause (#155 recovery)", () => {
pauseSession("acc1");
expect(isSessionPaused("acc1")).toBe(true);

clearSessionPause("acc1");
expect(isSessionPaused("acc1")).toBe(false);
expect(getRemainingPauseMs("acc1")).toBe(0);
expect(() => assertSessionActive("acc1")).not.toThrow();
});

it("clearSessionPause is a no-op when no pause is set", () => {
expect(() => clearSessionPause("acc1")).not.toThrow();
expect(isSessionPaused("acc1")).toBe(false);
});

it("clearSessionPause is per-account", () => {
pauseSession("acc1");
pauseSession("acc2");
clearSessionPause("acc1");
expect(isSessionPaused("acc1")).toBe(false);
expect(isSessionPaused("acc2")).toBe(true);
});
});
41 changes: 35 additions & 6 deletions src/api/session-guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,39 @@ export const SESSION_EXPIRED_ERRCODE = -14;

const pauseUntilMap = new Map<string, number>();

/** Pause all inbound/outbound API calls for `accountId` for one hour. */
export function pauseSession(accountId: string): void {
const until = Date.now() + SESSION_PAUSE_DURATION_MS;
/**
* Pause the long-poll loop for `accountId` (default: one hour).
*
* This pause is intentionally scoped to the `getUpdates` long-poll path only.
* Outbound REST calls (`sendMessage`, `sendTyping`, `sendMedia`, ...) are
* independent of the long-poll session and must not be blocked here — see
* #155 for the death-loop bug that resulted from gating them on this state.
*
* Callers may pass a shorter `durationMs` to implement exponential backoff
* when `notifyStart`-based recovery has just failed.
*/
export function pauseSession(accountId: string, durationMs?: number): void {
const dur = durationMs ?? SESSION_PAUSE_DURATION_MS;
const until = Date.now() + dur;
pauseUntilMap.set(accountId, until);
logger.info(
`session-guard: paused accountId=${accountId} until=${new Date(until).toISOString()} (${SESSION_PAUSE_DURATION_MS / 1000}s)`,
`session-guard: paused accountId=${accountId} until=${new Date(until).toISOString()} (${Math.round(dur / 1000)}s)`,
);
}

/** Returns `true` when the bot is still within its one-hour cooldown window. */
/**
* Clear a previously-set pause for `accountId`.
*
* Used by the monitor after a successful `notifyStart` recovery so the
* long-poll can resume immediately instead of waiting out the cooldown.
*/
export function clearSessionPause(accountId: string): void {
if (pauseUntilMap.delete(accountId)) {
logger.info(`session-guard: cleared pause for accountId=${accountId}`);
}
}

/** Returns `true` when the long-poll is still within its cooldown window. */
export function isSessionPaused(accountId: string): boolean {
const until = pauseUntilMap.get(accountId);
if (until === undefined) return false;
Expand All @@ -39,7 +62,13 @@ export function getRemainingPauseMs(accountId: string): number {
return remaining;
}

/** Throw if the session is currently paused. Call before any API request. */
/**
* Throw if the long-poll session is currently paused.
*
* NOTE: This guard is intended for callers that are part of the inbound
* long-poll lifecycle. Outbound REST send paths must NOT call this — see
* the file-level comment on `pauseSession` and #155.
*/
export function assertSessionActive(accountId: string): void {
if (isSessionPaused(accountId)) {
const remainingMin = Math.ceil(getRemainingPauseMs(accountId) / 60_000);
Expand Down
9 changes: 6 additions & 3 deletions src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ import {
} from "./auth/accounts.js";
import type { ResolvedWeixinAccount } from "./auth/accounts.js";
import { notifyStop, notifyStart } from "./api/api.js";
import { assertSessionActive } from "./api/session-guard.js";
// NOTE: outbound REST send paths (`sendMessage`, `sendMedia`, `sendTyping`)
// intentionally do NOT call `assertSessionActive`. The session-pause state in
// session-guard.ts is scoped to the long-poll `getUpdates` loop; gating REST
// outbound traffic on it caused the 60-minute death loop described in #155.
// If a REST call hits an expired session, the server itself will return an
// error which the API layer surfaces — it does not need a client-side gate.
import { getContextToken, findAccountIdsByContextToken, restoreContextTokens, clearContextTokensForAccount } from "./messaging/inbound.js";
import { logger } from "./util/logger.js";
import {
Expand Down Expand Up @@ -115,7 +120,6 @@ async function sendWeixinOutbound(params: {
}): Promise<{ channel: string; messageId: string }> {
const account = resolveWeixinAccount(params.cfg, params.accountId);
const aLog = logger.withAccount(account.accountId);
assertSessionActive(account.accountId);
if (!account.configured) {
aLog.error(`sendWeixinOutbound: account not configured`);
throw new Error("weixin not configured: please run `openclaw channels login --channel openclaw-weixin`");
Expand Down Expand Up @@ -226,7 +230,6 @@ export const weixinPlugin: ChannelPlugin<ResolvedWeixinAccount> = {
const accountId = ctx.accountId || resolveOutboundAccountId(ctx.cfg, ctx.to);
const account = resolveWeixinAccount(ctx.cfg, accountId);
const aLog = logger.withAccount(account.accountId);
assertSessionActive(account.accountId);
if (!account.configured) {
aLog.error(`sendMedia: account not configured`);
throw new Error(
Expand Down
107 changes: 107 additions & 0 deletions src/messaging/outbound-bypass.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/**
* Regression test for #155: outbound REST calls (sendMessage / sendMedia /
* sendTyping) must NOT be gated on the long-poll session-pause state.
*
* The bug was that `assertSessionActive` was called from the outbound send
* paths in channel.ts, so a long-poll cooldown set by `pauseSession` would
* block REST traffic that has nothing to do with the long-poll session.
*
* The fix is to leave the session-pause map untouched by outbound paths and
* let the REST API surface its own server-side errors. These tests pin that
* contract at the `sendMessageWeixin` boundary.
*/
import { describe, it, expect, vi, beforeEach } from "vitest";

vi.mock("../util/logger.js", () => ({
logger: {
info: vi.fn(),
debug: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
}));

const { mockSendMessageApi } = vi.hoisted(() => ({
mockSendMessageApi: vi.fn(),
}));

vi.mock("../api/api.js", () => ({
sendMessage: mockSendMessageApi,
}));

vi.mock("node:crypto", () => ({
default: {
randomBytes: vi.fn(() => Buffer.from("deadbeef", "hex")),
},
}));

vi.mock("openclaw/plugin-sdk", () => ({
stripMarkdown: (text: string) => text,
}));

import { sendMessageWeixin } from "./send.js";
import { pauseSession, _resetForTest } from "../api/session-guard.js";

beforeEach(() => {
_resetForTest();
mockSendMessageApi.mockReset();
mockSendMessageApi.mockResolvedValue(undefined);
});

describe("outbound REST is independent of long-poll pause (#155)", () => {
it("sendMessageWeixin succeeds while the session is paused", async () => {
// Simulate the monitor having just hit -14 and set a long pause.
pauseSession("acc-1");

await expect(
sendMessageWeixin({
to: "user-1",
text: "hello while paused",
opts: {
baseUrl: "https://test.example/",
token: "tok",
contextToken: "ctx",
},
}),
).resolves.toMatchObject({ messageId: expect.any(String) });

expect(mockSendMessageApi).toHaveBeenCalledTimes(1);
});

it("sendMessageWeixin does not consult or mutate the pause map", async () => {
pauseSession("acc-1");
const before = (await import("../api/session-guard.js")).getRemainingPauseMs(
"acc-1",
);

await sendMessageWeixin({
to: "user-2",
text: "still working",
opts: { baseUrl: "https://test.example/", token: "tok", contextToken: "ctx" },
});

const after = (await import("../api/session-guard.js")).getRemainingPauseMs(
"acc-1",
);
// The outbound path must neither clear nor extend the pause window.
// Allow a tiny delta for elapsed wall time during the test.
expect(Math.abs(before - after)).toBeLessThan(1_000);
});

it("propagates the underlying REST error without touching pause state", async () => {
pauseSession("acc-1");
mockSendMessageApi.mockRejectedValueOnce(new Error("server -14: re-login"));

await expect(
sendMessageWeixin({
to: "user-3",
text: "boom",
opts: { baseUrl: "https://test.example/", token: "tok", contextToken: "ctx" },
}),
).rejects.toThrow(/server -14/);

// Pause was not implicitly cleared by an outbound failure either.
const { isSessionPaused } = await import("../api/session-guard.js");
expect(isSessionPaused("acc-1")).toBe(true);
});
});
Loading