From 55a9b70e8c3fac007e4fbae8d5697ff36ece6c70 Mon Sep 17 00:00:00 2001
From: Itodo-S
Date: Sat, 25 Apr 2026 14:21:57 +0100
Subject: [PATCH] feat(data): add org retention enforcement job with audit + dry-run

---
 src/config/retention.ts                 | 116 ++++++++++++++
 src/jobs/dataRetentionJob.test.ts       | 170 +++++++++++++++++++++
 src/jobs/dataRetentionJob.ts            | 128 ++++++++++++++++
 src/jobs/index.ts                       |   1 +
 src/repositories/retentionRepository.ts | 195 ++++++++++++++++++++++++
 5 files changed, 610 insertions(+)
 create mode 100644 src/config/retention.ts
 create mode 100644 src/jobs/dataRetentionJob.test.ts
 create mode 100644 src/jobs/dataRetentionJob.ts
 create mode 100644 src/repositories/retentionRepository.ts

diff --git a/src/config/retention.ts b/src/config/retention.ts
new file mode 100644
index 0000000..cc82bc4
--- /dev/null
+++ b/src/config/retention.ts
@@ -0,0 +1,116 @@
+/**
+ * Org-level data-retention configuration.
+ *
+ * Each entity type can have an independent TTL (time-to-live) expressed in
+ * days. A value of `0` means "keep forever" (no pruning for that type).
+ *
+ * Settings are consumed by the `DataRetentionJob` and can be overridden via
+ * environment variables, making it straightforward to adjust without a code
+ * deploy.
+ */
+
+export interface EntityRetentionConfig {
+  /** Days to keep records, measured from each table's timestamp column (`computed_at`, `occurred_at` or `created_at`). 0 = keep forever. */
+  ttlDays: number
+}
+
+export interface RetentionConfig {
+  /**
+   * When true the job logs what *would* be deleted without touching the DB.
+   * Default: false.
+   */
+  dryRun: boolean
+
+  /** Maximum rows deleted per entity type per run (prevents runaway deletes). */
+  batchLimit: number
+
+  /** Per-entity TTL configuration. */
+  entities: {
+    scoreHistory: EntityRetentionConfig
+    auditLogs: EntityRetentionConfig
+    slashEvents: EntityRetentionConfig
+    outboxEvents: EntityRetentionConfig
+  }
+}
+
+export const DEFAULT_RETENTION_CONFIG: RetentionConfig = {
+  dryRun: false,
+  batchLimit: 5_000,
+  entities: {
+    scoreHistory: { ttlDays: 90 },
+    auditLogs: { ttlDays: 365 },
+    slashEvents: { ttlDays: 0 },
+    outboxEvents: { ttlDays: 30 },
+  },
+}
+
+/**
+ * Parses a non-negative integer setting from a raw environment value.
+ *
+ * Returns `fallback` when the value is unset, blank, non-numeric or negative;
+ * fractional values are floored.
+ */
+function parseTtl(raw: string | undefined, fallback: number): number {
+  // Number('   ') is 0, so treat whitespace-only values as unset rather than as "keep forever".
+  if (raw === undefined || raw.trim() === '') return fallback
+  const n = Number(raw)
+  return Number.isFinite(n) && n >= 0 ? Math.floor(n) : fallback
+}
+
+/**
+ * Parses the per-run batch limit, which must be a positive integer.
+ *
+ * A limit of 0 would turn every DELETE into `LIMIT 0` and silently disable
+ * pruning, so 0 (like any other invalid value) falls back to `fallback`.
+ */
+function parseBatchLimit(raw: string | undefined, fallback: number): number {
+  const n = parseTtl(raw, fallback)
+  return n > 0 ? n : fallback
+}
+
+/**
+ * Builds the effective retention configuration from environment variables.
+ *
+ * Recognised variables: `RETENTION_DRY_RUN` (only the string "true", in any
+ * letter case, enables dry-run), `RETENTION_BATCH_LIMIT`, and
+ * `RETENTION_TTL_{SCORE_HISTORY,AUDIT_LOGS,SLASH_EVENTS,OUTBOX_EVENTS}_DAYS`.
+ * Missing or invalid numeric values fall back to `defaults`.
+ *
+ * @param env - Variable source; defaults to `process.env`.
+ * @param defaults - Values used when a variable is missing or invalid.
+ */
+export function loadRetentionConfig(
+  env: Record<string, string | undefined> = process.env,
+  defaults: RetentionConfig = DEFAULT_RETENTION_CONFIG,
+): RetentionConfig {
+  return {
+    dryRun: (env.RETENTION_DRY_RUN ?? '').toLowerCase() === 'true',
+    batchLimit: parseBatchLimit(env.RETENTION_BATCH_LIMIT, defaults.batchLimit),
+    entities: {
+      scoreHistory: {
+        ttlDays: parseTtl(
+          env.RETENTION_TTL_SCORE_HISTORY_DAYS,
+          defaults.entities.scoreHistory.ttlDays,
+        ),
+      },
+      auditLogs: {
+        ttlDays: parseTtl(
+          env.RETENTION_TTL_AUDIT_LOGS_DAYS,
+          defaults.entities.auditLogs.ttlDays,
+        ),
+      },
+      slashEvents: {
+        ttlDays: parseTtl(
+          env.RETENTION_TTL_SLASH_EVENTS_DAYS,
+          defaults.entities.slashEvents.ttlDays,
+        ),
+      },
+      outboxEvents: {
+        ttlDays: parseTtl(
+          env.RETENTION_TTL_OUTBOX_EVENTS_DAYS,
+          defaults.entities.outboxEvents.ttlDays,
+        ),
+      },
+    },
+  }
+}
diff --git a/src/jobs/dataRetentionJob.test.ts b/src/jobs/dataRetentionJob.test.ts
new file mode 100644
index 0000000..938e804
--- /dev/null
+++ b/src/jobs/dataRetentionJob.test.ts
@@ -0,0 +1,170 @@
+/**
+ * Unit tests for DataRetentionJob + RetentionRepository
+ *
+ * Uses vitest with mock Queryable implementations — no live database needed.
+ */
+
+import { describe, it, expect, vi, beforeEach } from 'vitest'
+import { DataRetentionJob } from './dataRetentionJob.js'
+import type { RetentionConfig } from '../config/retention.js'
+import type { Queryable } from '../db/repositories/queryable.js'
+
+// ── Helpers ───────────────────────────────────────────────────────────────
+
+function makeConfig(overrides: Partial<RetentionConfig> = {}): RetentionConfig {
+  return {
+    dryRun: false,
+    batchLimit: 100,
+    entities: {
+      scoreHistory: { ttlDays: 90 },
+      auditLogs: { ttlDays: 365 },
+      slashEvents: { ttlDays: 30 },
+      outboxEvents: { ttlDays: 30 },
+    },
+    ...overrides,
+  }
+}
+
+function makeDb(countResponse = 0, deleteResponse = 0): Queryable {
+  return {
+    query: vi.fn().mockImplementation((sql: string) => {
+      if (sql.includes('COUNT(*)')) {
+        return Promise.resolve({ rows: [{ cnt: String(countResponse) }], rowCount: 1 })
+      }
+      return Promise.resolve({ rows: [], rowCount: deleteResponse })
+    }),
+  }
+}
+
+// ── Tests ─────────────────────────────────────────────────────────────────
+
+describe('DataRetentionJob', () => {
+  let logs: string[]
+
+  beforeEach(() => {
+    logs = []
+  })
+
+  it('returns zero totals when nothing is expired', async () => {
+    const db = makeDb(0, 0)
+    const job = new DataRetentionJob(db, makeConfig(), (m) => logs.push(m))
+
+    const result = await job.run()
+
+    expect(result.totalExpired).toBe(0)
+    expect(result.totalDeleted).toBe(0)
+    expect(result.dryRun).toBe(false)
+    expect(result.entities).toHaveLength(4)
+  })
+
+  it('deletes expired rows and sums counts correctly', async () => {
+    const db = makeDb(10, 10)
+    const job = new DataRetentionJob(db, makeConfig(), (m) => logs.push(m))
+
+    const result = await job.run()
+
+    expect(result.totalExpired).toBe(40) // 4 entities × 10
+    expect(result.totalDeleted).toBe(40)
+    expect(result.entities.every((e) => e.deletedCount === 10)).toBe(true)
+  })
+
+  it('records startTime as valid ISO string and non-negative duration', async () => {
+    const job = new DataRetentionJob(makeDb(), makeConfig())
+    const result = await job.run()
+
+    expect(typeof result.startTime).toBe('string')
+    expect(new Date(result.startTime).getTime()).toBeGreaterThan(0)
+    expect(result.duration).toBeGreaterThanOrEqual(0)
+  })
+
+  it('does not issue DELETE queries in dry-run mode', async () => {
+    const db = makeDb(5, 5)
+    const job = new DataRetentionJob(db, makeConfig({ dryRun: true }), (m) => logs.push(m))
+
+    const result = await job.run()
+
+    expect(result.dryRun).toBe(true)
+    expect(result.totalDeleted).toBe(0)
+    expect(result.totalExpired).toBe(20) // 4 × 5, COUNT still runs
+
+    const queryCalls = (db.query as ReturnType<typeof vi.fn>).mock.calls as [string][]
+    const deleteCalls = queryCalls.filter(([sql]) => sql.trim().startsWith('WITH rows AS'))
+    expect(deleteCalls).toHaveLength(0)
+  })
+
+  it('marks all entity audits dryRun=true when in dry-run mode', async () => {
+    const job = new DataRetentionJob(makeDb(3, 3), makeConfig({ dryRun: true }))
+    const result = await job.run()
+
+    expect(result.entities.every((e) => e.dryRun === true)).toBe(true)
+  })
+
+  it('skips all queries when all entities have ttlDays=0', async () => {
+    const config = makeConfig({
+      entities: {
+        scoreHistory: { ttlDays: 0 },
+        auditLogs: { ttlDays: 0 },
+        slashEvents: { ttlDays: 0 },
+        outboxEvents: { ttlDays: 0 },
+      },
+    })
+    const db = makeDb(99, 99)
+    const job = new DataRetentionJob(db, config, (m) => logs.push(m))
+
+    const result = await job.run()
+
+    expect(result.totalExpired).toBe(0)
+    expect(result.totalDeleted).toBe(0)
+    expect((db.query as ReturnType<typeof vi.fn>).mock.calls).toHaveLength(0)
+  })
+
+  it('only skips the entity with ttlDays=0, processes others', async () => {
+    const config = makeConfig({
+      entities: {
+        scoreHistory: { ttlDays: 0 },
+        auditLogs: { ttlDays: 365 },
+        slashEvents: { ttlDays: 30 },
+        outboxEvents: { ttlDays: 30 },
+      },
+    })
+    const db = makeDb(4, 4)
+    const job = new DataRetentionJob(db, config)
+    const result = await job.run()
+
+    const scoreEntity = result.entities.find((e) => e.entity === 'score_history')!
+    expect(scoreEntity.expiredCount).toBe(0)
+    expect(scoreEntity.deletedCount).toBe(0)
+    expect(result.totalExpired).toBe(12) // 3 active entities × 4
+    expect(result.totalDeleted).toBe(12)
+  })
+
+  it('includes all 4 entity types in result', async () => {
+    const job = new DataRetentionJob(makeDb(), makeConfig())
+    const result = await job.run()
+
+    const names = result.entities.map((e) => e.entity).sort()
+    expect(names).toEqual(
+      ['audit_logs', 'outbox_events', 'score_history', 'slash_events'].sort(),
+    )
+  })
+
+  it('logs start and completion messages', async () => {
+    const job = new DataRetentionJob(makeDb(), makeConfig(), (m) => logs.push(m))
+    await job.run()
+
+    expect(logs.some((l) => l.includes('Starting run'))).toBe(true)
+    expect(logs.some((l) => l.includes('Run complete'))).toBe(true)
+  })
+
+  it('passes batchLimit to DELETE queries', async () => {
+    const db = makeDb(10, 5)
+    const job = new DataRetentionJob(db, makeConfig({ batchLimit: 5 }))
+    await job.run()
+
+    const calls = (db.query as ReturnType<typeof vi.fn>).mock.calls as [string, unknown[]][]
+    const deleteCalls = calls.filter(([sql]) => sql.trim().startsWith('WITH rows AS'))
+    // Comparing the full list (not forEach) also fails when no DELETE was issued.
+    const limits = deleteCalls.map(([, params]) => params?.[1])
+    expect(limits).toEqual([5, 5, 5, 5])
+  })
+})
diff --git a/src/jobs/dataRetentionJob.ts b/src/jobs/dataRetentionJob.ts
new file mode 100644
index 0000000..53c30d3
--- /dev/null
+++ b/src/jobs/dataRetentionJob.ts
@@ -0,0 +1,128 @@
+/**
+ * DataRetentionJob
+ *
+ * Enforces org-level data retention by pruning records that exceed their TTL
+ * from each configured entity table. The job supports:
+ *
+ * - Per-entity configurable TTL (0 = keep forever)
+ * - Dry-run mode: logs what *would* be deleted without mutating the DB
+ * - Structured audit output per run (entity, count, dryRun flag)
+ * - Batch limits to avoid locking large tables
+ */
+
+import type { RetentionConfig } from '../config/retention.js'
+import { RetentionRepository } from '../repositories/retentionRepository.js'
+import type { Queryable } from '../db/repositories/queryable.js'
+
+export interface RetentionEntityAudit {
+  entity: string
+  expiredCount: number
+  deletedCount: number
+  ttlDays: number
+  dryRun: boolean
+}
+
+export interface DataRetentionResult {
+  startTime: string
+  duration: number
+  dryRun: boolean
+  entities: RetentionEntityAudit[]
+  totalDeleted: number
+  totalExpired: number
+}
+
+export class DataRetentionJob {
+  private readonly repo: RetentionRepository
+  private readonly logger: (msg: string) => void
+
+  constructor(
+    private readonly db: Queryable,
+    private readonly config: RetentionConfig,
+    logger?: (msg: string) => void,
+  ) {
+    this.repo = new RetentionRepository(db, config.dryRun)
+    this.logger = logger ?? (() => {})
+  }
+
+  async run(): Promise<DataRetentionResult> {
+    const start = Date.now()
+    const startTime = new Date().toISOString()
+    const { dryRun, batchLimit, entities } = this.config
+
+    this.logger(
+      `[retention] Starting run — dryRun=${dryRun} batchLimit=${batchLimit}`,
+    )
+
+    const audits: RetentionEntityAudit[] = await Promise.all([
+      this.processEntity(
+        'score_history',
+        entities.scoreHistory.ttlDays,
+        batchLimit,
+        () => this.repo.countExpiredScoreHistory(entities.scoreHistory.ttlDays),
+        () => this.repo.deleteExpiredScoreHistory(entities.scoreHistory.ttlDays, batchLimit),
+      ),
+      this.processEntity(
+        'audit_logs',
+        entities.auditLogs.ttlDays,
+        batchLimit,
+        () => this.repo.countExpiredAuditLogs(entities.auditLogs.ttlDays),
+        () => this.repo.deleteExpiredAuditLogs(entities.auditLogs.ttlDays, batchLimit),
+      ),
+      this.processEntity(
+        'slash_events',
+        entities.slashEvents.ttlDays,
+        batchLimit,
+        () => this.repo.countExpiredSlashEvents(entities.slashEvents.ttlDays),
+        () => this.repo.deleteExpiredSlashEvents(entities.slashEvents.ttlDays, batchLimit),
+      ),
+      this.processEntity(
+        'outbox_events',
+        entities.outboxEvents.ttlDays,
+        batchLimit,
+        () => this.repo.countExpiredOutboxEvents(entities.outboxEvents.ttlDays),
+        () => this.repo.deleteExpiredOutboxEvents(entities.outboxEvents.ttlDays, batchLimit),
+      ),
+    ])
+
+    const totalDeleted = audits.reduce((sum, a) => sum + a.deletedCount, 0)
+    const totalExpired = audits.reduce((sum, a) => sum + a.expiredCount, 0)
+    const duration = Date.now() - start
+
+    this.logger(
+      `[retention] Run complete — totalExpired=${totalExpired} totalDeleted=${totalDeleted} duration=${duration}ms`,
+    )
+
+    return { startTime, duration, dryRun, entities: audits, totalDeleted, totalExpired }
+  }
+
+  private async processEntity(
+    name: string,
+    ttlDays: number,
+    batchLimit: number,
+    countFn: () => Promise<{ expiredCount: number }>,
+    deleteFn: () => Promise<{ deletedCount: number; dryRun: boolean }>,
+  ): Promise<RetentionEntityAudit> {
+    if (ttlDays === 0) {
+      this.logger(`[retention] ${name} — ttlDays=0, skipping`)
+      return { entity: name, expiredCount: 0, deletedCount: 0, ttlDays: 0, dryRun: this.config.dryRun }
+    }
+
+    const { expiredCount } = await countFn()
+
+    this.logger(
+      `[retention] ${name} — ttlDays=${ttlDays} expiredCount=${expiredCount}${this.config.dryRun ? ' (dry-run)' : ''}`,
+    )
+
+    if (expiredCount === 0) {
+      return { entity: name, expiredCount: 0, deletedCount: 0, ttlDays, dryRun: this.config.dryRun }
+    }
+
+    const { deletedCount, dryRun } = await deleteFn()
+
+    if (!dryRun) {
+      this.logger(`[retention] ${name} — deleted ${deletedCount} rows`)
+    }
+
+    return { entity: name, expiredCount, deletedCount, ttlDays, dryRun }
+  }
+}
diff --git a/src/jobs/index.ts b/src/jobs/index.ts
index 619b5ed..5979c1d 100644
--- a/src/jobs/index.ts
+++ b/src/jobs/index.ts
@@ -8,3 +8,4 @@ export * from './invoiceDueDateWorker.js'
 export * from './exportTypes.js'
 export * from './exportWorker.js'
 export * from './batchPayoutProcessor.js'
