diff --git a/backend/src/api/routes/index.ts b/backend/src/api/routes/index.ts index 84712fa6..568cb880 100644 --- a/backend/src/api/routes/index.ts +++ b/backend/src/api/routes/index.ts @@ -40,8 +40,7 @@ import { digestSchedulerRoutes } from "./digestScheduler.js"; import { alertSuppressionRoutes } from "./alertSuppression.js"; import { externalDependenciesRoutes } from "./externalDependencies.routes.js"; import { providerHealthRegistryRoutes } from "./providerHealthRegistry.routes.js"; -import { outboxAdminRoutes } from "./outbox-admin.js"; -import { adminConfigRoutes } from "./admin/configs.js"; +import { reconciliationRoutes } from "./reconciliation.js"; export async function registerRoutes(server: FastifyInstance) { server.register(assetsRoutes, { prefix: "/api/v1/assets" }); @@ -87,6 +86,5 @@ export async function registerRoutes(server: FastifyInstance) { server.register(alertSuppressionRoutes, { prefix: "/api/v1/alert-suppression" }); server.register(externalDependenciesRoutes, { prefix: "/api/v1/external-dependencies" }); server.register(providerHealthRegistryRoutes, { prefix: "/api/v1/providers/health" }); - server.register(outboxAdminRoutes, { prefix: "/api/v1/admin/outbox" }); - server.register(adminConfigRoutes, { prefix: "/api/v1/admin/configs" }); + server.register(reconciliationRoutes, { prefix: "/api/v1/reconciliation" }); } diff --git a/backend/src/api/routes/reconciliation.ts b/backend/src/api/routes/reconciliation.ts new file mode 100644 index 00000000..58a80c23 --- /dev/null +++ b/backend/src/api/routes/reconciliation.ts @@ -0,0 +1,55 @@ +import type { FastifyInstance, FastifyPluginOptions, FastifyReply, FastifyRequest } from "fastify"; +import { z } from "zod"; +import { ReconciliationService } from "../../services/reconciliation.service.js"; +import { logger } from "../../utils/logger.js"; + +const listQuerySchema = z.object({ + assetCode: z.string().min(1).optional(), + limit: z.coerce.number().int().positive().max(500).optional(), +}); + +export async function reconciliationRoutes( + fastify: FastifyInstance, + _options: FastifyPluginOptions +) { + const svc = new ReconciliationService(); + + fastify.get( + "/runs", + async (request: FastifyRequest, reply: FastifyReply) => { + const parsed = listQuerySchema.safeParse(request.query ?? {}); + if (!parsed.success) { + return reply.code(400).send({ error: "Invalid query", details: parsed.error.flatten() }); + } + + try { + const runs = await svc.listRuns({ + assetCode: parsed.data.assetCode, + limit: parsed.data.limit, + }); + return { runs }; + } catch (error) { + logger.error({ error }, "Failed to list reconciliation runs"); + return reply.code(500).send({ error: "Failed to list reconciliation runs" }); + } + } + ); + + fastify.get( + "/latest/:assetCode", + async (request: FastifyRequest, reply: FastifyReply) => { + const { assetCode } = request.params as { assetCode: string }; + if (!assetCode) return reply.code(400).send({ error: "assetCode required" }); + + try { + const run = await svc.getLatestRun(assetCode); + if (!run) return reply.code(404).send({ error: "No runs found" }); + return { run }; + } catch (error) { + logger.error({ error, assetCode }, "Failed to fetch latest reconciliation run"); + return reply.code(500).send({ error: "Failed to fetch latest reconciliation run" }); + } + } + ); +} + diff --git a/backend/src/database/migrations/007_reconciliation_runs.ts b/backend/src/database/migrations/007_reconciliation_runs.ts new file mode 100644 index 00000000..44aa24e9 --- /dev/null +++ b/backend/src/database/migrations/007_reconciliation_runs.ts @@ -0,0 +1,38 @@ +import type { Knex } from "knex"; + +export async function up(knex: Knex): Promise { + await knex.schema.createTable("reconciliation_runs", (table) => { + table.timestamp("started_at").notNullable().defaultTo(knex.fn.now()); + table.uuid("id").notNullable().defaultTo(knex.raw("gen_random_uuid()")); + + table.string("asset_code").notNullable(); + table.string("job_id").nullable(); + + table + .string("status") + .notNullable() + .defaultTo("running") + .checkIn(["running", "success", "mismatch", "failed"]); + + table.decimal("stellar_supply", 30, 7).nullable(); + table.decimal("reported_supply", 30, 7).nullable(); + table.decimal("mismatch_percentage", 20, 8).nullable(); + + table.integer("attempt").notNullable().defaultTo(1); + table.text("error").nullable(); + + table.timestamp("finished_at").nullable(); + table.timestamps(true, true); + + table.index(["asset_code", "started_at"]); + table.index(["status", "started_at"]); + }); + + await knex.raw( + "SELECT create_hypertable('reconciliation_runs', 'started_at', if_not_exists => TRUE)" + ); +} + +export async function down(knex: Knex): Promise { + await knex.schema.dropTableIfExists("reconciliation_runs"); +} diff --git a/backend/src/database/types.ts b/backend/src/database/types.ts index fd506623..13ee164a 100644 --- a/backend/src/database/types.ts +++ b/backend/src/database/types.ts @@ -52,6 +52,7 @@ export interface BridgeTransactionSummary { confirmedTransactions: number; failedTransactions: number; } +export type ReconciliationStatus = "running" | "success" | "mismatch" | "failed"; // ─── assets ────────────────────────────────────────────────────────────────── @@ -201,6 +202,24 @@ export interface VerificationResult { job_id: string | null; } +// ─── reconciliation_runs (hypertable) ──────────────────────────────────────── + +export interface ReconciliationRun { + started_at: Date; + id: string; + asset_code: string; + job_id: string | null; + status: ReconciliationStatus; + stellar_supply: string | null; + reported_supply: string | null; + mismatch_percentage: string | null; + attempt: number; + error: string | null; + finished_at: Date | null; + created_at: Date; + updated_at: Date; +} + // ─── alert_rules ───────────────────────────────────────────────────────────── export interface AlertRule { diff --git a/backend/src/services/reconciliation.service.ts b/backend/src/services/reconciliation.service.ts new file mode 100644 index 00000000..e5618cf0 --- /dev/null +++ b/backend/src/services/reconciliation.service.ts @@ -0,0 +1,67 @@ +import { getDatabase } from "../database/connection.js"; +import type { ReconciliationStatus } from "../database/types.js"; + +export interface CreateReconciliationRunInput { + assetCode: string; + jobId?: string | null; + attempt?: number; +} + +export interface FinishReconciliationRunInput { + id: string; + status: Exclude; + stellarSupply?: number | null; + reportedSupply?: number | null; + mismatchPercentage?: number | null; + error?: string | null; +} + +export class ReconciliationService { + private readonly db = getDatabase(); + + async startRun(input: CreateReconciliationRunInput): Promise<{ id: string }> { + const [row] = await this.db("reconciliation_runs") + .insert({ + started_at: new Date(), + asset_code: input.assetCode, + job_id: input.jobId ?? null, + status: "running", + attempt: input.attempt ?? 1, + }) + .returning<{ id: string }[]>("id"); + + return { id: row?.id ?? "" }; + } + + async finishRun(input: FinishReconciliationRunInput): Promise { + await this.db("reconciliation_runs") + .where({ id: input.id }) + .update({ + status: input.status, + stellar_supply: input.stellarSupply ?? null, + reported_supply: input.reportedSupply ?? null, + mismatch_percentage: input.mismatchPercentage ?? null, + error: input.error ?? null, + finished_at: new Date(), + updated_at: new Date(), + }); + } + + async listRuns(params: { assetCode?: string; limit?: number } = {}) { + const limit = Math.min(Math.max(params.limit ?? 50, 1), 500); + const q = this.db("reconciliation_runs") + .orderBy("started_at", "desc") + .limit(limit); + + if (params.assetCode) q.where({ asset_code: params.assetCode }); + return q; + } + + async getLatestRun(assetCode: string) { + return this.db("reconciliation_runs") + .where({ asset_code: assetCode }) + .orderBy("started_at", "desc") + .first(); + } +} + diff --git a/backend/src/utils/lock.ts b/backend/src/utils/lock.ts new file mode 100644 index 00000000..8324145e --- /dev/null +++ b/backend/src/utils/lock.ts @@ -0,0 +1,27 @@ +import { redis } from "./redis.js"; + +export async function acquireLock(params: { + key: string; + value: string; + ttlMs: number; +}): Promise { + const res = await redis.set(params.key, params.value, "PX", params.ttlMs, "NX"); + return res === "OK"; +} + +export async function releaseLock(params: { + key: string; + value: string; +}): Promise { + const lua = ` + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + `; + + const deleted = await redis.eval(lua, 1, params.key, params.value); + return Number(deleted) === 1; +} + diff --git a/backend/src/workers/index.ts b/backend/src/workers/index.ts index 5e28e745..dfd8dd97 100644 --- a/backend/src/workers/index.ts +++ b/backend/src/workers/index.ts @@ -8,6 +8,7 @@ import { processMetricsRollup } from "./metricsRollup.worker.js"; import { processDigestScheduler } from "./digestScheduler.worker.js"; import { processMetadataSync } from "./metadataSync.job.js"; import { processExternalDependencyMonitor } from "./externalDependencyMonitor.job.js"; +import { processReconciliation } from "./reconciliation.job.js"; import { logger } from "../utils/logger.js"; import { initSupplyVerificationJob } from "../jobs/supplyVerification.job.js"; import { runAuditRetentionJob } from "../jobs/auditRetention.job.js"; @@ -58,8 +59,8 @@ export async function initJobSystem() { case "external-dependency-monitor": await processExternalDependencyMonitor(job); break; - case "cache-priming": - await processCachePriming(job); + case "reconciliation": + await processReconciliation(job as any); break; default: logger.warn({ jobName: job.name }, "Unknown job name in worker"); @@ -129,6 +130,14 @@ export async function initJobSystem() { // External dependency checks: every 2 minutes await jobQueue.addRepeatableJob("external-dependency-monitor", {}, "*/2 * * * *"); + // reconciliation: per-asset, every hour (top of hour) + // Note: This uses the queue helper for retry/backoff defaults. + for (const assetCode of ["USDC", "EURC"]) { + await jobQueue.addJob("reconciliation", { assetCode }, { + repeat: { pattern: "0 * * * *" }, + jobId: `reconciliation:${assetCode}`, + }); + } // Cache priming: High priority every hour, Full every day at 03:00 UTC await jobQueue.addRepeatableJob("cache-priming", { priority: "high" }, "0 * * * *"); diff --git a/backend/src/workers/reconciliation.job.ts b/backend/src/workers/reconciliation.job.ts new file mode 100644 index 00000000..9e97e4be --- /dev/null +++ b/backend/src/workers/reconciliation.job.ts @@ -0,0 +1,99 @@ +import type { Job } from "bullmq"; +import crypto from "crypto"; +import { BridgeService } from "../services/bridge.service.js"; +import { ReconciliationService } from "../services/reconciliation.service.js"; +import { logger } from "../utils/logger.js"; +import { acquireLock, releaseLock } from "../utils/lock.js"; + +export interface ReconciliationJobData { + assetCode: string; +} + +function lockKey(assetCode: string) { + return `lock:reconciliation:${assetCode}`; +} + +export function createReconciliationProcessor(deps?: { + bridgeService?: BridgeService; + reconciliationService?: ReconciliationService; + acquireLock?: typeof acquireLock; + releaseLock?: typeof releaseLock; + lockTtlMs?: number; +}) { + const bridgeService = deps?.bridgeService ?? new BridgeService(); + const reconciliationService = deps?.reconciliationService ?? new ReconciliationService(); + const acquire = deps?.acquireLock ?? acquireLock; + const release = deps?.releaseLock ?? releaseLock; + const lockTtlMs = deps?.lockTtlMs ?? 10 * 60 * 1000; + + return async function processReconciliation(job: Job) { + const { assetCode } = job.data; + const lockValue = crypto.randomUUID(); + + const locked = await acquire({ + key: lockKey(assetCode), + value: lockValue, + ttlMs: lockTtlMs, + }); + + if (!locked) { + logger.warn({ assetCode, jobId: job.id }, "Reconciliation skipped (lock held)"); + return; + } + + const attempt = (job.attemptsMade ?? 0) + 1; + const started = Date.now(); + const run = await reconciliationService.startRun({ + assetCode, + jobId: job.id ?? null, + attempt, + }); + + try { + const result = await bridgeService.verifySupply(assetCode); + + const status = result.errorStatus + ? "failed" + : result.isFlagged + ? "mismatch" + : "success"; + + await reconciliationService.finishRun({ + id: run.id, + status, + stellarSupply: result.stellarSupply, + reportedSupply: result.ethereumReserves, + mismatchPercentage: result.mismatchPercentage, + error: result.errorStatus ?? null, + }); + + logger.info( + { + assetCode, + status, + mismatchPercentage: result.mismatchPercentage, + durationMs: Date.now() - started, + }, + "Reconciliation complete" + ); + } catch (error: any) { + const message = error?.message || String(error); + + await reconciliationService.finishRun({ + id: run.id, + status: "failed", + error: message, + }); + + logger.error({ assetCode, error: message, jobId: job.id }, "Reconciliation failed"); + throw error; + } finally { + await release({ key: lockKey(assetCode), value: lockValue }).catch(() => {}); + } + }; +} + +export async function processReconciliation(job: Job) { + return createReconciliationProcessor()(job); +} + diff --git a/backend/tests/workers/reconciliation.job.test.ts b/backend/tests/workers/reconciliation.job.test.ts new file mode 100644 index 00000000..224dd775 --- /dev/null +++ b/backend/tests/workers/reconciliation.job.test.ts @@ -0,0 +1,84 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { createReconciliationProcessor } from "../../src/workers/reconciliation.job.js"; + +const verifySupplyMock = vi.hoisted(() => vi.fn()); +const startRunMock = vi.hoisted(() => vi.fn().mockResolvedValue({ id: "run-1" })); +const finishRunMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); +const acquireLockMock = vi.hoisted(() => vi.fn().mockResolvedValue(true)); +const releaseLockMock = vi.hoisted(() => vi.fn().mockResolvedValue(true)); + +describe("reconciliation job", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("persists success run when supplies match", async () => { + verifySupplyMock.mockResolvedValueOnce({ + assetCode: "USDC", + stellarSupply: 100, + ethereumReserves: 100, + mismatchPercentage: 0, + isFlagged: false, + match: true, + errorStatus: null, + }); + + const processReconciliation = createReconciliationProcessor({ + bridgeService: { verifySupply: verifySupplyMock } as any, + reconciliationService: { startRun: startRunMock, finishRun: finishRunMock } as any, + acquireLock: acquireLockMock as any, + releaseLock: releaseLockMock as any, + lockTtlMs: 50, + }); + + await processReconciliation({ + id: "job-1", + data: { assetCode: "USDC" }, + attemptsMade: 0, + } as any); + + expect(startRunMock).toHaveBeenCalledWith( + expect.objectContaining({ assetCode: "USDC", jobId: "job-1", attempt: 1 }) + ); + expect(finishRunMock).toHaveBeenCalledWith( + expect.objectContaining({ + id: "run-1", + status: "success", + stellarSupply: 100, + reportedSupply: 100, + mismatchPercentage: 0, + }) + ); + }); + + it("persists mismatch run when flagged", async () => { + verifySupplyMock.mockResolvedValueOnce({ + assetCode: "EURC", + stellarSupply: 110, + ethereumReserves: 100, + mismatchPercentage: 10, + isFlagged: true, + match: false, + errorStatus: null, + }); + + const processReconciliation = createReconciliationProcessor({ + bridgeService: { verifySupply: verifySupplyMock } as any, + reconciliationService: { startRun: startRunMock, finishRun: finishRunMock } as any, + acquireLock: acquireLockMock as any, + releaseLock: releaseLockMock as any, + lockTtlMs: 50, + }); + + await processReconciliation({ + id: "job-2", + data: { assetCode: "EURC" }, + attemptsMade: 1, + } as any); + + expect(finishRunMock).toHaveBeenCalledWith( + expect.objectContaining({ id: "run-1", status: "mismatch" }) + ); + }); +}); + diff --git a/docs/reconciliation-workflow.md b/docs/reconciliation-workflow.md new file mode 100644 index 00000000..181a83c0 --- /dev/null +++ b/docs/reconciliation-workflow.md @@ -0,0 +1,31 @@ +## Reconciliation workflow + +The reconciliation job periodically compares **on-chain** observed balances/supply on Stellar with the **reported/source-chain** balance used by the bridge verifier (currently Ethereum reserves for bridged assets). + +### What it does + +- Runs on a schedule (hourly by default) for each configured asset (`USDC`, `EURC`) +- Uses a Redis lock to avoid overlapping runs per asset +- Persists every run (including mismatches and failures) to PostgreSQL/Timescale (`reconciliation_runs`) +- Emits structured logs for mismatch detection and job timing + +### Where it runs + +- Worker: `backend/src/workers/reconciliation.job.ts` +- Scheduler/registration: `backend/src/workers/index.ts` + +### Data stored + +Each run records: + +- `asset_code` +- `status`: `success`, `mismatch`, or `failed` +- `stellar_supply` and `reported_supply` +- `mismatch_percentage` +- timestamps + error message (if any) + +### API + +- `GET /api/v1/reconciliation/runs?assetCode=USDC&limit=50` +- `GET /api/v1/reconciliation/latest/:assetCode` +