Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions src/config/retention.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Org-level data-retention configuration.
*
* Each entity type can have an independent TTL (time-to-live) expressed in
* days. A value of `0` means "keep forever" (no pruning for that type).
*
* Settings are consumed by the `DataRetentionJob` and can be overridden via
* environment variables, making it straightforward to adjust without a code
* deploy.
*/

export interface EntityRetentionConfig {
/** Days to keep records after their `created_at` timestamp. 0 = keep forever. */
ttlDays: number
}

export interface RetentionConfig {
/**
* When true the job logs what *would* be deleted without touching the DB.
* Default: false.
*/
dryRun: boolean

/** Maximum rows deleted per entity type per run (prevents runaway deletes). */
batchLimit: number

/** Per-entity TTL configuration. */
entities: {
scoreHistory: EntityRetentionConfig
auditLogs: EntityRetentionConfig
slashEvents: EntityRetentionConfig
outboxEvents: EntityRetentionConfig
}
}

export const DEFAULT_RETENTION_CONFIG: RetentionConfig = {
dryRun: false,
batchLimit: 5_000,
entities: {
scoreHistory: { ttlDays: 90 },
auditLogs: { ttlDays: 365 },
slashEvents: { ttlDays: 0 },
outboxEvents: { ttlDays: 30 },
},
}

function parseTtl(raw: string | undefined, fallback: number): number {
if (raw === undefined || raw === '') return fallback
const n = Number(raw)
return Number.isFinite(n) && n >= 0 ? Math.floor(n) : fallback
}

export function loadRetentionConfig(
env: Record<string, string | undefined> = process.env,
defaults: RetentionConfig = DEFAULT_RETENTION_CONFIG,
): RetentionConfig {
return {
dryRun: (env.RETENTION_DRY_RUN ?? '').toLowerCase() === 'true',
batchLimit: parseTtl(env.RETENTION_BATCH_LIMIT, defaults.batchLimit),
entities: {
scoreHistory: {
ttlDays: parseTtl(
env.RETENTION_TTL_SCORE_HISTORY_DAYS,
defaults.entities.scoreHistory.ttlDays,
),
},
auditLogs: {
ttlDays: parseTtl(
env.RETENTION_TTL_AUDIT_LOGS_DAYS,
defaults.entities.auditLogs.ttlDays,
),
},
slashEvents: {
ttlDays: parseTtl(
env.RETENTION_TTL_SLASH_EVENTS_DAYS,
defaults.entities.slashEvents.ttlDays,
),
},
outboxEvents: {
ttlDays: parseTtl(
env.RETENTION_TTL_OUTBOX_EVENTS_DAYS,
defaults.entities.outboxEvents.ttlDays,
),
},
},
}
}
170 changes: 170 additions & 0 deletions src/jobs/dataRetentionJob.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/**
* Unit tests for DataRetentionJob + RetentionRepository
*
* Uses vitest with mock Queryable implementations — no live database needed.
*/

import { describe, it, expect, vi, beforeEach } from 'vitest'
import { DataRetentionJob } from './dataRetentionJob.js'
import type { RetentionConfig } from '../config/retention.js'
import type { Queryable } from '../db/repositories/queryable.js'

// ── Helpers ───────────────────────────────────────────────────────────────

function makeConfig(overrides: Partial<RetentionConfig> = {}): RetentionConfig {
return {
dryRun: false,
batchLimit: 100,
entities: {
scoreHistory: { ttlDays: 90 },
auditLogs: { ttlDays: 365 },
slashEvents: { ttlDays: 30 },
outboxEvents: { ttlDays: 30 },
},
...overrides,
}
}

function makeDb(countResponse = 0, deleteResponse = 0): Queryable {
return {
query: vi.fn().mockImplementation((sql: string) => {
if (sql.includes('COUNT(*)')) {
return Promise.resolve({ rows: [{ cnt: String(countResponse) }], rowCount: 1 })
}
return Promise.resolve({ rows: [], rowCount: deleteResponse })
}),
}
}

// ── Tests ─────────────────────────────────────────────────────────────────

