Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 11 additions & 22 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { loadConfig } from './config/index.js'
import { pool } from './db/pool.js'
import { AnalyticsService } from './services/analytics/service.js'
import { AnalyticsRefreshWorker, getAnalyticsRefreshIntervalMs } from './jobs/analyticsRefreshWorker.js'
import { AnalyticsRefreshScheduler } from './jobs/analyticsRefreshScheduler.js'
import { createAnalyticsRefreshMetrics } from './jobs/analyticsRefreshMetrics.js'
import { keyManager } from './services/keyManager/index.js'

app.use('/api/admin', createAdminRouter())
Expand All @@ -31,31 +33,18 @@ if (process.env.NODE_ENV !== 'test') {
if (process.env.DATABASE_URL) {
// Numeric value parsed from ANALYTICS_STALENESS_SECONDS. The '300' fallback applies only when the
// variable is unset; an empty or non-numeric string goes through Number() unchanged (0 or NaN).
const thresholdSeconds = Number(process.env.ANALYTICS_STALENESS_SECONDS ?? '300')
// Service built on the shared pg pool; the threshold is passed as its second constructor argument.
const analyticsService = new AnalyticsService(pool, thresholdSeconds)
// NOTE(review): refreshWorker is declared twice in this span. It reads like the before/after pair
// of a diff (the second form adds the metrics argument) — confirm which declaration is current.
const refreshWorker = new AnalyticsRefreshWorker(analyticsService, console.log)
// Metrics adapter that is passed to the worker below and to the scheduler options.
const metrics = createAnalyticsRefreshMetrics()
const refreshWorker = new AnalyticsRefreshWorker(analyticsService, console.log, metrics)
// Refresh period in milliseconds, as returned by getAnalyticsRefreshIntervalMs().
const intervalMs = getAnalyticsRefreshIntervalMs()
// In-flight flag read and written by tick() to skip overlapping runs.
let running = false

// Performs a single analytics refresh unless one is already in flight.
// Errors thrown by refreshWorker.run() are caught and logged here, and the
// in-flight flag is always cleared once the attempt settles.
const tick = async (): Promise<void> => {
  if (!running) {
    running = true
    try {
      await refreshWorker.run()
    } catch (err) {
      const reason = err instanceof Error ? err.message : 'Unknown refresh error'
      console.error(`Analytics refresh failed: ${reason}`)
    } finally {
      running = false
    }
    return
  }

  // A previous run has not settled yet: note the skip and do nothing else.
  console.log('Analytics refresh is already running, skipping interval')
}
// Scheduler wrapping refreshWorker. Options passed: the refresh interval, runOnStart: true
// (presumably an immediate first run — confirm against AnalyticsRefreshScheduler), console.log
// as the logger, and the metrics adapter.
const scheduler = new AnalyticsRefreshScheduler(refreshWorker, {
intervalMs,
runOnStart: true,
logger: console.log,
metrics,
})

// Run once on startup, then periodically.
// NOTE(review): both the manual tick()/setInterval pair and scheduler.start() appear here. They
// look like the removed and added sides of a diff — confirm only one start mechanism is active.
void tick()
setInterval(() => {
void tick()
}, intervalMs)
scheduler.start()
}
} catch (error) {
console.error('Failed to start Credence API:', error)
Expand Down
45 changes: 45 additions & 0 deletions src/jobs/analyticsRefreshMetrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import client from 'prom-client'
import { register } from '../middleware/metrics.js'

/**
 * Counter of analytics refresh attempts, partitioned by the `status` label.
 * The adapter built by createAnalyticsRefreshMetrics() increments it with
 * `status` set to 'success' or 'error'. Registered on the `register`
 * imported from the metrics middleware when this module is loaded.
 */
export const analyticsRefreshRunsTotal = new client.Counter({
name: 'analytics_refresh_runs_total',
help: 'Total number of analytics materialized view refresh attempts',
labelNames: ['status'] as const,
registers: [register],
})

/**
 * Histogram of refresh durations in seconds, with explicit upper bucket
 * bounds from 0.1 to 60. Fed through `observeDuration` on the adapter built
 * by createAnalyticsRefreshMetrics(). Registered on the `register` imported
 * from the metrics middleware when this module is loaded.
 */
export const analyticsRefreshDurationSeconds = new client.Histogram({
name: 'analytics_refresh_duration_seconds',
help: 'Duration of analytics materialized view REFRESH CONCURRENTLY in seconds',
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60],
registers: [register],
})

/**
 * Gauge holding the snapshot age, in seconds, of the analytics materialized
 * view. Written through `setViewAge` on the adapter built by
 * createAnalyticsRefreshMetrics(). Registered on the `register` imported
 * from the metrics middleware when this module is loaded.
 */
export const analyticsViewAgeSeconds = new client.Gauge({
name: 'analytics_view_age_seconds',
help: 'Age of the analytics_metrics_mv snapshot in seconds at the time of the last successful refresh',
registers: [register],
})

