Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain-signatures/contract-sol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub struct SignatureRespondedEvent {
}

#[event]
#[derive(Clone)]
pub struct RespondBidirectionalEvent {
pub request_id: [u8; 32],
pub responder: Pubkey,
Expand Down
109 changes: 87 additions & 22 deletions chain-signatures/node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,24 @@ use tokio::task::JoinHandle;
const ROUND_INTERVAL: usize = 512;

/// The default timeout budget for organizing and posit phases.
const ORGANIZE_POSIT_TIMEOUT: Duration = Duration::from_secs(20);
///
/// Tests have stable network conditions and don't benefit from a longer
/// timeout. It only makes them run for longer.
const ORGANIZE_POSIT_TIMEOUT: Duration = Duration::from_secs(if cfg!(feature = "test-feature") {
5
} else {
20
});

/// A proposer tries to include all eligible deliberators but will go ahead with
/// a subset after this timeout, if above the minimum threshold.
///
/// Use shorter time for tests, as network delays are much smaller.
const ACCEPT_POSIT_TIMEOUT: Duration = Duration::from_millis(if cfg!(feature = "test-feature") {
500
} else {
100
});

/// All relevant info pertaining to an indexed sign request.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -630,6 +647,9 @@ impl SignPositor {
let remaining = state.budget.remaining();
let posit_deadline = tokio::time::sleep(remaining);
tokio::pin!(posit_deadline);
let accept_deadline = tokio::time::sleep(ACCEPT_POSIT_TIMEOUT);
tokio::pin!(accept_deadline);
let mut accept_deadline_reached = false;

let accepted_participants = loop {
tokio::select! {
Expand Down Expand Up @@ -691,27 +711,24 @@ impl SignPositor {
return SignPhase::Organizing(SignOrganizer);
}

// Start as soon as we have enough accepts
if counter.enough_accepts(ctx.threshold) {
let participants = counter.accepts.into_iter().collect::<Vec<_>>();
tracing::info!(?sign_id, ?round, me = ?ctx.me, ?participants, "proposer broadcasting Start");

for &p in &participants {
if p == ctx.me {
continue;
}
ctx.msg
.send(
ctx.me,
p,
PositMessage {
id: PositProtocolId::Signature(sign_id, presignature_id, state.round),
from: ctx.me,
action: PositAction::Start(participants.clone()),
},
)
.await;
}
// Starting as soon as we have enough accepts leaves
// participants accepting a bit later in a bad state.
// They will try to become propose in later rounds,
// wasting Presignatures, memory and CPU time.
//
// Instead, wait for at least the `accept_deadline`,
// only nodes answer slower will be left out. This isn't
// perfect but much better than always forcing nodes
// into the bad state.
let ready_to_go = counter.meets_totality() || accept_deadline_reached;
if ready_to_go && counter.enough_accepts(ctx.threshold) {
let participants = Self::start_with_current_accepts(
ctx,
state,
counter,
sign_id,
presignature_id
).await;
break participants;
}
}
Expand All @@ -735,6 +752,20 @@ impl SignPositor {
state.bump_round();
return SignPhase::Organizing(SignOrganizer);
}
_ = &mut accept_deadline, if is_proposer && !accept_deadline_reached => {
accept_deadline_reached = true;
if counter.enough_accepts(ctx.threshold) {
let participants = Self::start_with_current_accepts(
ctx,
state,
counter,
sign_id,
presignature_id
).await;
break participants;
}
}

}
};

Expand All @@ -745,6 +776,35 @@ impl SignPositor {
accepted_participants,
})
}

async fn start_with_current_accepts(
ctx: &SignTask,
state: &mut SignState,
counter: SinglePositCounter,
sign_id: SignId,
presignature_id: PresignatureId,
) -> Vec<Participant> {
let participants = counter.accepts.into_iter().collect::<Vec<_>>();
tracing::info!(?sign_id, round=?state.round, me = ?ctx.me, ?participants, "proposer broadcasting Start");

for &p in &participants {
if p == ctx.me {
continue;
}
ctx.msg
.send(
ctx.me,
p,
PositMessage {
id: PositProtocolId::Signature(sign_id, presignature_id, state.round),
from: ctx.me,
action: PositAction::Start(participants.clone()),
},
)
.await;
}
participants
}
}

