diff --git a/chain-signatures/contract-sol/src/lib.rs b/chain-signatures/contract-sol/src/lib.rs index 43839a768..48e47e1bb 100644 --- a/chain-signatures/contract-sol/src/lib.rs +++ b/chain-signatures/contract-sol/src/lib.rs @@ -197,6 +197,7 @@ pub struct SignatureRespondedEvent { } #[event] +#[derive(Clone)] pub struct RespondBidirectionalEvent { pub request_id: [u8; 32], pub responder: Pubkey, diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index bff97b4d4..10359c00c 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -41,7 +41,24 @@ use tokio::task::JoinHandle; const ROUND_INTERVAL: usize = 512; /// The default timeout budget for organizing and posit phases. -const ORGANIZE_POSIT_TIMEOUT: Duration = Duration::from_secs(20); +/// +/// Tests have stable network conditions and don't benefit from a longer +/// timeout. It only makes them run for longer. +const ORGANIZE_POSIT_TIMEOUT: Duration = Duration::from_secs(if cfg!(feature = "test-feature") { + 5 +} else { + 20 +}); + +/// A proposer tries to include all eligible deliberators but will go ahead with +/// a subset after this timeout, if above the minimum threshold. +/// +/// Use shorter time for tests, as network delays are much smaller. +const ACCEPT_POSIT_TIMEOUT: Duration = Duration::from_millis(if cfg!(feature = "test-feature") { + 500 +} else { + 100 +}); /// All relevant info pertaining to an indexed sign request. #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] @@ -630,6 +647,9 @@ impl SignPositor { let remaining = state.budget.remaining(); let posit_deadline = tokio::time::sleep(remaining); tokio::pin!(posit_deadline); + let accept_deadline = tokio::time::sleep(ACCEPT_POSIT_TIMEOUT); + tokio::pin!(accept_deadline); + let mut accept_deadline_reached = false; let accepted_participants = loop { tokio::select! { @@ -691,27 +711,24 @@ impl SignPositor { return SignPhase::Organizing(SignOrganizer); } - // Start as soon as we have enough accepts - if counter.enough_accepts(ctx.threshold) { - let participants = counter.accepts.into_iter().collect::>(); - tracing::info!(?sign_id, ?round, me = ?ctx.me, ?participants, "proposer broadcasting Start"); - - for &p in &participants { - if p == ctx.me { - continue; - } - ctx.msg - .send( - ctx.me, - p, - PositMessage { - id: PositProtocolId::Signature(sign_id, presignature_id, state.round), - from: ctx.me, - action: PositAction::Start(participants.clone()), - }, - ) - .await; - } + // Starting as soon as we have enough accepts leaves + // participants accepting a bit later in a bad state. + // They will try to become propose in later rounds, + // wasting Presignatures, memory and CPU time. + // + // Instead, wait for at least the `accept_deadline`, + // only nodes answer slower will be left out. This isn't + // perfect but much better than always forcing nodes + // into the bad state. + let ready_to_go = counter.meets_totality() || accept_deadline_reached; + if ready_to_go && counter.enough_accepts(ctx.threshold) { + let participants = Self::start_with_current_accepts( + ctx, + state, + counter, + sign_id, + presignature_id + ).await; break participants; } } @@ -735,6 +752,20 @@ impl SignPositor { state.bump_round(); return SignPhase::Organizing(SignOrganizer); } + _ = &mut accept_deadline, if is_proposer && !accept_deadline_reached => { + accept_deadline_reached = true; + if counter.enough_accepts(ctx.threshold) { + let participants = Self::start_with_current_accepts( + ctx, + state, + counter, + sign_id, + presignature_id + ).await; + break participants; + } + } + } }; @@ -745,6 +776,35 @@ impl SignPositor { accepted_participants, }) } + + async fn start_with_current_accepts( + ctx: &SignTask, + state: &mut SignState, + counter: SinglePositCounter, + sign_id: SignId, + presignature_id: PresignatureId, + ) -> Vec { + let participants = counter.accepts.into_iter().collect::>(); + tracing::info!(?sign_id, round=?state.round, me = ?ctx.me, ?participants, "proposer broadcasting Start"); + + for &p in &participants { + if p == ctx.me { + continue; + } + ctx.msg + .send( + ctx.me, + p, + PositMessage { + id: PositProtocolId::Signature(sign_id, presignature_id, state.round), + from: ctx.me, + action: PositAction::Start(participants.clone()), + }, + ) + .await; + } + participants + } } impl SignGenerating { @@ -1510,3 +1570,8 @@ impl PendingPresignature { } } } + +#[cfg(feature = "test-feature")] +pub fn organize_posit_timeout() -> Duration { + ORGANIZE_POSIT_TIMEOUT +} diff --git a/chain-signatures/node/src/rpc.rs b/chain-signatures/node/src/rpc.rs index 7ee4b881a..33c417d25 100644 --- a/chain-signatures/node/src/rpc.rs +++ b/chain-signatures/node/src/rpc.rs @@ -1600,8 +1600,6 @@ use signet_program::accounts::Respond as SolanaRespondAccount; use signet_program::accounts::RespondBidirectional as SolanaRespondBidirectionalAccount; use signet_program::instruction::Respond as SolanaRespond; use signet_program::instruction::RespondBidirectional as SolanaRespondBidirectional; -use signet_program::AffinePoint as SolanaContractAffinePoint; -use signet_program::Signature as SolanaContractSignature; use solana_sdk::signature::Signer as SolanaSigner; async fn try_publish_sol( sol: &SolanaClient, @@ -1614,14 +1612,7 @@ async fn try_publish_sol( let sign_id = action.indexed.id; let request_ids = vec![action.indexed.id.request_id]; let big_r = signature.big_r.to_encoded_point(false); - let signature = SolanaContractSignature { - big_r: SolanaContractAffinePoint { - x: big_r.as_bytes()[1..33].try_into().unwrap(), - y: big_r.as_bytes()[33..65].try_into().unwrap(), - }, - s: signature.s.to_bytes().into(), - recovery_id: signature.recovery_id, - }; + let signature = crate::util::mpc_to_sol_signature(signature, big_r); tracing::debug!( ?sign_id, diff --git a/chain-signatures/node/src/stream/mod.rs b/chain-signatures/node/src/stream/mod.rs index 4c506eea5..ae7afee5f 100644 --- a/chain-signatures/node/src/stream/mod.rs +++ b/chain-signatures/node/src/stream/mod.rs @@ -24,6 +24,7 @@ pub fn channel() -> (mpsc::Sender, mpsc::Receiver) { /// Unified event produced by a chain stream #[allow(clippy::large_enum_variant)] +#[derive(Clone)] pub enum ChainEvent { SignRequest(IndexedSignRequest), Respond(SignatureRespondedEvent), diff --git a/chain-signatures/node/src/stream/ops.rs b/chain-signatures/node/src/stream/ops.rs index 1f7fe1b1d..415d58dca 100644 --- a/chain-signatures/node/src/stream/ops.rs +++ b/chain-signatures/node/src/stream/ops.rs @@ -132,6 +132,7 @@ impl SignBidirectionalEvent { } } +#[derive(Clone)] pub enum RespondBidirectionalEvent { Solana(signet_program::RespondBidirectionalEvent), Hydration(HydrationRespondBidirectionalEvent), diff --git a/chain-signatures/node/src/util/mod.rs b/chain-signatures/node/src/util/mod.rs index bce4a5e9d..9b839ec07 100644 --- a/chain-signatures/node/src/util/mod.rs +++ b/chain-signatures/node/src/util/mod.rs @@ -71,6 +71,20 @@ impl AffinePointExt for AffinePoint { } } +pub fn mpc_to_sol_signature( + signature: &mpc_primitives::Signature, + big_r: k256::EncodedPoint, +) -> signet_program::Signature { + signet_program::Signature { + big_r: signet_program::AffinePoint { + x: big_r.as_bytes()[1..33].try_into().unwrap(), + y: big_r.as_bytes()[33..65].try_into().unwrap(), + }, + s: signature.s.to_bytes().into(), + recovery_id: signature.recovery_id, + } +} + pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool { if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) { let timeout = Duration::from_millis(timeout); diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 463ec4b85..224ce003b 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -7,6 +7,7 @@ use crate::mpc_fixture::fixture_tasks::MessageFilter; use crate::mpc_fixture::input::FixtureInput; use crate::mpc_fixture::message_collector::CollectMessages; use crate::mpc_fixture::mock_governance::MockGovernance; +use crate::mpc_fixture::mock_stream::MockStream; use crate::mpc_fixture::{fixture_tasks, MpcFixture, MpcFixtureNode}; use cait_sith::protocol::Participant; use mpc_contract::config::{ @@ -31,6 +32,7 @@ use mpc_node::protocol::{self, MessageChannel, MpcSignProtocol, ProtocolState}; use mpc_node::rpc::ContractStateWatcher; use mpc_node::rpc::RpcChannel; use mpc_node::storage::{secret_storage, triple_storage::TriplePair, Options}; +use mpc_primitives::Chain; use near_sdk::AccountId; use std::collections::HashMap; use std::sync::Arc; @@ -56,6 +58,7 @@ struct MpcFixtureNodeBuilder { config: Config, messaging: NodeMessagingBuilder, key_info: Option, + mock_streams: HashMap, } /// Config options for the test setup. @@ -420,6 +423,22 @@ impl MpcFixtureBuilder { .with_node_min_triples(0) .with_node_min_presignatures(0) } + + /// Add a mock stream to all nodes. + /// + /// Each node will have a independent deep-clone of the provided stream. + /// Events are thus delivered to all nodes. + pub async fn with_mock_stream(mut self, chain: Chain, stream: MockStream) -> Self { + for node in &mut self.prepared_nodes { + let cloned = stream.deep_clone().await; + let prev = node.mock_streams.insert(chain, cloned); + assert!( + prev.is_none(), + "test setup only supports one stream per chain" + ); + } + self + } } impl MpcFixtureNodeBuilder { @@ -467,6 +486,7 @@ impl MpcFixtureNodeBuilder { config, messaging, key_info: None, + mock_streams: Default::default(), } } @@ -532,6 +552,17 @@ impl MpcFixtureNodeBuilder { mesh_rx.clone(), )); + let backlog = Backlog::new(); + + let flat_mock_streams = self.mock_streams.values().cloned().collect::>(); + fixture_tasks::start_mock_stream_tasks( + &flat_mock_streams, + sign_tx.clone(), + backlog.clone(), + context.contract_state.clone(), + &mesh_rx, + ); + // handle outbox messages manually, we want them before they are // encrypted and we want to send them directly to other node's inboxes let _mock_network_handle = fixture_tasks::test_mock_network( @@ -542,6 +573,7 @@ impl MpcFixtureNodeBuilder { mesh_tx.clone(), config_tx.clone(), self.messaging.filter, + flat_mock_streams.clone(), ); // --- SyncChannel and SyncTask setup --- @@ -563,9 +595,10 @@ impl MpcFixtureNodeBuilder { config: config_tx, sign_tx, msg_channel: self.messaging.channel, + mock_streams: self.mock_streams, triple_storage, presignature_storage, - backlog: Backlog::new(), + backlog, sync_channel, web_handle: None, }; diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 28acba891..3a3819014 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -3,16 +3,18 @@ use crate::containers::Redis; use crate::mpc_fixture::message_collector::{CollectMessages, MessagePrinter}; +use crate::mpc_fixture::mock_stream::MockStream; use cait_sith::protocol::Participant; use mpc_node::backlog::Backlog; use mpc_node::config::Config; use mpc_node::mesh::MeshState; use mpc_node::protocol::state::NodeStateWatcher; use mpc_node::protocol::sync::{SyncChannel, SyncUpdate}; -use mpc_node::protocol::{MessageChannel, ProtocolState, Sign}; +use mpc_node::protocol::{IndexedSignRequest, MessageChannel, ProtocolState, Sign}; use mpc_node::storage::{PresignatureStorage, TripleStorage}; +use mpc_primitives::Chain; use near_sdk::AccountId; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -33,6 +35,7 @@ pub struct MpcFixtureNode { pub sign_tx: mpsc::Sender, pub msg_channel: MessageChannel, + pub mock_streams: HashMap, pub triple_storage: TripleStorage, pub presignature_storage: PresignatureStorage, @@ -131,11 +134,24 @@ impl MpcFixture { let actions: tokio::sync::MutexGuard<'_, HashSet> = self.output.rpc_actions.lock().await; - tracing::info!("All published RPC actions:"); + tracing::info!(count = actions.len(), "All published RPC actions:"); for action in actions.iter() { tracing::info!("{action}"); } } + + /// Add a block that contains signature requesting events, visible to all + /// nodes, then progress the chain to execute it immediately. + pub async fn process_sign_requests(&self, chain: Chain, requests: &[IndexedSignRequest]) { + for node in &self.nodes { + let stream = node + .mock_streams + .get(&chain) + .expect("must have mock stream configured"); + stream.prepare_block_of_sign_requests(requests).await; + stream.progress_block_height(1).await; + } + } } impl MpcFixtureNode { diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index c740897e4..0a6d22add 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -2,20 +2,26 @@ //! passing between nodes and updates to the governance smart contract. use crate::mpc_fixture::fixture_interface::SharedOutput; +use crate::mpc_fixture::mock_stream::MockStream; use cait_sith::protocol::Participant; use mpc_keys::hpke::Ciphered; +use mpc_node::backlog::Backlog; use mpc_node::config::Config; use mpc_node::mesh::MeshState; +use mpc_node::node_client::NodeClient; use mpc_node::protocol::message::{MessageOutbox, SendMessage, SignedMessage}; -use mpc_node::rpc::RpcAction; +use mpc_node::protocol::Sign; +use mpc_node::rpc::{ContractStateWatcher, RpcAction}; +use mpc_node::stream::run_stream; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::watch; use tokio::task::JoinHandle; pub type MessageFilter = Box bool + Send>; +#[allow(clippy::too_many_arguments)] pub(super) fn test_mock_network( routing_table: HashMap>, shared_output: &SharedOutput, @@ -24,6 +30,7 @@ pub(super) fn test_mock_network( mesh: watch::Sender, config: watch::Sender, mut filter: MessageFilter, + mock_streams: Vec, ) -> JoinHandle<()> { let msg_log = Arc::clone(&shared_output.msg_log); let rpc_actions = Arc::clone(&shared_output.rpc_actions); @@ -67,7 +74,7 @@ pub(super) fn test_mock_network( } Some(rpc) = rpc_rx.recv() => { - let action_str = match rpc { + let action_str = match &rpc { RpcAction::Publish(publish_action) => { format!( "RpcAction::Publish({:?})", @@ -78,6 +85,10 @@ pub(super) fn test_mock_network( tracing::info!(target: "mock_network", ?action_str, "Received RPC action"); let mut actions_log = rpc_actions.lock().await; actions_log.insert(action_str); + let block = [rpc]; + for stream in &mock_streams { + stream.prepare_block_of_rpc_actions(&block).await; + } } else => { @@ -89,3 +100,23 @@ pub(super) fn test_mock_network( tracing::info!(target: "mock_network", "Test mock network task exited"); }) } + +pub(super) fn start_mock_stream_tasks( + mock_streams: &[MockStream], + sign_tx: mpsc::Sender, + backlog: Backlog, + contract_watcher: ContractStateWatcher, + mesh_state: &watch::Receiver, +) { + for stream in mock_streams { + tokio::spawn(run_stream( + stream.clone(), + sign_tx.clone(), + backlog.clone(), + contract_watcher.clone(), + mesh_state.clone(), + // Only used for backlog recovery - not implemented in component tests yet + NodeClient::new(&Default::default()), + )); + } +} diff --git a/integration-tests/src/mpc_fixture/mock_stream.rs b/integration-tests/src/mpc_fixture/mock_stream.rs new file mode 100644 index 000000000..44da8c561 --- /dev/null +++ b/integration-tests/src/mpc_fixture/mock_stream.rs @@ -0,0 +1,142 @@ +use elliptic_curve::sec1::ToEncodedPoint; +use mpc_node::protocol::{IndexedSignRequest, SignKind}; +use mpc_node::rpc::RpcAction; +use mpc_node::stream::{ChainEvent, ChainStream}; +use mpc_primitives::Chain; +use solana_sdk::pubkey::Pubkey; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; + +#[derive(Default, Clone)] +pub struct MockStream { + inner: Arc>, +} + +#[derive(Default)] +pub struct InnerMockStream { + block_height: u64, + /// Events for blocks >= `block_height`, not ready to be published, yet. + future_blocks: Vec>, + /// Events already produced < `block_height` but not yet consumed by + /// `next_event()`. + pending_events: Vec, +} + +impl ChainStream for MockStream { + const CHAIN: Chain = Chain::Solana; + + async fn start(&mut self) { + let mut guard = self.inner.lock().await; + guard.pending_events.push(ChainEvent::CatchupCompleted); + } + + async fn next_event(&mut self) -> Option { + loop { + let mut guard = self.inner.lock().await; + let out = guard.pending_events.pop(); + if out.is_some() { + return out; + } + // TODO: would be better to avoid sleep by awaiting new data + tokio::time::sleep(Duration::from_millis(10)).await; + } + } +} + +impl MockStream { + pub async fn deep_clone(&self) -> Self { + let guard = self.inner.lock().await; + let cloned = InnerMockStream { + block_height: guard.block_height, + future_blocks: guard.future_blocks.clone(), + pending_events: guard.pending_events.clone(), + }; + Self { + inner: Arc::new(Mutex::new(cloned)), + } + } + + pub async fn progress_block_height(&self, steps: usize) { + let mut guard = self.inner.lock().await; + guard.progress_block_height(steps) + } + + /// Add a future block that contains signature requesting events. + pub async fn prepare_block_of_sign_requests(&self, requests: &[IndexedSignRequest]) { + let mut guard = self.inner.lock().await; + guard.prepare_block_of_sign_requests(requests) + } + + /// Add a future block that contains events corresponding to the provided rpc actions. + pub async fn prepare_block_of_rpc_actions(&self, actions: &[RpcAction]) { + let mut guard = self.inner.lock().await; + guard.prepare_block_of_rpc_actions(actions) + } +} + +impl InnerMockStream { + /// Move events from future blocks tp pending blocks. + pub fn progress_block_height(&mut self, steps: usize) { + let checked_steps = steps.min(self.future_blocks.len()); + for mut block in self.future_blocks.drain(0..checked_steps) { + self.pending_events.append(&mut block); + self.pending_events + .push(ChainEvent::Block(self.block_height)); + self.block_height += 1; + } + } + + /// Add a future block that contains signature requesting events. + pub fn prepare_block_of_sign_requests(&mut self, requests: &[IndexedSignRequest]) { + let mut block = Vec::new(); + + for request in requests { + // Skip events for other chains + if request.chain != MockStream::CHAIN { + continue; + } + + block.push(ChainEvent::SignRequest(request.clone())) + } + + self.future_blocks.push(block); + } + + /// Add a future block that contains events corresponding to the provided rpc actions. + pub fn prepare_block_of_rpc_actions(&mut self, actions: &[RpcAction]) { + let mut block = Vec::new(); + + for action in actions { + let RpcAction::Publish(publish_action) = action; + + // Skip events for other chains + if publish_action.indexed.chain != MockStream::CHAIN { + continue; + } + + // for now, the mock stream only converts signature RPC actions to chain events + if !matches!(publish_action.indexed.kind, SignKind::Sign,) { + tracing::warn!( + kind=?publish_action.indexed.kind, + "kind not yet supported in test framework", + ); + continue; + } + + // type conversions that would usually happen in RPC publishing -> Solana contract -> CPI event library + let big_r = publish_action.signature.big_r.to_encoded_point(false); + let sol_event = signet_program::SignatureRespondedEvent { + request_id: publish_action.indexed.id.request_id, + responder: Pubkey::new_unique(), + signature: mpc_node::util::mpc_to_sol_signature(&publish_action.signature, big_r), + }; + + let respond_event = mpc_node::stream::ops::SignatureRespondedEvent::Solana(sol_event); + + block.push(ChainEvent::Respond(respond_event)); + } + + self.future_blocks.push(block); + } +} diff --git a/integration-tests/src/mpc_fixture/mod.rs b/integration-tests/src/mpc_fixture/mod.rs index c867bfb5a..ec8f91e27 100644 --- a/integration-tests/src/mpc_fixture/mod.rs +++ b/integration-tests/src/mpc_fixture/mod.rs @@ -8,6 +8,7 @@ pub mod fixture_tasks; pub mod input; pub mod message_collector; pub mod mock_governance; +pub mod mock_stream; pub use builder::MpcFixtureBuilder; pub use fixture_interface::{MpcFixture, MpcFixtureNode}; diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index 0e6f2c9ea..069360d0c 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -7,6 +7,8 @@ use mpc_node::protocol::presignature::Presignature; use mpc_node::protocol::triple::Triple; use mpc_node::storage::triple_storage::TriplePair; use mpc_node::storage::{PresignatureStorage, TripleStorage}; +use mpc_primitives::{SignArgs, LATEST_MPC_KEY_VERSION}; +use sha2::Digest; pub(crate) fn dummy_presignature(id: u64) -> Presignature { dummy_presignature_with_holders(id, vec![Participant::from(1), Participant::from(2)]) @@ -133,3 +135,19 @@ pub(crate) async fn assert_presig_owned_state( ); } } + +pub fn test_sign_arg(seed: impl Into) -> SignArgs { + let seed = seed.into(); + // entropy should have well-distributed bits even in tests + let entropy: [u8; 32] = sha2::Sha256::digest(seed.to_be_bytes()) + .as_slice() + .try_into() + .expect("digest length should be 32"); + SignArgs { + entropy, + epsilon: k256::Scalar::default(), + payload: k256::Scalar::default(), + path: "test".to_owned(), + key_version: LATEST_MPC_KEY_VERSION, + } +} diff --git a/integration-tests/tests/cases/mod.rs b/integration-tests/tests/cases/mod.rs index 5588b164e..1f50efaec 100644 --- a/integration-tests/tests/cases/mod.rs +++ b/integration-tests/tests/cases/mod.rs @@ -21,6 +21,7 @@ pub mod ethereum; pub mod ethereum_stream; pub mod helpers; pub mod mpc; +pub mod mpc_with_stream; pub mod nightly; pub mod solana; pub mod solana_stream; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 5a0897870..aae87c8ae 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -5,7 +5,7 @@ use integration_tests::mpc_fixture::MpcFixtureBuilder; use mpc_node::protocol::presignature::Presignature; use mpc_node::protocol::{Chain, IndexedSignRequest, ProtocolState, Sign}; use mpc_node::storage::triple_storage::TriplePair; -use mpc_primitives::{SignArgs, SignId, LATEST_MPC_KEY_VERSION}; +use mpc_primitives::SignId; use test_log::test; use tokio::sync::oneshot; use tokio::sync::Mutex; @@ -217,24 +217,12 @@ async fn test_basic_sign() { fn sign_request(seed: u8) -> Sign { Sign::Request(IndexedSignRequest::sign( SignId::new([seed; 32]), - sign_arg(seed), + super::helpers::test_sign_arg(seed), Chain::NEAR, 0, )) } -fn sign_arg(seed: u8) -> SignArgs { - let mut entropy = [1; 32]; - entropy[0] = seed; - SignArgs { - entropy, - epsilon: k256::Scalar::default(), - payload: k256::Scalar::default(), - path: "test".to_owned(), - key_version: LATEST_MPC_KEY_VERSION, - } -} - /// drop the first 20 presignature messages on each node and see if the system /// can recover #[test(tokio::test(flavor = "multi_thread"))] diff --git a/integration-tests/tests/cases/mpc_with_stream.rs b/integration-tests/tests/cases/mpc_with_stream.rs new file mode 100644 index 000000000..4ecf467cd --- /dev/null +++ b/integration-tests/tests/cases/mpc_with_stream.rs @@ -0,0 +1,147 @@ +//! Component test that combine the MPC network combined with a chain stream as input and output. + +use integration_tests::mpc_fixture::{mock_stream::MockStream, MpcFixtureBuilder}; +use mpc_node::protocol::IndexedSignRequest; +use mpc_primitives::{Chain, SignId}; +use std::time::Duration; +use test_log::test; + +fn sign_request(seed: u32) -> IndexedSignRequest { + let bytes = [ + seed.to_be_bytes()[0], + seed.to_be_bytes()[1], + seed.to_be_bytes()[2], + seed.to_be_bytes()[3], + ] + .repeat(8); + IndexedSignRequest::sign( + SignId::new(bytes.try_into().unwrap()), + super::helpers::test_sign_arg(seed), + Chain::Solana, + 0, + ) +} + +/// Simple test, mostly just here to check the MockStream setup is working. +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sign() { + let network = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_mock_stream(Chain::Solana, MockStream::default()) + .await + .build() + .await; + + tracing::info!("sending requests now"); + network + .process_sign_requests(Chain::Solana, &[sign_request(0)]) + .await; + + let timeout = Duration::from_secs(10); + let actions = network.assert_actions(1, timeout).await; + + assert_eq!(actions.len(), 1); + let action_str = actions.iter().next().unwrap(); + assert!( + action_str.contains("RpcAction::Publish"), + "unexpected rpc action {action_str}" + ); +} + +/// Common checker function called with different parameters in test case below. +async fn check_channel_contention( + // number of blocks with requests to send + num_blocks: usize, + // number of requests within each block + req_per_block: usize, + // how many signatures should be generated successfully, usually + // `num_blocks` * `req_per_block` + expected_signatures: usize, + // add an observation delay between nodes + observation_delay: Option, +) { + let num_nodes = 3; + let threshold = 2; + let network = MpcFixtureBuilder::new(num_nodes as u32, threshold) + .only_generate_signatures() + .with_mock_stream(Chain::Solana, MockStream::default()) + .await + .build() + .await; + + // prepare blocks but do not send process them, yet + for outer in 0..(num_blocks as u16) { + let requests = (0..req_per_block) + .map(|inner| sign_request(outer as u32 * req_per_block as u32 + inner as u32)) + .collect::>(); + + for i in 0..num_nodes { + network[i] + .mock_streams + .get(&Chain::Solana) + .unwrap() + .prepare_block_of_sign_requests(&requests) + .await; + } + } + + // start sending requests, with optional observation delays between nodes + for i in 0..num_nodes { + network[i] + .mock_streams + .get(&Chain::Solana) + .unwrap() + .progress_block_height(num_blocks) + .await; + if let Some(delay) = observation_delay { + tokio::time::sleep(delay).await; + } + } + + let actions = network + .assert_actions(expected_signatures, Duration::from_secs(60)) + .await; + + assert_eq!(actions.len(), expected_signatures); + let action_str = actions.iter().next().unwrap(); + assert!( + action_str.contains("RpcAction::Publish"), + "unexpected rpc action {action_str}" + ); +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_many_requests_per_block() { + check_channel_contention(1, 50, 50, None).await; +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_multiple_blocks_at_once() { + check_channel_contention(5, 10, 50, None).await; +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_multiple_blocks_at_once_delayed() { + // TODO: delay should be > ORGANIZE_POSIT_TIMEOUT and still work but currently doesn't + let delay = mpc_node::protocol::signature::organize_posit_timeout() / 2; + check_channel_contention(5, 10, 50, Some(delay)).await; +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_show_limit() { + // There are exactly enough presignatures in the fixture input for 75 signatures. + check_channel_contention(6, 50, 75, None).await; +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_10k_requests() { + // sending 100 x 100 requests at once + check_channel_contention(100, 100, 75, None).await; +} + +#[test(tokio::test(flavor = "multi_thread"))] +#[allow(non_snake_case)] +async fn test_channel_contention_1M_requests() { + // sending 1000 x 1000 requests at once + check_channel_contention(1000, 1000, 75, None).await; +}