Skip to content
36 changes: 35 additions & 1 deletion backend/src/routes/v1/admin.routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ router.get('/metrics', async (_req: Request, res: Response) => {
try {
const since24h = new Date(Date.now() - 24 * 60 * 60 * 1000);

const [activeCount, totalCount, cancelledCount, completedCount, eventsLast24h, indexerState] =
const [activeCount, totalCount, cancelledCount, completedCount, eventsLast24h, indexerState, feeEvents, feesLast24h] =
await Promise.all([
prisma.stream.count({ where: { isActive: true } }),
prisma.stream.count(),
Expand All @@ -43,8 +43,38 @@ router.get('/metrics', async (_req: Request, res: Response) => {
}),
prisma.streamEvent.count({ where: { createdAt: { gte: since24h } } }),
prisma.indexerState.findUnique({ where: { id: 'singleton' } }),
prisma.streamEvent.findMany({
where: { eventType: 'FEE_COLLECTED' },
select: { amount: true, metadata: true },
}),
prisma.streamEvent.findMany({
where: { eventType: 'FEE_COLLECTED', createdAt: { gte: since24h } },
select: { amount: true, metadata: true },
}),
]);

// Aggregate fees by token
const totalFeesCollectedByToken: Record<string, string> = {};
const feesLast24hByToken: Record<string, string> = {};

for (const event of feeEvents) {
const metadata = event.metadata ? JSON.parse(event.metadata) : {};
const token = metadata.token || 'unknown';
const amount = BigInt(event.amount || '0');
totalFeesCollectedByToken[token] = (
BigInt(totalFeesCollectedByToken[token] || '0') + amount
).toString();
}

for (const event of feesLast24h) {
const metadata = event.metadata ? JSON.parse(event.metadata) : {};
const token = metadata.token || 'unknown';
const amount = BigInt(event.amount || '0');
feesLast24hByToken[token] = (
BigInt(feesLast24hByToken[token] || '0') + amount
).toString();
}

const nowSec = Math.floor(Date.now() / 1000);
const lagSeconds = indexerState
? nowSec - Math.floor(indexerState.updatedAt.getTime() / 1000)
Expand All @@ -61,6 +91,10 @@ router.get('/metrics', async (_req: Request, res: Response) => {
},
},
events: { last24h: eventsLast24h },
fees: {
totalFeesCollectedByToken,
feesLast24h: feesLast24hByToken,
},
sse: { activeConnections: sseService.getClientCount() },
indexer: {
lastLedger: indexerState?.lastLedger ?? 0,
Expand Down
43 changes: 43 additions & 0 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ export class SorobanEventWorker {
case 'stream_completed':
await this.handleStreamCompleted(event, topic1);
break;
case 'fee_collected':
await this.handleFeeCollected(event, topic1);
break;
default:
// Unrecognised event β€” ignore silently.
break;
Expand Down Expand Up @@ -554,6 +557,46 @@ export class SorobanEventWorker {
timestamp,
});
}

private async handleFeeCollected(
event: rpc.Api.EventResponse,
streamIdTopic: xdr.ScVal,
): Promise<void> {
const streamId = Number(decodeU64(streamIdTopic));
const body = decodeMap(event.value);

if (!body['treasury'] || !body['fee_amount'] || !body['token']) {
throw new Error(`FeeCollected #${streamId}: missing body fields`);
}

const treasury = decodeAddress(body['treasury']);
const feeAmount = decodeI128(body['fee_amount']);
const token = decodeAddress(body['token']);
const timestamp = Math.floor(Date.now() / 1000);

await prisma.streamEvent.create({
data: {
streamId,
eventType: 'FEE_COLLECTED',
amount: feeAmount,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ treasury, token }),
},
});

// Broadcast to admin channel for treasury reporting
sseService.broadcast('admin', 'stream.fee_collected', {
streamId,
treasury,
feeAmount,
token,
transactionHash: event.txHash,
ledger: event.ledger,
timestamp,
});
}
}

export const sorobanEventWorker = new SorobanEventWorker();
116 changes: 116 additions & 0 deletions frontend/src/app/activity/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"use client";

