diff --git a/Cargo.lock b/Cargo.lock index 24ba6e28aa0..9bb950983de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1969,6 +1969,7 @@ dependencies = [ "sysinfo", "thiserror 1.0.69", "tokio", + "tokio-metrics", "tokio-retry", "tokio-stream", "tracing", diff --git a/crates/apollo_network/Cargo.toml b/crates/apollo_network/Cargo.toml index ffc7892dc5c..4f98941110c 100644 --- a/crates/apollo_network/Cargo.toml +++ b/crates/apollo_network/Cargo.toml @@ -42,6 +42,7 @@ strum_macros.workspace = true sysinfo.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["full", "sync"] } +tokio-metrics = { workspace = true, features = ["metrics-rs-integration", "rt"] } tokio-retry.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md index 2889f2a83f4..9ecf4851d99 100644 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/README.md @@ -1,41 +1,451 @@ -# Network Stress Test +# Broadcast Network Stress Test Node -## Setup and Run Stress Test +A comprehensive network stress testing tool for the Apollo network that tests P2P communication, measures performance metrics, and validates network behavior under various load patterns and conditions. -1. **Create Remote Engines** +## Overview - Create 5 gcloud VM instances. Make sure to have the necessary RAM and disk space. Each instance should be named in the following pattern: +The broadcast network stress test node is designed to stress test the P2P communication layer of the Apollo network. It creates a network of nodes with configurable broadcasting patterns, measuring latency, throughput, message ordering, and overall network performance. The tool supports both local testing (using the provided Python scripts) and distributed deployment via Kubernetes with optional network throttling. - ``` - -0, ... ,-4 - ``` +## Features -2. 
**Set Bootstrap Node**
-
-   Find the internal IP of your bootstrap node in the VM instances chart on google cloud console. Paste it into the test_config.json file into the bootstrap_peer_multaddr value instead of its placeholder.
-
+- **Multiple Broadcasting Modes**: Supports different message broadcasting patterns (all nodes, single broadcaster, round-robin)
+- **Advanced Performance Metrics**: Measures message latency, throughput, delivery rates, ordering, and duplicate detection
+- **Comprehensive System Monitoring**: Tracks CPU usage, memory consumption, network I/O, and gossipsub protocol metrics
+- **Message Ordering Analysis**: Tracks out-of-order messages, missing messages, and duplicates
+- **Prometheus Integration**: Exports detailed metrics with proper labels for monitoring and analysis
+- **Network Throttling**: Supports bandwidth and latency gating for realistic network conditions
+- **Configurable Parameters**: Customizable message sizes, send intervals, buffer sizes, and test duration
+- **Multi-Node Support**: Can run multiple coordinated nodes with different broadcasting patterns
+- **Kubernetes Deployment**: Includes YAML templates for cluster deployment with traffic shaping and auto-cleanup
+- **Deterministic Peer IDs**: Generates consistent peer identities for reproducible tests
+- **Performance Profiling**: Integrated support for CPU, memory, and heap profiling with perf and valgrind
+- **Automatic Timeout**: Configurable test duration with automatic termination
+- **Resource Management**: Kubernetes deployments with configurable CPU/memory limits and dedicated node pools
+- **Resilient Infrastructure**: Auto-retry mechanisms for Prometheus port forwarding and automatic namespace cleanup
-3. **Install Rust and clone repository**
-
-   For all 5 instances run:
-
-   ```
-   gcloud compute ssh -0 --project -- 'cd && sudo apt install -y git unzip clang && curl https://sh.rustup.rs -sSf | sh -s -- -y && source "$HOME/.cargo/env" && git clone https://github.com/starkware-libs/sequencer.git; cd sequencer && sudo scripts/dependencies.sh cargo build --release -p apollo_network --bin network_stress_test'
-   ```
+
+## Building
+
+Build the stress test node binary:
+
+```bash
+# For basic functionality
+cargo build --release --bin broadcast_network_stress_test_node
+
+# For additional tokio metrics (used by local script)
+RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin broadcast_network_stress_test_node
+```
-4. **Run test**
-
-   ```
-   PROJECT_ID= BASE_INSTANCE_NAME= ZONE= ./run_broadcast_stress_test.sh
-   ```
+
+## Command Line Arguments
+
+**Note**: Most arguments are required when invoking the binary directly; the Python scripts (`local.py`, `cluster_start.py`) supply the defaults noted in the table below.
-5. **Results**
-
-   Results are retrieved from VM instances and saved to /output.csv. You can change the default path by adjusting the config file.
-
+
+| Argument | Description | Default | Environment Variable |
+|----------|-------------|---------|---------------------|
+| `--id` | Node ID for identification and metrics | Required | `ID` |
+| `--num-nodes` | Total number of nodes in the network | Required (3 in Python scripts) | `NUM_NODES` |
+| `--metric-port` | Prometheus metrics server port | Required | `METRIC_PORT` |
+| `--p2p-port` | P2P network port | Required | `P2P_PORT` |
+| `--bootstrap` | Bootstrap peer addresses (comma-separated) | None | `BOOTSTRAP` |
+| `--verbosity` | Log verbosity (0-5: None, ERROR, WARN, INFO, DEBUG, TRACE) | Required (2 in Python scripts) | `VERBOSITY` |
+| `--buffer-size` | Broadcast topic buffer size | Required (100000 in Python scripts) | `BUFFER_SIZE` |
+| `--message-size-bytes` | Message payload size in bytes | Required for `all`/`one`/`rr` (1024 in Python scripts) | `MESSAGE_SIZE_BYTES` |
+| `--heartbeat-millis` | Interval between messages (milliseconds) | Required for `all`/`one`/`rr` (1000 in Python scripts) | `HEARTBEAT_MILLIS` |
+| `--mode` | Broadcasting mode: `all`, `one`, `rr`, or `explore` | Required (`all` in Python scripts) | `MODE` |
+| `--network-protocol` | Network protocol: `gossipsub`, `sqmr`, or `reversed-sqmr` | Required (`gossipsub` in Python scripts) | `NETWORK_PROTOCOL` |
+| `--broadcaster` | In mode `one` or `explore`, which node ID should do the broadcasting | Required for `one`/`explore` (last node in Python scripts) | `BROADCASTER` |
+| `--round-duration-seconds` | Duration per node in RoundRobin mode | Required for `rr` (3 in Python scripts) | `ROUND_DURATION_SECONDS` |
+| `--explore-cool-down-duration-seconds` | Cool down duration between configuration changes in Explore mode | Required for `explore` (100 in Python scripts) | `EXPLORE_COOL_DOWN_DURATION_SECONDS` |
+| `--explore-run-duration-seconds` | Duration to run each configuration in Explore mode | Required for `explore` (100 in Python scripts) | `EXPLORE_RUN_DURATION_SECONDS` |
+| `--explore-min-throughput-byte-per-seconds` | Minimum throughput in bytes per second for Explore mode | Required for `explore` (102400 in Python scripts) | `EXPLORE_MIN_THROUGHPUT_BYTE_PER_SECONDS` |
+| `--explore-min-message-size-bytes` | Minimum message size in bytes for Explore mode | Required for `explore` (1024 in Python scripts) | `EXPLORE_MIN_MESSAGE_SIZE_BYTES` |
+| `--timeout` | Timeout in seconds for the node (when exceeded, the node is killed) | Required (4000 in Python scripts) | `TIMEOUT` |
+
+The Python scripts also accept script-level flags that are not forwarded to the binary, such as `--quic` (use UDP/QUIC multiaddresses instead of TCP); see `run/args.py`.
-## Pull repo updates to virtual machines
-
-1. **Run**
-
-   ```
-   PROJECT_ID= BASE_INSTANCE_NAME= ZONE= ./pull_stress_test.sh
-   ```
+
+## Broadcasting Modes
+
+### All Broadcast (`all`)
+All nodes continuously broadcast messages simultaneously. Best for testing network capacity and concurrent message handling.
+
+### Single Broadcaster (`one`)
+Only the node specified by `--broadcaster` sends messages, while others act as receivers. Ideal for testing message propagation and network topology.
+
+### Round Robin (`rr`)
+Nodes take turns broadcasting in sequential order based on their ID. Each node broadcasts for `--round-duration-seconds` before passing to the next. Useful for testing network behavior under changing load patterns.
+
+### Explore (`explore`)
+A specialized single broadcaster mode that automatically explores different combinations of message sizes and throughput rates over time. Only the node specified by `--broadcaster` sends messages, but the configuration (message size and send interval) changes every `--explore-run-duration-seconds` with a cooldown period of `--explore-cool-down-duration-seconds`. Ideal for automated performance testing across various network conditions and finding optimal throughput configurations.
+
+## Running Locally
+
+### Recommended: Multi-Node Network using Local Script
+
+The best way to run locally is using the local script. First, navigate to the run directory:
+
+```bash
+cd crates/apollo_network/src/bin/broadcast_network_stress_test_node/run
+python local.py --num-nodes 3 --verbosity 3 --mode rr
+```
+
+This will:
+- Compile the binary if needed
+- Start 3 nodes with sequential ports (10000, 10001, 10002)
+- Automatically configure bootstrap peers for all nodes
+- Launch Prometheus in Docker for metrics collection
+- Provide a web interface at http://localhost:9090
+
+### Manual Single Node (Advanced)
+
+For direct binary testing (not recommended for most use cases), all required arguments must be passed explicitly:
+
+```bash
+./target/release/broadcast_network_stress_test_node \
+    --id 0 \
+    --num-nodes 1 \
+    --metric-port 2000 \
+    --p2p-port 10000 \
+    --verbosity 3 \
+    --buffer-size 10000 \
+    --mode all \
+    --network-protocol gossipsub \
+    --message-size-bytes 1024 \
+    --heartbeat-millis 1 \
+    --timeout 3600
+```
+
+### Advanced Local Testing
+
+All commands should be run from the run directory:
+
+```bash
+cd crates/apollo_network/src/bin/broadcast_network_stress_test_node/run
+
+# Test round-robin mode with custom timing
+python local.py --num-nodes 5 --mode rr --round-duration-seconds 10 --heartbeat-millis 100
+
+# Test single broadcaster mode
+python local.py --num-nodes 3 --mode one --broadcaster 0 --message-size-bytes 4096
+
+# Enable profiling with perf (CPU profiling)
+python local.py --num-nodes 3 --profile --profile-mode cpu --mode all
+
+# Enable memory profiling with perf
+python local.py --num-nodes 3 --profile --profile-mode mem --mode rr
+
+# Use DHAT memory profiler (requires valgrind)
+python local.py --num-nodes 3 --profile --profile-mode dhat --mode all
+
+# Test explore mode with automatic configuration changes
+python local.py --num-nodes 3 --mode explore --broadcaster 0 --explore-run-duration-seconds 30 --explore-cool-down-duration-seconds 5 --explore-min-throughput-byte-per-seconds 1000
+```
+
+## Kubernetes Deployment
+
+### Prerequisites
+
+- Kubernetes cluster access
+- Docker registry access
+- kubectl configured
+
+### Deploy to Cluster
+
+```bash
+cd crates/apollo_network/src/bin/broadcast_network_stress_test_node/run
+python cluster_start.py --num-nodes 5 --latency 50 --throughput 1000 --mode rr
+```
+
+This will:
+- Build and push a Docker image
+- Create a Kubernetes Indexed Job with 5 pods
+- Apply network throttling (50ms latency, 1000 KB/s throughput)
+- Deploy to a timestamped namespace
+- Set up automatic namespace deletion after timeout
+
+### Advanced Cluster Deployment Options
+
+```bash
+# Use a pre-built image instead of building a new one
+python cluster_start.py --image us-central1-docker.pkg.dev/starkware-dev/sequencer/broadcast-network-stress-test-node:2024-01-15-10-30-00 --num-nodes 3
+
+# Deploy to dedicated node pool with custom resource limits
+python cluster_start.py --num-nodes 4 --dedicated-node-pool --node-pool-name production --cpu-requests 2000m --memory-requests 4Gi
+
+# Custom timeout and resource configuration
+python cluster_start.py --num-nodes 3 --timeout 3600 --cpu-limits 2000m --memory-limits 2Gi
+```
+
+### Access Prometheus
+
+```bash
+python cluster_port_forward_prometheus.py
+```
+
+Then visit http://localhost:9090 for metrics visualization.
+
+### View Logs
+
+```bash
+python cluster_logs.py
+```
+
+This saves logs from all deployed nodes to `/tmp/broadcast-network-stress-test-*.logs.txt` files for offline analysis.
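+
+As a quick sanity check before digging into the full files, a short script along these lines can summarize error counts across the saved logs (an illustrative helper, not part of this tool; it only assumes the output path shown above):
+
+```python
+#!/usr/bin/env python3
+"""Summarize saved stress-test logs (illustrative helper)."""
+import glob
+
+# Output location used by cluster_logs.py (see "View Logs" above).
+LOG_GLOB = "/tmp/broadcast-network-stress-test-*.logs.txt"
+
+for path in sorted(glob.glob(LOG_GLOB)):
+    with open(path, errors="replace") as f:
+        lines = f.readlines()
+    # Count log lines that mention errors or warnings.
+    errors = sum("ERROR" in line for line in lines)
+    warnings = sum("WARN" in line for line in lines)
+    print(f"{path}: {len(lines)} lines, {errors} ERROR, {warnings} WARN")
+```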
+ +### Cleanup + +```bash +python cluster_stop.py +``` + +## Network Throttling + +The Docker deployment supports network traffic shaping to simulate realistic network conditions: + +- **Latency Gating**: Add artificial delay to packets (via `LATENCY` environment variable in ms) +- **Throughput Limiting**: Cap bandwidth to test under constrained conditions (via `THROUGHPUT` environment variable in KB/s) + +The entrypoint script uses Linux traffic control (`tc`) with HTB (Hierarchical Token Bucket) for bandwidth limiting and NetEm for latency simulation. + +## Metrics + +The tool exports comprehensive Prometheus metrics with proper labels: + +### Message Flow Metrics +- `broadcast_messages_sent_total`: Total messages sent by this node via broadcast topic +- `broadcast_bytes_sent_total`: Total bytes sent via broadcast topic +- `receive_messages_total`: Total messages received +- `receive_bytes_total`: Total bytes received across all messages + +### Configuration Metrics (Explore Mode) +- `broadcast_message_size`: Current size of the stress test message in bytes +- `broadcast_message_heartbeat_millis`: Current number of milliseconds between consecutive broadcasts +- `broadcast_message_throughput`: Current throughput in bytes per second of broadcasted messages + +### Performance Metrics +- `receive_message_delay_seconds`: End-to-end message latency histogram +- `receive_message_negative_delay_seconds`: Negative message delay histogram (clock synchronization issues) +- `broadcast_message_send_delay_seconds`: Time taken to send messages (local sending latency) + +### Message Ordering Metrics +- `receive_messages_out_of_order_total`: Messages received out of sequence +- `receive_messages_missing_total`: Messages that appear to be missing +- `receive_messages_duplicate_total`: Duplicate messages detected +- `receive_messages_missing_retrieved_total`: Previously missing messages that arrived late + +### Network Connection Metrics +- `network_connected_peers`: Number of connected peers +- `network_blacklisted_peers`: Number of blacklisted peers +- `network_stress_test_sent_messages`: Messages sent via broadcast topic +- `network_stress_test_received_messages`: Messages received via broadcast topic + +### Gossipsub Protocol Metrics +- `gossipsub_mesh_peers`: Number of mesh peers +- `gossipsub_all_peers`: Total number of known peers +- `gossipsub_subscribed_topics`: Number of subscribed topics +- `gossipsub_protocol_peers`: Number of gossipsub protocol peers +- `gossipsub_messages_received`: Number of gossipsub messages received +- `gossipsub_positive_score_peers`: Peers with positive scores +- `gossipsub_negative_score_peers`: Peers with negative scores + +### System Resource Metrics +- `process_cpu_usage_percent`: CPU usage percentage of the current process +- `process_memory_usage_bytes`: Memory usage in bytes of the current process +- `process_virtual_memory_usage_bytes`: Virtual memory usage in bytes +- `system_total_memory_bytes`: Total system memory +- `system_available_memory_bytes`: Available system memory +- `system_network_bytes_sent_total`: Total network bytes sent +- `system_network_bytes_received_total`: Total network bytes received + +All metrics are properly labeled for detailed analysis, and the tool automatically collects system-level metrics at configurable intervals. 
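+
+For ad-hoc checks without Prometheus, the exposition format is easy to scrape directly. The sketch below (illustrative, not part of this tool) pulls one node's `/metrics` endpoint and derives the average end-to-end latency from the `receive_message_delay_seconds` histogram; it assumes the metric port 2000 used elsewhere in this README and sums a metric across all of its label sets:
+
+```python
+#!/usr/bin/env python3
+"""Derive average receive delay from a node's Prometheus exposition."""
+import urllib.request
+
+METRICS_URL = "http://localhost:2000/metrics"  # first node when testing locally
+
+def scrape(url):
+    """Parse exposition lines into {series_name_with_labels: value}."""
+    samples = {}
+    with urllib.request.urlopen(url) as resp:
+        for raw in resp.read().decode().splitlines():
+            if raw.startswith("#") or not raw.strip():
+                continue
+            name, _, value = raw.rpartition(" ")
+            try:
+                samples[name] = float(value)
+            except ValueError:
+                continue
+    return samples
+
+def series_total(samples, metric):
+    """Sum a metric across all of its label sets."""
+    return sum(v for k, v in samples.items() if k == metric or k.startswith(metric + "{"))
+
+samples = scrape(METRICS_URL)
+total = series_total(samples, "receive_message_delay_seconds_sum")
+count = series_total(samples, "receive_message_delay_seconds_count")
+if count:
+    print(f"average receive delay: {total / count:.6f}s over {int(count)} messages")
+else:
+    print("no receive_message_delay_seconds samples yet")
+```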
+ +## Configuration + +### Message Structure + +Each stress test message contains: +- **Sender ID**: Node identifier (8 bytes) +- **Message Index**: Sequential message number from sender (8 bytes) +- **Timestamp**: Send time as nanoseconds since UNIX epoch (16 bytes) +- **Payload Length**: Size of variable payload (8 bytes) +- **Payload**: Configurable data (remaining bytes) + +### Network Topology + +- All nodes join the same gossipsub topic: `stress_test_topic` +- Node 0 typically acts as the bootstrap peer for network discovery +- Deterministic peer IDs based on node ID ensure consistent network formation +- Secret keys are generated deterministically from node ID for reproducibility + +## Example Use Cases + +### Latency Testing +```bash +# Test with 100ms network latency +python cluster_start.py --num-nodes 3 --latency 100 --message-size-bytes 512 --mode all +``` + +### Throughput Testing +```bash +# Test with 500 KB/s bandwidth limit +python cluster_start.py --num-nodes 5 --throughput 500 --heartbeat-millis 10 --mode rr +``` + +### Large Message Testing +```bash +# Test with 64KB messages in single broadcaster mode (run from the run directory) +cd crates/apollo_network/src/bin/broadcast_network_stress_test_node/run +python local.py --num-nodes 3 --message-size-bytes 65536 --heartbeat-millis 100 --mode one +``` + +### Network Resilience Testing +```bash +# Test round-robin with constrained network +python cluster_start.py --num-nodes 4 --latency 200 --throughput 100 --mode rr --round-duration-seconds 30 +``` + +### Performance Exploration Testing +```bash +# Automatically explore different message sizes and throughput configurations +python cluster_start.py --num-nodes 3 --mode explore --broadcaster 0 --explore-run-duration-seconds 60 --explore-cool-down-duration-seconds 10 --explore-min-throughput-byte-per-seconds 500 + +# Local exploration with custom parameters (run from the run directory) +cd crates/apollo_network/src/bin/broadcast_network_stress_test_node/run +python local.py --num-nodes 3 --mode explore --broadcaster 1 --explore-run-duration-seconds 45 --explore-cool-down-duration-seconds 15 --explore-min-throughput-byte-per-seconds 1000 +``` + +## Development + +### File Structure + +The codebase is organized into modular components for better maintainability and testing. 
This modular architecture provides several benefits:
+
+- **Separation of Concerns**: Each module has a single, well-defined responsibility
+- **Testability**: Individual modules can be unit tested independently
+- **Maintainability**: Changes to one module don't affect others, reducing regression risk
+- **Reusability**: Modules can be reused across different parts of the application
+- **Code Organization**: Related functionality is grouped together logically
+
+#### Core Modules
+- `main.rs`: Application entry point and initialization
+- `args.rs`: CLI argument parsing and configuration types (`Args`, `Mode`, `NetworkProtocol`)
+- `stress_test_node.rs`: Main application logic and `BroadcastNetworkStressTestNode` implementation
+- `network_channels.rs`: Network protocol abstraction and channel management (`NetworkChannels`)
+- `message_handling.rs`: Message sending/receiving abstractions (`MessageSender`, `MessageReceiver`)
+- `metrics.rs`: Comprehensive metrics definitions, monitoring functions, and system resource tracking
+- `explore_config.rs`: Exploration mode configuration and phase management (`ExploreConfiguration`)
+
+#### Supporting Modules
+- `converters.rs`: Message serialization/deserialization with ordering support
+- `converters_test.rs`: Unit tests for message conversion
+- `message_index_detector.rs`: Per-sender message index tracking (`MessageIndexTracker`) used to estimate pending messages
+- `utils.rs`: Configuration utilities and helper functions
+
+#### Deployment Scripts
+- `run/`: Deployment scripts and configurations
+  - `local.py`: Local multi-node testing with Prometheus and profiling support
+  - `cluster_start.py`: Kubernetes deployment with throttling and resource management
+  - `cluster_stop.py`: Cleanup deployed resources and namespaces
+  - `cluster_port_forward_prometheus.py`: Resilient Prometheus access with auto-retry
+  - `cluster_logs.py`: Log retrieval and management for cluster deployments
+  - `yaml_maker.py`: Kubernetes YAML generation with RBAC and auto-cleanup
+  - `args.py`: Shared argument parsing for Python scripts
+  - `utils.py`: Common utility functions and peer ID management
+  - `Dockerfile.fast` / `Dockerfile.slow`: Container images (prebuilt-binary and full-build variants) with traffic shaping capabilities
+  - `entrypoint.sh`: Container startup script with network throttling
+
+### Adding New Metrics
+
+1. Add new metrics definitions to `metrics.rs` using the `define_metrics!` macro
+2. Implement metric recording functions in `metrics.rs`
+3. Call metric recording functions from appropriate handlers in `stress_test_node.rs` or `message_handling.rs`
+4. Use appropriate labels for detailed analysis
+5. Update Prometheus configuration in deployment scripts if needed
+
+### Adding New Broadcasting Modes
+
+1. Extend the `Mode` enum in `args.rs`
+2. Update the mode-specific logic in `stress_test_node.rs` in the `send_stress_test_messages_impl()` method
+3. Add corresponding argument parsing logic in `run/args.py` for Python scripts
+4. Update documentation and examples
+
+### Adding New Network Protocols
+
+1. Add a new protocol variant to the `NetworkProtocol` enum in `args.rs`
+2. Extend the `NetworkChannels` enum in `network_channels.rs` to support the new protocol
+3. Update the `create_network_manager_with_channels()` function in `network_channels.rs`
+4. Add corresponding message handling logic in `message_handling.rs`
+5. Update the `take_sender()` and `take_receiver()` methods in `network_channels.rs`
+
+### Network Configuration
+
+Modify `NetworkConfig` parameters in `stress_test_node.rs` within the `create_network_config()` method for different P2P behaviors:
+- Connection limits and timeouts
+- Heartbeat intervals
+- Gossipsub parameters (mesh size, fanout, etc.)
+- Discovery mechanisms and protocols
+
+### Module Dependencies
+
+The modules have the following dependency structure:
+- `main.rs` → imports `args.rs` and `stress_test_node.rs`
+- `stress_test_node.rs` → imports `args.rs`, `network_channels.rs`, `message_handling.rs`, `metrics.rs`, `explore_config.rs`
+- `network_channels.rs` → imports `args.rs`, `message_handling.rs`, `metrics.rs`
+- `message_handling.rs` → imports `network_channels.rs`
+- `explore_config.rs` → imports `args.rs`, `metrics.rs`
+- `metrics.rs` → imports `converters.rs` and `message_index_detector.rs`
+
+## Troubleshooting
+
+### Common Issues
+
+**Nodes not connecting**: Check the bootstrap peer address and ensure the firewall allows traffic on the P2P ports (UDP for QUIC, TCP otherwise). Verify that node 0 is started first as the bootstrap peer.
+
+**High or inconsistent latency readings**: Verify system clocks are synchronized across test nodes. Consider NTP setup for distributed testing.
+
+**Out-of-order messages**: This is normal in P2P networks. Monitor the `receive_messages_out_of_order_total` metric to understand network behavior patterns.
+
+**Prometheus not scraping**: Confirm metric ports are accessible and the Prometheus configuration includes all node endpoints. When using the local script, Prometheus runs in Docker and automatically configures all node endpoints. Check firewall rules and ensure Docker is running properly.
+
+**Docker permission errors for throttling**: Ensure privileged mode is enabled for network traffic shaping. The container needs the CAP_NET_ADMIN capability.
+
+**Message size errors**: Ensure `--message-size-bytes` is at least 40 bytes (the metadata size). Check the calculation in `converters.rs` if issues persist.
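+
+For reference, the 40-byte figure follows directly from the field sizes listed under "Message Structure" above (8 + 8 + 16 + 8). The sketch below illustrates that layout (hypothetical encoding for illustration only; field order and endianness are assumptions, and the authoritative format lives in `converters.rs`):
+
+```python
+#!/usr/bin/env python3
+"""Illustrate the documented 40-byte metadata layout (assumed encoding)."""
+import struct
+import time
+
+def encode_message(sender_id: int, message_index: int, payload: bytes) -> bytes:
+    header = struct.pack("<QQ", sender_id, message_index)  # 8 + 8 bytes
+    header += time.time_ns().to_bytes(16, "little")        # 16-byte timestamp (ns since epoch)
+    header += struct.pack("<Q", len(payload))              # 8-byte payload length
+    return header + payload                                # 40 bytes of metadata + payload
+
+msg = encode_message(sender_id=0, message_index=7, payload=b"\x00" * 984)
+assert len(msg) == 1024  # --message-size-bytes 1024 leaves 984 payload bytes
+print(f"metadata: {len(msg) - 984} bytes, total: {len(msg)} bytes")
+```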
+
+### Debugging
+
+Enable verbose logging for detailed P2P communication:
+```bash
+# For local script (default verbosity is 2)
+python local.py --verbosity 5
+
+# For cluster script (default verbosity is 2)
+python cluster_start.py --verbosity 5 --num-nodes 3
+
+# For direct binary usage
+./target/release/broadcast_network_stress_test_node --verbosity 5
+
+# Set timeout for automatic termination
+python local.py --timeout 3600 --verbosity 3
+```
+
+Check individual node logs in Kubernetes (pods are created by an Indexed Job, so select them by completion index):
+```bash
+kubectl logs -n broadcast-network-stress-test-{timestamp} -l app=broadcast-network-stress-test,batch.kubernetes.io/job-completion-index=0 -f
+```
+
+Monitor live metrics during testing:
+```bash
+# View all metrics from a node
+curl http://localhost:2000/metrics
+
+# Monitor specific metrics
+curl -s http://localhost:2000/metrics | grep receive_messages_total
+```
+
+Use Prometheus queries for analysis:
+```promql
+# Average message latency by sender
+rate(receive_message_delay_seconds_sum[5m]) / rate(receive_message_delay_seconds_count[5m])
+
+# Message loss rate
+rate(receive_messages_missing_total[5m]) / rate(broadcast_messages_sent_total[5m])
+
+# Network throughput
+rate(receive_bytes_total[5m])
+
+# Explore mode: current throughput
+broadcast_message_throughput
+```
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/args.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/args.rs
new file mode 100644
index 00000000000..a036b2dbbd4
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/args.rs
@@ -0,0 +1,125 @@
+use std::fmt::Display;
+
+use clap::{Parser, ValueEnum};
+
+#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)]
+pub enum Mode {
+    /// All nodes broadcast messages
+    #[value(name = "all")]
+    AllBroadcast,
+    /// Only the node specified by --broadcaster broadcasts messages
+    #[value(name = "one")]
+    OneBroadcast,
+    /// Nodes take turns broadcasting in round-robin fashion
+    #[value(name = "rr")]
+    RoundRobin,
+    /// Only the node specified by --broadcaster broadcasts messages.
+    /// Every explore_run_duration_seconds + explore_cool_down_duration_seconds seconds
+    /// a new combination of MPS (messages per second) and message size is explored.
+    /// Increases the throughput with each new trial.
+    /// Configurations are filtered by minimum throughput and minimum message size.
+ #[value(name = "explore")] + Explore, +} + +#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)] +pub enum NetworkProtocol { + /// Use gossipsub for broadcasting (default) + #[value(name = "gossipsub")] + Gossipsub, + /// Use SQMR for point-to-point communication + #[value(name = "sqmr")] + Sqmr, + /// Use Reversed SQMR where receivers initiate requests to broadcasters + #[value(name = "reversed-sqmr")] + ReveresedSqmr, +} + +impl Display for Mode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.to_possible_value().unwrap().get_name()) + } +} + +impl Display for NetworkProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.to_possible_value().unwrap().get_name()) + } +} + +#[derive(Parser, Debug, Clone)] +#[command(version, about, long_about = None)] +pub struct Args { + /// ID for Prometheus logging + #[arg(short, long, env)] + pub id: u64, + + /// Total number of nodes in the network + #[arg(long, env)] + pub num_nodes: u64, + + /// The port to run the Prometheus metrics server on + #[arg(long, env)] + pub metric_port: u16, + + /// The port to run the P2P network on + #[arg(short, env, long)] + pub p2p_port: u16, + + /// The addresses of the bootstrap peers (can specify multiple) + #[arg(long, env, value_delimiter = ',')] + pub bootstrap: Vec, + + /// Set the verbosity level of the logger, the higher the more verbose + #[arg(short, long, env)] + pub verbosity: u8, + + /// Buffer size for the broadcast topic + #[arg(long, env)] + pub buffer_size: usize, + + /// The mode to use for the stress test. + #[arg(long, env)] + pub mode: Mode, + + /// The network protocol to use for communication (default: gossipsub) + #[arg(long, env)] + pub network_protocol: NetworkProtocol, + + /// Which node ID should do the broadcasting - for OneBroadcast and Explore modes + #[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "explore")]))] + pub broadcaster: Option, + + /// Duration each node broadcasts before switching (in seconds) - for RoundRobin mode + #[arg(long, env, required_if_eq("mode", "rr"))] + pub round_duration_seconds: Option, + + /// Size of StressTestMessage + #[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "all"), ("mode", "rr")]))] + pub message_size_bytes: Option, + + /// The time to sleep between broadcasts of StressTestMessage in milliseconds + #[arg(long, env, required_if_eq_any([("mode", "one"), ("mode", "all"), ("mode", "rr")]))] + pub heartbeat_millis: Option, + + /// Cool down duration between configuration changes in seconds - for Explore mode + #[arg(long, env, required_if_eq("mode", "explore"))] + pub explore_cool_down_duration_seconds: Option, + + /// Duration to run each configuration in seconds - for Explore mode + #[arg(long, env, required_if_eq("mode", "explore"))] + pub explore_run_duration_seconds: Option, + + /// Minimum throughput in bytes per second - for Explore mode + #[arg(long, env, required_if_eq("mode", "explore"))] + pub explore_min_throughput_byte_per_seconds: Option, + + /// Minimum message size in bytes - for Explore mode + #[arg(long, env, required_if_eq("mode", "explore"))] + pub explore_min_message_size_bytes: Option, + + /// The timeout in seconds for the node. + /// When the node runs for longer than this, it will be killed. 
+ #[arg(long, env)] + pub timeout: u64, +} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/explore_config.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/explore_config.rs new file mode 100644 index 00000000000..a1da78c1455 --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/explore_config.rs @@ -0,0 +1,114 @@ +use std::time::Duration; + +use crate::args::Args; +use crate::metrics::{get_throughput, seconds_since_epoch}; + +const EXPLORE_MESSAGE_SIZES_BYTES: [usize; 13] = [ + 1 << 10, + 1 << 11, + 1 << 12, + 1 << 13, + 1 << 14, + 1 << 15, + 1 << 16, + 1 << 17, + 1 << 18, + 1 << 19, + 1 << 20, + 1 << 21, + 1 << 22, +]; +const EXPLORE_MESSAGE_HEARTBEAT_MILLIS: [u64; 16] = + [1, 2, 3, 4, 5, 10, 20, 30, 40, 50, 100, 150, 200, 250, 500, 1000]; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ExplorePhase { + /// In cooldown period - no broadcasting should occur + CoolDown, + /// In running period - broadcasting should occur (if this node is the broadcaster) + Running, +} + +#[derive(Clone)] +pub struct ExploreConfiguration { + sorted_configurations: Vec<(usize, Duration)>, + /// The broadcaster configuration index + configuration_index: usize, + /// Duration of the Running phase of the cycle + run_duration_seconds: u64, + /// Total duration for one complete cycle (cooldown + run_duration_seconds) + cycle_duration_seconds: u64, +} + +impl ExploreConfiguration { + pub fn new( + cool_down_duration_seconds: u64, + run_duration_seconds: u64, + min_throughput_byte_per_seconds: f64, + min_message_size_bytes: usize, + ) -> ExploreConfiguration { + let mut sorted_configurations = Vec::with_capacity( + EXPLORE_MESSAGE_SIZES_BYTES.len() * EXPLORE_MESSAGE_HEARTBEAT_MILLIS.len(), + ); + for message_size in EXPLORE_MESSAGE_SIZES_BYTES { + for heartbeat_millis in EXPLORE_MESSAGE_HEARTBEAT_MILLIS { + sorted_configurations.push((message_size, Duration::from_millis(heartbeat_millis))); + } + } + sorted_configurations.retain(|(size, duration)| { + *size >= min_message_size_bytes + && get_throughput(*size, *duration) >= min_throughput_byte_per_seconds + }); + sorted_configurations + .sort_by_cached_key(|(size, duration)| get_throughput(*size, *duration) as u64); + + let cycle_duration_seconds = cool_down_duration_seconds + run_duration_seconds; + + Self { + sorted_configurations, + configuration_index: 0, + run_duration_seconds, + cycle_duration_seconds, + } + } + + /// Gets the current phase within the current configuration cycle + pub fn get_current_phase(&self) -> ExplorePhase { + let now_seconds = seconds_since_epoch(); + let position_in_cycle_seconds = now_seconds % self.cycle_duration_seconds; + + if position_in_cycle_seconds < self.run_duration_seconds { + ExplorePhase::Running + } else { + ExplorePhase::CoolDown + } + } + + /// Gets the current message size and duration based on synchronized time + pub fn get_current_size_and_heartbeat(&mut self) -> (usize, Duration) { + let config_index = self.configuration_index; + self.configuration_index += 1; + if self.configuration_index >= self.sorted_configurations.len() { + self.configuration_index = 0; + } + self.sorted_configurations[config_index] + } +} + +/// Extracts explore mode parameters from arguments with validation +pub fn extract_explore_params(args: &Args) -> (u64, u64, f64, usize) { + let cool_down = args + .explore_cool_down_duration_seconds + .expect("explore_cool_down_duration_seconds required for explore mode"); + let run_duration = args + 
.explore_run_duration_seconds + .expect("explore_run_duration_seconds required for explore mode"); + let min_throughput = args + .explore_min_throughput_byte_per_seconds + .expect("explore_min_throughput_byte_per_seconds required for explore mode"); + let min_message_size = args + .explore_min_message_size_bytes + .expect("explore_min_message_size_bytes required for explore mode"); + + (cool_down, run_duration, min_throughput, min_message_size) +} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs index 50a4f3a8d09..426aa73016a 100644 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/main.rs @@ -1,163 +1,32 @@ //! Runs a node that stress tests the p2p communication of the network. - -use std::convert::Infallible; +#![allow(clippy::as_conversions)] use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::str::FromStr; -use std::time::SystemTime; -use std::vec; +use std::time::Duration; -use apollo_network::network_manager::{ - BroadcastTopicChannels, - BroadcastTopicClient, - BroadcastTopicClientTrait, - BroadcastTopicServer, - NetworkManager, -}; -use apollo_network::NetworkConfig; -use apollo_network_types::network_types::BroadcastedMessageMetadata; use clap::Parser; -use converters::{StressTestMessage, METADATA_SIZE}; -use futures::future::join_all; -use futures::StreamExt; -use libp2p::gossipsub::{Sha256Topic, Topic}; -use libp2p::Multiaddr; -// use metrics::{counter, gauge}; +use converters::METADATA_SIZE; use metrics_exporter_prometheus::PrometheusBuilder; -use tokio::time::Duration; -use tracing::{info, trace, Level}; +use tokio_metrics::RuntimeMetricsReporterBuilder; +use tracing::Level; #[cfg(test)] mod converters_test; +mod args; mod converters; +mod explore_config; +mod message_handling; +mod message_index_detector; pub mod metrics; +mod network_channels; +mod stress_test_node; mod utils; -lazy_static::lazy_static! 
{ - static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string()); -} - -#[derive(Parser, Debug, Clone)] -#[command(version, about, long_about = None)] -struct Args { - /// ID for Prometheus logging - #[arg(short, long, env)] - id: usize, - - /// The port to run the Prometheus metrics server on - #[arg(long, env, default_value_t = 2000)] - metric_port: u16, - - /// The port to run the P2P network on - #[arg(short, env, long, default_value_t = 10000)] - p2p_port: u16, - - /// The address to the bootstrap peer - #[arg(long, env)] - bootstrap: Option, - - /// Set the verbosity level of the logger, the higher the more verbose - #[arg(short, long, env, default_value_t = 0)] - verbosity: u8, - - /// Buffer size for the broadcast topic - // Default from crates/apollo_consensus_manager/src/config.rs - #[arg(short, long, env, default_value_t = 10000)] - buffer_size: usize, - - /// Size of StressTestMessage - #[arg(short, long, env, default_value_t = 1 << 10)] - message_size_bytes: usize, - - /// The time to sleep between broadcasts of StressTestMessage in milliseconds - #[arg(long, env, default_value_t = 1)] - heartbeat_millis: u64, - - /// Maximum duration in seconds to run the node for - #[arg(short, long, env, default_value_t = 3_600)] - timeout: u64, -} - -async fn send_stress_test_messages( - mut broadcast_topic_client: BroadcastTopicClient, - args: &Args, - _peer_id: String, -) { - let mut message = StressTestMessage::new( - args.id.try_into().unwrap(), - 0, // message_index, will be updated in loop - vec![0; args.message_size_bytes - *METADATA_SIZE], - ); - let duration = Duration::from_millis(args.heartbeat_millis); - - for i in 0.. { - message.metadata.time = SystemTime::now(); - message.metadata.message_index = i; - broadcast_topic_client.broadcast_message(message.clone()).await.unwrap(); - trace!("Sent message {i}: {:?}", message); - // counter!("sent_messages").increment(1); - tokio::time::sleep(duration).await; - } -} - -fn receive_stress_test_message( - message_result: Result, - _metadata: BroadcastedMessageMetadata, -) { - let end_time = SystemTime::now(); - - let received_message = message_result.unwrap(); - let start_time = received_message.metadata.time; - let duration = match end_time.duration_since(start_time) { - Ok(duration) => duration, - Err(_) => panic!("Got a negative duration, the clocks are not synced!"), - }; - - let _delay_seconds = duration.as_secs_f64(); - // let delay_micros = duration.as_micros().try_into().unwrap(); - - // TODO(AndrewL): Concentrate all string metrics to constants in a different file - // counter!("message_received").increment(1); - // counter!(format!("message_received_from_{}", - // received_message.metadata.sender_id)).increment(1); - - // TODO(AndrewL): This should be a historgram - // gauge!("message_received_delay_seconds").set(delay_seconds); - // gauge!(format!("message_received_delay_seconds_from_{}", - // received_message.metadata.sender_id)) .set(delay_seconds); - - // counter!("message_received_delay_micros_sum").increment(delay_micros); - // counter!(format!( - // "message_received_delay_micros_sum_from_{}", - // received_message.metadata.sender_id - // )) - // .increment(delay_micros); - // TODO(AndrewL): Figure out what to log here -} - -async fn receive_stress_test_messages( - broadcasted_messages_receiver: BroadcastTopicServer, -) { - broadcasted_messages_receiver - .for_each(|result| async { - let (message_result, metadata) = result; - tokio::task::spawn_blocking(|| receive_stress_test_message(message_result, 
metadata)); - }) - .await; - unreachable!("BroadcastTopicServer stream should never terminate..."); -} - -fn create_peer_private_key(peer_index: usize) -> [u8; 32] { - let peer_index: u64 = peer_index.try_into().expect("Failed converting usize to u64"); - let array = peer_index.to_le_bytes(); - assert_eq!(array.len(), 8); - let mut private_key = [0u8; 32]; - private_key[0..8].copy_from_slice(&array); - private_key -} +use args::Args; +use stress_test_node::BroadcastNetworkStressTestNode; #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { let args = Args::parse(); let level = match args.verbosity { @@ -176,11 +45,12 @@ async fn main() { println!("Starting network stress test with args:\n{args:?}"); assert!( - args.message_size_bytes >= *METADATA_SIZE, + args.message_size_bytes.unwrap_or(*METADATA_SIZE) >= *METADATA_SIZE, "Message size must be at least {} bytes", *METADATA_SIZE ); + // Set up metrics let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::V4(SocketAddrV4::new( Ipv4Addr::UNSPECIFIED, args.metric_port, @@ -188,56 +58,14 @@ async fn main() { builder.install().expect("Failed to install prometheus recorder/exporter"); - let peer_private_key = create_peer_private_key(args.id); - let peer_private_key_hex = - peer_private_key.iter().map(|byte| format!("{byte:02x}")).collect::(); - info!("Secret Key: {peer_private_key_hex:#?}"); - - let mut network_config = NetworkConfig { - port: args.p2p_port, - secret_key: Some(peer_private_key.to_vec()), - ..Default::default() - }; - if let Some(peer) = &args.bootstrap { - let bootstrap_peer: Multiaddr = Multiaddr::from_str(peer).unwrap(); - network_config.bootstrap_peer_multiaddr = Some(vec![bootstrap_peer]); - } - - let mut network_manager = NetworkManager::new(network_config, None, None); - - let peer_id = network_manager.get_local_peer_id(); - info!("My PeerId: {peer_id}"); - - let network_channels = network_manager - .register_broadcast_topic::(TOPIC.clone(), args.buffer_size) - .unwrap(); - let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } = - network_channels; - - let mut tasks = Vec::new(); - - tasks.push(tokio::spawn(async move { - // Start the network manager to handle incoming connections and messages. 
- network_manager.run().await.unwrap(); - unreachable!("Network manager should not exit"); - })); - - tasks.push(tokio::spawn(async move { - receive_stress_test_messages(broadcasted_messages_receiver).await; - unreachable!("Broadcast topic receiver should not exit"); - })); - - let args_clone = args.clone(); - tasks.push(tokio::spawn(async move { - send_stress_test_messages(broadcast_topic_client, &args_clone, peer_id).await; - unreachable!("Broadcast topic client should not exit"); - })); + // Start the tokio runtime metrics reporter to automatically collect and export runtime metrics + tokio::spawn( + RuntimeMetricsReporterBuilder::default() + .with_interval(Duration::from_secs(1)) + .describe_and_run(), + ); - let test_timeout = Duration::from_secs(args.timeout); - match tokio::time::timeout(test_timeout, join_all(tasks.into_iter())).await { - Ok(_) => unreachable!(), - Err(e) => { - info!("Test timeout after {e}"); - } - } + // Create and run the stress test node + let stress_test_node = BroadcastNetworkStressTestNode::new(args).await; + stress_test_node.run().await } diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs new file mode 100644 index 00000000000..e2bacccec94 --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_handling.rs @@ -0,0 +1,152 @@ +use apollo_network::network_manager::{ + BroadcastTopicClient, + BroadcastTopicClientTrait, + BroadcastTopicServer, + SqmrClientSender, + SqmrServerReceiver, +}; +use futures::StreamExt; +use libp2p::PeerId; +use tracing::{error, info, trace}; + +use crate::network_channels::TopicType; + +/// Message sender abstraction for different protocols +pub enum MessageSender { + Gossipsub(BroadcastTopicClient), + Sqmr(SqmrClientSender), + ReveresedSqmr(ReveresedSqmrSender), +} + +/// Wrapper for ReveresedSqmr that maintains the last active query +pub struct ReveresedSqmrSender { + server: SqmrServerReceiver, + active_query: Option>, +} + +impl ReveresedSqmrSender { + pub fn new(server: SqmrServerReceiver) -> Self { + Self { server, active_query: None } + } + + async fn collect_new_queries(&mut self) { + // Non-blocking check for new queries, keeping only the last one + while let Ok(query) = + tokio::time::timeout(tokio::time::Duration::from_millis(1), self.server.next()).await + { + if let Some(query) = query { + info!("ReveresedSqmr: Received new query, replacing previous query"); + self.active_query = Some(query); + } else { + break; + } + } + } + + async fn broadcast_to_queries(&mut self, message: TopicType) { + if let Some(query) = &mut self.active_query { + match query.send_response(message).await { + Ok(()) => { + trace!("ReveresedSqmr: Sent response to active query"); + } + Err(e) => { + // Query failed, remove it + error!("ReveresedSqmr: Active query failed, removing it, error: {:?}", e); + self.active_query = None; + } + } + } + } +} + +impl MessageSender { + pub async fn send_message(&mut self, _peers: &[PeerId], message: TopicType) { + match self { + MessageSender::Gossipsub(client) => { + client.broadcast_message(message).await.unwrap(); + } + MessageSender::Sqmr(client) => { + // Send query and properly handle the response manager to avoid session warnings + match client.send_new_query(message).await { + Ok(mut response_manager) => { + // Consume the response manager to properly close the session + // This prevents the "finished with no messages" warning + 
tokio::spawn(async move { + while let Some(_response) = response_manager.next().await { + // Process any responses if they come, but don't block the sender + } + }); + } + Err(e) => { + error!("Failed to send SQMR query: {:?}", e); + } + } + } + MessageSender::ReveresedSqmr(sender) => { + // Collect any new queries first + sender.collect_new_queries().await; + // Then broadcast the message to all active queries + sender.broadcast_to_queries(message).await; + } + } + } +} + +pub enum MessageReceiver { + Gossipsub(BroadcastTopicServer), + Sqmr(SqmrServerReceiver), + ReveresedSqmr(SqmrClientSender), +} + +impl MessageReceiver { + pub async fn for_each(self, mut f: F) + where + F: FnMut(TopicType, Option) + Copy, + { + match self { + MessageReceiver::Gossipsub(receiver) => { + receiver + .for_each(|message| async move { + let peer_id = message.1.originator_id.private_get_peer_id(); + f(message.0.unwrap(), Some(peer_id)); + }) + .await + } + MessageReceiver::Sqmr(receiver) => { + receiver + .for_each(|x| async move { + f(x.query().as_ref().unwrap().to_vec(), None); + }) + .await + } + MessageReceiver::ReveresedSqmr(mut client) => loop { + match client.send_new_query(vec![]).await { + Ok(mut response_manager) => loop { + let response_result = response_manager.next().await; + match response_result { + Some(Ok(response_data)) => { + f(response_data, None); + } + Some(Err(_)) => { + error!("ReveresedSqmr: Failed to parse response"); + break; + } + None => { + error!("ReveresedSqmr: Response stream ended"); + break; + } + } + }, + Err(e) => { + error!( + "Failed to establish ReveresedSqmr connection, keeping client alive, \ + error: {:?}", + e + ); + // sleep(Duration::from_secs(1)).await; + } + } + }, + } + } +} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_index_detector.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_index_detector.rs new file mode 100644 index 00000000000..131902ad5ce --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/message_index_detector.rs @@ -0,0 +1,32 @@ +#[derive(Default)] +pub struct MessageIndexTracker { + seen_messages_count: u64, + max_message_index: Option, + min_message_index: Option, +} + +impl MessageIndexTracker { + pub fn seen_message(&mut self, message_index: u64) { + self.seen_messages_count += 1; + if self.max_message_index.is_none() || self.max_message_index.unwrap() < message_index { + self.max_message_index = Some(message_index); + } + if self.min_message_index.is_none() || self.min_message_index.unwrap() > message_index { + self.min_message_index = Some(message_index); + } + } + + pub fn pending_messages_count(&self) -> u64 { + if self.seen_messages_count == 0 { + return 0; + } + + let min_message_index = self.min_message_index.unwrap(); + let max_message_index = self.max_message_index.unwrap(); + if min_message_index == max_message_index { + return 0; + } + + max_message_index - min_message_index + 1 - self.seen_messages_count + } +} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs index f86d1419cb3..57cab65d3ad 100644 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/metrics.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use 
apollo_metrics::define_metrics; @@ -12,11 +13,13 @@ use apollo_network::network_manager::metrics::{ NETWORK_BROADCAST_DROP_LABELS, }; use libp2p::gossipsub::{Sha256Topic, Topic}; +use libp2p::PeerId; use sysinfo::{Networks, System}; use tokio::time::interval; use tracing::warn; use crate::converters::StressTestMessage; +use crate::message_index_detector::MessageIndexTracker; lazy_static::lazy_static! { pub static ref TOPIC: Sha256Topic = Topic::new("stress_test_topic".to_string()); @@ -32,6 +35,7 @@ define_metrics!( MetricHistogram { BROADCAST_MESSAGE_SEND_DELAY_SECONDS, "broadcast_message_send_delay_seconds", "Message sending delay in seconds" }, MetricGauge { RECEIVE_MESSAGE_BYTES, "receive_message_bytes", "Size of the stress test received message in bytes" }, + MetricGauge { RECEIVE_MESSAGE_PENDING_COUNT, "receive_message_pending_count", "Number of stress test messages pending to be received" }, MetricCounter { RECEIVE_MESSAGE_COUNT, "receive_message_count", "Number of stress test messages received via broadcast", init = 0 }, MetricCounter { RECEIVE_MESSAGE_BYTES_SUM, "receive_message_bytes_sum", "Sum of the stress test messages received via broadcast", init = 0 }, MetricHistogram { RECEIVE_MESSAGE_DELAY_SECONDS, "receive_message_delay_seconds", "Message delay in seconds" }, @@ -67,7 +71,11 @@ pub fn update_broadcast_metrics(message_size_bytes: usize, broadcast_heartbeat: BROADCAST_MESSAGE_THROUGHPUT.set(get_throughput(message_size_bytes, broadcast_heartbeat)); } -pub fn receive_stress_test_message(received_message: Vec) { +pub fn receive_stress_test_message( + received_message: Vec, + sender_peer_id: Option, + message_index_tracker: Arc>>, +) { let end_time = SystemTime::now(); let received_message: StressTestMessage = received_message.into(); @@ -93,6 +101,18 @@ pub fn receive_stress_test_message(received_message: Vec) { } else { RECEIVE_MESSAGE_NEGATIVE_DELAY_SECONDS.record(-delay_seconds); } + + let pending = { + let mut lock = message_index_tracker.lock().unwrap(); + let tracker = lock.entry(sender_peer_id.unwrap()).or_default(); + tracker.seen_message(received_message.metadata.message_index); + let mut pending = 0; + for (_, tracker) in lock.iter() { + pending += tracker.pending_messages_count(); + } + pending + }; + RECEIVE_MESSAGE_PENDING_COUNT.set(pending.into_f64()); } pub fn seconds_since_epoch() -> u64 { diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/network_channels.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/network_channels.rs new file mode 100644 index 00000000000..2c522d601dd --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/network_channels.rs @@ -0,0 +1,140 @@ +use apollo_network::network_manager::{ + BroadcastTopicChannels, + BroadcastTopicClient, + BroadcastTopicServer, + NetworkManager, + SqmrClientSender, + SqmrServerReceiver, +}; +use apollo_network::NetworkConfig; + +use crate::args::NetworkProtocol; +use crate::message_handling::{MessageReceiver, MessageSender}; +use crate::metrics::{create_network_metrics, TOPIC}; + +pub type TopicType = Vec; + +pub const SQMR_PROTOCOL_NAME: &str = "/stress-test/1.0.0"; + +/// Network communication channels for different protocols +pub enum NetworkChannels { + Gossipsub { + broadcast_topic_client: Option>, + broadcasted_messages_receiver: Option>, + }, + Sqmr { + sqmr_client: Option>, + sqmr_server: Option>, + }, + ReveresedSqmr { + sqmr_client: Option>, + sqmr_server: Option>, + }, +} + +impl NetworkChannels { + pub fn take_sender(&mut 
self) -> MessageSender { + match self { + NetworkChannels::Gossipsub { + broadcast_topic_client, + broadcasted_messages_receiver: _, + } => MessageSender::Gossipsub( + broadcast_topic_client.take().expect("broadcast_topic_client should be available"), + ), + NetworkChannels::Sqmr { sqmr_client, sqmr_server: _ } => { + MessageSender::Sqmr(sqmr_client.take().expect("sqmr_client should be available")) + } + NetworkChannels::ReveresedSqmr { sqmr_server, sqmr_client: _ } => { + MessageSender::ReveresedSqmr(crate::message_handling::ReveresedSqmrSender::new( + sqmr_server.take().expect("sqmr_server should be available"), + )) + } + } + } + + pub fn take_receiver(&mut self) -> MessageReceiver { + match self { + NetworkChannels::Gossipsub { + broadcasted_messages_receiver, + broadcast_topic_client: _, + } => MessageReceiver::Gossipsub( + broadcasted_messages_receiver + .take() + .expect("broadcasted_messages_receiver should be available"), + ), + NetworkChannels::Sqmr { sqmr_server, sqmr_client: _ } => { + MessageReceiver::Sqmr(sqmr_server.take().expect("sqmr_server should be available")) + } + NetworkChannels::ReveresedSqmr { sqmr_client, sqmr_server: _ } => { + MessageReceiver::ReveresedSqmr( + sqmr_client.take().expect("sqmr_client should be available"), + ) + } + } + } +} + +/// Creates and sets up a network manager with protocol registration +#[allow(clippy::type_complexity)] +pub fn create_network_manager_with_channels( + network_config: &NetworkConfig, + buffer_size: usize, + protocol: &NetworkProtocol, +) -> (NetworkManager, NetworkChannels) { + let network_metrics = create_network_metrics(); + let mut network_manager = + NetworkManager::new(network_config.clone(), None, Some(network_metrics)); + + let channels = match protocol { + NetworkProtocol::Gossipsub => { + let network_channels = network_manager + .register_broadcast_topic::(TOPIC.clone(), buffer_size) + .expect("Failed to register broadcast topic"); + let BroadcastTopicChannels { + broadcasted_messages_receiver, + broadcast_topic_client, + } = network_channels; + + NetworkChannels::Gossipsub { + broadcast_topic_client: Some(broadcast_topic_client), + broadcasted_messages_receiver: Some(broadcasted_messages_receiver), + } + } + NetworkProtocol::Sqmr => { + let sqmr_client = network_manager + .register_sqmr_protocol_client::( + SQMR_PROTOCOL_NAME.to_string(), + buffer_size, + ); + let sqmr_server = network_manager + .register_sqmr_protocol_server::( + SQMR_PROTOCOL_NAME.to_string(), + buffer_size, + ); + + NetworkChannels::Sqmr { + sqmr_client: Some(sqmr_client), + sqmr_server: Some(sqmr_server), + } + } + NetworkProtocol::ReveresedSqmr => { + let sqmr_client = network_manager + .register_sqmr_protocol_client::( + SQMR_PROTOCOL_NAME.to_string(), + buffer_size, + ); + let sqmr_server = network_manager + .register_sqmr_protocol_server::( + SQMR_PROTOCOL_NAME.to_string(), + buffer_size, + ); + + NetworkChannels::ReveresedSqmr { + sqmr_client: Some(sqmr_client), + sqmr_server: Some(sqmr_server), + } + } + }; + + (network_manager, channels) +} diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile deleted file mode 100644 index b58d4bf57f5..00000000000 --- a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -# syntax = devthefuture/dockerfile-x -# -# Run with the project root context: -# docker build -f 
crates/apollo_network/src/bin/network_stress_test/cluster/Dockerfile . - -INCLUDE deployments/images/base/Dockerfile - -FROM base AS builder -WORKDIR /usr/src/rust_code -COPY . . -# TODO(AndrewL): use cargo chef for better caching -RUN cargo build --release --bin network_stress_test - -FROM ubuntu:24.04 AS final_stage -COPY --from=builder /usr/src/rust_code/target/release/network_stress_test /usr/local/bin/network_stress_test - - -ENTRYPOINT ["network_stress_test"] diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile.fast b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile.fast new file mode 100644 index 00000000000..7c541a13056 --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile.fast @@ -0,0 +1,11 @@ +# does not build, only copies the binary and entrypoint.sh from the local machine +FROM ubuntu:24.04 + +RUN apt update && apt -y install iproute2 kmod chrony && apt clean && rm -rf /var/lib/apt/lists/* + +COPY --from=tmp broadcast_network_stress_test_node /usr/local/bin/broadcast_network_stress_test_node +COPY ./crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/entrypoint.sh /entrypoint.sh + +RUN chmod +x /entrypoint.sh + +ENTRYPOINT ["/entrypoint.sh"] diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile.slow b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile.slow new file mode 100644 index 00000000000..8a6542191df --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/Dockerfile.slow @@ -0,0 +1,37 @@ +# syntax = devthefuture/dockerfile-x +# +# Run with the project root context: +# docker build -f crates/apollo_network/src/bin/broadcast_network_stress_test_node/cluster/Dockerfile . +# +# For time synchronization, you can either: +# 1. Use the built-in chrony service (configured below), or +# 2. Mount host time at runtime with: -v /etc/localtime:/etc/localtime:ro -v /etc/timezone:/etc/timezone:ro + +INCLUDE deployments/images/base/Dockerfile + +# --- Stage 1: Install cargo-chef and prepare recipe --- +FROM base AS planner +WORKDIR /app +COPY . . +RUN cargo chef prepare --recipe-path recipe.json + +# --- Stage 2: Build dependencies --- +FROM base AS builder +WORKDIR /app +COPY --from=planner /app/recipe.json recipe.json +RUN cargo chef cook --recipe-path recipe.json +COPY . . 
+# using tokio_unstable to get additional tokio metrics
+RUN RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin broadcast_network_stress_test_node
+
+# --- Final Stage: Runtime image ---
+FROM ubuntu:24.04 AS final_stage
+
+RUN apt update && apt -y install iproute2 kmod chrony && apt clean && rm -rf /var/lib/apt/lists/*
+
+COPY --from=builder /app/target/release/broadcast_network_stress_test_node /usr/local/bin/broadcast_network_stress_test_node
+COPY --from=builder /app/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/entrypoint.sh /entrypoint.sh
+
+RUN chmod +x /entrypoint.sh
+
+ENTRYPOINT ["/entrypoint.sh"]
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/args.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/args.py
new file mode 100644
index 00000000000..809ddaff315
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/args.py
@@ -0,0 +1,172 @@
+import argparse
+
+
+def add_shared_args_to_parser(parser: argparse.ArgumentParser):
+    """
+    Adds the arguments that are shared between the local and cluster deployment scripts
+    """
+    parser.add_argument(
+        "--num-nodes", help="Number of nodes to run", type=int, default=3
+    )
+    parser.add_argument(
+        "--verbosity",
+        help="Verbosity level for logging (0: None, 1: ERROR, 2: WARN, 3: INFO, 4: DEBUG, 5..: TRACE)",
+        type=int,
+        default=2,
+    )
+    parser.add_argument(
+        "--buffer-size",
+        help="Buffer size to use by default.",
+        type=int,
+        default=100000,
+    )
+    parser.add_argument(
+        "--message-size-bytes",
+        help="Message size in bytes.",
+        type=int,
+        default=1 << 10,
+    )
+    parser.add_argument(
+        "--heartbeat-millis",
+        help="Number of milliseconds to wait between consecutive broadcasts.",
+        type=int,
+        default=1000,
+    )
+    parser.add_argument(
+        "--mode",
+        help="The mode to use for the stress test.",
+        choices=["all", "one", "rr", "explore"],
+        default="all",
+    )
+    parser.add_argument(
+        "--network-protocol",
+        help="The network protocol to use for communication.",
+        choices=["gossipsub", "sqmr", "reversed-sqmr"],
+        default="gossipsub",
+    )
+    parser.add_argument(
+        "--broadcaster",
+        help="In mode `one` or `explore`, which node ID should do the broadcasting, default is the last node.",
+        type=int,
+        default=None,
+    )
+    parser.add_argument(
+        "--round-duration-seconds",
+        help="Duration each node broadcasts before switching (in seconds) - for RoundRobin mode",
+        type=int,
+        default=3,
+    )
+    parser.add_argument(
+        "--quic",
+        help="Sets the multi-addresses to use UDP/QUIC instead of TCP",
+        action="store_true",
+        default=False,
+    )
+    parser.add_argument(
+        "--explore-cool-down-duration-seconds",
+        help="Cool down duration between configuration changes in seconds - for Explore mode",
+        type=int,
+        default=100,
+    )
+    parser.add_argument(
+        "--explore-run-duration-seconds",
+        help="Duration to run each configuration in seconds - for Explore mode",
+        type=int,
+        default=100,
+    )
+    parser.add_argument(
+        "--explore-min-throughput-byte-per-seconds",
+        help="Minimum throughput in bytes per second - for Explore mode",
+        type=float,
+        default=100 * (1 << 10),  # 100 KB/s
+    )
+    parser.add_argument(
+        "--explore-min-message-size-bytes",
+        help="Minimum message size in bytes - for Explore mode",
+        type=int,
+        default=1 << 10,  # 1 KB
+    )
+    parser.add_argument(
+        "--timeout",
+        help="The timeout in seconds for the node. Note that when running in a cluster the pod will be redeployed.",
+        type=int,
+        default=4000,
+    )
+
+
+def get_arguments(
+    id: int | None,
+    metric_port: int,
+    p2p_port: int,
+    bootstrap_nodes: list[str],
+    args: argparse.Namespace,
+) -> list[tuple[str, str]]:
+    result = [
+        ("--metric-port", str(metric_port)),
+        ("--p2p-port", str(p2p_port)),
+        ("--bootstrap", ",".join(bootstrap_nodes)),
+        ("--timeout", str(args.timeout)),
+        ("--verbosity", str(args.verbosity)),
+        ("--buffer-size", str(args.buffer_size)),
+        ("--message-size-bytes", str(args.message_size_bytes)),
+        ("--heartbeat-millis", str(args.heartbeat_millis)),
+        ("--mode", str(args.mode)),
+        ("--network-protocol", str(args.network_protocol)),
+        (
+            "--broadcaster",
+            (
+                str(args.broadcaster)
+                if args.broadcaster is not None
+                else str(args.num_nodes - 1)
+            ),
+        ),
+        ("--round-duration-seconds", str(args.round_duration_seconds)),
+        (
+            "--explore-cool-down-duration-seconds",
+            str(args.explore_cool_down_duration_seconds),
+        ),
+        ("--explore-run-duration-seconds", str(args.explore_run_duration_seconds)),
+        (
+            "--explore-min-throughput-byte-per-seconds",
+            str(args.explore_min_throughput_byte_per_seconds),
+        ),
+        (
+            "--explore-min-message-size-bytes",
+            str(args.explore_min_message_size_bytes),
+        ),
+        ("--num-nodes", str(args.num_nodes)),
+    ]
+    if id is not None:
+        result.insert(0, ("--id", str(id)))
+    return result
+
+
+def get_env_vars(
+    id: int | None,
+    metric_port: int,
+    p2p_port: int,
+    bootstrap_nodes: list[str],
+    args: argparse.Namespace,
+) -> list[dict[str, str]]:
+    arguments = get_arguments(
+        id=id,
+        metric_port=metric_port,
+        p2p_port=p2p_port,
+        bootstrap_nodes=bootstrap_nodes,
+        args=args,
+    )
+
+    env_vars = []
+
+    # Convert arguments to environment variables
+    for name, value in arguments:
+        env_name = name[2:].replace("-", "_").upper()
+        env_vars.append({"name": env_name, "value": str(value)})
+
+    # Add latency and throughput if provided
+    for arg_name, env_name in [("latency", "LATENCY"), ("throughput", "THROUGHPUT")]:
+        value = getattr(args, arg_name)
+        if value is not None:
+            env_vars.append({"name": env_name, "value": str(value)})
+
+    return env_vars
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_logs.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_logs.py
new file mode 100644
index 00000000000..3dce7bb15fc
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_logs.py
@@ -0,0 +1,26 @@
+from cluster_stop import open_deployment_file
+from utils import run_cmd
+
+
+def cluster_log():
+    deployment_data = open_deployment_file()
+    num_nodes = deployment_data["args"]["num_nodes"]
+    namespace_name = deployment_data["namespace"]
+
+    run_cmd(f"kubectl get pods -n {namespace_name}", hint="Check if pods are running")
+    for i in range(num_nodes):
+        # For Indexed Jobs, pods are named: job-name-index-randomsuffix
+        # Use label selector to find the pod for this index
+        run_cmd(
+            f"timeout 5 kubectl logs -n {namespace_name} -l app=broadcast-network-stress-test,batch.kubernetes.io/job-completion-index={i} > /tmp/broadcast-network-stress-test-{i}.logs.txt",
+            hint=f"Check logs for node {i}",
+            may_fail=True,
+        )
+    run_cmd(f"kubectl get pods -n {namespace_name}", hint="Check if pods are running")
+
+
+if __name__ == "__main__":
+    cluster_log()
+    print(
+        "Cluster logs have been saved to /tmp/broadcast-network-stress-test-*.logs.txt"
+    )
diff --git 
a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_port_forward_grafana.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_port_forward_grafana.py new file mode 100644 index 00000000000..224eec932c9 --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_port_forward_grafana.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +Port forward Grafana service from Kubernetes cluster to local machine. +This script reads the deployment information and sets up port forwarding for Grafana. +""" + +import json +import os +import signal +import sys +from cluster_stop import broadcast_network_stress_test_deployment_file_name +from utils import run_cmd, pr + + +def cluster_port_forward_grafana(): + """Set up port forwarding for Grafana service.""" + if not os.path.exists(broadcast_network_stress_test_deployment_file_name): + pr("No deployment file found. Please run cluster_start.py first.") + return False + + # Read deployment information + with open(broadcast_network_stress_test_deployment_file_name, "r") as f: + deployment_info = json.load(f) + + namespace = deployment_info.get("namespace") + if not namespace: + pr("No namespace found in deployment file.") + return False + + pr(f"Setting up port forwarding for Grafana in namespace: {namespace}") + pr("Grafana will be available at: http://localhost:3000") + pr("Default credentials: admin/admin") + pr("Press Ctrl+C to stop port forwarding") + + try: + # Set up port forwarding for Grafana + run_cmd( + f"kubectl port-forward service/grafana-service 3000:3000 -n {namespace}" + ) + except KeyboardInterrupt: + pr("Port forwarding stopped.") + return True + except Exception as e: + pr(f"Failed to set up port forwarding: {e}") + return False + + +def main(): + """Main entry point.""" + success = cluster_port_forward_grafana() + sys.exit(0 if success else 1) + + +if __name__ == "__main__": + main() diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_port_forward_prometheus.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_port_forward_prometheus.py new file mode 100644 index 00000000000..43cd365dadf --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_port_forward_prometheus.py @@ -0,0 +1,34 @@ +import json +import os +from utils import ( + pr, + connect_to_cluster, + run_cmd, +) +from yaml_maker import prometheus_service_name +from cluster_stop import broadcast_network_stress_test_deployment_file_name + + +def cluster_port_forward_prometheus(): + assert os.path.exists( + broadcast_network_stress_test_deployment_file_name + ), "Deployment file does not exist. Have you started a network stress test?" 
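+    # The deployment file is written by cluster_start.py and records the
+    # namespace of the most recent run.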
+
+    with open(broadcast_network_stress_test_deployment_file_name, "r") as f:
+        deployment_data: dict = json.load(f)
+
+    name_space_name = deployment_data.get("namespace")
+    if name_space_name is None:
+        pr("ERROR: No namespace found in deployment file")
+        return
+
+    connect_to_cluster()
+
+    pr("Access Prometheus at: http://localhost:9090")
+    run_cmd(
+        f"kubectl port-forward service/{prometheus_service_name} 9090:9090 -n {name_space_name}"
+    )
+
+
+if __name__ == "__main__":
+    cluster_port_forward_prometheus()
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_start.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_start.py
new file mode 100644
index 00000000000..c07be29b3db
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_start.py
@@ -0,0 +1,307 @@
+import argparse
+import os
+from time import sleep
+import json
+from cluster_port_forward_grafana import cluster_port_forward_grafana
+from utils import (
+    make_timestamp,
+    run_cmd,
+    pr,
+    connect_to_cluster,
+    project_root,
+    get_commit,
+)
+from yaml_maker import (
+    get_prometheus_yaml_file,
+    get_prometheus_deployment_yaml_file,
+    get_prometheus_service_yaml_file,
+    get_prometheus_headless_service_yaml_file,
+    get_network_stress_test_deployment_yaml_file,
+    get_network_stress_test_headless_service_yaml_file,
+    get_grafana_configmap_yaml_file,
+    get_grafana_deployment_yaml_file,
+    get_grafana_service_yaml_file,
+    get_grafana_headless_service_yaml_file,
+    # get_namespace_deletion_job_yaml_file,
+    # get_namespace_deleter_rbac_yaml_file,
+)
+from args import add_shared_args_to_parser
+from cluster_stop import (
+    cluster_working_directory,
+    broadcast_network_stress_test_deployment_file_name,
+    stop_last_cluster_run,
+)
+
+
+def login_to_docker_registry():
+    run_cmd("gcloud auth configure-docker us-central1-docker.pkg.dev")
+
+
+def make_image_tag(timestamp: str) -> str:
+    return f"us-central1-docker.pkg.dev/starkware-dev/sequencer/broadcast-network-stress-test-node:{timestamp}"
+
+
+def build_image(image_tag: str, fast: bool = True):
+    if fast:
+        pr("Compiling broadcast_network_stress_test_node without Docker...")
+        run_cmd(
+            'RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin broadcast_network_stress_test_node',
+            hint="Make sure you have Rust and Cargo installed.",
+        )
+        # Copy binary to /tmp to avoid .dockerignore issues
+        pr("Copying binary to /tmp for Docker build...")
+        run_cmd(
+            f"cp {project_root()}/target/release/broadcast_network_stress_test_node /tmp/"
+        )
+        dockerfile_path = os.path.abspath("Dockerfile.fast")
+        run_cmd(
+            f"docker build -t {image_tag} -f {dockerfile_path} --build-context tmp=/tmp {project_root()}"
+        )
+    else:
+        dockerfile_path = os.path.abspath("Dockerfile.slow")
+        run_cmd(f"docker build -t {image_tag} -f {dockerfile_path} {project_root()}")
+
+
+def upload_image_to_registry(image_tag: str):
+    run_cmd(
+        f"docker push {image_tag}",
+        hint="Make sure you are logged in to the Docker registry. If so, contact the dev team to resolve any issues (maybe a permissions issue).",
+    )
+
+
+def write_deployment_file(deployment_data: dict):
+    with open(broadcast_network_stress_test_deployment_file_name, "w") as f:
+        json.dump(deployment_data, f, indent=4)
+
+
+def write_yaml_file(file_name: str, file_content: str):
+    with open(os.path.join(cluster_working_directory, file_name), "w") as f:
+        f.write(file_content)
+
+
+def write_yaml_files(
+    image_tag: str,
+    args: argparse.Namespace,
+    namespace_name: str,
+    delay_seconds: int,
+) -> list[str]:
+    num_nodes = args.num_nodes
+    files = {
+        "broadcast-network-stress-test-deployment.yaml": get_network_stress_test_deployment_yaml_file(
+            image_tag, args=args
+        ),
+        "broadcast-network-stress-test-headless-service.yaml": get_network_stress_test_headless_service_yaml_file(),
+        "prometheus-config.yaml": get_prometheus_yaml_file(num_nodes),
+        "prometheus-statefulset.yaml": get_prometheus_deployment_yaml_file(),
+        "prometheus-service.yaml": get_prometheus_service_yaml_file(),
+        "prometheus-headless-service.yaml": get_prometheus_headless_service_yaml_file(),
+        "grafana-config.yaml": get_grafana_configmap_yaml_file(),
+        "grafana-statefulset.yaml": get_grafana_deployment_yaml_file(),
+        "grafana-service.yaml": get_grafana_service_yaml_file(),
+        "grafana-headless-service.yaml": get_grafana_headless_service_yaml_file(),
+        # "namespace-deleter-rbac.yaml": get_namespace_deleter_rbac_yaml_file(
+        #     namespace_name
+        # ),
+        # "namespace-deletion-cronjob.yaml": get_namespace_deletion_job_yaml_file(
+        #     namespace_name, delay_seconds
+        # ),
+    }
+
+    for file_name, file_content in files.items():
+        write_yaml_file(file_name, file_content)
+    return list(files.keys())
+
+
+class ExperimentRunner:
+    def __enter__(self):
+        self.timestamp = make_timestamp()
+        run_cmd(f"mkdir -p {cluster_working_directory}")
+        self.deployment_file = {"cluster_working_directory": cluster_working_directory}
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        write_deployment_file(self.deployment_file)
+
+    def create_namespace(self, name_space_name: str):
+        pr(f"Creating namespace {name_space_name}")
+        self.deployment_file["namespace"] = name_space_name
+        write_deployment_file(self.deployment_file)
+        run_cmd(
+            f"kubectl create namespace {name_space_name}",
+        )
+
+    def deploy_yaml_files(self, name_space_name: str):
+        for file_name in self.deployment_file["yaml_files"]:
+            pr(f"Deploying {file_name} to cluster")
+            file_path = os.path.join(cluster_working_directory, file_name)
+
+            # RBAC resources and the namespace-deletion cronjob are no longer supported
+            if file_name in [
+                "namespace-deleter-rbac.yaml",
+                "namespace-deletion-cronjob.yaml",
+            ]:
+                raise NotImplementedError(
+                    "RBAC resources and namespace deletion cronjob are not supported anymore"
+                )
+            else:
+                run_cmd(
+                    f"kubectl apply --wait -f {file_path} -n {name_space_name}",
+                )
+
+    def run_experiment(self, args: argparse.Namespace):
+        pr(str(args))
+        self.deployment_file["args"] = vars(args)
+        image_tag = args.image if args.image else make_image_tag(self.timestamp)
+        pr(f"timestamp: {self.timestamp}")
+        self.deployment_file["timestamp"] = self.timestamp
+
+        if args.image:
+            self.deployment_file["was_image_built"] = False
+        else:
+            pr("Building image")
+            build_image(image_tag, fast=args.fast_docker)
+            self.deployment_file["commit"] = get_commit()
+            self.deployment_file["was_image_built"] = True
+        pr(f"Image tag: {image_tag}")
+        
self.deployment_file["image_tag"] = image_tag + run_cmd( + f"docker image inspect {image_tag} > /dev/null", + hint="Make sure the image exists before proceeding.", + ) + + connect_to_cluster() + login_to_docker_registry() + upload_image_to_registry(image_tag=image_tag) + + namespace_name = f"broadcast-network-stress-test-{self.timestamp}" + delay_seconds = args.timeout + self.deployment_file["delay_seconds"] = delay_seconds + + self.create_namespace(namespace_name) + file_names = write_yaml_files( + image_tag, + args=args, + namespace_name=namespace_name, + delay_seconds=delay_seconds, + ) + self.deployment_file["yaml_files"] = file_names + self.deploy_yaml_files(namespace_name) + + sleep(10) + + run_cmd( + f"kubectl get pods -n {namespace_name}", hint="Check if pods are running" + ) + pr("Prometheus and Grafana deployment complete!") + pr("To access Prometheus, run: python cluster_port_forward_prometheus.py") + pr( + "To access Grafana, run: kubectl port-forward service/grafana-service 3000:3000 (no login required)" + ) + pr( + f"Deployment files saved to: `{broadcast_network_stress_test_deployment_file_name}`" + ) + pr( + "WARNING: Please don't forget to delete the namespace manually after the experiment is complete !!!" + ) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--image", + help="Previously built image tag to use instead of re-building the docker image.", + type=str, + default=None, + ) + parser.add_argument( + "--latency", + help="Min latency to use when gating the network in milliseconds.", + type=int, + default=None, + ) + parser.add_argument( + "--throughput", + help="Max throughput to use when gating the network in KB/s.", + type=int, + default=None, + ) + parser.add_argument( + "--dedicated-node-pool", + help="Whether to run the pods on a dedicated node pool or not", + action="store_true", + default=False, + ) + parser.add_argument( + "--node-pool-name", + help="Name of the dedicated node pool to use (only used if --dedicated-node-pool is set)", + type=str, + default="general-services", + ) + parser.add_argument( + "--node-pool-role", + help="Role selector for the dedicated node pool (only used if --dedicated-node-pool is set)", + type=str, + default="general-services", + ) + parser.add_argument( + "--cpu-requests", + help="CPU requests for each network stress test pod (in Kubernetes format, e.g., '1000m' for 1 core)", + type=str, + default="975m", # running on machine with 3.92 CPU allocatable 3.9 / 4 + ) + parser.add_argument( + "--memory-requests", + help="Memory requests for each network stress test pod (in Kubernetes format, e.g., '1Gi' for 1 GiB)", + type=str, + default="3Gi", + ) + parser.add_argument( + "--cpu-limits", + help="CPU limit for each network stress test pod (in Kubernetes format, e.g., '1000m' for 1 core)", + type=str, + default="975m", # running on machine with 3.92 CPU allocatable 3.9 / 4 + ) + parser.add_argument( + "--memory-limits", + help="Memory limit for each network stress test pod (in Kubernetes format, e.g., '1Gi' for 1 GiB)", + type=str, + default="3Gi", + ) + parser.add_argument( + "--fast-docker", + help="Use fast Docker build instead of slow Docker build", + action="store_true", + default=False, + ) + + add_shared_args_to_parser(parser=parser) + args = parser.parse_args() + + if os.path.exists(broadcast_network_stress_test_deployment_file_name): + x = input( + "Deployment file already exists. Do you want to stop the last run? 
(y/N): "
+        )
+        if x.lower() == "y":
+            pr("Stopping last cluster run...")
+            stop_last_cluster_run()
+        else:
+            pr("Exiting without running the experiment.")
+            return
+
+    assert not os.path.exists(
+        broadcast_network_stress_test_deployment_file_name
+    ), "Deployment file already exists. Please run cluster_stop.py before running the experiment."
+
+    with ExperimentRunner() as runner:
+        runner.run_experiment(args)
+
+    pr("Running cluster_port_forward_grafana.py")
+
+    cluster_port_forward_grafana()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_stop.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_stop.py
new file mode 100644
index 00000000000..3b7b33d442d
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/cluster_stop.py
@@ -0,0 +1,52 @@
+import os
+import json
+from utils import (
+    pr,
+    run_cmd,
+    connect_to_cluster,
+)
+
+cluster_working_directory: str = os.path.join(
+    os.path.expanduser("~"),
+    "apollo_broadcast_network_stress_test",
+)
+broadcast_network_stress_test_deployment_file_name: str = os.path.join(
+    cluster_working_directory, "broadcast_network_stress_test_deployment_file.json"
+)
+
+
+def open_deployment_file() -> dict:
+    assert os.path.exists(
+        broadcast_network_stress_test_deployment_file_name
+    ), "Deployment file does not exist. Have you started a network stress test?"
+
+    with open(broadcast_network_stress_test_deployment_file_name, "r") as f:
+        deployment_data: dict = json.load(f)
+
+    return deployment_data
+
+
+def stop_last_cluster_run():
+    deployment_data = open_deployment_file()
+    name_space_name = deployment_data.get("namespace")
+    if name_space_name is not None:
+        connect_to_cluster()
+        # Delete, re-create, and delete the namespace again so the final delete
+        # leaves a clean state even if the first delete fails partway through.
+        run_cmd(f"kubectl delete namespace {name_space_name}", may_fail=True)
+        run_cmd(
+            f"kubectl create namespace {name_space_name}",
+        )
+        run_cmd(
+            f"kubectl delete namespace {name_space_name}",
+        )
+
+    assert broadcast_network_stress_test_deployment_file_name.startswith(
+        f"{cluster_working_directory}/"
+    )
+    run_cmd(f"rm -rf {cluster_working_directory}")
+    pr("Network stress test stopped successfully.")
+
+
+if __name__ == "__main__":
+    stop_last_cluster_run()
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/entrypoint.sh b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/entrypoint.sh
new file mode 100644
index 00000000000..6df5083c32a
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/entrypoint.sh
@@ -0,0 +1,94 @@
+#!/bin/bash
+
+set -e
+
+echo "Starting container with hostname: $(hostname)"
+
+# For Indexed Jobs: Set stable hostname based on JOB_COMPLETION_INDEX
+# This enables stable DNS names like: broadcast-network-stress-test-0.broadcast-network-stress-test-headless
+if [ ! 
-z "$JOB_COMPLETION_INDEX" ]; then + NEW_HOSTNAME="broadcast-network-stress-test-${JOB_COMPLETION_INDEX}" + hostname "$NEW_HOSTNAME" || echo "Warning: Could not set hostname to $NEW_HOSTNAME" + echo "Set hostname to: $(hostname) (based on JOB_COMPLETION_INDEX=$JOB_COMPLETION_INDEX)" +fi + +# ********************************* machine information ********************************* + +echo "Machine identification:" +echo " Container ID: $(cat /proc/self/cgroup 2>/dev/null | head -1 | cut -d/ -f3 | cut -c1-12 || echo 'N/A')" +echo " Host IP addresses:" +ip addr show | grep -E 'inet [0-9]' | awk '{print " " $2}' || echo " IP info unavailable" +echo " Machine ID: $(cat /etc/machine-id 2>/dev/null || echo 'N/A')" +echo " Kernel: $(uname -r)" +echo " Architecture: $(uname -m)" +if [ -n "$NODE_NAME" ]; then + echo " Kubernetes Node: $NODE_NAME" +fi +if [ -n "$KUBERNETES_NODE_NAME" ]; then + echo " K8s Node Name: $KUBERNETES_NODE_NAME" +fi + +# ***************************** throttling connection start ***************************** + +set -e + +INTERFACE="eth0" # Default Docker interface + +# Load ifb module for ingress shaping +modprobe ifb || echo "ifb module already loaded or not needed" + +# Set up ifb0 for ingress +ip link add ifb0 type ifb || true +ip link set ifb0 up || true + +# Redirect all ingress traffic to ifb0 +tc qdisc add dev $INTERFACE ingress handle ffff: || true +tc filter add dev $INTERFACE parent ffff: protocol ip u32 match u32 0 0 action mirred egress redirect dev ifb0 || true + +# Function to apply shaping (htb for bandwidth + netem for latency) +apply_shaping() { + local dev=$1 + local parent=$2 + local handle=$3 # e.g., 1 (no trailing :) + + # If throughput is set, calculate rate in kbit/s (assuming THROUGHPUT in KB/s) + if [ ! -z "${THROUGHPUT}" ]; then + RATE=$((THROUGHPUT * 8)) + tc qdisc add dev $dev $parent handle ${handle}: htb default 1 || true + tc class add dev $dev parent ${handle}: classid ${handle}:1 htb rate ${RATE}kbit ceil ${RATE}kbit || true + netem_parent="${handle}:1" + else + netem_parent="root" + fi + + # If latency is set, add netem (delay in ms) + if [ ! -z "${LATENCY}" ]; then + tc qdisc add dev $dev parent $netem_parent netem delay ${LATENCY}ms || true + fi +} + +# Apply to egress (eth0) +# apply_shaping $INTERFACE "root" "1" + +# Apply to ingress (ifb0) +apply_shaping ifb0 "root" "1" + +# ***************************** throttling connection end ***************************** + +# Call broadcast_network_stress_test_node +# Use ID from environment variable if set, otherwise use JOB_COMPLETION_INDEX (for Indexed Jobs) +if [ -z "$ID" ]; then + if [ ! 
-z "$JOB_COMPLETION_INDEX" ]; then + # Used when running in Indexed Job + export ID=$JOB_COMPLETION_INDEX + echo "ID not set in environment, using JOB_COMPLETION_INDEX: $ID" + else + # Fallback: extract from hostname (legacy StatefulSet support) + export ID=$(hostname | grep -o '[0-9]*$') + echo "ID not set in environment, extracted from hostname: $ID" + fi +else + # Used when running locally + echo "Using ID from environment variable: $ID" +fi +exec broadcast_network_stress_test_node \ No newline at end of file diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/grafana_config.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/grafana_config.py new file mode 100644 index 00000000000..8db9461a75b --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/grafana_config.py @@ -0,0 +1,566 @@ +""" +Grafana dashboard configuration for the broadcast network stress test. +Data-driven approach using SECTIONS and ALERTS variables for maintainable configuration. +""" + +# Define sections with their queries and explicit units - much easier to maintain! +SECTIONS = { + "📊 Key Stats": [ + ("network_connected_peers", "short"), + ( + "rate(receive_message_delay_seconds_sum[1m]) / rate(receive_message_delay_seconds_count[1m])", + "s", + ), + ("system_process_cpu_usage_percent", "percent"), + ("system_process_memory_usage_bytes", "bytes"), + ("broadcast_message_theoretical_throughput", "binBps"), + ("rate(receive_message_bytes_sum[20s])", "binBps"), + ("receive_message_pending_count", "short"), + ("receive_message_negative_delay_seconds_count", "short"), + ], + "🔍 Performance Comparison": [ + ("broadcast_message_theoretical_throughput", "binBps"), + ("rate(receive_message_bytes_sum[20s])", "binBps"), + ("broadcast_message_theoretical_heartbeat_millis", "ms"), + ("rate(broadcast_message_count[20s])", "ops"), + ("rate(receive_message_count[20s])", "ops"), + ], + "📈 Latency Metrics": [ + ('receive_message_delay_seconds{quantile="0.5"}', "s"), + ('receive_message_delay_seconds{quantile="0.95"}', "s"), + ('receive_message_delay_seconds{quantile="0.99"}', "s"), + ('receive_message_delay_seconds{quantile="0.999"}', "s"), + # average latency by quantile in one panel: + ("avg(receive_message_delay_seconds) by (quantile)", "s"), + ("receive_message_negative_delay_seconds_count", "short"), + ], + "📤 Broadcast Metrics": [ + ("broadcast_message_theoretical_heartbeat_millis", "ms"), + ("broadcast_message_theoretical_throughput", "binBps"), + ("broadcast_message_bytes", "bytes"), + ("rate(broadcast_message_count[1m])", "ops"), + ("rate(broadcast_message_bytes_sum[1m])", "binBps"), + ( + "histogram_quantile(0.95, rate(broadcast_message_send_delay_seconds_bucket[1m]))", + "s", + ), + ], + "📥 Receive Metrics": [ + ("receive_message_bytes", "bytes"), + ("receive_message_pending_count", "short"), + ("rate(receive_message_count[1m])", "ops"), + ("rate(receive_message_bytes_sum[1m])", "binBps"), + ("rate(receive_message_delay_seconds_count[1m])", "ops"), + ("rate(receive_message_negative_delay_seconds_count[1m])", "ops"), + ( + "histogram_quantile(0.95, rate(receive_message_delay_seconds_bucket[1m]))", + "s", + ), + ( + "histogram_quantile(0.95, rate(receive_message_negative_delay_seconds_bucket[1m]))", + "s", + ), + ], + "🌐 Network Metrics": [ + ("network_connected_peers", "short"), + ("network_blacklisted_peers", "short"), + ("network_active_inbound_sessions", "short"), + ("network_active_outbound_sessions", "short"), + 
("rate(network_stress_test_sent_messages[1m])", "ops"), + ("rate(network_stress_test_received_messages[1m])", "ops"), + ("network_reset_total", "short"), + ("rate(network_dropped_broadcast_messages[1m])", "ops"), + ("rate(network_event_counter[1m])", "ops"), + ], + "💻 System Metrics": [ + ("system_process_cpu_usage_percent", "percent"), + ("system_process_memory_usage_bytes", "bytes"), + ("system_process_virtual_memory_usage_bytes", "bytes"), + ("system_total_memory_bytes", "bytes"), + ("system_available_memory_bytes", "bytes"), + ("system_used_memory_bytes", "bytes"), + ("system_cpu_count", "short"), + ("rate(system_network_bytes_sent_total[1m])", "binBps"), + ("rate(system_network_bytes_received_total[1m])", "binBps"), + ("system_network_bytes_sent_current", "binBps"), + ("system_network_bytes_received_current", "binBps"), + ], + "🔔 Network Events": [ + ('network_event_counter{event_type="connections_established"}', "short"), + ('network_event_counter{event_type="connections_closed"}', "short"), + ('network_event_counter{event_type="dial_failure"}', "short"), + ('network_event_counter{event_type="listen_failure"}', "short"), + ('network_event_counter{event_type="listen_error"}', "short"), + ('network_event_counter{event_type="address_change"}', "short"), + ('network_event_counter{event_type="new_listeners"}', "short"), + ('network_event_counter{event_type="new_listen_addrs"}', "short"), + ('network_event_counter{event_type="expired_listen_addrs"}', "short"), + ('network_event_counter{event_type="listener_closed"}', "short"), + ('network_event_counter{event_type="new_external_addr_candidate"}', "short"), + ('network_event_counter{event_type="external_addr_confirmed"}', "short"), + ('network_event_counter{event_type="external_addr_expired"}', "short"), + ('network_event_counter{event_type="new_external_addr_of_peer"}', "short"), + ('network_event_counter{event_type="inbound_connections_handled"}', "short"), + ('network_event_counter{event_type="outbound_connections_handled"}', "short"), + ('network_event_counter{event_type="connection_handler_events"}', "short"), + ], +} + +# Define alerts - easy to add new ones! +ALERTS = [ + "min(network_connected_peers) < 1", + "rate(receive_message_negative_delay_seconds_count[1m]) > 0.001", + "system_process_cpu_usage_percent > 90", + "system_process_memory_usage_bytes > 8000000000", # 8GB + "system_process_virtual_memory_usage_bytes > 16000000000", # 16GB virtual memory + "rate(network_dropped_broadcast_messages[1m]) > 10", # High message drop rate + # "network_reset_total > 5", # Too many network resets +] + + +def get_grafana_dashboard_json() -> str: + """Generate Grafana dashboard using data-driven approach with SECTIONS and ALERTS. + + This approach is much more maintainable - just add queries to SECTIONS or ALERTS + instead of writing hundreds of lines of JSON configuration. 
+    """
+    panels = []
+    panel_id = 100
+    y_pos = 0
+
+    # Generate key stats as stat panels
+    key_stats_queries = SECTIONS["📊 Key Stats"]
+    panels.append(
+        {
+            "id": panel_id,
+            "title": "📊 Key Stats Overview",
+            "type": "row",
+            "gridPos": {"h": 1, "w": 24, "x": 0, "y": y_pos},
+            "collapsed": False,
+        }
+    )
+    panel_id += 1
+    y_pos += 1
+
+    # Create stat panels for key metrics (4 per row, each 6 grid units wide)
+    for i, (query, unit) in enumerate(key_stats_queries):
+        x_pos = (i % 4) * 6
+        if i > 0 and i % 4 == 0:
+            y_pos += 6
+
+        panel_title = _get_panel_title_from_query(query)
+
+        panels.append(
+            {
+                "id": panel_id,
+                "title": panel_title,
+                "type": "stat",
+                "targets": [{"expr": query, "refId": "A"}],
+                "fieldConfig": {
+                    "defaults": {
+                        "unit": unit,
+                        "thresholds": _get_thresholds_from_query(query),
+                    }
+                },
+                "options": {
+                    "reduceOptions": {
+                        "values": False,
+                        "calcs": ["lastNotNull"],
+                    },
+                    "orientation": "auto",
+                    "textMode": "auto",
+                    "colorMode": "value",
+                    "graphMode": "area",
+                },
+                "gridPos": {"h": 6, "w": 6, "x": x_pos, "y": y_pos},
+            }
+        )
+        panel_id += 1
+
+    y_pos += 7
+
+    # Generate sections with timeseries panels
+    for section_name, queries in SECTIONS.items():
+        if section_name == "📊 Key Stats":
+            continue  # Already handled above
+
+        # Create panels for this section
+        section_panels = []
+        section_y_pos = 0
+
+        # Add panels for each query in the section
+        for i, (query, unit) in enumerate(queries):
+            panel_title = _get_panel_title_from_query(query)
+
+            # Smart layout: 2 panels per row for most sections, 3 for larger sections
+            panels_per_row = 3 if len(queries) > 4 else 2
+            width = 24 // panels_per_row
+            x_pos = (i % panels_per_row) * width
+            if i > 0 and i % panels_per_row == 0:
+                section_y_pos += 8
+
+            section_panels.append(
+                {
+                    "id": panel_id,
+                    "title": panel_title,
+                    "type": "timeseries",
+                    "targets": [{"expr": query, "refId": "A"}],
+                    "fieldConfig": {"defaults": {"unit": unit}},
+                    "options": {
+                        "tooltip": {"mode": "single", "sort": "none"},
+                        "legend": {
+                            "showLegend": True,
+                            "displayMode": "list",
+                            "placement": "bottom",
+                        },
+                    },
+                    "gridPos": {"h": 8, "w": width, "x": x_pos, "y": section_y_pos},
+                }
+            )
+            panel_id += 1
+
+        # Add section row with nested panels
+        panels.append(
+            {
+                "id": panel_id,
+                "title": section_name,
+                "type": "row",
+                "gridPos": {"h": 1, "w": 24, "x": 0, "y": y_pos},
+                "collapsed": True,
+                "panels": section_panels,
+            }
+        )
+        panel_id += 1
+        y_pos += 1
+
+    # Generate dashboard JSON
+    dashboard = {
+        "id": 1,
+        "uid": "broadcast-network-stress-test",
+        "title": "Broadcast Network Stress Test - Data-Driven Dashboard",
+        "tags": ["network", "stress-test", "apollo", "data-driven"],
+        "timezone": "browser",
+        "panels": panels,
+        "time": {"from": "now-15m", "to": "now"},
+        "refresh": "5s",
+    }
+
+    import json
+
+    return json.dumps(dashboard, indent=2)
+
+
+def _get_panel_title_from_query(query: str) -> str:
+    """Extract panel title directly from the query - simple and informative."""
+    # Use the query itself, truncated if too long
+    return query[:60] + "..." 
if len(query) > 60 else query + + +def _get_thresholds_from_query(query: str) -> dict: + """Get appropriate thresholds for a query.""" + if "cpu" in query: + return { + "steps": [ + {"color": "green", "value": None}, + {"color": "yellow", "value": 70}, + {"color": "red", "value": 90}, + ] + } + elif "connected_peers" in query: + return { + "steps": [{"color": "red", "value": None}, {"color": "green", "value": 1}] + } + elif "negative_delay" in query: + return { + "steps": [ + {"color": "green", "value": None}, + {"color": "red", "value": 0.001}, + ] + } + elif "delay" in query: + return { + "steps": [ + {"color": "green", "value": None}, + {"color": "yellow", "value": 0.1}, + {"color": "red", "value": 1.0}, + ] + } + + # Default thresholds + return {"steps": [{"color": "green", "value": None}]} + + +def get_grafana_alerts_json() -> str: + """Generate Grafana alerting rules from ALERTS list.""" + rules = [] + + for i, alert_query in enumerate(ALERTS): + rule_id = f"stress_test_alert_{i}" + alert_title = _get_alert_title_from_query(alert_query) + + rules.append( + { + "uid": rule_id, + "title": alert_title, + "condition": "B", + "data": [ + { + "refId": "A", + "queryType": "", + "relativeTimeRange": {"from": 300, "to": 0}, + "model": { + "expr": alert_query.split(" ")[0], # Extract base query + "interval": "", + "refId": "A", + }, + }, + { + "refId": "B", + "queryType": "", + "relativeTimeRange": {"from": 0, "to": 0}, + "model": { + "conditions": [ + { + "evaluator": { + "params": [ + _get_alert_threshold_from_query(alert_query) + ], + "type": _get_alert_operator_from_query( + alert_query + ), + }, + "operator": {"type": "and"}, + "query": {"params": ["A"]}, + "reducer": {"params": [], "type": "last"}, + "type": "query", + } + ], + "refId": "B", + }, + }, + ], + "noDataState": "NoData", + "execErrState": "Alerting", + "for": "30s", + "annotations": { + "description": f"Alert triggered: {alert_title}", + "summary": alert_title, + }, + "labels": {"severity": "critical", "team": "network"}, + } + ) + + alert_config = { + "groups": [ + { + "name": "stress_test_alerts", + "orgId": 1, + "folder": "alerts", + "rules": rules, + } + ] + } + + import json + + return json.dumps(alert_config, indent=2) + + +def _get_alert_title_from_query(alert_query: str) -> str: + """Generate alert title from query.""" + if "connected_peers" in alert_query: + return "Network Connectivity Alert" + elif "negative_delay" in alert_query: + return "Negative Message Delay Alert" + elif "cpu" in alert_query: + return "High CPU Usage Alert" + elif "memory" in alert_query: + return "High Memory Usage Alert" + return f"Alert: {alert_query}" + + +def _get_alert_threshold_from_query(alert_query: str) -> float: + """Extract threshold value from alert query.""" + parts = alert_query.split() + for part in parts: + try: + return float(part) + except ValueError: + continue + return 1.0 + + +def _get_alert_operator_from_query(alert_query: str) -> str: + """Extract comparison operator from alert query.""" + if " < " in alert_query: + return "lt" + elif " > " in alert_query: + return "gt" + elif " <= " in alert_query: + return "le" + elif " >= " in alert_query: + return "ge" + return "gt" + + +def get_grafana_datasource_config() -> str: + """Get Grafana datasource configuration for Prometheus.""" + return """apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://localhost:9090 + isDefault: true + editable: true + basicAuth: false + withCredentials: false + jsonData: + httpMethod: GET + 
timeInterval: "5s"
+"""
+
+
+def get_grafana_datasource_config_cluster() -> str:
+    """Get Grafana datasource configuration for cluster deployment."""
+    return """apiVersion: 1
+
+datasources:
+  - name: Prometheus
+    type: prometheus
+    access: proxy
+    url: http://prometheus-service:9090
+    isDefault: true
+    editable: true
+    basicAuth: false
+    withCredentials: false
+    jsonData:
+      httpMethod: GET
+      timeInterval: "5s"
+"""
+
+
+def get_grafana_dashboard_provisioning_config() -> str:
+    """Get Grafana dashboard provisioning configuration."""
+    return """apiVersion: 1
+
+providers:
+  - name: 'default'
+    orgId: 1
+    folder: ''
+    type: file
+    disableDeletion: false
+    updateIntervalSeconds: 10
+    allowUiUpdates: true
+    options:
+      path: /etc/grafana/provisioning/dashboards
+      folderUid: ""
+      folderId: null
+"""
+
+
+def get_grafana_redirect_html() -> str:
+    """Get HTML page that redirects to the dashboard."""
+    return """<!DOCTYPE html>
+<html>
+  <head>
+    <title>Network Stress Test Dashboard</title>
+    <meta http-equiv="refresh" content="0; url=/d/broadcast-network-stress-test/broadcast-network-stress-test" />
+  </head>
+  <body>
+    <h1>🚀 Network Stress Test Dashboard</h1>
+    <p>Loading dashboard...</p>
+    <p>
+      If you're not redirected automatically,
+      <a href="/d/broadcast-network-stress-test/broadcast-network-stress-test">click here</a>.
+    </p>
+  </body>
+</html>
+
+"""
+
+
+def get_grafana_preferences_json() -> str:
+    """Get Grafana preferences to set the default home dashboard."""
+    return """{
+    "homeDashboardUID": "broadcast-network-stress-test",
+    "theme": "dark",
+    "timezone": "browser"
+}"""
+
+
+def get_grafana_config() -> str:
+    """Get Grafana configuration settings."""
+    return """[analytics]
+reporting_enabled = false
+check_for_updates = false
+
+[security]
+admin_user = admin
+admin_password = admin
+allow_sign_up = false
+disable_gravatar = true
+
+[auth.anonymous]
+enabled = true
+org_name = Main Org.
+org_role = Admin
+hide_version = true
+
+[dashboards]
+default_home_dashboard_path = ""
+
+[database]
+type = sqlite3
+path = grafana.db
+
+[session]
+provider = memory
+
+[users]
+default_theme = dark
+allow_sign_up = false
+allow_org_create = false
+auto_assign_org = true
+auto_assign_org_role = Admin
+
+[unified_alerting]
+enabled = false
+
+[alerting]
+enabled = false
+
+[auth]
+disable_login_form = true
+disable_signout_menu = true
+
+[server]
+serve_from_sub_path = false
+http_port = 3000
+protocol = http
+
+[log]
+mode = console
+level = info
+
+[security]
+disable_initial_admin_creation = true
+cookie_secure = false
+cookie_samesite = lax
+"""
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/local.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/local.py
new file mode 100644
index 00000000000..1e0c97bc75f
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/local.py
@@ -0,0 +1,609 @@
+import argparse
+from multiprocessing.dummy import Process
+import os
+import subprocess
+from time import sleep
+import time
+import docker
+from utils import (
+    make_multi_address,
+    run_cmd,
+    pr,
+    get_peer_id_from_node_id,
+    project_root,
+    make_timestamp,
+)
+from yaml_maker import get_prometheus_config
+from args import add_shared_args_to_parser, get_arguments, get_env_vars
+from cluster_start import make_image_tag, build_image
+
+
+def check_docker():
+    pr("Checking if Docker works...")
+    run_cmd(
+        "docker run --name hello-world hello-world",
+        hint="Make sure you have Docker installed and running.",
+    )
+    run_cmd("docker rm hello-world")
+    pr("Docker is working correctly.")
+
+
+class ExperimentRunner:
+
+    def __init__(
+        self,
+    ):
+        """
+        Prometheus logging does not work without Docker because the container cannot make
+        requests to scrape the processes that are running...
+ """ + self.client = docker.from_env() + self.prometheus_url = "http://localhost:9090" + self.grafana_url = "http://localhost:3000" + self.prometheus_self_scrape = False # If true, Prometheus will scrape itself + self.docker_containers = [] + self.running_processes: list[subprocess.Popen] = [] + self.metric_ports = [] + self.docker_image_tag = None + self.bootstrap_multi_address = "" + self.tmp_dir_name = "experiment_runner" + self.timestamp = make_timestamp() + self.tmp_dir = f"/tmp/broadcast-network-stress-test-{self.timestamp}" + self.metric_port_base = 2000 + self.p2p_port_base = 10000 + self.custom_network = None + self.network_name = f"stress-test-net-{self.timestamp}" + self.container_ips = {} # Map container index to IP address + + def __enter__(self): + print("Starting ExperimentRunner...") + os.makedirs(self.tmp_dir) + pr(f"Using temporary directory: {self.tmp_dir}") + return self + + def create_docker_network(self): + """Create a custom bridge network for the stress test containers.""" + pr(f"Creating custom Docker network: {self.network_name}") + try: + self.custom_network = self.client.networks.create( + name=self.network_name, + driver="bridge", + check_duplicate=True, + ) + pr(f"Created network: {self.network_name}") + except docker.errors.APIError as e: + if "already exists" in str(e): + pr(f"Network {self.network_name} already exists, using it") + self.custom_network = self.client.networks.get(self.network_name) + else: + raise + + def cleanup_docker_network(self): + """Remove the custom Docker network.""" + if self.custom_network: + try: + pr(f"Removing custom network: {self.network_name}") + self.custom_network.remove() + pr(f"Network {self.network_name} removed") + except Exception as e: + pr(f"Warning: Failed to remove network {self.network_name}: {e}") + + def __exit__(self, exc_type, exc_value, traceback): + print("Stopping ExperimentRunner...") + for cont in self.docker_containers: + try: + pr(f"Force stopping and removing container {cont}...") + # Kill the container immediately (don't wait for graceful shutdown) + cont.kill() + except Exception as e: + pr(f"Warning: Failed to kill container {cont}: {e}") + + try: + # Force remove the container (even if it's still running) + cont.remove(force=True) + pr(f"Successfully removed container {cont}") + except Exception as e: + pr(f"Warning: Failed to remove container {cont}: {e}") + # Try one more time with the Docker CLI as a fallback + try: + run_cmd(f"docker rm -f {cont.id}") + pr(f"Removed container {cont} using docker CLI") + except Exception as e2: + pr(f"Final warning: Could not remove container {cont}: {e2}") + + self.docker_containers.clear() + + pr("Stopping broadcast_network_stress_test_node nodes...") + for p in self.running_processes: + pr(f"Stopping process {p}...") + p.kill() + self.running_processes.clear() + + # Clean up Docker network + self.cleanup_docker_network() + + def write_prometheus_config(self, use_custom_network: bool = False) -> str: + if use_custom_network: + # Use container names for custom network + metric_urls = [ + f"broadcast-network-stress-test-node-{i}:{port}" + for i, port in self.metric_ports + ] + else: + # Use localhost for host network or local processes + metric_urls = [f"localhost:{port}" for _, port in self.metric_ports] + + config = get_prometheus_config( + self_scrape=self.prometheus_self_scrape, + metric_urls=metric_urls, + ) + prometheus_config_path = os.path.join(self.tmp_dir, "prometheus.yml") + pr(f"Writing Prometheus configuration to {prometheus_config_path}...") + 
with open(prometheus_config_path, "w") as f: + f.write(config) + return prometheus_config_path + + def generate_grafana_dashboard(self) -> str: + """Generate Grafana dashboard JSON using static configuration.""" + pr("Generating Grafana dashboard configuration...") + from grafana_config import get_grafana_dashboard_json + + dashboard_json = get_grafana_dashboard_json() + dashboard_path = os.path.join(self.tmp_dir, "dashboard.json") + with open(dashboard_path, "w") as f: + f.write(dashboard_json) + + pr(f"Dashboard configuration saved to {dashboard_path}") + return dashboard_path + + def write_grafana_datasource_config(self, use_custom_network: bool = False) -> str: + """Write Grafana datasource configuration.""" + from grafana_config import ( + get_grafana_datasource_config, + get_grafana_datasource_config_cluster, + ) + + if use_custom_network: + # Use container name on custom network + datasource_config = """apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus_network_stress_test:9090 + isDefault: true + editable: true + basicAuth: false + withCredentials: false + jsonData: + httpMethod: GET + timeInterval: "5s" +""" + else: + # Use localhost on host network + datasource_config = get_grafana_datasource_config() + + datasource_path = os.path.join(self.tmp_dir, "datasource.yml") + with open(datasource_path, "w") as f: + f.write(datasource_config) + return datasource_path + + def write_grafana_dashboard_config(self, dashboard_path: str) -> str: + """Write Grafana dashboard provisioning configuration.""" + from grafana_config import get_grafana_dashboard_provisioning_config + + dashboard_config = get_grafana_dashboard_provisioning_config() + config_path = os.path.join(self.tmp_dir, "dashboard_config.yml") + with open(config_path, "w") as f: + f.write(dashboard_config) + return config_path + + def run_grafana(self, use_custom_network: bool = False): + """Run Grafana container with provisioned dashboard.""" + pr("Running Grafana...") + + # Generate dashboard and config files + dashboard_path = self.generate_grafana_dashboard() + datasource_path = self.write_grafana_datasource_config( + use_custom_network=use_custom_network + ) + dashboard_config_path = self.write_grafana_dashboard_config(dashboard_path) + + # Remove existing Grafana container + run_cmd("docker rm -f grafana_network_stress_test", may_fail=True) + + # Generate Grafana configuration + from grafana_config import get_grafana_config, get_grafana_preferences_json + + grafana_config = get_grafana_config() + grafana_config_path = os.path.join(self.tmp_dir, "grafana.ini") + with open(grafana_config_path, "w") as f: + f.write(grafana_config) + + # Generate Grafana preferences + preferences_json = get_grafana_preferences_json() + preferences_path = os.path.join(self.tmp_dir, "preferences.json") + with open(preferences_path, "w") as f: + f.write(preferences_json) + + # Determine network configuration + if use_custom_network: + network_config = self.network_name + ports = {"3000/tcp": 3000} # Expose Grafana port to host + else: + network_config = "host" + ports = None + + # Create Grafana container with provisioned dashboard and datasource + cont = self.client.containers.run( + image="grafana/grafana:latest", + detach=True, + name="grafana_network_stress_test", + network=network_config, + ports=ports, + environment={ + "GF_PATHS_CONFIG": "/etc/grafana/grafana.ini", + }, + volumes={ + grafana_config_path: { + "bind": "/etc/grafana/grafana.ini", + "mode": "ro", + }, + datasource_path: 
{ + "bind": "/etc/grafana/provisioning/datasources/datasource.yml", + "mode": "ro", + }, + dashboard_config_path: { + "bind": "/etc/grafana/provisioning/dashboards/dashboard_config.yml", + "mode": "ro", + }, + dashboard_path: { + "bind": "/etc/grafana/provisioning/dashboards/dashboard.json", + "mode": "ro", + }, + }, + extra_hosts=( + {"host.docker.internal": "host-gateway"} + if not use_custom_network + else None + ), + ) + self.docker_containers.append(cont) + pr(f"Grafana available at {self.grafana_url} (no login required)") + pr( + f"Direct dashboard link: {self.grafana_url}/d/broadcast-network-stress-test/broadcast-network-stress-test" + ) + + def run_prometheus(self, use_custom_network: bool = False): + pr("Running Prometheus...") + prometheus_config_path = self.write_prometheus_config( + use_custom_network=use_custom_network + ) + run_cmd("docker rm -f prometheus_network_stress_test", may_fail=True) + + # Determine network configuration + if use_custom_network: + network_config = self.network_name + ports = {"9090/tcp": 9090} # Expose Prometheus port to host + else: + network_config = "host" + ports = None + + cont = self.client.containers.run( + image="prom/prometheus", + detach=True, + name="prometheus_network_stress_test", + network=network_config, + ports=ports, + volumes={ + prometheus_config_path: { + "bind": "/etc/prometheus/prometheus.yml", + "mode": "ro", + } + }, + extra_hosts=( + {"host.docker.internal": "host-gateway"} + if not use_custom_network + else None + ), + ) + self.docker_containers.append(cont) + + def compile_network_stress_test_node(self, args: argparse.Namespace): + if args.docker: + pr("Building Docker image for broadcast_network_stress_test_node...") + # Build or use existing image + self.docker_image_tag = ( + args.image if args.image else make_image_tag(self.timestamp) + ) + + if not args.image: + build_image(self.docker_image_tag) + else: + # Check if provided image exists + try: + self.client.images.get(self.docker_image_tag) + pr(f"Using existing image: {self.docker_image_tag}") + except docker.errors.ImageNotFound: + raise RuntimeError( + f"Specified image '{self.docker_image_tag}' not found. " + f"Please build the image first or run without --image to build automatically." 
+ ) + else: + pr("Compiling broadcast_network_stress_test_node node without Docker...") + run_cmd( + f'RUSTFLAGS="--cfg tokio_unstable" cargo build --release --bin broadcast_network_stress_test_node', + hint="Make sure you have Rust and Cargo installed.", + ) + + def run_network_stress_test_node(self, i: int, args: argparse.Namespace): + pr(f"Running node {i}...") + assert i >= 0 + metric_port = self.metric_port_base + i + p2p_port = self.p2p_port_base + i + exe: str = os.path.abspath( + f"{project_root()}/target/release/broadcast_network_stress_test_node" + ) + + if args.profile: + perf_data_file = str( + os.path.join( + self.tmp_dir, f"broadcast_network_stress_test_node{i}.perf.data" + ) + ) + if args.profile_mode == "cpu": + arguments = ["perf", "record", "-o", perf_data_file, exe] + elif args.profile_mode == "mem": + arguments = [ + "perf", + "mem", + "record", + "-o", + perf_data_file, + exe, + ] + elif args.profile_mode == "dhat": + arguments = [ + "valgrind", + "--tool=dhat", + f"--dhat-out-file={perf_data_file}.dhat.out", + exe, + ] + else: + raise Exception(f"Unrecognized profile mode {args.profile_mode}") + else: + arguments = [exe] + + # Generate bootstrap peers for all other nodes using list comprehension + bootstrap_nodes = [ + make_multi_address( + network_address="/ip4/127.0.0.1", + port=self.p2p_port_base + j, + peer_id=get_peer_id_from_node_id(j), + args=args, + ) + for j in range(args.num_nodes) + ] + + arguments_tuples = get_arguments( + id=i, + metric_port=metric_port, + p2p_port=p2p_port, + bootstrap_nodes=bootstrap_nodes, + args=args, + ) + arguments += [s for pair in arguments_tuples for s in pair] + pr(f"Running {' '.join(arguments)}") + # write stdout and stderr to files + # stdout_file = os.path.join(self.tmp_dir, f"node_{i}_stdout.log") + # stderr_file = os.path.join(self.tmp_dir, f"node_{i}_stderr.log") + # with open(stdout_file, "w") as stdout, open(stderr_file, "w") as stderr: + p = subprocess.Popen(args=arguments) + self.running_processes.append(p) + self.metric_ports.append((i, metric_port)) + + def run_network_stress_test_node_container( + self, i: int, args: argparse.Namespace, use_custom_network: bool = False + ): + pr(f"Running node {i} in Docker container...") + assert i >= 0 + metric_port = self.metric_port_base + i + p2p_port = self.p2p_port_base + i + + container_name = f"broadcast-network-stress-test-node-{i}" + + # Determine network address based on whether we're using custom network + if use_custom_network: + # Use container DNS names on custom network - resolve to container IPs + bootstrap_nodes = [ + make_multi_address( + network_address=f"/dns4/broadcast-network-stress-test-node-{j}", + port=self.p2p_port_base + j, + peer_id=get_peer_id_from_node_id(j), + args=args, + ) + for j in range(args.num_nodes) + ] + else: + # Use localhost on host network + bootstrap_nodes = [ + make_multi_address( + network_address="/ip4/127.0.0.1", + port=self.p2p_port_base + j, + peer_id=get_peer_id_from_node_id(j), + args=args, + ) + for j in range(args.num_nodes) + ] + + # Get command arguments + env_vars = get_env_vars( + id=i, + metric_port=metric_port, + p2p_port=p2p_port, + bootstrap_nodes=bootstrap_nodes, + args=args, + ) + + # Build environment dict from env_vars + env_dict = {x["name"]: x["value"] for x in env_vars} + + # Add network throttling parameters if specified + if args.latency is not None: + env_dict["LATENCY"] = str(args.latency) + if args.throughput is not None: + env_dict["THROUGHPUT"] = str(args.throughput) + + pr(f"Starting container 
{container_name}") + pr(f"Environment variables: {env_dict}") + + # Remove existing container if it exists + run_cmd(f"docker rm -f {container_name}", may_fail=True) + + # Determine network configuration + if use_custom_network: + network_config = self.network_name + # Don't expose ports - containers communicate internally + ports = None + else: + network_config = "host" + ports = None + + # Run the container with network capabilities for traffic control + cont = self.client.containers.run( + image=self.docker_image_tag, + detach=True, + name=container_name, + network=network_config, + ports=ports, + cap_add=["NET_ADMIN"], # Required for tc (traffic control) commands + environment=env_dict, + remove=False, + ) + + self.docker_containers.append(cont) + self.metric_ports.append((i, metric_port)) + + def run_network_stress_test_nodes(self, args: argparse.Namespace): + # Determine if we need to use custom network for throttling + use_custom_network = args.docker and ( + args.latency is not None or args.throughput is not None + ) + + if use_custom_network: + # Create custom network for traffic control + self.create_docker_network() + + if args.docker: + pr( + "Running broadcast_network_stress_test_node nodes in Docker containers..." + ) + for i in range(args.num_nodes): + self.run_network_stress_test_node_container( + i, args, use_custom_network=use_custom_network + ) + else: + pr("Running broadcast_network_stress_test_node nodes without Docker...") + for i in range(args.num_nodes): + self.run_network_stress_test_node(i, args=args) + + return use_custom_network + + def check_still_running(self): + pr("Checking if broadcast_network_stress_test_node nodes are still running...") + for p in self.running_processes: + if p.poll() is not None: + raise Exception(f"Process {p} has stopped.") + for cont in self.docker_containers: + cont.reload() + if cont.status != "running": + run_cmd(f"docker logs {cont.name}", may_fail=True) + raise Exception(f"Container {cont.name} has stopped.") + + def run_experiment(self, args: argparse.Namespace): + check_docker() + self.compile_network_stress_test_node(args) + use_custom_network = self.run_network_stress_test_nodes(args=args) + self.run_prometheus(use_custom_network=use_custom_network) + self.run_grafana(use_custom_network=use_custom_network) + + deployment_mode = "Docker containers" if args.docker else "local processes" + if use_custom_network: + deployment_mode += " (custom network with traffic control)" + + pr(f"Running broadcast_network_stress_test_nodes in {deployment_mode}...") + pr(f"Visit {self.prometheus_url} to see the metrics.") + pr( + f"Visit {self.grafana_url} to see the Grafana dashboard (no login required)." + ) + pr( + f"Direct dashboard URL: {self.grafana_url}/d/broadcast-network-stress-test/broadcast-network-stress-test" + ) + while True: + self.check_still_running() + sleep(10) + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument( + "--profile", + help="Whether to run perf profiling on each node (files will show up in the tmp directory)", + action="store_true", + default=False, + ) + parser.add_argument( + "--profile-mode", + help="The mode to run perf in. 
Options are 'cpu', 'mem', and 'dhat'.",
+        choices=["cpu", "mem", "dhat"],
+        default="cpu",
+    )
+    parser.add_argument(
+        "--docker",
+        help="Run nodes in Docker containers instead of local processes",
+        action="store_true",
+        default=False,
+    )
+    parser.add_argument(
+        "--image",
+        help="Previously built image tag to use instead of re-building the docker image (only used with --docker)",
+        type=str,
+        default=None,
+    )
+    parser.add_argument(
+        "--latency",
+        help="Min latency to use when gating the network in milliseconds (only used with --docker)",
+        type=int,
+        default=None,
+    )
+    parser.add_argument(
+        "--throughput",
+        help="Max throughput to use when gating the network in KB/s (only used with --docker)",
+        type=int,
+        default=None,
+    )
+    add_shared_args_to_parser(parser=parser)
+    args = parser.parse_args()
+    print(args)
+
+    pr("Starting network stress test experiment...")
+    deployment_mode = (
+        "Docker containers with network controls" if args.docker else "local processes"
+    )
+    pr(f"This will run {args.num_nodes} nodes using {deployment_mode}.")
+
+    if args.docker and (args.latency or args.throughput):
+        controls = []
+        if args.latency:
+            controls.append(f"latency: {args.latency}ms")
+        if args.throughput:
+            controls.append(f"throughput: {args.throughput}KB/s")
+        pr(f"Network controls: {', '.join(controls)}")
+
+    with ExperimentRunner() as runner:
+        runner.run_experiment(args=args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/test.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/test.py
new file mode 100755
index 00000000000..b9d28778c3c
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/test.py
@@ -0,0 +1,424 @@
+#!/usr/bin/env python3
+"""
+Test script for network protocols: Gossipsub, SQMR, and ReversedSqmr.
+
+This script:
+1. Runs the broadcast network stress test with different protocols
+2. Waits 60 seconds for messages to be exchanged
+3. Verifies network connections are established via the network_connected_peers metric
+4. Analyzes sent vs received message metrics to identify broadcaster vs receiver issues
+5. Checks for successful message reception and provides detailed failure diagnostics
+6. Cleans up the running process
+
+Usage:
+    cd crates/apollo_network/src/bin/broadcast_network_stress_test_node/run
+
+    # Test all protocols
+    python3 test.py
+
+    # Test specific protocol
+    python3 test.py --protocol gossipsub
+    python3 test.py --protocol sqmr
+    python3 test.py --protocol reversed-sqmr
+
+The test supports three protocols:
+- Gossipsub: Traditional pub/sub broadcasting
+- SQMR: Query/response protocol where the broadcaster sends queries
+- ReversedSqmr: Query/response protocol where receivers send queries
+"""
+
+import subprocess
+import time
+import requests
+import sys
+import signal
+import os
+import argparse
+
+
+def cleanup_process(process):
+    """Clean up a running process gracefully."""
+    if not process:
+        return
+
+    print("Cleaning up process...")
+    try:
+        # Send SIGINT (Ctrl+C) to allow graceful cleanup of containers
+        process.send_signal(signal.SIGINT)
+        print("Sent SIGINT to process, waiting for graceful shutdown...")
+
+        # Wait up to 30 seconds for graceful shutdown
+        try:
+            process.wait(timeout=30)
+            print(f"Process {process.pid} terminated gracefully")
+        except subprocess.TimeoutExpired:
+            print("Process didn't terminate gracefully, sending SIGTERM...")
+            process.terminate()
+            try:
+                process.wait(timeout=10)
+                print(f"Process {process.pid} terminated with SIGTERM")
+            except subprocess.TimeoutExpired:
+                print("Process didn't respond to SIGTERM, sending SIGKILL...")
+                process.kill()
+                process.wait()
+                print(f"Process {process.pid} killed with SIGKILL")
+
+        # Print any output for debugging
+        try:
+            stdout, stderr = process.communicate(timeout=1)
+            if stdout:
+                print("Process stdout (last 1000 chars):")
+                print(stdout.decode()[-1000:])
+            if stderr:
+                print("Process stderr (last 1000 chars):")
+                print(stderr.decode()[-1000:])
+        except subprocess.TimeoutExpired:
+            pass
+
+    except ProcessLookupError:
+        print("Process already terminated")
+    except Exception as cleanup_error:
+        print(f"Error during cleanup: {cleanup_error}")
+
+
+def check_network_connections():
+    """Check if nodes have established network connections via Prometheus metrics."""
+    try:
+        # Query Prometheus for connected peers metric
+        prometheus_url = "http://localhost:9090/api/v1/query"
+        params = {"query": "network_connected_peers"}
+
+        response = requests.get(prometheus_url, params=params, timeout=10)
+        response.raise_for_status()
+        data = response.json()
+
+        if data.get("status") != "success":
+            print(f"❌ Prometheus query failed: {data}")
+            return False
+
+        results = data.get("data", {}).get("result", [])
+
+        if not results:
+            print("❌ network_connected_peers metric not found in Prometheus")
+            return False
+
+        # Check connection metrics for each node
+        total_connections = 0
+        nodes_with_connections = 0
+
+        for result in results:
+            value = float(result.get("value", [0, "0"])[1])
+            instance = result.get("metric", {}).get("instance", "unknown")
+            print(f"Node {instance}: {value} connected peers")
+
+            total_connections += value
+            if value > 0:
+                nodes_with_connections += 1
+
+        # For a 2-node test, we expect at least 1 connection total (bidirectional connection)
+        # and at least 1 node should have connections
+        if total_connections >= 1 and nodes_with_connections >= 1:
+            print(
+                f"✅ PASS: Network connections established (total: {total_connections}, nodes with connections: {nodes_with_connections})"
+            )
+            return True
+        else:
+            print(
+                f"❌ FAIL: Insufficient network connections (total: {total_connections}, nodes with connections: {nodes_with_connections})"
+            )
+            return False
+ + except requests.exceptions.RequestException as e: + print(f"❌ Failed to query Prometheus for connections: {e}") + return False + except Exception as e: + print(f"❌ Error checking network connections: {e}") + return False + + +def check_broadcaster_vs_receiver(): + """ + Check if the issue is with broadcaster (not sending) or receiver (not receiving). + + Returns: + tuple: (broadcaster_issue, receiver_issue) where True indicates an issue + """ + try: + prometheus_url = "http://localhost:9090/api/v1/query" + + # Query both sent and received messages metrics (actual metrics used by stress test) + sent_params = {"query": "broadcast_message_count"} + received_params = {"query": "receive_message_count"} + + # Get sent messages + sent_response = requests.get(prometheus_url, params=sent_params, timeout=10) + sent_response.raise_for_status() + sent_data = sent_response.json() + + # Get received messages + received_response = requests.get( + prometheus_url, params=received_params, timeout=10 + ) + received_response.raise_for_status() + received_data = received_response.json() + + # Parse sent messages + total_sent = 0 + if sent_data.get("status") == "success": + sent_results = sent_data.get("data", {}).get("result", []) + for result in sent_results: + value = float(result.get("value", [0, "0"])[1]) + instance = result.get("metric", {}).get("instance", "unknown") + print(f"Node {instance}: {value} messages sent") + total_sent += value + + # Parse received messages + total_received = 0 + if received_data.get("status") == "success": + received_results = received_data.get("data", {}).get("result", []) + for result in received_results: + value = float(result.get("value", [0, "0"])[1]) + instance = result.get("metric", {}).get("instance", "unknown") + print(f"Node {instance}: {value} messages received") + total_received += value + + print( + f"📊 Summary: {total_sent} messages sent, {total_received} messages received" + ) + + # Determine issues + broadcaster_issue = total_sent == 0 # No messages being sent + receiver_issue = ( + total_sent > 0 and total_received == 0 + ) # Messages sent but none received + + if broadcaster_issue: + print("❌ Broadcaster Issue: No messages are being sent") + elif receiver_issue: + print("❌ Receiver Issue: Messages are being sent but not received") + elif total_sent > 0 and total_received > 0: + print("✅ Both broadcaster and receiver are working") + else: + print("⚠️ No message activity detected") + + return broadcaster_issue, receiver_issue + + except requests.exceptions.RequestException as e: + print(f"❌ Failed to query Prometheus for message metrics: {e}") + return True, True # Assume both have issues if we can't check + except Exception as e: + print(f"❌ Error checking broadcaster vs receiver: {e}") + return True, True + + +def run_protocol_test(protocol_name): + """Test a specific network protocol.""" + print(f"Starting {protocol_name} test...") + + # 1. Run the command in the background + cmd = [ + "python3", + "local.py", + "--mode", + "one", + "--num-nodes", + "2", + "--network-protocol", + protocol_name, + ] + + print(f"Running command: {' '.join(cmd)}") + process = subprocess.Popen( + cmd, + text=True, + preexec_fn=os.setsid, + ) + + try: + + print(f"Process started with PID: {process.pid}") + + # 2. 
Sleep for 60 seconds
+        print("Waiting 60 seconds for message exchange...")
+        for i in range(60, 0, -10):
+            print(f"  {i} seconds remaining...")
+            time.sleep(10)
+            if process.poll() is not None:
+                raise Exception(
+                    f"Process terminated early with return code {process.returncode}"
+                )
+
+        # 3. Query Prometheus for connection metrics first
+        print("Checking if nodes have established connections...")
+
+        connection_success = check_network_connections()
+        if not connection_success:
+            raise Exception("Nodes failed to establish connections")
+
+        # 4. Check broadcaster vs receiver issue for detailed diagnostics
+        print("Checking if the issue is in broadcaster or receiver...")
+
+        broadcaster_issue, receiver_issue = check_broadcaster_vs_receiver()
+
+        # 5. Query Prometheus for the message metric
+        print("Querying Prometheus for receive_message_count metric...")
+
+        # Query Prometheus API
+        prometheus_url = "http://localhost:9090/api/v1/query"
+        params = {"query": "receive_message_count"}
+
+        try:
+            response = requests.get(prometheus_url, params=params, timeout=10)
+            response.raise_for_status()
+        except requests.exceptions.RequestException as e:
+            raise Exception(
+                f"Failed to query Prometheus: {e}. Make sure Prometheus is running on localhost:9090"
+            )
+
+        try:
+            data = response.json()
+        except Exception as e:
+            raise Exception(f"Error parsing Prometheus response: {e}")
+
+        if data.get("status") != "success":
+            raise Exception(f"Prometheus query failed: {data}")
+
+        results = data.get("data", {}).get("result", [])
+
+        if not results:
+            raise Exception("receive_message_count metric not found in Prometheus")
+
+        # Check if any of the metric values is > 0
+        total_messages = 0
+        for result in results:
+            value = float(result.get("value", [0, "0"])[1])
+            total_messages += value
+            print(
+                f"Found metric value: {value} for instance {result.get('metric', {})}"
+            )
+
+        if total_messages <= 0:
+            # Provide a more detailed error message based on the broadcaster/receiver analysis
+            if broadcaster_issue and receiver_issue:
+                raise Exception(
+                    "Both broadcaster and receiver have issues - no messages sent or received"
+                )
+            elif broadcaster_issue:
+                raise Exception(
+                    "Broadcaster issue: No messages being sent by the broadcaster node"
+                )
+            elif receiver_issue:
+                raise Exception(
+                    "Receiver issue: Messages are being sent but not received by receiver nodes"
+                )
+            else:
+                raise Exception(f"receive_message_count = {total_messages} (not > 0)")
+
+        print(f"✅ PASS: receive_message_count = {total_messages} (> 0)")
+        return True
+
+    except Exception as e:
+        print(f"❌ FAIL: {e}")
+
+        # Get process output on any failure if not already captured. Note that
+        # stdout/stderr are None unless the process was started with pipes.
+        if process:
+            try:
+                stdout, stderr = process.communicate(timeout=1)
+                if stdout:
+                    print(f"Process stdout:\n{stdout}")
+                if stderr:
+                    print(f"Process stderr:\n{stderr}")
+            except Exception:
+                pass
+
+        return False
+
+    finally:
+        # 6. Clean up the running process
+        cleanup_process(process)
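The checks above all share one Prometheus access pattern: an instant query against `/api/v1/query`, whose results arrive as `[timestamp, "value-as-string"]` pairs per instance. A minimal sketch of that pattern as a reusable helper (assuming, as the script does, that Prometheus is reachable on `localhost:9090`; the helper name is illustrative):

```python
# Sketch of the Prometheus instant-query pattern used throughout this script.
import requests


def query_metric_total(metric: str, url: str = "http://localhost:9090/api/v1/query") -> float:
    """Sum an instant-query metric across all reporting instances."""
    response = requests.get(url, params={"query": metric}, timeout=10)
    response.raise_for_status()
    data = response.json()
    if data.get("status") != "success":
        raise RuntimeError(f"Prometheus query failed: {data}")
    # Each result's "value" is a [timestamp, "value-as-string"] pair.
    return sum(float(r["value"][1]) for r in data.get("data", {}).get("result", []))


if __name__ == "__main__":
    print("received:", query_metric_total("receive_message_count"))
```

Summing across instances mirrors how the test aggregates `receive_message_count` over all nodes before deciding pass or fail.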
+
+
+def run_all_tests():
+    """Run tests for all network protocols."""
+    protocols = ["gossipsub", "sqmr", "reversed-sqmr"]
+    results = {}
+
+    for protocol in protocols:
+        print("=" * 80)
+        print(f"Testing {protocol.upper()} Protocol")
+        print("=" * 80)
+
+        success = run_protocol_test(protocol)
+        results[protocol] = success
+
+        print("=" * 80)
+        if success:
+            print(f"✅ {protocol.upper()} TEST PASSED!")
+        else:
+            print(f"❌ {protocol.upper()} TEST FAILED!")
+        print("=" * 80)
+        print()
+
+        # Wait a bit between tests to ensure clean separation
+        if protocol != protocols[-1]:  # Don't wait after the last test
+            print("Waiting 10 seconds before next test...")
+            time.sleep(10)
+
+    return results
+
+
+def main():
+    """Entry point."""
+    parser = argparse.ArgumentParser(description="Network Protocol Test Suite")
+    parser.add_argument(
+        "--protocol",
+        choices=["gossipsub", "sqmr", "reversed-sqmr", "all"],
+        default="all",
+        help="Protocol to test (default: all)",
+    )
+    args = parser.parse_args()
+
+    if args.protocol == "all":
+        print("=" * 80)
+        print("Network Protocol Comparison Test")
+        print("Testing: Gossipsub, SQMR, and ReveresedSqmr")
+        print("=" * 80)
+        print()
+
+        results = run_all_tests()
+
+        print("=" * 80)
+        print("FINAL RESULTS:")
+        print("=" * 80)
+
+        all_passed = True
+        for protocol, success in results.items():
+            status = "✅ PASSED" if success else "❌ FAILED"
+            print(f"{protocol.upper():15} : {status}")
+            if not success:
+                all_passed = False
+
+        print("=" * 80)
+        if all_passed:
+            print("🎉 ALL TESTS PASSED!")
+            sys.exit(0)
+        else:
+            print("💥 SOME TESTS FAILED!")
+            sys.exit(1)
+    else:
+        # Run single protocol test
+        print("=" * 80)
+        print(f"Testing {args.protocol.upper()} Protocol")
+        print("=" * 80)
+
+        success = run_protocol_test(args.protocol)
+
+        print("=" * 80)
+        if success:
+            print(f"🎉 {args.protocol.upper()} TEST PASSED!")
+            sys.exit(0)
+        else:
+            print(f"💥 {args.protocol.upper()} TEST FAILED!")
+            sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/utils.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/utils.py
new file mode 100644
index 00000000000..9f127ebc285
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/utils.py
@@ -0,0 +1,101 @@
+import os
+import time
+import subprocess
+import functools
+
+
+def make_timestamp() -> str:
+    return time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
+
+
+def project_root() -> str:
+    result = os.path.abspath(f"{__file__}/../../../../../../../")
+    assert result.endswith(
+        "/sequencer"
+    ), f"Project root must end in '/sequencer' but got {result}"
+    return result
+
+
+def run_cmd(cmd: str, hint: str = "none", may_fail: bool = False):
+    print(f"🔧🔧🔧 CMD: {cmd}", flush=True)
+    try:
+        result = subprocess.run(cmd, shell=True, check=False)
+        if result.returncode != 0 and not may_fail:
+            raise RuntimeError(
+                f"Command failed with exit code {result.returncode}: {cmd}\n  ⚠️ ⚠️ ⚠️ Hint: {hint}"
+            )
+    except KeyboardInterrupt:
+        print("\n⚠️ Command interrupted by user (Ctrl+C)")
+        raise
+
+
+def pr(string: str):
+    print(f"🔔 INFO: {string}", flush=True)
+
+
+def connect_to_cluster():
+    run_cmd(
+        "gcloud container clusters get-credentials sequencer-dev --region us-central1 --project starkware-dev",
+        hint="Make sure you have gcloud installed and you are logged in (run `gcloud auth login`).",
+    )
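`make_multi_address` (defined below) is the single place that decides between the TCP and QUIC transports. For reference, a sketch of the two multiaddress shapes it produces for the cluster's headless-service DNS names (the peer id shown is a placeholder):

```python
# Sketch: the two multiaddress shapes make_multi_address produces.
addr = "/dns/broadcast-network-stress-test-0.broadcast-network-stress-test-headless"
port, peer_id = 10000, "12D3KooW..."  # placeholder peer id

tcp_multiaddr = f"{addr}/tcp/{port}/p2p/{peer_id}"
quic_multiaddr = f"{addr}/udp/{port}/quic-v1/p2p/{peer_id}"
print(tcp_multiaddr)
print(quic_multiaddr)
```

Only the transport segment changes (`/tcp/<port>` versus `/udp/<port>/quic-v1`); the trailing `/p2p/<peer_id>` component is the same in both cases.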
+def make_multi_address(network_address: str, port: int, peer_id: str, args) -> str:
+    if args.quic:
+        return f"{network_address}/udp/{port}/quic-v1/p2p/{peer_id}"
+    else:
+        return f"{network_address}/tcp/{port}/p2p/{peer_id}"
+
+
+def __get_peer_id_from_secret_key(secret_key: str) -> str:
+    """Get peer ID by running the cargo command with the given secret key."""
+    cmd = f"cargo run --bin get_peer_id_from_secret_key {secret_key}"
+    try:
+        result = subprocess.run(
+            cmd,
+            shell=True,
+            capture_output=True,
+            text=True,
+            check=True,
+            cwd=project_root(),
+        )
+        return result.stdout.strip().replace("Peer ID: ", "")
+    except subprocess.CalledProcessError as e:
+        raise RuntimeError(f"Failed to get peer ID from secret key {secret_key}: {e}")
+
+
+@functools.lru_cache(maxsize=128)
+def get_peer_id_from_node_id(node_id: int) -> str:
+    """Get peer ID for a node by converting node ID to secret key and running the cargo command."""
+    # Encode the node ID as a 32-byte little-endian value (64 hex chars).
+    key_bytes = node_id.to_bytes(32, "little")
+    secret_key = f"0x{key_bytes.hex()}"
+    return __get_peer_id_from_secret_key(secret_key)
+
+
+def get_commit() -> str:
+    """Get the current git commit hash, with '-dirty' suffix if there are uncommitted changes."""
+    try:
+        # Get the commit hash
+        result = subprocess.run(
+            ["git", "rev-parse", "HEAD"],
+            capture_output=True,
+            text=True,
+            check=True,
+            cwd=project_root(),
+        )
+        commit_hash = result.stdout.strip()
+
+        # Check if there are uncommitted changes
+        dirty_check = subprocess.run(
+            ["git", "diff-index", "--quiet", "HEAD"],
+            cwd=project_root(),
+        )
+
+        # diff-index returns non-zero if there are changes
+        if dirty_check.returncode != 0:
+            commit_hash += "-dirty"
+
+        return commit_hash
+    except subprocess.CalledProcessError as e:
+        raise RuntimeError(f"Failed to get git commit: {e}")
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/yaml_maker.py b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/yaml_maker.py
new file mode 100644
index 00000000000..412744b332c
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/run/yaml_maker.py
@@ -0,0 +1,498 @@
+import yaml
+from typing import Dict, Any
+from args import get_env_vars
+from utils import get_peer_id_from_node_id, make_multi_address
+
+
+prometheus_service_name: str = "prometheus-service"
+
+
+def _generate_yaml(data: Dict[str, Any]) -> str:
+    """
+    Generate YAML string from data structure with proper formatting.
+ + Args: + data: Dictionary representing the Kubernetes manifest + + Returns: + Properly formatted YAML string + """ + try: + return yaml.dump( + data, + default_flow_style=False, + sort_keys=False, + indent=2, + width=120, + allow_unicode=True, + ) + except yaml.YAMLError as e: + raise ValueError(f"Failed to generate YAML: {e}") + + +def _get_prometheus_config_data( + self_scrape: bool, + metric_urls: list[str], + scrape_seconds: int, +) -> Dict[str, Any]: + """Generate Prometheus configuration data structure.""" + scrape_configs = [] + + if self_scrape: + scrape_configs.append( + { + "job_name": "prometheus", + "static_configs": [{"targets": ["localhost:9090"]}], + } + ) + + for i, url in enumerate(metric_urls): + scrape_configs.append( + { + "job_name": f"broadcast_network_stress_test_{i}", + "static_configs": [ + { + "targets": [url], + "labels": { + "application": "broadcast_network_stress_test_node", + "environment": "test", + }, + } + ], + } + ) + + return { + "global": {"scrape_interval": f"{scrape_seconds}s"}, + "scrape_configs": scrape_configs, + } + + +def get_prometheus_config(self_scrape: bool, metric_urls: list[str]) -> str: + """Generate Prometheus configuration YAML.""" + config_data = _get_prometheus_config_data( + self_scrape, metric_urls, scrape_seconds=1 + ) + return _generate_yaml(config_data) + + +def get_prometheus_yaml_file(num_nodes: int) -> str: + """Generate Prometheus ConfigMap YAML.""" + # Generate targets for each network stress test node (metrics on port 2000) + urls = [ + f"broadcast-network-stress-test-{i}.broadcast-network-stress-test-headless:2000" + for i in range(num_nodes) + ] + # Get the config data structure using existing function + config_data = _get_prometheus_config_data( + self_scrape=False, metric_urls=urls, scrape_seconds=5 + ) + + # Create a LiteralStr class to force literal block scalar representation + class LiteralStr(str): + pass + + def literal_str_representer(dumper, data): + return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") + + yaml.add_representer(LiteralStr, literal_str_representer) + + # Convert config data to YAML string and wrap as LiteralStr + config_yaml = _generate_yaml(config_data) + literal_config = LiteralStr(config_yaml) + + data = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "prometheus-config"}, + "data": {"prometheus.yml": literal_config}, + } + + return _generate_yaml(data) + + +def get_prometheus_deployment_yaml_file() -> str: + """Generate Prometheus StatefulSet YAML (named deployment for backward compatibility).""" + data = { + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "metadata": {"name": "prometheus"}, + "spec": { + "replicas": 1, + "serviceName": "prometheus-headless", + "selector": {"matchLabels": {"app": "prometheus"}}, + "template": { + "metadata": {"labels": {"app": "prometheus"}}, + "spec": { + "securityContext": { + "fsGroup": 65534, # nobody group - matches Prometheus container user + "runAsUser": 65534, # nobody user + "runAsGroup": 65534, # nobody group + "runAsNonRoot": True, + }, + "containers": [ + { + "name": "prometheus", + "image": "registry.hub.docker.com/prom/prometheus:main", + "imagePullPolicy": "Always", + "ports": [{"containerPort": 9090}], + "securityContext": { + "allowPrivilegeEscalation": False, + "readOnlyRootFilesystem": True, + "capabilities": {"drop": ["ALL"]}, + }, + "volumeMounts": [ + { + "name": "config-volume", + "mountPath": "/etc/prometheus", + }, + { + "name": "prometheus-data", + "mountPath": "/prometheus", + }, + { + 
"name": "tmp-volume", + "mountPath": "/tmp", + }, + ], + "args": [ + "--config.file=/etc/prometheus/prometheus.yml", + "--storage.tsdb.path=/prometheus", + "--web.console.libraries=/etc/prometheus/console_libraries", + "--web.console.templates=/etc/prometheus/consoles", + "--storage.tsdb.retention.time=15d", + "--web.enable-lifecycle", + "--web.listen-address=0.0.0.0:9090", + ], + } + ], + "volumes": [ + { + "name": "config-volume", + "configMap": {"name": "prometheus-config"}, + }, + { + "name": "tmp-volume", + "emptyDir": {}, + }, + ], + }, + }, + "volumeClaimTemplates": [ + { + "metadata": {"name": "prometheus-data"}, + "spec": { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": "16Gi"}}, + }, + } + ], + }, + } + + return _generate_yaml(data) + + +def get_prometheus_service_yaml_file() -> str: + """Generate Prometheus Service YAML.""" + data = { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": prometheus_service_name}, + "spec": { + "selector": {"app": "prometheus"}, + "ports": [{"port": 9090, "targetPort": 9090}], + "type": "ClusterIP", + }, + } + + return _generate_yaml(data) + + +def get_prometheus_headless_service_yaml_file() -> str: + """Generate Prometheus Headless Service YAML for StatefulSet.""" + data = { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": "prometheus-headless"}, + "spec": { + "selector": {"app": "prometheus"}, + "ports": [{"port": 9090, "targetPort": 9090}], + "clusterIP": "None", + }, + } + + return _generate_yaml(data) + + +def get_network_stress_test_deployment_yaml_file(image_tag: str, args) -> str: + """Generate Network Stress Test Indexed Job YAML.""" + # Get command line arguments and convert them to environment variables + bootstrap_nodes = [ + make_multi_address( + network_address=f"/dns/broadcast-network-stress-test-{j}.broadcast-network-stress-test-headless", + port=10000, + peer_id=get_peer_id_from_node_id(j), + args=args, + ) + for j in range(args.num_nodes) + ] + + env_vars = get_env_vars( + id=None, + metric_port=2000, + p2p_port=10000, + bootstrap_nodes=bootstrap_nodes, + args=args, + ) + + # Build the pod spec + pod_spec = { + "subdomain": "broadcast-network-stress-test-headless", # For stable DNS with headless service + "restartPolicy": "Never", # Jobs require Never or OnFailure + "setHostnameAsFQDN": False, # Keep short hostname + "containers": [ + { + "name": "broadcast-network-stress-test", + "image": image_tag, + "securityContext": {"privileged": True}, + "ports": [ + {"containerPort": 2000, "name": "metrics"}, + { + "containerPort": 10000, + "protocol": "UDP", + "name": "p2p-udp", + }, + { + "containerPort": 10000, + "protocol": "TCP", + "name": "p2p-tcp", + }, + ], + "env": env_vars, + "resources": { + "limits": { + "memory": args.memory_limits, + "cpu": args.cpu_limits, + }, + "requests": { + "memory": args.memory_requests, + "cpu": args.cpu_requests, + }, + }, + } + ], + } + + # Add tolerations and nodeSelector if dedicated node is requested + if args.dedicated_node_pool: + pod_spec["tolerations"] = [ + { + "effect": "NoSchedule", + "key": "key", + "operator": "Equal", + "value": args.node_pool_name, + } + ] + pod_spec["nodeSelector"] = {"role": args.node_pool_role} + + data = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": {"name": "broadcast-network-stress-test"}, + "spec": { + "completionMode": "Indexed", # Each pod gets JOB_COMPLETION_INDEX from 0 to completions-1 + "completions": args.num_nodes, # Total number of indexed pods + "parallelism": args.num_nodes, # 
Run all pods in parallel + "template": { + "metadata": {"labels": {"app": "broadcast-network-stress-test"}}, + "spec": pod_spec, + }, + }, + } + + return _generate_yaml(data) + + +def get_network_stress_test_headless_service_yaml_file() -> str: + """Generate Network Stress Test headless Service YAML.""" + data = { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": "broadcast-network-stress-test-headless"}, + "spec": { + "clusterIP": "None", + "selector": {"app": "broadcast-network-stress-test"}, + "ports": [ + {"port": 2000, "targetPort": 2000, "name": "metrics"}, + { + "port": 10000, + "targetPort": 10000, + "protocol": "UDP", + "name": "p2p-udp", + }, + { + "port": 10000, + "targetPort": 10000, + "protocol": "TCP", + "name": "p2p-tcp", + }, + ], + }, + } + + return _generate_yaml(data) + + +def get_grafana_configmap_yaml_file() -> str: + """Generate Grafana ConfigMap YAML with dashboard and datasource configuration.""" + from grafana_config import ( + get_grafana_dashboard_json, + get_grafana_datasource_config_cluster, + get_grafana_dashboard_provisioning_config, + get_grafana_alerts_json, + get_grafana_config, + get_grafana_preferences_json, + ) + + dashboard_json = get_grafana_dashboard_json() + datasource_config = get_grafana_datasource_config_cluster() + dashboard_config = get_grafana_dashboard_provisioning_config() + alerting_rules = get_grafana_alerts_json() + grafana_config = get_grafana_config() + preferences_json = get_grafana_preferences_json() + + data = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "grafana-config"}, + "data": { + "dashboard.json": dashboard_json, + "datasource.yml": datasource_config, + "dashboard_config.yml": dashboard_config, + "alerting_rules.json": alerting_rules, + "grafana.ini": grafana_config, + "preferences.json": preferences_json, + }, + } + + return _generate_yaml(data) + + +def get_grafana_deployment_yaml_file() -> str: + """Generate Grafana StatefulSet YAML.""" + data = { + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "metadata": {"name": "grafana"}, + "spec": { + "replicas": 1, + "serviceName": "grafana-headless", + "selector": {"matchLabels": {"app": "grafana"}}, + "template": { + "metadata": {"labels": {"app": "grafana"}}, + "spec": { + "securityContext": { + "fsGroup": 472, # grafana group + "runAsUser": 472, # grafana user + "runAsGroup": 472, # grafana group + "runAsNonRoot": True, + }, + "containers": [ + { + "name": "grafana", + "image": "grafana/grafana:latest", + "imagePullPolicy": "Always", + "ports": [{"containerPort": 3000}], + "env": [ + { + "name": "GF_PATHS_CONFIG", + "value": "/etc/grafana/grafana.ini", + }, + ], + "securityContext": { + "allowPrivilegeEscalation": False, + "readOnlyRootFilesystem": False, + "capabilities": {"drop": ["ALL"]}, + }, + "volumeMounts": [ + { + "name": "grafana-config", + "mountPath": "/etc/grafana/grafana.ini", + "subPath": "grafana.ini", + }, + { + "name": "grafana-config", + "mountPath": "/etc/grafana/provisioning/datasources/datasource.yml", + "subPath": "datasource.yml", + }, + { + "name": "grafana-config", + "mountPath": "/etc/grafana/provisioning/dashboards/dashboard_config.yml", + "subPath": "dashboard_config.yml", + }, + { + "name": "grafana-config", + "mountPath": "/etc/grafana/provisioning/dashboards/dashboard.json", + "subPath": "dashboard.json", + }, + { + "name": "grafana-data", + "mountPath": "/var/lib/grafana", + }, + ], + } + ], + "volumes": [ + { + "name": "grafana-config", + "configMap": {"name": "grafana-config"}, + } + ], + }, + }, + 
"volumeClaimTemplates": [ + { + "metadata": {"name": "grafana-data"}, + "spec": { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": "8Gi"}}, + }, + } + ], + }, + } + + return _generate_yaml(data) + + +def get_grafana_service_yaml_file() -> str: + """Generate Grafana Service YAML.""" + data = { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": "grafana-service"}, + "spec": { + "selector": {"app": "grafana"}, + "ports": [{"port": 3000, "targetPort": 3000}], + "type": "ClusterIP", + }, + } + + return _generate_yaml(data) + + +def get_grafana_headless_service_yaml_file() -> str: + """Generate Grafana Headless Service YAML for StatefulSet.""" + data = { + "apiVersion": "v1", + "kind": "Service", + "metadata": {"name": "grafana-headless"}, + "spec": { + "selector": {"app": "grafana"}, + "ports": [{"port": 3000, "targetPort": 3000}], + "clusterIP": "None", + }, + } + + return _generate_yaml(data) diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/stress_test_node.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/stress_test_node.rs new file mode 100644 index 00000000000..647feb75fed --- /dev/null +++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/stress_test_node.rs @@ -0,0 +1,399 @@ +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +use apollo_network::network_manager::NetworkManager; +use apollo_network::NetworkConfig; +use futures::future::{select_all, BoxFuture}; +use futures::FutureExt; +use libp2p::swarm::dial_opts::DialOpts; +use libp2p::{Multiaddr, PeerId}; +use tokio::task::JoinHandle; +use tokio::time::interval; +use tracing::{info, trace, warn}; + +use crate::args::{Args, Mode}; +use crate::converters::{StressTestMessage, METADATA_SIZE}; +use crate::explore_config::{extract_explore_params, ExploreConfiguration, ExplorePhase}; +use crate::message_handling::{MessageReceiver, MessageSender}; +use crate::metrics::{ + monitor_process_metrics, + receive_stress_test_message, + seconds_since_epoch, + update_broadcast_metrics, + BROADCAST_MESSAGE_BYTES, + BROADCAST_MESSAGE_BYTES_SUM, + BROADCAST_MESSAGE_COUNT, + BROADCAST_MESSAGE_SEND_DELAY_SECONDS, + BROADCAST_MESSAGE_THROUGHPUT, + NETWORK_RESET_TOTAL, +}; +use crate::network_channels::{create_network_manager_with_channels, NetworkChannels}; + +/// The main stress test node that manages network communication and monitoring +pub struct BroadcastNetworkStressTestNode { + args: Args, + network_config: NetworkConfig, + network_manager: Option, + network_channels: NetworkChannels, + explore_config: Option, +} + +impl BroadcastNetworkStressTestNode { + /// Creates network configuration from arguments + fn create_network_config(args: &Args) -> NetworkConfig { + let peer_private_key = create_peer_private_key(args.id); + let peer_private_key_hex = + peer_private_key.iter().map(|byte| format!("{byte:02x}")).collect::(); + info!("Secret Key: {peer_private_key_hex:#?}"); + + let mut network_config = NetworkConfig { + port: args.p2p_port, + secret_key: Some(peer_private_key.to_vec()), + ..Default::default() + }; + + network_config.discovery_config.heartbeat_interval = Duration::from_secs(99999999); + + if !args.bootstrap.is_empty() { + let bootstrap_peers: Vec = + args.bootstrap.iter().map(|s| Multiaddr::from_str(s.trim()).unwrap()).collect(); + network_config.bootstrap_peer_multiaddr = Some(bootstrap_peers); + } + + network_config + } + + /// Creates explore configuration and initializes message 
diff --git a/crates/apollo_network/src/bin/broadcast_network_stress_test_node/stress_test_node.rs b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/stress_test_node.rs
new file mode 100644
index 00000000000..647feb75fed
--- /dev/null
+++ b/crates/apollo_network/src/bin/broadcast_network_stress_test_node/stress_test_node.rs
@@ -0,0 +1,399 @@
+use std::str::FromStr;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, SystemTime};
+
+use apollo_network::network_manager::NetworkManager;
+use apollo_network::NetworkConfig;
+use futures::future::{select_all, BoxFuture};
+use futures::FutureExt;
+use libp2p::swarm::dial_opts::DialOpts;
+use libp2p::{Multiaddr, PeerId};
+use tokio::task::JoinHandle;
+use tokio::time::interval;
+use tracing::{info, trace, warn};
+
+use crate::args::{Args, Mode};
+use crate::converters::{StressTestMessage, METADATA_SIZE};
+use crate::explore_config::{extract_explore_params, ExploreConfiguration, ExplorePhase};
+use crate::message_handling::{MessageReceiver, MessageSender};
+use crate::metrics::{
+    monitor_process_metrics,
+    receive_stress_test_message,
+    seconds_since_epoch,
+    update_broadcast_metrics,
+    BROADCAST_MESSAGE_BYTES,
+    BROADCAST_MESSAGE_BYTES_SUM,
+    BROADCAST_MESSAGE_COUNT,
+    BROADCAST_MESSAGE_SEND_DELAY_SECONDS,
+    BROADCAST_MESSAGE_THROUGHPUT,
+    NETWORK_RESET_TOTAL,
+};
+use crate::network_channels::{create_network_manager_with_channels, NetworkChannels};
+
+/// The main stress test node that manages network communication and monitoring
+pub struct BroadcastNetworkStressTestNode {
+    args: Args,
+    network_config: NetworkConfig,
+    network_manager: Option<NetworkManager>,
+    network_channels: NetworkChannels,
+    explore_config: Option<ExploreConfiguration>,
+}
+
+impl BroadcastNetworkStressTestNode {
+    /// Creates network configuration from arguments
+    fn create_network_config(args: &Args) -> NetworkConfig {
+        let peer_private_key = create_peer_private_key(args.id);
+        let peer_private_key_hex =
+            peer_private_key.iter().map(|byte| format!("{byte:02x}")).collect::<String>();
+        info!("Secret Key: {peer_private_key_hex:#?}");
+
+        let mut network_config = NetworkConfig {
+            port: args.p2p_port,
+            secret_key: Some(peer_private_key.to_vec()),
+            ..Default::default()
+        };
+
+        network_config.discovery_config.heartbeat_interval = Duration::from_secs(99999999);
+
+        if !args.bootstrap.is_empty() {
+            let bootstrap_peers: Vec<Multiaddr> =
+                args.bootstrap.iter().map(|s| Multiaddr::from_str(s.trim()).unwrap()).collect();
+            network_config.bootstrap_peer_multiaddr = Some(bootstrap_peers);
+        }
+
+        network_config
+    }
+
+    /// Creates explore configuration and initializes message parameters
+    fn setup_explore_config(args: &Args) -> Option<ExploreConfiguration> {
+        if let Mode::Explore = args.mode {
+            let (cool_down, run_duration, min_throughput, min_message_size) =
+                extract_explore_params(args);
+            let explore_config = ExploreConfiguration::new(
+                cool_down,
+                run_duration,
+                min_throughput,
+                min_message_size,
+            );
+            Some(explore_config)
+        } else {
+            None
+        }
+    }
+
+    /// Creates a new BroadcastNetworkStressTestNode instance
+    pub async fn new(args: Args) -> Self {
+        // Create network configuration
+        let network_config = Self::create_network_config(&args);
+
+        // Create network manager with protocol channels
+        let (network_manager, network_channels) = create_network_manager_with_channels(
+            &network_config,
+            args.buffer_size,
+            &args.network_protocol,
+        );
+
+        // Setup explore configuration if needed
+        let explore_config = Self::setup_explore_config(&args);
+
+        Self {
+            args,
+            network_config,
+            network_manager: Some(network_manager),
+            network_channels,
+            explore_config,
+        }
+    }
+
+    /// Starts the network manager in the background
+    pub async fn start_network_manager(&mut self) -> BoxFuture<'static, ()> {
+        let network_manager =
+            self.network_manager.take().expect("Network manager should be available");
+        async move {
+            let _ = network_manager.run().await;
+        }
+        .boxed()
+    }
+
+    /// Recreates the network manager with fresh state
+    pub async fn recreate_network_manager(&mut self) {
+        // Create new network manager with protocol channels using helper method
+        let (network_manager, network_channels) = create_network_manager_with_channels(
+            &self.network_config,
+            self.args.buffer_size,
+            &self.args.network_protocol,
+        );
+
+        info!("Recreated Network Manager");
+
+        // Update the struct with new components
+        self.network_manager = Some(network_manager);
+        self.network_channels = network_channels;
+    }
+
+    /// Gets the broadcaster ID with validation for modes that require it
+    fn get_broadcaster_id(args: &Args) -> u64 {
+        args.broadcaster.expect("broadcaster required for one/explore mode")
+    }
+
+    /// Determines if this node should broadcast messages based on the mode
+    pub fn should_broadcast(&self) -> bool {
+        match self.args.mode {
+            Mode::AllBroadcast | Mode::RoundRobin => true,
+            Mode::OneBroadcast | Mode::Explore => {
+                let broadcaster_id = Self::get_broadcaster_id(&self.args);
+                self.args.id == broadcaster_id
+            }
+        }
+    }
+
+    fn get_peers(&self) -> Vec<PeerId> {
+        self.network_config
+            .bootstrap_peer_multiaddr
+            .as_ref()
+            .map(|peers| {
+                peers.iter().map(|m| DialOpts::from(m.clone()).get_peer_id().unwrap()).collect()
+            })
+            .unwrap_or_default()
+    }
+
+    /// Starts the message sending task if this node should broadcast
+    pub async fn start_message_sender(&mut self) -> Option<BoxFuture<'static, ()>> {
+        if !self.should_broadcast() {
+            info!("Node {} will NOT broadcast in mode `{}`", self.args.id, self.args.mode);
+            return None;
+        }
+
+        info!("Node {} will broadcast in mode `{}`", self.args.id, self.args.mode);
+
+        let message_sender = self.network_channels.take_sender();
+        let args_clone = self.args.clone();
+        let explore_config = self.explore_config.clone();
+        let peers = self.get_peers();
+
+        Some(
+            async move {
+                Self::send_stress_test_messages_impl(
+                    message_sender,
+                    &args_clone,
+                    peers,
+                    &explore_config,
+                )
+                .await;
+            }
+            .boxed(),
+        )
+    }
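Peer identity is deterministic on both sides of the tooling: `create_peer_private_key` (at the bottom of this file) pads the node id's eight little-endian bytes into a 32-byte secret key, and `utils.get_peer_id_from_node_id` rebuilds the same key before asking the `get_peer_id_from_secret_key` binary for the matching peer id. A stdlib sketch of that derivation (node id 3 is an arbitrary example):

```python
# Sketch: the deterministic secret-key derivation shared by the Rust node and
# the Python run scripts: little-endian node-id bytes, zero-padded to 32 bytes.
def secret_key_hex(node_id: int) -> str:
    return "0x" + node_id.to_bytes(32, "little").hex()


assert secret_key_hex(3).startswith("0x03")
print(secret_key_hex(3))  # 0x0300...00 (64 hex digits after the prefix)
```

Because the key is a pure function of the node id, every run of the test reproduces the same peer ids, which is what lets the YAML generator precompute bootstrap multiaddresses before any pod starts.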
+
+    /// Unified implementation for sending stress test messages via any protocol
+    async fn send_stress_test_messages_impl(
+        mut message_sender: MessageSender,
+        args: &Args,
+        peers: Vec<PeerId>,
+        explore_config: &Option<ExploreConfiguration>,
+    ) {
+        let size_bytes = args
+            .message_size_bytes
+            .expect("Even in explore mode message size should be set automatically.");
+        let heartbeat = Duration::from_millis(
+            args.heartbeat_millis
+                .expect("Even in explore mode heartbeat millis should be set automatically."),
+        );
+
+        let mut message_index = 0;
+        let mut message = get_message(args.id, size_bytes).clone();
+        update_broadcast_metrics(message.len(), heartbeat);
+
+        let mut interval = interval(heartbeat);
+        loop {
+            interval.tick().await;
+
+            // Check if this node should broadcast based on the mode
+            let should_broadcast_now = match args.mode {
+                Mode::AllBroadcast | Mode::OneBroadcast => true,
+                Mode::RoundRobin => should_broadcast_round_robin(args),
+                Mode::Explore => {
+                    explore_config
+                        .as_ref()
+                        .expect("ExploreConfig not available")
+                        .get_current_phase()
+                        == ExplorePhase::Running
+                }
+            };
+
+            if should_broadcast_now {
+                message.metadata.time = SystemTime::now();
+                message.metadata.message_index = message_index;
+                let message_clone = message.clone().into();
+                let start_time = std::time::Instant::now();
+                message_sender.send_message(&peers, message_clone).await;
+                BROADCAST_MESSAGE_SEND_DELAY_SECONDS.record(start_time.elapsed().as_secs_f64());
+                BROADCAST_MESSAGE_BYTES.set(message.len() as f64);
+                BROADCAST_MESSAGE_COUNT.increment(1);
+                BROADCAST_MESSAGE_BYTES_SUM.increment(message.len() as u64);
+                trace!("Node {} sent message {message_index} in mode `{}`", args.id, args.mode);
+                message_index += 1;
+            }
+        }
+    }
+
+    /// Starts the message receiving task
+    pub async fn start_message_receiver(&mut self) -> BoxFuture<'static, ()> {
+        let message_receiver = self.network_channels.take_receiver();
+
+        async move {
+            Self::receive_stress_test_messages_impl(message_receiver).await;
+        }
+        .boxed()
+    }
+
+    /// Implementation of the message receiving logic (moved from the standalone function)
+    async fn receive_stress_test_messages_impl(message_receiver: MessageReceiver) {
+        info!("Starting message receiver");
+        let message_index_tracker = Arc::new(Mutex::new(Default::default()));
+        let tracker_ref = &message_index_tracker;
+        message_receiver
+            .for_each(move |message, peer_id| {
+                receive_stress_test_message(message, peer_id, tracker_ref.clone())
+            })
+            .await;
+        info!("Message receiver task ended");
+    }
+
+    /// Starts the process metrics monitoring task
+    pub fn start_metrics_monitor(&self) -> BoxFuture<'static, ()> {
+        let metrics_interval = 1;
+        async move {
+            monitor_process_metrics(metrics_interval).await;
+        }
+        .boxed()
+    }
+
+    /// Sets up and starts all tasks common to both simple and network reset modes
+    async fn setup_tasks(&mut self) -> Vec<BoxFuture<'static, ()>> {
+        let mut tasks = Vec::new();
+        tasks.push(self.start_network_manager().await);
+        tasks.push(self.start_message_receiver().await);
+        tasks.push(self.start_metrics_monitor());
+
+        if let Some(sender_task) = self.start_message_sender().await {
+            tasks.push(sender_task);
+        }
+
+        tasks
+    }
+
+    async fn wait_for_next_running_phase(&mut self) {
+        if let Some(explore_config) = &mut self.explore_config {
+            if self.args.id == Self::get_broadcaster_id(&self.args) {
+                BROADCAST_MESSAGE_THROUGHPUT.set(0);
+            }
+            while explore_config.get_current_phase() == ExplorePhase::CoolDown {
+                tokio::time::sleep(Duration::from_millis(100)).await;
+            }
+            let (size, duration) = explore_config.get_current_size_and_heartbeat();
+            self.args.message_size_bytes = Some(size);
+            self.args.heartbeat_millis = Some(duration.as_millis().try_into().unwrap());
+        }
+    }
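`run()` below races all spawned tasks with `select_all`, treats the first completion as a restart signal in explore mode, and aborts the rest, all bounded by the global test timeout. A rough asyncio analogue of that race-and-cancel pattern (the task bodies here are placeholders):

```python
# Sketch: race-and-cancel with a deadline, mirroring race_and_kill_tasks plus
# the tokio::time::timeout wrapper in run().
import asyncio


async def race_and_kill(coros, timeout_s: float) -> bool:
    """Return True if some task finished before the deadline."""
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(
        tasks, timeout=timeout_s, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:  # abort the losers, like JoinHandle::abort
        task.cancel()
    return bool(done)


async def main():
    finished = await race_and_kill(
        [asyncio.sleep(0.1), asyncio.sleep(5)], timeout_s=1.0
    )
    print("a task completed:", finished)


asyncio.run(main())
```

The Rust version returns `Err("Test timeout")` when the deadline wins the race, and otherwise either restarts everything (explore mode) or treats any task ending as a fatal error (simple mode).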
+
+    /// Unified run function that handles both simple and network reset modes
+    pub async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
+        let test_timeout = Duration::from_secs(self.args.timeout);
+        let start_time = tokio::time::Instant::now();
+
+        self.wait_for_next_running_phase().await;
+
+        // Main loop - restart if network reset is enabled, otherwise run once
+        loop {
+            info!("Starting/restarting all tasks");
+
+            // Start all common tasks
+            let mut tasks = self.setup_tasks().await;
+
+            // Add reset coordination task only for explore mode
+            if let Some(explore_config) = &self.explore_config {
+                let explore_config_clone = explore_config.clone();
+                assert_eq!(explore_config_clone.get_current_phase(), ExplorePhase::Running);
+                let reset_task = async move {
+                    while explore_config_clone.get_current_phase() == ExplorePhase::Running {
+                        tokio::time::sleep(Duration::from_millis(500)).await;
+                    }
+                    info!("Explore mode: CoolDown phase detected - triggering network reset");
+                    NETWORK_RESET_TOTAL.increment(1);
+                }
+                .boxed();
+                tasks.push(reset_task);
+            }
+
+            // Wait for either timeout or any task completion
+            let remaining_time = test_timeout.saturating_sub(start_time.elapsed());
+            let spawned_tasks: Vec<_> = tasks.into_iter().map(|task| tokio::spawn(task)).collect();
+            let task_completed =
+                tokio::time::timeout(remaining_time, race_and_kill_tasks(spawned_tasks))
+                    .await
+                    .is_ok();
+
+            if !task_completed {
+                info!("Test timeout reached");
+                return Err("Test timeout".into());
+            }
+
+            // Handle task completion
+            if self.explore_config.is_none() {
+                return Err("Tasks should never end in simple mode".into());
+            }
+
+            // Reset mode: any task completing means restart is needed
+            info!("Task completed - triggering restart");
+            if let Some(explore_config) = &mut self.explore_config {
+                assert_eq!(explore_config.get_current_phase(), ExplorePhase::CoolDown);
+            }
+
+            self.wait_for_next_running_phase().await;
+            // Recreate network manager for clean state
+            self.recreate_network_manager().await;
+        }
+    }
+}
+
+pub async fn race_and_kill_tasks(spawned_tasks: Vec<JoinHandle<()>>) {
+    if spawned_tasks.is_empty() {
+        return;
+    }
+
+    // Wait for any task to complete
+    let (result, _index, remaining_tasks) = select_all(spawned_tasks).await;
+
+    // Log the result of the completed task
+    if let Err(e) = result {
+        warn!("Task completed with error: {:?}", e);
+    }
+
+    // Abort all remaining tasks
+    for task in remaining_tasks {
+        task.abort();
+    }
+}
+
+fn get_message(id: u64, size_bytes: usize) -> StressTestMessage {
+    let message = StressTestMessage::new(id, 0, vec![0; size_bytes - *METADATA_SIZE]);
+    assert_eq!(Vec::<u8>::from(message.clone()).len(), size_bytes);
+    message
+}
+
+fn should_broadcast_round_robin(args: &Args) -> bool {
+    let now_seconds = seconds_since_epoch();
+    let round_duration_seconds =
+        args.round_duration_seconds.expect("round_duration_seconds required for rr mode");
+    let current_round = (now_seconds / round_duration_seconds) % args.num_nodes;
+    args.id == current_round
+}
+
+fn create_peer_private_key(peer_index: u64) -> [u8; 32] {
+    let array = peer_index.to_le_bytes();
+    assert_eq!(array.len(), 8);
+    let mut private_key = [0u8; 32];
+    private_key[0..8].copy_from_slice(&array);
+    private_key
+}
diff --git a/temp_bin/broadcast_network_stress_test_node b/temp_bin/broadcast_network_stress_test_node
new file mode 100755
index 00000000000..f38c2352130
Binary files /dev/null and b/temp_bin/broadcast_network_stress_test_node differ
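For reference, the round-robin gating in `should_broadcast_round_robin` is pure wall-clock arithmetic: every node computes the same round index locally, so no coordination traffic is needed between broadcasters. A stdlib sketch (the parameter values are illustrative):

```python
# Sketch: the round-robin schedule used by should_broadcast_round_robin.
# Each node broadcasts for round_duration_seconds at a time, rotating
# through node ids.
import time


def current_broadcaster(num_nodes: int, round_duration_seconds: int) -> int:
    now_seconds = int(time.time())
    return (now_seconds // round_duration_seconds) % num_nodes


num_nodes, round_duration_seconds, my_id = 5, 10, 2
if current_broadcaster(num_nodes, round_duration_seconds) == my_id:
    print("this node broadcasts in the current round")
```

This only stays consistent to within the nodes' clock skew, which is acceptable here because rounds last whole seconds and overlap at the boundary merely produces a brief double-broadcast.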