diff --git a/backend/src/index.ts b/backend/src/index.ts
index fbf2f92..ea4a553 100644
--- a/backend/src/index.ts
+++ b/backend/src/index.ts
@@ -25,6 +25,7 @@ import { startReconciliationJob } from "./services/reconciliationJob";
 import { startWebhookWorker } from "./services/webhookWorker";
 import { getDeadLetters, countDeadLetters } from "./services/webhook";
 import {
+  archiveOldStreams,
   calculateProgress,
   cancelStream,
   createStream,
@@ -88,6 +89,10 @@ const listStreamsQuerySchema = z.object({
   sender: z.string().trim().optional(),
   asset: z.string().trim().optional(),
   q: z.string().trim().optional(),
+  include_archived: z
+    .enum(["true", "false"])
+    .optional()
+    .transform((v) => v === "true"),
   page: z
     .coerce.number()
     .int("page must be an integer")
@@ -160,7 +165,7 @@ app.get("/api/streams", (req: Request, res: Response) => {
   const hasPage = req.query.page !== undefined;
   const hasLimit = req.query.limit !== undefined;
 
-  let data = listStreams().map((stream) => ({
+  let data = listStreams(query.include_archived).map((stream) => ({
     ...stream,
     progress: calculateProgress(stream),
   }));
@@ -247,7 +252,7 @@ app.get("/api/streams/export.csv", (req: Request, res: Response) => {
   }
 
   const query = parsedQuery.data;
-  let data = listStreams().map((stream) => ({
+  let data = listStreams(query.include_archived).map((stream) => ({
     ...stream,
     progress: calculateProgress(stream),
   }));
@@ -537,7 +542,7 @@ app.post(
     }
 
     try {
-
+      const canceledStream = await cancelStream(parsedId.value);
       res.json({ data: { ...canceledStream, progress: calculateProgress(canceledStream) } });
     } catch (error: any) {
       console.error("Failed to cancel stream:", error);
@@ -579,19 +584,18 @@ app.patch(
       return;
     }
 
-
     try {
-      const updatedStream = updateStreamStartAt(parsedId.value, newStartAt);
-      res.json({ data: { ...updatedStream, progress: calculateProgress(updatedStream) } });
-} catch (error: any) {
-  const normalizedError = normalizeUnknownApiError(
-    error,
-    "Failed to update stream start time.",
-  );
-  sendApiError(req, res, normalizedError.statusCode, normalizedError.message, {
-    code: normalizedError.code ?? "INTERNAL_ERROR",
-  });
-}
+      const updatedStream = updateStreamStartAt(parsedId.value, parsedBody.data.startAt);
+      res.json({ data: { ...updatedStream, progress: calculateProgress(updatedStream) } });
+    } catch (error: any) {
+      const normalizedError = normalizeUnknownApiError(
+        error,
+        "Failed to update stream start time.",
+      );
+      sendApiError(req, res, normalizedError.statusCode, normalizedError.message, {
+        code: normalizedError.code ?? "INTERNAL_ERROR",
"INTERNAL_ERROR", + }); + } }, ); @@ -705,6 +709,21 @@ async function startServer() { await initSoroban(); await syncStreams(); + // Archive old streams on startup + await archiveOldStreams(); + + // Schedule archive job to run every 24 hours + setInterval(async () => { + try { + const archived = await archiveOldStreams(); + if (archived > 0) { + console.log(`[scheduler] archived ${archived} stream(s)`); + } + } catch (err) { + console.error("[scheduler] archive job failed:", err); + } + }, 24 * 60 * 60 * 1000); + // Initialize and start event indexer if (config.sorobanEnabled && config.contractId) { initIndexer(config.rpcUrl, config.contractId, config.networkPassphrase); diff --git a/backend/src/services/db.ts b/backend/src/services/db.ts index 5bc3320..9412e17 100644 --- a/backend/src/services/db.ts +++ b/backend/src/services/db.ts @@ -39,7 +39,24 @@ function migrate(): void { start_at INTEGER NOT NULL, created_at INTEGER NOT NULL, canceled_at INTEGER, - completed_at INTEGER + completed_at INTEGER, + refunded_amount REAL, + archived_at INTEGER + ); + + CREATE TABLE IF NOT EXISTS stream_archive ( + id TEXT PRIMARY KEY, + sender TEXT NOT NULL, + recipient TEXT NOT NULL, + asset_code TEXT NOT NULL, + total_amount REAL NOT NULL, + duration_seconds INTEGER NOT NULL, + start_at INTEGER NOT NULL, + created_at INTEGER NOT NULL, + canceled_at INTEGER, + completed_at INTEGER, + refunded_amount REAL, + archived_at INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS stream_events ( diff --git a/backend/src/services/streamStore.ts b/backend/src/services/streamStore.ts index 41985d5..ec78115 100644 --- a/backend/src/services/streamStore.ts +++ b/backend/src/services/streamStore.ts @@ -37,6 +37,7 @@ export interface StreamRecord { createdAt: number; canceledAt?: number; completedAt?: number; + refundedAmount?: number; } export interface StreamProgress { @@ -59,6 +60,8 @@ interface StreamRow { created_at: number; canceled_at: number | null; completed_at: number | null; + refunded_amount: number | null; + archived_at: number | null; } function rowToRecord(row: StreamRow): StreamRecord { @@ -73,6 +76,7 @@ function rowToRecord(row: StreamRow): StreamRecord { createdAt: row.created_at, canceledAt: row.canceled_at ?? undefined, completedAt: row.completed_at ?? undefined, + refundedAmount: row.refunded_amount ?? 
   };
 }
 
@@ -80,8 +84,8 @@ function upsertStream(record: StreamRecord): void {
   const db = getDb();
   db.prepare(
     `
-    INSERT INTO streams (id, sender, recipient, asset_code, total_amount, duration_seconds, start_at, created_at, canceled_at, completed_at)
-    VALUES (@id, @sender, @recipient, @assetCode, @totalAmount, @durationSeconds, @startAt, @createdAt, @canceledAt, @completedAt)
+    INSERT INTO streams (id, sender, recipient, asset_code, total_amount, duration_seconds, start_at, created_at, canceled_at, completed_at, refunded_amount, archived_at)
+    VALUES (@id, @sender, @recipient, @assetCode, @totalAmount, @durationSeconds, @startAt, @createdAt, @canceledAt, @completedAt, @refundedAmount, @archivedAt)
     ON CONFLICT(id) DO UPDATE SET
       sender = excluded.sender,
       recipient = excluded.recipient,
@@ -91,7 +95,9 @@ function upsertStream(record: StreamRecord): void {
       start_at = excluded.start_at,
       created_at = excluded.created_at,
       canceled_at = excluded.canceled_at,
-      completed_at = excluded.completed_at
+      completed_at = excluded.completed_at,
+      refunded_amount = excluded.refunded_amount,
+      archived_at = excluded.archived_at
     `,
   ).run({
     id: record.id,
@@ -104,6 +110,8 @@ function upsertStream(record: StreamRecord): void {
     createdAt: record.createdAt,
     canceledAt: record.canceledAt ?? null,
     completedAt: record.completedAt ?? null,
+    refundedAmount: record.refundedAmount ?? null,
+    archivedAt: null,
   });
 }
 
@@ -140,6 +148,74 @@ function round(value: number): number {
   return Number(value.toFixed(6));
 }
 
+interface CacheEntry<T> {
+  data: T;
+  expiresAt: number;
+}
+
+const rpcCache = new Map<string, CacheEntry<any>>();
+
+function getCached<T>(key: string): T | null {
+  const entry = rpcCache.get(key);
+  if (!entry) return null;
+  if (Date.now() > entry.expiresAt) {
+    rpcCache.delete(key);
+    return null;
+  }
+  return entry.data;
+}
+
+function setCached<T>(key: string, data: T, ttlSeconds = 5): void {
+  rpcCache.set(key, {
+    data,
+    expiresAt: Date.now() + ttlSeconds * 1000,
+  });
+}
+
+function invalidateCache(pattern?: string): void {
+  if (!pattern) {
+    rpcCache.clear();
+  } else {
+    for (const key of rpcCache.keys()) {
+      if (key.includes(pattern)) {
+        rpcCache.delete(key);
+      }
+    }
+  }
+}
+
+async function retryWithBackoff<T>(
+  fn: () => Promise<T>,
+  maxAttempts = 3,
+): Promise<T> {
+  let lastError: any;
+  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
+    try {
+      return await fn();
+    } catch (err) {
+      lastError = err;
+      const message = String(err).toLowerCase();
+      const isRetryable =
+        message.includes("timeout") ||
+        message.includes("network") ||
+        message.includes("econnrefused") ||
+        message.includes("econnreset");
+
+      if (!isRetryable || attempt === maxAttempts) {
+        throw err;
+      }
+
+      const delayMs = Math.pow(2, attempt - 1) * 1000;
+      console.log(
+        `[retry] attempt ${attempt} failed, retrying in ${delayMs}ms`,
+        err,
+      );
+      await new Promise((r) => setTimeout(r, delayMs));
+    }
+  }
+  throw lastError;
+}
+
 function getSorobanContext():
   | {
       contract: Contract;
@@ -205,6 +281,12 @@ async function fetchOnChainStreamRecord(
   sourceAccount: Account,
   id: number,
 ): Promise<StreamRecord | null> {
+  const cacheKey = `stream:${id}`;
+  const cached = getCached<StreamRecord>(cacheKey);
+  if (cached) {
+    return cached;
+  }
+
   const simRes = await simulateContractCall(
     contract,
     sourceAccount,
@@ -218,7 +300,7 @@
 
   const streamData = scValToNative(simRes.result.retval);
 
-  return {
+  const result = {
     id: id.toString(),
     sender: streamData.sender,
     recipient: streamData.recipient,
@@ -229,6 +311,9 @@ async function fetchOnChainStreamRecord(
     createdAt: Number(streamData.start_time),
     canceledAt: streamData.canceled ? nowInSeconds() : undefined,
   };
+
+  setCached(cacheKey, result, 5);
+  return result;
 }
 
 function recordBackfilledCreatedEvent(stream: StreamRecord): void {
@@ -437,7 +522,7 @@ export async function createStream(input: StreamInput): Promise<StreamRecord> {
   built.sign(serverKeypair);
 
-  const sendRes = await rpcServer.sendTransaction(built);
+  const sendRes = await retryWithBackoff(() => rpcServer!.sendTransaction(built));
   if (sendRes.status !== "PENDING") {
     throw new Error("Failed to send transaction: " + JSON.stringify(sendRes));
   }
@@ -445,7 +530,7 @@ export async function createStream(input: StreamInput): Promise<StreamRecord> {
   let txResult;
   let attempts = 0;
   while (attempts < 10) {
-    txResult = await rpcServer.getTransaction(sendRes.hash);
+    txResult = await retryWithBackoff(() => rpcServer!.getTransaction(sendRes.hash));
     if (txResult.status !== "NOT_FOUND") break;
     await new Promise((r) => setTimeout(r, 1000));
     attempts++;
@@ -488,6 +573,9 @@ export async function createStream(input: StreamInput): Promise<StreamRecord> {
     );
   })();
 
+  // Invalidate cache to ensure freshness after stream creation
+  invalidateCache("stream:");
+
   // Webhook fires after the transaction commits — a webhook failure
   // must never roll back an already-persisted stream.
   triggerWebhook("created", stream);
@@ -523,11 +611,76 @@ export function refreshStreamStatuses(): number {
   return result.changes;
 }
 
-export function listStreams(): StreamRecord[] {
+export async function archiveOldStreams(): Promise<number> {
   const db = getDb();
-  const rows = db
-    .prepare("SELECT * FROM streams ORDER BY created_at DESC")
-    .all() as StreamRow[];
+  const thirtyDaysAgo = nowInSeconds() - 30 * 24 * 60 * 60;
+
+  try {
+    // Find completed streams older than 30 days that haven't been archived yet
+    const streamsToArchive = db
+      .prepare(
+        `
+        SELECT * FROM streams
+        WHERE completed_at IS NOT NULL
+          AND completed_at < ?
+          AND archived_at IS NULL
+        `,
+      )
+      .all(thirtyDaysAgo) as StreamRow[];
+
+    if (streamsToArchive.length === 0) {
+      return 0;
+    }
+
+    const now = nowInSeconds();
+    let archived = 0;
+
+    db.transaction(() => {
+      for (const row of streamsToArchive) {
+        const record = rowToRecord(row);
+        record.refundedAmount = row.refunded_amount ?? undefined;
+
+        // Insert into archive
+        db.prepare(
+          `
+          INSERT INTO stream_archive (id, sender, recipient, asset_code, total_amount, duration_seconds, start_at, created_at, canceled_at, completed_at, refunded_amount, archived_at)
+          VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+          `,
+        ).run(
+          record.id,
+          record.sender,
+          record.recipient,
+          record.assetCode,
+          record.totalAmount,
+          record.durationSeconds,
+          record.startAt,
+          record.createdAt,
+          record.canceledAt ?? null,
+          record.completedAt ?? null,
+          record.refundedAmount ?? null,
+          now,
+        );
+
+        // Mark as archived in main table
+        db.prepare("UPDATE streams SET archived_at = ? WHERE id = ?").run(now, record.id);
+        archived++;
+      }
+    })();
+
+    console.log(`[archive] archived ${archived} completed stream(s)`);
+    return archived;
+  } catch (err) {
+    console.error("[archive] failed to archive old streams:", err);
+    return 0;
+  }
+}
+
+export function listStreams(includeArchived = false): StreamRecord[] {
+  const db = getDb();
+  const query = includeArchived
"SELECT * FROM streams ORDER BY created_at DESC" + : "SELECT * FROM streams WHERE archived_at IS NULL ORDER BY created_at DESC"; + const rows = db.prepare(query).all() as StreamRow[]; return rows.map(rowToRecord); } @@ -565,6 +718,63 @@ export async function cancelStream( stream.canceledAt = nowInSeconds(); + // Attempt to get refund amount from on-chain cancel transaction. + // For now, we extract from potential on-chain response. In production, + // this would send an actual cancel_stream transaction to the contract. + let refundAmount: number | undefined = undefined; + try { + const sorobanContext = getSorobanContext(); + if (sorobanContext && rpcServer && serverKeypair) { + const contractId = process.env.CONTRACT_ID; + if (contractId) { + const sourceAccount = await rpcServer.getAccount(serverKeypair.publicKey()); + const contract = new Contract(contractId); + const tx = contract.call( + "cancel_stream", + nativeToScVal(parseInt(id), { type: "u64" }), + ); + + const built = await rpcServer.prepareTransaction( + new TransactionBuilder(sourceAccount, { + fee: "1000", + networkPassphrase: process.env.NETWORK_PASSPHRASE || Networks.TESTNET, + }) + .addOperation(tx) + .setTimeout(30) + .build(), + ); + + built.sign(serverKeypair); + const sendRes = await retryWithBackoff(() => rpcServer!.sendTransaction(built)); + if (sendRes.status === "PENDING") { + let txResult; + let attempts = 0; + while (attempts < 10) { + txResult = await retryWithBackoff(() => + rpcServer!.getTransaction(sendRes.hash), + ); + if (txResult.status !== "NOT_FOUND") break; + await new Promise((r) => setTimeout(r, 1000)); + attempts++; + } + + if (txResult?.status === "SUCCESS" && txResult.returnValue) { + refundAmount = Number(scValToNative(txResult.returnValue)); + stream.refundedAmount = refundAmount; + } + } + } + } + } catch (err) { + console.warn( + `[cancel] failed to get refund amount from chain for stream ${id}:`, + err, + ); + } + + // Invalidate cache + invalidateCache(`stream:${id}`); + // Atomically write the updated stream row and the cancellation event. const db = getDb(); db.transaction(() => {