From c62fd6158fe61a4bf045f1c1b045d34346858128 Mon Sep 17 00:00:00 2001 From: xeladev4 Date: Tue, 28 Apr 2026 10:55:15 +0100 Subject: [PATCH 1/3] feat(backend): add SSE limits and user summary stats --- backend/src/controllers/sse.controller.ts | 25 +++- backend/src/controllers/stream.controller.ts | 115 +++++++++++++++++++ backend/src/routes/adminRoutes.ts | 7 +- backend/src/routes/v1/events.routes.ts | 12 ++ backend/src/routes/v1/index.ts | 6 +- backend/src/routes/v1/user.routes.ts | 45 ++++++++ backend/src/services/sse.service.ts | 83 ++++++++++++- backend/tests/sse.service.test.ts | 66 +++++++++++ backend/tests/stream.test.ts | 83 +++++++++++++ 9 files changed, 434 insertions(+), 8 deletions(-) create mode 100644 backend/tests/sse.service.test.ts diff --git a/backend/src/controllers/sse.controller.ts b/backend/src/controllers/sse.controller.ts index 2cf4ca1..6f8eb6c 100644 --- a/backend/src/controllers/sse.controller.ts +++ b/backend/src/controllers/sse.controller.ts @@ -9,12 +9,35 @@ const subscribeSchema = z.object({ all: z.boolean().optional().default(false), }); +function getClientIp(req: Request): string { + const forwarded = req.headers['x-forwarded-for']; + if (typeof forwarded === 'string' && forwarded.trim().length > 0) { + return forwarded.split(',')[0]?.trim() || 'unknown'; + } + + if (Array.isArray(forwarded) && forwarded.length > 0) { + return forwarded[0] ?? 'unknown'; + } + + return req.ip || req.socket.remoteAddress || 'unknown'; +} export const subscribe = async (req: Request, res: Response) => { if (sseService.isShuttingDown()) { return res.status(503).json({ message: 'Server is shutting down, please reconnect shortly.' }); } try { + const sourceIp = getClientIp(req); + const capacity = sseService.checkCapacity(sourceIp); + if (!capacity.allowed) { + if (capacity.retryAfterSeconds) { + res.setHeader('Retry-After', String(capacity.retryAfterSeconds)); + } + return res.status(capacity.status ?? 503).json({ + message: capacity.message ?? 'SSE connection rejected', + }); + } + const { publicKey } = (req as AuthenticatedRequest).user; const { streams, all } = subscribeSchema.parse(req.query); @@ -50,7 +73,7 @@ export const subscribe = async (req: Request, res: Response) => { res.write(`data: ${JSON.stringify({ type: 'connected', clientId })}\n\n`); - sseService.addClient(clientId, res, subscriptions); + sseService.addClient(clientId, res, subscriptions, sourceIp); } catch (error: any) { if (error.name === 'ZodError') { return res.status(400).json({ diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index 6ea87f0..b10e8b1 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -4,6 +4,44 @@ import logger from '../logger.js'; import { claimableAmountService } from '../services/claimable.service.js'; import { getStreamFromChain, getClaimableFromChain, isStale } from '../services/sorobanService.js'; +interface UserStreamSummary { + address: string; + totalStreamsCreated: number; + totalStreamedOut: string; + totalStreamedIn: string; + currentClaimable: string; + activeOutgoingCount: number; + activeIncomingCount: number; +} + +interface UserSummaryCacheEntry { + value: UserStreamSummary; + expiresAtMs: number; +} + +const USER_SUMMARY_CACHE_TTL_MS = 30_000; +const userSummaryCache = new Map(); + +function pruneUserSummaryCache(nowMs: number): void { + for (const [key, entry] of userSummaryCache.entries()) { + if (entry.expiresAtMs <= nowMs) { + userSummaryCache.delete(key); + } + } +} + +function sumStringI128(values: string[]): string { + let total = 0n; + for (const value of values) { + try { + total += BigInt(value); + } catch { + logger.warn(`[UserSummary] Skipping invalid i128 value: ${value}`); + } + } + return total.toString(); +} + /** * Create a new stream (stub for on-chain indexing) */ @@ -258,3 +296,80 @@ export const getStreamClaimableAmount = async (req: Request, res: Response) => { return res.status(500).json({ error: 'Internal server error' }); } }; + +/** + * Get user-level stream summary used by dashboard/profile cards. + */ +export const getUserStreamSummary = async (req: Request, res: Response) => { + try { + const address = (req.params.address ?? '').trim(); + if (!address) { + return res.status(400).json({ error: 'Address is required' }); + } + + const nowMs = Date.now(); + const cacheKey = address; + const cached = userSummaryCache.get(cacheKey); + if (cached && cached.expiresAtMs > nowMs) { + return res.status(200).json(cached.value); + } + + pruneUserSummaryCache(nowMs); + + const [outgoingStreams, incomingStreams] = await Promise.all([ + prisma.stream.findMany({ + where: { sender: address }, + select: { + withdrawnAmount: true, + isActive: true, + }, + }), + prisma.stream.findMany({ + where: { recipient: address }, + select: { + streamId: true, + ratePerSecond: true, + depositedAmount: true, + withdrawnAmount: true, + lastUpdateTime: true, + isActive: true, + updatedAt: true, + }, + }), + ]); + + const totalStreamsCreated = outgoingStreams.length; + const totalStreamedOut = sumStringI128(outgoingStreams.map((stream) => stream.withdrawnAmount)); + const totalStreamedIn = sumStringI128(incomingStreams.map((stream) => stream.withdrawnAmount)); + const activeOutgoingCount = outgoingStreams.filter((stream) => stream.isActive).length; + const activeIncomingCount = incomingStreams.filter((stream) => stream.isActive).length; + + const calculatedAt = Math.floor(nowMs / 1000); + let claimableTotal = 0n; + for (const stream of incomingStreams) { + if (!stream.isActive) continue; + const claimable = claimableAmountService.getClaimableAmount(stream, calculatedAt); + claimableTotal += BigInt(claimable.claimableAmount); + } + + const summary: UserStreamSummary = { + address, + totalStreamsCreated, + totalStreamedOut, + totalStreamedIn, + currentClaimable: claimableTotal.toString(), + activeOutgoingCount, + activeIncomingCount, + }; + + userSummaryCache.set(cacheKey, { + value: summary, + expiresAtMs: nowMs + USER_SUMMARY_CACHE_TTL_MS, + }); + + return res.status(200).json(summary); + } catch (error) { + logger.error('Error fetching user stream summary:', error); + return res.status(500).json({ error: 'Internal server error' }); + } +}; diff --git a/backend/src/routes/adminRoutes.ts b/backend/src/routes/adminRoutes.ts index 3380afe..be78c4f 100644 --- a/backend/src/routes/adminRoutes.ts +++ b/backend/src/routes/adminRoutes.ts @@ -66,6 +66,8 @@ function adminAuth(req: Request, res: Response, next: NextFunction): void { * properties: * activeConnections: * type: integer + * perIpPeakConnections: + * type: integer * indexer: * type: object * properties: @@ -120,7 +122,10 @@ router.get('/metrics', adminAuth, async (_req: Request, res: Response) => { }, }, events: { last24h: eventsLast24h }, - sse: { activeConnections: sseService.getClientCount() }, + sse: { + activeConnections: sseService.getClientCount(), + perIpPeakConnections: sseService.getPerIpPeakConnections(), + }, indexer: { lastLedger: indexerState?.lastLedger ?? 0, lagSeconds, diff --git a/backend/src/routes/v1/events.routes.ts b/backend/src/routes/v1/events.routes.ts index 495774c..5f0a873 100644 --- a/backend/src/routes/v1/events.routes.ts +++ b/backend/src/routes/v1/events.routes.ts @@ -100,6 +100,15 @@ router.get('/subscribe', requireAuth, subscribe); * activeConnections: * type: number * example: 42 + * activeIps: + * type: number + * example: 8 + * perIpPeakConnections: + * type: number + * example: 5 + * maxConnections: + * type: number + * example: 10000 * timestamp: * type: string * format: date-time @@ -107,6 +116,9 @@ router.get('/subscribe', requireAuth, subscribe); router.get('/stats', (req: Request, res: Response) => { res.json({ activeConnections: sseService.getClientCount(), + activeIps: sseService.getActiveIpCount(), + perIpPeakConnections: sseService.getPerIpPeakConnections(), + maxConnections: sseService.getMaxConnections(), timestamp: new Date().toISOString(), }); }); diff --git a/backend/src/routes/v1/index.ts b/backend/src/routes/v1/index.ts index 7bd3657..6315362 100644 --- a/backend/src/routes/v1/index.ts +++ b/backend/src/routes/v1/index.ts @@ -3,7 +3,8 @@ import streamRoutes from './stream.routes.js'; import eventsRoutes from './events.routes.js'; import userRoutes from './user.routes.js'; import authRoutes from './auth.routes.js'; -import adminRoutes from './admin.routes.js'; +import v1AdminRoutes from './admin.routes.js'; +import adminMetricsRoutes from '../adminRoutes.js'; const router = Router(); @@ -12,6 +13,7 @@ router.use('/streams', streamRoutes); router.use('/events', eventsRoutes); router.use('/users', userRoutes); router.use('/auth', authRoutes); -router.use('/admin', adminRoutes); +router.use('/admin', v1AdminRoutes); +router.use('/admin', adminMetricsRoutes); export default router; diff --git a/backend/src/routes/v1/user.routes.ts b/backend/src/routes/v1/user.routes.ts index e79cf36..098b783 100644 --- a/backend/src/routes/v1/user.routes.ts +++ b/backend/src/routes/v1/user.routes.ts @@ -1,5 +1,6 @@ import { Router } from 'express'; import { registerUser, getUser, getUserEvents, getCurrentUser } from '../../controllers/user.controller.js'; +import { getUserStreamSummary } from '../../controllers/stream.controller.js'; import { authMiddleware } from '../../middleware/auth.middleware.js'; const router = Router(); @@ -84,6 +85,50 @@ const router = Router(); */ router.post('/', registerUser); router.get('/me', authMiddleware, getCurrentUser); +/** + * @openapi + * /v1/users/{address}/summary: + * get: + * tags: + * - Users + * summary: Get aggregate stream summary for a user + * description: | + * Returns dashboard/profile summary data for a wallet address: + * total created streams, total streamed out/in, current claimable across + * active incoming streams, and active stream counts. + * + * Response is cached for 30 seconds to reduce DB load. + * parameters: + * - in: path + * name: address + * required: true + * schema: + * type: string + * description: Stellar public key address + * responses: + * 200: + * description: User stream summary + * content: + * application/json: + * schema: + * type: object + * properties: + * address: + * type: string + * totalStreamsCreated: + * type: integer + * totalStreamedOut: + * type: string + * totalStreamedIn: + * type: string + * currentClaimable: + * type: string + * activeOutgoingCount: + * type: integer + * activeIncomingCount: + * type: integer + */ +router.get('/:address/summary', getUserStreamSummary); router.get('/:publicKey', getUser); /** diff --git a/backend/src/services/sse.service.ts b/backend/src/services/sse.service.ts index 6bb7656..00335d2 100644 --- a/backend/src/services/sse.service.ts +++ b/backend/src/services/sse.service.ts @@ -6,11 +6,30 @@ interface SSEClient { id: string; res: Response; subscriptions: Set; + ip: string; +} + +const MAX_CONNECTIONS_PER_IP = 5; +const RETRY_AFTER_SECONDS = 60; + +interface SSECapacityCheckResult { + allowed: boolean; + status?: number; + retryAfterSeconds?: number; + message?: string; } class SSEService { private clients: Map = new Map(); + private readonly ipConnectionCounts: Map = new Map(); private shuttingDown = false; + private perIpPeakConnections = 0; + + private readonly maxConnections: number = (() => { + const parsed = Number.parseInt(process.env.MAX_SSE_CONNECTIONS ?? '10000', 10); + if (!Number.isFinite(parsed) || parsed <= 0) return 10000; + return parsed; + })(); isShuttingDown(): boolean { return this.shuttingDown; @@ -37,22 +56,66 @@ class SSEService { logger.info('[SSEService] Redis pub/sub subscription active.'); } - addClient(clientId: string, res: Response, subscriptions: string[] = []): void { + checkCapacity(ip: string): SSECapacityCheckResult { + if (this.clients.size >= this.maxConnections) { + return { + allowed: false, + status: 503, + message: 'SSE capacity reached. Please try again shortly.', + }; + } + + const currentIpConnections = this.ipConnectionCounts.get(ip) ?? 0; + if (currentIpConnections >= MAX_CONNECTIONS_PER_IP) { + return { + allowed: false, + status: 429, + retryAfterSeconds: RETRY_AFTER_SECONDS, + message: `Too many SSE connections from this IP. Max ${MAX_CONNECTIONS_PER_IP}.`, + }; + } + + return { allowed: true }; + } + + addClient(clientId: string, res: Response, subscriptions: string[] = [], ip = 'unknown'): void { + const nextIpCount = (this.ipConnectionCounts.get(ip) ?? 0) + 1; + this.ipConnectionCounts.set(ip, nextIpCount); + this.perIpPeakConnections = Math.max(this.perIpPeakConnections, nextIpCount); + const client: SSEClient = { id: clientId, res, subscriptions: new Set(subscriptions), + ip, }; this.clients.set(clientId, client); - logger.info(`SSE client connected: ${clientId}, subscriptions: ${subscriptions.join(', ')}`); + logger.info( + `SSE client connected: ${clientId}, ip: ${ip}, subscriptions: ${subscriptions.join(', ')}` + ); res.on('close', () => { - this.clients.delete(clientId); - logger.info(`SSE client disconnected: ${clientId}`); + this.removeClient(clientId); }); } + private removeClient(clientId: string): void { + const client = this.clients.get(clientId); + if (!client) return; + + this.clients.delete(clientId); + + const currentIpCount = this.ipConnectionCounts.get(client.ip) ?? 0; + if (currentIpCount <= 1) { + this.ipConnectionCounts.delete(client.ip); + } else { + this.ipConnectionCounts.set(client.ip, currentIpCount - 1); + } + + logger.info(`SSE client disconnected: ${clientId}, ip: ${client.ip}`); + } + sendReconnectToAll(): void { this.shuttingDown = true; const message = 'event: reconnect\ndata: {}\n\n'; @@ -106,6 +169,18 @@ class SSEService { getClientCount(): number { return this.clients.size; } + + getMaxConnections(): number { + return this.maxConnections; + } + + getPerIpPeakConnections(): number { + return this.perIpPeakConnections; + } + + getActiveIpCount(): number { + return this.ipConnectionCounts.size; + } } export const sseService = new SSEService(); diff --git a/backend/tests/sse.service.test.ts b/backend/tests/sse.service.test.ts new file mode 100644 index 0000000..58d34a8 --- /dev/null +++ b/backend/tests/sse.service.test.ts @@ -0,0 +1,66 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { EventEmitter } from 'node:events'; +import { SSEService } from '../src/services/sse.service.js'; + +function createMockResponse() { + const emitter = new EventEmitter(); + return Object.assign(emitter, { write: vi.fn() }); +} + +describe('SSEService connection limits', () => { + const originalMax = process.env.MAX_SSE_CONNECTIONS; + + beforeEach(() => { + vi.clearAllMocks(); + process.env.MAX_SSE_CONNECTIONS = '10000'; + }); + + afterEach(() => { + process.env.MAX_SSE_CONNECTIONS = originalMax; + }); + + it('rejects the 6th concurrent connection from the same IP with 429', () => { + const service = new SSEService(); + + for (let i = 0; i < 5; i += 1) { + const capacity = service.checkCapacity('127.0.0.1'); + expect(capacity.allowed).toBe(true); + const res = createMockResponse(); + service.addClient(`ip-client-${i}`, res as any, ['*'], '127.0.0.1'); + } + + const sixth = service.checkCapacity('127.0.0.1'); + expect(sixth.allowed).toBe(false); + expect(sixth.status).toBe(429); + expect(sixth.retryAfterSeconds).toBe(60); + }); + + it('rejects connections when global capacity is reached with 503', () => { + process.env.MAX_SSE_CONNECTIONS = '2'; + const service = new SSEService(); + + service.addClient('client-1', createMockResponse() as any, ['*'], '10.0.0.1'); + service.addClient('client-2', createMockResponse() as any, ['*'], '10.0.0.2'); + + const blocked = service.checkCapacity('10.0.0.3'); + expect(blocked.allowed).toBe(false); + expect(blocked.status).toBe(503); + }); + + it('cleans up IP tracking when all connections from that IP close', () => { + const service = new SSEService(); + const resA = createMockResponse(); + const resB = createMockResponse(); + + service.addClient('client-a', resA as any, ['*'], '10.10.10.10'); + service.addClient('client-b', resB as any, ['*'], '10.10.10.10'); + + expect(service.getActiveIpCount()).toBe(1); + + resA.emit('close'); + expect(service.getActiveIpCount()).toBe(1); + + resB.emit('close'); + expect(service.getActiveIpCount()).toBe(0); + }); +}); diff --git a/backend/tests/stream.test.ts b/backend/tests/stream.test.ts index 4270b8f..e01749d 100644 --- a/backend/tests/stream.test.ts +++ b/backend/tests/stream.test.ts @@ -115,3 +115,86 @@ describe('GET /v1/streams', () => { expect(Array.isArray(response.body)).toBe(true); }); }); + +describe('GET /v1/users/:address/summary', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('returns all-zero summary for addresses with no streams', async () => { + prisma.stream.findMany + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([]); + + const address = 'GZERO000000000000000000000000000000000000000000000000000000'; + const response = await request(app).get(`/v1/users/${address}/summary`); + + expect(response.status).toBe(200); + expect(response.body).toMatchObject({ + address, + totalStreamsCreated: 0, + totalStreamedOut: '0', + totalStreamedIn: '0', + currentClaimable: '0', + activeOutgoingCount: 0, + activeIncomingCount: 0, + }); + }); + + it('returns accurate outgoing/incoming aggregates and claimable sum', async () => { + prisma.stream.findMany + .mockResolvedValueOnce([ + { withdrawnAmount: '30', isActive: true }, + { withdrawnAmount: '20', isActive: false }, + ]) + .mockResolvedValueOnce([ + { + streamId: 11, + ratePerSecond: '10', + depositedAmount: '1000', + withdrawnAmount: '100', + lastUpdateTime: 0, + isActive: true, + updatedAt: new Date(), + }, + { + streamId: 12, + ratePerSecond: '1', + depositedAmount: '200', + withdrawnAmount: '50', + lastUpdateTime: 0, + isActive: false, + updatedAt: new Date(), + }, + ]); + + const address = 'GACCURATE00000000000000000000000000000000000000000000000000'; + const response = await request(app).get(`/v1/users/${address}/summary`); + + expect(response.status).toBe(200); + expect(response.body).toMatchObject({ + address, + totalStreamsCreated: 2, + totalStreamedOut: '50', + totalStreamedIn: '150', + currentClaimable: '900', + activeOutgoingCount: 1, + activeIncomingCount: 1, + }); + }); + + it('caches summary results for repeated requests within TTL', async () => { + prisma.stream.findMany + .mockResolvedValueOnce([{ withdrawnAmount: '1', isActive: true }]) + .mockResolvedValueOnce([]); + + const address = 'GCACHE000000000000000000000000000000000000000000000000000000'; + + const first = await request(app).get(`/v1/users/${address}/summary`); + const second = await request(app).get(`/v1/users/${address}/summary`); + + expect(first.status).toBe(200); + expect(second.status).toBe(200); + expect(prisma.stream.findMany).toHaveBeenCalledTimes(2); + }); +}); From 1ccc95d64889c6ef01f29219e193b0b004ccbd47 Mon Sep 17 00:00:00 2001 From: xeladev4 Date: Tue, 28 Apr 2026 10:55:30 +0100 Subject: [PATCH 2/3] feat(frontend): add stream creation template step --- .../stream-creation/StreamCreationWizard.tsx | 190 ++++++++++++++++-- .../stream-creation/TemplateStep.tsx | 124 ++++++++++++ 2 files changed, 302 insertions(+), 12 deletions(-) create mode 100644 frontend/src/components/stream-creation/TemplateStep.tsx diff --git a/frontend/src/components/stream-creation/StreamCreationWizard.tsx b/frontend/src/components/stream-creation/StreamCreationWizard.tsx index 65d3f74..2e82018 100644 --- a/frontend/src/components/stream-creation/StreamCreationWizard.tsx +++ b/frontend/src/components/stream-creation/StreamCreationWizard.tsx @@ -6,6 +6,7 @@ import { RecipientStep } from "./RecipientStep"; import { TokenStep } from "./TokenStep"; import { AmountStep } from "./AmountStep"; import { ScheduleStep } from "./ScheduleStep"; +import { TemplateStep, type StreamTemplate } from "./TemplateStep"; import { fetchTokenBalanceDisplay } from "@/lib/soroban"; import { isValidStellarPublicKey } from "@/lib/stellar"; @@ -15,6 +16,7 @@ export interface StreamFormData { amount: string; duration: string; durationUnit: "seconds" | "minutes" | "hours" | "days" | "weeks" | "months"; + descriptionTag?: string; } interface StreamCreationWizardProps { @@ -23,7 +25,64 @@ interface StreamCreationWizardProps { walletPublicKey?: string; } -const STEPS = ["Recipient", "Token", "Amount", "Schedule"]; +const CUSTOM_TEMPLATE_STORAGE_KEY = "flowfi.stream.wizard.custom-templates.v1"; + +const BUILT_IN_TEMPLATES: StreamTemplate[] = [ + { + id: "monthly-salary", + name: "Monthly Salary", + description: "Recurring monthly payroll stream", + builtIn: true, + values: { + token: "USDC", + amount: "5000", + duration: "1", + durationUnit: "months", + descriptionTag: "salary", + }, + }, + { + id: "weekly-subscription", + name: "Weekly Subscription", + description: "Weekly recurring subscription billing", + builtIn: true, + values: { + token: "USDC", + amount: "49", + duration: "1", + durationUnit: "weeks", + descriptionTag: "subscription", + }, + }, + { + id: "one-time-grant", + name: "One-time Grant", + description: "Short fixed-duration grant payout", + builtIn: true, + values: { + token: "USDC", + amount: "1000", + duration: "14", + durationUnit: "days", + descriptionTag: "grant", + }, + }, + { + id: "custom", + name: "Custom", + description: "Start with blank defaults", + builtIn: true, + values: { + token: "USDC", + amount: "", + duration: "", + durationUnit: "days", + descriptionTag: "custom", + }, + }, +]; + +const STEPS = ["Template", "Recipient", "Token", "Amount", "Schedule"]; export const StreamCreationWizard: React.FC = ({ onClose, @@ -32,18 +91,51 @@ export const StreamCreationWizard: React.FC = ({ }) => { const [currentStep, setCurrentStep] = useState(1); const [isSubmitting, setIsSubmitting] = useState(false); + const [customTemplates, setCustomTemplates] = useState([]); + const [selectedTemplateId, setSelectedTemplateId] = useState("monthly-salary"); + const [customTemplateName, setCustomTemplateName] = useState(""); + const [templateSaveMessage, setTemplateSaveMessage] = useState(null); const [formData, setFormData] = useState({ recipient: "", - token: "", - amount: "", - duration: "", - durationUnit: "days", + token: "USDC", + amount: "5000", + duration: "1", + durationUnit: "months", + descriptionTag: "salary", }); const [errors, setErrors] = useState>>({}); const [walletBalance, setWalletBalance] = useState(null); const [walletBalanceLoading, setWalletBalanceLoading] = useState(false); const [walletBalanceError, setWalletBalanceError] = useState(null); + React.useEffect(() => { + try { + const stored = localStorage.getItem(CUSTOM_TEMPLATE_STORAGE_KEY); + if (!stored) return; + const parsed = JSON.parse(stored); + if (!Array.isArray(parsed)) return; + const sanitized = parsed + .filter((item) => item && typeof item.id === "string" && typeof item.name === "string") + .map((item) => ({ + id: item.id, + name: item.name, + description: item.description || "Saved custom template", + values: item.values || {}, + } as StreamTemplate)); + setCustomTemplates(sanitized); + } catch { + setCustomTemplates([]); + } + }, []); + + React.useEffect(() => { + try { + localStorage.setItem(CUSTOM_TEMPLATE_STORAGE_KEY, JSON.stringify(customTemplates)); + } catch { + // ignore localStorage write errors + } + }, [customTemplates]); + React.useEffect(() => { if (!walletPublicKey || !formData.token) { setWalletBalance(null); @@ -88,23 +180,77 @@ export const StreamCreationWizard: React.FC = ({ }); }; + const allTemplates = React.useMemo( + () => [...BUILT_IN_TEMPLATES, ...customTemplates], + [customTemplates] + ); + + const applyTemplate = (templateId: string) => { + const template = allTemplates.find((item) => item.id === templateId); + if (!template) return; + setSelectedTemplateId(templateId); + setTemplateSaveMessage(`Applied template "${template.name}". You can still edit every field.`); + updateFormData({ + token: template.values.token ?? formData.token, + amount: template.values.amount ?? formData.amount, + duration: template.values.duration ?? formData.duration, + durationUnit: template.values.durationUnit ?? formData.durationUnit, + descriptionTag: template.values.descriptionTag ?? formData.descriptionTag, + }); + }; + + const saveCurrentAsCustomTemplate = () => { + const cleanedName = customTemplateName.trim(); + if (!cleanedName) { + setTemplateSaveMessage("Enter a template name first."); + return; + } + + if (!formData.amount || !formData.duration || !formData.token) { + setTemplateSaveMessage("Set amount, duration, and token before saving a custom template."); + return; + } + + const newTemplate: StreamTemplate = { + id: `custom-${Date.now()}`, + name: cleanedName, + description: formData.descriptionTag + ? `Tag: ${formData.descriptionTag}` + : "Saved custom template", + values: { + token: formData.token, + amount: formData.amount, + duration: formData.duration, + durationUnit: formData.durationUnit, + descriptionTag: formData.descriptionTag || "custom", + }, + }; + + setCustomTemplates((prev) => [newTemplate, ...prev]); + setCustomTemplateName(""); + setTemplateSaveMessage(`Saved custom template "${cleanedName}".`); + setSelectedTemplateId(newTemplate.id); + }; + const validateStep = (step: number): boolean => { const newErrors: Partial> = {}; switch (step) { - case 1: // Recipient + case 1: // Template + break; + case 2: // Recipient if (!formData.recipient.trim()) { newErrors.recipient = "Recipient address is required"; } else if (!isValidStellarPublicKey(formData.recipient.trim())) { newErrors.recipient = "Invalid Stellar public key format"; } break; - case 2: // Token + case 3: // Token if (!formData.token) { newErrors.token = "Please select a token"; } break; - case 3: // Amount + case 4: // Amount if (!formData.amount.trim()) { newErrors.amount = "Amount is required"; } else { @@ -119,7 +265,7 @@ export const StreamCreationWizard: React.FC = ({ } } break; - case 4: // Schedule + case 5: // Schedule if (!formData.duration.trim()) { newErrors.duration = "Duration is required"; } else { @@ -184,6 +330,19 @@ export const StreamCreationWizard: React.FC = ({ const renderStepContent = () => { switch (currentStep) { case 1: + return ( + + ); + case 2: return ( = ({ error={errors.recipient} /> ); - case 2: + case 3: return ( = ({ error={errors.token} /> ); - case 3: + case 4: return ( = ({ }} /> ); - case 4: + case 5: return ( = ({ {Math.round((currentStep / STEPS.length) * 100)}% complete + {formData.descriptionTag && ( +
+ + Tag: {formData.descriptionTag} + +
+ )} {renderStepContent()} diff --git a/frontend/src/components/stream-creation/TemplateStep.tsx b/frontend/src/components/stream-creation/TemplateStep.tsx new file mode 100644 index 0000000..005d381 --- /dev/null +++ b/frontend/src/components/stream-creation/TemplateStep.tsx @@ -0,0 +1,124 @@ +"use client"; +import React from "react"; +import type { StreamFormData } from "./StreamCreationWizard"; + +export interface StreamTemplate { + id: string; + name: string; + description: string; + values: Partial; + builtIn?: boolean; +} + +interface TemplateStepProps { + templates: StreamTemplate[]; + selectedTemplateId: string; + onSelectTemplate: (templateId: string) => void; + customTemplateName: string; + onCustomTemplateNameChange: (value: string) => void; + onSaveCustomTemplate: () => void; + saveDisabled?: boolean; + saveMessage?: string | null; +} + +export const TemplateStep: React.FC = ({ + templates, + selectedTemplateId, + onSelectTemplate, + customTemplateName, + onCustomTemplateNameChange, + onSaveCustomTemplate, + saveDisabled, + saveMessage, +}) => { + const builtInTemplates = templates.filter((template) => template.builtIn); + const customTemplates = templates.filter((template) => !template.builtIn); + + return ( +
+
+

Choose a Template

+

+ Start from a common setup. You can edit everything in the next steps. +

+
+ +
+

Built-in

+
+ {builtInTemplates.map((template) => { + const isSelected = template.id === selectedTemplateId; + return ( + + ); + })} +
+
+ +
+

Custom

+ {customTemplates.length === 0 ? ( +

No custom templates yet.

+ ) : ( +
+ {customTemplates.map((template) => { + const isSelected = template.id === selectedTemplateId; + return ( + + ); + })} +
+ )} +
+ +
+

Save Current Values as Template

+

+ Save your current amount, duration, token, and tag to reuse later. +

+
+ onCustomTemplateNameChange(e.target.value)} + placeholder="Template name" + className="w-full rounded-lg bg-background/40 border border-glass-border px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-accent" + /> + +
+ {saveMessage &&

{saveMessage}

} +
+
+ ); +}; From a60883a3b54c35f9fd476536c3fb899d1f8001a8 Mon Sep 17 00:00:00 2001 From: xeladev4 Date: Tue, 28 Apr 2026 10:55:34 +0100 Subject: [PATCH 3/3] docs: add architecture and onboarding guides --- README.md | 4 + docs/ARCHITECTURE.md | 305 +++++++++++++++---------------------------- docs/DEVELOPMENT.md | 200 ++++++++++++++++++++++++++++ 3 files changed, 308 insertions(+), 201 deletions(-) create mode 100644 docs/DEVELOPMENT.md diff --git a/README.md b/README.md index 54930ff..d46c2ae 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,8 @@ FlowFi consists of three main components that work together: For a detailed explanation of how these components interact, where event indexing happens, and the overall system architecture, see the [Architecture Documentation](docs/ARCHITECTURE.md). +For full local setup and contributor onboarding, see the [Development Guide](docs/DEVELOPMENT.md). + ## Getting Started ### Prerequisites @@ -167,6 +169,8 @@ Contributions are welcome! Please see our [Contributing Guide](CONTRIBUTING.md) - Pull request process - Development scripts and CI workflows +Before your first change, run through the [Development Guide](docs/DEVELOPMENT.md) and review [Architecture Documentation](docs/ARCHITECTURE.md). + For architecture details, see [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md). ## Security diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 7b078da..9316178 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -1,242 +1,145 @@ -# FlowFi Architecture Overview +# FlowFi Architecture -This document provides a high-level overview of how FlowFi's components interact and how the system processes on-chain events. +This document explains how FlowFi moves data from on-chain contract events into API responses and real-time frontend updates. -## System Components +## High-Level Pipeline -FlowFi consists of three main components: +```mermaid +flowchart LR + C["Soroban Stream Contract\nEvent Emission"] --> W["Event Worker / Indexer\nbackend/src/workers"] + W --> P["Prisma ORM"] + P --> D[("PostgreSQL")] + D --> API["Express API\nREST + SSE"] + API --> SSE["SSE Service\nConnection Registry"] + SSE --> FE["Next.js Frontend\nDashboard + Profile"] -1. **Soroban Smart Contracts** - On-chain logic for payment streams -2. **Backend API** - Indexing, API endpoints, and real-time event streaming -3. **Frontend** - User interface built with Next.js - -``` -┌─────────────┐ -│ Frontend │ (Next.js + React) -│ (Port 3000)│ -└──────┬──────┘ - │ HTTP/REST - │ SSE (Server-Sent Events) - ▼ -┌─────────────┐ -│ Backend │ (Express.js + TypeScript) -│ (Port 3001)│ -└──────┬──────┘ - │ - │ Indexes Events - │ Queries State - ▼ -┌─────────────┐ -│ Stellar │ -│ Network │ -└──────┬──────┘ - │ - │ Smart Contract - │ Events & State - ▼ -┌─────────────┐ -│ Soroban │ -│ Contracts │ (Rust) -└─────────────┘ + API <--> R[("Redis Pub/Sub\nMulti-instance fanout")] + R <--> SSE ``` -## Component Interactions - -### 1. Soroban Smart Contracts - -**Location:** `contracts/stream_contract/` - -The smart contract handles all on-chain logic for payment streams: - -- **Stream Creation**: Users create streams by depositing tokens -- **Withdrawals**: Recipients can withdraw available funds -- **Top-ups**: Senders can add more funds to active streams -- **Cancellation**: Senders can cancel streams and receive refunds - -**Key Contract Functions:** -- `create_stream()` - Creates a new payment stream -- `withdraw()` - Withdraws available funds from a stream -- `top_up_stream()` - Adds funds to an existing stream -- `cancel_stream()` - Cancels a stream and refunds remaining balance -- `get_stream()` - Reads stream state - -**Events Emitted:** -The contract emits events for all state changes: -- `stream_created` - When a new stream is created -- `tokens_withdrawn` - When funds are withdrawn -- `stream_topped_up` - When additional funds are added -- `stream_cancelled` - When a stream is cancelled - -### 2. Backend API - -**Location:** `backend/` - -The backend serves multiple purposes: - -#### A. Event Indexing +## Core Components -**Where Indexing Happens:** +1. Soroban contract: source of truth for stream state and events. +1. Event worker/indexer: reads events from Stellar/Soroban, normalizes payloads, and persists stream state + stream events. +1. PostgreSQL + Prisma: query layer for fast read APIs. +1. Express API: serves versioned REST endpoints and long-lived SSE subscriptions. +1. Frontend: consumes REST for initial state and SSE for real-time deltas. -The backend indexes on-chain events from Soroban contracts. The indexing process: +## Event Type Data Flows -1. **Event Detection**: The backend listens to Stellar network events (via Stellar Horizon API or similar) -2. **Event Processing**: When contract events are detected, they are processed and stored -3. **Database Storage**: Events are stored in PostgreSQL using Prisma ORM +### 1) CREATED -**Database Models:** -- `Stream` - Mirrors on-chain stream state for fast querying -- `StreamEvent` - Stores all on-chain events (CREATED, TOPPED_UP, WITHDRAWN, CANCELLED, COMPLETED) -- `User` - Tracks Stellar wallet addresses +1. Contract emits `CREATED`. +1. Worker inserts `Stream` row with sender, recipient, token, amount/rate/timestamps. +1. Worker inserts `StreamEvent` row. +1. SSE broadcasts `stream.created` to stream and user channels. +1. Frontend refreshes outgoing/incoming lists and summary cards. -**Indexing Implementation:** +### 2) TOPPED_UP -The indexing logic is designed to be integrated with a Stellar event listener. See: -- `backend/src/services/indexer-integration.example.ts` - Example integration pattern -- `backend/prisma/schema.prisma` - Database schema for indexed data +1. Contract emits `TOPPED_UP` with top-up amount. +1. Worker updates `Stream.depositedAmount` and `lastUpdateTime`. +1. Worker inserts `StreamEvent`. +1. SSE broadcasts `stream.topped_up`. +1. Frontend updates TVL/deposit values. -**Event Types Indexed:** -- `CREATED` - Stream creation events -- `TOPPED_UP` - Additional funds added -- `WITHDRAWN` - Funds withdrawn by recipient -- `CANCELLED` - Stream cancellation -- `COMPLETED` - Stream completion (all funds withdrawn) +### 3) WITHDRAWN -#### B. REST API +1. Contract emits `WITHDRAWN` with claimed amount. +1. Worker updates `Stream.withdrawnAmount` and `lastUpdateTime`. +1. Worker inserts `StreamEvent`. +1. SSE broadcasts `stream.withdrawn`. +1. Frontend updates balances and claimable indicators. -The backend provides REST endpoints for: +### 4) CANCELLED -- **Stream Management**: Query stream state, create streams (via contract interaction) -- **User Data**: Get user streams, balances, history -- **Health Checks**: API status and metrics +1. Contract emits `CANCELLED`. +1. Worker marks `Stream.isActive = false` and updates `lastUpdateTime`. +1. Worker inserts `StreamEvent`. +1. SSE broadcasts `stream.cancelled`. +1. Frontend moves stream to historical state. -**API Documentation:** -- Swagger UI: `http://localhost:3001/api-docs` -- OpenAPI Spec: `http://localhost:3001/api-docs.json` +### 5) COMPLETED -#### C. Real-Time Event Streaming +1. Contract emits `COMPLETED` (fully drained lifecycle). +1. Worker marks `Stream.isActive = false`. +1. Worker inserts `StreamEvent`. +1. SSE broadcasts `stream.completed`. +1. Frontend marks stream complete. -**Server-Sent Events (SSE):** +### 6) PAUSED / RESUMED -The backend provides SSE endpoints for real-time updates: +FlowFi timing math supports pause windows by excluding paused wall-clock duration from effective streaming time. -- **Endpoint**: `GET /events/subscribe` -- **Purpose**: Push real-time stream updates to frontend clients -- **Event Types**: `stream.created`, `stream.topped_up`, `stream.withdrawn`, `stream.cancelled`, `stream.completed` +Paused behavior: -**How It Works:** -1. Frontend connects to SSE endpoint -2. Backend maintains connection and broadcasts events -3. When on-chain events are indexed, they trigger SSE broadcasts -4. Frontend receives real-time updates without polling +1. On `PAUSED`, worker stores pause start metadata and stream remains non-progressing. +1. On `RESUMED`, worker computes paused interval duration and accumulates `totalPausedSeconds`. +1. Claimable calculations use effective elapsed time: -See `backend/docs/SSE_ARCHITECTURE.md` for detailed SSE implementation. +$$ +\text{effectiveElapsed} = \max(0,\, now - lastUpdateTime - totalPausedSecondsSinceLastUpdate) +$$ -### 3. Frontend +$$ +\text{streamed} = \text{effectiveElapsed} \times \text{ratePerSecond} +$$ -**Location:** `frontend/` +$$ +\text{claimable} = \min(\text{streamed},\, depositedAmount - withdrawnAmount) +$$ -The frontend is a Next.js application that: +This prevents paused periods from increasing claimable balance. -- **Displays Streams**: Shows active streams, incoming/outgoing payments -- **Wallet Integration**: Connects to Stellar wallets (Freighter, etc.) -- **Real-Time Updates**: Subscribes to SSE events for live stream updates -- **Stream Management**: UI for creating, viewing, and managing streams +## Pause/Resume Timing Model -**Key Features:** -- Dashboard with stream overview -- Incoming/outgoing stream lists -- Real-time balance updates via SSE -- Wallet connection and transaction signing +Rules used by backend/domain logic: -## Data Flow +1. Time is tracked in Unix seconds. +1. Claimable only advances while stream is active and not paused. +1. Multiple pause/resume intervals are cumulative. +1. Resume re-baselines time accounting so no double counting occurs. +1. Cancellation/completion finalizes stream and halts further accrual. -### Creating a Stream +## Authentication Flow -1. **User Action**: User fills out stream creation form in frontend -2. **Frontend**: Prepares transaction and prompts wallet for signature -3. **Stellar Network**: Transaction is submitted and processed -4. **Contract**: `create_stream()` executes, emits `stream_created` event -5. **Backend Indexer**: Detects event, stores in database -6. **Backend SSE**: Broadcasts event to subscribed clients -7. **Frontend**: Receives SSE update, UI updates automatically +```mermaid +sequenceDiagram + participant U as User Wallet (Freighter) + participant FE as Frontend + participant API as Backend Auth API + participant SSE as SSE Endpoint -### Withdrawing from a Stream - -1. **User Action**: Recipient clicks withdraw in frontend -2. **Frontend**: Prepares withdrawal transaction, prompts wallet -3. **Stellar Network**: Transaction submitted -4. **Contract**: `withdraw()` executes, emits `tokens_withdrawn` event -5. **Backend Indexer**: Detects event, updates database -6. **Backend SSE**: Broadcasts withdrawal event -7. **Frontend**: Updates balance and stream state - -### Querying Stream State - -1. **User Action**: User navigates to stream details page -2. **Frontend**: Makes REST API call to backend -3. **Backend**: Queries indexed database (fast, no on-chain call needed) -4. **Backend**: Returns stream data -5. **Frontend**: Displays stream information - -For real-time accuracy, the frontend can also: -- Subscribe to SSE events for that specific stream -- Receive updates immediately when on-chain events occur - -## Technology Stack - -### Smart Contracts -- **Language**: Rust -- **Framework**: Soroban SDK -- **Build Target**: `wasm32-unknown-unknown` - -### Backend -- **Runtime**: Node.js -- **Framework**: Express.js -- **Language**: TypeScript -- **Database**: PostgreSQL -- **ORM**: Prisma -- **Real-Time**: Server-Sent Events (SSE) - -### Frontend -- **Framework**: Next.js 16 -- **Language**: TypeScript -- **Styling**: Tailwind CSS -- **State Management**: React Context -- **Wallet Integration**: Stellar SDK - -## Development Workflow - -### Local Development - -1. **Start Infrastructure**: `docker compose up` (PostgreSQL) -2. **Start Backend**: `cd backend && npm run dev` -3. **Start Frontend**: `cd frontend && npm run dev` -4. **Deploy Contracts**: Build and deploy to Stellar testnet + FE->>API: Request challenge for public key + API-->>FE: Nonce/challenge payload + FE->>U: Ask wallet to sign challenge + U-->>FE: Signed challenge + FE->>API: Verify signature + API-->>FE: JWT token + FE->>SSE: Subscribe with Bearer JWT + SSE-->>FE: Connected + real-time events +``` -### Testing +## SSE in Single vs Multi-Instance Mode -- **Contracts**: Rust unit tests in contract source files -- **Backend**: Vitest for API and service tests -- **Frontend**: Next.js testing utilities +Single instance: -## Security Considerations +1. API writes SSE event directly to in-memory client registry. -- **Rate Limiting**: Backend implements rate limiting on all endpoints -- **Input Validation**: Zod schemas validate all API inputs -- **Authentication**: Wallet-based authentication via Stellar signatures -- **Event Verification**: Indexed events are verified against on-chain state +Multi-instance (recommended for horizontal scale): -## Future Enhancements +1. Instance A receives event and publishes to Redis channels (`sse:stream:*`, `sse:user:*`). +1. All API instances subscribe to matching channels. +1. Each instance rebroadcasts to its own connected clients. -Potential areas for improvement: +Benefits: -- **Indexer Service**: Dedicated microservice for event indexing -- **Caching Layer**: Redis for frequently accessed stream data -- **WebSocket Support**: Alternative to SSE for bidirectional communication -- **GraphQL API**: More flexible querying for complex frontend needs +1. Real-time fanout works across replicas. +1. Sticky sessions are not required for event delivery. +1. API replicas can scale independently while preserving SSE correctness. -## Related Documentation +## Operational Notes -- [SSE Architecture](backend/docs/SSE_ARCHITECTURE.md) - Detailed SSE implementation -- [Backend README](backend/SSE_README.md) - Backend-specific documentation -- [Contributing Guide](../CONTRIBUTING.md) - Development setup and workflows +1. `/v1/events/stats` exposes active SSE connections and connection-capacity metrics. +1. Admin metrics include SSE peak-per-IP visibility for abuse monitoring. +1. User summary endpoint (`/v1/users/{address}/summary`) is cached for 30s to protect DB hot paths. diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md new file mode 100644 index 0000000..f0a6264 --- /dev/null +++ b/docs/DEVELOPMENT.md @@ -0,0 +1,200 @@ +# Development Onboarding + +This guide is intended to let a new contributor run the full FlowFi stack from a fresh clone. + +## Prerequisites + +Required: + +1. Node.js 20.x and npm +1. Rust toolchain (rustup + cargo) +1. PostgreSQL 14+ + +Recommended: + +1. Stellar CLI / Soroban CLI +1. Redis 7+ (optional for multi-instance SSE testing) +1. Docker + Docker Compose (easiest local infra) + +## 1) Clone and Install + +```bash +git clone https://github.com/LabsCrypt/flowfi.git +cd flowfi +``` + +Install root helpers (if any): + +```bash +npm install +``` + +Install backend + frontend dependencies: + +```bash +cd backend && npm install +cd ../frontend && npm install +cd .. +``` + +## 2) Start Infrastructure + +### Option A: Docker (recommended) + +```bash +docker compose up -d postgres +``` + +If your compose file includes Redis and you want to test pub/sub SSE fanout: + +```bash +docker compose up -d redis +``` + +### Option B: Local services + +Run PostgreSQL locally and create/update your database for `DATABASE_URL`. +Run Redis locally only if needed. + +## 3) Configure Environment + +Create backend env file (example values): + +```bash +cd backend +cp .env.example .env 2>/dev/null || true +``` + +Set at least: + +1. `DATABASE_URL=postgresql://...` +1. `JWT_SECRET=...` +1. `STELLAR_NETWORK=testnet` +1. `MAX_SSE_CONNECTIONS=10000` + +Optional Redis for multi-instance SSE: + +1. `REDIS_URL=redis://localhost:6379` + +Return to repo root after editing env. + +## 4) Prepare Database + +```bash +cd backend +npm run prisma:generate +npm run prisma:migrate +``` + +Optional seed: + +```bash +npm run prisma:seed +``` + +## 5) Build and Run Contracts (optional for UI/API iteration, required for full chain flow) + +```bash +cd contracts +cargo build --target wasm32-unknown-unknown --release +``` + +If deploying/testing contracts with CLI, ensure Stellar CLI is configured for testnet. + +## 6) Run Backend + +```bash +cd backend +npm run dev +``` + +Backend endpoints: + +1. API base: `http://localhost:3001/v1` +1. Swagger UI: `http://localhost:3001/api-docs` +1. Health: `http://localhost:3001/health` + +## 7) Run Frontend + +In a second terminal: + +```bash +cd frontend +npm run dev +``` + +Frontend app: + +1. `http://localhost:3000` + +## 8) Run Tests + +Backend: + +```bash +cd backend +npm test +``` + +Frontend lint: + +```bash +cd frontend +npm run lint +``` + +## Common Issues + +### Indexer not syncing + +Symptoms: + +1. Streams created on-chain do not appear in dashboard. + +Checks: + +1. Confirm backend worker/indexer is running. +1. Verify Stellar network config (`testnet` vs `mainnet`) matches your transactions. +1. Verify `IndexerState` row updates in DB. +1. Check backend logs for RPC/Horizon throttling or cursor errors. + +### SSE drops or reconnect loops + +Symptoms: + +1. Live updates stop; browser reconnects repeatedly. + +Checks: + +1. Verify JWT is valid and unexpired. +1. Check `/v1/events/stats` for capacity limits. +1. Confirm per-IP limit not exceeded (6th SSE connection returns 429). +1. In multi-instance deployments, ensure Redis pub/sub connectivity on all instances. + +### Auth errors (401/403) + +Symptoms: + +1. `Unauthorized` during subscribe or protected endpoint calls. + +Checks: + +1. Sign challenge using the same wallet/public key you verify. +1. Ensure backend `JWT_SECRET` is stable across restarts if testing long sessions. +1. Confirm `Authorization: Bearer ` header is present. + +### Prisma or DB migration failures + +Checks: + +1. Ensure `DATABASE_URL` points to a reachable DB. +1. Reset local DB and rerun migrations if schema drift occurred. +1. Regenerate Prisma client after schema changes. + +## Suggested Day-1 Workflow + +1. Start Postgres (and optionally Redis). +1. Run backend migrations. +1. Start backend and open Swagger. +1. Start frontend and connect wallet. +1. Create a stream and verify updates via dashboard + SSE.