/**
 * Counter of scheduler ticks that were skipped, partitioned by the `reason`
 * label. The adapter built by createAnalyticsRefreshMetrics() increments it
 * with `reason` set to 'overlap' or 'lock_contention'. Registered on the
 * `register` imported from the metrics middleware when this module is loaded.
 */
export const analyticsSchedulerSkipsTotal = new client.Counter({
name: 'analytics_scheduler_skips_total',
help: 'Total number of scheduler ticks skipped due to overlap or distributed lock contention',
labelNames: ['reason'] as const,
registers: [register],
})

/**
 * Narrow recording surface for analytics refresh metrics. Consumers depend on
 * this interface rather than on the prom-client objects, which lets tests
 * substitute plain spies for every member.
 */
export interface AnalyticsRefreshMetrics {
/** Records one refresh attempt with the given outcome. */
incRuns(status: 'success' | 'error'): void
/** Records how long a refresh took, in seconds. */
observeDuration(seconds: number): void
/** Records the current age of the view snapshot, in seconds. */
setViewAge(seconds: number): void
/** Records one skipped scheduler tick and why it was skipped. */
incSkip(reason: 'overlap' | 'lock_contention'): void
}

/**
 * Builds the AnalyticsRefreshMetrics adapter backed by this module's
 * prom-client collectors.
 *
 * @returns An object whose members forward to the runs counter, the duration
 * histogram, the view-age gauge and the skips counter respectively.
 */
export function createAnalyticsRefreshMetrics(): AnalyticsRefreshMetrics {
  const adapter: AnalyticsRefreshMetrics = {
    incRuns(status) {
      analyticsRefreshRunsTotal.inc({ status })
    },
    observeDuration(seconds) {
      analyticsRefreshDurationSeconds.observe(seconds)
    },
    setViewAge(seconds) {
      analyticsViewAgeSeconds.set(seconds)
    },
    incSkip(reason) {
      analyticsSchedulerSkipsTotal.inc({ reason })
    },
  }
  return adapter
}
208 changes: 208 additions & 0 deletions src/jobs/analyticsRefreshScheduler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'
import { AnalyticsRefreshScheduler } from './analyticsRefreshScheduler.js'
import type { AnalyticsRefreshWorker, AnalyticsRefreshWorkerResult } from './analyticsRefreshWorker.js'
import type { DistributedLock } from './distributedLock.js'
import type { AnalyticsRefreshMetrics } from './analyticsRefreshMetrics.js'

// Canned worker result: the default resolved value of the stub worker's run()
// and the value the status test expects to see reported as `lastResult`.
const SUCCESS_RESULT: AnalyticsRefreshWorkerResult = {
refreshed: true,
duration: 42,
startTime: '2026-04-25T00:00:00.000Z',
}

// Creates a stand-in worker whose only member is a `run` spy resolving to `result`.
// The double cast is deliberate: the stub implements just the part of the worker
// that these tests call.
function makeWorker(result: AnalyticsRefreshWorkerResult = SUCCESS_RESULT): AnalyticsRefreshWorker {
  const run = vi.fn().mockResolvedValue(result)
  const stub = { run }
  return stub as unknown as AnalyticsRefreshWorker
}

// Creates a metrics adapter in which every member is an independent spy, so a
// test can assert exactly which recording calls were made.
function makeMetrics(): AnalyticsRefreshMetrics {
  const incRuns = vi.fn()
  const observeDuration = vi.fn()
  const setViewAge = vi.fn()
  const incSkip = vi.fn()
  return { incRuns, observeDuration, setViewAge, incSkip }
}