describe('DataRetentionJob', () => {
let logs: string[]

beforeEach(() => {
logs = []
})

it('returns zero totals when nothing is expired', async () => {
const db = makeDb(0, 0)
const job = new DataRetentionJob(db, makeConfig(), (m) => logs.push(m))

const result = await job.run()

expect(result.totalExpired).toBe(0)
expect(result.totalDeleted).toBe(0)
expect(result.dryRun).toBe(false)
expect(result.entities).toHaveLength(4)
})

it('deletes expired rows and sums counts correctly', async () => {
const db = makeDb(10, 10)
const job = new DataRetentionJob(db, makeConfig(), (m) => logs.push(m))

const result = await job.run()

expect(result.totalExpired).toBe(40) // 4 entities × 10
expect(result.totalDeleted).toBe(40)
expect(result.entities.every((e) => e.deletedCount === 10)).toBe(true)
})

it('records startTime as valid ISO string and non-negative duration', async () => {
const job = new DataRetentionJob(makeDb(), makeConfig())
const result = await job.run()

expect(typeof result.startTime).toBe('string')
expect(new Date(result.startTime).getTime()).toBeGreaterThan(0)
expect(result.duration).toBeGreaterThanOrEqual(0)
})

it('does not issue DELETE queries in dry-run mode', async () => {
const db = makeDb(5, 5)
const job = new DataRetentionJob(db, makeConfig({ dryRun: true }), (m) => logs.push(m))

const result = await job.run()

expect(result.dryRun).toBe(true)
expect(result.totalDeleted).toBe(0)
expect(result.totalExpired).toBe(20) // 4 × 5, COUNT still runs

const queryCalls = (db.query as ReturnType<typeof vi.fn>).mock.calls as [string][]
const deleteCalls = queryCalls.filter(([sql]) => sql.trim().startsWith('WITH rows AS'))
expect(deleteCalls).toHaveLength(0)
})

it('marks all entity audits dryRun=true when in dry-run mode', async () => {
const job = new DataRetentionJob(makeDb(3, 3), makeConfig({ dryRun: true }))
const result = await job.run()

expect(result.entities.every((e) => e.dryRun === true)).toBe(true)
})

it('skips all queries when all entities have ttlDays=0', async () => {
const config = makeConfig({
entities: {
scoreHistory: { ttlDays: 0 },
auditLogs: { ttlDays: 0 },
slashEvents: { ttlDays: 0 },
outboxEvents: { ttlDays: 0 },
},
})
const db = makeDb(99, 99)
const job = new DataRetentionJob(db, config, (m) => logs.push(m))

const result = await job.run()

expect(result.totalExpired).toBe(0)
expect(result.totalDeleted).toBe(0)
expect((db.query as ReturnType<typeof vi.fn>).mock.calls).toHaveLength(0)
})

it('only skips the entity with ttlDays=0, processes others', async () => {
const config = makeConfig({
entities: {
scoreHistory: { ttlDays: 0 },
auditLogs: { ttlDays: 365 },
slashEvents: { ttlDays: 30 },
outboxEvents: { ttlDays: 30 },
},
})
const db = makeDb(4, 4)
const job = new DataRetentionJob(db, config)
const result = await job.run()

const scoreEntity = result.entities.find((e) => e.entity === 'score_history')!
expect(scoreEntity.expiredCount).toBe(0)
expect(scoreEntity.deletedCount).toBe(0)
expect(result.totalExpired).toBe(12) // 3 active entities × 4
expect(result.totalDeleted).toBe(12)
})

it('includes all 4 entity types in result', async () => {
const job = new DataRetentionJob(makeDb(), makeConfig())
const result = await job.run()

const names = result.entities.map((e) => e.entity).sort()
expect(names).toEqual(
['audit_logs', 'outbox_events', 'score_history', 'slash_events'].sort(),
)
})

it('logs start and completion messages', async () => {
const job = new DataRetentionJob(makeDb(), makeConfig(), (m) => logs.push(m))
await job.run()

expect(logs.some((l) => l.includes('Starting run'))).toBe(true)
expect(logs.some((l) => l.includes('Run complete'))).toBe(true)
})

it('passes batchLimit to DELETE queries', async () => {
const db = makeDb(10, 5)
const job = new DataRetentionJob(db, makeConfig({ batchLimit: 5 }))
await job.run()

const calls = (db.query as ReturnType<typeof vi.fn>).mock.calls as [string, unknown[]][]
const deleteCalls = calls.filter(([sql]) => sql.trim().startsWith('WITH rows AS'))
deleteCalls.forEach(([, params]) => {
expect(params?.[1]).toBe(5)
})
})
})
128 changes: 128 additions & 0 deletions src/jobs/dataRetentionJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* DataRetentionJob
*
* Enforces org-level data retention by pruning records that exceed their TTL
* from each configured entity table. The job supports:
*
* - Per-entity configurable TTL (0 = keep forever)
* - Dry-run mode: logs what *would* be deleted without mutating the DB
* - Structured audit output per run (entity, count, dryRun flag)
* - Batch limits to avoid locking large tables
*/

