Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"@prisma/adapter-pg": "^7.4.1",
"@stellar/stellar-sdk": "^14.5.0",
"cors": "^2.8.6",
"dotenv": "^17.3.1",
"dotenv": "^17.4.2",
"express": "^5.2.1",
"express-rate-limit": "^8.2.1",
"ioredis": "^5.3.2",
Expand Down Expand Up @@ -53,4 +53,4 @@
"typescript": "^5.9.3",
"vitest": "^2.1.8"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- AlterTable
ALTER TABLE "Stream" ADD COLUMN "isPaused" BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE "Stream" ADD COLUMN "pausedAt" INTEGER;
ALTER TABLE "Stream" ADD COLUMN "totalPausedDuration" INTEGER NOT NULL DEFAULT 0;

-- CreateIndex
CREATE INDEX "Stream_isPaused_idx" ON "Stream"("isPaused");
7 changes: 6 additions & 1 deletion backend/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ model Stream {
withdrawnAmount String // Total withdrawn amount (i128)
startTime Int // Unix timestamp when stream started
lastUpdateTime Int // Unix timestamp of last update
endTime Int? // Unix timestamp when stream ends
isActive Boolean @default(true)
isPaused Boolean @default(false)
pausedAt Int? // Unix timestamp when paused
totalPausedDuration Int @default(0) // Accumulated paused duration in seconds
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

Expand All @@ -49,6 +53,7 @@ model Stream {
@@index([recipient])
@@index([streamId])
@@index([isActive])
@@index([isPaused])
}

// IndexerState model - tracks the last processed ledger/cursor for the Soroban event worker
Expand All @@ -63,7 +68,7 @@ model IndexerState {
model StreamEvent {
id String @id @default(uuid())
streamId Int // Reference to on-chain stream ID
eventType String // EventType: "CREATED", "TOPPED_UP", "WITHDRAWN", "CANCELLED", "COMPLETED"
eventType String // EventType: "CREATED", "TOPPED_UP", "WITHDRAWN", "CANCELLED", "COMPLETED", "PAUSED", "RESUMED"
amount String? // Amount involved in the event (for top-ups, withdrawals)
transactionHash String // Stellar transaction hash
ledgerSequence Int // Ledger sequence number
Expand Down
21 changes: 15 additions & 6 deletions backend/src/controllers/stream.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Request, Response } from 'express';

Check failure on line 1 in backend/src/controllers/stream.controller.ts

View workflow job for this annotation

GitHub Actions / Backend npm test

tests/integration/streams.test.ts

Error: [vitest] There was an error when mocking a module. If you are using "vi.mock" factory, make sure there are no top level variables inside, since this call is hoisted to top of the file. Read more: https://vitest.dev/api/vi.html#vi-mock ❯ src/controllers/stream.controller.ts:1:1 Caused by: Caused by: ReferenceError: Cannot access 'mockPrisma' before initialization ❯ tests/integration/streams.test.ts:12:12 ❯ src/controllers/stream.controller.ts:1:1
import { prisma } from '../lib/prisma.js';
import logger from '../logger.js';
import { claimableAmountService } from '../services/claimable.service.js';
Expand Down Expand Up @@ -67,6 +67,7 @@
depositedAmount,
withdrawnAmount: "0",
startTime: parseInt(startTime),
endTime: parseInt(startTime) + Number(BigInt(depositedAmount) / BigInt(ratePerSecond)),
lastUpdateTime: parseInt(startTime)
}
});
Expand Down Expand Up @@ -113,18 +114,18 @@
switch (status) {
case 'active':
where.isActive = true;
where.isPaused = false;
break;
case 'cancelled':
where.isActive = false;
// Additional check for cancelled events could be added here
where.events = { some: { eventType: 'CANCELLED' } };
break;
case 'completed':
where.isActive = false;
// Additional check for completed events could be added here
where.events = { some: { eventType: 'COMPLETED' } };
break;
case 'paused':
where.isActive = false;
// Additional check for paused events could be added here
where.isPaused = true;
break;
}
}
Expand All @@ -137,9 +138,9 @@
const parsedOffset = typeof offset === 'string' ? (Number.parseInt(offset, 10) || 0) : 0;

// Validate sort field
const validSortFields = ['createdAt', 'startTime', 'lastUpdateTime', 'depositedAmount'];
const validSortFields = ['createdAt', 'startTime', 'lastUpdateTime', 'depositedAmount', 'endTime'];
const sortField = validSortFields.includes(typeof sort === 'string' ? sort : 'createdAt')
? (sort as 'createdAt' | 'startTime' | 'lastUpdateTime' | 'depositedAmount')
? (sort as 'createdAt' | 'startTime' | 'lastUpdateTime' | 'depositedAmount' | 'endTime')
: 'createdAt';