import React, { useState, useEffect } from 'react';
import { useWallet } from '@/context/wallet-context';
import { BackendStreamEvent } from '@/lib/api-types';
import { fetchUserEvents } from '@/lib/dashboard';
import { ActivityHistory } from '@/components/dashboard/ActivityHistory';
import { downloadCSV } from '@/utils/csvExport';
import { fromStroops } from '@/utils/amount';

type EventFilter = 'All' | 'CREATED' | 'TOPPED_UP' | 'WITHDRAWN' | 'CANCELLED' | 'COMPLETED';

export default function ActivityPage() {
const { session } = useWallet();
const [events, setEvents] = useState<BackendStreamEvent[]>([]);
const [filteredEvents, setFilteredEvents] = useState<BackendStreamEvent[]>([]);
const [isLoading, setIsLoading] = useState(true);
const [activeFilter, setActiveFilter] = useState<EventFilter>('All');

useEffect(() => {
if (session?.publicKey) {
loadEvents();
}
}, [session?.publicKey]);

Check warning on line 24 in frontend/src/app/activity/page.tsx

View workflow job for this annotation

GitHub Actions / Frontend CI

React Hook useEffect has a missing dependency: 'loadEvents'. Either include it or remove the dependency array

useEffect(() => {
if (activeFilter === 'All') {
setFilteredEvents(events);
} else {
setFilteredEvents(events.filter(e => e.eventType === activeFilter));
}
}, [activeFilter, events]);

const loadEvents = async () => {
if (!session?.publicKey) return;
setIsLoading(true);
try {
const data = await fetchUserEvents(session.publicKey);
setEvents(data);
setFilteredEvents(data);
} catch (error) {
console.error('Failed to load events:', error);
} finally {
setIsLoading(false);
}
};

const handleExportCSV = () => {
const csvData = filteredEvents.map(event => ({
'Stream ID': event.streamId,
'Event Type': event.eventType,
'Amount': event.amount ? fromStroops(BigInt(event.amount), 7) : '0',
'Timestamp': new Date(event.timestamp * 1000).toLocaleString(),
'Transaction Hash': event.transactionHash,
'Ledger': event.ledgerSequence,
}));
downloadCSV(csvData, `flowfi-activity-${Date.now()}.csv`);
};

const filters: EventFilter[] = ['All', 'CREATED', 'TOPPED_UP', 'WITHDRAWN', 'CANCELLED', 'COMPLETED'];

if (!session) {
return (
<div className="min-h-screen bg-gradient-to-br from-background via-background to-accent/5 flex items-center justify-center">
<div className="text-center">
<h1 className="text-2xl font-bold text-white mb-4">Connect Your Wallet</h1>
<p className="text-slate-400">Please connect your wallet to view activity history</p>
</div>
</div>
);
}

return (
<div className="min-h-screen bg-gradient-to-br from-background via-background to-accent/5 p-6">
<div className="max-w-6xl mx-auto">
<div className="mb-8">
<h1 className="text-3xl font-bold text-white mb-2">Stream Activity History</h1>
<p className="text-slate-400">View all your stream events and transactions</p>
</div>

<div className="bg-white/5 border border-glass-border rounded-2xl p-6 mb-6">
<div className="flex flex-wrap gap-2 mb-4">
{filters.map(filter => (
<button
key={filter}
onClick={() => setActiveFilter(filter)}
className={`px-4 py-2 rounded-lg text-sm font-medium transition-colors ${
activeFilter === filter
? 'bg-accent text-white'
: 'bg-white/5 text-slate-400 hover:bg-white/10'
}`}
>
{filter === 'All' ? 'All Events' : filter.replace('_', ' ')}
</button>
))}
</div>

<div className="flex justify-between items-center">
<p className="text-sm text-slate-400">
Showing {filteredEvents.length} of {events.length} events
</p>
<button
onClick={handleExportCSV}
disabled={filteredEvents.length === 0}
className="px-4 py-2 bg-accent/10 text-accent rounded-lg text-sm font-medium hover:bg-accent/20 transition-colors disabled:opacity-50 disabled:cursor-not-allowed"
>
Export CSV
</button>
</div>
</div>

<ActivityHistory events={filteredEvents} isLoading={isLoading} />
</div>
</div>
);
}
55 changes: 52 additions & 3 deletions frontend/src/components/NotificationDropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import React, { useState, useEffect } from 'react';
import { BackendStreamEvent } from '@/lib/api-types';
import { fetchUserEvents } from '@/lib/dashboard';
import { Button } from './ui/Button';
import { useStreamEvents } from '@/hooks/useStreamEvents';

