diff --git a/fluxapay/src/lib.rs b/fluxapay/src/lib.rs index 45459a7..d420464 100644 --- a/fluxapay/src/lib.rs +++ b/fluxapay/src/lib.rs @@ -302,6 +302,33 @@ pub struct Stream { pub created_at: u64, } +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum StreamStatus { + Active, + Cancelled, + Completed, +} + +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Stream { + pub stream_id: String, + pub sender: Address, + pub recipient: Address, + pub amount: i128, + pub status: StreamStatus, + pub created_at: u64, +} + +#[contracttype] +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct WithdrawalTo { + pub stream_id: String, + pub destination: Address, + pub amount: i128, +} + #[contracttype] pub enum DataKey { Payment(String), @@ -313,6 +340,7 @@ pub enum DataKey { Dispute(String), PaymentDisputes(String), DisputeCounter, + Stream(String), UsdcToken, Paused, CreationPaused, @@ -2131,6 +2159,13 @@ impl PaymentProcessor { Ok(()) } + pub fn create_stream( + env: Env, + sender: Address, + stream_id: String, + recipient: Address, + amount: i128, + ) -> Result<(), Error> { /// Atomic swap and pay: swap sender's token to merchant's required token and create payment. /// Integrates with DEX (e.g., Soroswap) for atomic asset conversion. /// @@ -2325,6 +2360,24 @@ impl PaymentProcessor { if amount <= 0 { return Err(Error::InvalidAmount); } + if stream_id.is_empty() { + return Err(Error::InvalidPaymentId); + } + if env + .storage() + .persistent() + .has(&DataKey::Stream(stream_id.clone())) + { + return Err(Error::PaymentAlreadyExists); + } + + let stream = Stream { + stream_id: stream_id.clone(), + sender: sender.clone(), + recipient: recipient.clone(), + amount, + status: StreamStatus::Active, + created_at: env.ledger().timestamp(), if duration == 0 { return Err(Error::InvalidAmount); @@ -2360,6 +2413,16 @@ impl PaymentProcessor { env.storage() .persistent() + .set(&DataKey::Stream(stream_id.clone()), &stream); + Self::bump_ttl(&env, &DataKey::Stream(stream_id.clone()), LONG_LIVE_TTL); + + env.events().publish( + ( + Symbol::new(&env, "STREAM"), + Symbol::new(&env, "CREATED"), + stream_id.clone(), + ), + (sender, recipient, amount), .set(&RecipientDataKey::RecipientStream(recipient.clone()), &stream); env.events().publish( @@ -2370,6 +2433,113 @@ impl PaymentProcessor { Ok(()) } + pub fn cancel_multiple_streams( + env: Env, + sender: Address, + stream_ids: Vec, + ) -> Result, Error> { + sender.require_auth(); + + let mut cancelled = vec![&env]; + let mut i = 0; + while i < stream_ids.len() { + if let Some(stream_id) = stream_ids.get(i) { + if let Ok(mut stream) = Self::get_stream_internal(&env, stream_id) { + if stream.sender == sender && stream.status == StreamStatus::Active { + stream.status = StreamStatus::Cancelled; + env.storage() + .persistent() + .set(&DataKey::Stream(stream_id.clone()), &stream); + Self::bump_ttl(&env, &DataKey::Stream(stream_id.clone()), LONG_LIVE_TTL); + + env.events().publish( + ( + Symbol::new(&env, "STREAM"), + Symbol::new(&env, "CANCELLED"), + stream_id.clone(), + ), + (stream.sender.clone(), stream.recipient.clone(), stream.amount), + ); + env.events().publish( + ( + Symbol::new(&env, "REFUND"), + Symbol::new(&env, "PROCESSED"), + stream_id.clone(), + ), + (stream.sender.clone(), stream.amount), + ); + + cancelled.push_back(stream_id.clone()); + } + } + } + i += 1; + } + + Ok(cancelled) + } + + pub fn batch_withdraw_to( + env: Env, + recipient: Address, + withdrawals: Vec, + ) -> Result, Error> { + recipient.require_auth(); + + let mut success = vec![&env]; + let mut i = 0; + while i < withdrawals.len() { + if let Some(withdrawal) = withdrawals.get(i) { + if let Ok(mut stream) = Self::get_stream_internal(&env, &withdrawal.stream_id) { + if stream.recipient == recipient + && stream.status == StreamStatus::Active + && withdrawal.amount > 0 + && withdrawal.amount <= stream.amount + { + stream.amount = stream.amount.saturating_sub(withdrawal.amount); + if stream.amount == 0 { + stream.status = StreamStatus::Completed; + } + env.storage() + .persistent() + .set(&DataKey::Stream(withdrawal.stream_id.clone()), &stream); + Self::bump_ttl( + &env, + &DataKey::Stream(withdrawal.stream_id.clone()), + LONG_LIVE_TTL, + ); + + env.events().publish( + ( + Symbol::new(&env, "WITHDRAWAL"), + Symbol::new(&env, "WithdrawalTo"), + withdrawal.stream_id.clone(), + ), + ( + recipient.clone(), + withdrawal.destination.clone(), + withdrawal.amount, + ), + ); + success.push_back(withdrawal.stream_id.clone()); + } + } + } + i += 1; + } + + Ok(success) + } + + pub fn get_stream(env: Env, stream_id: String) -> Result { + Self::get_stream_internal(&env, &stream_id) + } + + fn get_stream_internal(env: &Env, stream_id: &String) -> Result { + env.storage() + .persistent() + .get(&DataKey::Stream(stream_id.clone())) + .ok_or(Error::PaymentNotFound) /// Claim available amount from a recipient stream. pub fn claim_from_stream(env: Env, recipient: Address) -> Result { recipient.require_auth(); diff --git a/fluxapay/src/test.rs b/fluxapay/src/test.rs index 483bc93..ef48687 100644 --- a/fluxapay/src/test.rs +++ b/fluxapay/src/test.rs @@ -106,6 +106,67 @@ fn test_create_payment_rate_limit_enforced() { assert_eq!(overflow, Err(Ok(Error::RateLimitExceeded))); } +#[test] +fn test_cancel_multiple_streams_for_sender() { + let env = Env::default(); + env.mock_all_auths(); + let (_admin, client) = setup_payment_processor(&env); + + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + let stream_id1 = String::from_str(&env, "stream_1"); + let stream_id2 = String::from_str(&env, "stream_2"); + + assert_eq!(client.create_stream(&sender, &stream_id1, &recipient, &100i128), Ok(())); + assert_eq!(client.create_stream(&sender, &stream_id2, &recipient, &200i128), Ok(())); + + let stream_ids = vec![&env, stream_id1.clone(), stream_id2.clone()]; + let cancelled = client.cancel_multiple_streams(&sender, &stream_ids).unwrap(); + + assert_eq!(cancelled.len(), 2); + let stream1 = client.get_stream(&stream_id1).unwrap(); + let stream2 = client.get_stream(&stream_id2).unwrap(); + assert_eq!(stream1.status, StreamStatus::Cancelled); + assert_eq!(stream2.status, StreamStatus::Cancelled); +} + +#[test] +fn test_batch_withdraw_to_custom_routing() { + let env = Env::default(); + env.mock_all_auths(); + let (_admin, client) = setup_payment_processor(&env); + + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + let destination1 = Address::generate(&env); + let destination2 = Address::generate(&env); + let stream_id1 = String::from_str(&env, "stream_a"); + let stream_id2 = String::from_str(&env, "stream_b"); + + assert_eq!(client.create_stream(&sender, &stream_id1, &recipient, &100i128), Ok(())); + assert_eq!(client.create_stream(&sender, &stream_id2, &recipient, &200i128), Ok(())); + + let withdrawal1 = WithdrawalTo { + stream_id: stream_id1.clone(), + destination: destination1.clone(), + amount: 40, + }; + let withdrawal2 = WithdrawalTo { + stream_id: stream_id2.clone(), + destination: destination2.clone(), + amount: 150, + }; + let withdrawals = vec![&env, withdrawal1, withdrawal2]; + + let success = client.batch_withdraw_to(&recipient, &withdrawals).unwrap(); + assert_eq!(success.len(), 2); + + let stream1 = client.get_stream(&stream_id1).unwrap(); + let stream2 = client.get_stream(&stream_id2).unwrap(); + assert_eq!(stream1.amount, 60); + assert_eq!(stream2.amount, 50); +} + #[test] fn test_verify_payment_success() { let env = Env::default();