// Behavioural tests for AnalyticsRefreshScheduler. Every test runs on Vitest fake
// timers, so time only moves when a test advances it explicitly.
describe('AnalyticsRefreshScheduler', () => {
  beforeEach(() => {
    vi.useFakeTimers()
  })

  afterEach(() => {
    vi.useRealTimers()
  })

  it('starts and registers an interval', () => {
    const worker = makeWorker()
    const scheduler = new AnalyticsRefreshScheduler(worker, { intervalMs: 5000 })

    scheduler.start()
    expect(scheduler.isActive()).toBe(true)

    scheduler.stop()
    expect(scheduler.isActive()).toBe(false)
  })

  it('does not register a second interval if already started', async () => {
    const worker = makeWorker()
    const scheduler = new AnalyticsRefreshScheduler(worker, { intervalMs: 5000 })

    scheduler.start()
    scheduler.start()

    // Nothing runs before the clock moves (runOnStart is not set).
    expect(worker.run).not.toHaveBeenCalled()

    // Cross one interval boundary. A duplicate interval would fire the worker
    // twice here; without advancing the clock this test could never fail.
    await vi.advanceTimersByTimeAsync(5100)
    expect(worker.run).toHaveBeenCalledTimes(1)

    scheduler.stop()
  })

  it('runs worker on start when runOnStart is true', async () => {
    const worker = makeWorker()
    const scheduler = new AnalyticsRefreshScheduler(worker, {
      intervalMs: 60_000,
      runOnStart: true,
    })

    scheduler.start()
    // Flush the start-up run without crossing an interval boundary. Running
    // *all* timers is unsuitable while a recurring timer is active: it keeps
    // firing the interval until the fake-timer loop limit is reached.
    await vi.advanceTimersByTimeAsync(0)

    expect(worker.run).toHaveBeenCalledOnce()
    scheduler.stop()
  })

  it('runs worker on each interval tick', async () => {
    const worker = makeWorker()
    const scheduler = new AnalyticsRefreshScheduler(worker, { intervalMs: 1000 })

    scheduler.start()
    await vi.advanceTimersByTimeAsync(3100)

    expect(worker.run).toHaveBeenCalledTimes(3)
    scheduler.stop()
  })

  it('skips tick and increments overlap metric when already running', async () => {
    let resolve!: () => void
    const slowWorker = {
      run: vi.fn().mockImplementation(
        () => new Promise<AnalyticsRefreshWorkerResult>((r) => { resolve = () => r(SUCCESS_RESULT) }),
      ),
    } as unknown as AnalyticsRefreshWorker

    const metrics = makeMetrics()
    const scheduler = new AnalyticsRefreshScheduler(slowWorker, {
      intervalMs: 500,
      runOnStart: true,
      metrics,
    })

    scheduler.start()
    // First tick fires (runOnStart) but hasn't resolved yet
    await vi.advanceTimersByTimeAsync(600)
    // Second tick fires while first is still running → should skip
    expect(metrics.incSkip).toHaveBeenCalledWith('overlap')
    // The skipped tick must not have started a second run.
    expect(slowWorker.run).toHaveBeenCalledOnce()

    resolve()
    scheduler.stop()
  })

  it('skips tick and increments lock_contention metric when distributed lock is held', async () => {
    const worker = makeWorker()
    const metrics = makeMetrics()

    // Lock stub that always reports the callback was not executed.
    const lock: DistributedLock = {
      withLock: vi.fn().mockResolvedValue({ executed: false }),
      acquire: vi.fn(),
      release: vi.fn(),
      heartbeat: vi.fn(),
      getMetrics: vi.fn().mockReturnValue({ contentions: 1, acquisitions: 0, releases: 0, heartbeats: 0, errors: 0 }),
      resetMetrics: vi.fn(),
    } as unknown as DistributedLock

    const scheduler = new AnalyticsRefreshScheduler(worker, {
      intervalMs: 1000,
      distributedLock: lock,
      metrics,
    })

    scheduler.start()
    await vi.advanceTimersByTimeAsync(1100)

    expect(metrics.incSkip).toHaveBeenCalledWith('lock_contention')
    expect(worker.run).not.toHaveBeenCalled()
    scheduler.stop()
  })

  it('delegates to worker via distributed lock when lock is acquired', async () => {
    const worker = makeWorker()

    // Lock stub that runs the callback and reports it as executed.
    const lock: DistributedLock = {
      withLock: vi.fn().mockImplementation(async (_key: string, fn: () => Promise<void>) => {
        await fn()
        return { executed: true }
      }),
      acquire: vi.fn(),
      release: vi.fn(),
      heartbeat: vi.fn(),
      getMetrics: vi.fn(),
      resetMetrics: vi.fn(),
    } as unknown as DistributedLock

    const scheduler = new AnalyticsRefreshScheduler(worker, {
      intervalMs: 1000,
      distributedLock: lock,
    })

    scheduler.start()
    await vi.advanceTimersByTimeAsync(1100)

    expect(worker.run).toHaveBeenCalledOnce()
    scheduler.stop()
  })

  it('exposes status with run count and last result', async () => {
    const worker = makeWorker()
    const scheduler = new AnalyticsRefreshScheduler(worker, {
      intervalMs: 1000,
      runOnStart: true,
    })

    scheduler.start()
    // Settle only the start-up run; the scheduler is still active here, so
    // draining all timers would keep firing its recurring timer.
    await vi.advanceTimersByTimeAsync(0)

    const status = scheduler.getStatus()
    expect(status.active).toBe(true)
    expect(status.runCount).toBe(1)
    expect(status.lastResult).toEqual(SUCCESS_RESULT)
    expect(status.isRunning).toBe(false)

    scheduler.stop()
    expect(scheduler.getStatus().active).toBe(false)
  })

  it('uses custom lock key when provided', async () => {
    const worker = makeWorker()
    const lock: DistributedLock = {
      withLock: vi.fn().mockResolvedValue({ executed: true, result: undefined }),
      acquire: vi.fn(),
      release: vi.fn(),
      heartbeat: vi.fn(),
      getMetrics: vi.fn(),
      resetMetrics: vi.fn(),
    } as unknown as DistributedLock

    const scheduler = new AnalyticsRefreshScheduler(worker, {
      intervalMs: 1000,
      distributedLock: lock,
      lockKey: 'custom:analytics-lock',
    })

    scheduler.start()
    await vi.advanceTimersByTimeAsync(1100)

    expect(lock.withLock).toHaveBeenCalledWith(
      'custom:analytics-lock',
      expect.any(Function),
      expect.any(Object),
    )
    scheduler.stop()
  })
})
Loading
Loading