Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions src/monitor/monitor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { beforeEach, describe, expect, it, vi } from "vitest";

import type { WeixinMessage } from "../api/types.js";
import { monitorWeixinProvider } from "./monitor.js";

const {
mockGetUpdates,
mockGetForUser,
mockProcessOneMessage,
mockWaitForWeixinRuntime,
mockGetSyncBufFilePath,
mockLoadGetUpdatesBuf,
mockSaveGetUpdatesBuf,
mockLogger,
} = vi.hoisted(() => ({
mockGetUpdates: vi.fn(),
mockGetForUser: vi.fn(),
mockProcessOneMessage: vi.fn(),
mockWaitForWeixinRuntime: vi.fn(),
mockGetSyncBufFilePath: vi.fn(),
mockLoadGetUpdatesBuf: vi.fn(),
mockSaveGetUpdatesBuf: vi.fn(),
mockLogger: {
info: vi.fn(),
debug: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
},
}));

vi.mock("../api/api.js", () => ({
getUpdates: mockGetUpdates,
}));

vi.mock("../api/config-cache.js", () => ({
WeixinConfigManager: vi.fn().mockImplementation(() => ({
getForUser: mockGetForUser,
})),
}));

vi.mock("../messaging/process-message.js", () => ({
processOneMessage: mockProcessOneMessage,
}));

vi.mock("../runtime.js", () => ({
getWeixinRuntime: vi.fn(),
waitForWeixinRuntime: mockWaitForWeixinRuntime,
}));

vi.mock("../storage/sync-buf.js", () => ({
getSyncBufFilePath: mockGetSyncBufFilePath,
loadGetUpdatesBuf: mockLoadGetUpdatesBuf,
saveGetUpdatesBuf: mockSaveGetUpdatesBuf,
}));

vi.mock("../util/logger.js", () => ({
logger: {
withAccount: vi.fn(() => mockLogger),
},
}));

beforeEach(() => {
vi.clearAllMocks();
mockWaitForWeixinRuntime.mockResolvedValue({ channel: {} });
mockGetSyncBufFilePath.mockReturnValue("/tmp/openclaw-weixin-test-sync-buf");
mockLoadGetUpdatesBuf.mockReturnValue("");
mockGetForUser.mockResolvedValue({ typingTicket: "ticket-1" });
});

describe("monitorWeixinProvider", () => {
it("continues polling while an inbound message is still being processed", async () => {
const abortController = new AbortController();
const inbound: WeixinMessage = {
message_id: 1,
from_user_id: "user-1",
context_token: "ctx-1",
item_list: [{ type: 1, text_item: { text: "hello" } }],
};

let releaseMessageProcessing!: () => void;
const messageProcessing = new Promise<void>((resolve) => {
releaseMessageProcessing = resolve;
});
mockProcessOneMessage.mockReturnValue(messageProcessing);

let markSecondPollStarted!: () => void;
const secondPollStarted = new Promise<void>((resolve) => {
markSecondPollStarted = resolve;
});

mockGetUpdates
.mockResolvedValueOnce({ ret: 0, get_updates_buf: "buf-1", msgs: [inbound] })
.mockImplementationOnce(async () => {
markSecondPollStarted();
abortController.abort();
return { ret: 0, get_updates_buf: "buf-2", msgs: [] };
});

const monitorPromise = monitorWeixinProvider({
baseUrl: "https://api.example.test",
cdnBaseUrl: "https://cdn.example.test",
accountId: "acct-1",
config: {} as Parameters<typeof monitorWeixinProvider>[0]["config"],
abortSignal: abortController.signal,
});

await secondPollStarted;

expect(mockGetUpdates).toHaveBeenCalledTimes(2);
expect(mockProcessOneMessage).toHaveBeenCalledTimes(1);

let processed = false;
void messageProcessing.then(() => {
processed = true;
});
await Promise.resolve();
expect(processed).toBe(false);

releaseMessageProcessing();
await monitorPromise;
await messageProcessing;
});
});
62 changes: 46 additions & 16 deletions src/monitor/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { PluginRuntime } from "openclaw/plugin-sdk/core";

