Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions fluxapay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,33 @@ pub struct Stream {
pub created_at: u64,
}

#[contracttype]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StreamStatus {
Active,
Cancelled,
Completed,
}

#[contracttype]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Stream {
pub stream_id: String,
pub sender: Address,
pub recipient: Address,
pub amount: i128,
pub status: StreamStatus,
pub created_at: u64,
}

#[contracttype]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WithdrawalTo {
pub stream_id: String,
pub destination: Address,
pub amount: i128,
}

#[contracttype]
pub enum DataKey {
Payment(String),
Expand All @@ -313,6 +340,7 @@ pub enum DataKey {
Dispute(String),
PaymentDisputes(String),
DisputeCounter,
Stream(String),
UsdcToken,
Paused,
CreationPaused,
Expand Down Expand Up @@ -2131,6 +2159,13 @@ impl PaymentProcessor {
Ok(())
}

pub fn create_stream(
env: Env,
sender: Address,
stream_id: String,
recipient: Address,
amount: i128,
) -> Result<(), Error> {
/// Atomic swap and pay: swap sender's token to merchant's required token and create payment.
/// Integrates with DEX (e.g., Soroswap) for atomic asset conversion.
///
Expand Down Expand Up @@ -2325,6 +2360,24 @@ impl PaymentProcessor {
if amount <= 0 {
return Err(Error::InvalidAmount);
}
if stream_id.is_empty() {
return Err(Error::InvalidPaymentId);
}
if env
.storage()
.persistent()
.has(&DataKey::Stream(stream_id.clone()))
{
return Err(Error::PaymentAlreadyExists);
}

let stream = Stream {
stream_id: stream_id.clone(),
sender: sender.clone(),
recipient: recipient.clone(),
amount,
status: StreamStatus::Active,
created_at: env.ledger().timestamp(),

if duration == 0 {
return Err(Error::InvalidAmount);
Expand Down Expand Up @@ -2360,6 +2413,16 @@ impl PaymentProcessor {

env.storage()
.persistent()
.set(&DataKey::Stream(stream_id.clone()), &stream);
Self::bump_ttl(&env, &DataKey::Stream(stream_id.clone()), LONG_LIVE_TTL);

env.events().publish(
(
Symbol::new(&env, "STREAM"),
Symbol::new(&env, "CREATED"),
stream_id.clone(),
),
(sender, recipient, amount),
.set(&RecipientDataKey::RecipientStream(recipient.clone()), &stream);

env.events().publish(
Expand All @@ -2370,6 +2433,113 @@ impl PaymentProcessor {
Ok(())
}

pub fn cancel_multiple_streams(
env: Env,
sender: Address,
stream_ids: Vec<String>,
) -> Result<Vec<String>, Error> {
sender.require_auth();

let mut cancelled = vec![&env];
let mut i = 0;
while i < stream_ids.len() {
if let Some(stream_id) = stream_ids.get(i) {
if let Ok(mut stream) = Self::get_stream_internal(&env, stream_id) {
if stream.sender == sender && stream.status == StreamStatus::Active {
stream.status = StreamStatus::Cancelled;
env.storage()
.persistent()
.set(&DataKey::Stream(stream_id.clone()), &stream);
Self::bump_ttl(&env, &DataKey::Stream(stream_id.clone()), LONG_LIVE_TTL);

env.events().publish(
(
Symbol::new(&env, "STREAM"),
Symbol::new(&env, "CANCELLED"),
stream_id.clone(),
),
(stream.sender.clone(), stream.recipient.clone(), stream.amount),
);
env.events().publish(
(
Symbol::new(&env, "REFUND"),
Symbol::new(&env, "PROCESSED"),
stream_id.clone(),
),
(stream.sender.clone(), stream.amount),
);

cancelled.push_back(stream_id.clone());
}
}
}
i += 1;
}

Ok(cancelled)
}

pub fn batch_withdraw_to(
env: Env,
recipient: Address,
withdrawals: Vec<WithdrawalTo>,
) -> Result<Vec<String>, Error> {
recipient.require_auth();

let mut success = vec![&env];
let mut i = 0;
while i < withdrawals.len() {
if let Some(withdrawal) = withdrawals.get(i) {
if let Ok(mut stream) = Self::get_stream_internal(&env, &withdrawal.stream_id) {
if stream.recipient == recipient
&& stream.status == StreamStatus::Active
&& withdrawal.amount > 0
&& withdrawal.amount <= stream.amount
{
stream.amount = stream.amount.saturating_sub(withdrawal.amount);
if stream.amount == 0 {
stream.status = StreamStatus::Completed;
}
env.storage()
.persistent()
.set(&DataKey::Stream(withdrawal.stream_id.clone()), &stream);
Self::bump_ttl(
&env,
&DataKey::Stream(withdrawal.stream_id.clone()),
LONG_LIVE_TTL,
);

env.events().publish(
(
Symbol::new(&env, "WITHDRAWAL"),
Symbol::new(&env, "WithdrawalTo"),
withdrawal.stream_id.clone(),
),
(
recipient.clone(),
withdrawal.destination.clone(),
withdrawal.amount,
),
);
success.push_back(withdrawal.stream_id.clone());
}
}
}
i += 1;
}

Ok(success)
}

pub fn get_stream(env: Env, stream_id: String) -> Result<Stream, Error> {
Self::get_stream_internal(&env, &stream_id)
}

fn get_stream_internal(env: &Env, stream_id: &String) -> Result<Stream, Error> {
env.storage()
.persistent()
.get(&DataKey::Stream(stream_id.clone()))
.ok_or(Error::PaymentNotFound)
/// Claim available amount from a recipient stream.
pub fn claim_from_stream(env: Env, recipient: Address) -> Result<i128, Error> {
recipient.require_auth();
Expand Down
61 changes: 61 additions & 0 deletions fluxapay/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,67 @@ fn test_create_payment_rate_limit_enforced() {
assert_eq!(overflow, Err(Ok(Error::RateLimitExceeded)));
}

#[test]
fn test_cancel_multiple_streams_for_sender() {
let env = Env::default();
env.mock_all_auths();
let (_admin, client) = setup_payment_processor(&env);

let sender = Address::generate(&env);
let recipient = Address::generate(&env);
let stream_id1 = String::from_str(&env, "stream_1");
let stream_id2 = String::from_str(&env, "stream_2");

assert_eq!(client.create_stream(&sender, &stream_id1, &recipient, &100i128), Ok(()));
assert_eq!(client.create_stream(&sender, &stream_id2, &recipient, &200i128), Ok(()));

let stream_ids = vec![&env, stream_id1.clone(), stream_id2.clone()];
let cancelled = client.cancel_multiple_streams(&sender, &stream_ids).unwrap();

assert_eq!(cancelled.len(), 2);
let stream1 = client.get_stream(&stream_id1).unwrap();
let stream2 = client.get_stream(&stream_id2).unwrap();
assert_eq!(stream1.status, StreamStatus::Cancelled);
assert_eq!(stream2.status, StreamStatus::Cancelled);
}

#[test]
fn test_batch_withdraw_to_custom_routing() {
let env = Env::default();
env.mock_all_auths();
let (_admin, client) = setup_payment_processor(&env);

let sender = Address::generate(&env);
let recipient = Address::generate(&env);
let destination1 = Address::generate(&env);
let destination2 = Address::generate(&env);
let stream_id1 = String::from_str(&env, "stream_a");
let stream_id2 = String::from_str(&env, "stream_b");

assert_eq!(client.create_stream(&sender, &stream_id1, &recipient, &100i128), Ok(()));
assert_eq!(client.create_stream(&sender, &stream_id2, &recipient, &200i128), Ok(()));

let withdrawal1 = WithdrawalTo {
stream_id: stream_id1.clone(),
destination: destination1.clone(),
amount: 40,
};
let withdrawal2 = WithdrawalTo {
stream_id: stream_id2.clone(),
destination: destination2.clone(),
amount: 150,
};
let withdrawals = vec![&env, withdrawal1, withdrawal2];

let success = client.batch_withdraw_to(&recipient, &withdrawals).unwrap();
assert_eq!(success.len(), 2);

let stream1 = client.get_stream(&stream_id1).unwrap();
let stream2 = client.get_stream(&stream_id2).unwrap();
assert_eq!(stream1.amount, 60);
assert_eq!(stream2.amount, 50);
}

#[test]
fn test_verify_payment_success() {
let env = Env::default();
Expand Down