Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ ENABLE_API_VERSION_HEADER=true
ENABLE_SCHEMA_VERSION_HEADER=true
ENABLE_RESPONSE_TIMING=true
ENABLE_REQUEST_LOGGING=true
BACKGROUND_JOB_LOCK_TTL_MS=300000
3 changes: 3 additions & 0 deletions docs/api-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ The Access Layer Server uses versioning headers to inform clients about the curr
## Response Headers

### `X-API-Version`

Indicates the current overall version of the API. This is typically used for tracking feature sets and major API releases.

### `X-Schema-Version`

Indicates the active version of the request body schema. This version should be checked by consumers to ensure they are sending request bodies in the format expected by the server.

## Versioning Strategy

Both headers follow [Semantic Versioning (SemVer)](https://semver.org/):

- **MAJOR** version: Breaking changes to the API or schema.
- **MINOR** version: Backwards-compatible new features or additions.
- **PATCH** version: Backwards-compatible bug fixes.
Expand Down
30 changes: 15 additions & 15 deletions docs/architecture/domain-boundaries.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,34 +49,34 @@ erDiagram

### Core Entities

1. **User**: Represents a registered user. Holds authentication and basic profile data.
2. **CreatorProfile**: Represents the creator persona of a user. Tied to a specific handle and contains metadata like bio and perks.
3. **StellarWallet**: Links a user to their Stellar public address. Used for identity verification and ownership checks.
4. **IndexerDLQ**: Stores failed indexing jobs from the Stellar blockchain for manual review or reprocessing.
5. **AuditEvent**: A generic log for significant actions occurring in the system.
1. **User**: Represents a registered user. Holds authentication and basic profile data.
2. **CreatorProfile**: Represents the creator persona of a user. Tied to a specific handle and contains metadata like bio and perks.
3. **StellarWallet**: Links a user to their Stellar public address. Used for identity verification and ownership checks.
4. **IndexerDLQ**: Stores failed indexing jobs from the Stellar blockchain for manual review or reprocessing.
5. **AuditEvent**: A generic log for significant actions occurring in the system.

## Module Boundaries

The server is organized into feature-based modules under `src/modules/`. Each module is responsible for its own business logic, routes, and (where applicable) data validation.

### Major Route Groups

| Module | Responsibility | Primary Entities |
| :--- | :--- | :--- |
| `auth` | User registration, login, session management, and password resets. | `User` |
| Module | Responsibility | Primary Entities |
| :--------- | :---------------------------------------------------------------------------- | :--------------- |
| `auth` | User registration, login, session management, and password resets. | `User` |
| `creators` | Public and private creator profile management, including stats and discovery. | `CreatorProfile` |
| `wallet` | Linking and verifying Stellar wallets. | `StellarWallet` |
| `admin` | Internal management tools and system monitoring. | All |
| `health` | System health checks and status monitoring. | N/A |
| `wallet` | Linking and verifying Stellar wallets. | `StellarWallet` |
| `admin` | Internal management tools and system monitoring. | All |
| `health` | System health checks and status monitoring. | N/A |

### Cross-Module Rules

To ensure a maintainable and decoupled architecture, the following rules apply:

1. **No Direct Database Access**: Modules should not directly query Prisma models belonging to other modules if a service/utility exists.
2. **Shared Utilities**: Common logic (e.g., mail sending, logging, pagination) belongs in `src/utils/` and can be used by any module.
3. **Constants**: Shared configuration and string constants belong in `src/constants/`.
4. **Types**: Cross-cutting TypeScript types belong in `src/types/`.
1. **No Direct Database Access**: Modules should not directly query Prisma models belonging to other modules if a service/utility exists.
2. **Shared Utilities**: Common logic (e.g., mail sending, logging, pagination) belongs in `src/utils/` and can be used by any module.
3. **Constants**: Shared configuration and string constants belong in `src/constants/`.
4. **Types**: Cross-cutting TypeScript types belong in `src/types/`.

### Interaction Patterns

Expand Down
18 changes: 12 additions & 6 deletions docs/read-model-rebuild.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,40 @@ Source of truth (events / primary tables)
1. **Pause or fence the live indexer** that writes to the read model, or ensure it is idempotent enough to run concurrently with the rebuild.

2. **Truncate or soft-delete** the stale read-model rows:

```sql
TRUNCATE TABLE creator_ownership_reads;
-- or, for an incremental rebuild without full downtime:
UPDATE creator_ownership_reads SET rebuild_pending = true;
```

3. **Run the rebuild script** (location: `scripts/rebuild-read-model.sh` once implemented):

```bash
pnpm ts-node src/scripts/rebuild-read-model.ts --model=creator_ownership_reads
```

The script must process records in batches (recommended batch size: 500) and log progress at each checkpoint.

4. **Verify output:**

```sql
SELECT COUNT(*) FROM creator_ownership_reads;
-- Compare to expected count derived from source tables
```

Spot-check a sample of records against the source of truth.

5. **Resume the live indexer.** If fenced, lift the fence. If the indexer was paused, restart it and confirm it picks up from the correct cursor position.

## Expected duration

| Table size | Estimated rebuild time |
|---|---|
| < 10k rows | < 1 minute |
| 10k – 100k rows | 2–10 minutes |
| 100k – 1M rows | 15–60 minutes |
| > 1M rows | Plan for multi-hour window; use batched pagination |
| Table size | Estimated rebuild time |
| --------------- | -------------------------------------------------- |
| < 10k rows | < 1 minute |
| 10k – 100k rows | 2–10 minutes |
| 100k – 1M rows | 15–60 minutes |
| > 1M rows | Plan for multi-hour window; use batched pagination |

Duration depends on DB instance size, network, and whether indexes are rebuilt inline or deferred.

Expand All @@ -90,6 +95,7 @@ curl -X POST "$API_BASE_URL/admin/indexer/replay" \
```

Notes:

- `dryRun` defaults to `false`; omit it to run a normal replay initiation.
- In dry-run mode, the response includes `dryRun: true` and no audit event is written.
- `startLedger` must be a positive integer in both dry-run and normal mode.
Expand Down
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export const envSchema = z.object({
ENABLE_SCHEMA_VERSION_HEADER: z.coerce.boolean().default(true),
ENABLE_REQUEST_LOGGING: z.coerce.boolean().default(true),
INDEXER_JITTER_FACTOR: z.coerce.number().min(0).max(1).default(0.1),
BACKGROUND_JOB_LOCK_TTL_MS: z.coerce.number().int().positive().default(300000),
});

export const envConfig = envSchema.parse(process.env);
Expand Down
7 changes: 7 additions & 0 deletions src/modules/admin/admin.controllers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ import { emitAuditEvent } from '../../utils/audit.utils';
import { AdminRequest } from '../../middlewares/admin-guard.middleware';
import { Response } from 'express';

jest.mock('../../utils/background-job-lock.utils', () => ({
acquireJobLock: jest.fn(() => ({
acquired: true,
expiresAt: '2026-01-01T00:00:00.000Z',
})),
}));

jest.mock('../../utils/prisma.utils', () => ({
prisma: {
creatorProfile: {
Expand Down
39 changes: 39 additions & 0 deletions src/modules/admin/admin.controllers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { emitAuditEvent } from '../../utils/audit.utils';
import { AdminRequest } from '../../middlewares/admin-guard.middleware';
import { Response } from 'express';
import { z } from 'zod';
import { acquireJobLock } from '../../utils/background-job-lock.utils';
import { logger } from '../../utils/logger.utils';
import { ErrorCode } from '../../constants/error.constants';

const UpdateCreatorMetadataSchema = z.object({
isVerified: z.boolean().optional(),
Expand Down Expand Up @@ -100,6 +103,8 @@ export const httpReplayIndexerEvents: AsyncController = async (req: AdminRequest
try {
const { startLedger, dryRun = false } = req.body as { startLedger?: number; dryRun?: boolean };
const adminId = req.adminId;
const lockName = 'indexer-replay';
const lockOwner = adminId || 'unknown';

if (typeof startLedger !== 'number' || startLedger < 1) {
return sendValidationError(res, 'Invalid request body', [
Expand All @@ -112,14 +117,48 @@ export const httpReplayIndexerEvents: AsyncController = async (req: AdminRequest
]);
}

const lock = acquireJobLock({
name: lockName,
owner: lockOwner,
});

if (!lock.acquired) {
return res.status(409).json({
success: false,
error: {
code: ErrorCode.CONFLICT,
message: 'Indexer replay job is already running',
details: [
{
field: 'indexerReplayLock',
message: `Lock is held by ${lock.holder || 'another worker'} until ${lock.expiresAt || 'unknown time'}`,
},
],
},
});
}

const replayInitiated = {
type: 'INDEXER_REPLAY_INITIATED',
startLedger,
dryRun,
initiatedBy: adminId,
lock: {
name: lockName,
expiresAt: lock.expiresAt,
},
timestamp: new Date().toISOString(),
};

logger.info(
{
lockName,
lockOwner,
lockExpiresAt: lock.expiresAt,
},
'Acquired background job lock for indexer replay'
);

if (!dryRun) {
await emitAuditEvent({
actor: adminId || 'unknown',
Expand Down
76 changes: 76 additions & 0 deletions src/utils/background-job-lock.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { envConfig } from '../config';
import { logger } from './logger.utils';

interface LockEntry {
owner: string;
expiresAtMs: number;
}

const locks = new Map<string, LockEntry>();

export interface AcquireJobLockParams {
name: string;
owner: string;
ttlMs?: number;
}

export interface AcquireJobLockResult {
acquired: boolean;
expiresAt?: string;
holder?: string;
}

export function acquireJobLock({
name,
owner,
ttlMs = envConfig.BACKGROUND_JOB_LOCK_TTL_MS,
}: AcquireJobLockParams): AcquireJobLockResult {
const nowMs = Date.now();
const existing = locks.get(name);

if (existing && existing.expiresAtMs <= nowMs) {
logger.warn(
{
lockName: name,
previousOwner: existing.owner,
expiredAt: new Date(existing.expiresAtMs).toISOString(),
now: new Date(nowMs).toISOString(),
},
'Background job lock expired; reclaiming lock'
);
locks.delete(name);
}

if (locks.has(name)) {
const current = locks.get(name)!;
return {
acquired: false,
expiresAt: new Date(current.expiresAtMs).toISOString(),
holder: current.owner,
};
}

const expiresAtMs = nowMs + ttlMs;
locks.set(name, { owner, expiresAtMs });

return {
acquired: true,
expiresAt: new Date(expiresAtMs).toISOString(),
};
}

export function releaseJobLock(name: string, owner?: string): boolean {
const current = locks.get(name);
if (!current) return false;

if (owner && current.owner !== owner) {
return false;
}

locks.delete(name);
return true;
}

export function resetJobLocks(): void {
locks.clear();
}
84 changes: 84 additions & 0 deletions src/utils/test/background-job-lock.utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import {
acquireJobLock,
releaseJobLock,
resetJobLocks,
} from '../background-job-lock.utils';
import { logger } from '../logger.utils';

jest.mock('../../config', () => ({
envConfig: {
BACKGROUND_JOB_LOCK_TTL_MS: 1000,
},
}));

jest.mock('../logger.utils', () => ({
logger: {
warn: jest.fn(),
},
}));

describe('background-job-lock.utils', () => {
beforeEach(() => {
jest.useFakeTimers();
jest.setSystemTime(new Date('2026-01-01T00:00:00.000Z'));
resetJobLocks();
jest.clearAllMocks();
});

afterEach(() => {
jest.useRealTimers();
resetJobLocks();
});

it('acquires lock with default TTL from config', () => {
const result = acquireJobLock({ name: 'indexer', owner: 'worker-a' });

expect(result.acquired).toBe(true);
expect(result.expiresAt).toBe('2026-01-01T00:00:01.000Z');
});

it('supports per-call TTL override', () => {
const result = acquireJobLock({
name: 'indexer',
owner: 'worker-a',
ttlMs: 5000,
});

expect(result.acquired).toBe(true);
expect(result.expiresAt).toBe('2026-01-01T00:00:05.000Z');
});

it('rejects acquisition while active lock exists', () => {
acquireJobLock({ name: 'indexer', owner: 'worker-a' });
const result = acquireJobLock({ name: 'indexer', owner: 'worker-b' });

expect(result).toEqual({
acquired: false,
holder: 'worker-a',
expiresAt: '2026-01-01T00:00:01.000Z',
});
});

it('logs expiration and allows lock reclaim after TTL', () => {
acquireJobLock({ name: 'indexer', owner: 'worker-a' });
jest.advanceTimersByTime(1001);

const result = acquireJobLock({ name: 'indexer', owner: 'worker-b' });

expect(result.acquired).toBe(true);
expect(logger.warn).toHaveBeenCalledWith(
expect.objectContaining({
lockName: 'indexer',
previousOwner: 'worker-a',
}),
'Background job lock expired; reclaiming lock'
);
});

it('releases lock only when owner matches', () => {
acquireJobLock({ name: 'indexer', owner: 'worker-a' });

expect(releaseJobLock('indexer', 'worker-b')).toBe(false);
expect(releaseJobLock('indexer', 'worker-a')).toBe(true);
});
});
Loading