diff --git a/backend/src/routes/v1/admin.routes.ts b/backend/src/routes/v1/admin.routes.ts index 137714f..3eb772f 100644 --- a/backend/src/routes/v1/admin.routes.ts +++ b/backend/src/routes/v1/admin.routes.ts @@ -31,7 +31,7 @@ router.get('/metrics', async (_req: Request, res: Response) => { try { const since24h = new Date(Date.now() - 24 * 60 * 60 * 1000); - const [activeCount, totalCount, cancelledCount, completedCount, eventsLast24h, indexerState] = + const [activeCount, totalCount, cancelledCount, completedCount, eventsLast24h, indexerState, feeEvents, feesLast24h] = await Promise.all([ prisma.stream.count({ where: { isActive: true } }), prisma.stream.count(), @@ -43,8 +43,38 @@ router.get('/metrics', async (_req: Request, res: Response) => { }), prisma.streamEvent.count({ where: { createdAt: { gte: since24h } } }), prisma.indexerState.findUnique({ where: { id: 'singleton' } }), + prisma.streamEvent.findMany({ + where: { eventType: 'FEE_COLLECTED' }, + select: { amount: true, metadata: true }, + }), + prisma.streamEvent.findMany({ + where: { eventType: 'FEE_COLLECTED', createdAt: { gte: since24h } }, + select: { amount: true, metadata: true }, + }), ]); + // Aggregate fees by token + const totalFeesCollectedByToken: Record = {}; + const feesLast24hByToken: Record = {}; + + for (const event of feeEvents) { + const metadata = event.metadata ? JSON.parse(event.metadata) : {}; + const token = metadata.token || 'unknown'; + const amount = BigInt(event.amount || '0'); + totalFeesCollectedByToken[token] = ( + BigInt(totalFeesCollectedByToken[token] || '0') + amount + ).toString(); + } + + for (const event of feesLast24h) { + const metadata = event.metadata ? JSON.parse(event.metadata) : {}; + const token = metadata.token || 'unknown'; + const amount = BigInt(event.amount || '0'); + feesLast24hByToken[token] = ( + BigInt(feesLast24hByToken[token] || '0') + amount + ).toString(); + } + const nowSec = Math.floor(Date.now() / 1000); const lagSeconds = indexerState ? nowSec - Math.floor(indexerState.updatedAt.getTime() / 1000) @@ -61,6 +91,10 @@ router.get('/metrics', async (_req: Request, res: Response) => { }, }, events: { last24h: eventsLast24h }, + fees: { + totalFeesCollectedByToken, + feesLast24h: feesLast24hByToken, + }, sse: { activeConnections: sseService.getClientCount() }, indexer: { lastLedger: indexerState?.lastLedger ?? 0, diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index dc5c877..62392ab 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -261,6 +261,9 @@ export class SorobanEventWorker { case 'stream_completed': await this.handleStreamCompleted(event, topic1); break; + case 'fee_collected': + await this.handleFeeCollected(event, topic1); + break; default: // Unrecognised event — ignore silently. break; @@ -554,6 +557,46 @@ export class SorobanEventWorker { timestamp, }); } + + private async handleFeeCollected( + event: rpc.Api.EventResponse, + streamIdTopic: xdr.ScVal, + ): Promise { + const streamId = Number(decodeU64(streamIdTopic)); + const body = decodeMap(event.value); + + if (!body['treasury'] || !body['fee_amount'] || !body['token']) { + throw new Error(`FeeCollected #${streamId}: missing body fields`); + } + + const treasury = decodeAddress(body['treasury']); + const feeAmount = decodeI128(body['fee_amount']); + const token = decodeAddress(body['token']); + const timestamp = Math.floor(Date.now() / 1000); + + await prisma.streamEvent.create({ + data: { + streamId, + eventType: 'FEE_COLLECTED', + amount: feeAmount, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ treasury, token }), + }, + }); + + // Broadcast to admin channel for treasury reporting + sseService.broadcast('admin', 'stream.fee_collected', { + streamId, + treasury, + feeAmount, + token, + transactionHash: event.txHash, + ledger: event.ledger, + timestamp, + }); + } } export const sorobanEventWorker = new SorobanEventWorker(); diff --git a/frontend/src/app/activity/page.tsx b/frontend/src/app/activity/page.tsx new file mode 100644 index 0000000..289bfec --- /dev/null +++ b/frontend/src/app/activity/page.tsx @@ -0,0 +1,116 @@ +"use client"; + +import React, { useState, useEffect } from 'react'; +import { useWallet } from '@/context/wallet-context'; +import { BackendStreamEvent } from '@/lib/api-types'; +import { fetchUserEvents } from '@/lib/dashboard'; +import { ActivityHistory } from '@/components/dashboard/ActivityHistory'; +import { downloadCSV } from '@/utils/csvExport'; +import { fromStroops } from '@/utils/amount'; + +type EventFilter = 'All' | 'CREATED' | 'TOPPED_UP' | 'WITHDRAWN' | 'CANCELLED' | 'COMPLETED'; + +export default function ActivityPage() { + const { session } = useWallet(); + const [events, setEvents] = useState([]); + const [filteredEvents, setFilteredEvents] = useState([]); + const [isLoading, setIsLoading] = useState(true); + const [activeFilter, setActiveFilter] = useState('All'); + + useEffect(() => { + if (session?.publicKey) { + loadEvents(); + } + }, [session?.publicKey]); + + useEffect(() => { + if (activeFilter === 'All') { + setFilteredEvents(events); + } else { + setFilteredEvents(events.filter(e => e.eventType === activeFilter)); + } + }, [activeFilter, events]); + + const loadEvents = async () => { + if (!session?.publicKey) return; + setIsLoading(true); + try { + const data = await fetchUserEvents(session.publicKey); + setEvents(data); + setFilteredEvents(data); + } catch (error) { + console.error('Failed to load events:', error); + } finally { + setIsLoading(false); + } + }; + + const handleExportCSV = () => { + const csvData = filteredEvents.map(event => ({ + 'Stream ID': event.streamId, + 'Event Type': event.eventType, + 'Amount': event.amount ? fromStroops(BigInt(event.amount), 7) : '0', + 'Timestamp': new Date(event.timestamp * 1000).toLocaleString(), + 'Transaction Hash': event.transactionHash, + 'Ledger': event.ledgerSequence, + })); + downloadCSV(csvData, `flowfi-activity-${Date.now()}.csv`); + }; + + const filters: EventFilter[] = ['All', 'CREATED', 'TOPPED_UP', 'WITHDRAWN', 'CANCELLED', 'COMPLETED']; + + if (!session) { + return ( +
+
+

Connect Your Wallet

+

Please connect your wallet to view activity history

+
+
+ ); + } + + return ( +
+
+
+

Stream Activity History

+

View all your stream events and transactions

+
+ +
+
+ {filters.map(filter => ( + + ))} +
+ +
+

+ Showing {filteredEvents.length} of {events.length} events +

+ +
+
+ + +
+
+ ); +} diff --git a/frontend/src/components/NotificationDropdown.tsx b/frontend/src/components/NotificationDropdown.tsx index 9e01fcc..741ef15 100644 --- a/frontend/src/components/NotificationDropdown.tsx +++ b/frontend/src/components/NotificationDropdown.tsx @@ -3,6 +3,7 @@ import React, { useState, useEffect } from 'react'; import { BackendStreamEvent } from '@/lib/api-types'; import { fetchUserEvents } from '@/lib/dashboard'; import { Button } from './ui/Button'; +import { useStreamEvents } from '@/hooks/useStreamEvents'; interface NotificationDropdownProps { publicKey: string; @@ -12,18 +13,63 @@ export const NotificationDropdown: React.FC = ({ publ const [isOpen, setIsOpen] = useState(false); const [events, setEvents] = useState([]); const [isLoading, setIsLoading] = useState(false); + const [unreadCount, setUnreadCount] = useState(0); + + // Wire up SSE for real-time events + const { events: streamEvents } = useStreamEvents({ + userPublicKeys: [publicKey], + autoReconnect: true, + }); useEffect(() => { if (isOpen && publicKey) { loadEvents(); + setUnreadCount(0); // Clear unread count when dropdown opens } }, [isOpen, publicKey]); + // Handle incoming SSE events + useEffect(() => { + if (streamEvents.length > 0 && !isOpen) { + // Increment unread count for new events while dropdown is closed + setUnreadCount(prev => prev + 1); + } + + // Prepend live events to notification list + if (streamEvents.length > 0) { + const latestEvent = streamEvents[0]; + const newEvent: BackendStreamEvent = { + id: `sse-${Date.now()}`, + streamId: latestEvent.data.streamId || 0, + eventType: mapEventType(latestEvent.type), + amount: latestEvent.data.amount || latestEvent.data.feeAmount || null, + transactionHash: latestEvent.data.transactionHash || '', + ledgerSequence: latestEvent.data.ledger || 0, + timestamp: latestEvent.timestamp / 1000, + metadata: null, + createdAt: new Date().toISOString(), + }; + + setEvents(prev => [newEvent, ...prev.slice(0, 19)]); // Keep max 20 + } + }, [streamEvents, isOpen]); + + const mapEventType = (type: string): BackendStreamEvent['eventType'] => { + switch (type) { + case 'created': return 'CREATED'; + case 'topped_up': return 'TOPPED_UP'; + case 'withdrawn': return 'WITHDRAWN'; + case 'cancelled': return 'CANCELLED'; + case 'completed': return 'COMPLETED'; + default: return 'CREATED'; + } + }; + const loadEvents = async () => { setIsLoading(true); try { const data = await fetchUserEvents(publicKey); - setEvents(data.slice(0, 5)); // Show only last 5 + setEvents(data.slice(0, 20)); // Show last 20 } catch (error) { console.error(error); } finally { @@ -38,6 +84,7 @@ export const NotificationDropdown: React.FC = ({ publ case 'TOPPED_UP': return `Topped up #${event.streamId}`; case 'WITHDRAWN': return `Withdrew ${amount} from #${event.streamId}`; case 'CANCELLED': return `Cancelled #${event.streamId}`; + case 'COMPLETED': return `Completed #${event.streamId}`; default: return `Event on #${event.streamId}`; } }; @@ -51,8 +98,10 @@ export const NotificationDropdown: React.FC = ({ publ - {events.length > 0 && ( - + {unreadCount > 0 && ( + + {unreadCount > 9 ? '9+' : unreadCount} + )} diff --git a/frontend/src/components/dashboard/ActivityHistory.tsx b/frontend/src/components/dashboard/ActivityHistory.tsx index 0dc05a6..7dd48c1 100644 --- a/frontend/src/components/dashboard/ActivityHistory.tsx +++ b/frontend/src/components/dashboard/ActivityHistory.tsx @@ -2,6 +2,7 @@ import React from 'react'; import { BackendStreamEvent } from '@/lib/api-types'; import { fromStroops } from '@/utils/amount'; import TransactionTracker from '@/components/TransactionTracker'; +import Link from 'next/link'; interface ActivityHistoryProps { events: BackendStreamEvent[]; @@ -29,6 +30,17 @@ export const ActivityHistory: React.FC = ({ events, isLoad } }; + const getEventBadgeColor = (eventType: string) => { + switch (eventType) { + case 'CREATED': return 'bg-blue-500/10 text-blue-400'; + case 'TOPPED_UP': return 'bg-green-500/10 text-green-400'; + case 'WITHDRAWN': return 'bg-purple-500/10 text-purple-400'; + case 'CANCELLED': return 'bg-red-500/10 text-red-400'; + case 'COMPLETED': return 'bg-emerald-500/10 text-emerald-400'; + default: return 'bg-accent/10 text-accent'; + } + }; + if (isLoading) { return (
@@ -54,17 +66,45 @@ export const ActivityHistory: React.FC = ({ events, isLoad
{events.map((event) => (
-
-
-

{formatEventMessage(event)}

-

+

+
+
+ + {formatEventMessage(event)} + +
+

{new Date(event.timestamp * 1000).toLocaleString()}

-
+
{event.eventType}
+ {event.amount && ( +
+ Amount: {fromStroops(BigInt(event.amount), 7)} +
+ )} + {event.txHash && ( +
+ Tx: + + {event.txHash} + + + + +
+ )} {(event.txHash || event.transactionStatus) && (
{ - const amount = prompt(`Enter amount to add to stream ${streamId}:`); - if (amount && !Number.isNaN(parseFloat(amount)) && parseFloat(amount) > 0) { - console.log(`Adding ${amount} funds to stream ${streamId}`); - // TODO: Integrate with Soroban contract's top_up_stream function - alert(`Successfully added ${amount} to stream ${streamId}`); - } + const handleTopUp = (stream: Stream) => { + setModal({ type: "topup", stream }); }; const handleApplyTemplate = (templateId: string) => { diff --git a/frontend/src/lib/api-types.ts b/frontend/src/lib/api-types.ts index 1a6d48e..815206d 100644 --- a/frontend/src/lib/api-types.ts +++ b/frontend/src/lib/api-types.ts @@ -5,7 +5,7 @@ export interface BackendUser { updatedAt: string; } -export type StreamEventType = "CREATED" | "TOPPED_UP" | "WITHDRAWN" | "CANCELLED" | "COMPLETED"; +export type StreamEventType = "CREATED" | "TOPPED_UP" | "WITHDRAWN" | "CANCELLED" | "COMPLETED" | "FEE_COLLECTED"; export interface BackendStreamEvent { id: string;