Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { startReconciliationJob } from "./services/reconciliationJob";
import { startWebhookWorker } from "./services/webhookWorker";
import { getDeadLetters, countDeadLetters } from "./services/webhook";
import {
archiveOldStreams,
calculateProgress,
cancelStream,
createStream,
Expand Down Expand Up @@ -88,6 +89,10 @@ const listStreamsQuerySchema = z.object({
sender: z.string().trim().optional(),
asset: z.string().trim().optional(),
q: z.string().trim().optional(),
include_archived: z
.enum(["true", "false"])
.optional()
.transform((v) => v === "true"),
page: z
.coerce.number()
.int("page must be an integer")
Expand Down Expand Up @@ -160,7 +165,7 @@ app.get("/api/streams", (req: Request, res: Response) => {
const hasPage = req.query.page !== undefined;
const hasLimit = req.query.limit !== undefined;

let data = listStreams().map((stream) => ({
let data = listStreams(query.include_archived).map((stream) => ({
...stream,
progress: calculateProgress(stream),
}));
Expand Down Expand Up @@ -247,7 +252,7 @@ app.get("/api/streams/export.csv", (req: Request, res: Response) => {
}

const query = parsedQuery.data;
let data = listStreams().map((stream) => ({
let data = listStreams(query.include_archived).map((stream) => ({
...stream,
progress: calculateProgress(stream),
}));
Expand Down Expand Up @@ -537,7 +542,7 @@ app.post(
}

try {

const canceledStream = await cancelStream(parsedId.value);
res.json({ data: { ...canceledStream, progress: calculateProgress(canceledStream) } });
} catch (error: any) {
console.error("Failed to cancel stream:", error);
Expand Down Expand Up @@ -579,19 +584,18 @@ app.patch(
return;
}


try {
const updatedStream = updateStreamStartAt(parsedId.value, newStartAt);
res.json({ data: { ...updatedStream, progress: calculateProgress(updatedStream) } });
} catch (error: any) {
const normalizedError = normalizeUnknownApiError(
error,
"Failed to update stream start time.",
);
sendApiError(req, res, normalizedError.statusCode, normalizedError.message, {
code: normalizedError.code ?? "INTERNAL_ERROR",
});
}
const updatedStream = updateStreamStartAt(parsedId.value, parsedBody.data.startAt);
res.json({ data: { ...updatedStream, progress: calculateProgress(updatedStream) } });
} catch (error: any) {
const normalizedError = normalizeUnknownApiError(
error,
"Failed to update stream start time.",
);
sendApiError(req, res, normalizedError.statusCode, normalizedError.message, {
code: normalizedError.code ?? "INTERNAL_ERROR",
});
}
},
);

Expand Down Expand Up @@ -705,6 +709,21 @@ async function startServer() {
await initSoroban();
await syncStreams();

// Archive old streams on startup
await archiveOldStreams();

// Schedule archive job to run every 24 hours
setInterval(async () => {
try {
const archived = await archiveOldStreams();
if (archived > 0) {
console.log(`[scheduler] archived ${archived} stream(s)`);
}
} catch (err) {
console.error("[scheduler] archive job failed:", err);
}
}, 24 * 60 * 60 * 1000);

// Initialize and start event indexer
if (config.sorobanEnabled && config.contractId) {
initIndexer(config.rpcUrl, config.contractId, config.networkPassphrase);
Expand Down
19 changes: 18 additions & 1 deletion backend/src/services/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,24 @@ function migrate(): void {
start_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
canceled_at INTEGER,
completed_at INTEGER
completed_at INTEGER,
refunded_amount REAL,
archived_at INTEGER
);

CREATE TABLE IF NOT EXISTS stream_archive (
id TEXT PRIMARY KEY,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
asset_code TEXT NOT NULL,
total_amount REAL NOT NULL,
duration_seconds INTEGER NOT NULL,
start_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
canceled_at INTEGER,
completed_at INTEGER,
refunded_amount REAL,
archived_at INTEGER NOT NULL
);

CREATE TABLE IF NOT EXISTS stream_events (
Expand Down
Loading
Loading