import { getUpdates } from "../api/api.js";
import { WeixinConfigManager } from "../api/config-cache.js";
import type { WeixinMessage } from "../api/types.js";
import { SESSION_EXPIRED_ERRCODE, pauseSession, getRemainingPauseMs } from "../api/session-guard.js";
import { processOneMessage } from "../messaging/process-message.js";
import { getWeixinRuntime, waitForWeixinRuntime } from "../runtime.js";
Expand All @@ -15,6 +16,7 @@ const DEFAULT_LONG_POLL_TIMEOUT_MS = 35_000;
const MAX_CONSECUTIVE_FAILURES = 3;
const BACKOFF_DELAY_MS = 30_000;
const RETRY_DELAY_MS = 2_000;
const MAX_IN_FLIGHT_MESSAGES = 32;

export type MonitorWeixinOpts = {
baseUrl: string;
Expand All @@ -32,7 +34,7 @@ export type MonitorWeixinOpts = {
};

/**
* Long-poll loop: getUpdates -> normalize -> recordInboundSession -> dispatchReplyFromConfig.
* Long-poll loop: getUpdates -> dispatch inbound messages without blocking the next poll.
* Runs until abort.
*/
export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise<void> {
Expand Down Expand Up @@ -84,6 +86,42 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise<vo

let nextTimeoutMs = longPollTimeoutMs ?? DEFAULT_LONG_POLL_TIMEOUT_MS;
let consecutiveFailures = 0;
const inFlightMessages = new Set<Promise<void>>();

const waitForInFlightSlot = async () => {
while (inFlightMessages.size >= MAX_IN_FLIGHT_MESSAGES) {
await Promise.race(inFlightMessages).catch(() => {});
}
};

const processMessageInBackground = (full: WeixinMessage) => {
const task = (async () => {
const fromUserId = full.from_user_id ?? "";
const cachedConfig = await configManager.getForUser(fromUserId, full.context_token);

await processOneMessage(full, {
accountId,
config,
channelRuntime,
baseUrl,
cdnBaseUrl,
token,
typingTicket: cachedConfig.typingTicket,
log: opts.runtime?.log ?? (() => {}),
errLog,
});
})();

inFlightMessages.add(task);
void task.catch((err) => {
errLog(`weixin message processing failed: ${String(err)}`);
aLog.error(
`processOneMessage failed: from=${full.from_user_id ?? ""} msgId=${full.message_id ?? ""} err=${String(err)} stack=${(err as Error).stack ?? ""}`,
);
}).finally(() => {
inFlightMessages.delete(task);
});
};

while (!abortSignal?.aborted) {
try {
Expand Down Expand Up @@ -165,20 +203,8 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise<vo
// allowFrom filtering is delegated to processOneMessage via the framework
// authorization pipeline (resolveSenderCommandAuthorizationWithRuntime).

const fromUserId = full.from_user_id ?? "";
const cachedConfig = await configManager.getForUser(fromUserId, full.context_token);

await processOneMessage(full, {
accountId,
config,
channelRuntime,
baseUrl,
cdnBaseUrl,
token,
typingTicket: cachedConfig.typingTicket,
log: opts.runtime?.log ?? (() => {}),
errLog,
});
await waitForInFlightSlot();
processMessageInBackground(full);
}
} catch (err) {
if (abortSignal?.aborted) {
Expand All @@ -204,7 +230,11 @@ export async function monitorWeixinProvider(opts: MonitorWeixinOpts): Promise<vo
}
}
}
aLog.info(`Monitor ended`);
if (inFlightMessages.size > 0) {
aLog.info(`Monitor ended with ${inFlightMessages.size} message task(s) still running`);
} else {
aLog.info(`Monitor ended`);
}
}

function sleep(ms: number, signal?: AbortSignal): Promise<void> {
Expand Down