diff --git a/src/index.ts b/src/index.ts index 770d4e3..654f766 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,8 @@ import { loadConfig } from './config/index.js' import { pool } from './db/pool.js' import { AnalyticsService } from './services/analytics/service.js' import { AnalyticsRefreshWorker, getAnalyticsRefreshIntervalMs } from './jobs/analyticsRefreshWorker.js' +import { AnalyticsRefreshScheduler } from './jobs/analyticsRefreshScheduler.js' +import { createAnalyticsRefreshMetrics } from './jobs/analyticsRefreshMetrics.js' import { keyManager } from './services/keyManager/index.js' app.use('/api/admin', createAdminRouter()) @@ -31,31 +33,18 @@ if (process.env.NODE_ENV !== 'test') { if (process.env.DATABASE_URL) { const thresholdSeconds = Number(process.env.ANALYTICS_STALENESS_SECONDS ?? '300') const analyticsService = new AnalyticsService(pool, thresholdSeconds) - const refreshWorker = new AnalyticsRefreshWorker(analyticsService, console.log) + const metrics = createAnalyticsRefreshMetrics() + const refreshWorker = new AnalyticsRefreshWorker(analyticsService, console.log, metrics) const intervalMs = getAnalyticsRefreshIntervalMs() - let running = false - const tick = async (): Promise => { - if (running) { - console.log('Analytics refresh is already running, skipping interval') - return - } - running = true - try { - await refreshWorker.run() - } catch (error) { - const message = error instanceof Error ? error.message : 'Unknown refresh error' - console.error(`Analytics refresh failed: ${message}`) - } finally { - running = false - } - } + const scheduler = new AnalyticsRefreshScheduler(refreshWorker, { + intervalMs, + runOnStart: true, + logger: console.log, + metrics, + }) - // Run once on startup, then periodically. - void tick() - setInterval(() => { - void tick() - }, intervalMs) + scheduler.start() } } catch (error) { console.error('Failed to start Credence API:', error) diff --git a/src/jobs/analyticsRefreshMetrics.ts b/src/jobs/analyticsRefreshMetrics.ts new file mode 100644 index 0000000..1a1d02e --- /dev/null +++ b/src/jobs/analyticsRefreshMetrics.ts @@ -0,0 +1,45 @@ +import client from 'prom-client' +import { register } from '../middleware/metrics.js' + +export const analyticsRefreshRunsTotal = new client.Counter({ + name: 'analytics_refresh_runs_total', + help: 'Total number of analytics materialized view refresh attempts', + labelNames: ['status'] as const, + registers: [register], +}) + +export const analyticsRefreshDurationSeconds = new client.Histogram({ + name: 'analytics_refresh_duration_seconds', + help: 'Duration of analytics materialized view REFRESH CONCURRENTLY in seconds', + buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60], + registers: [register], +}) + +export const analyticsViewAgeSeconds = new client.Gauge({ + name: 'analytics_view_age_seconds', + help: 'Age of the analytics_metrics_mv snapshot in seconds at the time of the last successful refresh', + registers: [register], +}) + +export const analyticsSchedulerSkipsTotal = new client.Counter({ + name: 'analytics_scheduler_skips_total', + help: 'Total number of scheduler ticks skipped due to overlap or distributed lock contention', + labelNames: ['reason'] as const, + registers: [register], +}) + +export interface AnalyticsRefreshMetrics { + incRuns(status: 'success' | 'error'): void + observeDuration(seconds: number): void + setViewAge(seconds: number): void + incSkip(reason: 'overlap' | 'lock_contention'): void +} + +export function createAnalyticsRefreshMetrics(): AnalyticsRefreshMetrics { + return { + incRuns: (status) => analyticsRefreshRunsTotal.inc({ status }), + observeDuration: (seconds) => analyticsRefreshDurationSeconds.observe(seconds), + setViewAge: (seconds) => analyticsViewAgeSeconds.set(seconds), + incSkip: (reason) => analyticsSchedulerSkipsTotal.inc({ reason }), + } +} diff --git a/src/jobs/analyticsRefreshScheduler.test.ts b/src/jobs/analyticsRefreshScheduler.test.ts new file mode 100644 index 0000000..ccff1ea --- /dev/null +++ b/src/jobs/analyticsRefreshScheduler.test.ts @@ -0,0 +1,208 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest' +import { AnalyticsRefreshScheduler } from './analyticsRefreshScheduler.js' +import type { AnalyticsRefreshWorker, AnalyticsRefreshWorkerResult } from './analyticsRefreshWorker.js' +import type { DistributedLock } from './distributedLock.js' +import type { AnalyticsRefreshMetrics } from './analyticsRefreshMetrics.js' + +const SUCCESS_RESULT: AnalyticsRefreshWorkerResult = { + refreshed: true, + duration: 42, + startTime: '2026-04-25T00:00:00.000Z', +} + +function makeWorker(result: AnalyticsRefreshWorkerResult = SUCCESS_RESULT): AnalyticsRefreshWorker { + return { run: vi.fn().mockResolvedValue(result) } as unknown as AnalyticsRefreshWorker +} + +function makeMetrics(): AnalyticsRefreshMetrics { + return { + incRuns: vi.fn(), + observeDuration: vi.fn(), + setViewAge: vi.fn(), + incSkip: vi.fn(), + } +} + +describe('AnalyticsRefreshScheduler', () => { + beforeEach(() => { + vi.useFakeTimers() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('starts and registers an interval', () => { + const worker = makeWorker() + const scheduler = new AnalyticsRefreshScheduler(worker, { intervalMs: 5000 }) + + scheduler.start() + expect(scheduler.isActive()).toBe(true) + + scheduler.stop() + expect(scheduler.isActive()).toBe(false) + }) + + it('does not register a second interval if already started', () => { + const worker = makeWorker() + const scheduler = new AnalyticsRefreshScheduler(worker, { intervalMs: 5000 }) + + scheduler.start() + scheduler.start() + + expect(worker.run).not.toHaveBeenCalled() + scheduler.stop() + }) + + it('runs worker on start when runOnStart is true', async () => { + const worker = makeWorker() + const scheduler = new AnalyticsRefreshScheduler(worker, { + intervalMs: 60_000, + runOnStart: true, + }) + + scheduler.start() + await vi.runAllTimersAsync() + + expect(worker.run).toHaveBeenCalledOnce() + scheduler.stop() + }) + + it('runs worker on each interval tick', async () => { + const worker = makeWorker() + const scheduler = new AnalyticsRefreshScheduler(worker, { intervalMs: 1000 }) + + scheduler.start() + await vi.advanceTimersByTimeAsync(3100) + + expect(worker.run).toHaveBeenCalledTimes(3) + scheduler.stop() + }) + + it('skips tick and increments overlap metric when already running', async () => { + let resolve!: () => void + const slowWorker = { + run: vi.fn().mockImplementation( + () => new Promise((r) => { resolve = () => r(SUCCESS_RESULT) }), + ), + } as unknown as AnalyticsRefreshWorker + + const metrics = makeMetrics() + const scheduler = new AnalyticsRefreshScheduler(slowWorker, { + intervalMs: 500, + runOnStart: true, + metrics, + }) + + scheduler.start() + // First tick fires (runOnStart) but hasn't resolved yet + await vi.advanceTimersByTimeAsync(600) + // Second tick fires while first is still running → should skip + expect(metrics.incSkip).toHaveBeenCalledWith('overlap') + + resolve() + scheduler.stop() + }) + + it('skips tick and increments lock_contention metric when distributed lock is held', async () => { + const worker = makeWorker() + const metrics = makeMetrics() + + const lock: DistributedLock = { + withLock: vi.fn().mockResolvedValue({ executed: false }), + acquire: vi.fn(), + release: vi.fn(), + heartbeat: vi.fn(), + getMetrics: vi.fn().mockReturnValue({ contentions: 1, acquisitions: 0, releases: 0, heartbeats: 0, errors: 0 }), + resetMetrics: vi.fn(), + } as unknown as DistributedLock + + const scheduler = new AnalyticsRefreshScheduler(worker, { + intervalMs: 1000, + distributedLock: lock, + metrics, + }) + + scheduler.start() + await vi.advanceTimersByTimeAsync(1100) + + expect(metrics.incSkip).toHaveBeenCalledWith('lock_contention') + expect(worker.run).not.toHaveBeenCalled() + scheduler.stop() + }) + + it('delegates to worker via distributed lock when lock is acquired', async () => { + const worker = makeWorker() + + const lock: DistributedLock = { + withLock: vi.fn().mockImplementation(async (_key: string, fn: () => Promise) => { + await fn() + return { executed: true } + }), + acquire: vi.fn(), + release: vi.fn(), + heartbeat: vi.fn(), + getMetrics: vi.fn(), + resetMetrics: vi.fn(), + } as unknown as DistributedLock + + const scheduler = new AnalyticsRefreshScheduler(worker, { + intervalMs: 1000, + distributedLock: lock, + }) + + scheduler.start() + await vi.advanceTimersByTimeAsync(1100) + + expect(worker.run).toHaveBeenCalledOnce() + scheduler.stop() + }) + + it('exposes status with run count and last result', async () => { + const worker = makeWorker() + const scheduler = new AnalyticsRefreshScheduler(worker, { + intervalMs: 1000, + runOnStart: true, + }) + + scheduler.start() + await vi.runAllTimersAsync() + + const status = scheduler.getStatus() + expect(status.active).toBe(true) + expect(status.runCount).toBe(1) + expect(status.lastResult).toEqual(SUCCESS_RESULT) + expect(status.isRunning).toBe(false) + + scheduler.stop() + expect(scheduler.getStatus().active).toBe(false) + }) + + it('uses custom lock key when provided', async () => { + const worker = makeWorker() + const lock: DistributedLock = { + withLock: vi.fn().mockResolvedValue({ executed: true, result: undefined }), + acquire: vi.fn(), + release: vi.fn(), + heartbeat: vi.fn(), + getMetrics: vi.fn(), + resetMetrics: vi.fn(), + } as unknown as DistributedLock + + const scheduler = new AnalyticsRefreshScheduler(worker, { + intervalMs: 1000, + distributedLock: lock, + lockKey: 'custom:analytics-lock', + }) + + scheduler.start() + await vi.advanceTimersByTimeAsync(1100) + + expect(lock.withLock).toHaveBeenCalledWith( + 'custom:analytics-lock', + expect.any(Function), + expect.any(Object), + ) + scheduler.stop() + }) +}) diff --git a/src/jobs/analyticsRefreshScheduler.ts b/src/jobs/analyticsRefreshScheduler.ts new file mode 100644 index 0000000..709dfcb --- /dev/null +++ b/src/jobs/analyticsRefreshScheduler.ts @@ -0,0 +1,122 @@ +import type { AnalyticsRefreshWorker, AnalyticsRefreshWorkerResult } from './analyticsRefreshWorker.js' +import type { DistributedLock } from './distributedLock.js' +import type { AnalyticsRefreshMetrics } from './analyticsRefreshMetrics.js' + +export interface AnalyticsRefreshSchedulerOptions { + intervalMs: number + runOnStart?: boolean + logger?: (message: string) => void + distributedLock?: DistributedLock + lockKey?: string + /** Lock TTL in ms. Must exceed the expected refresh duration. Defaults to min(5×intervalMs, 2min). */ + lockTtlMs?: number + metrics?: AnalyticsRefreshMetrics +} + +export interface SchedulerStatus { + active: boolean + isRunning: boolean + lastResult: AnalyticsRefreshWorkerResult | null + runCount: number +} + +export class AnalyticsRefreshScheduler { + private intervalId: ReturnType | null = null + private isRunning = false + private lastResult: AnalyticsRefreshWorkerResult | null = null + private runCount = 0 + + private readonly intervalMs: number + private readonly runOnStart: boolean + private readonly logger: (message: string) => void + private readonly distributedLock?: DistributedLock + private readonly lockKey: string + private readonly lockTtlMs: number + private readonly metrics?: AnalyticsRefreshMetrics + + constructor( + private readonly worker: AnalyticsRefreshWorker, + options: AnalyticsRefreshSchedulerOptions, + ) { + this.intervalMs = options.intervalMs + this.runOnStart = options.runOnStart ?? false + this.logger = options.logger ?? (() => {}) + this.distributedLock = options.distributedLock + this.lockKey = options.lockKey ?? 'cron:analytics-refresh' + this.lockTtlMs = options.lockTtlMs ?? Math.min(options.intervalMs * 5, 120_000) + this.metrics = options.metrics + } + + start(): void { + if (this.intervalId) { + this.logger('[AnalyticsRefreshScheduler] Already running') + return + } + + this.logger(`[AnalyticsRefreshScheduler] Starting with interval ${this.intervalMs}ms`) + + if (this.runOnStart) { + void this.tick() + } + + this.intervalId = setInterval(() => { + void this.tick() + }, this.intervalMs) + } + + stop(): void { + if (this.intervalId) { + clearInterval(this.intervalId) + this.intervalId = null + this.logger('[AnalyticsRefreshScheduler] Stopped') + } + } + + isActive(): boolean { + return this.intervalId !== null + } + + getStatus(): SchedulerStatus { + return { + active: this.isActive(), + isRunning: this.isRunning, + lastResult: this.lastResult, + runCount: this.runCount, + } + } + + private async tick(): Promise { + if (this.isRunning) { + this.logger('[AnalyticsRefreshScheduler] Skipping tick: refresh already in progress') + this.metrics?.incSkip('overlap') + return + } + + if (this.distributedLock) { + const { executed } = await this.distributedLock.withLock( + this.lockKey, + () => this.runWorker(), + { ttlMs: this.lockTtlMs, logger: this.logger }, + ) + + if (!executed) { + this.metrics?.incSkip('lock_contention') + this.logger('[AnalyticsRefreshScheduler] Skipping tick: lock held by another replica') + } + return + } + + await this.runWorker() + } + + private async runWorker(): Promise { + this.isRunning = true + try { + const result = await this.worker.run() + this.lastResult = result + this.runCount++ + } finally { + this.isRunning = false + } + } +} diff --git a/src/jobs/analyticsRefreshWorker.test.ts b/src/jobs/analyticsRefreshWorker.test.ts new file mode 100644 index 0000000..e1b8114 --- /dev/null +++ b/src/jobs/analyticsRefreshWorker.test.ts @@ -0,0 +1,112 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { AnalyticsRefreshWorker, getAnalyticsRefreshIntervalMs } from './analyticsRefreshWorker.js' +import type { AnalyticsService } from '../services/analytics/service.js' +import type { AnalyticsRefreshMetrics } from './analyticsRefreshMetrics.js' + +function makeService(overrides?: Partial): AnalyticsService { + return { + getSummary: vi.fn(), + refreshConcurrently: vi.fn().mockResolvedValue(undefined), + ...overrides, + } as unknown as AnalyticsService +} + +function makeMetrics(): AnalyticsRefreshMetrics { + return { + incRuns: vi.fn(), + observeDuration: vi.fn(), + setViewAge: vi.fn(), + incSkip: vi.fn(), + } +} + +describe('AnalyticsRefreshWorker', () => { + let logger: ReturnType + + beforeEach(() => { + logger = vi.fn() + }) + + it('calls refreshConcurrently and returns a success result', async () => { + const service = makeService() + const worker = new AnalyticsRefreshWorker(service, logger) + + const result = await worker.run() + + expect(service.refreshConcurrently).toHaveBeenCalledOnce() + expect(result.refreshed).toBe(true) + expect(result.duration).toBeGreaterThanOrEqual(0) + expect(result.startTime).toMatch(/^\d{4}-\d{2}-\d{2}T/) + expect(result.error).toBeUndefined() + }) + + it('logs start and completion messages', async () => { + const service = makeService() + const worker = new AnalyticsRefreshWorker(service, logger) + + await worker.run() + + expect(logger).toHaveBeenCalledWith(expect.stringContaining('Starting analytics')) + expect(logger).toHaveBeenCalledWith(expect.stringContaining('completed')) + }) + + it('records success metrics when metrics are provided', async () => { + const service = makeService() + const metrics = makeMetrics() + const worker = new AnalyticsRefreshWorker(service, logger, metrics) + + await worker.run() + + expect(metrics.incRuns).toHaveBeenCalledWith('success') + expect(metrics.observeDuration).toHaveBeenCalledWith(expect.any(Number)) + }) + + it('returns error result and records error metric when refresh throws', async () => { + const service = makeService({ + refreshConcurrently: vi.fn().mockRejectedValue(new Error('pg connection lost')), + }) + const metrics = makeMetrics() + const worker = new AnalyticsRefreshWorker(service, logger, metrics) + + const result = await worker.run() + + expect(result.refreshed).toBe(false) + expect(result.error).toBe('pg connection lost') + expect(metrics.incRuns).toHaveBeenCalledWith('error') + expect(metrics.observeDuration).toHaveBeenCalledWith(expect.any(Number)) + }) + + it('handles non-Error thrown values gracefully', async () => { + const service = makeService({ + refreshConcurrently: vi.fn().mockRejectedValue('string error'), + }) + const worker = new AnalyticsRefreshWorker(service, logger) + + const result = await worker.run() + + expect(result.refreshed).toBe(false) + expect(result.error).toBe('Unknown refresh error') + }) +}) + +describe('getAnalyticsRefreshIntervalMs', () => { + it('returns 5 minutes for the default cron expression', () => { + expect(getAnalyticsRefreshIntervalMs('*/5 * * * *')).toBe(5 * 60 * 1000) + }) + + it('returns 1 hour for hourly cron', () => { + expect(getAnalyticsRefreshIntervalMs('0 * * * *')).toBe(3_600_000) + }) + + it('returns 24 hours for daily cron', () => { + expect(getAnalyticsRefreshIntervalMs('0 0 * * *')).toBe(86_400_000) + }) + + it('returns 1 minute for every-minute cron', () => { + expect(getAnalyticsRefreshIntervalMs('* * * * *')).toBe(60_000) + }) + + it('throws for unsupported cron expressions', () => { + expect(() => getAnalyticsRefreshIntervalMs('0 */6 * * *')).toThrow() + }) +}) diff --git a/src/jobs/analyticsRefreshWorker.ts b/src/jobs/analyticsRefreshWorker.ts index 9cef983..481fd4b 100644 --- a/src/jobs/analyticsRefreshWorker.ts +++ b/src/jobs/analyticsRefreshWorker.ts @@ -1,16 +1,19 @@ import { parseCronToInterval } from './scheduler.js' import type { AnalyticsService } from '../services/analytics/service.js' +import type { AnalyticsRefreshMetrics } from './analyticsRefreshMetrics.js' export interface AnalyticsRefreshWorkerResult { refreshed: boolean duration: number startTime: string + error?: string } export class AnalyticsRefreshWorker { constructor( private readonly analyticsService: AnalyticsService, private readonly logger: (message: string) => void = () => {}, + private readonly metrics?: AnalyticsRefreshMetrics, ) {} async run(): Promise { @@ -18,24 +21,37 @@ export class AnalyticsRefreshWorker { const startTime = new Date().toISOString() this.logger('Starting analytics materialized view refresh') - await this.analyticsService.refreshConcurrently() - const duration = Date.now() - startMs - this.logger(`Analytics refresh completed in ${duration}ms`) - - return { - refreshed: true, - duration, - startTime, + + try { + // REFRESH MATERIALIZED VIEW CONCURRENTLY only holds a ShareUpdateExclusiveLock + // so readers are never blocked during the refresh. + await this.analyticsService.refreshConcurrently() + + const durationMs = Date.now() - startMs + const durationSeconds = durationMs / 1000 + + this.metrics?.incRuns('success') + this.metrics?.observeDuration(durationSeconds) + this.logger(`Analytics refresh completed in ${durationMs}ms`) + + return { refreshed: true, duration: durationMs, startTime } + } catch (error) { + const durationMs = Date.now() - startMs + const errorMessage = error instanceof Error ? error.message : 'Unknown refresh error' + + this.metrics?.incRuns('error') + this.metrics?.observeDuration(durationMs / 1000) + this.logger(`Analytics refresh failed after ${durationMs}ms: ${errorMessage}`) + + return { refreshed: false, duration: durationMs, startTime, error: errorMessage } } } } -export function getAnalyticsRefreshIntervalMs( - cronExpression = process.env.ANALYTICS_REFRESH_CRON ?? '*/5 * * * *', -): number { - if (cronExpression === '*/5 * * * *') { +export function getAnalyticsRefreshIntervalMs(cronExpression?: string): number { + const expr = cronExpression ?? (process.env['ANALYTICS_REFRESH_CRON'] ?? '*/5 * * * *') + if (expr === '*/5 * * * *') { return 5 * 60 * 1000 } - return parseCronToInterval(cronExpression) + return parseCronToInterval(expr) } - diff --git a/src/jobs/index.ts b/src/jobs/index.ts index 5cf66b2..4ba73c3 100644 --- a/src/jobs/index.ts +++ b/src/jobs/index.ts @@ -5,6 +5,7 @@ export * from './scheduler.js' export * from './distributedLock.js' export * from './lockedWorkers.js' export * from './analyticsRefreshWorker.js' +export * from './analyticsRefreshScheduler.js' export * from './invoiceDueDate.js' export * from './invoiceDueDateWorker.js' export * from './exportTypes.js' diff --git a/src/routes/admin/members.test.ts b/src/routes/admin/members.test.ts index 79ece72..fb3fafb 100644 --- a/src/routes/admin/members.test.ts +++ b/src/routes/admin/members.test.ts @@ -21,7 +21,7 @@ const mockService = { restoreMember: vi.fn(), } -vi.mock('../../services/members/service.ts', () => ({ +vi.mock('../../services/members/factory.ts', () => ({ MemberService: vi.fn().mockImplementation(() => mockService), })) diff --git a/src/routes/admin/webhooks.ts b/src/routes/admin/webhooks.ts index 8100258..a9ab907 100644 --- a/src/routes/admin/webhooks.ts +++ b/src/routes/admin/webhooks.ts @@ -12,7 +12,7 @@ import { auditLogService } from '../../services/audit/index.js' export function createWebhookAdminRouter(): Router { const router = Router() const store = new PostgresWebhookRepository(pool) - const webhookService = new WebhookService(store, auditLogService) + const webhookService = new WebhookService(store, undefined, undefined, auditLogService) /** * POST /api/admin/webhooks/:id/rotate diff --git a/src/services/audit/index.ts b/src/services/audit/index.ts index cc8fdb0..cf8f4d5 100644 --- a/src/services/audit/index.ts +++ b/src/services/audit/index.ts @@ -12,7 +12,7 @@ import { AuditAction } from './types.js' * In production, this would write to a database or centralized logging system */ export class AuditLogService { - constructor(private readonly repository: AuditLogRepository) {} + constructor(private readonly repository: AuditLogRepository = new InMemoryAuditLogsRepository()) {} /** * Log an admin action diff --git a/src/services/governance/redisStorage.test.ts b/src/services/governance/redisStorage.test.ts index d2e9f66..94928e5 100644 --- a/src/services/governance/redisStorage.test.ts +++ b/src/services/governance/redisStorage.test.ts @@ -1,7 +1,7 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import type { Redis } from 'ioredis'; import { RedisProposalStorage } from './redisStorage.js'; -import { Proposal, ProposalState } from './multisig.js'; +import type { MultisigProposal } from './types.js'; describe('RedisProposalStorage', () => { let storage: RedisProposalStorage; @@ -14,39 +14,37 @@ describe('RedisProposalStorage', () => { }; storage = new RedisProposalStorage(mockRedis as unknown as Redis); - // Mock Date.now to freeze time - vi.spyOn(Date, 'now').mockReturnValue(10000000); + vi.spyOn(Date, 'now').mockReturnValue(10_000_000); }); it('should save a proposal with TTL', async () => { - const prop: Proposal = { + const prop: MultisigProposal = { id: 'test-1', requiredSignatures: 2, - signers: new Set(['a', 'b']), + signers: ['a', 'b'], + action: 'slash_validator', signatures: new Map([['a', 'sig-a']]), slashingVotes: new Set(['c']), payload: { x: 1 }, - state: ProposalState.PENDING, - createdAt: 10000000, - expiresAt: 10000000 + 3600000, // 1 hour later + status: 'pending', + createdAt: new Date(10_000_000), + expiresAt: new Date(10_000_000 + 3_600_000), // 1 hour later }; await storage.saveProposal(prop); expect(mockRedis.set).toHaveBeenCalledTimes(1); - - // Check key, serialized json syntax, 'EX', and computed TTL (+86400 day buffer) + const setArgs = mockRedis.set.mock.calls[0]; expect(setArgs[0]).toBe('governance:proposal:test-1'); - - // Test the parsing + const savedJson = JSON.parse(setArgs[1]); expect(savedJson.signers).toEqual(['a', 'b']); expect(savedJson.signatures).toEqual([['a', 'sig-a']]); expect(savedJson.slashingVotes).toEqual(['c']); - + expect(setArgs[2]).toBe('EX'); - // TTL should be exactly 3600 + 86400 + // TTL: floor(3600000 / 1000) + 86400 = 3600 + 86400 = 90000 expect(setArgs[3]).toBe(90000); }); @@ -55,23 +53,24 @@ describe('RedisProposalStorage', () => { id: 'test-2', requiredSignatures: 1, signers: ['d'], + action: 'distribute_rewards', signatures: [], slashingVotes: [], payload: null, - state: ProposalState.APPROVED, - createdAt: 0, - expiresAt: 0, + status: 'approved', + createdAt: new Date(0).toISOString(), + expiresAt: new Date(0).toISOString(), }; mockRedis.get.mockResolvedValue(JSON.stringify(serializedData)); const prop = await storage.getProposal('test-2'); - + expect(prop).toBeDefined(); expect(prop?.id).toBe('test-2'); - expect(prop?.signers).toBeInstanceOf(Set); - expect(prop?.signers.has('d')).toBe(true); + expect(prop?.signers).toEqual(['d']); expect(prop?.signatures).toBeInstanceOf(Map); - expect(prop?.state).toBe(ProposalState.APPROVED); + expect(prop?.slashingVotes).toBeInstanceOf(Set); + expect(prop?.status).toBe('approved'); expect(mockRedis.get).toHaveBeenCalledWith('governance:proposal:test-2'); }); @@ -80,18 +79,19 @@ describe('RedisProposalStorage', () => { const prop = await storage.getProposal('test-not-found'); expect(prop).toBeUndefined(); }); - + it('should update a proposal with positive TTL', async () => { - const prop: Proposal = { + const prop: MultisigProposal = { id: 'test-3', requiredSignatures: 2, - signers: new Set(), + signers: [], + action: 'slash_validator', signatures: new Map(), slashingVotes: new Set(), - payload: { }, - state: ProposalState.PENDING, - createdAt: 10000000, - expiresAt: 10000000 + 1000, + payload: {}, + status: 'pending', + createdAt: new Date(10_000_000), + expiresAt: new Date(10_000_000 + 1_000), // 1 second later }; await storage.updateProposal(prop); @@ -100,23 +100,23 @@ describe('RedisProposalStorage', () => { const setArgs = mockRedis.set.mock.calls[0]; expect(setArgs[0]).toBe('governance:proposal:test-3'); expect(setArgs[2]).toBe('EX'); - - // TTL should be exactly 1 + 86400 = 86401 + // TTL: floor(1000 / 1000) + 86400 = 1 + 86400 = 86401 expect(setArgs[3]).toBe(86401); }); - it('should update a proposal with negative/expired TTL', async () => { - const prop: Proposal = { + it('should update a proposal with expired TTL and fall back to minimum', async () => { + const prop: MultisigProposal = { id: 'test-4', requiredSignatures: 2, - signers: new Set(), + signers: [], + action: 'slash_validator', signatures: new Map(), slashingVotes: new Set(), - payload: { }, - state: ProposalState.PENDING, - createdAt: 10000000, - // Create a scenario where TTL < 0 (i.e. long ago) - expiresAt: 10000000 - 90000000, + payload: {}, + status: 'pending', + createdAt: new Date(10_000_000), + // expiresAt far in the past → ttlSeconds < 0 + expiresAt: new Date(10_000_000 - 90_000_000), }; await storage.updateProposal(prop); @@ -125,8 +125,7 @@ describe('RedisProposalStorage', () => { const setArgs = mockRedis.set.mock.calls[0]; expect(setArgs[0]).toBe('governance:proposal:test-4'); expect(setArgs[2]).toBe('EX'); - - // Should fallback to 3600 minimal TTL + // Falls back to minimal TTL of 3600 expect(setArgs[3]).toBe(3600); }); }); diff --git a/src/services/policy/__tests__/service.test.ts b/src/services/policy/__tests__/service.test.ts index 0fe5f23..1928c25 100644 --- a/src/services/policy/__tests__/service.test.ts +++ b/src/services/policy/__tests__/service.test.ts @@ -10,10 +10,11 @@ import { describe, it, expect } from 'vitest' import { PolicyService } from '../service.js' import { PolicyStore } from '../store.js' import { AuditLogService, AuditAction } from '../../audit/index.js' +import { InMemoryAuditLogsRepository } from '../../../db/repositories/auditLogsRepository.js' function makeService() { const store = new PolicyStore() - const audit = new AuditLogService() + const audit = new AuditLogService(new InMemoryAuditLogsRepository()) const svc = new PolicyService(store, audit) return { svc, store, audit } } @@ -22,7 +23,7 @@ const actor = { id: 'admin-1', email: 'admin@credence.org' } describe('PolicyService', () => { describe('createRule', () => { - it('persists the rule and emits POLICY_RULE_CREATED audit entry', () => { + it('persists the rule and emits POLICY_RULE_CREATED audit entry', async () => { const { svc, audit } = makeService() const rule = svc.createRule(actor.id, actor.email, { orgId: 'org-acme', @@ -32,14 +33,16 @@ describe('PolicyService', () => { effect: 'allow', }) expect(rule.id).toBeDefined() - const { logs } = audit.getLogs({ action: AuditAction.POLICY_RULE_CREATED }) + // Allow the fire-and-forget logAction microtask to settle + await Promise.resolve() + const { logs } = await audit.getLogs({ action: AuditAction.POLICY_RULE_CREATED }) expect(logs).toHaveLength(1) expect(logs[0].details).toMatchObject({ ruleId: rule.id, orgId: 'org-acme' }) }) }) describe('updateRule', () => { - it('updates the rule and emits POLICY_RULE_UPDATED audit entry', () => { + it('updates the rule and emits POLICY_RULE_UPDATED audit entry', async () => { const { svc, audit } = makeService() const rule = svc.createRule(actor.id, actor.email, { orgId: 'org-acme', @@ -50,7 +53,8 @@ describe('PolicyService', () => { }) const updated = svc.updateRule(actor.id, actor.email, rule.id, { effect: 'deny' }) expect(updated.effect).toBe('deny') - const { logs } = audit.getLogs({ action: AuditAction.POLICY_RULE_UPDATED }) + await Promise.resolve() + const { logs } = await audit.getLogs({ action: AuditAction.POLICY_RULE_UPDATED }) expect(logs).toHaveLength(1) }) @@ -61,7 +65,7 @@ describe('PolicyService', () => { }) describe('deleteRule', () => { - it('removes the rule and emits POLICY_RULE_DELETED audit entry', () => { + it('removes the rule and emits POLICY_RULE_DELETED audit entry', async () => { const { svc, audit } = makeService() const rule = svc.createRule(actor.id, actor.email, { orgId: 'org-acme', @@ -72,7 +76,8 @@ describe('PolicyService', () => { }) svc.deleteRule(actor.id, actor.email, rule.id) expect(svc.getRule(rule.id)).toBeNull() - const { logs } = audit.getLogs({ action: AuditAction.POLICY_RULE_DELETED }) + await Promise.resolve() + const { logs } = await audit.getLogs({ action: AuditAction.POLICY_RULE_DELETED }) expect(logs).toHaveLength(1) }) diff --git a/src/services/webhooks/service.ts b/src/services/webhooks/service.ts index 7923103..b6c7939 100644 --- a/src/services/webhooks/service.ts +++ b/src/services/webhooks/service.ts @@ -1,7 +1,8 @@ import { randomBytes } from 'crypto' -import type { WebhookStore, WebhookEventType, WebhookPayload, WebhookDeliveryResult, WebhookConfig } from './types.js' +import type { WebhookStore, WebhookEventType, WebhookPayload, WebhookDeliveryResult, WebhookConfig, DlqStore } from './types.js' import { deliverWebhook, type DeliveryOptions } from './delivery.js' import { type AuditLogService, AuditAction } from '../audit/index.js' +import { buildDlqEntry } from './dlq.js' /** * Webhook service for delivering bond lifecycle events. @@ -12,8 +13,9 @@ export class WebhookService { constructor( private readonly store: WebhookStore, + private readonly deliveryOptions?: DeliveryOptions, + private readonly dlq?: DlqStore, private readonly auditLog?: AuditLogService, - private readonly deliveryOptions?: DeliveryOptions ) {} /** @@ -133,8 +135,9 @@ export class WebhookService { */ export function createWebhookService( store: WebhookStore, + deliveryOptions?: DeliveryOptions, + dlq?: DlqStore, auditLog?: AuditLogService, - deliveryOptions?: DeliveryOptions ): WebhookService { - return new WebhookService(store, auditLog, deliveryOptions) + return new WebhookService(store, deliveryOptions, dlq, auditLog) }