feat: add stream pause and resume functionality with state tracking and event handling#399
Merged
Merged
Conversation
…nd event handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #357 I have successfully added the PAUSED and RESUMED event handlers to the Soroban event worker and updated the database schema to reflect the stream's paused state.
Here is a summary of the changes:
Updated the Prisma Schema
Added isPaused (Boolean), pausedAt (Int?), and totalPausedDuration (Int) fields to the Stream model in backend/prisma/schema.prisma.
Updated the StreamEvent event type comment to include "PAUSED" and "RESUMED".
Added stream_paused Handler
Decodes the stream_id, sender, and paused_at from the Soroban event.
Updates the corresponding Stream in the database to reflect isPaused: true and sets pausedAt.
Creates a new StreamEvent of type PAUSED.
Broadcasts the stream.paused Server-Sent Event (SSE).
Added stream_resumed Handler
Decodes the stream_id, sender, and new_end_time.
Calculates the paused duration using the difference between the current timestamp and pausedAt.
Updates the stream: sets isPaused: false, resets pausedAt to null, and increments totalPausedDuration.
Creates a StreamEvent of type RESUMED.
Broadcasts the stream.resumed Server-Sent Event (SSE).
Closes #358 Stream Pause Handling Implementation
Changes Made
Prisma Schema and Migrations
Re-generated the Prisma Client and added a manual migration (20260428161500_add_stream_pause_fields) containing isPaused, pausedAt, and totalPausedDuration since an automated migration was hindered by a missing active database connection.
Added @@index([isPaused]) to the Stream model for faster dashboard queries.
Claimable Calculation Logic
Modified the ClaimableStreamState interface in claimable.service.ts to accept startTime, isPaused, pausedAt, and totalPausedDuration.
Updated getClaimableAmount to calculate elapsed time starting from startTime instead of lastUpdateTime. This fixes a critical conceptual bug ensuring that pausing and partial withdrawals don't break the total claimable calculations.
Deducted totalPausedDuration from the elapsed time calculation.
Deducted the duration of the current active pause (from pausedAt to the current calculation timestamp) if isPaused is true.
Controller Updates
Updated getStreamClaimableAmount and getUserStreamSummary in stream.controller.ts to explicitly select the new startTime, isPaused, pausedAt, and totalPausedDuration fields from the database.
Standard Prisma operations for GET /v1/streams and GET /v1/streams/:streamId inherently include all fields, so isPaused and pausedAt are now natively exposed in responses.
Test and Build Fixes
Added missing attributes to mock Stream states in claimable.service.test.ts and stream.test.ts so that Vitest and TypeScript tests continue to build cleanly.
Fixed a pre-existing type mismatch in soroban-event-worker.ts's sseService.broadcast method for the admin channel.
Validation
Successfully ran npm run prisma:generate reflecting the new schema changes.
Cleanly compiled the workspace with npx tsc --noEmit and resolved previous build errors.
Closes #362 I have successfully implemented the SSE heartbeat and idle connection cleanup logic as requested.
Here are the details of the changes:
Heartbeat & Idle Cleanup Setup: Added a setInterval in the SSEService constructor that runs every 30 seconds. It loops through all connected clients, sending the : keep-alive\n\n SSE comment as a heartbeat to prevent load balancers from dropping the connection.
Activity Tracking: Added a lastActivityAt field to the SSEClient interface. It's initialized to Date.now() upon connection and updated every time a real event (broadcast or sendReconnectToAll) is pushed to the client.
5-Minute Timeout: Within the heartbeat interval, the service checks if a connection has had no real events pushed to it for over 5 minutes (300,000 ms). If so, it gracefully closes the connection using client.res.end(). This ensures no stale handles accumulate and forces load balancers to reset.
Lifecycle Logging: Added robust logging across the service using standard logger calls (logger.info / logger.debug):
[SSEService] Connection opened: ...
[SSEService] Heartbeat sent: ... (logged at debug to prevent spamming while still tracking)
[SSEService] Connection timed out: ...
[SSEService] Connection closed: ...
AutoReconnect Verification: I reviewed frontend/src/hooks/useStreamEvents.ts. The EventSource.onerror handler correctly flags connected to false and automatically triggers the reconnect logic (setTimeout using an exponential backoff sequence capped at maxRetryDelay) when the connection is closed server-side. This guarantees the client smoothly restores its connection whenever the backend performs its 5-minute cleanup.
Closes #363 I have implemented the requested sorting, pagination, and status filtering features for the /v1/streams endpoint.
Changes Made
Schema Update
Added the endTime Int? column to the Stream model in schema.prisma so it can be queried and sorted by Prisma.
Note: I generated the updated Prisma client types locally, but you will need to run the migration script locally using npx prisma migrate dev as your PostgreSQL database was not reachable from the environment.
Controller Updates (listStreams in stream.controller.ts)
Added support for ?status=active|cancelled|completed|paused mapping to boolean fields and StreamEvent event checks for COMPLETED and CANCELLED.
Added endTime to the list of validSortFields.
Enabled filtering by token which maps directly to tokenAddress in the schema.
Refined offset & limit pagination logic to ensure hasMore behaves correctly.
Event Worker Adjustments (SorobanEventWorker)
Augmented handleStreamCreated and handleStreamToppedUp to dynamically calculate endTime from depositedAmount and ratePerSecond.
Updated handleStreamResumed to accept the updated new_end_time logic from on-chain.
TypeScript Fixes
Fixed several mocked stream objects in your test suite that failed TypeScript strict typechecking after endTime was introduced into the core data model. All types now successfully compile (npx tsc --noEmit exits with 0).