diff --git a/backend/src/index.test.ts b/backend/src/index.test.ts index 2211454..54cbe94 100644 --- a/backend/src/index.test.ts +++ b/backend/src/index.test.ts @@ -568,6 +568,3 @@ describe("GET /api/events", () => { }); }); - - }); -}); diff --git a/backend/src/index.ts b/backend/src/index.ts index 55691e6..d5fc066 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -18,7 +18,7 @@ import { getStreamHistory, } from "./services/eventHistory"; import { fetchOpenIssues } from "./services/openIssues"; -import { initIndexer, startIndexer } from "./services/indexer"; +import { initIndexer, startIndexer, getCircuitBreakerStatus } from "./services/indexer"; import { startReconciliationJob } from "./services/reconciliationJob"; import { startWebhookWorker } from "./services/webhookWorker"; import { getDeadLetters, countDeadLetters } from "./services/webhook"; @@ -35,12 +35,7 @@ import { syncStreams, updateStreamStartAt, } from "./services/streamStore"; -import { - getGlobalEvents, - countAllEvents, - getStreamHistory, - getAllEvents, -} from "./services/eventHistory"; + import { authMiddleware, generateChallenge, @@ -138,6 +133,12 @@ app.get("/api/health", (_req: Request, res: Response) => { }); }); +app.get("/api/metrics", (_req: Request, res: Response) => { + res.json({ + indexer_circuit_breaker: getCircuitBreakerStatus(), + }); +}); + app.get("/api/assets", (_req: Request, res: Response) => { res.json({ data: ALLOWED_ASSETS, @@ -313,7 +314,7 @@ app.get("/api/recipients/:accountId/streams", (req: Request, res: Response) => { const parsedQuery = listStreamsQuerySchema.safeParse(req.query); if (!parsedQuery.success) { - sendValidationError(res, parsedQuery.error.issues); + sendValidationError(req, res, parsedQuery.error.issues); return; } const query = parsedQuery.data; @@ -480,6 +481,7 @@ app.post("/api/streams", authMiddleware, async (req: Request, res: Response) => return; } + try { const stream = await createStream(parsedBody.data); res.status(201).json({ @@ -523,7 +525,12 @@ app.post( } try { - + const canceledStream = await cancelStream(parsedId.value); + if (!canceledStream) { + res.status(404).json({ error: "Stream not found or could not be canceled.", requestId: req.requestId }); + return; + } + res.json({ data: { ...canceledStream, progress: calculateProgress(canceledStream) } }); } catch (error: any) { console.error("Failed to cancel stream:", error); const normalizedError = normalizeUnknownApiError(error, "Failed to cancel stream."); @@ -568,26 +575,10 @@ app.patch( return; } - const stream = getStream(parsedId.value); - if (!stream) { - res.status(404).json({ error: "Stream not found.", requestId: req.requestId }); - return; - } - - const user = (req as any).user; - if (stream.sender !== user.accountId) { - res.status(403).json({ - error: "Only the sender can update the start time.", - requestId: req.requestId, - }); - return; - } - const now = Math.floor(Date.now() / 1000); const newStartAt = parsedBody.data.startAt; - return; - } + try { const updatedStream = updateStreamStartAt(parsedId.value, newStartAt); diff --git a/backend/src/services/auth.ts b/backend/src/services/auth.ts index b669c27..4f3a832 100644 --- a/backend/src/services/auth.ts +++ b/backend/src/services/auth.ts @@ -81,7 +81,7 @@ export function verifyChallengeAndIssueToken( ); } - const token = jwt.sign({ accountId: clientAccountID }, JWT_SECRET, { + const token = jwt.sign({ accountId: clientAccountID }, getJwtSecret(), { expiresIn: "24h", }); return token; @@ -104,7 +104,6 @@ export function authMiddleware( if (!authHeader || !authHeader.startsWith("Bearer ")) { sendApiError(req, res, 401, "Missing or invalid authorization header.", { code: "UNAUTHORIZED", - requestId: (req as any).requestId, }); return; } @@ -118,7 +117,6 @@ export function authMiddleware( } catch (error) { sendApiError(req, res, 401, "Invalid or expired authorization token.", { code: "UNAUTHORIZED", - requestId: (req as any).requestId, }); } } diff --git a/backend/src/services/indexer.ts b/backend/src/services/indexer.ts index 47463dd..980c00b 100644 --- a/backend/src/services/indexer.ts +++ b/backend/src/services/indexer.ts @@ -14,6 +14,69 @@ let networkPassphrase: string = Networks.TESTNET; let lastProcessedLedger = 0; let indexerInterval: NodeJS.Timeout | null = null; +export enum CircuitState { + CLOSED = "CLOSED", + OPEN = "OPEN", + HALF_OPEN = "HALF_OPEN", +} + +class CircuitBreaker { + private state: CircuitState = CircuitState.CLOSED; + private failureCount: number = 0; + private lastFailureTime: number = 0; + private readonly failureThreshold: number = 5; + private readonly timeoutMs: number; + + constructor(timeoutMs: number = 60000) { + this.timeoutMs = timeoutMs; + } + + public getState(): CircuitState { + if (this.state === CircuitState.OPEN) { + const now = Date.now(); + if (now - this.lastFailureTime >= this.timeoutMs) { + this.setState(CircuitState.HALF_OPEN); + } + } + return this.state; + } + + public onSuccess(): void { + if (this.state !== CircuitState.CLOSED) { + console.log(`[Circuit Breaker] Probe successful. Resetting to CLOSED state.`); + this.setState(CircuitState.CLOSED); + } + this.failureCount = 0; + } + + public onFailure(): void { + this.failureCount++; + this.lastFailureTime = Date.now(); + + if (this.state === CircuitState.CLOSED && this.failureCount >= this.failureThreshold) { + console.log(`[Circuit Breaker] ${this.failureThreshold} consecutive failures reached. Opening circuit.`); + this.setState(CircuitState.OPEN); + } else if (this.state === CircuitState.HALF_OPEN) { + console.log(`[Circuit Breaker] Probe failed in HALF_OPEN state. Re-opening circuit.`); + this.setState(CircuitState.OPEN); + } + } + + private setState(newState: CircuitState): void { + if (this.state !== newState) { + console.log(`[Circuit Breaker] State Transition: ${this.state} -> ${newState}`); + this.state = newState; + } + } +} + +const CIRCUIT_BREAKER_TIMEOUT_MS = Number(process.env.CIRCUIT_BREAKER_TIMEOUT_MS ?? 60000); +const circuitBreaker = new CircuitBreaker(CIRCUIT_BREAKER_TIMEOUT_MS); + +export function getCircuitBreakerStatus(): CircuitState { + return circuitBreaker.getState(); +} + export function initIndexer( rpcUrl: string, contractIdParam: string, @@ -57,6 +120,11 @@ async function indexEvents(): Promise { return; } + const state = circuitBreaker.getState(); + if (state === CircuitState.OPEN) { + return; + } + try { const db = getDb(); const latestLedger = await rpcServer.getLatestLedger(); @@ -77,6 +145,7 @@ async function indexEvents(): Promise { } if (currentLedger <= lastProcessedLedger) { + circuitBreaker.onSuccess(); return; } @@ -103,7 +172,10 @@ async function indexEvents(): Promise { "INSERT INTO indexer_cursor (id, last_ledger) VALUES (?, ?) ON CONFLICT(id) DO UPDATE SET last_ledger = excluded.last_ledger", ).run(contractId, lastProcessedLedger); })(); + + circuitBreaker.onSuccess(); } catch (err) { + circuitBreaker.onFailure(); console.error("Failed to index events:", err); } } diff --git a/backend/src/services/streamStore.ts b/backend/src/services/streamStore.ts index 2fa899e..41985d5 100644 --- a/backend/src/services/streamStore.ts +++ b/backend/src/services/streamStore.ts @@ -8,6 +8,7 @@ import { TimeoutInfinite, TransactionBuilder, Networks, + Account, } from "@stellar/stellar-sdk"; import { initDb, getDb } from "./db"; import { recordEventWithDb } from "./eventHistory"; @@ -142,7 +143,7 @@ function round(value: number): number { function getSorobanContext(): | { contract: Contract; - sourceAccountPromise: Promise; + sourceAccountPromise: Promise; } | undefined { const contractId = process.env.CONTRACT_ID; @@ -162,7 +163,7 @@ function getSorobanContext(): async function simulateContractCall( contract: Contract, - sourceAccount: rpc.Api.GetAccountResponse, + sourceAccount: Account, method: string, ...args: any[] ): Promise { @@ -183,7 +184,7 @@ async function simulateContractCall( async function fetchNextOnChainStreamId( contract: Contract, - sourceAccount: rpc.Api.GetAccountResponse, + sourceAccount: Account, ): Promise { const simRes = await simulateContractCall( contract, @@ -201,7 +202,7 @@ async function fetchNextOnChainStreamId( async function fetchOnChainStreamRecord( contract: Contract, - sourceAccount: rpc.Api.GetAccountResponse, + sourceAccount: Account, id: number, ): Promise { const simRes = await simulateContractCall( diff --git a/backend/src/swagger.ts b/backend/src/swagger.ts index 5ad8d86..3e85e0b 100644 --- a/backend/src/swagger.ts +++ b/backend/src/swagger.ts @@ -673,7 +673,8 @@ export const swaggerDocument = { }, }, }, - + "400": { + description: "Invalid request.", content: { "application/json": { schema: {