Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions backend/src/api/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ import { digestSchedulerRoutes } from "./digestScheduler.js";
import { alertSuppressionRoutes } from "./alertSuppression.js";
import { externalDependenciesRoutes } from "./externalDependencies.routes.js";
import { providerHealthRegistryRoutes } from "./providerHealthRegistry.routes.js";
import { outboxAdminRoutes } from "./outbox-admin.js";
import { adminConfigRoutes } from "./admin/configs.js";
import { reconciliationRoutes } from "./reconciliation.js";

export async function registerRoutes(server: FastifyInstance) {
server.register(assetsRoutes, { prefix: "/api/v1/assets" });
Expand Down Expand Up @@ -87,6 +86,5 @@ export async function registerRoutes(server: FastifyInstance) {
server.register(alertSuppressionRoutes, { prefix: "/api/v1/alert-suppression" });
server.register(externalDependenciesRoutes, { prefix: "/api/v1/external-dependencies" });
server.register(providerHealthRegistryRoutes, { prefix: "/api/v1/providers/health" });
server.register(outboxAdminRoutes, { prefix: "/api/v1/admin/outbox" });
server.register(adminConfigRoutes, { prefix: "/api/v1/admin/configs" });
server.register(reconciliationRoutes, { prefix: "/api/v1/reconciliation" });
}
55 changes: 55 additions & 0 deletions backend/src/api/routes/reconciliation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import type { FastifyInstance, FastifyPluginOptions, FastifyReply, FastifyRequest } from "fastify";
import { z } from "zod";
import { ReconciliationService } from "../../services/reconciliation.service.js";
import { logger } from "../../utils/logger.js";

const listQuerySchema = z.object({
assetCode: z.string().min(1).optional(),
limit: z.coerce.number().int().positive().max(500).optional(),
});

export async function reconciliationRoutes(
fastify: FastifyInstance,
_options: FastifyPluginOptions
) {
const svc = new ReconciliationService();

fastify.get(
"/runs",
async (request: FastifyRequest, reply: FastifyReply) => {
const parsed = listQuerySchema.safeParse(request.query ?? {});
if (!parsed.success) {
return reply.code(400).send({ error: "Invalid query", details: parsed.error.flatten() });
}

try {
const runs = await svc.listRuns({
assetCode: parsed.data.assetCode,
limit: parsed.data.limit,
});
return { runs };
} catch (error) {
logger.error({ error }, "Failed to list reconciliation runs");
return reply.code(500).send({ error: "Failed to list reconciliation runs" });
}
}
);

fastify.get(
"/latest/:assetCode",
async (request: FastifyRequest, reply: FastifyReply) => {
const { assetCode } = request.params as { assetCode: string };
if (!assetCode) return reply.code(400).send({ error: "assetCode required" });

try {
const run = await svc.getLatestRun(assetCode);
if (!run) return reply.code(404).send({ error: "No runs found" });
return { run };
} catch (error) {
logger.error({ error, assetCode }, "Failed to fetch latest reconciliation run");
return reply.code(500).send({ error: "Failed to fetch latest reconciliation run" });
}
}
);
}

38 changes: 38 additions & 0 deletions backend/src/database/migrations/007_reconciliation_runs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import type { Knex } from "knex";

export async function up(knex: Knex): Promise<void> {
await knex.schema.createTable("reconciliation_runs", (table) => {
table.timestamp("started_at").notNullable().defaultTo(knex.fn.now());
table.uuid("id").notNullable().defaultTo(knex.raw("gen_random_uuid()"));

table.string("asset_code").notNullable();
table.string("job_id").nullable();

table
.string("status")
.notNullable()
.defaultTo("running")
.checkIn(["running", "success", "mismatch", "failed"]);

table.decimal("stellar_supply", 30, 7).nullable();
table.decimal("reported_supply", 30, 7).nullable();
table.decimal("mismatch_percentage", 20, 8).nullable();

table.integer("attempt").notNullable().defaultTo(1);
table.text("error").nullable();

table.timestamp("finished_at").nullable();
table.timestamps(true, true);

table.index(["asset_code", "started_at"]);
table.index(["status", "started_at"]);
});

await knex.raw(
"SELECT create_hypertable('reconciliation_runs', 'started_at', if_not_exists => TRUE)"
);
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.dropTableIfExists("reconciliation_runs");
}
19 changes: 19 additions & 0 deletions backend/src/database/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ export interface BridgeTransactionSummary {
confirmedTransactions: number;
failedTransactions: number;
}
export type ReconciliationStatus = "running" | "success" | "mismatch" | "failed";

// ─── assets ──────────────────────────────────────────────────────────────────

Expand Down Expand Up @@ -201,6 +202,24 @@ export interface VerificationResult {
job_id: string | null;
}

// ─── reconciliation_runs (hypertable) ────────────────────────────────────────

export interface ReconciliationRun {
started_at: Date;
id: string;
asset_code: string;
job_id: string | null;
status: ReconciliationStatus;
stellar_supply: string | null;
reported_supply: string | null;
mismatch_percentage: string | null;
attempt: number;
error: string | null;
finished_at: Date | null;
created_at: Date;
updated_at: Date;
}

// ─── alert_rules ─────────────────────────────────────────────────────────────

export interface AlertRule {
Expand Down
67 changes: 67 additions & 0 deletions backend/src/services/reconciliation.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { getDatabase } from "../database/connection.js";
import type { ReconciliationStatus } from "../database/types.js";

export interface CreateReconciliationRunInput {
assetCode: string;
jobId?: string | null;
attempt?: number;
}

export interface FinishReconciliationRunInput {
id: string;
status: Exclude<ReconciliationStatus, "running">;
stellarSupply?: number | null;
reportedSupply?: number | null;
mismatchPercentage?: number | null;
error?: string | null;
}

export class ReconciliationService {
private readonly db = getDatabase();

async startRun(input: CreateReconciliationRunInput): Promise<{ id: string }> {
const [row] = await this.db("reconciliation_runs")
.insert({
started_at: new Date(),
asset_code: input.assetCode,
job_id: input.jobId ?? null,
status: "running",
attempt: input.attempt ?? 1,
})
.returning<{ id: string }[]>("id");

return { id: row?.id ?? "" };
}

async finishRun(input: FinishReconciliationRunInput): Promise<void> {
await this.db("reconciliation_runs")
.where({ id: input.id })
.update({
status: input.status,
stellar_supply: input.stellarSupply ?? null,
reported_supply: input.reportedSupply ?? null,
mismatch_percentage: input.mismatchPercentage ?? null,
error: input.error ?? null,
finished_at: new Date(),
updated_at: new Date(),
});
}

async listRuns(params: { assetCode?: string; limit?: number } = {}) {
const limit = Math.min(Math.max(params.limit ?? 50, 1), 500);
const q = this.db("reconciliation_runs")
.orderBy("started_at", "desc")
.limit(limit);

if (params.assetCode) q.where({ asset_code: params.assetCode });
return q;
}

async getLatestRun(assetCode: string) {
return this.db("reconciliation_runs")
.where({ asset_code: assetCode })
.orderBy("started_at", "desc")
.first();
}
}

27 changes: 27 additions & 0 deletions backend/src/utils/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { redis } from "./redis.js";

export async function acquireLock(params: {
key: string;
value: string;
ttlMs: number;
}): Promise<boolean> {
const res = await redis.set(params.key, params.value, "PX", params.ttlMs, "NX");
return res === "OK";
}

export async function releaseLock(params: {
key: string;
value: string;
}): Promise<boolean> {
const lua = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;

const deleted = await redis.eval(lua, 1, params.key, params.value);
return Number(deleted) === 1;
}

13 changes: 11 additions & 2 deletions backend/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { processMetricsRollup } from "./metricsRollup.worker.js";
import { processDigestScheduler } from "./digestScheduler.worker.js";
import { processMetadataSync } from "./metadataSync.job.js";
import { processExternalDependencyMonitor } from "./externalDependencyMonitor.job.js";
import { processReconciliation } from "./reconciliation.job.js";
import { logger } from "../utils/logger.js";
import { initSupplyVerificationJob } from "../jobs/supplyVerification.job.js";
import { runAuditRetentionJob } from "../jobs/auditRetention.job.js";
Expand Down Expand Up @@ -58,8 +59,8 @@ export async function initJobSystem() {
case "external-dependency-monitor":
await processExternalDependencyMonitor(job);
break;
case "cache-priming":
await processCachePriming(job);
case "reconciliation":
await processReconciliation(job as any);
break;
default:
logger.warn({ jobName: job.name }, "Unknown job name in worker");
Expand Down Expand Up @@ -129,6 +130,14 @@ export async function initJobSystem() {

// External dependency checks: every 2 minutes
await jobQueue.addRepeatableJob("external-dependency-monitor", {}, "*/2 * * * *");
// reconciliation: per-asset, every hour (top of hour)
// Note: This uses the queue helper for retry/backoff defaults.
for (const assetCode of ["USDC", "EURC"]) {
await jobQueue.addJob("reconciliation", { assetCode }, {
repeat: { pattern: "0 * * * *" },
jobId: `reconciliation:${assetCode}`,
});
}

// Cache priming: High priority every hour, Full every day at 03:00 UTC
await jobQueue.addRepeatableJob("cache-priming", { priority: "high" }, "0 * * * *");
Expand Down
99 changes: 99 additions & 0 deletions backend/src/workers/reconciliation.job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import type { Job } from "bullmq";
import crypto from "crypto";
import { BridgeService } from "../services/bridge.service.js";
import { ReconciliationService } from "../services/reconciliation.service.js";
import { logger } from "../utils/logger.js";
import { acquireLock, releaseLock } from "../utils/lock.js";

export interface ReconciliationJobData {
assetCode: string;
}

function lockKey(assetCode: string) {
return `lock:reconciliation:${assetCode}`;
}

export function createReconciliationProcessor(deps?: {
bridgeService?: BridgeService;
reconciliationService?: ReconciliationService;
acquireLock?: typeof acquireLock;
releaseLock?: typeof releaseLock;
lockTtlMs?: number;
}) {
const bridgeService = deps?.bridgeService ?? new BridgeService();
const reconciliationService = deps?.reconciliationService ?? new ReconciliationService();
const acquire = deps?.acquireLock ?? acquireLock;
const release = deps?.releaseLock ?? releaseLock;
const lockTtlMs = deps?.lockTtlMs ?? 10 * 60 * 1000;

return async function processReconciliation(job: Job<ReconciliationJobData>) {
const { assetCode } = job.data;
const lockValue = crypto.randomUUID();

const locked = await acquire({
key: lockKey(assetCode),
value: lockValue,
ttlMs: lockTtlMs,
});

if (!locked) {
logger.warn({ assetCode, jobId: job.id }, "Reconciliation skipped (lock held)");
return;
}

const attempt = (job.attemptsMade ?? 0) + 1;
const started = Date.now();
const run = await reconciliationService.startRun({
assetCode,
jobId: job.id ?? null,
attempt,
});

try {
const result = await bridgeService.verifySupply(assetCode);

const status = result.errorStatus
? "failed"
: result.isFlagged
? "mismatch"
: "success";

await reconciliationService.finishRun({
id: run.id,
status,
stellarSupply: result.stellarSupply,
reportedSupply: result.ethereumReserves,
mismatchPercentage: result.mismatchPercentage,
error: result.errorStatus ?? null,
});

logger.info(
{
assetCode,
status,
mismatchPercentage: result.mismatchPercentage,
durationMs: Date.now() - started,
},
"Reconciliation complete"
);
} catch (error: any) {
const message = error?.message || String(error);

await reconciliationService.finishRun({
id: run.id,
status: "failed",
error: message,
});

logger.error({ assetCode, error: message, jobId: job.id }, "Reconciliation failed");
throw error;
} finally {
await release({ key: lockKey(assetCode), value: lockValue }).catch(() => {});
}
};
}

export async function processReconciliation(job: Job<ReconciliationJobData>) {
return createReconciliationProcessor()(job);
}

Loading
Loading