diff --git a/keeper/__tests__/poller.test.js b/keeper/__tests__/poller.test.js index 2e60561..ecbf70e 100644 --- a/keeper/__tests__/poller.test.js +++ b/keeper/__tests__/poller.test.js @@ -35,6 +35,7 @@ describe('TaskPoller', () => { tasksChecked: 0, tasksDue: 0, tasksSkipped: 0, + tasksFiltered: 0, tasksSmoothed: 0, unacceptablyLate: 0, errors: 0, @@ -42,6 +43,7 @@ describe('TaskPoller', () => { expect(poller.getCycleInsights()).toEqual({ backlogSize: 0, + filteredCount: 0, dueCount: 0, dueSoonCount: 0, minSecondsUntilDue: null, @@ -50,6 +52,10 @@ describe('TaskPoller', () => { errors: 0, }); }); + + it('should store null for filterChain when none provided', () => { + expect(poller.filterChain).toBeNull(); + }); }); describe('pollDueTasks', () => { @@ -354,3 +360,117 @@ describe('TaskPoller', () => { }); }); }); + +// ─── Filter chain integration tests ────────────────────────────────────────── + +const { TaskFilterChain } = require('../src/taskFilter'); + +describe('TaskPoller with FilterChain', () => { + let mockServer; + const contractId = 'CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM'; + + beforeEach(() => { + mockServer = { + getLatestLedger: jest.fn().mockResolvedValue({ sequence: 1000 }), + getAccount: jest.fn(), + simulateTransaction: jest.fn(), + }; + }); + + it('accepts a filterChain in constructor options', () => { + const chain = new TaskFilterChain(); + const p = new TaskPoller(mockServer, contractId, { filterChain: chain }); + expect(p.filterChain).toBe(chain); + }); + + it('ignores non-TaskFilterChain values', () => { + const p = new TaskPoller(mockServer, contractId, { filterChain: { fake: true } }); + expect(p.filterChain).toBeNull(); + }); + + it('checkTask is NOT called for tasks rejected by the filter chain', async () => { + // Reject task 2, pass task 1 and 3 + const chain = new TaskFilterChain(); + chain.addFilter('blockTwo', (id) => + id === 2 ? { pass: false, reason: 'blocked' } : { pass: true, reason: 'ok' }, + ); + + const p = new TaskPoller(mockServer, contractId, { filterChain: chain }); + const checkTaskSpy = jest.spyOn(p, 'checkTask').mockResolvedValue({ isDue: false, taskId: 1 }); + + await p.pollDueTasks([1, 2, 3]); + + // checkTask must only be called for tasks 1 and 3 — not 2 + const calledWith = checkTaskSpy.mock.calls.map((c) => c[0]); + expect(calledWith).toContain(1); + expect(calledWith).toContain(3); + expect(calledWith).not.toContain(2); + }); + + it('stats.tasksFiltered is populated after filtering', async () => { + const chain = new TaskFilterChain(); + chain.addFilter('blockAll', () => ({ pass: false, reason: 'blocked' })); + + const p = new TaskPoller(mockServer, contractId, { filterChain: chain }); + jest.spyOn(p, 'checkTask').mockResolvedValue({ isDue: false, taskId: 1 }); + + await p.pollDueTasks([1, 2, 3]); + + expect(p.stats.tasksFiltered).toBe(3); + }); + + it('getCycleInsights().filteredCount reflects the pre-filter count', async () => { + const chain = new TaskFilterChain(); + // Reject tasks 2 and 4 + chain.addFilter('evenFilter', (id) => + id % 2 === 0 ? { pass: false, reason: 'even' } : { pass: true, reason: 'ok' }, + ); + + const p = new TaskPoller(mockServer, contractId, { filterChain: chain }); + jest.spyOn(p, 'checkTask').mockResolvedValue({ isDue: false, taskId: 1 }); + + await p.pollDueTasks([1, 2, 3, 4]); + + const insights = p.getCycleInsights(); + expect(insights.filteredCount).toBe(2); + expect(insights.backlogSize).toBe(4); + }); + + it('getCycleInsights includes filteredCount:0 when no filter is attached', async () => { + const p = new TaskPoller(mockServer, contractId, {}); + jest.spyOn(p, 'checkTask').mockResolvedValue({ isDue: true, taskId: 1 }); + + await p.pollDueTasks([1]); + expect(p.getCycleInsights().filteredCount).toBe(0); + }); + + it('does not drop valid tasks — eligible set reaches checkTask', async () => { + const chain = new TaskFilterChain(); + chain.addFilter('passAll', () => ({ pass: true, reason: 'ok' })); + + const p = new TaskPoller(mockServer, contractId, { filterChain: chain }); + const spy = jest.spyOn(p, 'checkTask').mockResolvedValue({ isDue: true, taskId: 1 }); + + const due = await p.pollDueTasks([1, 2, 3]); + + expect(spy).toHaveBeenCalledTimes(3); + expect(due.length).toBe(3); + }); + + it('returns due tasks only from eligible set', async () => { + const chain = new TaskFilterChain(); + // Block task 3 + chain.addFilter('blockThree', (id) => + id === 3 ? { pass: false, reason: 'blocked' } : { pass: true, reason: 'ok' }, + ); + + const p = new TaskPoller(mockServer, contractId, { filterChain: chain }); + jest.spyOn(p, 'checkTask') + .mockResolvedValueOnce({ isDue: true, taskId: 1 }) + .mockResolvedValueOnce({ isDue: true, taskId: 2 }); + + const due = await p.pollDueTasks([1, 2, 3]); + expect(due).toEqual([1, 2]); + expect(due).not.toContain(3); + }); +}); diff --git a/keeper/__tests__/taskFilter.test.js b/keeper/__tests__/taskFilter.test.js new file mode 100644 index 0000000..22b8cbf --- /dev/null +++ b/keeper/__tests__/taskFilter.test.js @@ -0,0 +1,490 @@ +'use strict'; + +/** + * Unit tests for taskFilter.js — Selection Efficiency Pre-Filter Chain + * + * Coverage targets: + * - Each individual filter (pass + reject + edge cases) + * - TaskFilterChain ordering and fail-fast behaviour + * - filterTaskIds() returning correct eligible/filtered sets + * - Stats accumulation per cycle + * - No false positives — valid tasks are never dropped + * - Filter crash safety (a throwing filter must not block the task) + * - createDefaultFilterChain() factory wires all five filters in order + * - Extensibility via addFilter() + */ + +const { + TaskFilterChain, + createDefaultFilterChain, + nullTaskIdFilter, + cachedGasFilter, + cachedTimingFilter, + idempotencyLockFilter, + circuitBreakerFilter, +} = require('../src/taskFilter'); + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +/** Build a minimal fake registry.tasks Map */ +function makeRegistry(taskMap = {}) { + const tasks = new Map(); + for (const [id, fields] of Object.entries(taskMap)) { + tasks.set(Number(id), fields); + } + return { tasks }; +} + +/** Build a fake idempotency guard */ +function makeGuard(lockedIds = []) { + const locked = new Set(lockedIds); + return { isLocked: (id) => locked.has(id) }; +} + +/** Build a fake circuit breaker */ +function makeBreaker(openIds = []) { + const open = new Set(openIds); + return { isOpen: (id) => open.has(id) }; +} + +// ─── Individual filter tests ────────────────────────────────────────────────── + +describe('nullTaskIdFilter', () => { + it('passes a valid numeric task ID', () => { + expect(nullTaskIdFilter(1, {}).pass).toBe(true); + }); + + it('passes bigint task IDs', () => { + expect(nullTaskIdFilter(BigInt(42), {}).pass).toBe(true); + }); + + it('rejects null', () => { + const r = nullTaskIdFilter(null, {}); + expect(r.pass).toBe(false); + expect(r.reason).toBe('null_task_id'); + }); + + it('rejects undefined', () => { + const r = nullTaskIdFilter(undefined, {}); + expect(r.pass).toBe(false); + expect(r.reason).toBe('null_task_id'); + }); + + it('rejects NaN', () => { + const r = nullTaskIdFilter(NaN, {}); + expect(r.pass).toBe(false); + expect(r.reason).toBe('non_finite_task_id'); + }); + + it('rejects Infinity', () => { + const r = nullTaskIdFilter(Infinity, {}); + expect(r.pass).toBe(false); + expect(r.reason).toBe('non_finite_task_id'); + }); + + it('rejects a string ID', () => { + const r = nullTaskIdFilter('42', {}); + expect(r.pass).toBe(false); + expect(r.reason).toBe('invalid_task_id_type'); + }); + + it('rejects an object', () => { + const r = nullTaskIdFilter({}, {}); + expect(r.pass).toBe(false); + expect(r.reason).toBe('invalid_task_id_type'); + }); +}); + +// ───────────────────────────────────────────────────────────────────────────── + +describe('cachedGasFilter', () => { + it('passes when no registry is provided', () => { + expect(cachedGasFilter(1, {}).pass).toBe(true); + expect(cachedGasFilter(1, { registry: null }).pass).toBe(true); + }); + + it('passes when task is not yet in the cache (cache miss)', () => { + const registry = makeRegistry({}); + expect(cachedGasFilter(99, { registry }).pass).toBe(true); + expect(cachedGasFilter(99, { registry }).reason).toBe('cache_miss'); + }); + + it('rejects task with gas_balance === 0', () => { + const registry = makeRegistry({ 1: { gas_balance: 0 } }); + const r = cachedGasFilter(1, { registry }); + expect(r.pass).toBe(false); + expect(r.reason).toBe('cached_zero_gas'); + }); + + it('rejects task with gas_balance < 0', () => { + const registry = makeRegistry({ 1: { gas_balance: -50 } }); + const r = cachedGasFilter(1, { registry }); + expect(r.pass).toBe(false); + expect(r.reason).toBe('cached_zero_gas'); + }); + + it('passes task with positive gas_balance', () => { + const registry = makeRegistry({ 1: { gas_balance: 1000 } }); + expect(cachedGasFilter(1, { registry }).pass).toBe(true); + }); + + it('passes task whose cache entry has no gas_balance field yet', () => { + // Task is registered but gas_balance not yet hydrated + const registry = makeRegistry({ 1: { status: 'registered' } }); + expect(cachedGasFilter(1, { registry }).pass).toBe(true); + }); +}); + +// ───────────────────────────────────────────────────────────────────────────── + +describe('cachedTimingFilter', () => { + it('passes when no registry is provided', () => { + expect(cachedTimingFilter(1, { currentTimestamp: 1000 }).pass).toBe(true); + }); + + it('passes when currentTimestamp is absent', () => { + const registry = makeRegistry({ 1: { last_run: 500, interval: 100 } }); + expect(cachedTimingFilter(1, { registry }).pass).toBe(true); + }); + + it('passes on cache miss', () => { + const registry = makeRegistry({}); + expect(cachedTimingFilter(99, { registry, currentTimestamp: 1000 }).pass).toBe(true); + expect(cachedTimingFilter(99, { registry, currentTimestamp: 1000 }).reason).toBe('cache_miss'); + }); + + it('passes task whose cache entry is missing timing fields', () => { + const registry = makeRegistry({ 1: { gas_balance: 500 } }); + expect(cachedTimingFilter(1, { registry, currentTimestamp: 1000 }).pass).toBe(true); + }); + + it('passes task whose interval is 0 (guard against division/logic edge)', () => { + const registry = makeRegistry({ 1: { last_run: 900, interval: 0 } }); + expect(cachedTimingFilter(1, { registry, currentTimestamp: 1000 }).pass).toBe(true); + }); + + it('rejects task that is clearly not yet due', () => { + // next_run = 800 + 300 = 1100 > 1000 → not due + const registry = makeRegistry({ 1: { last_run: 800, interval: 300 } }); + const r = cachedTimingFilter(1, { registry, currentTimestamp: 1000 }); + expect(r.pass).toBe(false); + expect(r.reason).toBe('cached_not_yet_due'); + expect(r.meta.secondsUntilDue).toBe(100); + }); + + it('passes task that is exactly at boundary (last_run + interval == currentTimestamp)', () => { + // next_run = 500 + 500 = 1000 == 1000 → due + const registry = makeRegistry({ 1: { last_run: 500, interval: 500 } }); + expect(cachedTimingFilter(1, { registry, currentTimestamp: 1000 }).pass).toBe(true); + }); + + it('passes task that is past due', () => { + // next_run = 500 + 400 = 900 < 1000 → overdue, should pass + const registry = makeRegistry({ 1: { last_run: 500, interval: 400 } }); + expect(cachedTimingFilter(1, { registry, currentTimestamp: 1000 }).pass).toBe(true); + }); +}); + +// ───────────────────────────────────────────────────────────────────────────── + +describe('idempotencyLockFilter', () => { + it('passes when no guard is provided', () => { + expect(idempotencyLockFilter(1, {}).pass).toBe(true); + }); + + it('passes task that is not locked', () => { + const guard = makeGuard([2, 3]); + expect(idempotencyLockFilter(1, { idempotencyGuard: guard }).pass).toBe(true); + }); + + it('rejects task that is locked', () => { + const guard = makeGuard([1, 2]); + const r = idempotencyLockFilter(1, { idempotencyGuard: guard }); + expect(r.pass).toBe(false); + expect(r.reason).toBe('execution_locked'); + }); + + it('supports hasLock() as alternative method name', () => { + const guard = { hasLock: (id) => id === 5 }; + expect(idempotencyLockFilter(5, { idempotencyGuard: guard }).pass).toBe(false); + expect(idempotencyLockFilter(6, { idempotencyGuard: guard }).pass).toBe(true); + }); + + it('passes gracefully when guard has neither isLocked nor hasLock', () => { + const guard = {}; // duck-type miss + expect(idempotencyLockFilter(1, { idempotencyGuard: guard }).pass).toBe(true); + }); +}); + +// ───────────────────────────────────────────────────────────────────────────── + +describe('circuitBreakerFilter', () => { + it('passes when no circuit breaker is provided', () => { + expect(circuitBreakerFilter(1, {}).pass).toBe(true); + }); + + it('passes task whose circuit is closed', () => { + const breaker = makeBreaker([2]); + expect(circuitBreakerFilter(1, { circuitBreaker: breaker }).pass).toBe(true); + }); + + it('rejects task whose circuit is open', () => { + const breaker = makeBreaker([1]); + const r = circuitBreakerFilter(1, { circuitBreaker: breaker }); + expect(r.pass).toBe(false); + expect(r.reason).toBe('circuit_open'); + }); + + it('supports getState() as alternative method name', () => { + const breaker = { getState: (id) => (id === 7 ? 'open' : 'closed') }; + expect(circuitBreakerFilter(7, { circuitBreaker: breaker }).pass).toBe(false); + expect(circuitBreakerFilter(8, { circuitBreaker: breaker }).pass).toBe(true); + }); + + it('passes gracefully when breaker has neither isOpen nor getState', () => { + expect(circuitBreakerFilter(1, { circuitBreaker: {} }).pass).toBe(true); + }); +}); + +// ─── TaskFilterChain ────────────────────────────────────────────────────────── + +describe('TaskFilterChain', () => { + describe('addFilter', () => { + it('throws when filter is not a function', () => { + const chain = new TaskFilterChain(); + expect(() => chain.addFilter('bad', 'not-a-function')).toThrow(TypeError); + }); + + it('supports fluent chaining', () => { + const chain = new TaskFilterChain(); + const ret = chain.addFilter('a', () => ({ pass: true, reason: 'ok' })); + expect(ret).toBe(chain); + }); + }); + + describe('filterTaskIds — basic behaviour', () => { + it('returns all tasks as eligible when no filters are registered', () => { + const chain = new TaskFilterChain(); + const { eligible, filtered, stats } = chain.filterTaskIds([1, 2, 3], {}); + expect(eligible).toEqual([1, 2, 3]); + expect(filtered).toEqual([]); + expect(stats.totalChecked).toBe(3); + expect(stats.totalFiltered).toBe(0); + expect(stats.totalEligible).toBe(3); + }); + + it('returns empty arrays for an empty input', () => { + const chain = new TaskFilterChain(); + const { eligible, filtered } = chain.filterTaskIds([], {}); + expect(eligible).toEqual([]); + expect(filtered).toEqual([]); + }); + + it('correctly partitions eligible vs filtered tasks', () => { + const chain = new TaskFilterChain(); + // Reject even task IDs + chain.addFilter('evenFilter', (id) => + id % 2 === 0 ? { pass: false, reason: 'even' } : { pass: true, reason: 'ok' }, + ); + + const { eligible, filtered } = chain.filterTaskIds([1, 2, 3, 4, 5], {}); + expect(eligible).toEqual([1, 3, 5]); + expect(filtered).toEqual([2, 4]); + }); + }); + + describe('filterTaskIds — fail-fast (chain short-circuits on first rejection)', () => { + it('stops evaluating remaining filters once one rejects', () => { + const chain = new TaskFilterChain(); + const secondFilter = jest.fn(() => ({ pass: true, reason: 'ok' })); + + chain.addFilter('alwaysReject', () => ({ pass: false, reason: 'nope' })); + chain.addFilter('second', secondFilter); + + chain.filterTaskIds([1], {}); + // The second filter should never have been called + expect(secondFilter).not.toHaveBeenCalled(); + }); + }); + + describe('filterTaskIds — stats', () => { + it('accumulates per-filter rejection counts', () => { + const chain = new TaskFilterChain(); + chain.addFilter('gasFilter', (id) => + id === 2 ? { pass: false, reason: 'zero_gas' } : { pass: true, reason: 'ok' }, + ); + chain.addFilter('timeFilter', (id) => + id === 4 ? { pass: false, reason: 'not_due' } : { pass: true, reason: 'ok' }, + ); + + const { stats } = chain.filterTaskIds([1, 2, 3, 4, 5], {}); + expect(stats.filterRejections.gasFilter).toBe(1); + expect(stats.filterRejections.timeFilter).toBe(1); + expect(stats.totalFiltered).toBe(2); + expect(stats.totalEligible).toBe(3); + }); + + it('getLastStats() returns a snapshot of most recent cycle', () => { + const chain = new TaskFilterChain(); + chain.addFilter('f', (id) => + id === 1 ? { pass: false, reason: 'r' } : { pass: true, reason: 'ok' }, + ); + chain.filterTaskIds([1, 2], {}); + const s = chain.getLastStats(); + expect(s.totalChecked).toBe(2); + expect(s.totalFiltered).toBe(1); + }); + + it('getLastStats() returns a copy, not a reference', () => { + const chain = new TaskFilterChain(); + chain.filterTaskIds([1, 2], {}); + const s = chain.getLastStats(); + s.totalChecked = 9999; + expect(chain.getLastStats().totalChecked).toBe(2); + }); + }); + + describe('filterTaskIds — crash safety', () => { + it('passes task through when a filter throws', () => { + const chain = new TaskFilterChain(); + chain.addFilter('buggy', () => { throw new Error('oops'); }); + + const { eligible, filtered } = chain.filterTaskIds([42], {}); + // Task must NOT be dropped due to filter bug + expect(eligible).toContain(42); + expect(filtered).toHaveLength(0); + }); + }); + + describe('filter ordering is preserved', () => { + it('applies filters in registration order', () => { + const order = []; + const chain = new TaskFilterChain(); + chain.addFilter('first', (id) => { order.push('first'); return { pass: true, reason: 'ok' }; }); + chain.addFilter('second', (id) => { order.push('second'); return { pass: true, reason: 'ok' }; }); + chain.addFilter('third', (id) => { order.push('third'); return { pass: true, reason: 'ok' }; }); + + chain.filterTaskIds([1], {}); + expect(order).toEqual(['first', 'second', 'third']); + }); + }); +}); + +// ─── createDefaultFilterChain ───────────────────────────────────────────────── + +describe('createDefaultFilterChain', () => { + it('creates a TaskFilterChain instance', () => { + const chain = createDefaultFilterChain(); + expect(chain).toBeInstanceOf(TaskFilterChain); + }); + + it('registers exactly 5 built-in filters', () => { + const chain = createDefaultFilterChain(); + expect(chain._filters).toHaveLength(5); + }); + + it('filters are registered in correct order', () => { + const chain = createDefaultFilterChain(); + const names = chain._filters.map((f) => f.name); + expect(names).toEqual([ + 'nullTaskIdFilter', + 'cachedGasFilter', + 'cachedTimingFilter', + 'idempotencyLockFilter', + 'circuitBreakerFilter', + ]); + }); + + it('passes all valid tasks with no disqualifying signals', () => { + const registry = makeRegistry({ + 1: { gas_balance: 1000, last_run: 500, interval: 400 }, // overdue + 2: { gas_balance: 500, last_run: 400, interval: 500 }, // overdue + }); + const chain = createDefaultFilterChain(); + const { eligible } = chain.filterTaskIds([1, 2], { currentTimestamp: 1000, registry }); + expect(eligible).toEqual([1, 2]); + }); + + it('filters out null IDs even with no optional deps', () => { + const chain = createDefaultFilterChain(); + const { eligible, filtered } = chain.filterTaskIds([1, null, 3], {}); + expect(eligible).toEqual([1, 3]); + expect(filtered).toEqual([null]); + }); + + it('filters out zero-gas tasks from cache', () => { + const registry = makeRegistry({ 2: { gas_balance: 0, last_run: 0, interval: 1 } }); + const chain = createDefaultFilterChain(); + const { filtered } = chain.filterTaskIds([1, 2, 3], { registry, currentTimestamp: 1000 }); + expect(filtered).toContain(2); + }); + + it('filters out not-yet-due tasks from timing cache', () => { + // Task 5: next_run = 900 + 200 = 1100 > 1000 → not due + const registry = makeRegistry({ 5: { gas_balance: 500, last_run: 900, interval: 200 } }); + const chain = createDefaultFilterChain(); + const { filtered } = chain.filterTaskIds([5], { registry, currentTimestamp: 1000 }); + expect(filtered).toContain(5); + }); + + it('filters out locked tasks via idempotency guard', () => { + const guard = makeGuard([7]); + const chain = createDefaultFilterChain({ idempotencyGuard: guard }); + const { filtered } = chain.filterTaskIds([7, 8], { idempotencyGuard: guard }); + expect(filtered).toContain(7); + expect(filtered).not.toContain(8); + }); + + it('filters out circuit-open tasks', () => { + const breaker = makeBreaker([10]); + const chain = createDefaultFilterChain({ circuitBreaker: breaker }); + const { filtered } = chain.filterTaskIds([10, 11], { circuitBreaker: breaker }); + expect(filtered).toContain(10); + expect(filtered).not.toContain(11); + }); + + describe('correctness — no valid task is accidentally dropped', () => { + it('does not reject a newly-registered task with no cache entry yet', () => { + // Empty registry: task just discovered, gas/timing cache empty + const registry = makeRegistry({}); + const chain = createDefaultFilterChain(); + const { eligible } = chain.filterTaskIds([42], { registry, currentTimestamp: 1000 }); + expect(eligible).toContain(42); + }); + + it('does not reject a task whose interval is incomplete in cache', () => { + // Only gas_balance present, timing missing → must pass to let RPC decide + const registry = makeRegistry({ 1: { gas_balance: 100 } }); + const chain = createDefaultFilterChain(); + const { eligible } = chain.filterTaskIds([1], { registry, currentTimestamp: 1000 }); + expect(eligible).toContain(1); + }); + + it('does not reject a task at the exact timing boundary', () => { + // next_run = 600 + 400 = 1000 == currentTimestamp → exactly due + const registry = makeRegistry({ 1: { gas_balance: 100, last_run: 600, interval: 400 } }); + const chain = createDefaultFilterChain(); + const { eligible } = chain.filterTaskIds([1], { registry, currentTimestamp: 1000 }); + expect(eligible).toContain(1); + }); + }); + + describe('extensibility — custom filter can be appended', () => { + it('addFilter appends after the five built-ins', () => { + const chain = createDefaultFilterChain(); + chain.addFilter('myCustomFilter', () => ({ pass: true, reason: 'ok' })); + expect(chain._filters).toHaveLength(6); + expect(chain._filters[5].name).toBe('myCustomFilter'); + }); + + it('custom filter participates in rejection correctly', () => { + const chain = createDefaultFilterChain(); + chain.addFilter('blockTask9', (id) => + id === 9 ? { pass: false, reason: 'custom_block' } : { pass: true, reason: 'ok' }, + ); + const { filtered } = chain.filterTaskIds([8, 9], {}); + expect(filtered).toContain(9); + expect(filtered).not.toContain(8); + }); + }); +}); diff --git a/keeper/index.js b/keeper/index.js index 98fabf4..c8009c9 100644 --- a/keeper/index.js +++ b/keeper/index.js @@ -15,6 +15,7 @@ const { MetricsServer } = require("./src/metrics"); const HistoryManager = require("./src/history"); const { normalizeShardConfig, filterTasksForShard } = require("./src/sharding"); const { StartupValidator } = require("./src/validator"); +const { createDefaultFilterChain } = require("./src/taskFilter"); // Create root logger for the main module const logger = createLogger("keeper"); @@ -114,10 +115,18 @@ async function main() { logger: createLogger("idempotency"), }); - // Initialize polling engine with logger + // Build the pre-filter chain — eliminates non-actionable tasks before RPC calls. + // Filters run in order: null-guard → cached gas → cached timing → idempotency lock → circuit breaker. + const filterChain = createDefaultFilterChain({ + idempotencyGuard, + logger: createLogger("filter"), + }); + + // Initialize polling engine with logger and filter chain const poller = new TaskPoller(server, config.contractId, { maxConcurrentReads: process.env.MAX_CONCURRENT_READS, logger: createLogger("poller"), + filterChain, simulationCacheTtl: process.env.SIMULATION_CACHE_TTL, simulationCacheMaxSize: process.env.SIMULATION_CACHE_MAX_SIZE, metricsServer, @@ -260,8 +269,10 @@ async function main() { } // Poll for due tasks + // Pass registry so cached gas/timing filters can read previously fetched values const dueTaskIds = await poller.pollDueTasks(shardSelection.ownedTaskIds, { registry, + idempotencyGuard, }); if (dueTaskIds.length > 0) { @@ -314,7 +325,7 @@ async function main() { const shardSelection = filterTasksForShard(taskIds, shardConfig); const dueTaskIds = controlState.paused ? [] - : await poller.pollDueTasks(shardSelection.ownedTaskIds, { registry }); + : await poller.pollDueTasks(shardSelection.ownedTaskIds, { registry, idempotencyGuard }); if (dueTaskIds.length > 0) { const tasksToEnqueue = dueTaskIds.map(d => ({ taskId: d.taskId, diff --git a/keeper/src/poller.js b/keeper/src/poller.js index afa8f3a..f4a568e 100644 --- a/keeper/src/poller.js +++ b/keeper/src/poller.js @@ -1,6 +1,7 @@ const { Contract, xdr, TransactionBuilder, BASE_FEE, Networks, scValToNative } = require('@stellar/stellar-sdk'); const { createRateLimiter } = require('./concurrency'); const { createLogger } = require('./logger'); +const { TaskFilterChain } = require('./taskFilter'); const { SimulationCache } = require('./simulationCache'); const crypto = require('crypto'); @@ -16,6 +17,11 @@ class TaskPoller { // Structured logger for poller module this.logger = options.logger || createLogger('poller'); + + // Optional pre-filter chain — eliminates non-actionable tasks before RPC calls + this.filterChain = options.filterChain instanceof TaskFilterChain + ? options.filterChain + : null; this.metricsServer = options.metricsServer; this.historyManager = options.historyManager || null; this.shardLabel = options.shardLabel || null; @@ -67,6 +73,7 @@ class TaskPoller { tasksChecked: 0, tasksDue: 0, tasksSkipped: 0, + tasksFiltered: 0, tasksSmoothed: 0, unacceptablyLate: 0, errors: 0, @@ -74,6 +81,7 @@ class TaskPoller { this.lastCycleInsights = { backlogSize: 0, + filteredCount: 0, dueCount: 0, dueSoonCount: 0, minSecondsUntilDue: null, @@ -108,6 +116,7 @@ class TaskPoller { this.stats.tasksChecked = 0; this.stats.tasksDue = 0; this.stats.tasksSkipped = 0; + this.stats.tasksFiltered = 0; this.stats.tasksSmoothed = 0; this.stats.unacceptablyLate = 0; this.stats.errors = 0; @@ -119,6 +128,7 @@ class TaskPoller { this.logger.info('No tasks to check'); this.lastCycleInsights = { backlogSize: 0, + filteredCount: 0, dueCount: 0, dueSoonCount: 0, minSecondsUntilDue: null, @@ -138,12 +148,42 @@ class TaskPoller { // which might require additional RPC calls or using ledger.timestamp from contract context cycleLogger.info('Current ledger sequence', { sequence: currentTimestamp }); - // Process tasks in parallel with concurrency control - const taskChecks = taskIds.map(taskId => + // ── Pre-filter: eliminate non-actionable tasks without any RPC calls ── + let candidateIds = taskIds; + let filteredCount = 0; + + if (this.filterChain) { + const registry = (options && options.registry) || null; + const { eligible, stats: filterStats } = this.filterChain.filterTaskIds(taskIds, { + currentTimestamp, + registry, + idempotencyGuard: options && options.idempotencyGuard, + circuitBreaker: options && options.circuitBreaker, + }); + + filteredCount = filterStats.totalFiltered; + this.stats.tasksFiltered = filteredCount; + candidateIds = eligible; + + if (filteredCount > 0) { + this.logger.info('Pre-filter eliminated tasks', { + total: taskIds.length, + filtered: filteredCount, + eligible: eligible.length, + byFilter: filterStats.filterRejections, + }); + } + } + + // Process only candidate tasks in parallel with concurrency control. + // Pass registry so checkTask can hydrate the cache (gas_balance, last_run, interval) + // which enables cachedGasFilter and cachedTimingFilter to fire on subsequent cycles. + const registry = (options && options.registry) || null; + const taskChecks = candidateIds.map(taskId => this.readLimit(async () => { const startedAt = Date.now(); const correlationId = `poll-${taskId}-${crypto.randomBytes(4).toString('hex')}`; - const result = await this.checkTask(taskId, currentTimestamp, options.registry, { correlationId }); + const result = await this.checkTask(taskId, currentTimestamp, registry, { correlationId }); rpcLatencies.push(Date.now() - startedAt); return { ...result, correlationId }; }), @@ -205,6 +245,7 @@ class TaskPoller { this.lastCycleInsights = { backlogSize: taskIds.length, + filteredCount: this.stats.tasksFiltered, dueCount: dueTaskIds.length, dueSoonCount, minSecondsUntilDue: positiveDueWindows.length > 0 ? Math.min(...positiveDueWindows) : null, @@ -538,6 +579,8 @@ class TaskPoller { const l = customLogger || this.logger; l.info('Poll complete', { durationMs: duration, + backlog: this.stats.tasksChecked + this.stats.tasksFiltered, + preFiltered: this.stats.tasksFiltered, checked: this.stats.tasksChecked, due: this.stats.tasksDue, smoothed: this.stats.tasksSmoothed, diff --git a/keeper/src/taskFilter.js b/keeper/src/taskFilter.js new file mode 100644 index 0000000..ff56eee --- /dev/null +++ b/keeper/src/taskFilter.js @@ -0,0 +1,389 @@ +'use strict'; + +/** + * taskFilter.js — Selection Efficiency Pre-Filter Chain + * + * Provides an ordered chain of cheap, in-process filters that run *before* + * the expensive `getTaskConfig()` RPC simulation. Tasks that fail any filter + * are excluded early, reducing unnecessary network load and improving + * overall keeper throughput. + * + * Filter ordering (cheapest → most expensive, all in-process): + * 1. nullTaskIdFilter — guard against corrupt/undefined IDs + * 2. cachedGasFilter — skip tasks with known-zero gas balance + * 3. cachedTimingFilter — skip tasks not yet due (arithmetic check) + * 4. idempotencyLockFilter — skip tasks already locked for execution + * 5. circuitBreakerFilter — skip tasks whose circuit breaker is open + * + * After all five filters run, only genuinely candidate tasks proceed to the + * costly `checkTask()` / `getTaskConfig()` RPC path. + * + * Usage: + * const { createDefaultFilterChain } = require('./taskFilter'); + * const filterChain = createDefaultFilterChain({ idempotencyGuard, circuitBreaker }); + * const { eligible, filtered, stats } = filterChain.filterTaskIds(taskIds, context); + * + * Extending the chain: + * filterChain.addFilter('myFilter', myFilterFn); // appended at the end + */ + +// ─── Individual filter functions ──────────────────────────────────────────── +// +// Each filter has the signature: +// (taskId: number, context: FilterContext) => { pass: boolean, reason: string } +// +// FilterContext shape: +// { +// currentTimestamp: number, // current ledger sequence (proxy for time) +// registry?: TaskRegistry, // optional: provides cached task metadata +// idempotencyGuard?: object, // optional: ExecutionIdempotencyGuard instance +// circuitBreaker?: object, // optional: CircuitBreaker instance +// } + +/** + * Filter 1 — Null/invalid task ID guard. + * + * Rejects task IDs that are null, undefined, NaN, or non-numeric. These can + * appear due to corrupt registry entries and must never reach RPC calls. + * + * Cost: pure in-process arithmetic. + * Rejection rate: very low in steady state; only fires on registry bugs. + */ +function nullTaskIdFilter(taskId, _context) { + if (taskId === null || taskId === undefined) { + return { pass: false, reason: 'null_task_id' }; + } + if (typeof taskId !== 'number' && typeof taskId !== 'bigint') { + return { pass: false, reason: 'invalid_task_id_type' }; + } + if (typeof taskId === 'number' && !Number.isFinite(taskId)) { + return { pass: false, reason: 'non_finite_task_id' }; + } + return { pass: true, reason: 'ok' }; +} + +/** + * Filter 2 — Cached gas balance guard. + * + * Reads `gas_balance` from the registry's in-memory task cache. Tasks with a + * zero or negative cached balance are skipped immediately without an RPC call. + * + * If the task is not yet in the cache (newly discovered this cycle), the filter + * passes through so that `getTaskConfig()` can populate the cache. + * + * Cost: single Map.get() lookup. + * Rejection rate: moderate — every exhausted-gas task is eliminated here. + */ +function cachedGasFilter(taskId, context) { + const registry = context && context.registry; + if (!registry || !registry.tasks) { + // No cache available — pass through to RPC + return { pass: true, reason: 'no_cache' }; + } + + const cached = registry.tasks.get(taskId); + if (!cached) { + // Task not yet in cache — pass through so RPC can hydrate it + return { pass: true, reason: 'cache_miss' }; + } + + // Only reject when we have a definitive zero/negative balance reading + if (cached.gas_balance !== undefined && cached.gas_balance <= 0) { + return { pass: false, reason: 'cached_zero_gas' }; + } + + return { pass: true, reason: 'ok' }; +} + +/** + * Filter 3 — Cached timing guard. + * + * Uses cached `last_run` and `interval` fields to compute whether the task's + * next scheduled run time has arrived. This mirrors the same arithmetic used + * inside `checkTask()` (`last_run + interval <= currentTimestamp`). + * + * If cached values are absent (new task, or not yet hydrated), the filter + * passes through so the RPC can supply fresh data. + * + * Cost: two Map fields + one addition + one comparison. + * Rejection rate: high — the majority of tasks are not yet due on any given + * polling cycle, especially for tasks with long intervals. + */ +function cachedTimingFilter(taskId, context) { + const registry = context && context.registry; + const currentTimestamp = context && context.currentTimestamp; + + if (!registry || !registry.tasks || currentTimestamp === undefined) { + return { pass: true, reason: 'no_cache' }; + } + + const cached = registry.tasks.get(taskId); + if (!cached) { + return { pass: true, reason: 'cache_miss' }; + } + + const { last_run, interval } = cached; + + // Only apply timing filter when both fields are populated and valid + if ( + last_run === undefined || + interval === undefined || + !Number.isFinite(last_run) || + !Number.isFinite(interval) || + interval <= 0 + ) { + return { pass: true, reason: 'incomplete_timing_data' }; + } + + const nextRunTime = last_run + interval; + if (nextRunTime > currentTimestamp) { + return { + pass: false, + reason: 'cached_not_yet_due', + meta: { nextRunTime, currentTimestamp, secondsUntilDue: nextRunTime - currentTimestamp }, + }; + } + + return { pass: true, reason: 'ok' }; +} + +/** + * Filter 4 — Idempotency lock guard. + * + * Skips tasks that are already locked for execution in the current or a recent + * cycle. This prevents double-submission races when the queue is still + * processing a previously enqueued execution of the same task. + * + * Cost: single Set/Map lookup inside the idempotency guard. + * Rejection rate: low in normal operation; spikes under slow-execution backlog. + */ +function idempotencyLockFilter(taskId, context) { + const guard = context && context.idempotencyGuard; + if (!guard) { + return { pass: true, reason: 'no_guard' }; + } + + // Real API: getLock(taskId) returns the lock object or null if unlocked. + // Also support legacy isLocked(taskId) or hasLock(taskId) for compatibility. + let isLocked = false; + if (typeof guard.getLock === 'function') { + isLocked = guard.getLock(taskId) !== null; + } else if (typeof guard.isLocked === 'function') { + isLocked = guard.isLocked(taskId); + } else if (typeof guard.hasLock === 'function') { + isLocked = guard.hasLock(taskId); + } + + if (isLocked) { + return { pass: false, reason: 'execution_locked' }; + } + + return { pass: true, reason: 'ok' }; +} + +/** + * Filter 5 — Circuit breaker guard. + * + * Skips tasks whose circuit breaker is in the OPEN state, indicating repeated + * recent failures. This avoids piling up more retries against a consistently + * failing task until the breaker resets. + * + * Cost: single Map lookup + state comparison. + * Rejection rate: low in healthy operation; non-zero after a task starts failing. + */ +function circuitBreakerFilter(taskId, context) { + const breaker = context && context.circuitBreaker; + if (!breaker) { + return { pass: true, reason: 'no_breaker' }; + } + + // Real API: isOpen(taskId) or getState() returns 'OPEN'/'CLOSED'/'HALF_OPEN' (uppercase). + // Support both naming patterns for maximum compatibility. + let isOpen = false; + if (typeof breaker.isOpen === 'function') { + isOpen = breaker.isOpen(taskId); + } else if (typeof breaker.getState === 'function') { + const state = breaker.getState(taskId); + // Real CircuitBreaker uses uppercase State enum: 'OPEN', 'CLOSED', 'HALF_OPEN' + isOpen = state === 'OPEN' || state === 'open'; + } + + if (isOpen) { + return { pass: false, reason: 'circuit_open' }; + } + + return { pass: true, reason: 'ok' }; +} + +// ─── TaskFilterChain ───────────────────────────────────────────────────────── + +/** + * TaskFilterChain — ordered pipeline of pre-filter functions. + * + * Applies each registered filter in order (fail-fast: first rejection wins). + * Tracks per-filter and aggregate statistics for observability. + */ +class TaskFilterChain { + /** + * @param {object} [options] + * @param {object} [options.logger] — optional logger (pino-compatible) + */ + constructor(options = {}) { + /** @type {Array<{name: string, fn: Function}>} */ + this._filters = []; + this._logger = options.logger || null; + + // Aggregate stats — reset each filterTaskIds() call + this._stats = this._emptyStats(); + } + + /** + * Add a named filter to the end of the chain. + * + * @param {string} name — human-readable identifier (used in logs & stats) + * @param {Function} fn — filter function: (taskId, context) => { pass, reason } + * @returns {TaskFilterChain} this, for chaining + */ + addFilter(name, fn) { + if (typeof fn !== 'function') { + throw new TypeError(`Filter "${name}" must be a function`); + } + this._filters.push({ name, fn }); + return this; + } + + /** + * Apply the filter chain to an array of task IDs. + * + * Short-circuits on the first failing filter for each task (fail-fast). + * Tasks with undefined cached values in timing/gas filters are passed through + * to allow the RPC to hydrate the cache. + * + * @param {number[]} taskIds — full list of registered task IDs + * @param {object} [context={}] — filter context (currentTimestamp, registry, etc.) + * @returns {{ eligible: number[], filtered: number[], stats: object }} + */ + filterTaskIds(taskIds, context = {}) { + const stats = this._emptyStats(); + stats.totalChecked = taskIds.length; + + const eligible = []; + const filtered = []; + + for (const taskId of taskIds) { + let passed = true; + + for (const { name, fn } of this._filters) { + let result; + try { + result = fn(taskId, context); + } catch (err) { + // A filter crash must never block a valid task — log and pass through + if (this._logger) { + this._logger.warn('Filter threw unexpectedly — passing task through', { + filter: name, + taskId, + error: err.message, + }); + } + result = { pass: true, reason: 'filter_error' }; + } + + if (!result.pass) { + // Record which filter rejected this task + stats.filterRejections[name] = (stats.filterRejections[name] || 0) + 1; + stats.totalFiltered++; + + if (this._logger) { + this._logger.debug('Task pre-filtered', { taskId, filter: name, reason: result.reason }); + } + + passed = false; + break; // fail-fast: no need to evaluate remaining filters + } + } + + if (passed) { + eligible.push(taskId); + } else { + filtered.push(taskId); + } + } + + stats.totalEligible = eligible.length; + this._stats = stats; + + if (this._logger && stats.totalFiltered > 0) { + this._logger.info('Pre-filter cycle complete', { + checked: stats.totalChecked, + eligible: stats.totalEligible, + filtered: stats.totalFiltered, + byFilter: stats.filterRejections, + }); + } + + return { eligible, filtered, stats }; + } + + /** + * Return stats from the most recent filterTaskIds() call. + * @returns {object} + */ + getLastStats() { + return { ...this._stats, filterRejections: { ...this._stats.filterRejections } }; + } + + // ─── private ──────────────────────────────────────────────────────────── + + _emptyStats() { + return { + totalChecked: 0, + totalEligible: 0, + totalFiltered: 0, + filterRejections: {}, // { filterName: count } + }; + } +} + +// ─── Factory ───────────────────────────────────────────────────────────────── + +/** + * Create the default filter chain with all five built-in filters wired in + * the correct order. + * + * Options map directly to filter context fields; any omitted option simply + * disables the corresponding filter's active checks (it becomes a pass-through). + * + * @param {object} [options] + * @param {object} [options.idempotencyGuard] — ExecutionIdempotencyGuard instance + * @param {object} [options.circuitBreaker] — CircuitBreaker instance + * @param {object} [options.logger] — pino-compatible logger + * @returns {TaskFilterChain} + */ +function createDefaultFilterChain(options = {}) { + const chain = new TaskFilterChain({ logger: options.logger }); + + chain + .addFilter('nullTaskIdFilter', nullTaskIdFilter) + .addFilter('cachedGasFilter', cachedGasFilter) + .addFilter('cachedTimingFilter', cachedTimingFilter) + .addFilter('idempotencyLockFilter', idempotencyLockFilter) + .addFilter('circuitBreakerFilter', circuitBreakerFilter); + + return chain; +} + +// ─── Exports ───────────────────────────────────────────────────────────────── + +module.exports = { + // Class + TaskFilterChain, + // Factory + createDefaultFilterChain, + // Individual filters (exported for testing and custom chain composition) + nullTaskIdFilter, + cachedGasFilter, + cachedTimingFilter, + idempotencyLockFilter, + circuitBreakerFilter, +};