diff --git a/backend/package.json b/backend/package.json index 6996aa2..6e898ef 100644 --- a/backend/package.json +++ b/backend/package.json @@ -25,7 +25,7 @@ "@prisma/adapter-pg": "^7.4.1", "@stellar/stellar-sdk": "^14.5.0", "cors": "^2.8.6", - "dotenv": "^17.3.1", + "dotenv": "^17.4.2", "express": "^5.2.1", "express-rate-limit": "^8.2.1", "ioredis": "^5.3.2", @@ -53,4 +53,4 @@ "typescript": "^5.9.3", "vitest": "^2.1.8" } -} \ No newline at end of file +} diff --git a/backend/prisma/migrations/20260428161500_add_stream_pause_fields/migration.sql b/backend/prisma/migrations/20260428161500_add_stream_pause_fields/migration.sql new file mode 100644 index 0000000..ca29c45 --- /dev/null +++ b/backend/prisma/migrations/20260428161500_add_stream_pause_fields/migration.sql @@ -0,0 +1,7 @@ +-- AlterTable +ALTER TABLE "Stream" ADD COLUMN "isPaused" BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE "Stream" ADD COLUMN "pausedAt" INTEGER; +ALTER TABLE "Stream" ADD COLUMN "totalPausedDuration" INTEGER NOT NULL DEFAULT 0; + +-- CreateIndex +CREATE INDEX "Stream_isPaused_idx" ON "Stream"("isPaused"); diff --git a/backend/prisma/schema.prisma b/backend/prisma/schema.prisma index a5ab55f..1ee2a33 100644 --- a/backend/prisma/schema.prisma +++ b/backend/prisma/schema.prisma @@ -36,7 +36,11 @@ model Stream { withdrawnAmount String // Total withdrawn amount (i128) startTime Int // Unix timestamp when stream started lastUpdateTime Int // Unix timestamp of last update + endTime Int? // Unix timestamp when stream ends isActive Boolean @default(true) + isPaused Boolean @default(false) + pausedAt Int? // Unix timestamp when paused + totalPausedDuration Int @default(0) // Accumulated paused duration in seconds createdAt DateTime @default(now()) updatedAt DateTime @updatedAt @@ -49,6 +53,7 @@ model Stream { @@index([recipient]) @@index([streamId]) @@index([isActive]) + @@index([isPaused]) } // IndexerState model - tracks the last processed ledger/cursor for the Soroban event worker @@ -63,7 +68,7 @@ model IndexerState { model StreamEvent { id String @id @default(uuid()) streamId Int // Reference to on-chain stream ID - eventType String // EventType: "CREATED", "TOPPED_UP", "WITHDRAWN", "CANCELLED", "COMPLETED" + eventType String // EventType: "CREATED", "TOPPED_UP", "WITHDRAWN", "CANCELLED", "COMPLETED", "PAUSED", "RESUMED" amount String? // Amount involved in the event (for top-ups, withdrawals) transactionHash String // Stellar transaction hash ledgerSequence Int // Ledger sequence number diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index cd2a86d..60ac930 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -67,6 +67,7 @@ export const createStream = async (req: Request, res: Response) => { depositedAmount, withdrawnAmount: "0", startTime: parseInt(startTime), + endTime: parseInt(startTime) + Number(BigInt(depositedAmount) / BigInt(ratePerSecond)), lastUpdateTime: parseInt(startTime) } }); @@ -113,18 +114,18 @@ export const listStreams = async (req: Request, res: Response) => { switch (status) { case 'active': where.isActive = true; + where.isPaused = false; break; case 'cancelled': where.isActive = false; - // Additional check for cancelled events could be added here + where.events = { some: { eventType: 'CANCELLED' } }; break; case 'completed': where.isActive = false; - // Additional check for completed events could be added here + where.events = { some: { eventType: 'COMPLETED' } }; break; case 'paused': - where.isActive = false; - // Additional check for paused events could be added here + where.isPaused = true; break; } } @@ -137,9 +138,9 @@ export const listStreams = async (req: Request, res: Response) => { const parsedOffset = typeof offset === 'string' ? (Number.parseInt(offset, 10) || 0) : 0; // Validate sort field - const validSortFields = ['createdAt', 'startTime', 'lastUpdateTime', 'depositedAmount']; + const validSortFields = ['createdAt', 'startTime', 'lastUpdateTime', 'depositedAmount', 'endTime']; const sortField = validSortFields.includes(typeof sort === 'string' ? sort : 'createdAt') - ? (sort as 'createdAt' | 'startTime' | 'lastUpdateTime' | 'depositedAmount') + ? (sort as 'createdAt' | 'startTime' | 'lastUpdateTime' | 'depositedAmount' | 'endTime') : 'createdAt'; // Validate order @@ -320,8 +321,12 @@ export const getStreamClaimableAmount = async (req: Request, res: Response) => { ratePerSecond: true, depositedAmount: true, withdrawnAmount: true, + startTime: true, lastUpdateTime: true, isActive: true, + isPaused: true, + pausedAt: true, + totalPausedDuration: true, updatedAt: true, }, }); @@ -400,8 +405,12 @@ export const getUserStreamSummary = async (req: Request, res: Response) => { ratePerSecond: true, depositedAmount: true, withdrawnAmount: true, + startTime: true, lastUpdateTime: true, isActive: true, + isPaused: true, + pausedAt: true, + totalPausedDuration: true, updatedAt: true, }, }), diff --git a/backend/src/services/claimable.service.ts b/backend/src/services/claimable.service.ts index 705ef67..7d7a48d 100644 --- a/backend/src/services/claimable.service.ts +++ b/backend/src/services/claimable.service.ts @@ -6,8 +6,12 @@ export interface ClaimableStreamState { ratePerSecond: string; depositedAmount: string; withdrawnAmount: string; + startTime: number; lastUpdateTime: number; isActive: boolean; + isPaused: boolean; + pausedAt: number | null; + totalPausedDuration: number; updatedAt?: Date; } @@ -60,8 +64,12 @@ function getStateFingerprint(stream: ClaimableStreamState): string { stream.ratePerSecond, stream.depositedAmount, stream.withdrawnAmount, + stream.startTime, stream.lastUpdateTime, stream.isActive ? '1' : '0', + stream.isPaused ? '1' : '0', + stream.pausedAt ?? 'null', + stream.totalPausedDuration, ].join(':'); } @@ -106,9 +114,20 @@ export class ClaimableAmountService { }; } - const streamLastUpdate = BigInt(Math.max(0, stream.lastUpdateTime)); + const streamStart = BigInt(Math.max(0, stream.startTime)); const nowTs = BigInt(Math.max(0, calculatedAt)); - const elapsed = nowTs > streamLastUpdate ? nowTs - streamLastUpdate : 0n; + let elapsed = nowTs > streamStart ? nowTs - streamStart : 0n; + + const pastPausedDuration = BigInt(Math.max(0, stream.totalPausedDuration)); + elapsed = elapsed > pastPausedDuration ? elapsed - pastPausedDuration : 0n; + + if (stream.isPaused && stream.pausedAt !== null) { + const currentPauseStart = BigInt(Math.max(0, stream.pausedAt)); + if (nowTs > currentPauseStart) { + const currentPauseDuration = nowTs - currentPauseStart; + elapsed = elapsed > currentPauseDuration ? elapsed - currentPauseDuration : 0n; + } + } const ratePerSecond = parseI128(stream.ratePerSecond, 'ratePerSecond'); const depositedAmount = parseI128(stream.depositedAmount, 'depositedAmount'); diff --git a/backend/src/services/sse.service.ts b/backend/src/services/sse.service.ts index d2feea7..f932da6 100644 --- a/backend/src/services/sse.service.ts +++ b/backend/src/services/sse.service.ts @@ -7,6 +7,7 @@ interface SSEClient { res: Response; subscriptions: Set; ip: string; + lastActivityAt: number; } const MAX_CONNECTIONS_PER_IP = 5; @@ -24,6 +25,37 @@ class SSEService { private readonly ipConnectionCounts: Map = new Map(); private shuttingDown = false; private perIpPeakConnections = 0; + private heartbeatInterval?: NodeJS.Timeout; + + constructor() { + this.startHeartbeat(); + } + + private startHeartbeat(): void { + this.heartbeatInterval = setInterval(() => { + const now = Date.now(); + const timeoutMs = 5 * 60 * 1000; + + for (const [clientId, client] of this.clients.entries()) { + if (now - client.lastActivityAt > timeoutMs) { + logger.info(`[SSEService] Connection timed out: ${clientId}, ip: ${client.ip}`); + try { + client.res.end(); + } catch (err) { + // ignore + } + continue; + } + + try { + client.res.write(': keep-alive\n\n'); + logger.debug(`[SSEService] Heartbeat sent: ${clientId}`); + } catch (err) { + // ignore + } + } + }, 30 * 1000); + } private readonly maxConnections: number = (() => { const parsed = Number.parseInt(process.env.MAX_SSE_CONNECTIONS ?? '10000', 10); @@ -88,11 +120,12 @@ class SSEService { res, subscriptions: new Set(subscriptions), ip, + lastActivityAt: Date.now(), }; this.clients.set(clientId, client); logger.info( - `SSE client connected: ${clientId}, ip: ${ip}, subscriptions: ${subscriptions.join(', ')}` + `[SSEService] Connection opened: ${clientId}, ip: ${ip}, subscriptions: ${subscriptions.join(', ')}` ); res.on('close', () => { @@ -113,15 +146,19 @@ class SSEService { this.ipConnectionCounts.set(client.ip, currentIpCount - 1); } - logger.info(`SSE client disconnected: ${clientId}, ip: ${client.ip}`); + logger.info(`[SSEService] Connection closed: ${clientId}, ip: ${client.ip}`); } sendReconnectToAll(): void { this.shuttingDown = true; + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + } const message = 'event: reconnect\ndata: {}\n\n'; for (const client of this.clients.values()) { try { client.res.write(message); + client.lastActivityAt = Date.now(); } catch { // ignore write errors during shutdown } @@ -133,7 +170,12 @@ class SSEService { const message = `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`; for (const client of this.clients.values()) { if (!filter || filter(client)) { - client.res.write(message); + try { + client.res.write(message); + client.lastActivityAt = Date.now(); + } catch (err) { + // ignore write errors + } } } } diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index 96c38e0..7d0a5a8 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -263,6 +263,12 @@ export class SorobanEventWorker { case 'fee_collected': await this.handleFeeCollected(event, topic1); break; + case 'stream_paused': + await this.handleStreamPaused(event, topic1); + break; + case 'stream_resumed': + await this.handleStreamResumed(event, topic1); + break; default: // Unrecognised event — ignore silently. break; @@ -295,6 +301,10 @@ export class SorobanEventWorker { const ratePerSecond = decodeI128(body['rate_per_second']); const depositedAmount = decodeI128(body['deposited_amount']); const startTime = Number(decodeU64(body['start_time'])); + + // Compute expected end time (assuming no pauses yet) + const durationSeconds = Number(BigInt(depositedAmount) / BigInt(ratePerSecond)); + const endTime = startTime + durationSeconds; await prisma.$transaction(async (tx: any) => { await tx.user.upsert({ @@ -319,6 +329,7 @@ export class SorobanEventWorker { depositedAmount, withdrawnAmount: '0', startTime, + endTime, lastUpdateTime: startTime, isActive: true, }, @@ -327,6 +338,7 @@ export class SorobanEventWorker { ratePerSecond, depositedAmount, startTime, + endTime, lastUpdateTime: startTime, isActive: true, }, @@ -374,10 +386,19 @@ export class SorobanEventWorker { const timestamp = Math.floor(Date.now() / 1000); await prisma.$transaction(async (tx: any) => { + const stream = await tx.stream.findUniqueOrThrow({ + where: { streamId }, + select: { ratePerSecond: true, startTime: true, totalPausedDuration: true } + }); + + const durationSeconds = Number(BigInt(newDepositedAmount) / BigInt(stream.ratePerSecond)); + const newEndTime = stream.startTime + durationSeconds + stream.totalPausedDuration; + await tx.stream.update({ where: { streamId }, data: { depositedAmount: newDepositedAmount, + endTime: newEndTime, lastUpdateTime: timestamp, }, }); @@ -586,7 +607,7 @@ export class SorobanEventWorker { }); // Broadcast to admin channel for treasury reporting - sseService.broadcast('admin', 'stream.fee_collected', { + sseService.broadcast('stream.fee_collected', { streamId, treasury, feeAmount, @@ -594,6 +615,111 @@ export class SorobanEventWorker { transactionHash: event.txHash, ledger: event.ledger, timestamp, + }, (client) => client.subscriptions.has('admin') || client.subscriptions.has('*')); + } + + private async handleStreamPaused( + event: rpc.Api.EventResponse, + streamIdTopic: xdr.ScVal, + ): Promise { + const streamId = Number(decodeU64(streamIdTopic)); + const body = decodeMap(event.value); + + if (!body['sender'] || !body['paused_at']) { + throw new Error(`StreamPaused #${streamId}: missing body fields`); + } + + const sender = decodeAddress(body['sender']); + const pausedAt = Number(decodeU64(body['paused_at'])); + + await prisma.$transaction(async (tx: any) => { + await tx.stream.update({ + where: { streamId }, + data: { + isPaused: true, + pausedAt, + lastUpdateTime: pausedAt, + }, + }); + + await tx.streamEvent.create({ + data: { + streamId, + eventType: 'PAUSED', + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp: pausedAt, + metadata: JSON.stringify({ sender }), + }, + }); + }); + + sseService.broadcastToStream(String(streamId), 'stream.paused', { + streamId, + sender, + pausedAt, + transactionHash: event.txHash, + ledger: event.ledger, + timestamp: pausedAt, + }); + } + + private async handleStreamResumed( + event: rpc.Api.EventResponse, + streamIdTopic: xdr.ScVal, + ): Promise { + const streamId = Number(decodeU64(streamIdTopic)); + const body = decodeMap(event.value); + + if (!body['sender'] || !body['new_end_time']) { + throw new Error(`StreamResumed #${streamId}: missing body fields`); + } + + const sender = decodeAddress(body['sender']); + const newEndTime = Number(decodeU64(body['new_end_time'])); + const timestamp = Math.floor(Date.now() / 1000); + + await prisma.$transaction(async (tx: any) => { + const stream = await tx.stream.findUniqueOrThrow({ + where: { streamId }, + select: { pausedAt: true, totalPausedDuration: true }, + }); + + const pausedAt = stream.pausedAt || timestamp; + const pausedDuration = timestamp - pausedAt; + + await tx.stream.update({ + where: { streamId }, + data: { + isPaused: false, + pausedAt: null, + endTime: newEndTime, + totalPausedDuration: { + increment: pausedDuration, + }, + lastUpdateTime: timestamp, + }, + }); + + await tx.streamEvent.create({ + data: { + streamId, + eventType: 'RESUMED', + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ sender, newEndTime, pausedDuration }), + }, + }); + }); + + sseService.broadcastToStream(String(streamId), 'stream.resumed', { + streamId, + sender, + newEndTime, + transactionHash: event.txHash, + ledger: event.ledger, + timestamp, }); } } diff --git a/backend/tests/claimable.service.test.ts b/backend/tests/claimable.service.test.ts index 0e1594b..c5501cd 100644 --- a/backend/tests/claimable.service.test.ts +++ b/backend/tests/claimable.service.test.ts @@ -14,6 +14,10 @@ describe('ClaimableAmountService', () => { depositedAmount: '500', withdrawnAmount: '100', lastUpdateTime: 7, + startTime: 0, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: true, }); @@ -38,6 +42,10 @@ describe('ClaimableAmountService', () => { depositedAmount: '1000', withdrawnAmount: '900', lastUpdateTime: 0, + startTime: 0, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: true, }); @@ -57,6 +65,10 @@ describe('ClaimableAmountService', () => { depositedAmount: '1000', withdrawnAmount: '100', lastUpdateTime: 0, + startTime: 0, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: false, }); @@ -76,6 +88,10 @@ describe('ClaimableAmountService', () => { depositedAmount: '100', withdrawnAmount: '150', lastUpdateTime: 0, + startTime: 0, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: true, }); @@ -96,6 +112,10 @@ describe('ClaimableAmountService', () => { depositedAmount: '700', withdrawnAmount: '0', lastUpdateTime: 0, + startTime: 0, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: true, }; @@ -125,6 +145,10 @@ describe('ClaimableAmountService', () => { depositedAmount: i128Max, withdrawnAmount: '0', lastUpdateTime: 998, + startTime: 0, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: true, }); diff --git a/backend/tests/stream.test.ts b/backend/tests/stream.test.ts index 48b8a60..1c843b8 100644 --- a/backend/tests/stream.test.ts +++ b/backend/tests/stream.test.ts @@ -43,7 +43,11 @@ describe('POST /v1/streams', () => { withdrawnAmount: '0', startTime: 1700000000, lastUpdateTime: 1700000000, + isPaused: false, + pausedAt: null, + totalPausedDuration: 0, isActive: true, + endTime: null, createdAt: new Date(), updatedAt: new Date(), }; @@ -157,7 +161,11 @@ describe('GET /v1/users/:address/summary', () => { withdrawnAmount: '30', startTime: 1000, lastUpdateTime: 2000, - isActive: true + isPaused: false, + endTime: null, + pausedAt: null, + totalPausedDuration: 0, + isActive: true }, { id: '2', @@ -172,7 +180,11 @@ describe('GET /v1/users/:address/summary', () => { withdrawnAmount: '20', startTime: 1000, lastUpdateTime: 2000, - isActive: false + isPaused: false, + endTime: null, + pausedAt: null, + totalPausedDuration: 0, + isActive: false }, ]) .mockResolvedValueOnce([ @@ -189,7 +201,11 @@ describe('GET /v1/users/:address/summary', () => { withdrawnAmount: '100', startTime: 1000, lastUpdateTime: 0, - isActive: true, + isPaused: false, + endTime: null, + pausedAt: null, + totalPausedDuration: 0, + isActive: true, }, { id: '4', @@ -204,7 +220,11 @@ describe('GET /v1/users/:address/summary', () => { withdrawnAmount: '0', startTime: 1000, lastUpdateTime: 0, - isActive: false, + isPaused: false, + endTime: null, + pausedAt: null, + totalPausedDuration: 0, + isActive: false, }, ]); @@ -238,7 +258,11 @@ describe('GET /v1/users/:address/summary', () => { withdrawnAmount: '1', startTime: 1000, lastUpdateTime: 2000, - isActive: true + isPaused: false, + endTime: null, + pausedAt: null, + totalPausedDuration: 0, + isActive: true }]) .mockResolvedValueOnce([]); diff --git a/package-lock.json b/package-lock.json index e331c75..9a6172d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -32,7 +32,7 @@ "@prisma/adapter-pg": "^7.4.1", "@stellar/stellar-sdk": "^14.5.0", "cors": "^2.8.6", - "dotenv": "^17.3.1", + "dotenv": "^17.4.2", "express": "^5.2.1", "express-rate-limit": "^8.2.1", "ioredis": "^5.3.2", @@ -906,7 +906,6 @@ "version": "7.29.0", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1350,8 +1349,7 @@ "node_modules/@electric-sql/pglite": { "version": "0.3.15", "dev": true, - "license": "Apache-2.0", - "peer": true + "license": "Apache-2.0" }, "node_modules/@electric-sql/pglite-socket": { "version": "0.0.20", @@ -2891,7 +2889,6 @@ "version": "20.19.33", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -2925,7 +2922,6 @@ "version": "19.2.14", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.2.2" } @@ -3049,7 +3045,6 @@ "version": "8.56.1", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.56.1", "@typescript-eslint/types": "8.56.1", @@ -3322,7 +3317,6 @@ "version": "8.16.0", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -3834,7 +3828,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -4377,8 +4370,7 @@ }, "node_modules/csstype": { "version": "3.2.3", - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/damerau-levenshtein": { "version": "1.0.8", @@ -4620,7 +4612,9 @@ "peer": true }, "node_modules/dotenv": { - "version": "17.3.1", + "version": "17.4.2", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.4.2.tgz", + "integrity": "sha512-nI4U3TottKAcAD9LLud4Cb7b2QztQMUEfHbvhTH09bqXTxnSie8WnjPALV/WMCrJZ6UV/qHJ6L03OqO3LcdYZw==", "license": "BSD-2-Clause", "engines": { "node": ">=12" @@ -5322,7 +5316,6 @@ "version": "9.39.3", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -5492,7 +5485,6 @@ "version": "2.32.0", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@rtsao/scc": "^1.1.0", "array-includes": "^3.1.9", @@ -5745,7 +5737,6 @@ "node_modules/express": { "version": "5.2.1", "license": "MIT", - "peer": true, "dependencies": { "accepts": "^2.0.0", "body-parser": "^2.2.1", @@ -6466,7 +6457,6 @@ "version": "4.11.4", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=16.9.0" } @@ -8309,7 +8299,6 @@ "node_modules/pg": { "version": "8.18.0", "license": "MIT", - "peer": true, "dependencies": { "pg-connection-string": "^2.11.0", "pg-pool": "^3.11.0", @@ -8562,7 +8551,6 @@ "dev": true, "hasInstallScript": true, "license": "Apache-2.0", - "peer": true, "dependencies": { "@prisma/config": "7.4.1", "@prisma/dev": "0.20.0", @@ -8729,7 +8717,6 @@ "node_modules/react": { "version": "19.2.4", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -8737,7 +8724,6 @@ "node_modules/react-dom": { "version": "19.2.4", "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.27.0" }, @@ -9990,7 +9976,6 @@ "version": "4.0.3", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -10310,7 +10295,6 @@ "version": "5.9.3", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver"