Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
target/
*.data.folded
*.perf.data
scripts/*.png

*.results.txt
# I'll output the results of cargo run to these files
*.results.txt
39 changes: 19 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ members = [
resolver = "2"

[workspace.dependencies]
hydro_lang = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" }
hydro_std = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" }
hydro_test = { version = "0.0.0", git = "https://github.com/hydro-project/hydro.git" }
dfir_lang = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" }
hydro_build_utils = { version = "0.0.1", git = "https://github.com/hydro-project/hydro.git" }
hydro_deploy = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" }
include_mdtests = { version = "0.0.0", git = "https://github.com/hydro-project/hydro.git" }
hydro_lang = { path = "../hydroflow/hydro_lang" }
hydro_std = { path = "../hydroflow/hydro_std" }
hydro_test = { path = "../hydroflow/hydro_test" }
dfir_lang = { path = "../hydroflow/dfir_lang" }
hydro_build_utils = { path = "../hydroflow/hydro_build_utils" }
hydro_deploy = { path = "../hydroflow/hydro_deploy/core" }
include_mdtests = { path = "../hydroflow/include_mdtests" }
serde = { version = "1.0.197", features = ["derive"] }
stageleft = "0.10.0"
stageleft_tool = "0.10.0"
Expand Down
108 changes: 108 additions & 0 deletions hydro_optimize_examples/examples/network_calibrator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

use clap::Parser;
use hydro_deploy::Deployment;
use hydro_deploy::gcp::GcpNetwork;
use hydro_lang::viz::config::GraphConfig;
use hydro_lang::location::Location;
use hydro_lang::prelude::FlowBuilder;
use hydro_optimize::deploy::ReusableHosts;
use hydro_optimize::deploy_and_analyze::deploy_and_analyze;
use hydro_optimize_examples::network_calibrator::{Aggregator, Client, Server, network_calibrator};
use tokio::sync::RwLock;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[command(flatten)]
graph: GraphConfig,

/// Use GCP for deployment (provide project name)
#[arg(long)]
gcp: Option<String>,
}

#[tokio::main]
async fn main() {
let args = Args::parse();

let mut deployment = Deployment::new();
let (host_arg, project) = if let Some(project) = args.gcp {
("gcp".to_string(), project)
} else {
("localhost".to_string(), String::new())
};
let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None)));

let num_clients = 10; // >1 clients so it doesn't become the bottleneck
let num_clients_per_node = 1000;

// Deploy
let mut reusable_hosts = ReusableHosts {
hosts: HashMap::new(),
host_arg,
project: project.clone(),
network: network.clone(),
};

let message_sizes = vec![1, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192];
let num_seconds_to_profile = Some(60);
let multi_run_metadata = RefCell::new(vec![]);

for message_size in message_sizes {
let builder = FlowBuilder::new();
let server = builder.cluster();
let clients = builder.cluster();
let client_aggregator = builder.process();

let clusters = vec![
(
server.id().raw_id(),
std::any::type_name::<Server>().to_string(),
1,
),
(
clients.id().raw_id(),
std::any::type_name::<Client>().to_string(),
num_clients,
),
];
let processes = vec![(
client_aggregator.id().raw_id(),
std::any::type_name::<Aggregator>().to_string(),
)];

println!("Running network calibrator with message size: {} bytes, num clients: {}", message_size, num_clients);
network_calibrator(
num_clients_per_node,
message_size,
&server,
&clients,
&client_aggregator,
);

let (rewritten_ir_builder, ir, _, _, _) =
deploy_and_analyze(
&mut reusable_hosts,
&mut deployment,
builder,
&clusters,
&processes,
vec![
std::any::type_name::<Client>().to_string(),
std::any::type_name::<Aggregator>().to_string(),
],
num_seconds_to_profile,
&multi_run_metadata,
0, // Set to 0 to turn off comparisons between iterations
)
.await;

let built = rewritten_ir_builder.build_with(|_| ir).finalize();

// Generate graphs if requested
_ = built.generate_graph_with_config(&args.graph, None);
}
}
Loading
Loading