Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion backend/src/services/soroban-indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ interface RpcResponse {
};
}

type IndexedEventType = 'CREATED' | 'CANCELLED' | 'WITHDRAWN';
type IndexedEventType = 'CREATED' | 'CANCELLED' | 'WITHDRAWN' | 'COMPLETED';

const RPC_URL = process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org';
const POLL_MS = Number(process.env.SOROBAN_INDEXER_POLL_MS ?? 15000);
Expand Down Expand Up @@ -112,6 +112,7 @@ export class SorobanIndexerService {
if (firstTopic.includes('stream_created')) return 'CREATED';
if (firstTopic.includes('stream_cancelled')) return 'CANCELLED';
if (firstTopic.includes('tokens_withdrawn')) return 'WITHDRAWN';
if (firstTopic.includes('stream_completed')) return 'COMPLETED';
return null;
}

Expand Down Expand Up @@ -222,6 +223,11 @@ export class SorobanIndexerService {
},
});
}
} else if (eventType === 'COMPLETED') {
await prisma.stream.updateMany({
where: { streamId },
data: { isActive: false, lastUpdateTime: timestamp },
});
}

await prisma.streamEvent.create({
Expand Down
51 changes: 51 additions & 0 deletions backend/src/workers/soroban-event-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ export class SorobanEventWorker {
case 'stream_cancelled':
await this.handleStreamCancelled(event, topic1);
break;
case 'stream_completed':
await this.handleStreamCompleted(event, topic1);
break;
default:
// Unrecognised event — ignore silently.
break;
Expand Down Expand Up @@ -483,6 +486,54 @@ export class SorobanEventWorker {
timestamp,
});
}

private async handleStreamCompleted(
event: rpc.Api.EventResponse,
streamIdTopic: xdr.ScVal,
): Promise<void> {
const streamId = Number(decodeU64(streamIdTopic));
const body = decodeMap(event.value);

if (!body['recipient'] || !body['total_withdrawn']) {
throw new Error(`StreamCompleted #${streamId}: missing body fields`);
}

const recipient = decodeAddress(body['recipient']);
const totalWithdrawn = decodeI128(body['total_withdrawn']);
const timestamp = Math.floor(Date.now() / 1000);

await prisma.$transaction(async (tx: any) => {
await tx.stream.update({
where: { streamId },
data: {
isActive: false,
withdrawnAmount: totalWithdrawn,
lastUpdateTime: timestamp,
},
});

await tx.streamEvent.create({
data: {
streamId,
eventType: 'COMPLETED',
amount: totalWithdrawn,
transactionHash: event.txHash,
ledgerSequence: event.ledger,
timestamp,
metadata: JSON.stringify({ recipient }),
},
});
});

sseService.broadcastToStream(String(streamId), 'stream.completed', {
streamId,
recipient,
totalWithdrawn,
transactionHash: event.txHash,
ledger: event.ledger,
timestamp,
});
}
}

export const sorobanEventWorker = new SorobanEventWorker();
33 changes: 33 additions & 0 deletions contracts/stream_contract/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,36 @@ pub struct FeeCollectedEvent {
pub fee_amount: i128,
pub token: Address,
}

/// Emitted when a sender pauses an active stream.
///
/// Topic: `("stream_paused", stream_id)`
#[contracttype]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamPausedEvent {
pub stream_id: u64,
pub sender: Address,
pub paused_at: u64,
}

/// Emitted when a sender resumes a paused stream.
///
/// Topic: `("stream_resumed", stream_id)`
#[contracttype]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamResumedEvent {
pub stream_id: u64,
pub sender: Address,
pub new_end_time: u64,
}

/// Emitted when a stream is fully drained on the final withdrawal.
///
/// Topic: `("stream_completed", stream_id)`
#[contracttype]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamCompletedEvent {
pub stream_id: u64,
pub recipient: Address,
pub total_withdrawn: i128,
}
137 changes: 120 additions & 17 deletions contracts/stream_contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ use soroban_sdk::{contract, contractimpl, token, vec, Address, Env, InvokeError,

use errors::StreamError;
use events::{
FeeCollectedEvent, StreamCancelledEvent, StreamCreatedEvent, StreamToppedUpEvent,
TokensWithdrawnEvent,
FeeCollectedEvent, StreamCancelledEvent, StreamCompletedEvent, StreamCreatedEvent,
StreamPausedEvent, StreamResumedEvent, StreamToppedUpEvent, TokensWithdrawnEvent,
};
use storage::{
config_exists, load_config, load_stream, next_stream_id, save_config, save_stream,
try_load_config, try_load_stream,
};
use types::{ProtocolConfig, Stream};
use types::{ProtocolConfig, Stream, StreamStatus};

/// Maximum allowed protocol fee: 1 000 bps = 10%.
const MAX_FEE_RATE_BPS: u32 = 1_000;
Expand Down Expand Up @@ -157,6 +157,9 @@ impl StreamContract {
start_time,
last_update_time: start_time,
is_active: true,
paused: false,
paused_at: None,
status: StreamStatus::Active,
},
);

