Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 37 additions & 19 deletions examples/openclaw-plugin/context-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ export type ContextEngineWithSessionMapping = ContextEngine & {
resolveOVSession: (sessionKey: string) => Promise<string>;
/** Commit (extract + archive) then delete the OV session, so a fresh one is created on next use. */
commitOVSession: (sessionKey: string) => Promise<void>;
/** Serialize a commit operation through the per-session commit lock to prevent CONFLICT errors. */
commitWithLock: (ovSessionId: string, fn: () => Promise<void>) => Promise<void>;
};

type Logger = {
Expand Down Expand Up @@ -265,19 +267,31 @@ export function createMemoryOpenVikingContextEngine(params: {
resolveAgentId,
} = params;

// Per-session commit lock to prevent CONFLICT errors from overlapping commits.
const commitLocks = new Map<string, Promise<void>>();

async function withCommitLock(sessionId: string, fn: () => Promise<void>): Promise<void> {
const previous = commitLocks.get(sessionId) ?? Promise.resolve();
const current = previous.then(fn, fn); // run fn after previous settles (success or fail)
commitLocks.set(sessionId, current.then(() => {}, () => {})); // swallow to avoid unhandled rejection on the stored promise
return current;
}

async function doCommitOVSession(sessionKey: string): Promise<void> {
try {
const client = await getClient();
const agentId = resolveAgentId(sessionKey);
const ovSessionId = mapSessionKeyToOVSessionId(sessionKey);
const commitResult = await client.commitSession(ovSessionId, { wait: true, agentId });
logger.info(
`openviking: committed OV session for sessionKey=${sessionKey}, ovSessionId=${ovSessionId}, archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, task_id=${commitResult.task_id ?? "none"}`,
);
await client.deleteSession(ovSessionId, agentId).catch(() => {});
} catch (err) {
warnOrInfo(logger, `openviking: commit failed for sessionKey=${sessionKey}: ${String(err)}`);
}
const ovSessionId = mapSessionKeyToOVSessionId(sessionKey);
await withCommitLock(ovSessionId, async () => {
try {
const client = await getClient();
const agentId = resolveAgentId(sessionKey);
const commitResult = await client.commitSession(ovSessionId, { wait: true, agentId });
logger.info(
`openviking: committed OV session for sessionKey=${sessionKey}, ovSessionId=${ovSessionId}, archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, task_id=${commitResult.task_id ?? "none"}`,
);
await client.deleteSession(ovSessionId, agentId).catch(() => {});
} catch (err) {
warnOrInfo(logger, `openviking: commit failed for sessionKey=${sessionKey}: ${String(err)}`);
}
});
}

function extractSessionKey(runtimeContext: Record<string, unknown> | undefined): string | undefined {
Expand Down Expand Up @@ -305,6 +319,8 @@ export function createMemoryOpenVikingContextEngine(params: {

commitOVSession: doCommitOVSession,

commitWithLock: (ovSessionId: string, fn: () => Promise<void>) => withCommitLock(ovSessionId, fn),

// --- standard ContextEngine methods ---

async ingest(): Promise<IngestResult> {
Expand Down Expand Up @@ -368,13 +384,15 @@ export function createMemoryOpenVikingContextEngine(params: {
const OVSessionId = sessionKey
? mapSessionKeyToOVSessionId(sessionKey)
: afterTurnParams.sessionId;
await client.addSessionMessage(OVSessionId, "user", decision.normalizedText, agentId);
const commitResult = await client.commitSession(OVSessionId, { wait: true, agentId });
logger.info(
`openviking: committed ${newCount} messages in session=${OVSessionId}, ` +
`archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, ` +
`task_id=${commitResult.task_id ?? "none"} ${toJsonLog({ captured: [trimForLog(turnText, 260)] })}`,
);
await withCommitLock(OVSessionId, async () => {
await client.addSessionMessage(OVSessionId, "user", decision.normalizedText, agentId);
const commitResult = await client.commitSession(OVSessionId, { wait: true, agentId });
logger.info(
`openviking: committed ${newCount} messages in session=${OVSessionId}, ` +
`archived=${commitResult.archived ?? false}, memories=${commitResult.memories_extracted ?? 0}, ` +
`task_id=${commitResult.task_id ?? "none"} ${toJsonLog({ captured: [trimForLog(turnText, 260)] })}`,
);
});
} catch (err) {
warnOrInfo(logger, `openviking: auto-capture failed: ${String(err)}`);
}
Expand Down