From b3f24a58997730189b876ba2380139d5ed7f247d Mon Sep 17 00:00:00 2001 From: xMACANJAWAx <97346103+macan88@users.noreply.github.com> Date: Fri, 13 Mar 2026 01:21:42 +0700 Subject: [PATCH 1/3] feat(job-queue): resilient background job retry & monitoring (#130) --- app/src/lib/jobQueue.ts | 263 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 app/src/lib/jobQueue.ts diff --git a/app/src/lib/jobQueue.ts b/app/src/lib/jobQueue.ts new file mode 100644 index 00000000..26c50d74 --- /dev/null +++ b/app/src/lib/jobQueue.ts @@ -0,0 +1,263 @@ +/** + * jobQueue.ts + * + * Resilient background job queue with exponential backoff retry and monitoring. + * + * WHY: Issue #130 requires production-ready async job execution with retry logic + * and monitoring hooks. This module provides a reusable JobQueue and JobMonitor + * that wraps any async function, retries transient failures with exponential + * backoff, and emits lifecycle events for observability. + */ + +export type JobStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED' | 'DEAD'; + +export interface JobRecord { + id: string; + status: JobStatus; + result?: T; + error?: Error; + attempts: number; + createdAt: number; + updatedAt: number; +} + +export interface RetryPolicy { + /** Maximum number of total attempts (1 = no retries). Default: 3 */ + maxAttempts: number; + /** Base delay in ms for exponential backoff. Default: 200 */ + baseDelayMs: number; + /** + * Predicate to decide if an error is retryable. + * WHY: We must NOT retry permanent errors (401 auth, 400 bad request) + * because retrying them wastes resources and will never succeed. + * Only transient errors (5xx, network timeouts) should be retried. 
+ */ + isRetryable: (error: Error) => boolean; +} + +export interface MonitorHooks { + onSuccess?: (job: JobRecord) => void; + onFailure?: (job: JobRecord, error: Error) => void; + onRetry?: (job: JobRecord, attempt: number, error: Error) => void; + onDead?: (job: JobRecord) => void; +} + +/** Default retry policy: retry up to 3 attempts, skip 4xx errors. */ +export const defaultRetryPolicy: RetryPolicy = { + maxAttempts: 3, + baseDelayMs: 200, + isRetryable: (error: Error) => { + // WHY: HTTP 4xx errors are client errors and will never succeed on retry. + // We check the message for status codes as a pragmatic heuristic since + // we don't have a typed HttpError in the current API layer. + const msg = error.message ?? ''; + const permanentPattern = /\b4\d{2}\b/; + return !permanentPattern.test(msg); + }, +}; + +/** + * Generates a short unique ID for each job. + * WHY: Jobs need stable IDs so monitors and callers can correlate records + * across async boundaries without external libraries. + */ +function generateId(): string { + return `job_${Date.now()}_${Math.random().toString(36).slice(2, 9)}`; +} + +/** + * Returns a promise that resolves after `ms` milliseconds. + * WHY: We need a controllable delay for exponential backoff between retries. + * Using jest.useFakeTimers in tests allows us to fast-forward time. + */ +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * JobMonitor + * + * Stores job records and exposes query helpers + lifecycle hooks. + * WHY: Separating monitoring from execution keeps the queue logic + * focused on retry mechanics while the monitor handles observability. + * Callers can attach hooks for metrics, alerting, or dead-letter processing. + */ +export class JobMonitor { + private records = new Map>(); + private hooks: MonitorHooks; + + constructor(hooks: MonitorHooks = {}) { + this.hooks = hooks; + } + + /** @internal Called by JobQueue to register a new job. 
*/ + _register(id: string): JobRecord { + const record: JobRecord = { + id, + status: 'PENDING', + attempts: 0, + createdAt: Date.now(), + updatedAt: Date.now(), + }; + this.records.set(id, record); + return record; + } + + /** @internal Called by JobQueue on each attempt start. */ + _markRunning(id: string): void { + const r = this._get(id); + r.status = 'RUNNING'; + r.attempts += 1; + r.updatedAt = Date.now(); + } + + /** @internal Called by JobQueue on successful completion. */ + _markCompleted(id: string, result: T): void { + const r = this._get(id); + r.status = 'COMPLETED'; + r.result = result; + r.updatedAt = Date.now(); + this.hooks.onSuccess?.(r); + } + + /** @internal Called by JobQueue when a retryable error occurs. */ + _markRetrying(id: string, error: Error): void { + const r = this._get(id); + r.error = error; + r.updatedAt = Date.now(); + this.hooks.onRetry?.(r, r.attempts, error); + } + + /** @internal Called by JobQueue on non-retryable or final failure. */ + _markFailed(id: string, error: Error, dead: boolean): void { + const r = this._get(id); + r.status = dead ? 'DEAD' : 'FAILED'; + r.error = error; + r.updatedAt = Date.now(); + if (dead) { + this.hooks.onDead?.(r); + } else { + this.hooks.onFailure?.(r, error); + } + } + + /** Returns the current status of a job by ID. */ + getJobStatus(id: string): JobStatus | undefined { + return this.records.get(id)?.status; + } + + /** Returns the full job record for a given ID. */ + getJob(id: string): JobRecord | undefined { + return this.records.get(id); + } + + /** + * Returns all jobs that reached the DEAD status. + * WHY: Dead-letter queue pattern — callers can inspect permanently failed + * jobs for manual intervention or alerting without losing the history. + */ + getDeadJobs(): JobRecord[] { + return Array.from(this.records.values()).filter((r) => r.status === 'DEAD'); + } + + /** + * Clears all stored records. + * WHY: Prevents unbounded memory growth in long-running processes. 
+ * Callers should periodically flush old records or call this on cleanup. + */ + clear(): void { + this.records.clear(); + } + + private _get(id: string): JobRecord { + const r = this.records.get(id); + if (!r) throw new Error(`JobMonitor: unknown job id "${id}"`); + return r; + } +} + +/** + * JobQueue + * + * Enqueues async tasks and executes them with retry + monitoring. + * + * Usage: + * ```ts + * const monitor = new JobMonitor({ onSuccess: (job) => console.log('done', job.id) }); + * const queue = new JobQueue({ monitor }); + * + * const result = await queue.enqueue(() => api.getDashboardSummary({ month: '2026-02' })); + * ``` + */ +export class JobQueue { + private policy: RetryPolicy; + private monitor: JobMonitor; + + constructor(options: { policy?: Partial; monitor?: JobMonitor } = {}) { + this.policy = { ...defaultRetryPolicy, ...options.policy }; + // WHY: Allow callers to share a monitor across multiple queues for + // centralised observability, or use an isolated monitor per queue. + this.monitor = options.monitor ?? new JobMonitor(); + } + + /** + * Enqueues a task and returns a promise that resolves with the result + * or rejects after all retry attempts are exhausted. + * + * @param task - A zero-argument function returning a Promise. + * @returns Promise resolved when the task succeeds. + */ + async enqueue(task: () => Promise): Promise { + const id = generateId(); + this.monitor._register(id); + + const { maxAttempts, baseDelayMs, isRetryable } = this.policy; + + let lastError: Error = new Error('Unknown error'); + + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + this.monitor._markRunning(id); + + try { + const result = await task(); + this.monitor._markCompleted(id, result); + return result; + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + + const isLast = attempt === maxAttempts; + const canRetry = isRetryable(lastError); + + if (!canRetry) { + // WHY: Permanent errors (e.g. 
401) should fail immediately rather + // than wasting attempts that will also fail for the same reason. + this.monitor._markFailed(id, lastError, false); + throw lastError; + } + + if (isLast) { + // WHY: Job goes to DEAD state when all retries are exhausted, + // enabling dead-letter inspection without losing failure context. + this.monitor._markFailed(id, lastError, true); + throw lastError; + } + + this.monitor._markRetrying(id, lastError); + + // Exponential backoff: 200ms, 400ms, 800ms, … + // WHY: Immediate retries can overwhelm a struggling service. + // Exponential backoff gives the dependency time to recover. + const delay = baseDelayMs * Math.pow(2, attempt - 1); + await sleep(delay); + } + } + + // Unreachable but satisfies TypeScript's control-flow analysis. + throw lastError; + } + + /** Exposes the underlying monitor for status queries. */ + getMonitor(): JobMonitor { + return this.monitor; + } +} From c5588d60eb2d40ff53f3557647e192941a9aec7c Mon Sep 17 00:00:00 2001 From: xMACANJAWAx <97346103+macan88@users.noreply.github.com> Date: Fri, 13 Mar 2026 01:21:44 +0700 Subject: [PATCH 2/3] feat(job-queue): resilient background job retry & monitoring (#130) --- app/src/__tests__/jobQueue.test.ts | 166 +++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 app/src/__tests__/jobQueue.test.ts diff --git a/app/src/__tests__/jobQueue.test.ts b/app/src/__tests__/jobQueue.test.ts new file mode 100644 index 00000000..801789d5 --- /dev/null +++ b/app/src/__tests__/jobQueue.test.ts @@ -0,0 +1,166 @@ +/** + * jobQueue.test.ts + * + * Tests for the JobQueue and JobMonitor resilience layer (Issue #130). + * + * WHY these test cases: + * 1. Retry with exponential backoff — core acceptance criterion. + * 2. No retry on permanent (4xx) errors — prevents wasted attempts and + * avoids hammering auth/validation endpoints that will never recover. + * 3. 
Dead-letter queue after exhausted retries — ensures failed jobs are + * observable for manual intervention without losing error context. + */ + +import { JobMonitor, JobQueue, defaultRetryPolicy } from '../lib/jobQueue'; + +// WHY: We fast-forward fake timers so tests don't actually wait for +// exponential backoff delays (200ms, 400ms …) during CI runs. +jest.useFakeTimers(); + +/** Helper: run the queue's internal promise while advancing timers. */ +async function drainWithTimers(promise: Promise): Promise { + // We interleave timer advancement with microtask flushing so that + // each await sleep() inside the retry loop is resolved. + const flushPromise = () => Promise.resolve(); + let result: unknown; + let error: unknown; + let settled = false; + + promise + .then((v) => { result = v; settled = true; }) + .catch((e) => { error = e; settled = true; }); + + // Advance time in small increments, flushing microtasks between each step. + // 4 iterations covers up to 3 retries with baseDelayMs=1 in tests. + for (let i = 0; i < 8; i++) { + await flushPromise(); + jest.advanceTimersByTime(10_000); // jump past any backoff delay + await flushPromise(); + if (settled) break; + } + + if (error !== undefined) throw error; + return result; +} + +describe('JobQueue — retry & monitoring', () => { + afterEach(() => { + jest.clearAllTimers(); + }); + + it('retries a transient failure up to maxAttempts and resolves on success', async () => { + /** + * WHY: Validates the primary acceptance criterion — transient errors + * (e.g. 503 Service Unavailable) should be retried with backoff. + * The task fails twice then succeeds on the third attempt. 
+ */ + const monitor = new JobMonitor(); + const queue = new JobQueue({ + monitor, + policy: { maxAttempts: 3, baseDelayMs: 1 }, // tiny delay for tests + }); + + let callCount = 0; + const task = jest.fn(async () => { + callCount++; + if (callCount < 3) throw new Error('503 Service Unavailable'); + return 'ok'; + }); + + const promise = queue.enqueue(task); + const result = await drainWithTimers(promise); + + expect(callCount).toBe(3); // 2 retries occurred + expect(result).toBe('ok'); + + // Find the completed job in the monitor + const monitor_ = queue.getMonitor(); + const dead = monitor_.getDeadJobs(); + expect(dead).toHaveLength(0); // No dead jobs — it eventually succeeded + }); + + it('does NOT retry permanent (4xx) errors and marks job FAILED immediately', async () => { + /** + * WHY: Retrying a 401 Unauthorized or 400 Bad Request is wasteful and + * will never succeed. The retry policy must short-circuit on permanent + * errors to preserve resource budgets and avoid confusing log noise. 
+ */ + const onFailureMock = jest.fn(); + const onRetryMock = jest.fn(); + const monitor = new JobMonitor({ + onFailure: onFailureMock, + onRetry: onRetryMock, + }); + const queue = new JobQueue({ + monitor, + policy: { maxAttempts: 3, baseDelayMs: 1 }, + }); + + let callCount = 0; + const task = jest.fn(async () => { + callCount++; + throw new Error('401 Unauthorized'); + }); + + const promise = queue.enqueue(task); + await expect(drainWithTimers(promise)).rejects.toThrow('401 Unauthorized'); + + expect(callCount).toBe(1); // Only one attempt — no retries + expect(onRetryMock).not.toHaveBeenCalled(); + expect(onFailureMock).toHaveBeenCalledTimes(1); + expect(onFailureMock.mock.calls[0][0]).toMatchObject({ status: 'FAILED', attempts: 1 }); + }); + + it('moves job to dead-letter queue after all retry attempts are exhausted', async () => { + /** + * WHY: When a transient error persists across all retries the job must + * be placed in the dead-letter queue so operators can inspect it and + * decide on manual remediation without losing failure context. 
+ */ + const onDeadMock = jest.fn(); + const onRetryMock = jest.fn(); + const monitor = new JobMonitor({ + onDead: onDeadMock, + onRetry: onRetryMock, + }); + const queue = new JobQueue({ + monitor, + policy: { maxAttempts: 3, baseDelayMs: 1 }, + }); + + const task = jest.fn(async () => { + throw new Error('Network timeout'); + }); + + const promise = queue.enqueue(task); + await expect(drainWithTimers(promise)).rejects.toThrow('Network timeout'); + + expect(task).toHaveBeenCalledTimes(3); // All 3 attempts were made + + // onRetry fires after attempts 1 and 2 (not the final failure) + expect(onRetryMock).toHaveBeenCalledTimes(2); + + // Job ends up in dead-letter state + expect(onDeadMock).toHaveBeenCalledTimes(1); + const deadJobs = monitor.getDeadJobs(); + expect(deadJobs).toHaveLength(1); + expect(deadJobs[0].status).toBe('DEAD'); + expect(deadJobs[0].attempts).toBe(3); + expect(deadJobs[0].error?.message).toBe('Network timeout'); + }); +}); + +describe('defaultRetryPolicy.isRetryable', () => { + it('treats 5xx and network errors as retryable', () => { + expect(defaultRetryPolicy.isRetryable(new Error('500 Internal Server Error'))).toBe(true); + expect(defaultRetryPolicy.isRetryable(new Error('503 Service Unavailable'))).toBe(true); + expect(defaultRetryPolicy.isRetryable(new Error('Network timeout'))).toBe(true); + }); + + it('treats 4xx errors as permanent (not retryable)', () => { + expect(defaultRetryPolicy.isRetryable(new Error('401 Unauthorized'))).toBe(false); + expect(defaultRetryPolicy.isRetryable(new Error('400 Bad Request'))).toBe(false); + expect(defaultRetryPolicy.isRetryable(new Error('403 Forbidden'))).toBe(false); + expect(defaultRetryPolicy.isRetryable(new Error('404 Not Found'))).toBe(false); + }); +}); From 4308bff9cb6ab11e7eb4009469b9d4c15c7718f5 Mon Sep 17 00:00:00 2001 From: xMACANJAWAx <97346103+macan88@users.noreply.github.com> Date: Fri, 13 Mar 2026 01:21:45 +0700 Subject: [PATCH 3/3] feat(job-queue): resilient background job retry & 
monitoring (#130)

---
 docs/job-queue.md | 90 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 90 insertions(+)
 create mode 100644 docs/job-queue.md

diff --git a/docs/job-queue.md b/docs/job-queue.md
new file mode 100644
index 00000000..dbf64dc9
--- /dev/null
+++ b/docs/job-queue.md
@@ -0,0 +1,90 @@
# Resilient Background Job Queue

Implemented for [Issue #130](../../issues/130).

## Overview

`JobQueue` wraps any async function with **exponential backoff retry** and **lifecycle monitoring**. It is designed for API calls that can experience transient failures (network blips, 5xx responses) while safely ignoring permanent errors (4xx).

## Quick Start

```ts
import { JobQueue, JobMonitor } from '@/lib/jobQueue';
import { getDashboardSummary } from '@/api/dashboard';

// 1. Create a shared monitor with lifecycle hooks
const monitor = new JobMonitor({
  onSuccess: (job) => console.info('Job completed', job.id),
  onRetry: (job, attempt, err) => console.warn(`Retry #${attempt}`, err.message),
  onDead: (job) => console.error('Job dead-lettered', job.id, job.error),
});

// 2. Create a queue (shared or per-feature)
const queue = new JobQueue({ monitor });

// 3. Enqueue a task — retries happen automatically
const summary = await queue.enqueue(() =>
  getDashboardSummary({ month: '2026-02' })
);
```

## Retry Policy

| Option | Default | Description |
|---|---|---|
| `maxAttempts` | `3` | Total attempts including the first |
| `baseDelayMs` | `200` | Base delay; doubles each retry (200 → 400 → 800 ms) |
| `isRetryable` | skip 4xx | Return `false` to abort retries immediately |

Customise the policy at queue creation:

```ts
const queue = new JobQueue({
  policy: {
    maxAttempts: 5,
    baseDelayMs: 500,
    isRetryable: (err) => !err.message.includes('401'),
  },
});
```

## Job Lifecycle

```
PENDING → RUNNING → COMPLETED (happy path)
        → RUNNING (retry) → … → COMPLETED
        → FAILED (permanent 4xx error)
        → DEAD (all retries exhausted)
```

## Dead-Letter Queue

Jobs that exhaust all retries are moved to `DEAD` state and can be inspected:

```ts
const dead = monitor.getDeadJobs();
dead.forEach((job) => {
  console.error(`Job ${job.id} failed after ${job.attempts} attempts:`, job.error);
});
```

Call `monitor.clear()` periodically to prevent unbounded memory growth in long-running processes.

## Memory Management

- The monitor stores **all** job records in memory. For high-throughput scenarios, flush completed jobs regularly with `monitor.clear()` or implement a retention policy using `monitor.getJob(id)` and custom eviction logic.
- Each `JobQueue` instance creates an isolated promise chain per job. There are no global listeners and no cross-job side effects.

## Testing

Use `jest.useFakeTimers()` to fast-forward backoff delays. Note that a single `jest.runAllTimers()` call only covers jobs with at most one pending delay: each retry schedules its next `setTimeout` only after the previous backoff promise's microtasks run, so multi-retry jobs require interleaving timer advancement with microtask flushes (see `drainWithTimers` in the test file):

```ts
jest.useFakeTimers();

const promise = queue.enqueue(task);
jest.runAllTimers(); // advances pending setTimeout calls; repeat with microtask flushes for multiple retries
const result = await promise;
```

See `app/src/__tests__/jobQueue.test.ts` for full examples.