// Validate order
Expand Down Expand Up @@ -320,8 +321,12 @@
ratePerSecond: true,
depositedAmount: true,
withdrawnAmount: true,
startTime: true,
lastUpdateTime: true,
isActive: true,
isPaused: true,
pausedAt: true,
totalPausedDuration: true,
updatedAt: true,
},
});
Expand Down Expand Up @@ -400,8 +405,12 @@
ratePerSecond: true,
depositedAmount: true,
withdrawnAmount: true,
startTime: true,
lastUpdateTime: true,
isActive: true,
isPaused: true,
pausedAt: true,
totalPausedDuration: true,
updatedAt: true,
},
}),
Expand Down
23 changes: 21 additions & 2 deletions backend/src/services/claimable.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ export interface ClaimableStreamState {
ratePerSecond: string;
depositedAmount: string;
withdrawnAmount: string;
startTime: number;
lastUpdateTime: number;
isActive: boolean;
isPaused: boolean;
pausedAt: number | null;
totalPausedDuration: number;
updatedAt?: Date;
}

Expand Down Expand Up @@ -60,8 +64,12 @@ function getStateFingerprint(stream: ClaimableStreamState): string {
stream.ratePerSecond,
stream.depositedAmount,
stream.withdrawnAmount,
stream.startTime,
stream.lastUpdateTime,
stream.isActive ? '1' : '0',
stream.isPaused ? '1' : '0',
stream.pausedAt ?? 'null',
stream.totalPausedDuration,
].join(':');
}

Expand Down Expand Up @@ -106,9 +114,20 @@ export class ClaimableAmountService {
};
}

const streamLastUpdate = BigInt(Math.max(0, stream.lastUpdateTime));
const streamStart = BigInt(Math.max(0, stream.startTime));
const nowTs = BigInt(Math.max(0, calculatedAt));
const elapsed = nowTs > streamLastUpdate ? nowTs - streamLastUpdate : 0n;
let elapsed = nowTs > streamStart ? nowTs - streamStart : 0n;

const pastPausedDuration = BigInt(Math.max(0, stream.totalPausedDuration));
elapsed = elapsed > pastPausedDuration ? elapsed - pastPausedDuration : 0n;

if (stream.isPaused && stream.pausedAt !== null) {
const currentPauseStart = BigInt(Math.max(0, stream.pausedAt));
if (nowTs > currentPauseStart) {
const currentPauseDuration = nowTs - currentPauseStart;
elapsed = elapsed > currentPauseDuration ? elapsed - currentPauseDuration : 0n;
}
}

const ratePerSecond = parseI128(stream.ratePerSecond, 'ratePerSecond');
const depositedAmount = parseI128(stream.depositedAmount, 'depositedAmount');
Expand Down
48 changes: 45 additions & 3 deletions backend/src/services/sse.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ interface SSEClient {
res: Response;
subscriptions: Set<string>;
ip: string;
lastActivityAt: number;
}

const MAX_CONNECTIONS_PER_IP = 5;
Expand All @@ -24,6 +25,37 @@ class SSEService {
private readonly ipConnectionCounts: Map<string, number> = new Map();
private shuttingDown = false;
private perIpPeakConnections = 0;
private heartbeatInterval?: NodeJS.Timeout;

constructor() {
this.startHeartbeat();
}

private startHeartbeat(): void {
this.heartbeatInterval = setInterval(() => {
const now = Date.now();
const timeoutMs = 5 * 60 * 1000;

for (const [clientId, client] of this.clients.entries()) {
if (now - client.lastActivityAt > timeoutMs) {
logger.info(`[SSEService] Connection timed out: ${clientId}, ip: ${client.ip}`);
try {
client.res.end();
} catch (err) {
// ignore
}
continue;
}

try {
client.res.write(': keep-alive\n\n');
logger.debug(`[SSEService] Heartbeat sent: ${clientId}`);
} catch (err) {
// ignore
}
}
}, 30 * 1000);
}

private readonly maxConnections: number = (() => {
const parsed = Number.parseInt(process.env.MAX_SSE_CONNECTIONS ?? '10000', 10);
Expand Down Expand Up @@ -88,11 +120,12 @@ class SSEService {
res,
subscriptions: new Set(subscriptions),
ip,
lastActivityAt: Date.now(),
};

this.clients.set(clientId, client);
logger.info(
`SSE client connected: ${clientId}, ip: ${ip}, subscriptions: ${subscriptions.join(', ')}`
`[SSEService] Connection opened: ${clientId}, ip: ${ip}, subscriptions: ${subscriptions.join(', ')}`
);

res.on('close', () => {
Expand All @@ -113,15 +146,19 @@ class SSEService {
this.ipConnectionCounts.set(client.ip, currentIpCount - 1);
}

logger.info(`SSE client disconnected: ${clientId}, ip: ${client.ip}`);
logger.info(`[SSEService] Connection closed: ${clientId}, ip: ${client.ip}`);
}

sendReconnectToAll(): void {
this.shuttingDown = true;
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
const message = 'event: reconnect\ndata: {}\n\n';
for (const client of this.clients.values()) {
try {
client.res.write(message);
client.lastActivityAt = Date.now();
} catch {
// ignore write errors during shutdown
}
Expand All @@ -133,7 +170,12 @@ class SSEService {
const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
for (const client of this.clients.values()) {
if (!filter || filter(client)) {
client.res.write(message);
try {
client.res.write(message);
client.lastActivityAt = Date.now();
} catch (err) {
// ignore write errors
}
}
}
}
Expand Down
Loading
Loading