diff --git a/keeper/__tests__/snapshot_reconciliation.test.js b/keeper/__tests__/snapshot_reconciliation.test.js new file mode 100644 index 0000000..b592461 --- /dev/null +++ b/keeper/__tests__/snapshot_reconciliation.test.js @@ -0,0 +1,121 @@ +const fs = require('fs'); +const { xdr, Address } = require('@stellar/stellar-sdk'); + +jest.mock('fs'); + +const TaskRegistry = require('../src/registry'); + +function makeV1Event(symbolName, taskId, value = null, ledger = 1000, ledgerCloseAt = '2026-04-29T12:00:00Z') { + const topic0 = xdr.ScVal.scvSymbol(symbolName).toXDR('base64'); + const topic1 = xdr.ScVal.scvSymbol('v1').toXDR('base64'); + const topic2 = xdr.ScVal.scvU64(xdr.Uint64.fromString(String(taskId))).toXDR('base64'); + + return { + topic: [topic0, topic1, topic2], + value: value ? xdr.ScVal.fromXDR(value, 'base64').toXDR('base64') : null, + ledger, + ledgerCloseAt, + id: `event-${ledger}-${taskId}` + }; +} + +function mockServer(events = []) { + return { + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 2000 }), + getEvents: jest.fn().mockResolvedValue({ events, cursor: 'next-cursor' }), + }; +} + +describe('TaskRegistry Reconciliation', () => { + beforeEach(() => { + jest.clearAllMocks(); + fs.existsSync.mockReturnValue(false); + fs.mkdirSync.mockReturnValue(undefined); + fs.writeFileSync.mockReturnValue(undefined); + }); + + test('reconciles multiple event types correctly', async () => { + const { nativeToScVal } = require('@stellar/stellar-sdk'); + const events = [ + makeV1Event('TaskRegistered', 1), + makeV1Event('KeeperPaid', 1, nativeToScVal([ + Address.fromString('GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF'), + BigInt(100) + ]).toXDR('base64'), 1010, '2026-04-29T12:05:00Z'), + makeV1Event('GasDeposited', 1, nativeToScVal([ + Address.fromString('GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF'), + BigInt(500) + ]).toXDR('base64'), 1020), + makeV1Event('TaskPaused', 1, null, 1030), + makeV1Event('TaskResumed', 1, null, 1040), + ]; + + const server = mockServer(events); + // Force a snapshot load + fs.existsSync.mockReturnValue(true); + fs.readFileSync.mockReturnValue(JSON.stringify({ + version: 1, + taskIds: [1], + tasks: { + '1': { id: 1, gas_balance: 1000, last_run: 0, is_active: true } + }, + lastSeenLedger: 1000 + })); + + const registry = new TaskRegistry(server, 'CABC123'); + await registry.init(); + + const task = registry.tasks.get(1); + expect(task.id).toBe(1); + // 1000 (initial) - 100 (KeeperPaid) + 500 (GasDeposited) = 1400 + expect(task.gas_balance).toBe(1400); + // last_run should be from 12:05:00Z = 1777464300? No, let's check the date. + // 2026-04-29T12:05:00Z + const expectedLastRun = Math.floor(new Date('2026-04-29T12:05:00Z').getTime() / 1000); + expect(task.last_run).toBe(expectedLastRun); + expect(task.is_active).toBe(true); + expect(registry.lastSeenLedger).toBe(1040); + }); + + test('handles TaskCancelled', async () => { + const events = [ + makeV1Event('TaskCancelled', 1, null, 1050), + ]; + + const server = mockServer(events); + fs.existsSync.mockReturnValue(true); + fs.readFileSync.mockReturnValue(JSON.stringify({ + version: 1, + taskIds: [1], + tasks: { + '1': { id: 1, is_active: true } + }, + lastSeenLedger: 1000 + })); + + const registry = new TaskRegistry(server, 'CABC123'); + await registry.init(); + + expect(registry.taskIds.has(1)).toBe(false); + expect(registry.tasks.has(1)).toBe(false); + }); + + test('triggers full refresh on stale snapshot', async () => { + const server = mockServer([]); + fs.existsSync.mockReturnValue(true); + fs.readFileSync.mockReturnValue(JSON.stringify({ + version: 1, + taskIds: [1], + tasks: { '1': { id: 1 } }, + lastSeenLedger: 1000 // 1000 ledgers ago from 2000 + })); + + const registry = new TaskRegistry(server, 'CABC123', { staleThreshold: 500 }); + await registry.init(); + + // lastSeenLedger was reset to 0 because (2000 - 1000) > 500 + // Then it was set to latest - 720 = 1280 + expect(registry.taskIds.size).toBe(0); + expect(registry.lastSeenLedger).toBe(1280); + }); +}); diff --git a/keeper/package-lock.json b/keeper/package-lock.json index 4e8919e..6c7d659 100644 --- a/keeper/package-lock.json +++ b/keeper/package-lock.json @@ -108,7 +108,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -3259,7 +3258,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -3714,7 +3712,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -4400,7 +4397,6 @@ "integrity": "sha512-wiyGaKsDgqXvF40P8mDwiUp/KQjE1FdrIEJsM8PZ3XCiniTMXS3OHWWUe5FI5agoCnr8x4xPrTDZuxsBlNHl+Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.2", @@ -5360,7 +5356,6 @@ "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.10.1.tgz", "integrity": "sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==", "license": "MIT", - "peer": true, "dependencies": { "@ioredis/commands": "1.5.1", "cluster-key-slot": "^1.1.0", diff --git a/keeper/src/poller.js b/keeper/src/poller.js index f4a568e..7ce0b73 100644 --- a/keeper/src/poller.js +++ b/keeper/src/poller.js @@ -199,6 +199,7 @@ class TaskPoller { let maxDriftTaskId = null; results.forEach((result, index) => { + if (result.status === 'fulfilled') { const { isDue, taskId, reason, correlationId } = result.value; if (isDue) { @@ -231,7 +232,7 @@ class TaskPoller { this.stats.tasksChecked++; } else if (result.status === 'rejected') { this.stats.errors++; - this.logger.error('Error checking task', { taskId: taskIds[index], error: result.reason?.message || result.reason }); + this.logger.error('Error checking task', { taskId: candidateIds[index], error: result.reason?.message || result.reason }); } }); diff --git a/keeper/src/registry.js b/keeper/src/registry.js index f2db88e..40e32b5 100644 --- a/keeper/src/registry.js +++ b/keeper/src/registry.js @@ -5,15 +5,30 @@ const { createLogger } = require('./logger'); const DATA_DIR = path.join(__dirname, '..', 'data'); const TASKS_FILE = path.join(DATA_DIR, 'tasks.json'); +const SNAPSHOT_VERSION = 1; + +const EVENT_TOPICS = { + TaskRegistered: 'AAAADwAAAA5UYXNrUmVnaXN0ZXJlZAAA', + TaskPaused: 'AAAADwAAAApUYXNrUGF1c2VkAAA=', + TaskResumed: 'AAAADwAAAAtUYXNrUmVzdW1lZAA=', + KeeperPaid: 'AAAADwAAAApLZWVwZXJQYWlkAAA=', + GasDeposited: 'AAAADwAAAAxHYXNEZXBvc2l0ZWQ=', + GasWithdrawn: 'AAAADwAAAAxHYXNXaXRoZHJhd24=', + TaskCancelled: 'AAAADwAAAA1UYXNrQ2FuY2VsbGVkAAAA', + DependencyAdded: 'AAAADwAAAA9EZXBlbmRlbmN5QWRkZWQA', + DependencyRemoved: 'AAAADwAAABFEZXBlbmRlbmN5UmVtb3ZlZAAAAA==', +}; class TaskRegistry { constructor(server, contractId, options = {}) { this.server = server; this.contractId = contractId; this.taskIds = new Set(); - this.tasks = new Map(); // Store taskId -> taskDetails (including status, logs) + this.tasks = new Map(); // Store taskId -> TaskConfig this.lastSeenLedger = options.startLedger || 0; + this.snapshotVersion = SNAPSHOT_VERSION; this.logger = options.logger || createLogger('registry'); + this.staleThreshold = options.staleThreshold || 100000; // ~1 week of ledgers this._ensureDataDir(); this._loadFromDisk(); } @@ -24,6 +39,20 @@ class TaskRegistry { */ async init() { this.logger.info('Initializing task registry'); + + // Check if snapshot is too old or version mismatch + const latestLedger = await this.server.getLatestLedger(); + if (this.lastSeenLedger > 0 && (latestLedger.sequence - this.lastSeenLedger) > this.staleThreshold) { + this.logger.warn('Snapshot is too stale, triggering full refresh', { + lastSeen: this.lastSeenLedger, + current: latestLedger.sequence, + threshold: this.staleThreshold + }); + this.lastSeenLedger = 0; + this.tasks.clear(); + this.taskIds.clear(); + } + await this._fetchEvents(); this.logger.info('Registry initialized', { taskCount: this.taskIds.size }); } @@ -71,11 +100,12 @@ class TaskRegistry { async _fetchEvents() { try { - // We need a valid startLedger. If we don't have one, grab the latest. + const info = await this.server.getLatestLedger(); + const currentLedger = info.sequence; + if (!this.lastSeenLedger) { - const info = await this.server.getLatestLedger(); // Look back a reasonable window (default ~1 hour on testnet ≈ 720 ledgers) - this.lastSeenLedger = Math.max(info.sequence - 720, 0); + this.lastSeenLedger = Math.max(currentLedger - 720, 0); } const contractId = this.contractId; @@ -84,6 +114,11 @@ class TaskRegistry { let cursor = undefined; let hasMore = true; + const topics = [ + Object.values(EVENT_TOPICS), + ['*'] + ]; + while (hasMore) { const params = { startLedger: cursor ? undefined : this.lastSeenLedger, @@ -91,9 +126,7 @@ class TaskRegistry { { type: 'contract', contractIds: [contractId], - topics: [ - ['AAAADwAAAA9UYXNrUmVnaXN0ZXJlZAA=', '*'], // Symbol("TaskRegistered"), * - ], + topics: topics, }, ], limit: 100, @@ -113,14 +146,9 @@ class TaskRegistry { for (const event of response.events) { try { - const taskId = this._extractTaskId(event); - if (taskId !== null && !this.taskIds.has(taskId)) { - this.taskIds.add(taskId); - this.updateTask(taskId, { id: taskId, status: 'registered', registeredAt: event.ledgerCloseAt }); - this.logger.info('Discovered task ID', { taskId }); - } + this._processEvent(event); } catch (err) { - this.logger.warn('Failed to decode event', { error: err.message }); + this.logger.warn('Failed to process event', { error: err.message, eventId: event.id }); } // Track the latest ledger we've processed @@ -129,7 +157,6 @@ class TaskRegistry { } } - // If we got fewer events than the limit, we're done if (response.events.length < 100) { hasMore = false; } else { @@ -139,11 +166,97 @@ class TaskRegistry { this._saveToDisk(); } catch (err) { - // Don't crash on transient RPC errors — just log and keep going this.logger.error('Error fetching events', { error: err.message }); } } + _processEvent(event) { + const { scValToNative } = require('@stellar/stellar-sdk'); + const topics = event.topic.map(t => scValToNative(xdr.ScVal.fromXDR(t, 'base64'))); + const eventType = topics[0]; + + // Most events have taskId as the 3rd topic (index 2) in v1 + let taskId; + if (topics[1] === 'v1') { + taskId = Number(topics[2]); + } else { + // Fallback for legacy or different format + taskId = Number(topics[1]); + } + + if (isNaN(taskId)) return; + + const eventData = event.value ? scValToNative(xdr.ScVal.fromXDR(event.value, 'base64')) : null; + const ledgerTimestamp = Math.floor(new Date(event.ledgerCloseAt).getTime() / 1000); + + const task = this.tasks.get(taskId) || { id: taskId, blocked_by: [] }; + + switch (eventType) { + case 'TaskRegistered': + // If we only have the event, we might not have the full config yet. + // But we mark it as registered. + this.taskIds.add(taskId); + this.updateTask(taskId, { + id: taskId, + status: 'registered', + registeredAt: event.ledgerCloseAt, + is_active: true, + last_run: 0 + }); + break; + + case 'TaskPaused': + this.updateTask(taskId, { is_active: false, status: 'paused' }); + break; + + case 'TaskResumed': + this.updateTask(taskId, { is_active: true, status: 'active' }); + break; + + case 'KeeperPaid': + // eventData is [keeper, fee] + const fee = eventData ? Number(eventData[1]) : 100; + this.updateTask(taskId, { + last_run: ledgerTimestamp, + gas_balance: (task.gas_balance || 0) - fee, + status: 'active' + }); + break; + + case 'GasDeposited': + // eventData is [from, amount] + const depositAmount = eventData ? Number(eventData[1]) : 0; + this.updateTask(taskId, { gas_balance: (task.gas_balance || 0) + depositAmount }); + break; + + case 'GasWithdrawn': + // eventData is [from, amount] + const withdrawAmount = eventData ? Number(eventData[1]) : 0; + this.updateTask(taskId, { gas_balance: (task.gas_balance || 0) - withdrawAmount }); + break; + + case 'TaskCancelled': + this.tasks.delete(taskId); + this.taskIds.delete(taskId); + break; + + case 'DependencyAdded': + // eventData is depends_on_task_id + const depId = Number(eventData); + const currentDeps = task.blocked_by || []; + if (!currentDeps.includes(depId)) { + this.updateTask(taskId, { blocked_by: [...currentDeps, depId] }); + } + break; + + case 'DependencyRemoved': + // eventData is depends_on_task_id + const remId = Number(eventData); + this.updateTask(taskId, { blocked_by: (task.blocked_by || []).filter(id => id !== remId) }); + break; + } + } + /** * Extract the u64 task ID from the second topic of a TaskRegistered event. */ @@ -181,6 +294,12 @@ class TaskRegistry { try { if (fs.existsSync(TASKS_FILE)) { const data = JSON.parse(fs.readFileSync(TASKS_FILE, 'utf-8')); + if (data.version && data.version !== SNAPSHOT_VERSION) { + this.logger.warn('Snapshot version mismatch, full refresh may be needed', { + fileVersion: data.version, + currentVersion: SNAPSHOT_VERSION + }); + } if (Array.isArray(data.taskIds)) { data.taskIds.forEach(id => this.taskIds.add(id)); } @@ -202,6 +321,7 @@ class TaskRegistry { _saveToDisk() { try { const data = { + version: SNAPSHOT_VERSION, taskIds: Array.from(this.taskIds).sort((a, b) => a - b), tasks: Object.fromEntries(this.tasks), lastSeenLedger: this.lastSeenLedger,