Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ FlowFi consists of three main components that work together:

For a detailed explanation of how these components interact, where event indexing happens, and the overall system architecture, see the [Architecture Documentation](docs/ARCHITECTURE.md).

For full local setup and contributor onboarding, see the [Development Guide](docs/DEVELOPMENT.md).

## Getting Started

### Prerequisites
Expand Down Expand Up @@ -167,6 +169,8 @@ Contributions are welcome! Please see our [Contributing Guide](CONTRIBUTING.md)
- Pull request process
- Development scripts and CI workflows

Before your first change, run through the [Development Guide](docs/DEVELOPMENT.md) and review [Architecture Documentation](docs/ARCHITECTURE.md).

For architecture details, see [docs/ARCHITECTURE.md](docs/ARCHITECTURE.md).

## Security
Expand Down
25 changes: 24 additions & 1 deletion backend/src/controllers/sse.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,35 @@ const subscribeSchema = z.object({
all: z.boolean().optional().default(false),
});

function getClientIp(req: Request): string {
const forwarded = req.headers['x-forwarded-for'];
if (typeof forwarded === 'string' && forwarded.trim().length > 0) {
return forwarded.split(',')[0]?.trim() || 'unknown';
}

if (Array.isArray(forwarded) && forwarded.length > 0) {
return forwarded[0] ?? 'unknown';
}

return req.ip || req.socket.remoteAddress || 'unknown';
}
export const subscribe = async (req: Request, res: Response) => {
if (sseService.isShuttingDown()) {
return res.status(503).json({ message: 'Server is shutting down, please reconnect shortly.' });
}

try {
const sourceIp = getClientIp(req);
const capacity = sseService.checkCapacity(sourceIp);
if (!capacity.allowed) {
if (capacity.retryAfterSeconds) {
res.setHeader('Retry-After', String(capacity.retryAfterSeconds));
}
return res.status(capacity.status ?? 503).json({
message: capacity.message ?? 'SSE connection rejected',
});
}

const { publicKey } = (req as AuthenticatedRequest).user;
const { streams, all } = subscribeSchema.parse(req.query);

Expand Down Expand Up @@ -50,7 +73,7 @@ export const subscribe = async (req: Request, res: Response) => {

res.write(`data: ${JSON.stringify({ type: 'connected', clientId })}\n\n`);

sseService.addClient(clientId, res, subscriptions);
sseService.addClient(clientId, res, subscriptions, sourceIp);
} catch (error: any) {
if (error.name === 'ZodError') {
return res.status(400).json({
Expand Down
115 changes: 115 additions & 0 deletions backend/src/controllers/stream.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,44 @@ import logger from '../logger.js';
import { claimableAmountService } from '../services/claimable.service.js';
import { getStreamFromChain, getClaimableFromChain, isStale } from '../services/sorobanService.js';

interface UserStreamSummary {
address: string;
totalStreamsCreated: number;
totalStreamedOut: string;
totalStreamedIn: string;
currentClaimable: string;
activeOutgoingCount: number;
activeIncomingCount: number;
}

interface UserSummaryCacheEntry {
value: UserStreamSummary;
expiresAtMs: number;
}

const USER_SUMMARY_CACHE_TTL_MS = 30_000;
const userSummaryCache = new Map<string, UserSummaryCacheEntry>();

function pruneUserSummaryCache(nowMs: number): void {
for (const [key, entry] of userSummaryCache.entries()) {
if (entry.expiresAtMs <= nowMs) {
userSummaryCache.delete(key);
}
}
}

function sumStringI128(values: string[]): string {
let total = 0n;
for (const value of values) {
try {
total += BigInt(value);
} catch {
logger.warn(`[UserSummary] Skipping invalid i128 value: ${value}`);
}
}
return total.toString();
}

/**
* Create a new stream (stub for on-chain indexing)
*/
Expand Down Expand Up @@ -258,3 +296,80 @@ export const getStreamClaimableAmount = async (req: Request, res: Response) => {
return res.status(500).json({ error: 'Internal server error' });
}
};

/**
* Get user-level stream summary used by dashboard/profile cards.
*/
export const getUserStreamSummary = async (req: Request, res: Response) => {
try {
const address = (req.params.address ?? '').trim();
if (!address) {
return res.status(400).json({ error: 'Address is required' });
}

const nowMs = Date.now();
const cacheKey = address;
const cached = userSummaryCache.get(cacheKey);
if (cached && cached.expiresAtMs > nowMs) {
return res.status(200).json(cached.value);
}

pruneUserSummaryCache(nowMs);

const [outgoingStreams, incomingStreams] = await Promise.all([
prisma.stream.findMany({
where: { sender: address },
select: {
withdrawnAmount: true,
isActive: true,
},
}),
prisma.stream.findMany({
where: { recipient: address },
select: {
streamId: true,
ratePerSecond: true,
depositedAmount: true,
withdrawnAmount: true,
lastUpdateTime: true,
isActive: true,
updatedAt: true,
},
}),
]);

const totalStreamsCreated = outgoingStreams.length;
const totalStreamedOut = sumStringI128(outgoingStreams.map((stream) => stream.withdrawnAmount));
const totalStreamedIn = sumStringI128(incomingStreams.map((stream) => stream.withdrawnAmount));
const activeOutgoingCount = outgoingStreams.filter((stream) => stream.isActive).length;
const activeIncomingCount = incomingStreams.filter((stream) => stream.isActive).length;

const calculatedAt = Math.floor(nowMs / 1000);
let claimableTotal = 0n;
for (const stream of incomingStreams) {
if (!stream.isActive) continue;
const claimable = claimableAmountService.getClaimableAmount(stream, calculatedAt);
claimableTotal += BigInt(claimable.claimableAmount);
}

const summary: UserStreamSummary = {
address,
totalStreamsCreated,
totalStreamedOut,
totalStreamedIn,
currentClaimable: claimableTotal.toString(),
activeOutgoingCount,
activeIncomingCount,
};

userSummaryCache.set(cacheKey, {
value: summary,
expiresAtMs: nowMs + USER_SUMMARY_CACHE_TTL_MS,
});

return res.status(200).json(summary);
} catch (error) {
logger.error('Error fetching user stream summary:', error);
return res.status(500).json({ error: 'Internal server error' });
}
};
7 changes: 6 additions & 1 deletion backend/src/routes/adminRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ function adminAuth(req: Request, res: Response, next: NextFunction): void {
* properties:
* activeConnections:
* type: integer
* perIpPeakConnections:
* type: integer
* indexer:
* type: object
* properties:
Expand Down Expand Up @@ -120,7 +122,10 @@ router.get('/metrics', adminAuth, async (_req: Request, res: Response) => {
},
},
events: { last24h: eventsLast24h },
sse: { activeConnections: sseService.getClientCount() },
sse: {
activeConnections: sseService.getClientCount(),
perIpPeakConnections: sseService.getPerIpPeakConnections(),
},
indexer: {
lastLedger: indexerState?.lastLedger ?? 0,
lagSeconds,
Expand Down
12 changes: 12 additions & 0 deletions backend/src/routes/v1/events.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,25 @@ router.get('/subscribe', requireAuth, subscribe);
* activeConnections:
* type: number
* example: 42
* activeIps:
* type: number
* example: 8
* perIpPeakConnections:
* type: number
* example: 5
* maxConnections:
* type: number
* example: 10000
* timestamp:
* type: string
* format: date-time
*/
router.get('/stats', (req: Request, res: Response) => {
res.json({
activeConnections: sseService.getClientCount(),
activeIps: sseService.getActiveIpCount(),
perIpPeakConnections: sseService.getPerIpPeakConnections(),
maxConnections: sseService.getMaxConnections(),
timestamp: new Date().toISOString(),
});
});
Expand Down
6 changes: 4 additions & 2 deletions backend/src/routes/v1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import streamRoutes from './stream.routes.js';
import eventsRoutes from './events.routes.js';
import userRoutes from './user.routes.js';
import authRoutes from './auth.routes.js';
import adminRoutes from './admin.routes.js';
import v1AdminRoutes from './admin.routes.js';
import adminMetricsRoutes from '../adminRoutes.js';

const router = Router();

Expand All @@ -12,6 +13,7 @@ router.use('/streams', streamRoutes);
router.use('/events', eventsRoutes);
router.use('/users', userRoutes);
router.use('/auth', authRoutes);
router.use('/admin', adminRoutes);
router.use('/admin', v1AdminRoutes);
router.use('/admin', adminMetricsRoutes);

export default router;
45 changes: 45 additions & 0 deletions backend/src/routes/v1/user.routes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Router } from 'express';
import { registerUser, getUser, getUserEvents, getCurrentUser } from '../../controllers/user.controller.js';
import { getUserStreamSummary } from '../../controllers/stream.controller.js';
import { authMiddleware } from '../../middleware/auth.middleware.js';

const router = Router();
Expand Down Expand Up @@ -84,6 +85,50 @@ const router = Router();
*/
router.post('/', registerUser);
router.get('/me', authMiddleware, getCurrentUser);
/**
* @openapi
* /v1/users/{address}/summary:
* get:
* tags:
* - Users
* summary: Get aggregate stream summary for a user
* description: |
* Returns dashboard/profile summary data for a wallet address:
* total created streams, total streamed out/in, current claimable across
* active incoming streams, and active stream counts.
*
* Response is cached for 30 seconds to reduce DB load.
* parameters:
* - in: path
* name: address
* required: true
* schema:
* type: string
* description: Stellar public key address
* responses:
* 200:
* description: User stream summary
* content:
* application/json:
* schema:
* type: object
* properties:
* address:
* type: string
* totalStreamsCreated:
* type: integer
* totalStreamedOut:
* type: string
* totalStreamedIn:
* type: string
* currentClaimable:
* type: string
* activeOutgoingCount:
* type: integer
* activeIncomingCount:
* type: integer
*/
router.get('/:address/summary', getUserStreamSummary);
router.get('/:publicKey', getUser);

/**
Expand Down
Loading
Loading