Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions backend/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,3 @@ describe("GET /api/events", () => {
});
});


});
});
43 changes: 17 additions & 26 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
getStreamHistory,
} from "./services/eventHistory";
import { fetchOpenIssues } from "./services/openIssues";
import { initIndexer, startIndexer } from "./services/indexer";
import { initIndexer, startIndexer, getCircuitBreakerStatus } from "./services/indexer";
import { startReconciliationJob } from "./services/reconciliationJob";
import { startWebhookWorker } from "./services/webhookWorker";
import { getDeadLetters, countDeadLetters } from "./services/webhook";
Expand All @@ -35,12 +35,7 @@ import {
syncStreams,
updateStreamStartAt,
} from "./services/streamStore";
import {
getGlobalEvents,
countAllEvents,
getStreamHistory,
getAllEvents,
} from "./services/eventHistory";

import {
authMiddleware,
generateChallenge,
Expand Down Expand Up @@ -138,6 +133,12 @@ app.get("/api/health", (_req: Request, res: Response) => {
});
});

app.get("/api/metrics", (_req: Request, res: Response) => {
res.json({
indexer_circuit_breaker: getCircuitBreakerStatus(),
});
});

app.get("/api/assets", (_req: Request, res: Response) => {
res.json({
data: ALLOWED_ASSETS,
Expand Down Expand Up @@ -313,7 +314,7 @@ app.get("/api/recipients/:accountId/streams", (req: Request, res: Response) => {

const parsedQuery = listStreamsQuerySchema.safeParse(req.query);
if (!parsedQuery.success) {
sendValidationError(res, parsedQuery.error.issues);
sendValidationError(req, res, parsedQuery.error.issues);
return;
}
const query = parsedQuery.data;
Expand Down Expand Up @@ -480,6 +481,7 @@ app.post("/api/streams", authMiddleware, async (req: Request, res: Response) =>
return;
}


try {
const stream = await createStream(parsedBody.data);
res.status(201).json({
Expand Down Expand Up @@ -523,7 +525,12 @@ app.post(
}

try {

const canceledStream = await cancelStream(parsedId.value);
if (!canceledStream) {
res.status(404).json({ error: "Stream not found or could not be canceled.", requestId: req.requestId });
return;
}
res.json({ data: { ...canceledStream, progress: calculateProgress(canceledStream) } });
} catch (error: any) {
console.error("Failed to cancel stream:", error);
const normalizedError = normalizeUnknownApiError(error, "Failed to cancel stream.");
Expand Down Expand Up @@ -568,26 +575,10 @@ app.patch(
return;
}

const stream = getStream(parsedId.value);
if (!stream) {
res.status(404).json({ error: "Stream not found.", requestId: req.requestId });
return;
}

const user = (req as any).user;
if (stream.sender !== user.accountId) {
res.status(403).json({
error: "Only the sender can update the start time.",
requestId: req.requestId,
});
return;
}

const now = Math.floor(Date.now() / 1000);
const newStartAt = parsedBody.data.startAt;

return;
}


try {
const updatedStream = updateStreamStartAt(parsedId.value, newStartAt);
Expand Down
4 changes: 1 addition & 3 deletions backend/src/services/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export function verifyChallengeAndIssueToken(
);
}

const token = jwt.sign({ accountId: clientAccountID }, JWT_SECRET, {
const token = jwt.sign({ accountId: clientAccountID }, getJwtSecret(), {
expiresIn: "24h",
});
return token;
Expand All @@ -104,7 +104,6 @@ export function authMiddleware(
if (!authHeader || !authHeader.startsWith("Bearer ")) {
sendApiError(req, res, 401, "Missing or invalid authorization header.", {
code: "UNAUTHORIZED",
requestId: (req as any).requestId,
});
return;
}
Expand All @@ -118,7 +117,6 @@ export function authMiddleware(
} catch (error) {
sendApiError(req, res, 401, "Invalid or expired authorization token.", {
code: "UNAUTHORIZED",
requestId: (req as any).requestId,
});
}
}
72 changes: 72 additions & 0 deletions backend/src/services/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,69 @@ let networkPassphrase: string = Networks.TESTNET;
let lastProcessedLedger = 0;
let indexerInterval: NodeJS.Timeout | null = null;

export enum CircuitState {
CLOSED = "CLOSED",
OPEN = "OPEN",
HALF_OPEN = "HALF_OPEN",
}

class CircuitBreaker {
private state: CircuitState = CircuitState.CLOSED;
private failureCount: number = 0;
private lastFailureTime: number = 0;
private readonly failureThreshold: number = 5;
private readonly timeoutMs: number;

constructor(timeoutMs: number = 60000) {
this.timeoutMs = timeoutMs;
}

public getState(): CircuitState {
if (this.state === CircuitState.OPEN) {
const now = Date.now();
if (now - this.lastFailureTime >= this.timeoutMs) {
this.setState(CircuitState.HALF_OPEN);
}
}
return this.state;
}

public onSuccess(): void {
if (this.state !== CircuitState.CLOSED) {
console.log(`[Circuit Breaker] Probe successful. Resetting to CLOSED state.`);
this.setState(CircuitState.CLOSED);
}
this.failureCount = 0;
}

public onFailure(): void {
this.failureCount++;
this.lastFailureTime = Date.now();

if (this.state === CircuitState.CLOSED && this.failureCount >= this.failureThreshold) {
console.log(`[Circuit Breaker] ${this.failureThreshold} consecutive failures reached. Opening circuit.`);
this.setState(CircuitState.OPEN);
} else if (this.state === CircuitState.HALF_OPEN) {
console.log(`[Circuit Breaker] Probe failed in HALF_OPEN state. Re-opening circuit.`);
this.setState(CircuitState.OPEN);
}
}

private setState(newState: CircuitState): void {
if (this.state !== newState) {
console.log(`[Circuit Breaker] State Transition: ${this.state} -> ${newState}`);
this.state = newState;
}
}
}

const CIRCUIT_BREAKER_TIMEOUT_MS = Number(process.env.CIRCUIT_BREAKER_TIMEOUT_MS ?? 60000);
const circuitBreaker = new CircuitBreaker(CIRCUIT_BREAKER_TIMEOUT_MS);

export function getCircuitBreakerStatus(): CircuitState {
return circuitBreaker.getState();
}

export function initIndexer(
rpcUrl: string,
contractIdParam: string,
Expand Down Expand Up @@ -57,6 +120,11 @@ async function indexEvents(): Promise<void> {
return;
}

const state = circuitBreaker.getState();
if (state === CircuitState.OPEN) {
return;
}

try {
const db = getDb();
const latestLedger = await rpcServer.getLatestLedger();
Expand All @@ -77,6 +145,7 @@ async function indexEvents(): Promise<void> {
}

if (currentLedger <= lastProcessedLedger) {
circuitBreaker.onSuccess();
return;
}

Expand All @@ -103,7 +172,10 @@ async function indexEvents(): Promise<void> {
"INSERT INTO indexer_cursor (id, last_ledger) VALUES (?, ?) ON CONFLICT(id) DO UPDATE SET last_ledger = excluded.last_ledger",
).run(contractId, lastProcessedLedger);
})();

circuitBreaker.onSuccess();
} catch (err) {
circuitBreaker.onFailure();
console.error("Failed to index events:", err);
}
}
Expand Down
9 changes: 5 additions & 4 deletions backend/src/services/streamStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
TimeoutInfinite,
TransactionBuilder,
Networks,
Account,
} from "@stellar/stellar-sdk";
import { initDb, getDb } from "./db";
import { recordEventWithDb } from "./eventHistory";
Expand Down Expand Up @@ -142,7 +143,7 @@ function round(value: number): number {
function getSorobanContext():
| {
contract: Contract;
sourceAccountPromise: Promise<rpc.Api.GetAccountResponse>;
sourceAccountPromise: Promise<Account>;
}
| undefined {
const contractId = process.env.CONTRACT_ID;
Expand All @@ -162,7 +163,7 @@ function getSorobanContext():

async function simulateContractCall(
contract: Contract,
sourceAccount: rpc.Api.GetAccountResponse,
sourceAccount: Account,
method: string,
...args: any[]
): Promise<rpc.Api.SimulateTransactionResponse> {
Expand All @@ -183,7 +184,7 @@ async function simulateContractCall(

async function fetchNextOnChainStreamId(
contract: Contract,
sourceAccount: rpc.Api.GetAccountResponse,
sourceAccount: Account,
): Promise<number | null> {
const simRes = await simulateContractCall(
contract,
Expand All @@ -201,7 +202,7 @@ async function fetchNextOnChainStreamId(

async function fetchOnChainStreamRecord(
contract: Contract,
sourceAccount: rpc.Api.GetAccountResponse,
sourceAccount: Account,
id: number,
): Promise<StreamRecord | null> {
const simRes = await simulateContractCall(
Expand Down
3 changes: 2 additions & 1 deletion backend/src/swagger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,8 @@ export const swaggerDocument = {
},
},
},

"400": {
description: "Invalid request.",
content: {
"application/json": {
schema: {
Expand Down
Loading