From dc6b31c6c79d333f94529d2140f449041d77a86f Mon Sep 17 00:00:00 2001 From: ussyalfaks Date: Mon, 27 Apr 2026 23:33:31 +0100 Subject: [PATCH] feat(contracts): implement pause/resume, stream completion, and edge case tests Add unit tests for stream creation edge cases Fixes #232 Add unit tests for withdraw, top_up, and cancel stream flows Fixes #233 Add stream pause and resume functionality Fixes #234 Add stream completion detection and COMPLETED event Fixes #235 --- .../src/services/soroban-indexer.service.ts | 8 +- backend/src/workers/soroban-event-worker.ts | 51 ++ contracts/stream_contract/src/events.rs | 33 ++ contracts/stream_contract/src/lib.rs | 137 ++++- contracts/stream_contract/src/test.rs | 557 +++++++++++++++++- contracts/stream_contract/src/types.rs | 16 + ...cel_stream_after_partial_withdrawal.1.json | 26 + .../test_cancel_stream_refunds_sender.1.json | 26 + ..._withdraw_caps_at_remaining_balance.1.json | 26 + ...est_withdraw_time_based_calculation.1.json | 26 + 10 files changed, 885 insertions(+), 21 deletions(-) diff --git a/backend/src/services/soroban-indexer.service.ts b/backend/src/services/soroban-indexer.service.ts index 433029a..9e2f729 100644 --- a/backend/src/services/soroban-indexer.service.ts +++ b/backend/src/services/soroban-indexer.service.ts @@ -22,7 +22,7 @@ interface RpcResponse { }; } -type IndexedEventType = 'CREATED' | 'CANCELLED' | 'WITHDRAWN'; +type IndexedEventType = 'CREATED' | 'CANCELLED' | 'WITHDRAWN' | 'COMPLETED'; const RPC_URL = process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org'; const POLL_MS = Number(process.env.SOROBAN_INDEXER_POLL_MS ?? 15000); @@ -112,6 +112,7 @@ export class SorobanIndexerService { if (firstTopic.includes('stream_created')) return 'CREATED'; if (firstTopic.includes('stream_cancelled')) return 'CANCELLED'; if (firstTopic.includes('tokens_withdrawn')) return 'WITHDRAWN'; + if (firstTopic.includes('stream_completed')) return 'COMPLETED'; return null; } @@ -222,6 +223,11 @@ export class SorobanIndexerService { }, }); } + } else if (eventType === 'COMPLETED') { + await prisma.stream.updateMany({ + where: { streamId }, + data: { isActive: false, lastUpdateTime: timestamp }, + }); } await prisma.streamEvent.create({ diff --git a/backend/src/workers/soroban-event-worker.ts b/backend/src/workers/soroban-event-worker.ts index ac4600c..831eba2 100644 --- a/backend/src/workers/soroban-event-worker.ts +++ b/backend/src/workers/soroban-event-worker.ts @@ -238,6 +238,9 @@ export class SorobanEventWorker { case 'stream_cancelled': await this.handleStreamCancelled(event, topic1); break; + case 'stream_completed': + await this.handleStreamCompleted(event, topic1); + break; default: // Unrecognised event — ignore silently. break; @@ -483,6 +486,54 @@ export class SorobanEventWorker { timestamp, }); } + + private async handleStreamCompleted( + event: rpc.Api.EventResponse, + streamIdTopic: xdr.ScVal, + ): Promise { + const streamId = Number(decodeU64(streamIdTopic)); + const body = decodeMap(event.value); + + if (!body['recipient'] || !body['total_withdrawn']) { + throw new Error(`StreamCompleted #${streamId}: missing body fields`); + } + + const recipient = decodeAddress(body['recipient']); + const totalWithdrawn = decodeI128(body['total_withdrawn']); + const timestamp = Math.floor(Date.now() / 1000); + + await prisma.$transaction(async (tx: any) => { + await tx.stream.update({ + where: { streamId }, + data: { + isActive: false, + withdrawnAmount: totalWithdrawn, + lastUpdateTime: timestamp, + }, + }); + + await tx.streamEvent.create({ + data: { + streamId, + eventType: 'COMPLETED', + amount: totalWithdrawn, + transactionHash: event.txHash, + ledgerSequence: event.ledger, + timestamp, + metadata: JSON.stringify({ recipient }), + }, + }); + }); + + sseService.broadcastToStream(String(streamId), 'stream.completed', { + streamId, + recipient, + totalWithdrawn, + transactionHash: event.txHash, + ledger: event.ledger, + timestamp, + }); + } } export const sorobanEventWorker = new SorobanEventWorker(); diff --git a/contracts/stream_contract/src/events.rs b/contracts/stream_contract/src/events.rs index f67ccb0..dc8f9dc 100644 --- a/contracts/stream_contract/src/events.rs +++ b/contracts/stream_contract/src/events.rs @@ -69,3 +69,36 @@ pub struct FeeCollectedEvent { pub fee_amount: i128, pub token: Address, } + +/// Emitted when a sender pauses an active stream. +/// +/// Topic: `("stream_paused", stream_id)` +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StreamPausedEvent { + pub stream_id: u64, + pub sender: Address, + pub paused_at: u64, +} + +/// Emitted when a sender resumes a paused stream. +/// +/// Topic: `("stream_resumed", stream_id)` +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StreamResumedEvent { + pub stream_id: u64, + pub sender: Address, + pub new_end_time: u64, +} + +/// Emitted when a stream is fully drained on the final withdrawal. +/// +/// Topic: `("stream_completed", stream_id)` +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StreamCompletedEvent { + pub stream_id: u64, + pub recipient: Address, + pub total_withdrawn: i128, +} diff --git a/contracts/stream_contract/src/lib.rs b/contracts/stream_contract/src/lib.rs index 88ecdf7..6f4875f 100644 --- a/contracts/stream_contract/src/lib.rs +++ b/contracts/stream_contract/src/lib.rs @@ -12,14 +12,14 @@ use soroban_sdk::{contract, contractimpl, token, vec, Address, Env, InvokeError, use errors::StreamError; use events::{ - FeeCollectedEvent, StreamCancelledEvent, StreamCreatedEvent, StreamToppedUpEvent, - TokensWithdrawnEvent, + FeeCollectedEvent, StreamCancelledEvent, StreamCompletedEvent, StreamCreatedEvent, + StreamPausedEvent, StreamResumedEvent, StreamToppedUpEvent, TokensWithdrawnEvent, }; use storage::{ config_exists, load_config, load_stream, next_stream_id, save_config, save_stream, try_load_config, try_load_stream, }; -use types::{ProtocolConfig, Stream}; +use types::{ProtocolConfig, Stream, StreamStatus}; /// Maximum allowed protocol fee: 1 000 bps = 10%. const MAX_FEE_RATE_BPS: u32 = 1_000; @@ -157,6 +157,9 @@ impl StreamContract { start_time, last_update_time: start_time, is_active: true, + paused: false, + paused_at: None, + status: StreamStatus::Active, }, ); @@ -248,17 +251,15 @@ impl StreamContract { /// Calculate the claimable amount for a stream at a given timestamp. /// - /// This helper computes how many tokens have been streamed since the last - /// update, capped at the remaining balance to prevent over-withdrawal. - /// - /// # Arguments - /// * `stream` - The stream to calculate claimable amount for - /// * `now` - Current ledger timestamp - /// - /// # Returns - /// The amount of tokens that can be claimed, never exceeding remaining balance + /// Excludes any time the stream was paused. If the stream is currently + /// paused, accrual stops at `paused_at`. fn calculate_claimable(stream: &Stream, now: u64) -> i128 { - let elapsed = now.saturating_sub(stream.last_update_time); + let effective_now = if stream.paused { + stream.paused_at.unwrap_or(stream.last_update_time) + } else { + now + }; + let elapsed = effective_now.saturating_sub(stream.last_update_time); let streamed = (elapsed as i128) .checked_mul(stream.rate_per_second) @@ -315,9 +316,10 @@ impl StreamContract { stream.withdrawn_amount += amount; stream.last_update_time = now; - // Mark stream as inactive if fully drained + // Mark stream as inactive and completed if fully drained if stream.withdrawn_amount >= stream.deposited_amount { stream.is_active = false; + stream.status = StreamStatus::Completed; } } @@ -342,8 +344,11 @@ impl StreamContract { return Err(StreamError::Unauthorized); } - // Validate stream is active + // Validate stream is active and not paused Self::validate_stream_active(&stream)?; + if stream.paused { + return Err(StreamError::StreamInactive); + } let now = env.ledger().timestamp(); let claimable = Self::calculate_claimable(&stream, now); @@ -355,19 +360,31 @@ impl StreamContract { // Use helper function to transfer tokens and update state Self::transfer_and_update_stream(&env, &mut stream, &recipient, claimable, now); + let completed = stream.status == StreamStatus::Completed; save_stream(&env, stream_id, &stream); - // Emit withdrawal event env.events().publish( (Symbol::new(&env, "tokens_withdrawn"), stream_id), TokensWithdrawnEvent { stream_id, - recipient, + recipient: recipient.clone(), amount: claimable, timestamp: stream.last_update_time, }, ); + // Emit COMPLETED event on final withdrawal + if completed { + env.events().publish( + (Symbol::new(&env, "stream_completed"), stream_id), + StreamCompletedEvent { + stream_id, + recipient, + total_withdrawn: stream.withdrawn_amount, + }, + ); + } + Ok(claimable) } @@ -413,6 +430,7 @@ impl StreamContract { // Mark stream as inactive stream.is_active = false; + stream.status = StreamStatus::Cancelled; stream.last_update_time = now; let recipient = stream.recipient.clone(); @@ -435,6 +453,84 @@ impl StreamContract { Ok(()) } + /// Pause an active stream. Only the sender may pause. + /// + /// # Errors + /// - `StreamNotFound` — no stream exists with `stream_id`. + /// - `Unauthorized` — caller is not the stream's sender. + /// - `StreamInactive` — stream is already inactive. + pub fn pause_stream(env: Env, sender: Address, stream_id: u64) -> Result<(), StreamError> { + sender.require_auth(); + + let mut stream = load_stream(&env, stream_id)?; + Self::validate_stream_ownership(&stream, &sender)?; + Self::validate_stream_active(&stream)?; + + if stream.paused { + return Err(StreamError::StreamInactive); + } + + let now = env.ledger().timestamp(); + stream.paused = true; + stream.paused_at = Some(now); + stream.status = StreamStatus::Paused; + save_stream(&env, stream_id, &stream); + + env.events().publish( + (Symbol::new(&env, "stream_paused"), stream_id), + StreamPausedEvent { stream_id, sender, paused_at: now }, + ); + + Ok(()) + } + + /// Resume a paused stream. Adjusts `end_time` by the pause duration. + /// + /// The `last_update_time` is advanced to `now` so that accrual resumes + /// from the current moment, effectively extending the stream by the + /// duration it was paused. + /// + /// # Errors + /// - `StreamNotFound` — no stream exists with `stream_id`. + /// - `Unauthorized` — caller is not the stream's sender. + /// - `StreamInactive` — stream is not paused (already active or cancelled). + pub fn resume_stream(env: Env, sender: Address, stream_id: u64) -> Result { + sender.require_auth(); + + let mut stream = load_stream(&env, stream_id)?; + Self::validate_stream_ownership(&stream, &sender)?; + + if !stream.paused { + return Err(StreamError::StreamInactive); + } + + let now = env.ledger().timestamp(); + let paused_at = stream.paused_at.unwrap_or(now); + let pause_duration = now.saturating_sub(paused_at); + + // Advance last_update_time by pause duration so accrual resumes from now. + stream.last_update_time = stream.last_update_time.saturating_add(pause_duration); + // new_end_time represents when the stream will fully drain from now. + let remaining = stream.deposited_amount.saturating_sub(stream.withdrawn_amount); + let new_end_time = if stream.rate_per_second > 0 { + now + (remaining / stream.rate_per_second) as u64 + } else { + now + }; + + stream.paused = false; + stream.paused_at = None; + stream.status = StreamStatus::Active; + save_stream(&env, stream_id, &stream); + + env.events().publish( + (Symbol::new(&env, "stream_resumed"), stream_id), + StreamResumedEvent { stream_id, sender, new_end_time }, + ); + + Ok(new_end_time) + } + // ─── Read-only Queries ──────────────────────────────────────────────────── /// Returns the stream record for `stream_id`, or `None` if it does not exist. @@ -442,6 +538,13 @@ impl StreamContract { try_load_stream(&env, stream_id) } + /// Returns `true` if the stream exists and has status `Completed`. + pub fn is_stream_completed(env: Env, stream_id: u64) -> bool { + try_load_stream(&env, stream_id) + .map(|s| s.status == StreamStatus::Completed) + .unwrap_or(false) + } + /// Get the current claimable amount for a stream without modifying state. /// /// This is a read-only query that calculates how many tokens the recipient diff --git a/contracts/stream_contract/src/test.rs b/contracts/stream_contract/src/test.rs index b38a016..6d22901 100644 --- a/contracts/stream_contract/src/test.rs +++ b/contracts/stream_contract/src/test.rs @@ -10,10 +10,10 @@ use soroban_sdk::{ use errors::StreamError; use events::{ - FeeCollectedEvent, StreamCancelledEvent, StreamCreatedEvent, StreamToppedUpEvent, - TokensWithdrawnEvent, + FeeCollectedEvent, StreamCancelledEvent, StreamCompletedEvent, StreamCreatedEvent, + StreamPausedEvent, StreamResumedEvent, StreamToppedUpEvent, TokensWithdrawnEvent, }; -use types::{DataKey, Stream}; +use types::{DataKey, Stream, StreamStatus}; // ─── Test Helpers ───────────────────────────────────────────────────────────── @@ -68,6 +68,9 @@ fn test_datakey_stream_serializes_deterministically() { start_time: 1, last_update_time: 1, is_active: true, + paused: false, + paused_at: None, + status: StreamStatus::Active, }; env.as_contract(&contract_id, || { env.storage().persistent().set(&key, &stream); @@ -944,3 +947,551 @@ fn test_cancel_stream_after_partial_withdrawal() { // Contract should be fully drained assert_eq!(contract_balance_after, 0); } + +// ─── #232 create_stream edge cases ─────────────────────────────────────────── + +#[test] +fn test_create_stream_minimum_amount() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1, &1); + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.deposited_amount, 1); + assert!(s.is_active); +} + +#[test] +fn test_create_stream_minimum_duration() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 100); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &100, &1); + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.rate_per_second, 100); +} + +#[test] +fn test_create_stream_max_i128_amount() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + // Use a large but safe amount: 10^18 tokens over 10^9 seconds = 10^9 rate. + let amount: i128 = 1_000_000_000_000_000_000i128; // 10^18 + let duration: u64 = 1_000_000_000u64; // 10^9 + mint(&env, &token, &sender, amount); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &amount, &duration); + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.deposited_amount, amount); + assert_eq!(s.rate_per_second, 1_000_000_000i128); // 10^18 / 10^9 +} + +#[test] +fn test_create_stream_invalid_token() { + let env = Env::default(); + env.mock_all_auths(); + let client = create_contract(&env); + + // A plain account address is not a SAC — must return InvalidTokenAddress. + let result = client.try_create_stream( + &Address::generate(&env), + &Address::generate(&env), + &Address::generate(&env), + &100, + &10, + ); + assert_eq!(result, Err(Ok(StreamError::InvalidTokenAddress))); +} + +#[test] +fn test_create_stream_self_stream() { + // sender == recipient is allowed by the contract (no explicit guard), + // but the stream must be created successfully and state must be consistent. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let actor = Address::generate(&env); + mint(&env, &token, &actor, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&actor, &actor, &token, &1_000, &100); + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.sender, actor); + assert_eq!(s.recipient, actor); +} + +#[test] +fn test_create_stream_zero_rate() { + // amount < duration → rate_per_second rounds to 0 via integer division. + // The stream is created but will never accrue anything. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1, &1_000); + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.rate_per_second, 0); + + // Advance time — nothing should be claimable. + env.ledger().with_mut(|l| l.timestamp += 500); + assert_eq!(client.get_claimable_amount(&id), Some(0)); +} + +#[test] +fn test_stream_id_uniqueness() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 2_000); + + let client = create_contract(&env); + let id1 = client.create_stream(&sender, &recipient, &token, &1_000, &100); + let id2 = client.create_stream(&sender, &recipient, &token, &1_000, &100); + assert_ne!(id1, id2); + + // Both streams must be independently retrievable. + assert!(client.get_stream(&id1).is_some()); + assert!(client.get_stream(&id2).is_some()); +} + +// ─── #233 withdraw / top_up / cancel lifecycle ─────────────────────────────── + +#[test] +fn test_withdraw_accrued_amount() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let token_client = token::Client::new(&env, &token); + // 1_000 tokens / 1_000 s = 1 token/s + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 200); + let claimed = client.withdraw(&recipient, &id); + assert_eq!(claimed, 200); + assert_eq!(token_client.balance(&recipient), 200); +} + +#[test] +fn test_withdraw_zero_balance() { + // Withdraw before any time elapses → InvalidAmount. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + assert_eq!( + client.try_withdraw(&recipient, &id), + Err(Ok(StreamError::InvalidAmount)) + ); +} + +#[test] +fn test_withdraw_full_balance() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 500); + + let client = create_contract(&env); + let token_client = token::Client::new(&env, &token); + let id = client.create_stream(&sender, &recipient, &token, &500, &100); + + // Advance past stream end. + env.ledger().with_mut(|l| l.timestamp += 200); + let claimed = client.withdraw(&recipient, &id); + assert_eq!(claimed, 500); + assert_eq!(token_client.balance(&recipient), 500); + + let s = client.get_stream(&id).unwrap(); + assert!(!s.is_active); + assert_eq!(s.status, StreamStatus::Completed); +} + +#[test] +fn test_top_up_extends_stream() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 2_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100); + + client.top_up_stream(&sender, &id, &1_000); + + let s = client.get_stream(&id).unwrap(); + // deposited_amount should now be 2_000 + assert_eq!(s.deposited_amount, 2_000); + // rate unchanged; effective end extends by 1_000 / rate_per_second more seconds + assert_eq!(s.rate_per_second, 10); // 1_000 / 100 +} + +#[test] +fn test_top_up_on_completed_stream() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &500, &100); + + // Drain the stream. + env.ledger().with_mut(|l| l.timestamp += 200); + client.withdraw(&recipient, &id); + + // Top-up on a completed (inactive) stream must fail. + mint(&env, &token, &sender, 500); + assert_eq!( + client.try_top_up_stream(&sender, &id, &500), + Err(Ok(StreamError::StreamInactive)) + ); +} + +#[test] +fn test_cancel_refunds_sender() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let token_client = token::Client::new(&env, &token); + // 1_000 tokens / 1_000 s = 1 token/s + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 400); + let before = token_client.balance(&sender); + client.cancel_stream(&sender, &id); + // 400 accrued to recipient, 600 refunded to sender + assert_eq!(token_client.balance(&sender) - before, 600); +} + +#[test] +fn test_cancel_by_non_sender() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + + assert_eq!( + client.try_cancel_stream(&Address::generate(&env), &id), + Err(Ok(StreamError::Unauthorized)) + ); +} + +#[test] +fn test_cancel_after_completion() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 500); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &500, &100); + + env.ledger().with_mut(|l| l.timestamp += 200); + client.withdraw(&recipient, &id); + + assert_eq!( + client.try_cancel_stream(&sender, &id), + Err(Ok(StreamError::StreamInactive)) + ); +} + +// ─── #234 pause / resume ───────────────────────────────────────────────────── + +#[test] +fn test_pause_stops_accrual() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + // 1_000 tokens / 1_000 s = 1 token/s + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 100); + client.pause_stream(&sender, &id); + + // Advance more time — should not accrue while paused. + env.ledger().with_mut(|l| l.timestamp += 200); + assert_eq!(client.get_claimable_amount(&id), Some(100)); + + let s = client.get_stream(&id).unwrap(); + assert!(s.paused); + assert_eq!(s.status, StreamStatus::Paused); +} + +#[test] +fn test_resume_adjusts_end_time() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 100); + client.pause_stream(&sender, &id); + + // Paused for 300 seconds. + env.ledger().with_mut(|l| l.timestamp += 300); + let new_end = client.resume_stream(&sender, &id); + + // After resume, stream should be active again. + let s = client.get_stream(&id).unwrap(); + assert!(!s.paused); + assert_eq!(s.status, StreamStatus::Active); + + // Advance 100 more seconds — should accrue 100 tokens (not 400). + env.ledger().with_mut(|l| l.timestamp += 100); + assert_eq!(client.get_claimable_amount(&id), Some(200)); // 100 before pause + 100 after +} + +#[test] +fn test_pause_emits_event() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + client.pause_stream(&sender, &id); + + let events = env.events().all(); + let ev = events + .iter() + .find(|e| { + Symbol::try_from_val(&env, &e.1.get(0).unwrap()).unwrap() + == Symbol::new(&env, "stream_paused") + }) + .expect("stream_paused event not found"); + + let payload: StreamPausedEvent = StreamPausedEvent::try_from_val(&env, &ev.2).unwrap(); + assert_eq!(payload.stream_id, id); + assert_eq!(payload.sender, sender); +} + +#[test] +fn test_resume_emits_event() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + client.pause_stream(&sender, &id); + env.ledger().with_mut(|l| l.timestamp += 100); + client.resume_stream(&sender, &id); + + let events = env.events().all(); + let ev = events + .iter() + .find(|e| { + Symbol::try_from_val(&env, &e.1.get(0).unwrap()).unwrap() + == Symbol::new(&env, "stream_resumed") + }) + .expect("stream_resumed event not found"); + + let payload: StreamResumedEvent = StreamResumedEvent::try_from_val(&env, &ev.2).unwrap(); + assert_eq!(payload.stream_id, id); + assert_eq!(payload.sender, sender); +} + +#[test] +fn test_pause_by_non_sender_fails() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + + assert_eq!( + client.try_pause_stream(&Address::generate(&env), &id), + Err(Ok(StreamError::Unauthorized)) + ); +} + +#[test] +fn test_resume_non_paused_stream_fails() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + + assert_eq!( + client.try_resume_stream(&sender, &id), + Err(Ok(StreamError::StreamInactive)) + ); +} + +#[test] +fn test_withdraw_on_paused_stream_fails() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 100); + client.pause_stream(&sender, &id); + env.ledger().with_mut(|l| l.timestamp += 100); + + assert_eq!( + client.try_withdraw(&recipient, &id), + Err(Ok(StreamError::StreamInactive)) + ); +} + +// ─── #235 stream completion ─────────────────────────────────────────────────── + +#[test] +fn test_final_withdrawal_transitions_to_completed() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 500); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &500, &100); + + env.ledger().with_mut(|l| l.timestamp += 200); + client.withdraw(&recipient, &id); + + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.status, StreamStatus::Completed); + assert!(!s.is_active); +} + +#[test] +fn test_is_stream_completed_helper() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 500); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &500, &100); + + assert!(!client.is_stream_completed(&id)); + + env.ledger().with_mut(|l| l.timestamp += 200); + client.withdraw(&recipient, &id); + + assert!(client.is_stream_completed(&id)); +} + +#[test] +fn test_completed_event_emitted_on_final_withdrawal() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 500); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &500, &100); + + env.ledger().with_mut(|l| l.timestamp += 200); + client.withdraw(&recipient, &id); + + let events = env.events().all(); + let ev = events + .iter() + .find(|e| { + Symbol::try_from_val(&env, &e.1.get(0).unwrap()).unwrap() + == Symbol::new(&env, "stream_completed") + }) + .expect("stream_completed event not found"); + + let payload: StreamCompletedEvent = StreamCompletedEvent::try_from_val(&env, &ev.2).unwrap(); + assert_eq!(payload.stream_id, id); + assert_eq!(payload.recipient, recipient); + assert_eq!(payload.total_withdrawn, 500); +} + +#[test] +fn test_partial_withdrawal_does_not_complete() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 200); + client.withdraw(&recipient, &id); + + let s = client.get_stream(&id).unwrap(); + assert_eq!(s.status, StreamStatus::Active); + assert!(s.is_active); + assert!(!client.is_stream_completed(&id)); +} diff --git a/contracts/stream_contract/src/types.rs b/contracts/stream_contract/src/types.rs index 678a77f..5a25cd8 100644 --- a/contracts/stream_contract/src/types.rs +++ b/contracts/stream_contract/src/types.rs @@ -2,6 +2,16 @@ use soroban_sdk::{contracttype, Address}; +/// Status of a payment stream. +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum StreamStatus { + Active, + Paused, + Cancelled, + Completed, +} + /// Centralized storage key strategy. /// /// All contract storage is keyed exclusively through this enum, ensuring: @@ -44,6 +54,12 @@ pub struct Stream { pub last_update_time: u64, /// `false` once fully withdrawn or cancelled. pub is_active: bool, + /// `true` when the stream is paused by the sender. + pub paused: bool, + /// Ledger timestamp when the stream was paused, `None` if not paused. + pub paused_at: Option, + /// Current status of the stream. + pub status: StreamStatus, } /// Protocol-wide fee configuration. diff --git a/contracts/stream_contract/test_snapshots/test/test_cancel_stream_after_partial_withdrawal.1.json b/contracts/stream_contract/test_snapshots/test/test_cancel_stream_after_partial_withdrawal.1.json index b2efad3..56e2ee6 100644 --- a/contracts/stream_contract/test_snapshots/test/test_cancel_stream_after_partial_withdrawal.1.json +++ b/contracts/stream_contract/test_snapshots/test/test_cancel_stream_after_partial_withdrawal.1.json @@ -424,6 +424,20 @@ "u64": 300 } }, + { + "key": { + "symbol": "paused" + }, + "val": { + "bool": false + } + }, + { + "key": { + "symbol": "paused_at" + }, + "val": "void" + }, { "key": { "symbol": "rate_per_second" @@ -459,6 +473,18 @@ "u64": 0 } }, + { + "key": { + "symbol": "status" + }, + "val": { + "vec": [ + { + "symbol": "Cancelled" + } + ] + } + }, { "key": { "symbol": "token_address" diff --git a/contracts/stream_contract/test_snapshots/test/test_cancel_stream_refunds_sender.1.json b/contracts/stream_contract/test_snapshots/test/test_cancel_stream_refunds_sender.1.json index 7315f50..253cab0 100644 --- a/contracts/stream_contract/test_snapshots/test/test_cancel_stream_refunds_sender.1.json +++ b/contracts/stream_contract/test_snapshots/test/test_cancel_stream_refunds_sender.1.json @@ -369,6 +369,20 @@ "u64": 300 } }, + { + "key": { + "symbol": "paused" + }, + "val": { + "bool": false + } + }, + { + "key": { + "symbol": "paused_at" + }, + "val": "void" + }, { "key": { "symbol": "rate_per_second" @@ -404,6 +418,18 @@ "u64": 0 } }, + { + "key": { + "symbol": "status" + }, + "val": { + "vec": [ + { + "symbol": "Cancelled" + } + ] + } + }, { "key": { "symbol": "token_address" diff --git a/contracts/stream_contract/test_snapshots/test/test_withdraw_caps_at_remaining_balance.1.json b/contracts/stream_contract/test_snapshots/test/test_withdraw_caps_at_remaining_balance.1.json index 2107be1..4071bb9 100644 --- a/contracts/stream_contract/test_snapshots/test/test_withdraw_caps_at_remaining_balance.1.json +++ b/contracts/stream_contract/test_snapshots/test/test_withdraw_caps_at_remaining_balance.1.json @@ -365,6 +365,20 @@ "u64": 200 } }, + { + "key": { + "symbol": "paused" + }, + "val": { + "bool": false + } + }, + { + "key": { + "symbol": "paused_at" + }, + "val": "void" + }, { "key": { "symbol": "rate_per_second" @@ -400,6 +414,18 @@ "u64": 0 } }, + { + "key": { + "symbol": "status" + }, + "val": { + "vec": [ + { + "symbol": "Completed" + } + ] + } + }, { "key": { "symbol": "token_address" diff --git a/contracts/stream_contract/test_snapshots/test/test_withdraw_time_based_calculation.1.json b/contracts/stream_contract/test_snapshots/test/test_withdraw_time_based_calculation.1.json index c0f9304..5aba01e 100644 --- a/contracts/stream_contract/test_snapshots/test/test_withdraw_time_based_calculation.1.json +++ b/contracts/stream_contract/test_snapshots/test/test_withdraw_time_based_calculation.1.json @@ -421,6 +421,20 @@ "u64": 300 } }, + { + "key": { + "symbol": "paused" + }, + "val": { + "bool": false + } + }, + { + "key": { + "symbol": "paused_at" + }, + "val": "void" + }, { "key": { "symbol": "rate_per_second" @@ -456,6 +470,18 @@ "u64": 0 } }, + { + "key": { + "symbol": "status" + }, + "val": { + "vec": [ + { + "symbol": "Active" + } + ] + } + }, { "key": { "symbol": "token_address"