Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ apollo_proc_macros = { path = "crates/apollo_proc_macros", version = "0.0.0" }
apollo_proc_macros_tests.path = "crates/apollo_proc_macros_tests"
apollo_propeller.path = "crates/apollo_propeller"
apollo_protobuf.path = "crates/apollo_protobuf"
apollo_quic_datagrams = { path = "crates/apollo_quic_datagrams", features = ["tokio"] }
apollo_reverts.path = "crates/apollo_reverts"
apollo_rpc.path = "crates/apollo_rpc"
apollo_rpc_execution.path = "crates/apollo_rpc_execution"
Expand Down
1 change: 1 addition & 0 deletions crates/apollo_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ apollo_config.workspace = true
apollo_metrics.workspace = true
apollo_network_types.workspace = true
apollo_propeller.workspace = true
apollo_quic_datagrams = { path = "../apollo_quic_datagrams", features = ["tokio"] }
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
Expand Down
3 changes: 0 additions & 3 deletions crates/apollo_network/src/mixed_behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,10 @@ use crate::peer_manager::PeerManagerConfig;
use crate::{
discovery,
gossipsub_impl,
gossipsub_impl,
peer_manager,
peer_manager,
propeller_impl,
prune_dead_connections,
sqmr,
sqmr,
};

// TODO(Shahak): consider reducing the pulicity of all behaviour to pub(crate)
Expand Down
107 changes: 51 additions & 56 deletions crates/apollo_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,9 +798,7 @@ fn send_now<Item>(
}
}


use std::fs;
use std::io;
use std::{fs, io};

