Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,30 @@ jobs:
- name: Run Contract Tests
run: cargo test
working-directory: contracts

- name: Install Stellar CLI
run: |
curl -fsSL https://github.com/stellar/stellar-cli/raw/main/install.sh | sh -s -- --install-deps
shell: bash

- name: Optimize WASM files
run: |
set -euo pipefail
WASMS=$(find contracts/target -type f -name "*.wasm" -print)
if [ -z "$WASMS" ]; then
echo "No wasm files found"
exit 1
fi
for w in $WASMS; do
out="${w%%.wasm}.optimized.wasm"
echo "Optimizing $w -> $out"
stellar contract optimize --wasm "$w" --wasm-out "$out"
done
shell: bash

- name: Upload optimized WASM artifacts
uses: actions/upload-artifact@v4
with:
name: optimized-wasm
path: |
contracts/target/**/**/*.optimized.wasm
2 changes: 2 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ DATABASE_URL="postgresql://user:password@localhost:5432/flowfi?schema=public"
PORT=3001
NODE_ENV=development
CORS_ALLOWED_ORIGINS="https://app.flowfi.xyz,https://flowfi.xyz"
# Comma-separated list of allowed origins for CORS. In development, if unset,
# defaults to http://localhost:3000

# Stellar Network (Testnet/Mainnet)
STELLAR_NETWORK=testnet
Expand Down
23 changes: 17 additions & 6 deletions backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ import healthRoutes from './routes/health.routes.js';

const app = express();
const isProduction = process.env.NODE_ENV === 'production';
const allowedOrigins = (process.env.CORS_ALLOWED_ORIGINS ?? '')
const rawCors = process.env.CORS_ALLOWED_ORIGINS ?? '';
const allowedOrigins = rawCors
.split(',')
.map((origin) => origin.trim())
.filter(Boolean);

// Default in development to only localhost:3000 (frontend dev server)
if (!process.env.CORS_ALLOWED_ORIGINS && !isProduction) {
allowedOrigins.push('http://localhost:3000');
}

// Apply global rate limiter first
app.use(globalRateLimiter);

Expand All @@ -37,11 +43,6 @@ app.use((req: Request, res: Response, next: NextFunction) => {

app.use(cors({
origin(origin, callback) {
if (!isProduction) {
callback(null, true);
return;
}

// Allow non-browser clients (no Origin header)
if (!origin) {
callback(null, true);
Expand All @@ -53,10 +54,20 @@ app.use(cors({
return;
}

// Not allowed
callback(new Error('CORS origin not allowed'));
},
credentials: true,
}));

// Convert CORS errors into 403 responses so callers get a clear status code
app.use((err: any, req: Request, res: Response, next: NextFunction) => {
if (err && err.message === 'CORS origin not allowed') {
res.status(403).json({ error: 'CORS origin not allowed' });
return;
}
next(err);
});
app.use(express.json());

// Sandbox mode detection (before versioning)
Expand Down
1 change: 1 addition & 0 deletions backend/src/controllers/sse.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const subscribeSchema = z.object({
all: z.boolean().optional().default(false),
});


function getClientIp(req: Request): string {
const forwarded = req.headers['x-forwarded-for'];
if (typeof forwarded === 'string' && forwarded.trim().length > 0) {
Expand Down
17 changes: 12 additions & 5 deletions backend/src/lib/redis.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Redis } from 'ioredis';
import type { Redis } from 'ioredis';
import RedisClass from 'ioredis';
import logger from '../logger.js';

const REDIS_URL = process.env.REDIS_URL;
Expand All @@ -20,9 +21,10 @@ export function isRedisAvailable(): boolean {
}

function makeClient(url: string): Redis {
return new Redis(url, {
return new RedisClass(url, {
maxRetriesPerRequest: 3,
retryStrategy: (times: number) => (times > 3 ? null : Math.min(times * 200, 2000)),
retryStrategy: (times: number) =>
times > 3 ? null : Math.min(times * 200, 2000),
enableOfflineQueue: false,
lazyConnect: true,
});
Expand All @@ -42,9 +44,14 @@ export async function connectRedis(): Promise<void> {
_publisher = publisher;
_subscriber = subscriber;
_available = true;

logger.info('[Redis] Connected β€” horizontal SSE scaling enabled.');
} catch (err) {
logger.warn('[Redis] Connection failed β€” falling back to single-instance SSE mode:', err);
logger.warn(
'[Redis] Connection failed β€” falling back to single-instance SSE mode:',
err
);

_publisher?.disconnect();
_subscriber?.disconnect();
_publisher = null;
Expand All @@ -58,4 +65,4 @@ export async function disconnectRedis(): Promise<void> {
_publisher = null;
_subscriber = null;
_available = false;
}
}
10 changes: 6 additions & 4 deletions backend/src/routes/v1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import streamRoutes from './stream.routes.js';
import eventsRoutes from './events.routes.js';
import userRoutes from './user.routes.js';
import authRoutes from './auth.routes.js';
import v1AdminRoutes from './admin.routes.js';
import adminRoutes from './admin.routes.js';
import adminMetricsRoutes from '../adminRoutes.js';

const router = Router();
Expand All @@ -13,7 +13,9 @@ router.use('/streams', streamRoutes);
router.use('/events', eventsRoutes);
router.use('/users', userRoutes);
router.use('/auth', authRoutes);
router.use('/admin', v1AdminRoutes);
router.use('/admin', adminMetricsRoutes);

export default router;
// Admin routes
router.use('/admin', adminRoutes);
router.use('/admin/metrics', adminMetricsRoutes);

export default router;
44 changes: 35 additions & 9 deletions backend/src/services/sorobanService.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { rpc, xdr, StrKey, Contract, Address, nativeToScVal } from '@stellar/stellar-sdk';
import { rpc, xdr, StrKey, Contract, nativeToScVal } from '@stellar/stellar-sdk';
import logger from '../logger.js';

const RPC_URL = process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org';
Expand Down Expand Up @@ -45,29 +45,53 @@ function decodeMap(val: xdr.ScVal): Record<string, xdr.ScVal> {

async function simulateContractCall(method: string, args: xdr.ScVal[]): Promise<xdr.ScVal> {
const contract = new Contract(CONTRACT_ID);

const op = contract.call(method, ...args);
const tx = new (await import('@stellar/stellar-sdk')).TransactionBuilder(
new (await import('@stellar/stellar-sdk')).Account('GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN', '0'),
{ fee: '100', networkPassphrase: process.env.STELLAR_NETWORK === 'mainnet'
? (await import('@stellar/stellar-sdk')).Networks.PUBLIC
: (await import('@stellar/stellar-sdk')).Networks.TESTNET }
).addOperation(op).setTimeout(30).build();

const { TransactionBuilder, Account, Networks } = await import('@stellar/stellar-sdk');

const tx = new TransactionBuilder(
new Account(
'GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN',
'0'
),
{
fee: '100',
networkPassphrase:
process.env.STELLAR_NETWORK === 'mainnet'
? Networks.PUBLIC
: Networks.TESTNET,
}
)
.addOperation(op)
.setTimeout(30)
.build();

const result = await server.simulateTransaction(tx);

if (rpc.Api.isSimulationError(result)) {
throw new Error(`Simulation error: ${result.error}`);
}

const simSuccess = result as rpc.Api.SimulateTransactionSuccessResponse;
return simSuccess.result!.retval;
}

export async function getStreamFromChain(streamId: number): Promise<ChainStream | null> {
if (!CONTRACT_ID) return null;

try {
const retval = await simulateContractCall('get_stream', [
nativeToScVal(streamId, { type: 'u64' }),
]);

const fields = decodeMap(retval);

const isActiveVal = fields['is_active']!;
const isActive =
isActiveVal.switch().value === xdr.ScValType.scvBool().value &&
isActiveVal.b() === true;

return {
streamId,
sender: decodeAddress(fields['sender']!),
Expand All @@ -77,7 +101,7 @@ export async function getStreamFromChain(streamId: number): Promise<ChainStream
depositedAmount: decodeI128(fields['deposited_amount']!),
withdrawnAmount: decodeI128(fields['withdrawn_amount']!),
startTime: Number(fields['start_time']!.u64().toString()),
isActive: fields['is_active']!.switch().value === xdr.ScValType.scvBool().value && fields['is_active']!.b() === true,
isActive,
};
} catch (err) {
logger.error(`[SorobanService] getStreamFromChain(${streamId}) failed:`, err);
Expand All @@ -87,10 +111,12 @@ export async function getStreamFromChain(streamId: number): Promise<ChainStream

export async function getClaimableFromChain(streamId: number): Promise<string | null> {
if (!CONTRACT_ID) return null;

try {
const retval = await simulateContractCall('get_claimable_amount', [
nativeToScVal(streamId, { type: 'u64' }),
]);

return decodeI128(retval);
} catch (err) {
logger.error(`[SorobanService] getClaimableFromChain(${streamId}) failed:`, err);
Expand All @@ -101,4 +127,4 @@ export async function getClaimableFromChain(streamId: number): Promise<string |
/** Returns true when the DB record is older than STALE_THRESHOLD_MS. */
export function isStale(updatedAt: Date): boolean {
return Date.now() - updatedAt.getTime() > STALE_THRESHOLD_MS;
}
}
10 changes: 7 additions & 3 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,16 @@ export class SorobanEventWorker {
if (this.activeBatch) await this.activeBatch;
}

