From 46fd2e6544801fbfa28147dd2e5dc5185dd72ea2 Mon Sep 17 00:00:00 2001 From: YuCreate Date: Wed, 29 Apr 2026 21:22:49 +0800 Subject: [PATCH] fix(monitor): keep polling while handling inbound messages --- src/monitor/monitor.test.ts | 123 ++++++++++++++++++++++++++++++++++++ src/monitor/monitor.ts | 62 +++++++++++++----- 2 files changed, 169 insertions(+), 16 deletions(-) create mode 100644 src/monitor/monitor.test.ts diff --git a/src/monitor/monitor.test.ts b/src/monitor/monitor.test.ts new file mode 100644 index 0000000..d90dfd4 --- /dev/null +++ b/src/monitor/monitor.test.ts @@ -0,0 +1,123 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import type { WeixinMessage } from "../api/types.js"; +import { monitorWeixinProvider } from "./monitor.js"; + +const { + mockGetUpdates, + mockGetForUser, + mockProcessOneMessage, + mockWaitForWeixinRuntime, + mockGetSyncBufFilePath, + mockLoadGetUpdatesBuf, + mockSaveGetUpdatesBuf, + mockLogger, +} = vi.hoisted(() => ({ + mockGetUpdates: vi.fn(), + mockGetForUser: vi.fn(), + mockProcessOneMessage: vi.fn(), + mockWaitForWeixinRuntime: vi.fn(), + mockGetSyncBufFilePath: vi.fn(), + mockLoadGetUpdatesBuf: vi.fn(), + mockSaveGetUpdatesBuf: vi.fn(), + mockLogger: { + info: vi.fn(), + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }, +})); + +vi.mock("../api/api.js", () => ({ + getUpdates: mockGetUpdates, +})); + +vi.mock("../api/config-cache.js", () => ({ + WeixinConfigManager: vi.fn().mockImplementation(() => ({ + getForUser: mockGetForUser, + })), +})); + +vi.mock("../messaging/process-message.js", () => ({ + processOneMessage: mockProcessOneMessage, +})); + +vi.mock("../runtime.js", () => ({ + getWeixinRuntime: vi.fn(), + waitForWeixinRuntime: mockWaitForWeixinRuntime, +})); + +vi.mock("../storage/sync-buf.js", () => ({ + getSyncBufFilePath: mockGetSyncBufFilePath, + loadGetUpdatesBuf: mockLoadGetUpdatesBuf, + saveGetUpdatesBuf: mockSaveGetUpdatesBuf, +})); + +vi.mock("../util/logger.js", () => ({ + logger: { + withAccount: vi.fn(() => mockLogger), + }, +})); + +beforeEach(() => { + vi.clearAllMocks(); + mockWaitForWeixinRuntime.mockResolvedValue({ channel: {} }); + mockGetSyncBufFilePath.mockReturnValue("/tmp/openclaw-weixin-test-sync-buf"); + mockLoadGetUpdatesBuf.mockReturnValue(""); + mockGetForUser.mockResolvedValue({ typingTicket: "ticket-1" }); +}); + +describe("monitorWeixinProvider", () => { + it("continues polling while an inbound message is still being processed", async () => { + const abortController = new AbortController(); + const inbound: WeixinMessage = { + message_id: 1, + from_user_id: "user-1", + context_token: "ctx-1", + item_list: [{ type: 1, text_item: { text: "hello" } }], + }; + + let releaseMessageProcessing!: () => void; + const messageProcessing = new Promise((resolve) => { + releaseMessageProcessing = resolve; + }); + mockProcessOneMessage.mockReturnValue(messageProcessing); + + let markSecondPollStarted!: () => void; + const secondPollStarted = new Promise((resolve) => { + markSecondPollStarted = resolve; + }); + + mockGetUpdates + .mockResolvedValueOnce({ ret: 0, get_updates_buf: "buf-1", msgs: [inbound] }) + .mockImplementationOnce(async () => { + markSecondPollStarted(); + abortController.abort(); + return { ret: 0, get_updates_buf: "buf-2", msgs: [] }; + }); + + const monitorPromise = monitorWeixinProvider({ + baseUrl: "https://api.example.test", + cdnBaseUrl: "https://cdn.example.test", + accountId: "acct-1", + config: {} as Parameters[0]["config"], + abortSignal: abortController.signal, + }); + + await secondPollStarted; + + expect(mockGetUpdates).toHaveBeenCalledTimes(2); + expect(mockProcessOneMessage).toHaveBeenCalledTimes(1); + + let processed = false; + void messageProcessing.then(() => { + processed = true; + }); + await Promise.resolve(); + expect(processed).toBe(false); + + releaseMessageProcessing(); + await monitorPromise; + await messageProcessing; + }); +}); diff --git a/src/monitor/monitor.ts b/src/monitor/monitor.ts index 697f95e..3c296cd 100644 --- a/src/monitor/monitor.ts +++ b/src/monitor/monitor.ts @@ -3,6 +3,7 @@ import type { PluginRuntime } from "openclaw/plugin-sdk/core"; import { getUpdates } from "../api/api.js"; import { WeixinConfigManager } from "../api/config-cache.js"; +import type { WeixinMessage } from "../api/types.js"; import { SESSION_EXPIRED_ERRCODE, pauseSession, getRemainingPauseMs } from "../api/session-guard.js"; import { processOneMessage } from "../messaging/process-message.js"; import { getWeixinRuntime, waitForWeixinRuntime } from "../runtime.js"; @@ -15,6 +16,7 @@ const DEFAULT_LONG_POLL_TIMEOUT_MS = 35_000; const MAX_CONSECUTIVE_FAILURES = 3; const BACKOFF_DELAY_MS = 30_000; const RETRY_DELAY_MS = 2_000; +const MAX_IN_FLIGHT_MESSAGES = 32; export type MonitorWeixinOpts = { baseUrl: string; @@ -32,7 +34,7 @@ export type MonitorWeixinOpts = { }; /** - * Long-poll loop: getUpdates -> normalize -> recordInboundSession -> dispatchReplyFromConfig. + * Long-poll loop: getUpdates -> dispatch inbound messages without blocking the next poll. * Runs until abort. */ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise { @@ -84,6 +86,42 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise>(); + + const waitForInFlightSlot = async () => { + while (inFlightMessages.size >= MAX_IN_FLIGHT_MESSAGES) { + await Promise.race(inFlightMessages).catch(() => {}); + } + }; + + const processMessageInBackground = (full: WeixinMessage) => { + const task = (async () => { + const fromUserId = full.from_user_id ?? ""; + const cachedConfig = await configManager.getForUser(fromUserId, full.context_token); + + await processOneMessage(full, { + accountId, + config, + channelRuntime, + baseUrl, + cdnBaseUrl, + token, + typingTicket: cachedConfig.typingTicket, + log: opts.runtime?.log ?? (() => {}), + errLog, + }); + })(); + + inFlightMessages.add(task); + void task.catch((err) => { + errLog(`weixin message processing failed: ${String(err)}`); + aLog.error( + `processOneMessage failed: from=${full.from_user_id ?? ""} msgId=${full.message_id ?? ""} err=${String(err)} stack=${(err as Error).stack ?? ""}`, + ); + }).finally(() => { + inFlightMessages.delete(task); + }); + }; while (!abortSignal?.aborted) { try { @@ -165,20 +203,8 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise {}), - errLog, - }); + await waitForInFlightSlot(); + processMessageInBackground(full); } } catch (err) { if (abortSignal?.aborted) { @@ -204,7 +230,11 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise 0) { + aLog.info(`Monitor ended with ${inFlightMessages.size} message task(s) still running`); + } else { + aLog.info(`Monitor ended`); + } } function sleep(ms: number, signal?: AbortSignal): Promise {