fn read_sysctl_value(path: &str) -> io::Result<u64> {
// Read the file's content into a string
Expand All @@ -822,11 +820,21 @@ fn read_sysctl_value(path: &str) -> io::Result<u64> {
fn check_udp_buffer_size(desired_buffer_size: u32) {
let rmem_max = read_sysctl_value("/proc/sys/net/core/rmem_max").unwrap();
let wmem_max = read_sysctl_value("/proc/sys/net/core/wmem_max").unwrap();
assert!(rmem_max >= desired_buffer_size.into(), "rmem_max is {rmem_max} less than {desired_buffer_size}");
assert!(wmem_max >= desired_buffer_size.into(), "wmem_max is {wmem_max} less than {desired_buffer_size}");

if rmem_max < desired_buffer_size.into() || wmem_max < desired_buffer_size.into() {
panic!("UDP buffer size is less than desired buffer size. rmem_max: {rmem_max}, wmem_max: {wmem_max}, desired_buffer_size: {desired_buffer_size}. This runs the risk of packet loss in high latency lines");
assert!(
rmem_max >= desired_buffer_size.into(),
"rmem_max is {rmem_max} less than {desired_buffer_size}"
);
assert!(
wmem_max >= desired_buffer_size.into(),
"wmem_max is {wmem_max} less than {desired_buffer_size}"
);

if rmem_max < desired_buffer_size.into() || wmem_max < desired_buffer_size.into() {
panic!(
"UDP buffer size is less than desired buffer size. rmem_max: {rmem_max}, wmem_max: \
{wmem_max}, desired_buffer_size: {desired_buffer_size}. This runs the risk of packet \
loss in high latency lines"
);
}
}

Expand Down Expand Up @@ -865,54 +873,41 @@ impl NetworkManager {
const BYTES_IN_THE_AIR: u32 = 1 << 30;
// check_udp_buffer_size(BYTES_IN_THE_AIR);

let mut swarm = SwarmBuilder::with_existing_identity(key_pair)
.with_tokio()
// TODO(AndrewL): .with_quic()
.with_quic_config( |mut quic_config| {
// HIGH THROUGHPUT, HIGH LATENCY OPTIMIZATION:
// Maximize data flow and minimize waiting for acknowledgements

// Set maximum data per stream and connection to allow unlimited flow
quic_config.send_window = Some(BYTES_IN_THE_AIR.into());
quic_config.max_stream_data = BYTES_IN_THE_AIR;
quic_config.max_connection_data = BYTES_IN_THE_AIR;
quic_config.congestion_controller = Some(libp2p::quic::CongestionController::Bbr {
initial_window: Some(BYTES_IN_THE_AIR.into())
});

// // Set handshake timeout to allow time for DNS resolution and connection establishment
// quic_config.handshake_timeout = std::time::Duration::from_secs(10);

// // Reduce idle timeout to prevent connections from lingering
// // but still allow for high-latency scenarios
// quic_config.max_idle_timeout = 30000; // 30 seconds instead of 3000

// // Set aggressive keep-alive to maintain connections over high-latency links
// quic_config.keep_alive_interval = std::time::Duration::from_secs(10);

// // Allow maximum concurrent streams for parallel data transmission
// quic_config.max_concurrent_stream_limit = u32::MAX;

quic_config
})
.with_dns()
.expect("Error building DNS transport")
.with_behaviour(|key| mixed_behaviour::MixedBehaviour::new(
sqmr::Config { session_timeout },
discovery_config,
peer_manager_config,
metrics.as_mut()
.and_then(|m| m.event_metrics.take()),
metrics.as_mut()
.and_then(|m| m.propeller_metrics.take()),
key.clone(),
bootstrap_peer_multiaddr,
chain_id,
node_version
))
.expect("Error while building the swarm")
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(idle_connection_timeout))
.build();
// Configure custom QUIC transport with apollo_quic_datagrams
// HIGH THROUGHPUT, HIGH LATENCY OPTIMIZATION:
// Maximize data flow and minimize waiting for acknowledgements
let quic_config = apollo_quic_datagrams::Config::new(&key_pair)
.with_send_window(BYTES_IN_THE_AIR as u64)
.with_max_stream_data(BYTES_IN_THE_AIR)
.with_max_connection_data(BYTES_IN_THE_AIR)
.with_congestion_controller(apollo_quic_datagrams::CongestionController::Bbr {
initial_window: Some(BYTES_IN_THE_AIR as u64),
});

let quic_transport = apollo_quic_datagrams::tokio::Transport::new(quic_config);

let mut swarm = SwarmBuilder::with_existing_identity(key_pair.clone())
.with_tokio()
.with_other_transport(|_key| quic_transport)
.expect("Error building QUIC transport")
.with_dns()
.expect("Error building DNS transport")
.with_behaviour(|key| {
mixed_behaviour::MixedBehaviour::new(
sqmr::Config { session_timeout },
discovery_config,
peer_manager_config,
metrics.as_mut().and_then(|m| m.event_metrics.take()),
metrics.as_mut().and_then(|m| m.propeller_metrics.take()),
key.clone(),
bootstrap_peer_multiaddr,
chain_id,
node_version,
)
})
.expect("Error while building the swarm")
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(idle_connection_timeout))
.build();

let _ = swarm.listen_on(listen_address.clone());

Expand Down
1 change: 1 addition & 0 deletions crates/apollo_propeller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ strum_macros.workspace = true
tracing.workspace = true

[dev-dependencies]
apollo_quic_datagrams.workspace = true
libp2p = { workspace = true, features = ["macros", "plaintext", "quic", "tcp", "tokio", "yamux"] }
libp2p-swarm-test.workspace = true
quickcheck.workspace = true
Expand Down
31 changes: 20 additions & 11 deletions crates/apollo_propeller/tests/e2e_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ fn create_swarm(

let identity = Keypair::generate_ed25519();

let builder = libp2p::SwarmBuilder::with_existing_identity(identity).with_tokio();
let builder = libp2p::SwarmBuilder::with_existing_identity(identity.clone()).with_tokio();

match transport_type {
TransportType::Memory => builder
Expand All @@ -212,16 +212,25 @@ fn create_swarm(
c.with_idle_connection_timeout(Duration::from_secs(3600)) // 1 hour
})
.build(),
TransportType::Quic => builder
.with_quic()
.with_behaviour(|_| HandlerTestBehaviour::new(max_wire_message_size))
.expect("Failed to create behaviour")
.with_swarm_config(|c| {
// Use a much longer idle connection timeout to prevent disconnections during long
// tests
c.with_idle_connection_timeout(Duration::from_secs(3600)) // 1 hour
})
.build(),
TransportType::Quic => {
// Use custom apollo_quic_datagrams implementation
let quic_config = apollo_quic_datagrams::Config::new(&identity);
let quic_transport = apollo_quic_datagrams::tokio::Transport::new(quic_config);

builder
.with_other_transport(|_key| quic_transport)
.expect("Failed to build QUIC transport")
.with_dns()
.expect("Failed to build DNS transport")
.with_behaviour(|_| HandlerTestBehaviour::new(max_wire_message_size))
.expect("Failed to create behaviour")
.with_swarm_config(|c| {
// Use a much longer idle connection timeout to prevent disconnections during
// long tests
c.with_idle_connection_timeout(Duration::from_secs(3600)) // 1 hour
})
.build()
}
}
}

Expand Down
118 changes: 0 additions & 118 deletions crates/apollo_quic_datagrams/CHANGELOG.md

This file was deleted.

1 change: 1 addition & 0 deletions crates/apollo_quic_datagrams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ libp2p = { workspace = true, features = [
"yamux",
] }
quinn = { version = "0.11.6", default-features = false, features = ["futures-io", "rustls"] }
quinn-proto = "0.11.13"
rand = { workspace = true }
rustls = { version = "0.23.9", default-features = false }
thiserror = { workspace = true }
Expand Down
Loading
Loading