Skip to content
Merged

Four #287

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ flowfi/
│ ├── stream_contract/ # Core streaming logic
├── frontend/ # Next.js + Tailwind CSS frontend
├── docs/ # Documentation
│ └── ARCHITECTURE.md # Architecture overview
│ ├── ARCHITECTURE.md # Architecture overview
│ └── DEVELOPMENT.md # Local development guide
```

## Architecture
Expand All @@ -40,6 +41,8 @@ For full local setup and contributor onboarding, see the [Development Guide](doc

## Getting Started

For full step-by-step instructions, see our [Development Guide](docs/DEVELOPMENT.md).

### Prerequisites

- Node.js & npm
Expand Down Expand Up @@ -94,11 +97,12 @@ npm install
npm run dev
```

### Smart Contracts
### Deployment

To build, optimize, and deploy the smart contract to a Stellar network, you can use the automated deployment script:

```bash
cd contracts
cargo build --target wasm32-unknown-unknown --release
./scripts/deploy.sh --network testnet
```

## Deployment
Expand Down
221 changes: 221 additions & 0 deletions backend/src/__tests__/integration/streams.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
import 'dotenv/config';
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import request from 'supertest';
import { nativeToScVal, xdr, StrKey, Keypair } from '@stellar/stellar-sdk';
import app from '../../app.js';
import { prisma } from '../../lib/prisma.js';
import { sorobanEventWorker } from '../../workers/soroban-event-worker.js';
import { sseService } from '../../services/sse.service.js';

