diff --git a/.env.example b/.env.example index f396361..d62e993 100644 --- a/.env.example +++ b/.env.example @@ -24,5 +24,6 @@ PAYSTACK_PUBLIC_KEY= # API Configuration API_VERSION=1.0.0 ENABLE_API_VERSION_HEADER=true +ENABLE_SCHEMA_VERSION_HEADER=true ENABLE_RESPONSE_TIMING=true ENABLE_REQUEST_LOGGING=true diff --git a/README.md b/README.md index 0ddf516..538fb13 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The server is responsible for: - notifications, analytics, and moderation workflows - access checks for gated off-chain content -See [Backend Domain Model and Endpoint Boundaries](./docs/architecture/domain-boundaries.md) for a technical overview. +See [Backend Domain Model and Endpoint Boundaries](./docs/architecture/domain-boundaries.md) for a technical overview and [API Versioning](./docs/api-versioning.md) for details on schema versioning. ## Tech diff --git a/docs/api-versioning.md b/docs/api-versioning.md new file mode 100644 index 0000000..669ba3d --- /dev/null +++ b/docs/api-versioning.md @@ -0,0 +1,24 @@ +# API and Schema Versioning + +The Access Layer Server uses versioning headers to inform clients about the current API version and the expected structure of request bodies. + +## Response Headers + +### `X-API-Version` +Indicates the current overall version of the API. This is typically used for tracking feature sets and major API releases. + +### `X-Schema-Version` +Indicates the active version of the request body schema. This version should be checked by consumers to ensure they are sending request bodies in the format expected by the server. + +## Versioning Strategy + +Both headers follow [Semantic Versioning (SemVer)](https://semver.org/): +- **MAJOR** version: Breaking changes to the API or schema. +- **MINOR** version: Backwards-compatible new features or additions. +- **PATCH** version: Backwards-compatible bug fixes. + +## Expected Consumer Behavior + +1. **Check Headers**: Consumers should inspect the `X-Schema-Version` header in API responses. +2. **Schema Alignment**: If the `X-Schema-Version` major version changes, consumers must update their request body structures to match the new schema requirements. +3. **Warning Handling**: Consumers may choose to log warnings if they detect a version mismatch that they haven't yet updated to support. diff --git a/docs/indexer/EVENT_PROCESSING.md b/docs/indexer/EVENT_PROCESSING.md new file mode 100644 index 0000000..01b86c2 --- /dev/null +++ b/docs/indexer/EVENT_PROCESSING.md @@ -0,0 +1,33 @@ +# Chain Event Processing + +The indexer processes events from the blockchain to update the read models and activity feeds. To ensure data consistency and prevent duplicate processing, the following strategies are employed. + +## 1. Deduplication + +Before processing a batch of events, they should be deduped based on their unique identifier on the chain: `transactionHash` and `eventIndex`. + +The `dedupeChainEvents` helper in `src/utils/indexer-dedupe.utils.ts` provides this functionality. + +### Example Usage: + +```typescript +import { dedupeChainEvents } from '../utils/indexer-dedupe.utils'; + +const rawEvents = fetchEventsFromChain(); +const uniqueEvents = dedupeChainEvents(rawEvents); +// Proceed with processing uniqueEvents +``` + +## 2. Idempotency + +Event handlers must be idempotent. This means that processing the same event multiple times should have the same effect as processing it once. + +### Strategies for Idempotency: + +- **Database Upserts**: Use Prisma's `upsert` or `update` with unique constraints where possible. +- **State Check**: Before applying a change (like incrementing a balance), verify if the event has already been accounted for (e.g. by checking a `lastProcessedLedger` or a specific event log). +- **Atomic Transactions**: Ensure that all changes related to an event are committed in a single database transaction. + +## 3. Error Handling + +If an event fails to process after multiple retries, it is moved to the [Dead-Letter Queue (DLQ)](./DLQ_WORKFLOW.md) for manual investigation. diff --git a/src/app.ts b/src/app.ts index 791481f..5bc8cb0 100644 --- a/src/app.ts +++ b/src/app.ts @@ -12,6 +12,7 @@ import { appRateLimit } from './middlewares/rate.middleware'; import { requestIdMiddleware } from './middlewares/request-id.middleware'; import { responseTimingMiddleware } from './middlewares/response-timing.middleware'; import { apiVersionMiddleware } from './middlewares/api-version.middleware'; +import { schemaVersionMiddleware } from './middlewares/schema-version.middleware'; import { requestLoggerMiddleware } from './middlewares/request-logger.middleware'; import { envConfig } from './config'; @@ -21,6 +22,7 @@ const app: Express = express(); app.set('trust proxy', 1); app.use(responseTimingMiddleware); app.use(apiVersionMiddleware); +app.use(schemaVersionMiddleware); app.use(requestIdMiddleware); app.use(corsMiddleware()); app.use(helmet()); diff --git a/src/config.ts b/src/config.ts index 0baef39..1c5fc6a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -52,6 +52,7 @@ export const envSchema = z.object({ ENABLE_RESPONSE_TIMING: z.coerce.boolean().default(true), API_VERSION: z.string().default('1.0.0'), ENABLE_API_VERSION_HEADER: z.coerce.boolean().default(true), + ENABLE_SCHEMA_VERSION_HEADER: z.coerce.boolean().default(true), ENABLE_REQUEST_LOGGING: z.coerce.boolean().default(true), INDEXER_JITTER_FACTOR: z.coerce.number().min(0).max(1).default(0.1), }); diff --git a/src/constants/schema.constants.ts b/src/constants/schema.constants.ts new file mode 100644 index 0000000..430cd15 --- /dev/null +++ b/src/constants/schema.constants.ts @@ -0,0 +1,12 @@ +// src/constants/schema.constants.ts + +/** + * Current version of the request body schema. + * This version should be bumped whenever there are breaking changes to the request body structure. + */ +export const REQUEST_SCHEMA_VERSION = '1.0.0'; + +/** + * The response header key that carries the active request schema version. + */ +export const SCHEMA_VERSION_HEADER = 'X-Schema-Version'; diff --git a/src/middlewares/schema-version.middleware.test.ts b/src/middlewares/schema-version.middleware.test.ts new file mode 100644 index 0000000..a036fea --- /dev/null +++ b/src/middlewares/schema-version.middleware.test.ts @@ -0,0 +1,68 @@ +import { strict as assert } from 'assert'; +import { schemaVersionMiddleware } from './schema-version.middleware'; +import type { Request, Response, NextFunction } from 'express'; +import { REQUEST_SCHEMA_VERSION, SCHEMA_VERSION_HEADER } from '../constants/schema.constants'; +import { envConfig } from '../config'; + +// Minimal mock helpers +function mockRes() { + const headers: Record = {}; + return { + headers, + setHeader(name: string, value: string) { + headers[name] = value; + }, + } as unknown as Response & { headers: Record }; +} + +function mockReq() { + return {} as Request; +} + +function run() { + // sets schema version header when enabled + { + const res = mockRes(); + let called = false; + const next: NextFunction = () => { + called = true; + }; + + // Ensure it's enabled for the test + const originalValue = envConfig.ENABLE_SCHEMA_VERSION_HEADER; + (envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = true; + + schemaVersionMiddleware(mockReq(), res, next); + + assert.equal(res.headers[SCHEMA_VERSION_HEADER], REQUEST_SCHEMA_VERSION); + assert.ok(called, 'next() should be called'); + + // Restore + (envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = originalValue; + } + + // does not set header when disabled + { + const res = mockRes(); + let called = false; + const next: NextFunction = () => { + called = true; + }; + + // Ensure it's disabled for the test + const originalValue = envConfig.ENABLE_SCHEMA_VERSION_HEADER; + (envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = false; + + schemaVersionMiddleware(mockReq(), res, next); + + assert.ok(!(SCHEMA_VERSION_HEADER in res.headers), 'Header should not be set'); + assert.ok(called, 'next() should be called'); + + // Restore + (envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = originalValue; + } + + console.log('schema-version.middleware tests passed'); +} + +run(); diff --git a/src/middlewares/schema-version.middleware.ts b/src/middlewares/schema-version.middleware.ts new file mode 100644 index 0000000..64b63f4 --- /dev/null +++ b/src/middlewares/schema-version.middleware.ts @@ -0,0 +1,22 @@ +// src/middlewares/schema-version.middleware.ts +import { Request, Response, NextFunction } from 'express'; +import { envConfig } from '../config'; +import { REQUEST_SCHEMA_VERSION, SCHEMA_VERSION_HEADER } from '../constants/schema.constants'; + +/** + * Middleware that adds a schema version header to the response. + * + * This header informs the client about the expected structure of request bodies. + * + * Can be enabled/disabled via the `ENABLE_SCHEMA_VERSION_HEADER` environment variable. + */ +export const schemaVersionMiddleware = ( + _req: Request, + res: Response, + next: NextFunction +): void => { + if (envConfig.ENABLE_SCHEMA_VERSION_HEADER) { + res.setHeader(SCHEMA_VERSION_HEADER, REQUEST_SCHEMA_VERSION); + } + next(); +}; diff --git a/src/utils/indexer-dedupe.utils.test.ts b/src/utils/indexer-dedupe.utils.test.ts new file mode 100644 index 0000000..05ab689 --- /dev/null +++ b/src/utils/indexer-dedupe.utils.test.ts @@ -0,0 +1,51 @@ +import { strict as assert } from 'assert'; +import { dedupeChainEvents, ChainEvent } from './indexer-dedupe.utils'; + +function run() { + console.log('Running indexer-dedupe.utils tests...'); + + // Case 1: Unique events are all kept + { + const events: ChainEvent[] = [ + { txHash: '0x1', eventIndex: 0 }, + { txHash: '0x1', eventIndex: 1 }, + { txHash: '0x2', eventIndex: 0 }, + ]; + const deduped = dedupeChainEvents(events); + assert.equal(deduped.length, 3, 'Should keep all unique events'); + } + + // Case 2: Duplicate events are removed + { + const events: ChainEvent[] = [ + { txHash: '0x1', eventIndex: 0 }, + { txHash: '0x1', eventIndex: 0 }, // Duplicate + { txHash: '0x2', eventIndex: 0 }, + ]; + const deduped = dedupeChainEvents(events); + assert.equal(deduped.length, 2, 'Should remove duplicate events'); + assert.equal(deduped[0].txHash, '0x1'); + assert.equal(deduped[1].txHash, '0x2'); + } + + // Case 3: Empty list + { + const deduped = dedupeChainEvents([]); + assert.equal(deduped.length, 0, 'Should handle empty list'); + } + + // Case 4: Events with extra data are preserved + { + const events = [ + { txHash: '0x1', eventIndex: 0, data: 'foo' }, + { txHash: '0x1', eventIndex: 0, data: 'bar' }, // Duplicate txHash/index but different data + ]; + const deduped = dedupeChainEvents(events); + assert.equal(deduped.length, 1, 'Should dedupe regardless of extra data'); + assert.equal(deduped[0].data, 'foo', 'Should keep the first occurrence'); + } + + console.log('indexer-dedupe.utils tests passed'); +} + +run(); diff --git a/src/utils/indexer-dedupe.utils.ts b/src/utils/indexer-dedupe.utils.ts new file mode 100644 index 0000000..d79df19 --- /dev/null +++ b/src/utils/indexer-dedupe.utils.ts @@ -0,0 +1,33 @@ +/** + * Interface representing a minimal chain event for deduplication. + */ +export interface ChainEvent { + /** Transaction hash (unique across the chain) */ + txHash: string; + /** Index of the event within the transaction */ + eventIndex: number; + /** Optional ledger/block number */ + ledger?: number; + [key: string]: any; +} + +/** + * Dedupes a list of chain events based on transaction hash and event index. + * + * This ensures that if the same event is received multiple times in a batch + * (e.g. due to overlapping ingestion windows), it is only processed once. + * + * @param events - The list of events to dedupe. + * @returns A new array containing only unique events. + */ +export function dedupeChainEvents(events: T[]): T[] { + const seen = new Set(); + return events.filter((event) => { + const key = `${event.txHash}:${event.eventIndex}`; + if (seen.has(key)) { + return false; + } + seen.add(key); + return true; + }); +}