Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Contributing to StellarStream

Thank you for your interest in contributing to StellarStream! This guide will help you get started with our development process.

## Development Setup

1. Clone the repository
2. Install dependencies: `npm run install:all`
3. Run the development environment: `npm run dev`

## Testing

### Backend Tests
Run `npm run test` in the `backend/` directory.

### Contract Tests
Run `cargo test` in the `contracts/` directory.

#### Snapshot Testing
We use `insta` for snapshot testing of contract events.
Snapshot files are located in `contracts/test_snapshots/`.

**To update snapshots:**
If you change event structures and need to update the snapshots, run:
```bash
cargo insta review
```
This will allow you to interactively review and accept changes to the snapshots.

## Pull Request Process

1. Create a feature branch from `main`.
2. Ensure all tests pass.
3. Update documentation if necessary.
4. Submit a PR and wait for review.
2 changes: 1 addition & 1 deletion backend/src/config/validateEnv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const envSchema = z.object({
DB_PATH: z.string().optional().default("backend/data/streams.db"),
WEBHOOK_DESTINATION_URL: z.string().optional(),
WEBHOOK_SIGNING_SECRET: z.string().optional(),
JWT_SECRET: z.string().optional().default("default_local_dev_secret_key"),
JWT_SECRET: z.string().optional(),
SERVER_SIGNING_KEY: z.string().optional(),
DOMAIN: z.string().optional().default("localhost"),
SOROBAN_DISABLED: z.string().optional(),
Expand Down
76 changes: 76 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import {
listStreams,
listStreamsByRecipient,
listStreamsBySender,
pauseStream,
refreshStreamStatuses,
resumeStream,
StreamStatus,
syncStreams,
updateStreamStartAt,
Expand Down Expand Up @@ -677,6 +679,80 @@ app.patch(
},
);

app.patch(
"/api/streams/:id/pause",
authMiddleware,
async (req: Request, res: Response) => {
const parsedId = parseStreamId(req.params.id);
if (!parsedId.ok) {
sendValidationError(req, res, parsedId.issues);
return;
}

const stream = getStream(parsedId.value);
if (!stream) {
sendApiError(req, res, 404, "Stream not found.", { code: "NOT_FOUND" });
return;
}

const user = (req as any).user;
if (stream.sender !== user.accountId) {
sendApiError(req, res, 403, "Only the sender can pause this stream.", {
code: "FORBIDDEN",
});
return;
}

try {
const pausedStream = await pauseStream(parsedId.value);
res.json({ data: { ...pausedStream, progress: calculateProgress(pausedStream!) } });
} catch (error: any) {
console.error("Failed to pause stream:", error);
const normalizedError = normalizeUnknownApiError(error, "Failed to pause stream.");
sendApiError(req, res, normalizedError.statusCode, normalizedError.message, {
code: normalizedError.code ?? "INTERNAL_ERROR",
});
}
},
);

app.patch(
"/api/streams/:id/resume",
authMiddleware,
async (req: Request, res: Response) => {
const parsedId = parseStreamId(req.params.id);
if (!parsedId.ok) {
sendValidationError(req, res, parsedId.issues);
return;
}

const stream = getStream(parsedId.value);
if (!stream) {
sendApiError(req, res, 404, "Stream not found.", { code: "NOT_FOUND" });
return;
}

const user = (req as any).user;
if (stream.sender !== user.accountId) {
sendApiError(req, res, 403, "Only the sender can resume this stream.", {
code: "FORBIDDEN",
});
return;
}

try {
const resumedStream = await resumeStream(parsedId.value);
res.json({ data: { ...resumedStream, progress: calculateProgress(resumedStream!) } });
} catch (error: any) {
console.error("Failed to resume stream:", error);
const normalizedError = normalizeUnknownApiError(error, "Failed to resume stream.");
sendApiError(req, res, normalizedError.statusCode, normalizedError.message, {
code: normalizedError.code ?? "INTERNAL_ERROR",
});
}
},
);

app.get("/api/streams/:id/history", (req: Request, res: Response) => {
const parsedId = parseStreamId(req.params.id);
if (!parsedId.ok) {
Expand Down
17 changes: 16 additions & 1 deletion backend/src/services/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
WebAuth,
} from "@stellar/stellar-sdk";
import jwt from "jsonwebtoken";
import crypto from "crypto";
import { Request, Response, NextFunction } from "express";
import { sendApiError } from "../apiErrors";

Expand All @@ -16,8 +17,22 @@ const SERVER_SIGNING_KEY =
const DOMAIN = (process.env.DOMAIN || "localhost").trim();
const NETWORK_PASSPHRASE = process.env.NETWORK_PASSPHRASE || Networks.TESTNET;

let jwtSecret = process.env.JWT_SECRET;

if (!jwtSecret) {
if (process.env.NODE_ENV === "production") {
throw new Error("JWT_SECRET must be set in production");
}

jwtSecret = crypto.randomBytes(32).toString("hex");

console.warn(
"JWT_SECRET not set β€” using ephemeral secret. All tokens will be invalidated on restart.",
);
}

function getJwtSecret() {
return process.env.JWT_SECRET || "default_local_dev_secret_key";
return jwtSecret as string;
}

export interface AuthUser {
Expand Down
8 changes: 6 additions & 2 deletions backend/src/services/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ function migrate(): void {
canceled_at INTEGER,
completed_at INTEGER,
refunded_amount REAL,
archived_at INTEGER
archived_at INTEGER,
paused_at INTEGER,
paused_duration INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS stream_archive (
Expand All @@ -56,7 +58,9 @@ function migrate(): void {
canceled_at INTEGER,
completed_at INTEGER,
refunded_amount REAL,
archived_at INTEGER NOT NULL
archived_at INTEGER NOT NULL,
paused_at INTEGER,
paused_duration INTEGER NOT NULL DEFAULT 0
);

CREATE TABLE IF NOT EXISTS stream_events (
Expand Down
131 changes: 125 additions & 6 deletions backend/src/services/streamStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ export interface StreamRecord {
canceledAt?: number;
completedAt?: number;
refundedAmount?: number;
pausedAt?: number;
pausedDuration: number;
}

export interface StreamProgress {
Expand All @@ -63,6 +65,8 @@ interface StreamRow {
completed_at: number | null;
refunded_amount: number | null;
archived_at: number | null;
paused_at: number | null;
paused_duration: number;
}

function rowToRecord(row: StreamRow): StreamRecord {
Expand All @@ -78,15 +82,17 @@ function rowToRecord(row: StreamRow): StreamRecord {
canceledAt: row.canceled_at ?? undefined,
completedAt: row.completed_at ?? undefined,
refundedAmount: row.refunded_amount ?? undefined,
pausedAt: row.paused_at ?? undefined,
pausedDuration: row.paused_duration,
};
}

function upsertStream(record: StreamRecord): void {
const db = getDb();
db.prepare(
`
INSERT INTO streams (id, sender, recipient, asset_code, total_amount, duration_seconds, start_at, created_at, canceled_at, completed_at, refunded_amount, archived_at)
VALUES (@id, @sender, @recipient, @assetCode, @totalAmount, @durationSeconds, @startAt, @createdAt, @canceledAt, @completedAt, @refundedAmount, @archivedAt)
INSERT INTO streams (id, sender, recipient, asset_code, total_amount, duration_seconds, start_at, created_at, canceled_at, completed_at, refunded_amount, archived_at, paused_at, paused_duration)
VALUES (@id, @sender, @recipient, @assetCode, @totalAmount, @durationSeconds, @startAt, @createdAt, @canceledAt, @completedAt, @refundedAmount, @archivedAt, @pausedAt, @pausedDuration)
ON CONFLICT(id) DO UPDATE SET
sender = excluded.sender,
recipient = excluded.recipient,
Expand All @@ -98,7 +104,9 @@ function upsertStream(record: StreamRecord): void {
canceled_at = excluded.canceled_at,
completed_at = excluded.completed_at,
refunded_amount = excluded.refunded_amount,
archived_at = excluded.archived_at
archived_at = excluded.archived_at,
paused_at = excluded.paused_at,
paused_duration = excluded.paused_duration
`,
).run({
id: record.id,
Expand All @@ -113,6 +121,8 @@ function upsertStream(record: StreamRecord): void {
completedAt: record.completedAt ?? null,
refundedAmount: record.refundedAmount ?? null,
archivedAt: null,
pausedAt: record.pausedAt ?? null,
pausedDuration: record.pausedDuration ?? 0,
});
}

Expand Down Expand Up @@ -354,6 +364,9 @@ function computeStatus(stream: StreamRecord, at: number): StreamStatus {
if (at >= stream.startAt + stream.durationSeconds) {
return "completed";
}
if (stream.pausedAt !== undefined) {
return "active"; // Or could be a "paused" status if we want to add it
}
return "active";
}

Expand All @@ -362,11 +375,19 @@ export function calculateProgress(
at = nowInSeconds(),
): StreamProgress {
const streamEnd = stream.startAt + stream.durationSeconds;

// Calculate paused duration including current pause if active
let pausedDuration = stream.pausedDuration;
if (stream.pausedAt !== undefined) {
pausedDuration += Math.max(0, at - stream.pausedAt);
}

const effectiveEnd =
stream.canceledAt !== undefined
? Math.min(stream.canceledAt, streamEnd)
: streamEnd;
const elapsed = Math.max(0, Math.min(at, effectiveEnd) - stream.startAt);
? Math.min(stream.canceledAt, streamEnd + pausedDuration)
: streamEnd + pausedDuration;

const elapsed = Math.max(0, Math.min(at, effectiveEnd) - stream.startAt - pausedDuration);
const ratio = Math.min(1, elapsed / stream.durationSeconds);
const vestedAmount = stream.totalAmount * ratio;

Expand Down Expand Up @@ -623,6 +644,104 @@ export async function createStream(input: StreamInput): Promise<StreamRecord> {
return stream;
}

export async function pauseStream(id: string): Promise<StreamRecord | undefined> {
const stream = getStream(id);
if (!stream || stream.pausedAt !== undefined || stream.canceledAt !== undefined || stream.completedAt !== undefined) {
return stream;
}

const sorobanContext = getSorobanContext();
if (sorobanContext && rpcServer && serverKeypair) {
const sourceAccount = await rpcServer.getAccount(serverKeypair.publicKey());
const tx = sorobanContext.contract.call(
"pause_stream",
nativeToScVal(parseInt(id), { type: "u64" }),
);

const built = await rpcServer.prepareTransaction(
new TransactionBuilder(sourceAccount, {
fee: "1000",
networkPassphrase: process.env.NETWORK_PASSPHRASE || Networks.TESTNET,
})
.addOperation(tx)
.setTimeout(30)
.build(),
);

built.sign(serverKeypair);
const sendRes = await retryWithBackoff(() => rpcServer!.sendTransaction(built));
if (sendRes.status === "PENDING") {
let txResult;
let attempts = 0;
while (attempts < 10) {
txResult = await retryWithBackoff(() => rpcServer!.getTransaction(sendRes.hash));
if (txResult.status !== "NOT_FOUND") break;
await new Promise((r) => setTimeout(r, 1000));
attempts++;
}
}
}

const now = nowInSeconds();
const db = getDb();
db.prepare("UPDATE streams SET paused_at = ? WHERE id = ?").run(now, id);

invalidateCache(`stream:${id}`);
return getStream(id);
}

export async function resumeStream(id: string): Promise<StreamRecord | undefined> {
const stream = getStream(id);
if (!stream || stream.pausedAt === undefined) {
return stream;
}

const sorobanContext = getSorobanContext();
if (sorobanContext && rpcServer && serverKeypair) {
const sourceAccount = await rpcServer.getAccount(serverKeypair.publicKey());
const tx = sorobanContext.contract.call(
"resume_stream",
nativeToScVal(parseInt(id), { type: "u64" }),
);

const built = await rpcServer.prepareTransaction(
new TransactionBuilder(sourceAccount, {
fee: "1000",
networkPassphrase: process.env.NETWORK_PASSPHRASE || Networks.TESTNET,
})
.addOperation(tx)
.setTimeout(30)
.build(),
);

built.sign(serverKeypair);
const sendRes = await retryWithBackoff(() => rpcServer!.sendTransaction(built));
if (sendRes.status === "PENDING") {
let txResult;
let attempts = 0;
while (attempts < 10) {
txResult = await retryWithBackoff(() => rpcServer!.getTransaction(sendRes.hash));
if (txResult.status !== "NOT_FOUND") break;
await new Promise((r) => setTimeout(r, 1000));
attempts++;
}
}
}

const now = nowInSeconds();
const additionalPausedDuration = Math.max(0, now - stream.pausedAt);
const newTotalPausedDuration = stream.pausedDuration + additionalPausedDuration;

const db = getDb();
db.prepare("UPDATE streams SET paused_at = NULL, paused_duration = ? WHERE id = ?").run(
newTotalPausedDuration,
id,
);

invalidateCache(`stream:${id}`);
return getStream(id);
}

export function refreshStreamStatuses(): number {
const db = getDb();
const now = nowInSeconds();
Expand Down
Loading
Loading