/** Trigger an immediate poll cycle (used for replay functionality). */
/** Trigger an immediate poll cycle (used for replay and manual updates). */
async triggerPoll(): Promise<void> {
if (!this.isRunning) return;
await this.fetchAndProcessEvents().catch((err) => {

try {
await this.fetchAndProcessEvents();
} catch (err) {
logger.error('[SorobanWorker] Manual poll error:', err);
});
}
}
}

// ─── Internal ──────────────────────────────────────────────────────────────
Expand Down
15 changes: 15 additions & 0 deletions backend/tests/cors.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { describe, it, expect } from 'vitest';
import request from 'supertest';
import app from '../src/app.js';

describe('CORS middleware', () => {
it('returns 403 for non-whitelisted origin', async () => {
const response = await request(app)
.get('/')
.set('Origin', 'https://evil.example')
.set('Accept', 'text/plain');

expect(response.status).toBe(403);
expect(response.body.error).toBe('CORS origin not allowed');
});
});
14 changes: 13 additions & 1 deletion docs/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@

This document explains how FlowFi moves data from on-chain contract events into API responses and real-time frontend updates.

## High-Level Pipeline
# FlowFi Architecture

This document explains how FlowFi moves data from on-chain contract events into API responses and real-time frontend updates.

## High-Level Overview

```mermaid
flowchart LR
Contract[Stream Contract (Soroban WASM)] --> Indexer[Soroban Event Indexer]
Indexer --> DB[(Postgres DB)]
DB --> API[Backend API (Express + SSE)]
API --> UI[Frontend (Next.js)]
UI --> API

```mermaid
flowchart LR
Expand Down
Loading
Loading