From 82a1555c4b658df4ddbd52346e4c643a8d69a9b7 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 2 Apr 2026 21:47:44 +0200 Subject: [PATCH 1/9] test: MPC + stream component test Create a MockStream that connects to the MPC fixture setup. This lets us test the indexer stream <-> MPC glue. --- chain-signatures/contract-sol/src/lib.rs | 1 + chain-signatures/node/src/rpc.rs | 11 +- chain-signatures/node/src/stream/mod.rs | 1 + chain-signatures/node/src/stream/ops.rs | 1 + chain-signatures/node/src/util/mod.rs | 14 ++ integration-tests/src/mpc_fixture/builder.rs | 28 +++- .../src/mpc_fixture/fixture_interface.rs | 2 + .../src/mpc_fixture/fixture_tasks.rs | 37 ++++- .../src/mpc_fixture/mock_stream.rs | 140 ++++++++++++++++++ integration-tests/src/mpc_fixture/mod.rs | 1 + integration-tests/tests/cases/helpers.rs | 13 ++ integration-tests/tests/cases/mod.rs | 1 + integration-tests/tests/cases/mpc.rs | 16 +- .../tests/cases/mpc_with_stream.rs | 47 ++++++ 14 files changed, 285 insertions(+), 28 deletions(-) create mode 100644 integration-tests/src/mpc_fixture/mock_stream.rs create mode 100644 integration-tests/tests/cases/mpc_with_stream.rs diff --git a/chain-signatures/contract-sol/src/lib.rs b/chain-signatures/contract-sol/src/lib.rs index 43839a768..48e47e1bb 100644 --- a/chain-signatures/contract-sol/src/lib.rs +++ b/chain-signatures/contract-sol/src/lib.rs @@ -197,6 +197,7 @@ pub struct SignatureRespondedEvent { } #[event] +#[derive(Clone)] pub struct RespondBidirectionalEvent { pub request_id: [u8; 32], pub responder: Pubkey, diff --git a/chain-signatures/node/src/rpc.rs b/chain-signatures/node/src/rpc.rs index 7260d7461..b90f6f848 100644 --- a/chain-signatures/node/src/rpc.rs +++ b/chain-signatures/node/src/rpc.rs @@ -1830,8 +1830,6 @@ use signet_program::accounts::Respond as SolanaRespondAccount; use signet_program::accounts::RespondBidirectional as SolanaRespondBidirectionalAccount; use signet_program::instruction::Respond as SolanaRespond; use 
signet_program::instruction::RespondBidirectional as SolanaRespondBidirectional; -use signet_program::AffinePoint as SolanaContractAffinePoint; -use signet_program::Signature as SolanaContractSignature; use solana_sdk::signature::Signer as SolanaSigner; async fn try_publish_sol( sol: &SolanaClient, @@ -1844,14 +1842,7 @@ async fn try_publish_sol( let sign_id = action.indexed.id; let request_ids = vec![action.indexed.id.request_id]; let big_r = signature.big_r.to_encoded_point(false); - let signature = SolanaContractSignature { - big_r: SolanaContractAffinePoint { - x: big_r.as_bytes()[1..33].try_into().unwrap(), - y: big_r.as_bytes()[33..65].try_into().unwrap(), - }, - s: signature.s.to_bytes().into(), - recovery_id: signature.recovery_id, - }; + let signature = crate::util::mpc_to_sol_signature(signature, big_r); tracing::debug!( ?sign_id, diff --git a/chain-signatures/node/src/stream/mod.rs b/chain-signatures/node/src/stream/mod.rs index c8a2984ed..d0d295bdf 100644 --- a/chain-signatures/node/src/stream/mod.rs +++ b/chain-signatures/node/src/stream/mod.rs @@ -27,6 +27,7 @@ pub fn channel() -> (mpsc::Sender, mpsc::Receiver) { /// Unified event produced by a chain stream #[allow(clippy::large_enum_variant)] +#[derive(Clone)] pub enum ChainEvent { SignRequest(IndexedSignRequest), Respond(SignatureRespondedEvent), diff --git a/chain-signatures/node/src/stream/ops.rs b/chain-signatures/node/src/stream/ops.rs index 55bc277fe..8c4e7682a 100644 --- a/chain-signatures/node/src/stream/ops.rs +++ b/chain-signatures/node/src/stream/ops.rs @@ -171,6 +171,7 @@ impl SignBidirectionalEvent { } } +#[derive(Clone)] pub enum RespondBidirectionalEvent { Solana(signet_program::RespondBidirectionalEvent), Hydration(HydrationRespondBidirectionalEvent), diff --git a/chain-signatures/node/src/util/mod.rs b/chain-signatures/node/src/util/mod.rs index bce4a5e9d..9b839ec07 100644 --- a/chain-signatures/node/src/util/mod.rs +++ b/chain-signatures/node/src/util/mod.rs @@ -71,6 +71,20 @@ impl 
AffinePointExt for AffinePoint { } } +pub fn mpc_to_sol_signature( + signature: &mpc_primitives::Signature, + big_r: k256::EncodedPoint, +) -> signet_program::Signature { + signet_program::Signature { + big_r: signet_program::AffinePoint { + x: big_r.as_bytes()[1..33].try_into().unwrap(), + y: big_r.as_bytes()[33..65].try_into().unwrap(), + }, + s: signature.s.to_bytes().into(), + recovery_id: signature.recovery_id, + } +} + pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool { if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) { let timeout = Duration::from_millis(timeout); diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 93c42ddd4..0ac253b3b 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -7,6 +7,7 @@ use crate::mpc_fixture::fixture_tasks::MessageFilter; use crate::mpc_fixture::input::FixtureInput; use crate::mpc_fixture::message_collector::CollectMessages; use crate::mpc_fixture::mock_governance::MockGovernance; +use crate::mpc_fixture::mock_stream::MockStream; use crate::mpc_fixture::{fixture_tasks, MpcFixture, MpcFixtureNode}; use cait_sith::protocol::Participant; @@ -57,6 +58,7 @@ struct MpcFixtureNodeBuilder { config: Config, messaging: NodeMessagingBuilder, key_info: Option, + mock_streams: Vec, } /// Config options for the test setup. @@ -421,6 +423,17 @@ impl MpcFixtureBuilder { .with_node_min_triples(0) .with_node_min_presignatures(0) } + + /// Add a mock stream to all nodes. + /// + /// Each node will have a independent deep-clone of the provided stream. + /// Events are thus delivered to all nodes. 
+ pub async fn with_mock_stream(mut self, stream: MockStream) -> Self { + for node in &mut self.prepared_nodes { + node.mock_streams.push(stream.deep_clone().await) + } + self + } } impl MpcFixtureNodeBuilder { @@ -468,6 +481,7 @@ impl MpcFixtureNodeBuilder { config, messaging, key_info: None, + mock_streams: vec![], } } @@ -533,6 +547,16 @@ impl MpcFixtureNodeBuilder { mesh_rx.clone(), )); + let backlog = Backlog::new(); + + fixture_tasks::start_mock_stream_tasks( + &self.mock_streams, + sign_tx.clone(), + backlog.clone(), + context.contract_state, + &mesh_rx, + ); + // handle outbox messages manually, we want them before they are // encrypted and we want to send them directly to other node's inboxes let _mock_network_handle = fixture_tasks::test_mock_network( @@ -543,6 +567,7 @@ impl MpcFixtureNodeBuilder { mesh_tx.clone(), config_tx.clone(), self.messaging.filter, + self.mock_streams.clone(), ); // --- SyncChannel and SyncTask setup --- @@ -564,9 +589,10 @@ impl MpcFixtureNodeBuilder { config: config_tx, sign_tx, msg_channel: self.messaging.channel, + mock_streams: self.mock_streams, triple_storage, presignature_storage, - backlog: Backlog::new(), + backlog, sync_channel, web_handle: None, }; diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 6ec039cb6..7a2a69ef1 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -3,6 +3,7 @@ use crate::containers::Redis; use crate::mpc_fixture::message_collector::{CollectMessages, MessagePrinter}; +use crate::mpc_fixture::mock_stream::MockStream; use cait_sith::protocol::Participant; use mpc_node::backlog::Backlog; use mpc_node::config::Config; @@ -34,6 +35,7 @@ pub struct MpcFixtureNode { pub sign_tx: mpsc::Sender, pub msg_channel: MessageChannel, + pub mock_streams: Vec, pub triple_storage: TripleStorage, pub presignature_storage: PresignatureStorage, diff --git 
a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index c740897e4..9576e2587 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -2,20 +2,26 @@ //! passing between nodes and updates to the governance smart contract. use crate::mpc_fixture::fixture_interface::SharedOutput; +use crate::mpc_fixture::mock_stream::MockStream; use cait_sith::protocol::Participant; use mpc_keys::hpke::Ciphered; +use mpc_node::backlog::Backlog; use mpc_node::config::Config; use mpc_node::mesh::MeshState; +use mpc_node::node_client::NodeClient; use mpc_node::protocol::message::{MessageOutbox, SendMessage, SignedMessage}; -use mpc_node::rpc::RpcAction; +use mpc_node::protocol::Sign; +use mpc_node::rpc::{ContractStateWatcher, RpcAction}; +use mpc_node::stream::run_stream; use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::watch; use tokio::task::JoinHandle; pub type MessageFilter = Box bool + Send>; +#[allow(clippy::too_many_arguments)] pub(super) fn test_mock_network( routing_table: HashMap>, shared_output: &SharedOutput, @@ -24,6 +30,7 @@ pub(super) fn test_mock_network( mesh: watch::Sender, config: watch::Sender, mut filter: MessageFilter, + mock_streams: Vec, ) -> JoinHandle<()> { let msg_log = Arc::clone(&shared_output.msg_log); let rpc_actions = Arc::clone(&shared_output.rpc_actions); @@ -67,7 +74,7 @@ pub(super) fn test_mock_network( } Some(rpc) = rpc_rx.recv() => { - let action_str = match rpc { + let action_str = match &rpc { RpcAction::Publish(publish_action) => { format!( "RpcAction::Publish({:?})", @@ -78,6 +85,10 @@ pub(super) fn test_mock_network( tracing::info!(target: "mock_network", ?action_str, "Received RPC action"); let mut actions_log = rpc_actions.lock().await; actions_log.insert(action_str); + let block = [rpc]; + for stream in 
&mock_streams { + stream.rpc_actions(&block).await; + } } else => { @@ -89,3 +100,23 @@ pub(super) fn test_mock_network( tracing::info!(target: "mock_network", "Test mock network task exited"); }) } + +pub(super) fn start_mock_stream_tasks( + mock_streams: &[MockStream], + sign_tx: mpsc::Sender, + backlog: Backlog, + contract_watcher: ContractStateWatcher, + mesh_state: &watch::Receiver, +) { + for stream in mock_streams { + tokio::spawn(run_stream( + stream.clone(), + sign_tx.clone(), + backlog.clone(), + contract_watcher.clone(), + mesh_state.clone(), + // Only used for backlog recovery - not implemented in component tests yet + NodeClient::new(&Default::default()), + )); + } +} diff --git a/integration-tests/src/mpc_fixture/mock_stream.rs b/integration-tests/src/mpc_fixture/mock_stream.rs new file mode 100644 index 000000000..19b4912ff --- /dev/null +++ b/integration-tests/src/mpc_fixture/mock_stream.rs @@ -0,0 +1,140 @@ +use elliptic_curve::sec1::ToEncodedPoint; +use mpc_node::protocol::{IndexedSignRequest, SignKind}; +use mpc_node::rpc::RpcAction; +use mpc_node::stream::{ChainEvent, ChainStream}; +use mpc_primitives::Chain; +use solana_sdk::pubkey::Pubkey; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; + +#[derive(Default, Clone)] +pub struct MockStream { + inner: Arc>, +} + +#[derive(Default)] +pub struct InnerMockStream { + block_height: u64, + /// Events for blocks >= `block_height`, not ready to be published, yet. + future_blocks: Vec>, + /// Events already produced < `block_height` but not yet consumed by + /// `next_event()`. 
+ pending_events: Vec, +} + +impl ChainStream for MockStream { + const CHAIN: Chain = Chain::Solana; + + async fn start(&mut self) { + let mut guard = self.inner.lock().await; + guard.pending_events.push(ChainEvent::CatchupCompleted); + } + + async fn next_event(&mut self) -> Option { + loop { + let mut guard = self.inner.lock().await; + let out = guard.pending_events.pop(); + if out.is_some() { + return out; + } + // TODO: would be better to avoid sleep by awaiting new data + tokio::time::sleep(Duration::from_millis(10)).await; + } + } +} + +impl MockStream { + pub async fn deep_clone(&self) -> Self { + let guard = self.inner.lock().await; + let cloned = InnerMockStream { + block_height: guard.block_height, + future_blocks: guard.future_blocks.clone(), + pending_events: guard.pending_events.clone(), + }; + Self { + inner: Arc::new(Mutex::new(cloned)), + } + } + + pub async fn progress_block_height(&self, steps: usize) { + let mut guard = self.inner.lock().await; + guard.progress_block_height(steps) + } + + pub async fn sign_requests(&self, requests: &[IndexedSignRequest]) { + let mut guard = self.inner.lock().await; + guard.sign_requests(requests) + } + + pub async fn rpc_actions(&self, actions: &[RpcAction]) { + let mut guard = self.inner.lock().await; + guard.rpc_actions(actions) + } +} + +impl InnerMockStream { + /// Move events from future blocks to pending blocks. + pub fn progress_block_height(&mut self, steps: usize) { + let checked_steps = steps.min(self.future_blocks.len()); + for mut block in self.future_blocks.drain(0..checked_steps) { + self.pending_events.append(&mut block); + self.pending_events + .push(ChainEvent::Block(self.block_height)); + self.block_height += 1; + } + } + + /// Add a future block that contains signature-requesting events.
+ pub fn sign_requests(&mut self, requests: &[IndexedSignRequest]) { + let mut block = Vec::new(); + + for request in requests { + // Skip events for other chains + if request.chain != MockStream::CHAIN { + continue; + } + + block.push(ChainEvent::SignRequest(request.clone())) + } + + self.future_blocks.push(block); + } + + /// Add a future block that contains events corresponding to the provided rpc actions. + pub fn rpc_actions(&mut self, actions: &[RpcAction]) { + let mut block = Vec::new(); + + for action in actions { + let RpcAction::Publish(publish_action) = action; + + // Skip events for other chains + if publish_action.indexed.chain != MockStream::CHAIN { + continue; + } + + // for now, the mock stream only converts signature RPC actions to chain events + if !matches!(publish_action.indexed.kind, SignKind::Sign,) { + tracing::warn!( + kind=?publish_action.indexed.kind, + "kind not yet supported in test framework", + ); + continue; + } + + // type conversions that would usually happen in RPC publishing -> Solana contract -> CPI event library + let big_r = publish_action.signature.big_r.to_encoded_point(false); + let sol_event = signet_program::SignatureRespondedEvent { + request_id: publish_action.indexed.id.request_id, + responder: Pubkey::new_unique(), + signature: mpc_node::util::mpc_to_sol_signature(&publish_action.signature, big_r), + }; + + let respond_event = mpc_node::stream::ops::SignatureRespondedEvent::Solana(sol_event); + + block.push(ChainEvent::Respond(respond_event)); + } + + self.future_blocks.push(block); + } +} diff --git a/integration-tests/src/mpc_fixture/mod.rs b/integration-tests/src/mpc_fixture/mod.rs index c867bfb5a..ec8f91e27 100644 --- a/integration-tests/src/mpc_fixture/mod.rs +++ b/integration-tests/src/mpc_fixture/mod.rs @@ -8,6 +8,7 @@ pub mod fixture_tasks; pub mod input; pub mod message_collector; pub mod mock_governance; +pub mod mock_stream; pub use builder::MpcFixtureBuilder; pub use fixture_interface::{MpcFixture, 
MpcFixtureNode}; diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index 0e6f2c9ea..fb7837de9 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -7,6 +7,7 @@ use mpc_node::protocol::presignature::Presignature; use mpc_node::protocol::triple::Triple; use mpc_node::storage::triple_storage::TriplePair; use mpc_node::storage::{PresignatureStorage, TripleStorage}; +use mpc_primitives::{SignArgs, LATEST_MPC_KEY_VERSION}; pub(crate) fn dummy_presignature(id: u64) -> Presignature { dummy_presignature_with_holders(id, vec![Participant::from(1), Participant::from(2)]) @@ -133,3 +134,15 @@ pub(crate) async fn assert_presig_owned_state( ); } } + +pub fn test_sign_arg(seed: u8) -> SignArgs { + let mut entropy = [1; 32]; + entropy[0] = seed; + SignArgs { + entropy, + epsilon: k256::Scalar::default(), + payload: k256::Scalar::default(), + path: "test".to_owned(), + key_version: LATEST_MPC_KEY_VERSION, + } +} diff --git a/integration-tests/tests/cases/mod.rs b/integration-tests/tests/cases/mod.rs index d396f3302..3ff22333f 100644 --- a/integration-tests/tests/cases/mod.rs +++ b/integration-tests/tests/cases/mod.rs @@ -23,6 +23,7 @@ pub mod ethereum; pub mod ethereum_stream; pub mod helpers; pub mod mpc; +pub mod mpc_with_stream; pub mod nightly; pub mod solana; pub mod solana_stream; diff --git a/integration-tests/tests/cases/mpc.rs b/integration-tests/tests/cases/mpc.rs index 562d169da..dd1c09715 100644 --- a/integration-tests/tests/cases/mpc.rs +++ b/integration-tests/tests/cases/mpc.rs @@ -5,7 +5,7 @@ use integration_tests::mpc_fixture::MpcFixtureBuilder; use mpc_node::protocol::presignature::Presignature; use mpc_node::protocol::{Chain, IndexedSignRequest, ProtocolState, Sign}; use mpc_node::storage::triple_storage::TriplePair; -use mpc_primitives::{SignArgs, SignId, LATEST_MPC_KEY_VERSION}; +use mpc_primitives::SignId; use test_log::test; use tokio::sync::oneshot; use 
tokio::sync::Mutex; @@ -279,24 +279,12 @@ async fn test_sign_request_during_resharing() { fn sign_request(seed: u8) -> Sign { Sign::Request(IndexedSignRequest::sign( SignId::new([seed; 32]), - sign_arg(seed), + super::helpers::test_sign_arg(seed), Chain::NEAR, 0, )) } -fn sign_arg(seed: u8) -> SignArgs { - let mut entropy = [1; 32]; - entropy[0] = seed; - SignArgs { - entropy, - epsilon: k256::Scalar::default(), - payload: k256::Scalar::default(), - path: "test".to_owned(), - key_version: LATEST_MPC_KEY_VERSION, - } -} - /// drop the first 20 presignature messages on each node and see if the system /// can recover #[test(tokio::test(flavor = "multi_thread"))] diff --git a/integration-tests/tests/cases/mpc_with_stream.rs b/integration-tests/tests/cases/mpc_with_stream.rs new file mode 100644 index 000000000..0e207249b --- /dev/null +++ b/integration-tests/tests/cases/mpc_with_stream.rs @@ -0,0 +1,47 @@ +//! Component test that combine the MPC network combined with a chain stream as input and output. 
+ +use integration_tests::mpc_fixture::{mock_stream::MockStream, MpcFixtureBuilder}; +use mpc_node::protocol::IndexedSignRequest; +use mpc_primitives::{Chain, SignId}; +use std::time::Duration; +use test_log::test; + +fn sign_request(seed: u8) -> IndexedSignRequest { + IndexedSignRequest::sign( + SignId::new([seed; 32]), + super::helpers::test_sign_arg(seed), + Chain::Solana, + 0, + ) +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_sign() { + let network = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_mock_stream(MockStream::default()) + .await + .build() + .await; + + tracing::info!("sending requests now"); + let request = [sign_request(0)]; + network[0].mock_streams[0].sign_requests(&request).await; + network[1].mock_streams[0].sign_requests(&request).await; + network[2].mock_streams[0].sign_requests(&request).await; + + network[0].mock_streams[0].progress_block_height(1).await; + network[1].mock_streams[0].progress_block_height(1).await; + network[2].mock_streams[0].progress_block_height(1).await; + + let timeout = Duration::from_secs(10); + + let actions = network.assert_actions(1, timeout).await; + + assert_eq!(actions.len(), 1); + let action_str = actions.iter().next().unwrap(); + assert!( + action_str.contains("RpcAction::Publish"), + "unexpected rpc action {action_str}" + ); +} From 12a81bb4eb2caeb3ddcbf61799b5a8a06684754f Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 2 Apr 2026 22:39:19 +0200 Subject: [PATCH 2/9] WIP: channel contention test --- chain-signatures/node/src/cli.rs | 2 +- .../node/src/protocol/message/sub.rs | 2 +- .../src/mpc_fixture/fixture_interface.rs | 2 +- .../tests/cases/mpc_with_stream.rs | 52 +++++++++++++++++-- 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 51ca2d230..78e1067a6 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -222,7 +222,7 @@ pub async fn 
run(cmd: Cli) -> anyhow::Result<()> { ); crate::metrics::nodes::CONFIGURATION_DIGEST.set(digest); - let (sign_tx, sign_rx) = mpsc::channel(16384); + let (sign_tx, sign_rx) = mpsc::channel(if cfg!(test) { 1 } else { 16384 }); let gcp_service = GcpService::init(&account_id, &storage_options).await?; let key_storage = diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index e947dea18..73bbdf0ef 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -12,7 +12,7 @@ use crate::protocol::presignature::{FullPresignatureId, PresignatureId}; use crate::protocol::triple::TripleId; /// This should be enough to hold a few messages in the inbox. -pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024; +pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = if cfg!(test) { 1 } else { 4 * 1024 }; pub enum SubscribeId { Generating, diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 7a2a69ef1..319f9a547 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -180,7 +180,7 @@ impl MpcFixture { let actions: tokio::sync::MutexGuard<'_, HashSet> = self.output.rpc_actions.lock().await; - tracing::info!("All published RPC actions:"); + tracing::info!(count = actions.len(), "All published RPC actions:"); for action in actions.iter() { tracing::info!("{action}"); } diff --git a/integration-tests/tests/cases/mpc_with_stream.rs b/integration-tests/tests/cases/mpc_with_stream.rs index 0e207249b..2b2d62c3c 100644 --- a/integration-tests/tests/cases/mpc_with_stream.rs +++ b/integration-tests/tests/cases/mpc_with_stream.rs @@ -6,10 +6,11 @@ use mpc_primitives::{Chain, SignId}; use std::time::Duration; use test_log::test; -fn sign_request(seed: u8) -> IndexedSignRequest { +fn sign_request(seed: u16) -> IndexedSignRequest 
{ + let bytes = [seed.to_be_bytes()[0], seed.to_be_bytes()[1]].repeat(16); IndexedSignRequest::sign( - SignId::new([seed; 32]), - super::helpers::test_sign_arg(seed), + SignId::new(bytes.try_into().unwrap()), + super::helpers::test_sign_arg(seed.to_be_bytes()[0]), Chain::Solana, 0, ) @@ -45,3 +46,48 @@ async fn test_sign() { "unexpected rpc action {action_str}" ); } + +// WIP/TODO: This should fill up some channels and assert the effect of it. +// Right now, it just runs through all presignatures and then gets stuck as there are no Ps left. +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention() { + let network = MpcFixtureBuilder::default() + .only_generate_signatures() + .with_mock_stream(MockStream::default()) + .await + .build() + .await; + + // send requests in batches of 50 per block + for outer in 0..1000 { + let requests = (0..50) + .map(|inner| sign_request(outer * 50 + inner)) + .collect::>(); + + tracing::info!(outer, "sending request now"); + network[0].mock_streams[0].sign_requests(&requests).await; + network[1].mock_streams[0].sign_requests(&requests).await; + network[2].mock_streams[0].sign_requests(&requests).await; + } + network[0].mock_streams[0] + .progress_block_height(50_000) + .await; + network[1].mock_streams[0] + .progress_block_height(50_000) + .await; + network[2].mock_streams[0] + .progress_block_height(50_000) + .await; + + // there are only enough presignatures for 75 signatures in the fixture + let actions = network.assert_actions(75, Duration::from_secs(10)).await; + + assert_eq!(actions.len(), 75); + let action_str = actions.iter().next().unwrap(); + assert!( + action_str.contains("RpcAction::Publish"), + "unexpected rpc action {action_str}" + ); + + // TODO: check the system is still operative +} From 2f9b60bbe6c82e5593e2971aecd67e10cf1cb146 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 9 Apr 2026 19:15:03 +0200 Subject: [PATCH 3/9] wip: contention problem found but not with channel capacity? 
--- chain-signatures/node/src/cli.rs | 2 +- .../node/src/protocol/message/sub.rs | 16 +++- integration-tests/tests/cases/helpers.rs | 5 +- .../tests/cases/mpc_with_stream.rs | 88 +++++++++++++------ 4 files changed, 77 insertions(+), 34 deletions(-) diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index 78e1067a6..51ca2d230 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -222,7 +222,7 @@ pub async fn run(cmd: Cli) -> anyhow::Result<()> { ); crate::metrics::nodes::CONFIGURATION_DIGEST.set(digest); - let (sign_tx, sign_rx) = mpsc::channel(if cfg!(test) { 1 } else { 16384 }); + let (sign_tx, sign_rx) = mpsc::channel(16384); let gcp_service = GcpService::init(&account_id, &storage_options).await?; let key_storage = diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index 73bbdf0ef..64c647060 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -12,7 +12,7 @@ use crate::protocol::presignature::{FullPresignatureId, PresignatureId}; use crate::protocol::triple::TripleId; /// This should be enough to hold a few messages in the inbox. 
-pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = if cfg!(test) { 1 } else { 4 * 1024 }; +pub const MAX_MESSAGE_SUB_CHANNEL_SIZE: usize = 4 * 1024; pub enum SubscribeId { Generating, @@ -110,8 +110,18 @@ impl Subscriber { pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { match self { - Self::Subscribed(tx) => tx.send(msg).await, - Self::Unsubscribed(tx, _) => tx.send(msg).await, + Self::Subscribed(tx) => { + let cap = tx.capacity(); + let max_cap = tx.max_capacity(); + tracing::warn!("Sending to subscribed, capacity {cap}/{max_cap}"); + tx.send(msg).await + } + Self::Unsubscribed(tx, _) => { + let cap = tx.capacity(); + let max_cap = tx.max_capacity(); + tracing::warn!("Sending to unsubscribed, capacity {cap}/{max_cap}"); + tx.send(msg).await + } Self::Unknown => Ok(()), } } diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index fb7837de9..efa0cb1a9 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -135,9 +135,10 @@ pub(crate) async fn assert_presig_owned_state( } } -pub fn test_sign_arg(seed: u8) -> SignArgs { +pub fn test_sign_arg(seed: impl Into) -> SignArgs { + let seed = seed.into(); let mut entropy = [1; 32]; - entropy[0] = seed; + entropy[0..4].copy_from_slice(&seed.to_be_bytes()); SignArgs { entropy, epsilon: k256::Scalar::default(), diff --git a/integration-tests/tests/cases/mpc_with_stream.rs b/integration-tests/tests/cases/mpc_with_stream.rs index 2b2d62c3c..968f2740b 100644 --- a/integration-tests/tests/cases/mpc_with_stream.rs +++ b/integration-tests/tests/cases/mpc_with_stream.rs @@ -6,11 +6,17 @@ use mpc_primitives::{Chain, SignId}; use std::time::Duration; use test_log::test; -fn sign_request(seed: u16) -> IndexedSignRequest { - let bytes = [seed.to_be_bytes()[0], seed.to_be_bytes()[1]].repeat(16); +fn sign_request(seed: u32) -> IndexedSignRequest { + let bytes = [ + seed.to_be_bytes()[0], + seed.to_be_bytes()[1], + 
seed.to_be_bytes()[2], + seed.to_be_bytes()[3], + ] + .repeat(8); IndexedSignRequest::sign( SignId::new(bytes.try_into().unwrap()), - super::helpers::test_sign_arg(seed.to_be_bytes()[0]), + super::helpers::test_sign_arg(seed), Chain::Solana, 0, ) @@ -27,6 +33,7 @@ async fn test_sign() { tracing::info!("sending requests now"); let request = [sign_request(0)]; + // TODO: abstraction to send to all network[0].mock_streams[0].sign_requests(&request).await; network[1].mock_streams[0].sign_requests(&request).await; network[2].mock_streams[0].sign_requests(&request).await; @@ -47,10 +54,11 @@ async fn test_sign() { ); } -// WIP/TODO: This should fill up some channels and assert the effect of it. -// Right now, it just runs through all presignatures and then gets stuck as there are no Ps left. -#[test(tokio::test(flavor = "multi_thread"))] -async fn test_channel_contention() { +async fn check_channel_contention( + num_blocks: usize, + req_per_block: usize, + expected_signatures: usize, +) { let network = MpcFixtureBuilder::default() .only_generate_signatures() .with_mock_stream(MockStream::default()) @@ -58,36 +66,60 @@ async fn test_channel_contention() { .build() .await; - // send requests in batches of 50 per block - for outer in 0..1000 { - let requests = (0..50) - .map(|inner| sign_request(outer * 50 + inner)) + let num_nodes = 3; + for outer in 0..(num_blocks as u16) { + let requests = (0..req_per_block) + .map(|inner| sign_request(outer as u32 * req_per_block as u32 + inner as u32)) .collect::>(); - tracing::info!(outer, "sending request now"); - network[0].mock_streams[0].sign_requests(&requests).await; - network[1].mock_streams[0].sign_requests(&requests).await; - network[2].mock_streams[0].sign_requests(&requests).await; + for i in 0..num_nodes { + network[i].mock_streams[0].sign_requests(&requests).await; + } + } + + for i in 0..num_nodes { + network[i].mock_streams[0] + .progress_block_height(num_blocks) + .await; } - network[0].mock_streams[0] - 
.progress_block_height(50_000) - .await; - network[1].mock_streams[0] - .progress_block_height(50_000) - .await; - network[2].mock_streams[0] - .progress_block_height(50_000) - .await; - // there are only enough presignatures for 75 signatures in the fixture - let actions = network.assert_actions(75, Duration::from_secs(10)).await; + let actions = network + .assert_actions(expected_signatures, Duration::from_secs(60)) + .await; - assert_eq!(actions.len(), 75); + assert_eq!(actions.len(), expected_signatures); let action_str = actions.iter().next().unwrap(); assert!( action_str.contains("RpcAction::Publish"), "unexpected rpc action {action_str}" ); +} + +// WIP: up to 50 seems to work fine + +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_ok_a() { + check_channel_contention(1, 50, 50).await; +} +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_ok_b() { + check_channel_contention(5, 10, 50).await; +} + +// WIP: proof that there are enough Ps for more signatures for more than 50 +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_show_limit() { + check_channel_contention(6, 50, 75).await; +} - // TODO: check the system is still operative +// TODO: find out why it stops at 50 signatures +#[should_panic(expected = "should produce enough signatures")] +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_nok_a() { + check_channel_contention(1, 51, 51).await; +} +#[should_panic(expected = "should produce enough signatures")] +#[test(tokio::test(flavor = "multi_thread"))] +async fn test_channel_contention_nok_b() { + check_channel_contention(6, 10, 60).await; } From 6829358587cdae5e93caceffdf4dca41a45080b8 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 16 Apr 2026 15:23:26 +0200 Subject: [PATCH 4/9] fix rebase --- integration-tests/src/mpc_fixture/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index 0ac253b3b..ce3260f3a 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -553,7 +553,7 @@ impl MpcFixtureNodeBuilder { &self.mock_streams, sign_tx.clone(), backlog.clone(), - context.contract_state, + context.contract_state.clone(), &mesh_rx, ); From e46b0fae62ec2b0e17c0c9cd91f2423770220439 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 16 Apr 2026 15:25:11 +0200 Subject: [PATCH 5/9] good entropy in tests otherwise it is always the same participant in round 0 --- integration-tests/tests/cases/helpers.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index efa0cb1a9..17cee20bc 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -8,6 +8,7 @@ use mpc_node::protocol::triple::Triple; use mpc_node::storage::triple_storage::TriplePair; use mpc_node::storage::{PresignatureStorage, TripleStorage}; use mpc_primitives::{SignArgs, LATEST_MPC_KEY_VERSION}; +use sha2::Digest; pub(crate) fn dummy_presignature(id: u64) -> Presignature { dummy_presignature_with_holders(id, vec![Participant::from(1), Participant::from(2)]) @@ -137,8 +138,11 @@ pub(crate) async fn assert_presig_owned_state( pub fn test_sign_arg(seed: impl Into) -> SignArgs { let seed = seed.into(); - let mut entropy = [1; 32]; - entropy[0..4].copy_from_slice(&seed.to_be_bytes()); + // entropy should have well-distributed bits even in tests + let entropy: [u8; 32] = sha2::Sha256::digest(&seed.to_be_bytes()) + .as_slice() + .try_into() + .expect("digest length should be 32"); SignArgs { entropy, epsilon: k256::Scalar::default(), From 08adca0808304e0cb5b2ea1050534149874f069f Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 16 Apr 2026 16:16:48 +0200 Subject: [PATCH 6/9] contention tests are 
now running locally adding useful cases and checking if they run in ci --- .../tests/cases/mpc_with_stream.rs | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/integration-tests/tests/cases/mpc_with_stream.rs b/integration-tests/tests/cases/mpc_with_stream.rs index 968f2740b..10038446f 100644 --- a/integration-tests/tests/cases/mpc_with_stream.rs +++ b/integration-tests/tests/cases/mpc_with_stream.rs @@ -95,31 +95,30 @@ async fn check_channel_contention( ); } -// WIP: up to 50 seems to work fine - #[test(tokio::test(flavor = "multi_thread"))] -async fn test_channel_contention_ok_a() { +async fn test_channel_contention_many_requests_per_block() { check_channel_contention(1, 50, 50).await; } #[test(tokio::test(flavor = "multi_thread"))] -async fn test_channel_contention_ok_b() { +async fn test_channel_contention_multiple_blocks_at_once() { check_channel_contention(5, 10, 50).await; } -// WIP: proof that there are enough Ps for more signatures for more than 50 #[test(tokio::test(flavor = "multi_thread"))] async fn test_channel_contention_show_limit() { + // There are exactly enough presignatures in the fixture input for 75 signatures. 
check_channel_contention(6, 50, 75).await; } -// TODO: find out why it stops at 50 signatures -#[should_panic(expected = "should produce enough signatures")] #[test(tokio::test(flavor = "multi_thread"))] -async fn test_channel_contention_nok_a() { - check_channel_contention(1, 51, 51).await; +async fn test_channel_contention_10k_requests() { + // sending 100 x 100 requests at once + check_channel_contention(100, 100, 75).await; } -#[should_panic(expected = "should produce enough signatures")] + #[test(tokio::test(flavor = "multi_thread"))] -async fn test_channel_contention_nok_b() { - check_channel_contention(6, 10, 60).await; +#[allow(non_snake_case)] +async fn test_channel_contention_1M_requests() { + // sending 1000 x 1000 requests at once + check_channel_contention(1000, 1000, 75).await; } From 4660ded099af5d90f9548547fc3e5726ca10f43f Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 16 Apr 2026 17:30:01 +0200 Subject: [PATCH 7/9] clean up experiments and add delay test --- .../node/src/protocol/message/sub.rs | 14 +--- .../node/src/protocol/signature.rs | 5 ++ integration-tests/src/mpc_fixture/builder.rs | 19 ++++-- .../src/mpc_fixture/fixture_interface.rs | 20 +++++- .../src/mpc_fixture/fixture_tasks.rs | 2 +- .../src/mpc_fixture/mock_stream.rs | 14 ++-- integration-tests/tests/cases/helpers.rs | 2 +- .../tests/cases/mpc_with_stream.rs | 65 +++++++++++++------ 8 files changed, 91 insertions(+), 50 deletions(-) diff --git a/chain-signatures/node/src/protocol/message/sub.rs b/chain-signatures/node/src/protocol/message/sub.rs index 64c647060..e947dea18 100644 --- a/chain-signatures/node/src/protocol/message/sub.rs +++ b/chain-signatures/node/src/protocol/message/sub.rs @@ -110,18 +110,8 @@ impl Subscriber { pub async fn send(&self, msg: T) -> Result<(), mpsc::error::SendError> { match self { - Self::Subscribed(tx) => { - let cap = tx.capacity(); - let max_cap = tx.max_capacity(); - tracing::warn!("Sending to subscribed, capacity {cap}/{max_cap}"); - 
tx.send(msg).await - } - Self::Unsubscribed(tx, _) => { - let cap = tx.capacity(); - let max_cap = tx.max_capacity(); - tracing::warn!("Sending to unsubscribed, capacity {cap}/{max_cap}"); - tx.send(msg).await - } + Self::Subscribed(tx) => tx.send(msg).await, + Self::Unsubscribed(tx, _) => tx.send(msg).await, Self::Unknown => Ok(()), } } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 0d899baba..d1b1f7870 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -1556,6 +1556,11 @@ impl PendingPresignature { } } +#[cfg(feature = "test-feature")] +pub fn organize_posit_timeout() -> Duration { + ORGANIZE_POSIT_TIMEOUT +} + #[cfg(test)] mod tests { use super::*; diff --git a/integration-tests/src/mpc_fixture/builder.rs b/integration-tests/src/mpc_fixture/builder.rs index ce3260f3a..a89825153 100644 --- a/integration-tests/src/mpc_fixture/builder.rs +++ b/integration-tests/src/mpc_fixture/builder.rs @@ -33,6 +33,7 @@ use mpc_node::protocol::{self, MessageChannel, MpcSignProtocol, ProtocolState}; use mpc_node::rpc::ContractStateWatcher; use mpc_node::rpc::RpcChannel; use mpc_node::storage::{secret_storage, triple_storage::TriplePair, Options}; +use mpc_primitives::Chain; use near_sdk::AccountId; use std::collections::HashMap; use std::sync::Arc; @@ -58,7 +59,7 @@ struct MpcFixtureNodeBuilder { config: Config, messaging: NodeMessagingBuilder, key_info: Option, - mock_streams: Vec, + mock_streams: HashMap, } /// Config options for the test setup. @@ -428,9 +429,14 @@ impl MpcFixtureBuilder { /// /// Each node will have a independent deep-clone of the provided stream. /// Events are thus delivered to all nodes. 
- pub async fn with_mock_stream(mut self, stream: MockStream) -> Self { + pub async fn with_mock_stream(mut self, chain: Chain, stream: MockStream) -> Self { for node in &mut self.prepared_nodes { - node.mock_streams.push(stream.deep_clone().await) + let cloned = stream.deep_clone().await; + let prev = node.mock_streams.insert(chain, cloned); + assert!( + prev.is_none(), + "test setup only supports one stream per chain" + ); } self } @@ -481,7 +487,7 @@ impl MpcFixtureNodeBuilder { config, messaging, key_info: None, - mock_streams: vec![], + mock_streams: Default::default(), } } @@ -549,8 +555,9 @@ impl MpcFixtureNodeBuilder { let backlog = Backlog::new(); + let flat_mock_streams = self.mock_streams.values().cloned().collect::>(); fixture_tasks::start_mock_stream_tasks( - &self.mock_streams, + &flat_mock_streams, sign_tx.clone(), backlog.clone(), context.contract_state.clone(), @@ -567,7 +574,7 @@ impl MpcFixtureNodeBuilder { mesh_tx.clone(), config_tx.clone(), self.messaging.filter, - self.mock_streams.clone(), + flat_mock_streams.clone(), ); // --- SyncChannel and SyncTask setup --- diff --git a/integration-tests/src/mpc_fixture/fixture_interface.rs b/integration-tests/src/mpc_fixture/fixture_interface.rs index 319f9a547..8817b706a 100644 --- a/integration-tests/src/mpc_fixture/fixture_interface.rs +++ b/integration-tests/src/mpc_fixture/fixture_interface.rs @@ -11,10 +11,11 @@ use mpc_node::mesh::MeshState; use mpc_node::protocol::state::NodeStateWatcher; use mpc_node::protocol::state::NodeStatus; use mpc_node::protocol::sync::{SyncChannel, SyncUpdate}; -use mpc_node::protocol::{MessageChannel, ProtocolState, Sign}; +use mpc_node::protocol::{IndexedSignRequest, MessageChannel, ProtocolState, Sign}; use mpc_node::storage::{PresignatureStorage, TripleStorage}; +use mpc_primitives::Chain; use near_sdk::AccountId; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -35,7 
+36,7 @@ pub struct MpcFixtureNode { pub sign_tx: mpsc::Sender, pub msg_channel: MessageChannel, - pub mock_streams: Vec, + pub mock_streams: HashMap, pub triple_storage: TripleStorage, pub presignature_storage: PresignatureStorage, @@ -185,6 +186,19 @@ impl MpcFixture { tracing::info!("{action}"); } } + + /// Add a block that contains signature requesting events, visible to all + /// nodes, then progress the chain to execute it immediately. + pub async fn process_sign_requests(&self, chain: Chain, requests: &[IndexedSignRequest]) { + for node in &self.nodes { + let stream = node + .mock_streams + .get(&chain) + .expect("must have mock stream configured"); + stream.prepare_block_of_sign_requests(requests).await; + stream.progress_block_height(1).await; + } + } } impl MpcFixtureNode { diff --git a/integration-tests/src/mpc_fixture/fixture_tasks.rs b/integration-tests/src/mpc_fixture/fixture_tasks.rs index 9576e2587..0a6d22add 100644 --- a/integration-tests/src/mpc_fixture/fixture_tasks.rs +++ b/integration-tests/src/mpc_fixture/fixture_tasks.rs @@ -87,7 +87,7 @@ pub(super) fn test_mock_network( actions_log.insert(action_str); let block = [rpc]; for stream in &mock_streams { - stream.rpc_actions(&block).await; + stream.prepare_block_of_rpc_actions(&block).await; } } diff --git a/integration-tests/src/mpc_fixture/mock_stream.rs b/integration-tests/src/mpc_fixture/mock_stream.rs index 19b4912ff..44da8c561 100644 --- a/integration-tests/src/mpc_fixture/mock_stream.rs +++ b/integration-tests/src/mpc_fixture/mock_stream.rs @@ -62,14 +62,16 @@ impl MockStream { guard.progress_block_height(steps) } - pub async fn sign_requests(&self, requests: &[IndexedSignRequest]) { + /// Add a future block that contains signature requesting events. 
+ pub async fn prepare_block_of_sign_requests(&self, requests: &[IndexedSignRequest]) { let mut guard = self.inner.lock().await; - guard.sign_requests(requests) + guard.prepare_block_of_sign_requests(requests) } - pub async fn rpc_actions(&self, actions: &[RpcAction]) { + /// Add a future block that contains events corresponding to the provided rpc actions. + pub async fn prepare_block_of_rpc_actions(&self, actions: &[RpcAction]) { let mut guard = self.inner.lock().await; - guard.rpc_actions(actions) + guard.prepare_block_of_rpc_actions(actions) } } @@ -86,7 +88,7 @@ impl InnerMockStream { } /// Add a future block that contains signature requesting events. - pub fn sign_requests(&mut self, requests: &[IndexedSignRequest]) { + pub fn prepare_block_of_sign_requests(&mut self, requests: &[IndexedSignRequest]) { let mut block = Vec::new(); for request in requests { @@ -102,7 +104,7 @@ impl InnerMockStream { } /// Add a future block that contains events corresponding to the provided rpc actions. 
- pub fn rpc_actions(&mut self, actions: &[RpcAction]) { + pub fn prepare_block_of_rpc_actions(&mut self, actions: &[RpcAction]) { let mut block = Vec::new(); for action in actions { diff --git a/integration-tests/tests/cases/helpers.rs b/integration-tests/tests/cases/helpers.rs index 17cee20bc..069360d0c 100644 --- a/integration-tests/tests/cases/helpers.rs +++ b/integration-tests/tests/cases/helpers.rs @@ -139,7 +139,7 @@ pub(crate) async fn assert_presig_owned_state( pub fn test_sign_arg(seed: impl Into) -> SignArgs { let seed = seed.into(); // entropy should have well-distributed bits even in tests - let entropy: [u8; 32] = sha2::Sha256::digest(&seed.to_be_bytes()) + let entropy: [u8; 32] = sha2::Sha256::digest(seed.to_be_bytes()) .as_slice() .try_into() .expect("digest length should be 32"); diff --git a/integration-tests/tests/cases/mpc_with_stream.rs b/integration-tests/tests/cases/mpc_with_stream.rs index 10038446f..115d75314 100644 --- a/integration-tests/tests/cases/mpc_with_stream.rs +++ b/integration-tests/tests/cases/mpc_with_stream.rs @@ -22,28 +22,22 @@ fn sign_request(seed: u32) -> IndexedSignRequest { ) } +/// Simple test, mostly just here to check the MockStream setup is working. 
#[test(tokio::test(flavor = "multi_thread"))] async fn test_sign() { let network = MpcFixtureBuilder::default() .only_generate_signatures() - .with_mock_stream(MockStream::default()) + .with_mock_stream(Chain::Solana, MockStream::default()) .await .build() .await; tracing::info!("sending requests now"); - let request = [sign_request(0)]; - // TODO: abstraction to send to all - network[0].mock_streams[0].sign_requests(&request).await; - network[1].mock_streams[0].sign_requests(&request).await; - network[2].mock_streams[0].sign_requests(&request).await; - - network[0].mock_streams[0].progress_block_height(1).await; - network[1].mock_streams[0].progress_block_height(1).await; - network[2].mock_streams[0].progress_block_height(1).await; + network + .process_sign_requests(Chain::Solana, &[sign_request(0)]) + .await; let timeout = Duration::from_secs(10); - let actions = network.assert_actions(1, timeout).await; assert_eq!(actions.len(), 1); @@ -54,33 +48,54 @@ async fn test_sign() { ); } +/// Common checker function called with different parameters in test case below. 
async fn check_channel_contention( + // number of blocks with requests to send num_blocks: usize, + // number of requests within each block req_per_block: usize, + // how many signatures should be generated successfully, usually + // `num_blocks` * `req_per_block` expected_signatures: usize, + // add an observation delay between nodes + observation_delay: Option, ) { - let network = MpcFixtureBuilder::default() + let num_nodes = 3; + let threshold = 2; + let network = MpcFixtureBuilder::new(num_nodes as u32, threshold) .only_generate_signatures() - .with_mock_stream(MockStream::default()) + .with_mock_stream(Chain::Solana, MockStream::default()) .await .build() .await; - let num_nodes = 3; + // prepare blocks but do not send process them, yet for outer in 0..(num_blocks as u16) { let requests = (0..req_per_block) .map(|inner| sign_request(outer as u32 * req_per_block as u32 + inner as u32)) .collect::>(); for i in 0..num_nodes { - network[i].mock_streams[0].sign_requests(&requests).await; + network[i] + .mock_streams + .get(&Chain::Solana) + .unwrap() + .prepare_block_of_sign_requests(&requests) + .await; } } + // start sending requests, with optional observation delays between nodes for i in 0..num_nodes { - network[i].mock_streams[0] + network[i] + .mock_streams + .get(&Chain::Solana) + .unwrap() .progress_block_height(num_blocks) .await; + if let Some(delay) = observation_delay { + tokio::time::sleep(delay).await; + } } let actions = network @@ -97,28 +112,36 @@ async fn check_channel_contention( #[test(tokio::test(flavor = "multi_thread"))] async fn test_channel_contention_many_requests_per_block() { - check_channel_contention(1, 50, 50).await; + check_channel_contention(1, 50, 50, None).await; } + #[test(tokio::test(flavor = "multi_thread"))] async fn test_channel_contention_multiple_blocks_at_once() { - check_channel_contention(5, 10, 50).await; + check_channel_contention(5, 10, 50, None).await; +} + +#[test(tokio::test(flavor = "multi_thread"))] +async fn 
test_channel_contention_multiple_blocks_at_once_delayed() { + // delay should be > ORGANIZE_POSIT_TIMEOUT + let delay = mpc_node::protocol::signature::organize_posit_timeout() * 3 / 2; + check_channel_contention(5, 10, 50, Some(delay)).await; } #[test(tokio::test(flavor = "multi_thread"))] async fn test_channel_contention_show_limit() { // There are exactly enough presignatures in the fixture input for 75 signatures. - check_channel_contention(6, 50, 75).await; + check_channel_contention(6, 50, 75, None).await; } #[test(tokio::test(flavor = "multi_thread"))] async fn test_channel_contention_10k_requests() { // sending 100 x 100 requests at once - check_channel_contention(100, 100, 75).await; + check_channel_contention(100, 100, 75, None).await; } #[test(tokio::test(flavor = "multi_thread"))] #[allow(non_snake_case)] async fn test_channel_contention_1M_requests() { // sending 1000 x 1000 requests at once - check_channel_contention(1000, 1000, 75).await; + check_channel_contention(1000, 1000, 75, None).await; } From bd71fd0b4141ea84c0af9d78946d91b05bf39282 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 7 May 2026 21:38:54 +0200 Subject: [PATCH 8/9] feat: only fetch artifacts after posit succeeds WIP: THIS IS NOT COMPLETE. CURRENTLY IT FETCHES THE FULL ARTIFACT, NOT JUST AN ID. Removing an artifact from redis before we know if the posit succeeds can lead to wasted triples and presignatures. This introduces a "reserved" state, meaning that an artifact is reserved for being used. Only owned artifacts can be reserved. This is different from the previous "reserved" state previously renamed to "generating" in https://github.com/sig-net/mpc/pull/738. The extra state allows to avoid wasted P and T without recycling them. For state sync, an artifact with a reserved id is still treated as AVAILABLE. 
--- .../node/src/protocol/signature.rs | 67 +++--- .../node/src/storage/presignature_storage.rs | 5 +- .../node/src/storage/protocol_storage.rs | 205 +++++++++++++----- 3 files changed, 196 insertions(+), 81 deletions(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index d1b1f7870..40295f58e 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -11,8 +11,9 @@ use crate::protocol::posit::{PositAction, SinglePositCounter}; use crate::protocol::presignature::PresignatureId; use crate::protocol::{Chain, ProtocolState}; use crate::rpc::{ContractStateWatcher, GovernanceInfo, RpcChannel}; -use crate::storage::presignature_storage::{PresignatureTaken, PresignatureTakenDropper}; -use crate::storage::protocol_storage::ProtocolArtifact; +use crate::storage::presignature_storage::{ + PresignatureReserved, PresignatureTaken, PresignatureTakenDropper, +}; use crate::storage::PresignatureStorage; use crate::stream::ops::SignBidirectionalEvent; use crate::types::SignatureProtocol; @@ -212,7 +213,7 @@ struct SignPositor { proposer: Participant, active: BTreeSet, presignature_id: PresignatureId, - presignature: Option, + presignature: Option, } struct SignGenerating { @@ -353,34 +354,27 @@ impl SignOrganizer { let remaining = state.budget.remaining(); let fetch = tokio::time::timeout(remaining, async { loop { - if let Some(taken) = ctx.presignatures.take_mine().await { - let Some(holders) = taken.artifact.holders() else { - tracing::error!( - id = taken.artifact.id, - "holders not set on taken presignature" - ); - continue; - }; - let participants = intersect_vec(&[holders, &active]); + if let Some(reserved) = ctx.presignatures.reserve_mine().await { + let participants = intersect_vec(&[&reserved.holders, &active]); if participants.len() < ctx.governance.threshold { tracing::warn!( ?sign_id, - id = taken.artifact.id, - ?holders, + id = reserved.id, + 
holders = ?reserved.holders, ?active, - "discarding presignature due to inactive participants" + "releasing reserved presignature due to inactive participants" ); + // drop(reserved) releases the mine-key entry continue; } - - break (taken, participants); + break (reserved, participants); } tokio::time::sleep(Duration::from_millis(500)).await; } }) .await; - let (taken, participants) = match fetch { + let (reserved, participants) = match fetch { Ok(value) => value, Err(_) => { tracing::warn!( @@ -393,7 +387,7 @@ impl SignOrganizer { } }; - let presignature_id = taken.artifact.id; + let presignature_id = reserved.id; tracing::info!(?sign_id, ?presignature_id, "proposer got presignature"); @@ -417,7 +411,7 @@ impl SignOrganizer { // Update active to only include participants that are in both the presignature and active set let active = participants.into_iter().collect::>(); - (presignature_id, Some(taken), active) + (presignature_id, Some(reserved), active) } else { (PresignatureId::default(), None, active) }; @@ -704,12 +698,10 @@ impl SignPositor { if !counter.process_action(from, &action) { continue; } - if counter.enough_rejects(ctx.governance.threshold) { - tracing::warn!(?sign_id, ?round, ?from, "received enough REJECTs, reorganizing"); - if let Some(_taken) = presignature { - tracing::warn!(?sign_id, "discarding presignature due to REJECTs"); - } + // drop(reserved) releases the mine-key entry + let released = presignature.map(|reserved|reserved.id); + tracing::warn!(?sign_id, ?round, ?from, ?released, "received enough REJECTs, reorganizing"); state.bump_round(); return SignPhase::Organizing(SignOrganizer); } @@ -738,16 +730,16 @@ impl SignPositor { } _ = &mut posit_deadline => { if is_proposer { + // drop(reserved) releases the mine-key entry + let released = presignature.map(|reserved|reserved.id); tracing::warn!( ?sign_id, accepts = counter.accepts.len(), threshold = ctx.governance.threshold, ?round, + ?released, "proposer posit deadline reached, expiring 
round" ); - if let Some(_taken) = presignature { - tracing::warn!(?sign_id, "discarding presignature due to proposer timeout"); - } } else { tracing::warn!(?sign_id, me=?ctx.governance.me, ?proposer, "deliberator posit timeout waiting for Start, reorganizing"); } @@ -772,10 +764,27 @@ impl SignPositor { } }; + // Proposer: Now that posit succeeded, fetch and delete the presignature + // atomically. This shouldn't fail outside of corrupted db problems. + let presignature_taken = match presignature { + Some(reserved) => match reserved.commit(&ctx.presignatures).await { + Some(taken) => Some(taken), + None => { + tracing::warn!( + ?sign_id, + "failed to commit reserved presignature, reorganizing" + ); + state.bump_round(); + return SignPhase::Organizing(SignOrganizer); + } + }, + None => None, + }; + SignPhase::Generating(SignGenerating { proposer, presignature_id, - presignature, + presignature: presignature_taken, accepted_participants, }) } diff --git a/chain-signatures/node/src/storage/presignature_storage.rs b/chain-signatures/node/src/storage/presignature_storage.rs index 393e5518e..e5d0d1e52 100644 --- a/chain-signatures/node/src/storage/presignature_storage.rs +++ b/chain-signatures/node/src/storage/presignature_storage.rs @@ -4,7 +4,9 @@ use redis::{FromRedisValue, RedisError, RedisWrite, ToRedisArgs}; use cait_sith::protocol::Participant; -use super::protocol_storage::{ArtifactSlot, ArtifactTaken, ArtifactTakenDropper, ProtocolStorage}; +use super::protocol_storage::{ + ArtifactReserved, ArtifactSlot, ArtifactTaken, ArtifactTakenDropper, ProtocolStorage, +}; use crate::protocol::presignature::{Presignature, PresignatureId}; use crate::storage::protocol_storage::ProtocolArtifact; @@ -12,6 +14,7 @@ pub type PresignatureStorage = ProtocolStorage; pub type PresignatureSlot = ArtifactSlot; pub type PresignatureTaken = ArtifactTaken; pub type PresignatureTakenDropper = ArtifactTakenDropper; +pub type PresignatureReserved = ArtifactReserved; impl Presignature { pub 
fn storage(pool: &Pool, account_id: &AccountId) -> PresignatureStorage { diff --git a/chain-signatures/node/src/storage/protocol_storage.rs b/chain-signatures/node/src/storage/protocol_storage.rs index b54dac002..46378db65 100644 --- a/chain-signatures/node/src/storage/protocol_storage.rs +++ b/chain-signatures/node/src/storage/protocol_storage.rs @@ -138,6 +138,9 @@ struct ReservedState { /// IDs taken from Redis and actively consumed by a protocol. /// Value is `true` if this node is the owner of the artifact. using: HashMap, + /// Candidate IDs for a protocol where we are proposer. These are not in + /// `using`, yet. + mine_reserved: HashSet, } impl ReservedState { @@ -145,6 +148,7 @@ impl ReservedState { Self { generating: HashMap::new(), using: HashMap::new(), + mine_reserved: HashSet::new(), } } @@ -153,7 +157,49 @@ impl ReservedState { } fn contains_reserved(&self, id: &Id) -> bool { - self.generating.contains_key(id) || self.using.contains_key(id) + self.generating.contains_key(id) + || self.using.contains_key(id) + || self.mine_reserved.contains(id) + } +} + +/// A handle for a reserved artifact. +/// +/// The mine-key has been removed but the artifact data remains in Redis +/// unfetched. Dropping this without calling [`ArtifactReserved::commit`] +/// restores the mine-key entry. +pub struct ArtifactReserved { + pub id: A::Id, + /// Participants that held this artifact at reserve time (fetched separately, + /// without loading the full artifact data). 
+ pub holders: Vec, + releaser: ArtifactReservedDropper, +} + +struct ArtifactReservedDropper { + id: A::Id, + storage: Option>, +} + +impl Drop for ArtifactReservedDropper { + fn drop(&mut self) { + if let Some(storage) = self.storage.take() { + let id = self.id; + tokio::spawn(async move { + let result = storage.release_reserved(id).await; + if let Err(err) = result { + tracing::error!(?err, id, "failed to release reserved artifact"); + } + }); + } + } +} + +impl ArtifactReserved { + /// Fetch a previously reserved artifact from Redis and delete it atomically. + pub async fn commit(mut self, storage: &ProtocolStorage) -> Option> { + self.releaser.storage = None; // disarm: Drop must not call release_reserved + storage.commit_reserved(self.id).await } } @@ -330,6 +376,7 @@ impl ProtocolStorage { .iter() .filter_map(|(&id, &mine)| mine.then_some(id)), ); + ids.extend(state.mine_reserved.iter().copied()); Ok(ids) } @@ -674,6 +721,7 @@ impl ProtocolStorage { let mut state = self.reserved.write().await; state.generating.clear(); state.using.clear(); + state.mine_reserved.clear(); // if the outcome is None, it means the script failed or there was an error. outcome.is_some() @@ -683,69 +731,124 @@ impl ProtocolStorage { /// It is very important to NOT reuse the same artifact twice for two different /// protocols. pub async fn take_mine(&self) -> Option> { - const SCRIPT: &str = r#" - local artifact_key = KEYS[1] - local mine_key = KEYS[2] - - if redis.call("SCARD", mine_key) < 1 then - return nil - end - - -- pop one artifact from the self owner set and delete it once successfully fetched - local id = redis.call("SPOP", mine_key) - local artifact = redis.call("HGET", artifact_key, id) - if not artifact then - return {err = "WARN unexpected, artifact " .. id .. 
" is missing"} - end - - -- Delete the artifact from the hash map - redis.call("HDEL", artifact_key, id) - -- delete the artifact from our self owner set - redis.call("SREM", mine_key, id) - - -- Read and delete the holders set - local holders_key = artifact_key .. ':holders:' .. id - local holders = redis.call("SMEMBERS", holders_key) - redis.call("DEL", holders_key) + let mut reserved = self.reserve_mine().await?; + // removing releaser.storage stops the releasing on drop + let storage = reserved.releaser.storage.take()?; + reserved.commit(&storage).await + } - -- Return the artifact and holders - return {artifact, holders} - "#; + pub fn artifact_key(&self) -> &str { + &self.artifact_key + } - let start = Instant::now(); + /// Read an id but not the content of an artifact from Redis, then mark it + /// as used in an in-memory structure. + pub async fn reserve_mine(&self) -> Option> { let mut conn = self.connect().await?; let me = self.me().ok()?; - let result: Result)>, _> = redis::Script::new(SCRIPT) - .key(&self.artifact_key) - .key(owner_key(&self.owner_keys, me)) - .invoke_async(&mut conn) - .await; + let mine_key = owner_key(&self.owner_keys, me); + let start = Instant::now(); + let result: Result, _> = conn.spop(&mine_key).await; let elapsed = start.elapsed(); crate::metrics::storage::REDIS_LATENCY - .with_label_values(&[A::METRIC_LABEL, "take_mine"]) + .with_label_values(&[A::METRIC_LABEL, "reserve_mine"]) .observe(elapsed.as_millis() as f64); - match result { - Ok(Some((mut artifact, holders))) => { - let holders = holders.into_iter().map(Participant::from).collect(); - artifact.set_holders(holders); - let id = artifact.id(); - self.reserved.write().await.using.insert(id, true); - let taken = ArtifactTaken::new(artifact, self.clone()); - tracing::debug!(id, ?elapsed, "took mine artifact"); - Some(taken) - } - Ok(None) => None, + let id = match result { + Ok(Some(id)) => id, + Ok(None) => return None, Err(err) => { - tracing::warn!(?err, ?elapsed, "failed 
to take mine artifact from storage"); - None + tracing::warn!( + ?err, + ?elapsed, + "failed to reserve mine artifact from storage" + ); + return None; } - } + }; + + let holders_key = format!("{}:holders:{}", self.artifact_key, id); + let raw_holders: Vec = conn.smembers(&holders_key).await.unwrap_or_default(); + let holders = raw_holders.into_iter().map(Participant::from).collect(); + + self.reserved.write().await.mine_reserved.insert(id); + tracing::debug!(id, ?elapsed, "reserved mine artifact"); + Some(ArtifactReserved { + id, + holders, + releaser: ArtifactReservedDropper { + id, + storage: Some(self.clone()), + }, + }) } - pub fn artifact_key(&self) -> &str { - &self.artifact_key + /// Fetch and delete a previously reserved artifact from Redis. + async fn commit_reserved(&self, id: A::Id) -> Option> { + let start = Instant::now(); + let Some(mut conn) = self.connect().await else { + tracing::warn!(id, "failed to commit reserved artifact: connection failed"); + self.reserved.write().await.mine_reserved.remove(&id); + return None; + }; + + let artifact_result: Result, _> = conn.hget(&self.artifact_key, id).await; + let mut artifact = match artifact_result { + Ok(Some(a)) => a, + Ok(None) => { + tracing::warn!(id, "artifact not found for commit"); + self.reserved.write().await.mine_reserved.remove(&id); + return None; + } + Err(err) => { + tracing::warn!(id, ?err, "failed to fetch artifact for commit"); + self.reserved.write().await.mine_reserved.remove(&id); + return None; + } + }; + + let _: Result = conn.hdel(&self.artifact_key, id).await; + let holders_key = format!("{}:holders:{}", self.artifact_key, id); + let raw_holders: Vec = conn.smembers(&holders_key).await.unwrap_or_default(); + let _: Result = conn.del(&holders_key).await; + + let elapsed = start.elapsed(); + crate::metrics::storage::REDIS_LATENCY + .with_label_values(&[A::METRIC_LABEL, "commit_reserved"]) + .observe(elapsed.as_millis() as f64); + + let holders = 
raw_holders.into_iter().map(Participant::from).collect(); + artifact.set_holders(holders); + let mut state = self.reserved.write().await; + state.mine_reserved.remove(&id); + state.using.insert(id, true); + drop(state); + tracing::debug!(id, ?elapsed, "committed reserved artifact"); + Some(ArtifactTaken::new(artifact, self.clone())) + } + + /// Restore the mine-key entry for a previously reserved artifact. + /// + /// INVARIANT: Never put back an artifact once it has been removed. This is + /// preserved because we never read the artifact itself from redis, only the + /// id. Here we only put back the id. + async fn release_reserved(&self, id: A::Id) -> anyhow::Result<()> { + self.reserved.write().await.mine_reserved.remove(&id); + let me = self.me()?; + let mine_key = owner_key(&self.owner_keys, me); + + let Some(mut conn) = self.connect().await else { + return Err(StorageError::ConnectionFailed)?; + }; + + let result: Result = conn.sadd(&mine_key, id).await; + if let Err(err) = result { + tracing::warn!(id, ?err, "failed to restore mine_key for reserved artifact"); + } else { + tracing::debug!(id, "restored mine_key entry for reserved artifact"); + } + Ok(()) } /// Batch remove a peer from holders for a set of artifact IDs, and prune From ca3cd8bc6cfadca925af1da4abd319fb3ada4ea4 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Thu, 7 May 2026 21:44:14 +0200 Subject: [PATCH 9/9] fix flaky test --- chain-signatures/node/src/protocol/signature.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 40295f58e..054c2d1ef 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -55,7 +55,10 @@ const ORGANIZE_POSIT_TIMEOUT: Duration = Duration::from_secs(if cfg!(feature = " /// /// Use shorter time for tests, as network delays are much smaller. 
const ACCEPT_POSIT_TIMEOUT: Duration = Duration::from_millis(if cfg!(feature = "test-feature") { - 100 + // TODO(#793): This should work with lower values. But + // `test_sign_no_presignature_waste` becomes unstable when this is too low. + // This should work if we handle non-participants better. + 300 } else { 500 });