interface NotificationDropdownProps {
publicKey: string;
Expand All @@ -12,18 +13,63 @@ export const NotificationDropdown: React.FC<NotificationDropdownProps> = ({ publ
const [isOpen, setIsOpen] = useState(false);
const [events, setEvents] = useState<BackendStreamEvent[]>([]);
const [isLoading, setIsLoading] = useState(false);
const [unreadCount, setUnreadCount] = useState(0);

// Wire up SSE for real-time events
const { events: streamEvents } = useStreamEvents({
userPublicKeys: [publicKey],
autoReconnect: true,
});

useEffect(() => {
if (isOpen && publicKey) {
loadEvents();
setUnreadCount(0); // Clear unread count when dropdown opens
}
}, [isOpen, publicKey]);

// Handle incoming SSE events
useEffect(() => {
if (streamEvents.length > 0 && !isOpen) {
// Increment unread count for new events while dropdown is closed
setUnreadCount(prev => prev + 1);
}

// Prepend live events to notification list
if (streamEvents.length > 0) {
const latestEvent = streamEvents[0];
const newEvent: BackendStreamEvent = {
id: `sse-${Date.now()}`,
streamId: latestEvent.data.streamId || 0,
eventType: mapEventType(latestEvent.type),
amount: latestEvent.data.amount || latestEvent.data.feeAmount || null,
transactionHash: latestEvent.data.transactionHash || '',
ledgerSequence: latestEvent.data.ledger || 0,
timestamp: latestEvent.timestamp / 1000,
metadata: null,
createdAt: new Date().toISOString(),
};

setEvents(prev => [newEvent, ...prev.slice(0, 19)]); // Keep max 20
}
}, [streamEvents, isOpen]);

const mapEventType = (type: string): BackendStreamEvent['eventType'] => {
switch (type) {
case 'created': return 'CREATED';
case 'topped_up': return 'TOPPED_UP';
case 'withdrawn': return 'WITHDRAWN';
case 'cancelled': return 'CANCELLED';
case 'completed': return 'COMPLETED';
default: return 'CREATED';
}
};

const loadEvents = async () => {
setIsLoading(true);
try {
const data = await fetchUserEvents(publicKey);
setEvents(data.slice(0, 5)); // Show only last 5
setEvents(data.slice(0, 20)); // Show last 20
} catch (error) {
console.error(error);
} finally {
Expand All @@ -38,6 +84,7 @@ export const NotificationDropdown: React.FC<NotificationDropdownProps> = ({ publ
case 'TOPPED_UP': return `Topped up #${event.streamId}`;
case 'WITHDRAWN': return `Withdrew ${amount} from #${event.streamId}`;
case 'CANCELLED': return `Cancelled #${event.streamId}`;
case 'COMPLETED': return `Completed #${event.streamId}`;
default: return `Event on #${event.streamId}`;
}
};
Expand All @@ -51,8 +98,10 @@ export const NotificationDropdown: React.FC<NotificationDropdownProps> = ({ publ
<svg className="w-6 h-6" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path strokeLinecap="round" strokeLinejoin="round" strokeWidth="2" d="M15 17h5l-1.405-1.405A2.032 2.032 0 0118 14.158V11a6.002 6.002 0 00-4-5.659V5a2 2 0 10-4 0v.341C7.67 6.165 6 8.388 6 11v3.159c0 .538-.214 1.055-.595 1.436L4 17h5m6 0v1a3 3 0 11-6 0v-1m6 0H9" />
</svg>
{events.length > 0 && (
<span className="absolute top-0 right-0 h-3 w-3 bg-accent rounded-full border-2 border-background"></span>
{unreadCount > 0 && (
<span className="absolute top-0 right-0 h-5 w-5 bg-accent rounded-full border-2 border-background flex items-center justify-center text-xs font-bold text-white">
{unreadCount > 9 ? '9+' : unreadCount}
</span>
)}
</button>

Expand Down
Loading
Loading