+export * from './dataRetentionJob.js'
diff --git a/src/repositories/retentionRepository.ts b/src/repositories/retentionRepository.ts
new file mode 100644
index 0000000..1f9aaa0
--- /dev/null
+++ b/src/repositories/retentionRepository.ts
@@ -0,0 +1,195 @@
+/**
+ * RetentionRepository
+ *
+ * Provides count-then-delete helpers for each entity type managed by the
+ * data-retention job. All mutating methods are no-ops when `dryRun` is true
+ * so the caller never needs to branch on that flag.
+ */
+
+import type { Queryable } from '../db/repositories/queryable.js'
+
+export interface RetentionCountResult {
+  entity: string
+  expiredCount: number
+  ttlDays: number
+}
+
+export interface RetentionDeleteResult {
+  entity: string
+  deletedCount: number
+  ttlDays: number
+  dryRun: boolean
+}
+
+export class RetentionRepository {
+  constructor(
+    private readonly db: Queryable,
+    private readonly dryRun: boolean = false,
+  ) {}
+
+  // ── score_history ──────────────────────────────────────────────────────
+
+  async countExpiredScoreHistory(ttlDays: number): Promise<RetentionCountResult> {
+    if (ttlDays === 0) return { entity: 'score_history', expiredCount: 0, ttlDays }
+    const result = await this.db.query<{ cnt: string }>(
+      `SELECT COUNT(*)::text AS cnt FROM score_history
+       WHERE computed_at < NOW() - ($1 || ' days')::interval`,
+      [ttlDays],
+    )
+    return {
+      entity: 'score_history',
+      expiredCount: parseInt(result.rows[0]?.cnt ?? '0', 10),
+      ttlDays,
+    }
+  }
+
+  async deleteExpiredScoreHistory(
+    ttlDays: number,
+    batchLimit: number,
+  ): Promise<RetentionDeleteResult> {
+    if (ttlDays === 0 || this.dryRun) {
+      return { entity: 'score_history', deletedCount: 0, ttlDays, dryRun: this.dryRun }
+    }
+    const result = await this.db.query<{ cnt: string }>(
+      `WITH rows AS (
+         SELECT id FROM score_history
+         WHERE computed_at < NOW() - ($1 || ' days')::interval
+         LIMIT $2
+       )
+       DELETE FROM score_history WHERE id IN (SELECT id FROM rows)
+       RETURNING 1`,
+      [ttlDays, batchLimit],
+    )
+    return {
+      entity: 'score_history',
+      deletedCount: result.rowCount ?? 0,
+      ttlDays,
+      dryRun: false,
+    }
+  }
+
+  // ── audit_logs ─────────────────────────────────────────────────────────
+
+  async countExpiredAuditLogs(ttlDays: number): Promise<RetentionCountResult> {
+    if (ttlDays === 0) return { entity: 'audit_logs', expiredCount: 0, ttlDays }
+    const result = await this.db.query<{ cnt: string }>(
+      `SELECT COUNT(*)::text AS cnt FROM audit_logs
+       WHERE occurred_at < NOW() - ($1 || ' days')::interval`,
+      [ttlDays],
+    )
+    return {
+      entity: 'audit_logs',
+      expiredCount: parseInt(result.rows[0]?.cnt ?? '0', 10),
+      ttlDays,
+    }
+  }
+
+  async deleteExpiredAuditLogs(
+    ttlDays: number,
+    batchLimit: number,
+  ): Promise<RetentionDeleteResult> {
+    if (ttlDays === 0 || this.dryRun) {
+      return { entity: 'audit_logs', deletedCount: 0, ttlDays, dryRun: this.dryRun }
+    }
+    const result = await this.db.query<{ cnt: string }>(
+      `WITH rows AS (
+         SELECT id FROM audit_logs
+         WHERE occurred_at < NOW() - ($1 || ' days')::interval
+         LIMIT $2
+       )
+       DELETE FROM audit_logs WHERE id IN (SELECT id FROM rows)
+       RETURNING 1`,
+      [ttlDays, batchLimit],
+    )
+    return {
+      entity: 'audit_logs',
+      deletedCount: result.rowCount ?? 0,
+      ttlDays,
+      dryRun: false,
+    }
+  }
+
+  // ── slash_events ───────────────────────────────────────────────────────
+
+  async countExpiredSlashEvents(ttlDays: number): Promise<RetentionCountResult> {
+    if (ttlDays === 0) return { entity: 'slash_events', expiredCount: 0, ttlDays }
+    const result = await this.db.query<{ cnt: string }>(
+      `SELECT COUNT(*)::text AS cnt FROM slash_events
+       WHERE created_at < NOW() - ($1 || ' days')::interval`,
+      [ttlDays],
+    )
+    return {
+      entity: 'slash_events',
+      expiredCount: parseInt(result.rows[0]?.cnt ?? '0', 10),
+      ttlDays,
+    }
+  }
+
+  async deleteExpiredSlashEvents(
+    ttlDays: number,
+    batchLimit: number,
+  ): Promise<RetentionDeleteResult> {
+    if (ttlDays === 0 || this.dryRun) {
+      return { entity: 'slash_events', deletedCount: 0, ttlDays, dryRun: this.dryRun }
+    }
+    const result = await this.db.query<{ cnt: string }>(
+      `WITH rows AS (
+         SELECT id FROM slash_events
+         WHERE created_at < NOW() - ($1 || ' days')::interval
+         LIMIT $2
+       )
+       DELETE FROM slash_events WHERE id IN (SELECT id FROM rows)
+       RETURNING 1`,
+      [ttlDays, batchLimit],
+    )
+    return {
+      entity: 'slash_events',
+      deletedCount: result.rowCount ?? 0,
+      ttlDays,
+      dryRun: false,
+    }
+  }
+
+  // ── outbox_events ──────────────────────────────────────────────────────
+
+  async countExpiredOutboxEvents(ttlDays: number): Promise<RetentionCountResult> {
+    if (ttlDays === 0) return { entity: 'outbox_events', expiredCount: 0, ttlDays }
+    const result = await this.db.query<{ cnt: string }>(
+      `SELECT COUNT(*)::text AS cnt FROM event_outbox
+       WHERE created_at < NOW() - ($1 || ' days')::interval
+         AND status IN ('published', 'failed')`,
+      [ttlDays],
+    )
+    return {
+      entity: 'outbox_events',
+      expiredCount: parseInt(result.rows[0]?.cnt ?? '0', 10),
+      ttlDays,
+    }
+  }
+
+  async deleteExpiredOutboxEvents(
+    ttlDays: number,
+    batchLimit: number,
+  ): Promise<RetentionDeleteResult> {
+    if (ttlDays === 0 || this.dryRun) {
+      return { entity: 'outbox_events', deletedCount: 0, ttlDays, dryRun: this.dryRun }
+    }
+    const result = await this.db.query<{ cnt: string }>(
+      `WITH rows AS (
+         SELECT id FROM event_outbox
+         WHERE created_at < NOW() - ($1 || ' days')::interval
+           AND status IN ('published', 'failed')
+         LIMIT $2
+       )
+       DELETE FROM event_outbox WHERE id IN (SELECT id FROM rows)
+       RETURNING 1`,
+      [ttlDays, batchLimit],
+    )
+    return {
+      entity: 'outbox_events',
+      deletedCount: result.rowCount ?? 0,
+      ttlDays,
+      dryRun: false,
+    }
+  }
+}