Expand Down Expand Up @@ -248,17 +251,15 @@ impl StreamContract {

/// Calculate the claimable amount for a stream at a given timestamp.
///
/// This helper computes how many tokens have been streamed since the last
/// update, capped at the remaining balance to prevent over-withdrawal.
///
/// # Arguments
/// * `stream` - The stream to calculate claimable amount for
/// * `now` - Current ledger timestamp
///
/// # Returns
/// The amount of tokens that can be claimed, never exceeding remaining balance
/// Excludes any time the stream was paused. If the stream is currently
/// paused, accrual stops at `paused_at`.
fn calculate_claimable(stream: &Stream, now: u64) -> i128 {
let elapsed = now.saturating_sub(stream.last_update_time);
let effective_now = if stream.paused {
stream.paused_at.unwrap_or(stream.last_update_time)
} else {
now
};
let elapsed = effective_now.saturating_sub(stream.last_update_time);

let streamed = (elapsed as i128)
.checked_mul(stream.rate_per_second)
Expand Down Expand Up @@ -315,9 +316,10 @@ impl StreamContract {
stream.withdrawn_amount += amount;
stream.last_update_time = now;

// Mark stream as inactive if fully drained
// Mark stream as inactive and completed if fully drained
if stream.withdrawn_amount >= stream.deposited_amount {
stream.is_active = false;
stream.status = StreamStatus::Completed;
}
}

Expand All @@ -342,8 +344,11 @@ impl StreamContract {
return Err(StreamError::Unauthorized);
}

// Validate stream is active
// Validate stream is active and not paused
Self::validate_stream_active(&stream)?;
if stream.paused {
return Err(StreamError::StreamInactive);
}

let now = env.ledger().timestamp();
let claimable = Self::calculate_claimable(&stream, now);
Expand All @@ -355,19 +360,31 @@ impl StreamContract {
// Use helper function to transfer tokens and update state
Self::transfer_and_update_stream(&env, &mut stream, &recipient, claimable, now);

let completed = stream.status == StreamStatus::Completed;
save_stream(&env, stream_id, &stream);

// Emit withdrawal event
env.events().publish(
(Symbol::new(&env, "tokens_withdrawn"), stream_id),
TokensWithdrawnEvent {
stream_id,
recipient,
recipient: recipient.clone(),
amount: claimable,
timestamp: stream.last_update_time,
},
);

// Emit COMPLETED event on final withdrawal
if completed {
env.events().publish(
(Symbol::new(&env, "stream_completed"), stream_id),
StreamCompletedEvent {
stream_id,
recipient,
total_withdrawn: stream.withdrawn_amount,
},
);
}

Ok(claimable)
}

Expand Down Expand Up @@ -413,6 +430,7 @@ impl StreamContract {

// Mark stream as inactive
stream.is_active = false;
stream.status = StreamStatus::Cancelled;
stream.last_update_time = now;

let recipient = stream.recipient.clone();
Expand All @@ -435,13 +453,98 @@ impl StreamContract {
Ok(())
}

/// Pause an active stream. Only the sender may pause.
///
/// # Errors
/// - `StreamNotFound` — no stream exists with `stream_id`.
/// - `Unauthorized` — caller is not the stream's sender.
/// - `StreamInactive` — stream is already inactive.
pub fn pause_stream(env: Env, sender: Address, stream_id: u64) -> Result<(), StreamError> {
sender.require_auth();

let mut stream = load_stream(&env, stream_id)?;
Self::validate_stream_ownership(&stream, &sender)?;
Self::validate_stream_active(&stream)?;

if stream.paused {
return Err(StreamError::StreamInactive);
}

let now = env.ledger().timestamp();
stream.paused = true;
stream.paused_at = Some(now);
stream.status = StreamStatus::Paused;
save_stream(&env, stream_id, &stream);

env.events().publish(
(Symbol::new(&env, "stream_paused"), stream_id),
StreamPausedEvent { stream_id, sender, paused_at: now },
);

Ok(())
}

/// Resume a paused stream. Adjusts `end_time` by the pause duration.
///
/// The `last_update_time` is advanced to `now` so that accrual resumes
/// from the current moment, effectively extending the stream by the
/// duration it was paused.
///
/// # Errors
/// - `StreamNotFound` — no stream exists with `stream_id`.
/// - `Unauthorized` — caller is not the stream's sender.
/// - `StreamInactive` — stream is not paused (already active or cancelled).
pub fn resume_stream(env: Env, sender: Address, stream_id: u64) -> Result<u64, StreamError> {
sender.require_auth();

let mut stream = load_stream(&env, stream_id)?;
Self::validate_stream_ownership(&stream, &sender)?;

if !stream.paused {
return Err(StreamError::StreamInactive);
}

let now = env.ledger().timestamp();
let paused_at = stream.paused_at.unwrap_or(now);
let pause_duration = now.saturating_sub(paused_at);

// Advance last_update_time by pause duration so accrual resumes from now.
stream.last_update_time = stream.last_update_time.saturating_add(pause_duration);
// new_end_time represents when the stream will fully drain from now.
let remaining = stream.deposited_amount.saturating_sub(stream.withdrawn_amount);
let new_end_time = if stream.rate_per_second > 0 {
now + (remaining / stream.rate_per_second) as u64
} else {
now
};

stream.paused = false;
stream.paused_at = None;
stream.status = StreamStatus::Active;
save_stream(&env, stream_id, &stream);

env.events().publish(
(Symbol::new(&env, "stream_resumed"), stream_id),
StreamResumedEvent { stream_id, sender, new_end_time },
);

Ok(new_end_time)
}

// ─── Read-only Queries ────────────────────────────────────────────────────

/// Returns the stream record for `stream_id`, or `None` if it does not exist.
pub fn get_stream(env: Env, stream_id: u64) -> Option<Stream> {
try_load_stream(&env, stream_id)
}

/// Returns `true` if the stream exists and has status `Completed`.
pub fn is_stream_completed(env: Env, stream_id: u64) -> bool {
try_load_stream(&env, stream_id)
.map(|s| s.status == StreamStatus::Completed)
.unwrap_or(false)
}

/// Get the current claimable amount for a stream without modifying state.
///
/// This is a read-only query that calculates how many tokens the recipient
Expand Down
Loading
Loading