diff --git a/.env.example b/.env.example index d62e993..6c38aa1 100644 --- a/.env.example +++ b/.env.example @@ -27,3 +27,4 @@ ENABLE_API_VERSION_HEADER=true ENABLE_SCHEMA_VERSION_HEADER=true ENABLE_RESPONSE_TIMING=true ENABLE_REQUEST_LOGGING=true +BACKGROUND_JOB_LOCK_TTL_MS=300000 diff --git a/docs/api-versioning.md b/docs/api-versioning.md index 669ba3d..23be8c0 100644 --- a/docs/api-versioning.md +++ b/docs/api-versioning.md @@ -5,14 +5,17 @@ The Access Layer Server uses versioning headers to inform clients about the curr ## Response Headers ### `X-API-Version` + Indicates the current overall version of the API. This is typically used for tracking feature sets and major API releases. ### `X-Schema-Version` + Indicates the active version of the request body schema. This version should be checked by consumers to ensure they are sending request bodies in the format expected by the server. ## Versioning Strategy Both headers follow [Semantic Versioning (SemVer)](https://semver.org/): + - **MAJOR** version: Breaking changes to the API or schema. - **MINOR** version: Backwards-compatible new features or additions. - **PATCH** version: Backwards-compatible bug fixes. diff --git a/docs/architecture/domain-boundaries.md b/docs/architecture/domain-boundaries.md index 9ea856c..93906e7 100644 --- a/docs/architecture/domain-boundaries.md +++ b/docs/architecture/domain-boundaries.md @@ -49,11 +49,11 @@ erDiagram ### Core Entities -1. **User**: Represents a registered user. Holds authentication and basic profile data. -2. **CreatorProfile**: Represents the creator persona of a user. Tied to a specific handle and contains metadata like bio and perks. -3. **StellarWallet**: Links a user to their Stellar public address. Used for identity verification and ownership checks. -4. **IndexerDLQ**: Stores failed indexing jobs from the Stellar blockchain for manual review or reprocessing. -5. **AuditEvent**: A generic log for significant actions occurring in the system. +1. **User**: Represents a registered user. Holds authentication and basic profile data. +2. **CreatorProfile**: Represents the creator persona of a user. Tied to a specific handle and contains metadata like bio and perks. +3. **StellarWallet**: Links a user to their Stellar public address. Used for identity verification and ownership checks. +4. **IndexerDLQ**: Stores failed indexing jobs from the Stellar blockchain for manual review or reprocessing. +5. **AuditEvent**: A generic log for significant actions occurring in the system. ## Module Boundaries @@ -61,22 +61,22 @@ The server is organized into feature-based modules under `src/modules/`. Each mo ### Major Route Groups -| Module | Responsibility | Primary Entities | -| :--- | :--- | :--- | -| `auth` | User registration, login, session management, and password resets. | `User` | +| Module | Responsibility | Primary Entities | +| :--------- | :---------------------------------------------------------------------------- | :--------------- | +| `auth` | User registration, login, session management, and password resets. | `User` | | `creators` | Public and private creator profile management, including stats and discovery. | `CreatorProfile` | -| `wallet` | Linking and verifying Stellar wallets. | `StellarWallet` | -| `admin` | Internal management tools and system monitoring. | All | -| `health` | System health checks and status monitoring. | N/A | +| `wallet` | Linking and verifying Stellar wallets. | `StellarWallet` | +| `admin` | Internal management tools and system monitoring. | All | +| `health` | System health checks and status monitoring. | N/A | ### Cross-Module Rules To ensure a maintainable and decoupled architecture, the following rules apply: -1. **No Direct Database Access**: Modules should not directly query Prisma models belonging to other modules if a service/utility exists. -2. **Shared Utilities**: Common logic (e.g., mail sending, logging, pagination) belongs in `src/utils/` and can be used by any module. -3. **Constants**: Shared configuration and string constants belong in `src/constants/`. -4. **Types**: Cross-cutting TypeScript types belong in `src/types/`. +1. **No Direct Database Access**: Modules should not directly query Prisma models belonging to other modules if a service/utility exists. +2. **Shared Utilities**: Common logic (e.g., mail sending, logging, pagination) belongs in `src/utils/` and can be used by any module. +3. **Constants**: Shared configuration and string constants belong in `src/constants/`. +4. **Types**: Cross-cutting TypeScript types belong in `src/types/`. ### Interaction Patterns diff --git a/docs/read-model-rebuild.md b/docs/read-model-rebuild.md index 086fe58..a794e88 100644 --- a/docs/read-model-rebuild.md +++ b/docs/read-model-rebuild.md @@ -46,6 +46,7 @@ Source of truth (events / primary tables) 1. **Pause or fence the live indexer** that writes to the read model, or ensure it is idempotent enough to run concurrently with the rebuild. 2. **Truncate or soft-delete** the stale read-model rows: + ```sql TRUNCATE TABLE creator_ownership_reads; -- or, for an incremental rebuild without full downtime: @@ -53,28 +54,32 @@ Source of truth (events / primary tables) ``` 3. **Run the rebuild script** (location: `scripts/rebuild-read-model.sh` once implemented): + ```bash pnpm ts-node src/scripts/rebuild-read-model.ts --model=creator_ownership_reads ``` + The script must process records in batches (recommended batch size: 500) and log progress at each checkpoint. 4. **Verify output:** + ```sql SELECT COUNT(*) FROM creator_ownership_reads; -- Compare to expected count derived from source tables ``` + Spot-check a sample of records against the source of truth. 5. **Resume the live indexer.** If fenced, lift the fence. If the indexer was paused, restart it and confirm it picks up from the correct cursor position. ## Expected duration -| Table size | Estimated rebuild time | -|---|---| -| < 10k rows | < 1 minute | -| 10k – 100k rows | 2–10 minutes | -| 100k – 1M rows | 15–60 minutes | -| > 1M rows | Plan for multi-hour window; use batched pagination | +| Table size | Estimated rebuild time | +| --------------- | -------------------------------------------------- | +| < 10k rows | < 1 minute | +| 10k – 100k rows | 2–10 minutes | +| 100k – 1M rows | 15–60 minutes | +| > 1M rows | Plan for multi-hour window; use batched pagination | Duration depends on DB instance size, network, and whether indexes are rebuilt inline or deferred. @@ -90,6 +95,7 @@ curl -X POST "$API_BASE_URL/admin/indexer/replay" \ ``` Notes: + - `dryRun` defaults to `false`; omit it to run a normal replay initiation. - In dry-run mode, the response includes `dryRun: true` and no audit event is written. - `startLedger` must be a positive integer in both dry-run and normal mode. diff --git a/src/config.ts b/src/config.ts index 1c5fc6a..ba942d9 100644 --- a/src/config.ts +++ b/src/config.ts @@ -55,6 +55,7 @@ export const envSchema = z.object({ ENABLE_SCHEMA_VERSION_HEADER: z.coerce.boolean().default(true), ENABLE_REQUEST_LOGGING: z.coerce.boolean().default(true), INDEXER_JITTER_FACTOR: z.coerce.number().min(0).max(1).default(0.1), + BACKGROUND_JOB_LOCK_TTL_MS: z.coerce.number().int().positive().default(300000), }); export const envConfig = envSchema.parse(process.env); diff --git a/src/modules/admin/admin.controllers.test.ts b/src/modules/admin/admin.controllers.test.ts index 6aaf68c..f22af3a 100644 --- a/src/modules/admin/admin.controllers.test.ts +++ b/src/modules/admin/admin.controllers.test.ts @@ -3,6 +3,13 @@ import { emitAuditEvent } from '../../utils/audit.utils'; import { AdminRequest } from '../../middlewares/admin-guard.middleware'; import { Response } from 'express'; +jest.mock('../../utils/background-job-lock.utils', () => ({ + acquireJobLock: jest.fn(() => ({ + acquired: true, + expiresAt: '2026-01-01T00:00:00.000Z', + })), +})); + jest.mock('../../utils/prisma.utils', () => ({ prisma: { creatorProfile: { diff --git a/src/modules/admin/admin.controllers.ts b/src/modules/admin/admin.controllers.ts index 56db8f6..6f16f3b 100644 --- a/src/modules/admin/admin.controllers.ts +++ b/src/modules/admin/admin.controllers.ts @@ -10,6 +10,9 @@ import { emitAuditEvent } from '../../utils/audit.utils'; import { AdminRequest } from '../../middlewares/admin-guard.middleware'; import { Response } from 'express'; import { z } from 'zod'; +import { acquireJobLock } from '../../utils/background-job-lock.utils'; +import { logger } from '../../utils/logger.utils'; +import { ErrorCode } from '../../constants/error.constants'; const UpdateCreatorMetadataSchema = z.object({ isVerified: z.boolean().optional(), @@ -100,6 +103,8 @@ export const httpReplayIndexerEvents: AsyncController = async (req: AdminRequest try { const { startLedger, dryRun = false } = req.body as { startLedger?: number; dryRun?: boolean }; const adminId = req.adminId; + const lockName = 'indexer-replay'; + const lockOwner = adminId || 'unknown'; if (typeof startLedger !== 'number' || startLedger < 1) { return sendValidationError(res, 'Invalid request body', [ @@ -112,14 +117,48 @@ export const httpReplayIndexerEvents: AsyncController = async (req: AdminRequest ]); } + const lock = acquireJobLock({ + name: lockName, + owner: lockOwner, + }); + + if (!lock.acquired) { + return res.status(409).json({ + success: false, + error: { + code: ErrorCode.CONFLICT, + message: 'Indexer replay job is already running', + details: [ + { + field: 'indexerReplayLock', + message: `Lock is held by ${lock.holder || 'another worker'} until ${lock.expiresAt || 'unknown time'}`, + }, + ], + }, + }); + } + const replayInitiated = { type: 'INDEXER_REPLAY_INITIATED', startLedger, dryRun, initiatedBy: adminId, + lock: { + name: lockName, + expiresAt: lock.expiresAt, + }, timestamp: new Date().toISOString(), }; + logger.info( + { + lockName, + lockOwner, + lockExpiresAt: lock.expiresAt, + }, + 'Acquired background job lock for indexer replay' + ); + if (!dryRun) { await emitAuditEvent({ actor: adminId || 'unknown', diff --git a/src/utils/background-job-lock.utils.ts b/src/utils/background-job-lock.utils.ts new file mode 100644 index 0000000..c940016 --- /dev/null +++ b/src/utils/background-job-lock.utils.ts @@ -0,0 +1,76 @@ +import { envConfig } from '../config'; +import { logger } from './logger.utils'; + +interface LockEntry { + owner: string; + expiresAtMs: number; +} + +const locks = new Map(); + +export interface AcquireJobLockParams { + name: string; + owner: string; + ttlMs?: number; +} + +export interface AcquireJobLockResult { + acquired: boolean; + expiresAt?: string; + holder?: string; +} + +export function acquireJobLock({ + name, + owner, + ttlMs = envConfig.BACKGROUND_JOB_LOCK_TTL_MS, +}: AcquireJobLockParams): AcquireJobLockResult { + const nowMs = Date.now(); + const existing = locks.get(name); + + if (existing && existing.expiresAtMs <= nowMs) { + logger.warn( + { + lockName: name, + previousOwner: existing.owner, + expiredAt: new Date(existing.expiresAtMs).toISOString(), + now: new Date(nowMs).toISOString(), + }, + 'Background job lock expired; reclaiming lock' + ); + locks.delete(name); + } + + if (locks.has(name)) { + const current = locks.get(name)!; + return { + acquired: false, + expiresAt: new Date(current.expiresAtMs).toISOString(), + holder: current.owner, + }; + } + + const expiresAtMs = nowMs + ttlMs; + locks.set(name, { owner, expiresAtMs }); + + return { + acquired: true, + expiresAt: new Date(expiresAtMs).toISOString(), + }; +} + +export function releaseJobLock(name: string, owner?: string): boolean { + const current = locks.get(name); + if (!current) return false; + + if (owner && current.owner !== owner) { + return false; + } + + locks.delete(name); + return true; +} + +export function resetJobLocks(): void { + locks.clear(); +} diff --git a/src/utils/test/background-job-lock.utils.test.ts b/src/utils/test/background-job-lock.utils.test.ts new file mode 100644 index 0000000..0953b77 --- /dev/null +++ b/src/utils/test/background-job-lock.utils.test.ts @@ -0,0 +1,84 @@ +import { + acquireJobLock, + releaseJobLock, + resetJobLocks, +} from '../background-job-lock.utils'; +import { logger } from '../logger.utils'; + +jest.mock('../../config', () => ({ + envConfig: { + BACKGROUND_JOB_LOCK_TTL_MS: 1000, + }, +})); + +jest.mock('../logger.utils', () => ({ + logger: { + warn: jest.fn(), + }, +})); + +describe('background-job-lock.utils', () => { + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(new Date('2026-01-01T00:00:00.000Z')); + resetJobLocks(); + jest.clearAllMocks(); + }); + + afterEach(() => { + jest.useRealTimers(); + resetJobLocks(); + }); + + it('acquires lock with default TTL from config', () => { + const result = acquireJobLock({ name: 'indexer', owner: 'worker-a' }); + + expect(result.acquired).toBe(true); + expect(result.expiresAt).toBe('2026-01-01T00:00:01.000Z'); + }); + + it('supports per-call TTL override', () => { + const result = acquireJobLock({ + name: 'indexer', + owner: 'worker-a', + ttlMs: 5000, + }); + + expect(result.acquired).toBe(true); + expect(result.expiresAt).toBe('2026-01-01T00:00:05.000Z'); + }); + + it('rejects acquisition while active lock exists', () => { + acquireJobLock({ name: 'indexer', owner: 'worker-a' }); + const result = acquireJobLock({ name: 'indexer', owner: 'worker-b' }); + + expect(result).toEqual({ + acquired: false, + holder: 'worker-a', + expiresAt: '2026-01-01T00:00:01.000Z', + }); + }); + + it('logs expiration and allows lock reclaim after TTL', () => { + acquireJobLock({ name: 'indexer', owner: 'worker-a' }); + jest.advanceTimersByTime(1001); + + const result = acquireJobLock({ name: 'indexer', owner: 'worker-b' }); + + expect(result.acquired).toBe(true); + expect(logger.warn).toHaveBeenCalledWith( + expect.objectContaining({ + lockName: 'indexer', + previousOwner: 'worker-a', + }), + 'Background job lock expired; reclaiming lock' + ); + }); + + it('releases lock only when owner matches', () => { + acquireJobLock({ name: 'indexer', owner: 'worker-a' }); + + expect(releaseJobLock('indexer', 'worker-b')).toBe(false); + expect(releaseJobLock('indexer', 'worker-a')).toBe(true); + }); +});