import type { RetentionConfig } from '../config/retention.js'
import { RetentionRepository } from '../repositories/retentionRepository.js'
import type { Queryable } from '../db/repositories/queryable.js'

export interface RetentionEntityAudit {
entity: string
expiredCount: number
deletedCount: number
ttlDays: number
dryRun: boolean
}

export interface DataRetentionResult {
startTime: string
duration: number
dryRun: boolean
entities: RetentionEntityAudit[]
totalDeleted: number
totalExpired: number
}

export class DataRetentionJob {
private readonly repo: RetentionRepository
private readonly logger: (msg: string) => void

constructor(
private readonly db: Queryable,
private readonly config: RetentionConfig,
logger?: (msg: string) => void,
) {
this.repo = new RetentionRepository(db, config.dryRun)
this.logger = logger ?? (() => {})
}

async run(): Promise<DataRetentionResult> {
const start = Date.now()
const startTime = new Date().toISOString()
const { dryRun, batchLimit, entities } = this.config

this.logger(
`[retention] Starting run — dryRun=${dryRun} batchLimit=${batchLimit}`,
)

const audits: RetentionEntityAudit[] = await Promise.all([
this.processEntity(
'score_history',
entities.scoreHistory.ttlDays,
batchLimit,
() => this.repo.countExpiredScoreHistory(entities.scoreHistory.ttlDays),
() => this.repo.deleteExpiredScoreHistory(entities.scoreHistory.ttlDays, batchLimit),
),
this.processEntity(
'audit_logs',
entities.auditLogs.ttlDays,
batchLimit,
() => this.repo.countExpiredAuditLogs(entities.auditLogs.ttlDays),
() => this.repo.deleteExpiredAuditLogs(entities.auditLogs.ttlDays, batchLimit),
),
this.processEntity(
'slash_events',
entities.slashEvents.ttlDays,
batchLimit,
() => this.repo.countExpiredSlashEvents(entities.slashEvents.ttlDays),
() => this.repo.deleteExpiredSlashEvents(entities.slashEvents.ttlDays, batchLimit),
),
this.processEntity(
'outbox_events',
entities.outboxEvents.ttlDays,
batchLimit,
() => this.repo.countExpiredOutboxEvents(entities.outboxEvents.ttlDays),
() => this.repo.deleteExpiredOutboxEvents(entities.outboxEvents.ttlDays, batchLimit),
),
])

const totalDeleted = audits.reduce((sum, a) => sum + a.deletedCount, 0)
const totalExpired = audits.reduce((sum, a) => sum + a.expiredCount, 0)
const duration = Date.now() - start

this.logger(
`[retention] Run complete — totalExpired=${totalExpired} totalDeleted=${totalDeleted} duration=${duration}ms`,
)

return { startTime, duration, dryRun, entities: audits, totalDeleted, totalExpired }
}

private async processEntity(
name: string,
ttlDays: number,
batchLimit: number,
countFn: () => Promise<{ expiredCount: number }>,
deleteFn: () => Promise<{ deletedCount: number; dryRun: boolean }>,
): Promise<RetentionEntityAudit> {
if (ttlDays === 0) {
this.logger(`[retention] ${name} — ttlDays=0, skipping`)
return { entity: name, expiredCount: 0, deletedCount: 0, ttlDays: 0, dryRun: this.config.dryRun }
}

const { expiredCount } = await countFn()

this.logger(
`[retention] ${name} — ttlDays=${ttlDays} expiredCount=${expiredCount}${this.config.dryRun ? ' (dry-run)' : ''}`,
)

if (expiredCount === 0) {
return { entity: name, expiredCount: 0, deletedCount: 0, ttlDays, dryRun: this.config.dryRun }
}

const { deletedCount, dryRun } = await deleteFn()

if (!dryRun) {
this.logger(`[retention] ${name} — deleted ${deletedCount} rows`)
}

return { entity: name, expiredCount, deletedCount, ttlDays, dryRun }
}
}
Loading
Loading