impl SignGenerating {
Expand Down Expand Up @@ -1510,3 +1570,8 @@ impl PendingPresignature {
}
}
}

#[cfg(feature = "test-feature")]
pub fn organize_posit_timeout() -> Duration {
ORGANIZE_POSIT_TIMEOUT
}
11 changes: 1 addition & 10 deletions chain-signatures/node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,8 +1600,6 @@ use signet_program::accounts::Respond as SolanaRespondAccount;
use signet_program::accounts::RespondBidirectional as SolanaRespondBidirectionalAccount;
use signet_program::instruction::Respond as SolanaRespond;
use signet_program::instruction::RespondBidirectional as SolanaRespondBidirectional;
use signet_program::AffinePoint as SolanaContractAffinePoint;
use signet_program::Signature as SolanaContractSignature;
use solana_sdk::signature::Signer as SolanaSigner;
async fn try_publish_sol(
sol: &SolanaClient,
Expand All @@ -1614,14 +1612,7 @@ async fn try_publish_sol(
let sign_id = action.indexed.id;
let request_ids = vec![action.indexed.id.request_id];
let big_r = signature.big_r.to_encoded_point(false);
let signature = SolanaContractSignature {
big_r: SolanaContractAffinePoint {
x: big_r.as_bytes()[1..33].try_into().unwrap(),
y: big_r.as_bytes()[33..65].try_into().unwrap(),
},
s: signature.s.to_bytes().into(),
recovery_id: signature.recovery_id,
};
let signature = crate::util::mpc_to_sol_signature(signature, big_r);

tracing::debug!(
?sign_id,
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/node/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub fn channel() -> (mpsc::Sender<ChainEvent>, mpsc::Receiver<ChainEvent>) {

/// Unified event produced by a chain stream
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum ChainEvent {
SignRequest(IndexedSignRequest),
Respond(SignatureRespondedEvent),
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/node/src/stream/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl SignBidirectionalEvent {
}
}

#[derive(Clone)]
pub enum RespondBidirectionalEvent {
Solana(signet_program::RespondBidirectionalEvent),
Hydration(HydrationRespondBidirectionalEvent),
Expand Down
14 changes: 14 additions & 0 deletions chain-signatures/node/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ impl AffinePointExt for AffinePoint {
}
}

pub fn mpc_to_sol_signature(
signature: &mpc_primitives::Signature,
big_r: k256::EncodedPoint,
) -> signet_program::Signature {
signet_program::Signature {
big_r: signet_program::AffinePoint {
x: big_r.as_bytes()[1..33].try_into().unwrap(),
y: big_r.as_bytes()[33..65].try_into().unwrap(),
},
s: signature.s.to_bytes().into(),
recovery_id: signature.recovery_id,
}
}

pub fn is_elapsed_longer_than_timeout(timestamp_sec: u64, timeout: u64) -> bool {
if let LocalResult::Single(msg_timestamp) = Utc.timestamp_opt(timestamp_sec as i64, 0) {
let timeout = Duration::from_millis(timeout);
Expand Down
35 changes: 34 additions & 1 deletion integration-tests/src/mpc_fixture/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::mpc_fixture::fixture_tasks::MessageFilter;
use crate::mpc_fixture::input::FixtureInput;
use crate::mpc_fixture::message_collector::CollectMessages;
use crate::mpc_fixture::mock_governance::MockGovernance;
use crate::mpc_fixture::mock_stream::MockStream;
use crate::mpc_fixture::{fixture_tasks, MpcFixture, MpcFixtureNode};
use cait_sith::protocol::Participant;
use mpc_contract::config::{
Expand All @@ -31,6 +32,7 @@ use mpc_node::protocol::{self, MessageChannel, MpcSignProtocol, ProtocolState};
use mpc_node::rpc::ContractStateWatcher;
use mpc_node::rpc::RpcChannel;
use mpc_node::storage::{secret_storage, triple_storage::TriplePair, Options};
use mpc_primitives::Chain;
use near_sdk::AccountId;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -56,6 +58,7 @@ struct MpcFixtureNodeBuilder {
config: Config,
messaging: NodeMessagingBuilder,
key_info: Option<NodeKeyInfo>,
mock_streams: HashMap<Chain, MockStream>,
}

/// Config options for the test setup.
Expand Down Expand Up @@ -420,6 +423,22 @@ impl MpcFixtureBuilder {
.with_node_min_triples(0)
.with_node_min_presignatures(0)
}

/// Add a mock stream to all nodes.
///
/// Each node will have a independent deep-clone of the provided stream.
/// Events are thus delivered to all nodes.
pub async fn with_mock_stream(mut self, chain: Chain, stream: MockStream) -> Self {
for node in &mut self.prepared_nodes {
let cloned = stream.deep_clone().await;
let prev = node.mock_streams.insert(chain, cloned);
assert!(
prev.is_none(),
"test setup only supports one stream per chain"
);
}
self
}
}

impl MpcFixtureNodeBuilder {
Expand Down Expand Up @@ -467,6 +486,7 @@ impl MpcFixtureNodeBuilder {
config,
messaging,
key_info: None,
mock_streams: Default::default(),
}
}

Expand Down Expand Up @@ -532,6 +552,17 @@ impl MpcFixtureNodeBuilder {
mesh_rx.clone(),
));

let backlog = Backlog::new();

let flat_mock_streams = self.mock_streams.values().cloned().collect::<Vec<_>>();
fixture_tasks::start_mock_stream_tasks(
&flat_mock_streams,
sign_tx.clone(),
backlog.clone(),
context.contract_state.clone(),
&mesh_rx,
);

// handle outbox messages manually, we want them before they are
// encrypted and we want to send them directly to other node's inboxes
let _mock_network_handle = fixture_tasks::test_mock_network(
Expand All @@ -542,6 +573,7 @@ impl MpcFixtureNodeBuilder {
mesh_tx.clone(),
config_tx.clone(),
self.messaging.filter,
flat_mock_streams.clone(),
);

// --- SyncChannel and SyncTask setup ---
Expand All @@ -563,9 +595,10 @@ impl MpcFixtureNodeBuilder {
config: config_tx,
sign_tx,
msg_channel: self.messaging.channel,
mock_streams: self.mock_streams,
triple_storage,
presignature_storage,
backlog: Backlog::new(),
backlog,
sync_channel,
web_handle: None,
};
Expand Down
22 changes: 19 additions & 3 deletions integration-tests/src/mpc_fixture/fixture_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@

use crate::containers::Redis;
use crate::mpc_fixture::message_collector::{CollectMessages, MessagePrinter};
use crate::mpc_fixture::mock_stream::MockStream;
use cait_sith::protocol::Participant;
use mpc_node::backlog::Backlog;
use mpc_node::config::Config;
use mpc_node::mesh::MeshState;
use mpc_node::protocol::state::NodeStateWatcher;
use mpc_node::protocol::sync::{SyncChannel, SyncUpdate};
use mpc_node::protocol::{MessageChannel, ProtocolState, Sign};
use mpc_node::protocol::{IndexedSignRequest, MessageChannel, ProtocolState, Sign};
use mpc_node::storage::{PresignatureStorage, TripleStorage};
use mpc_primitives::Chain;
use near_sdk::AccountId;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
Expand All @@ -33,6 +35,7 @@ pub struct MpcFixtureNode {

pub sign_tx: mpsc::Sender<Sign>,
pub msg_channel: MessageChannel,
pub mock_streams: HashMap<Chain, MockStream>,

pub triple_storage: TripleStorage,
pub presignature_storage: PresignatureStorage,
Expand Down Expand Up @@ -131,11 +134,24 @@ impl MpcFixture {
let actions: tokio::sync::MutexGuard<'_, HashSet<String>> =
self.output.rpc_actions.lock().await;

tracing::info!("All published RPC actions:");
tracing::info!(count = actions.len(), "All published RPC actions:");
for action in actions.iter() {
tracing::info!("{action}");
}
}

/// Add a block that contains signature requesting events, visible to all
/// nodes, then progress the chain to execute it immediately.
pub async fn process_sign_requests(&self, chain: Chain, requests: &[IndexedSignRequest]) {
for node in &self.nodes {
let stream = node
.mock_streams
.get(&chain)
.expect("must have mock stream configured");
stream.prepare_block_of_sign_requests(requests).await;
stream.progress_block_height(1).await;
}
}
}

impl MpcFixtureNode {
Expand Down
Loading
Loading