describe('Stream Lifecycle Integration Tests', () => {
const senderPair = Keypair.random();
const recipientPair = Keypair.random();
const sender = senderPair.publicKey();
const recipient = recipientPair.publicKey();
const tokenAddress = StrKey.encodeContract(Buffer.alloc(32));
const streamId = 999;
let sseEvents: any[] = [];

beforeAll(async () => {
// Set up a test SSE client
const mockRes = {
write: (chunk: string) => {
const lines = chunk.split('\n');
let eventName = '';
let data: any = null;
for (const line of lines) {
if (line.startsWith('event: ')) eventName = line.slice(7).trim();
if (line.startsWith('data: ')) {
try {
data = JSON.parse(line.slice(6).trim());
} catch (e) {}
}
}
if (eventName && data) {
sseEvents.push({ event: eventName, data });
}
},
on: () => {},
} as any;

sseService.addClient('test-integration-client', mockRes, ['*']);

// Clean up DB before test
await prisma.streamEvent.deleteMany({ where: { streamId } }).catch(() => {});
await prisma.stream.deleteMany({ where: { streamId } }).catch(() => {});
});

afterAll(async () => {
await prisma.streamEvent.deleteMany({ where: { streamId } }).catch(() => {});
await prisma.stream.deleteMany({ where: { streamId } }).catch(() => {});
});

it('Indexer processes stream_created event -> stream appears in GET /v1/streams/{id}', async () => {
sseEvents = [];

const event = {
id: 'created-event-1',
txHash: 'hash-created',
ledger: 100,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('stream_created'),
nativeToScVal(BigInt(streamId), { type: 'u64' }),
],
value: xdr.ScVal.scvMap([
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('sender'),
val: nativeToScVal(sender, { type: 'address' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('recipient'),
val: nativeToScVal(recipient, { type: 'address' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('token_address'),
val: nativeToScVal(tokenAddress, { type: 'address' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('rate_per_second'),
val: nativeToScVal(BigInt(10), { type: 'i128' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('deposited_amount'),
val: nativeToScVal(BigInt(1000), { type: 'i128' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('start_time'),
val: nativeToScVal(BigInt(Math.floor(Date.now() / 1000)), { type: 'u64' }),
}),
]),
} as any;

await sorobanEventWorker.processEvent(event);

// Verify stream appears in GET API
const res = await request(app).get(`/v1/streams/${streamId}`);
expect(res.status).toBe(200);
expect(res.body.streamId).toBe(streamId);
expect(res.body.depositedAmount).toBe('1000');

// Verify SSE
expect(sseEvents.some(e => e.event === 'stream.created')).toBe(true);
});

it('Indexer processes stream_topped_up -> depositedAmount updated in DB', async () => {
sseEvents = [];

const event = {
id: 'topped-up-event-1',
txHash: 'hash-topped-up',
ledger: 101,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('stream_topped_up'),
nativeToScVal(BigInt(streamId), { type: 'u64' }),
],
value: xdr.ScVal.scvMap([
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('amount'),
val: nativeToScVal(BigInt(500), { type: 'i128' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('new_deposited_amount'),
val: nativeToScVal(BigInt(1500), { type: 'i128' }),
}),
]),
} as any;

await sorobanEventWorker.processEvent(event);

const dbStream = await prisma.stream.findUnique({ where: { streamId } });
expect(dbStream?.depositedAmount).toBe('1500');

expect(sseEvents.some(e => e.event === 'stream.topped_up')).toBe(true);
});

it('Indexer processes stream_paused -> isPaused = true', async () => {
sseEvents = [];

const event = {
id: 'paused-event-1',
txHash: 'hash-paused',
ledger: 102,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('stream_paused'),
nativeToScVal(BigInt(streamId), { type: 'u64' }),
],
value: xdr.ScVal.scvMap([]),
} as any;

await sorobanEventWorker.processEvent(event);

const dbStream = await prisma.stream.findUnique({ where: { streamId } });
expect(dbStream?.isPaused).toBe(true);

expect(sseEvents.some(e => e.event === 'stream.paused')).toBe(true);
});

it('Indexer processes stream_resumed -> isPaused = false', async () => {
sseEvents = [];

const event = {
id: 'resumed-event-1',
txHash: 'hash-resumed',
ledger: 103,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('stream_resumed'),
nativeToScVal(BigInt(streamId), { type: 'u64' }),
],
value: xdr.ScVal.scvMap([]),
} as any;

await sorobanEventWorker.processEvent(event);

const dbStream = await prisma.stream.findUnique({ where: { streamId } });
expect(dbStream?.isPaused).toBe(false);

expect(sseEvents.some(e => e.event === 'stream.resumed')).toBe(true);
});

it('Indexer processes stream_cancelled -> stream isActive = false', async () => {
sseEvents = [];

const event = {
id: 'cancelled-event-1',
txHash: 'hash-cancelled',
ledger: 104,
inSuccessfulContractCall: true,
topic: [
xdr.ScVal.scvSymbol('stream_cancelled'),
nativeToScVal(BigInt(streamId), { type: 'u64' }),
],
value: xdr.ScVal.scvMap([
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('amount_withdrawn'),
val: nativeToScVal(BigInt(100), { type: 'i128' }),
}),
new xdr.ScMapEntry({
key: xdr.ScVal.scvSymbol('refunded_amount'),
val: nativeToScVal(BigInt(1400), { type: 'i128' }),
}),
]),
} as any;

await sorobanEventWorker.processEvent(event);

const dbStream = await prisma.stream.findUnique({ where: { streamId } });
expect(dbStream?.isActive).toBe(false);

expect(sseEvents.some(e => e.event === 'stream.cancelled')).toBe(true);
});

it('GET /v1/streams/{id}/events returns all lifecycle events', async () => {
const res = await request(app).get(`/v1/streams/${streamId}/events`);
expect(res.status).toBe(200);
expect(Array.isArray(res.body.data)).toBe(true);
expect(res.body.data.length).toBeGreaterThan(0);
});
});
90 changes: 89 additions & 1 deletion backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ export class SorobanEventWorker {
* Dispatch a single contract event to the appropriate handler based on the
* first topic symbol.
*/
private async processEvent(
public async processEvent(
event: rpc.Api.EventResponse,
): Promise<void> {
if (!event.topic || event.topic.length < 2) return;
Expand All @@ -254,6 +254,12 @@ export class SorobanEventWorker {
case 'tokens_withdrawn':
await this.handleTokensWithdrawn(event, topic1);
break;
case 'stream_paused':
await this.handleStreamPaused(event, topic1);
break;
case 'stream_resumed':
await this.handleStreamResumed(event, topic1);
break;
case 'stream_cancelled':
await this.handleStreamCancelled(event, topic1);
break;
Expand Down Expand Up @@ -426,6 +432,88 @@ export class SorobanEventWorker {
});
}

private async handleStreamPaused(
event: rpc.Api.EventResponse,
streamIdTopic: xdr.ScVal,
): Promise<void> {
const streamId = Number(decodeU64(streamIdTopic));
const timestamp = Math.floor(Date.now() / 1000);

await prisma.$transaction(async (tx: any) => {
await tx.stream.update({
where: { streamId },
data: {
isPaused: true,
lastPausedAt: timestamp,
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'PAUSED',
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
},
});
});

sseService.broadcastToStream(String(streamId), 'stream.paused', {
streamId,
isPaused: true,
transactionHash: event.txHash,
ledger: event.ledger,
timestamp,
});
}

private async handleStreamResumed(
event: rpc.Api.EventResponse,
streamIdTopic: xdr.ScVal,
): Promise<void> {
const streamId = Number(decodeU64(streamIdTopic));
const timestamp = Math.floor(Date.now() / 1000);

await prisma.$transaction(async (tx: any) => {
const stream = await tx.stream.findUniqueOrThrow({
where: { streamId },
select: { totalPausedSeconds: true, lastPausedAt: true },
});

const lastPausedAt = stream.lastPausedAt ?? timestamp;
const pausedDuration = Math.max(0, timestamp - lastPausedAt);
const nextTotalPausedSeconds = stream.totalPausedSeconds + pausedDuration;

await tx.stream.update({
where: { streamId },
data: {
isPaused: false,
totalPausedSeconds: nextTotalPausedSeconds,
lastPausedAt: null,
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'RESUMED',
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
},
});
});

sseService.broadcastToStream(String(streamId), 'stream.resumed', {
streamId,
isPaused: false,
transactionHash: event.txHash,
ledger: event.ledger,
timestamp,
});
}

private async handleTokensWithdrawn(
event: rpc.Api.EventResponse,
streamIdTopic: xdr.ScVal,
Expand Down
2 changes: 1 addition & 1 deletion backend/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export default defineConfig({
environment: 'node',
globals: true,
setupFiles: [],
include: ['tests/**/*.{test,spec}.ts'],
include: ['tests/**/*.{test,spec}.ts', 'src/__tests__/**/*.{test,spec}.ts'],
coverage: {
reporter: ['text', 'json', 'html'],
},
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
POSTGRES_PASSWORD: flowfi_dev_password
POSTGRES_DB: flowfi
ports:
- "5432:5432"
- "5433:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
Expand Down
Loading
Loading