diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index a0b4c4b3..7b68abad 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -134,6 +134,7 @@ async fn main() -> eyre::Result<()> { store.clone(), first_validator_id, options.attestation_committee_count, + options.is_aggregator, )); ethlambda_rpc::start_rpc_server(metrics_socket, store) @@ -143,8 +144,8 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); tokio::select! { - _ = p2p_handle => { - panic!("P2P node task has exited unexpectedly"); + result = p2p_handle => { + panic!("P2P node task has exited unexpectedly: {result:?}"); } _ = tokio::signal::ctrl_c() => { // Ctrl-C received, shutting down diff --git a/crates/net/p2p/src/lib.rs b/crates/net/p2p/src/lib.rs index 4e2a4651..2ebec1d3 100644 --- a/crates/net/p2p/src/lib.rs +++ b/crates/net/p2p/src/lib.rs @@ -66,7 +66,8 @@ pub async fn start_p2p( store: Store, validator_id: Option, attestation_committee_count: u64, -) { + is_aggregator: bool, +) -> Result<(), libp2p::gossipsub::SubscriptionError> { let config = libp2p::gossipsub::ConfigBuilder::default() // d .mesh_n(8) @@ -176,22 +177,27 @@ pub async fn start_p2p( .subscribe(&aggregation_topic) .unwrap(); - // Subscribe to attestation subnet topic (validators subscribe to their committee's subnet) + // Build attestation subnet topic (needed for publishing even without subscribing) // attestation_committee_count is validated to be >= 1 by clap at CLI parse time. let subnet_id = validator_id.map(|vid| vid % attestation_committee_count); let attestation_topic_kind = match subnet_id { Some(id) => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_{id}"), - // Non-validators subscribe to subnet 0 to receive attestations + // Non-validators use subnet 0 for publishing None => format!("{ATTESTATION_SUBNET_TOPIC_PREFIX}_0"), }; let attestation_topic_str = format!("/leanconsensus/{network}/{attestation_topic_kind}/ssz_snappy"); let attestation_topic = libp2p::gossipsub::IdentTopic::new(attestation_topic_str); - swarm - .behaviour_mut() - .gossipsub - .subscribe(&attestation_topic) - .unwrap(); + + // Only aggregators subscribe to attestation subnets; non-aggregators + // publish via gossipsub's fanout mechanism without subscribing. + if is_aggregator { + swarm + .behaviour_mut() + .gossipsub + .subscribe(&attestation_topic)?; + info!(%attestation_topic_kind, "Subscribed to attestation subnet"); + } info!(socket=%listening_socket, "P2P node started"); @@ -214,6 +220,7 @@ pub async fn start_p2p( }; event_loop(server).await; + Ok(()) } /// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours