Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ PAYSTACK_PUBLIC_KEY=
# API Configuration
API_VERSION=1.0.0
ENABLE_API_VERSION_HEADER=true
ENABLE_SCHEMA_VERSION_HEADER=true
ENABLE_RESPONSE_TIMING=true
ENABLE_REQUEST_LOGGING=true
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The server is responsible for:
- notifications, analytics, and moderation workflows
- access checks for gated off-chain content

See [Backend Domain Model and Endpoint Boundaries](./docs/architecture/domain-boundaries.md) for a technical overview.
See [Backend Domain Model and Endpoint Boundaries](./docs/architecture/domain-boundaries.md) for a technical overview and [API Versioning](./docs/api-versioning.md) for details on schema versioning.

## Tech

Expand Down
24 changes: 24 additions & 0 deletions docs/api-versioning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# API and Schema Versioning

The Access Layer Server uses versioning headers to inform clients about the current API version and the expected structure of request bodies.

## Response Headers

### `X-API-Version`
Indicates the current overall version of the API. This is typically used for tracking feature sets and major API releases.

### `X-Schema-Version`
Indicates the active version of the request body schema. This version should be checked by consumers to ensure they are sending request bodies in the format expected by the server.

## Versioning Strategy

Both headers follow [Semantic Versioning (SemVer)](https://semver.org/):
- **MAJOR** version: Breaking changes to the API or schema.
- **MINOR** version: Backwards-compatible new features or additions.
- **PATCH** version: Backwards-compatible bug fixes.

## Expected Consumer Behavior

1. **Check Headers**: Consumers should inspect the `X-Schema-Version` header in API responses.
2. **Schema Alignment**: If the `X-Schema-Version` major version changes, consumers must update their request body structures to match the new schema requirements.
3. **Warning Handling**: Consumers may choose to log warnings if they detect a version mismatch that they haven't yet updated to support.
33 changes: 33 additions & 0 deletions docs/indexer/EVENT_PROCESSING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Chain Event Processing

The indexer processes events from the blockchain to update the read models and activity feeds. To ensure data consistency and prevent duplicate processing, the following strategies are employed.

## 1. Deduplication

Before processing a batch of events, they should be deduped based on their unique identifier on the chain: `transactionHash` and `eventIndex`.

The `dedupeChainEvents` helper in `src/utils/indexer-dedupe.utils.ts` provides this functionality.

### Example Usage:

```typescript
import { dedupeChainEvents } from '../utils/indexer-dedupe.utils';

const rawEvents = fetchEventsFromChain();
const uniqueEvents = dedupeChainEvents(rawEvents);
// Proceed with processing uniqueEvents
```

## 2. Idempotency

Event handlers must be idempotent. This means that processing the same event multiple times should have the same effect as processing it once.

### Strategies for Idempotency:

- **Database Upserts**: Use Prisma's `upsert` or `update` with unique constraints where possible.
- **State Check**: Before applying a change (like incrementing a balance), verify if the event has already been accounted for (e.g. by checking a `lastProcessedLedger` or a specific event log).
- **Atomic Transactions**: Ensure that all changes related to an event are committed in a single database transaction.

## 3. Error Handling

If an event fails to process after multiple retries, it is moved to the [Dead-Letter Queue (DLQ)](./DLQ_WORKFLOW.md) for manual investigation.
2 changes: 2 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { appRateLimit } from './middlewares/rate.middleware';
import { requestIdMiddleware } from './middlewares/request-id.middleware';
import { responseTimingMiddleware } from './middlewares/response-timing.middleware';
import { apiVersionMiddleware } from './middlewares/api-version.middleware';
import { schemaVersionMiddleware } from './middlewares/schema-version.middleware';
import { requestLoggerMiddleware } from './middlewares/request-logger.middleware';
import { envConfig } from './config';

Expand All @@ -21,6 +22,7 @@ const app: Express = express();
app.set('trust proxy', 1);
app.use(responseTimingMiddleware);
app.use(apiVersionMiddleware);
app.use(schemaVersionMiddleware);
app.use(requestIdMiddleware);
app.use(corsMiddleware());
app.use(helmet());
Expand Down
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export const envSchema = z.object({
ENABLE_RESPONSE_TIMING: z.coerce.boolean().default(true),
API_VERSION: z.string().default('1.0.0'),
ENABLE_API_VERSION_HEADER: z.coerce.boolean().default(true),
ENABLE_SCHEMA_VERSION_HEADER: z.coerce.boolean().default(true),
ENABLE_REQUEST_LOGGING: z.coerce.boolean().default(true),
INDEXER_JITTER_FACTOR: z.coerce.number().min(0).max(1).default(0.1),
});
Expand Down
12 changes: 12 additions & 0 deletions src/constants/schema.constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// src/constants/schema.constants.ts

/**
* Current version of the request body schema.
* This version should be bumped whenever there are breaking changes to the request body structure.
*/
export const REQUEST_SCHEMA_VERSION = '1.0.0';

/**
* The response header key that carries the active request schema version.
*/
export const SCHEMA_VERSION_HEADER = 'X-Schema-Version';
68 changes: 68 additions & 0 deletions src/middlewares/schema-version.middleware.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { strict as assert } from 'assert';
import { schemaVersionMiddleware } from './schema-version.middleware';
import type { Request, Response, NextFunction } from 'express';
import { REQUEST_SCHEMA_VERSION, SCHEMA_VERSION_HEADER } from '../constants/schema.constants';
import { envConfig } from '../config';

// Minimal mock helpers
function mockRes() {
const headers: Record<string, string> = {};
return {
headers,
setHeader(name: string, value: string) {
headers[name] = value;
},
} as unknown as Response & { headers: Record<string, string> };
}

function mockReq() {
return {} as Request;
}

function run() {
// sets schema version header when enabled
{
const res = mockRes();
let called = false;
const next: NextFunction = () => {
called = true;
};

// Ensure it's enabled for the test
const originalValue = envConfig.ENABLE_SCHEMA_VERSION_HEADER;
(envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = true;

schemaVersionMiddleware(mockReq(), res, next);

assert.equal(res.headers[SCHEMA_VERSION_HEADER], REQUEST_SCHEMA_VERSION);
assert.ok(called, 'next() should be called');

// Restore
(envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = originalValue;
}

// does not set header when disabled
{
const res = mockRes();
let called = false;
const next: NextFunction = () => {
called = true;
};

// Ensure it's disabled for the test
const originalValue = envConfig.ENABLE_SCHEMA_VERSION_HEADER;
(envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = false;

schemaVersionMiddleware(mockReq(), res, next);

assert.ok(!(SCHEMA_VERSION_HEADER in res.headers), 'Header should not be set');
assert.ok(called, 'next() should be called');

// Restore
(envConfig as any).ENABLE_SCHEMA_VERSION_HEADER = originalValue;
}

console.log('schema-version.middleware tests passed');
}

run();
22 changes: 22 additions & 0 deletions src/middlewares/schema-version.middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// src/middlewares/schema-version.middleware.ts
import { Request, Response, NextFunction } from 'express';
import { envConfig } from '../config';
import { REQUEST_SCHEMA_VERSION, SCHEMA_VERSION_HEADER } from '../constants/schema.constants';

/**
* Middleware that adds a schema version header to the response.
*
* This header informs the client about the expected structure of request bodies.
*
* Can be enabled/disabled via the `ENABLE_SCHEMA_VERSION_HEADER` environment variable.
*/
export const schemaVersionMiddleware = (
_req: Request,
res: Response,
next: NextFunction
): void => {
if (envConfig.ENABLE_SCHEMA_VERSION_HEADER) {
res.setHeader(SCHEMA_VERSION_HEADER, REQUEST_SCHEMA_VERSION);
}
next();
};
51 changes: 51 additions & 0 deletions src/utils/indexer-dedupe.utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { strict as assert } from 'assert';
import { dedupeChainEvents, ChainEvent } from './indexer-dedupe.utils';

function run() {
console.log('Running indexer-dedupe.utils tests...');

// Case 1: Unique events are all kept
{
const events: ChainEvent[] = [
{ txHash: '0x1', eventIndex: 0 },
{ txHash: '0x1', eventIndex: 1 },
{ txHash: '0x2', eventIndex: 0 },
];
const deduped = dedupeChainEvents(events);
assert.equal(deduped.length, 3, 'Should keep all unique events');
}

// Case 2: Duplicate events are removed
{
const events: ChainEvent[] = [
{ txHash: '0x1', eventIndex: 0 },
{ txHash: '0x1', eventIndex: 0 }, // Duplicate
{ txHash: '0x2', eventIndex: 0 },
];
const deduped = dedupeChainEvents(events);
assert.equal(deduped.length, 2, 'Should remove duplicate events');
assert.equal(deduped[0].txHash, '0x1');
assert.equal(deduped[1].txHash, '0x2');
}

// Case 3: Empty list
{
const deduped = dedupeChainEvents([]);
assert.equal(deduped.length, 0, 'Should handle empty list');
}

// Case 4: Events with extra data are preserved
{
const events = [
{ txHash: '0x1', eventIndex: 0, data: 'foo' },
{ txHash: '0x1', eventIndex: 0, data: 'bar' }, // Duplicate txHash/index but different data
];
const deduped = dedupeChainEvents(events);
assert.equal(deduped.length, 1, 'Should dedupe regardless of extra data');
assert.equal(deduped[0].data, 'foo', 'Should keep the first occurrence');
}

console.log('indexer-dedupe.utils tests passed');
}

run();
33 changes: 33 additions & 0 deletions src/utils/indexer-dedupe.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Interface representing a minimal chain event for deduplication.
*/
export interface ChainEvent {
/** Transaction hash (unique across the chain) */
txHash: string;
/** Index of the event within the transaction */
eventIndex: number;
/** Optional ledger/block number */
ledger?: number;
[key: string]: any;
}

/**
* Dedupes a list of chain events based on transaction hash and event index.
*
* This ensures that if the same event is received multiple times in a batch
* (e.g. due to overlapping ingestion windows), it is only processed once.
*
* @param events - The list of events to dedupe.
* @returns A new array containing only unique events.
*/
export function dedupeChainEvents<T extends ChainEvent>(events: T[]): T[] {
const seen = new Set<string>();
return events.filter((event) => {
const key = `${event.txHash}:${event.eventIndex}`;
if (seen.has(key)) {
return false;
}
seen.add(key);
return true;
});
}
Loading