diff --git a/.gitignore b/.gitignore index 8a333b3..ade713e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ target/ *.data.folded *.perf.data +scripts/*.png -*.results.txt \ No newline at end of file +# I'll output the results of cargo run to these files +*.results.txt diff --git a/Cargo.lock b/Cargo.lock index 96a9533..aee1001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,6 +886,14 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "copy_span" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -1113,7 +1121,6 @@ dependencies = [ [[package]] name = "dfir_lang" version = "0.14.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "auto_impl", "documented", @@ -1799,7 +1806,6 @@ checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] name = "hydro_build_utils" version = "0.0.1" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "insta", "rustc_version", @@ -1808,7 +1814,6 @@ dependencies = [ [[package]] name = "hydro_deploy" version = "0.14.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "anyhow", "async-process", @@ -1844,7 +1849,6 @@ dependencies = [ [[package]] name = "hydro_deploy_integration" version = "0.14.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "async-recursion", "async-trait", @@ -1862,13 +1866,13 @@ dependencies = [ [[package]] name = "hydro_lang" version = "0.14.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "auto_impl", "backtrace", "bincode", "bytes", "clap", + "copy_span", "ctor 0.2.9", "data-encoding", "dfir_lang", @@ -1947,7 +1951,6 @@ dependencies = [ [[package]] 
name = "hydro_std" version = "0.14.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "hdrhistogram", "hydro_lang", @@ -1959,7 +1962,6 @@ dependencies = [ [[package]] name = "hydro_test" version = "0.0.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "bytes", "colored", @@ -2166,7 +2168,6 @@ dependencies = [ [[package]] name = "include_mdtests" version = "0.0.0" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "glob", "proc-macro2", @@ -2286,9 +2287,9 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "iri-string" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" dependencies = [ "memchr", "serde", @@ -3705,9 +3706,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.34" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "once_cell", "ring", @@ -4024,7 +4025,6 @@ checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" [[package]] name = "sinktools" version = "0.0.1" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "futures-util", "pin-project-lite", @@ -4133,9 +4133,9 @@ checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" [[package]] name = "stageleft" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"b92cb4d28ec3c2b3aba8ee05487f10c3aa00d7a369a3fe9d4d89e8719f28ca4f" +checksum = "101469d4cf8d54ac88b735ecd1dcc5e11da859e191a1dd0e28e71a298ffae1b9" dependencies = [ "ctor 0.4.3", "proc-macro-crate", @@ -4147,9 +4147,9 @@ dependencies = [ [[package]] name = "stageleft_macro" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e05624677c37d2abebe0c3e50fa7722f99936d26de2a8a23ac5d2a397be596c0" +checksum = "e1dc19da279ba29d00ae49363841037bd7c933130d0c4476899e1d7f8f04dab5" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -4160,9 +4160,9 @@ dependencies = [ [[package]] name = "stageleft_tool" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da14207006ed0031a24197e0a2d3bc84b2a7ecf3a2ca70b70f1886cf1a37b464" +checksum = "977b4e22d5233ef274f43a02d9946dd4ee66c1957eac8a5f031450ab97bfa834" dependencies = [ "prettyplease", "proc-macro-crate", @@ -4746,7 +4746,6 @@ dependencies = [ [[package]] name = "variadics" version = "0.0.9" -source = "git+https://github.com/hydro-project/hydro.git#1c139772baa20d5c9aa8ab060cc6650d6f239ca0" dependencies = [ "hashbrown 0.14.5", "hydro_build_utils", diff --git a/Cargo.toml b/Cargo.toml index d394b0b..23fed39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,13 +8,13 @@ members = [ resolver = "2" [workspace.dependencies] -hydro_lang = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" } -hydro_std = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" } -hydro_test = { version = "0.0.0", git = "https://github.com/hydro-project/hydro.git" } -dfir_lang = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" } -hydro_build_utils = { version = "0.0.1", git = "https://github.com/hydro-project/hydro.git" } -hydro_deploy = { version = "0.14.0", git = "https://github.com/hydro-project/hydro.git" } -include_mdtests = { version = "0.0.0", git = 
"https://github.com/hydro-project/hydro.git" } +hydro_lang = { path = "../hydroflow/hydro_lang" } +hydro_std = { path = "../hydroflow/hydro_std" } +hydro_test = { path = "../hydroflow/hydro_test" } +dfir_lang = { path = "../hydroflow/dfir_lang" } +hydro_build_utils = { path = "../hydroflow/hydro_build_utils" } +hydro_deploy = { path = "../hydroflow/hydro_deploy/core" } +include_mdtests = { path = "../hydroflow/include_mdtests" } serde = { version = "1.0.197", features = ["derive"] } stageleft = "0.10.0" stageleft_tool = "0.10.0" diff --git a/hydro_optimize_examples/examples/network_calibrator.rs b/hydro_optimize_examples/examples/network_calibrator.rs new file mode 100644 index 0000000..323ae35 --- /dev/null +++ b/hydro_optimize_examples/examples/network_calibrator.rs @@ -0,0 +1,108 @@ +use std::cell::RefCell; +use std::collections::HashMap; +use std::sync::Arc; + +use clap::Parser; +use hydro_deploy::Deployment; +use hydro_deploy::gcp::GcpNetwork; +use hydro_lang::viz::config::GraphConfig; +use hydro_lang::location::Location; +use hydro_lang::prelude::FlowBuilder; +use hydro_optimize::deploy::ReusableHosts; +use hydro_optimize::deploy_and_analyze::deploy_and_analyze; +use hydro_optimize_examples::network_calibrator::{Aggregator, Client, Server, network_calibrator}; +use tokio::sync::RwLock; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + #[command(flatten)] + graph: GraphConfig, + + /// Use GCP for deployment (provide project name) + #[arg(long)] + gcp: Option, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + let mut deployment = Deployment::new(); + let (host_arg, project) = if let Some(project) = args.gcp { + ("gcp".to_string(), project) + } else { + ("localhost".to_string(), String::new()) + }; + let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); + + let num_clients = 10; // >1 clients so it doesn't become the bottleneck + let num_clients_per_node = 1000; + + // Deploy 
+ let mut reusable_hosts = ReusableHosts { + hosts: HashMap::new(), + host_arg, + project: project.clone(), + network: network.clone(), + }; + + let message_sizes = vec![1, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192]; + let num_seconds_to_profile = Some(60); + let multi_run_metadata = RefCell::new(vec![]); + + for message_size in message_sizes { + let builder = FlowBuilder::new(); + let server = builder.cluster(); + let clients = builder.cluster(); + let client_aggregator = builder.process(); + + let clusters = vec![ + ( + server.id().raw_id(), + std::any::type_name::().to_string(), + 1, + ), + ( + clients.id().raw_id(), + std::any::type_name::().to_string(), + num_clients, + ), + ]; + let processes = vec![( + client_aggregator.id().raw_id(), + std::any::type_name::().to_string(), + )]; + + println!("Running network calibrator with message size: {} bytes, num clients: {}", message_size, num_clients); + network_calibrator( + num_clients_per_node, + message_size, + &server, + &clients, + &client_aggregator, + ); + + let (rewritten_ir_builder, ir, _, _, _) = + deploy_and_analyze( + &mut reusable_hosts, + &mut deployment, + builder, + &clusters, + &processes, + vec![ + std::any::type_name::().to_string(), + std::any::type_name::().to_string(), + ], + num_seconds_to_profile, + &multi_run_metadata, + 0, // Set to 0 to turn off comparisons between iterations + ) + .await; + + let built = rewritten_ir_builder.build_with(|_| ir).finalize(); + + // Generate graphs if requested + _ = built.generate_graph_with_config(&args.graph, None); + } +} \ No newline at end of file diff --git a/hydro_optimize_examples/examples/simple_graphs.rs b/hydro_optimize_examples/examples/simple_graphs.rs new file mode 100644 index 0000000..a2ad66a --- /dev/null +++ b/hydro_optimize_examples/examples/simple_graphs.rs @@ -0,0 +1,136 @@ +use std::cell::RefCell; +use std::collections::HashMap; +use std::sync::Arc; + +use clap::Parser; +use hydro_deploy::Deployment; +use 
hydro_deploy::gcp::GcpNetwork; +use hydro_lang::viz::config::GraphConfig; +use hydro_lang::location::Location; +use hydro_lang::prelude::FlowBuilder; +use hydro_optimize::decoupler; +use hydro_optimize::deploy::ReusableHosts; +use hydro_optimize::deploy_and_analyze::deploy_and_analyze; +use hydro_optimize_examples::simple_graphs::{Client, Server, get_graph_function}; +use hydro_optimize_examples::simple_graphs_bench::{Aggregator, simple_graphs_bench, simple_graphs_bench_no_union}; +use tokio::sync::RwLock; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + #[command(flatten)] + graph: GraphConfig, + + /// Use GCP for deployment (provide project name) + #[arg(long)] + gcp: Option, + + #[arg(long)] + function: String, +} + +#[tokio::main] +async fn main() { + let args = Args::parse(); + + let mut deployment = Deployment::new(); + let (host_arg, project) = if let Some(project) = args.gcp { + ("gcp".to_string(), project) + } else { + ("localhost".to_string(), String::new()) + }; + let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); + + let mut builder = FlowBuilder::new(); + let num_clients = 10; + let num_clients_per_node = 1000; + let graph_function = get_graph_function(&args.function); + let server = builder.cluster(); + let clients = builder.cluster(); + let client_aggregator = builder.process(); + + simple_graphs_bench( + num_clients_per_node, + &server, + &clients, + &client_aggregator, + graph_function, + ); + // simple_graphs_bench_no_union( + // num_clients_per_node, + // &server, + // &clients, + // &client_aggregator, + // ); + + let mut clusters = vec![ + ( + server.id().raw_id(), + std::any::type_name::().to_string(), + 1, + ), + ( + clients.id().raw_id(), + std::any::type_name::().to_string(), + num_clients, + ), + ]; + let processes = vec![( + client_aggregator.id().raw_id(), + std::any::type_name::().to_string(), + )]; + + // Deploy + let mut reusable_hosts = ReusableHosts { + hosts: 
HashMap::new(), + host_arg, + project: project.clone(), + network: network.clone(), + }; + + let num_times_to_optimize = 2; + let num_seconds_to_profile = Some(60); + let multi_run_metadata = RefCell::new(vec![]); + + for i in 0..num_times_to_optimize { + let (rewritten_ir_builder, mut ir, mut decoupler, bottleneck_name, bottleneck_num_nodes) = + deploy_and_analyze( + &mut reusable_hosts, + &mut deployment, + builder, + &clusters, + &processes, + vec![ + std::any::type_name::().to_string(), + std::any::type_name::().to_string(), + ], + num_seconds_to_profile, + &multi_run_metadata, + i, + ) + .await; + + // Apply decoupling + let mut decoupled_cluster = None; + builder = rewritten_ir_builder.build_with(|builder| { + let new_cluster = builder.cluster::<()>(); + decoupler.decoupled_location = new_cluster.id().clone(); + decoupler::decouple(&mut ir, &decoupler, &multi_run_metadata, i); + decoupled_cluster = Some(new_cluster); + + ir + }); + if let Some(new_cluster) = decoupled_cluster { + clusters.push(( + new_cluster.id().raw_id(), + format!("{}_decouple_{}", bottleneck_name, i), + bottleneck_num_nodes, + )); + } + } + + let built = builder.finalize(); + + // Generate graphs if requested + _ = built.generate_graph_with_config(&args.graph, None); +} \ No newline at end of file diff --git a/hydro_optimize_examples/src/lib.rs b/hydro_optimize_examples/src/lib.rs index 09bf34a..aebaec4 100644 --- a/hydro_optimize_examples/src/lib.rs +++ b/hydro_optimize_examples/src/lib.rs @@ -1,5 +1,13 @@ stageleft::stageleft_no_entry_crate!(); +pub mod network_calibrator; +pub mod simple_graphs; +pub mod simple_graphs_bench; +pub mod simple_kv_bench; +pub mod lock_server; +pub mod lobsters; +pub mod web_submit; + #[cfg(test)] mod test_init { #[ctor::ctor] diff --git a/hydro_optimize_examples/src/lobsters.rs b/hydro_optimize_examples/src/lobsters.rs new file mode 100644 index 0000000..7d1546e --- /dev/null +++ b/hydro_optimize_examples/src/lobsters.rs @@ -0,0 +1,165 @@ +use 
std::collections::HashSet; + +use hydro_lang::{ + live_collections::stream::NoOrder, + location::{Location, MemberId}, + nondet::nondet, + prelude::{Process, Stream, Unbounded}, +}; +use sha2::{Digest, Sha256}; +use stageleft::q; +use serde::{Deserialize, Serialize}; +use tokio::time::Instant; + +pub struct Server {} + +#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)] +pub struct Story { + pub title: String, + pub epoch_time: u128, + pub id: u32, +} + +impl PartialOrd for Story { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.epoch_time.cmp(&other.epoch_time)) + } +} + +/// Implementation of Lobsters, roughly based on API calls exposed here: https://lobste.rs/s/cqnzl5/lobste_rs_access_pattern_statistics_for#c_2op8by +/// We expose the following APIs: +/// - add_user (takes username, returns api_key, should only approve if user is admin but it's tautological so just approve everyone) +/// - get_users (returns usernames) +/// - add_story (takes api_key, title, timestamp, returns story_id) +/// - add_comment (takes api_key, story_id, comment, timestamp, returns comment_id) +/// - upvote_story (takes api_key, story_id) +/// - upvote_comment (takes api_key, comment_id) +/// - get_stories (returns the 20 stories with the latest timestamps) +/// - get_comments (returns the 20 comments with the latest timestamps) +/// - get_story_comments (takes story_id, returns the comments for that story) +/// +/// Any call with an invalid API key (either it does not exist or does not have the privileges required) will not receive a response. 
+#[expect( + clippy::too_many_arguments, + clippy::type_complexity, + reason = "internal Lobsters code // TODO" +)] +pub fn lobsters<'a, Client>( + server: &Process<'a, Server>, + add_user: Stream<(MemberId, String), Process<'a, Server>, Unbounded, NoOrder>, + get_users: Stream, Process<'a, Server>, Unbounded, NoOrder>, + add_story: Stream< + (MemberId, (String, String, Instant)), + Process<'a, Server>, + Unbounded, + NoOrder, + >, + _add_comment: Stream< + (MemberId, (String, u32, String, Instant)), + Process<'a, Server>, + Unbounded, + NoOrder, + >, + _upvote_story: Stream< + (MemberId, (String, u32)), + Process<'a, Server>, + Unbounded, + NoOrder, + >, + _upvote_comment: Stream< + (MemberId, (String, u32)), + Process<'a, Server>, + Unbounded, + NoOrder, + >, + _get_stories: Stream, Process<'a, Server>, Unbounded, NoOrder>, + _get_comments: Stream, Process<'a, Server>, Unbounded, NoOrder>, + _get_story_comments: Stream<(MemberId, u32), Process<'a, Server>, Unbounded, NoOrder>, +) { + let user_auth_tick = server.tick(); + let stories_tick = server.tick(); + + // Add user + let add_user_with_api_key = add_user.map(q!(|(client_id, username)| { + let api_key = self::generate_api_key(username.clone()); + (client_id, (username, api_key)) + })); + let users_this_tick_with_api_key = add_user_with_api_key.batch( + &user_auth_tick, + nondet!(/** Snapshot current users to approve/deny access */), + ); + // Persisted users + let curr_users = users_this_tick_with_api_key + .clone() + .map(q!(|(_client_id, (username, api_key))| (api_key, username))) + .persist(); + let curr_users_hashset = curr_users.clone().fold_commutative_idempotent( + q!(|| HashSet::new()), + q!(|set, (_api_key, username)| { + set.insert(username); + }), + ); + // Send response back to client. 
Only done after the tick to ensure that once the client gets the response, the user has been added + let _add_user_response = + users_this_tick_with_api_key + .all_ticks() + .map(q!(|(client_id, (_api_key, _username))| (client_id, ()))); + + // Get users + let _get_users_response = get_users + .batch( + &user_auth_tick, + nondet!(/** Snapshot against current users */), + ) + .cross_singleton(curr_users_hashset) + .all_ticks(); + + // Add story + let add_story_pre_join = add_story.map(q!(|(client_id, (api_key, title, timestamp))| { + (api_key, (client_id, title, timestamp)) + })); + let stories = add_story_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks(); + let curr_stories = stories.batch(&stories_tick, nondet!(/** Snapshot of current stories */)).assume_ordering(nondet!(/** In order to use enumerate to assign a unique ID, we need total ordering. */)); + // Assign each story a unique ID + let (story_id_complete_cycle, story_id) = + stories_tick.cycle_with_initial(stories_tick.singleton(q!(0))); + let _indexed_curr_stories = curr_stories + .clone() + .enumerate() + .cross_singleton(story_id.clone()) + .map(q!(|((index, story), story_id)| (index + story_id, story))); + let num_curr_stories = curr_stories.clone().count(); + let new_story_id = num_curr_stories + .zip(story_id) + .map(q!(|(num_stories, story_id)| num_stories + story_id)); + story_id_complete_cycle.complete_next_tick(new_story_id); + + let _top_stories = curr_stories.clone().persist().fold_commutative_idempotent( + q!(|| vec![]), + q!( + |vec, (_api_key, ((_client_id, title, timestamp), username))| { + let new_elem = (title, timestamp, username); + // TODO: Use a binary heap + // TODO: Create a struct that is ordered by timestamp + let pos = vec.binary_search(&new_elem).unwrap_or_else(|e| e); + vec.insert(pos, new_elem); + vec.truncate(20); + } + ), + ); +} + +fn generate_api_key(email: String) -> 
String { + let secret = "There is no secret ingredient"; + let mut hasher = Sha256::new(); + hasher.update(email.as_bytes()); + hasher.update(secret.as_bytes()); + let hash = hasher.finalize(); + format!("{:x}", hash) +} \ No newline at end of file diff --git a/hydro_optimize_examples/src/lock_server.rs b/hydro_optimize_examples/src/lock_server.rs new file mode 100644 index 0000000..90afb66 --- /dev/null +++ b/hydro_optimize_examples/src/lock_server.rs @@ -0,0 +1,80 @@ +use hydro_lang::{ + live_collections::stream::NoOrder, + location::{Location, MemberId}, + nondet::nondet, + prelude::{Process, Stream, Unbounded}, +}; +use stageleft::q; + +pub struct Server {} + +/// Lock server implementation as described in https://dl.acm.org/doi/pdf/10.1145/3341301.3359651, with the difference being that each server can hold multiple locks. +/// Clients send (virt_client_id, server_id, acquire) requesting a lock from the server. +/// +/// If acquire = true, then: +/// - If the server currently holds the lock, it returns (virt_client_id, server_id, true). +/// - Otherwise, it returns (virt_client_id, server_id, false). +/// +/// If acquire = false, then the client wants to release its lock. Return (virt_client_id, server_id, true). 
+#[expect(clippy::type_complexity, reason = "internal Lock Server code // TODO")] +pub fn lock_server<'a, Client>( + server: &Process<'a, Server>, + payloads: Stream<(MemberId, (u32, u32, bool)), Process<'a, Server>, Unbounded, NoOrder>, +) -> Stream<(MemberId, (u32, u32, bool)), Process<'a, Server>, Unbounded, NoOrder> { + let server_tick = server.tick(); + let keyed_payloads = payloads.map(q!(|(client_id, (virt_client_id, server_id, acquire))| ( + server_id, + (client_id, virt_client_id, acquire) + ))); + + let batched_payloads = keyed_payloads + .batch( + &server_tick, + nondet!(/** Need to check who currently owns the lock */), + ) + .assume_ordering(nondet!(/** First to acquire the lock wins */)); + let lock_state = batched_payloads + .clone() + .persist() + .into_keyed() + .reduce(q!( + |(curr_client_id, curr_virt_client_id, is_held_by_client), + (client_id, virt_client_id, acquire)| { + if acquire { + // If the lock is currently held by the server, give the client the lock + if !*is_held_by_client { + *curr_client_id = client_id; + *curr_virt_client_id = virt_client_id; + *is_held_by_client = true; + } + } else { + // If the client is releasing the lock and it holds it, give the lock back to the server + if *is_held_by_client + && *curr_virt_client_id == virt_client_id + && *curr_client_id == client_id + { + *is_held_by_client = false; + } + } + } + )) + .entries(); + let results = batched_payloads.join(lock_state).all_ticks().map(q!(|( + server_id, + ( + (client_id, virt_client_id, acquire), + (curr_client_id, curr_virt_client_id, is_held_by_client), + ), + )| { + if acquire { + let acquired = is_held_by_client + && curr_client_id == client_id + && curr_virt_client_id == virt_client_id; + (client_id, (virt_client_id, server_id, acquired)) + } else { + // Releasing always succeeds + (client_id, (virt_client_id, server_id, true)) + } + })); + results +} diff --git a/hydro_optimize_examples/src/network_calibrator.rs 
b/hydro_optimize_examples/src/network_calibrator.rs new file mode 100644 index 0000000..fc2f058 --- /dev/null +++ b/hydro_optimize_examples/src/network_calibrator.rs @@ -0,0 +1,53 @@ +use hydro_lang::{live_collections::stream::NoOrder, nondet::nondet, prelude::{Cluster, Process, Stream, Unbounded}}; +use hydro_std::bench_client::{bench_client, print_bench_results}; + +use stageleft::q; + +pub struct Client; +pub struct Server; +pub struct Aggregator; + +pub fn network_calibrator<'a>( + num_clients_per_node: usize, + message_size: usize, + server: &Cluster<'a, Server>, + clients: &Cluster<'a, Client>, + client_aggregator: &Process<'a, Aggregator>, +) { + let bench_results = bench_client( + clients, + |_client, payload_request| { + size_based_workload_generator(message_size, payload_request) + }, + |payloads| { + // Server just echoes the payload + payloads + .broadcast_bincode(server, nondet!(/** Test */)) + .demux_bincode(clients) + .values() + }, + num_clients_per_node, + nondet!(/** bench */), + ); + + print_bench_results(bench_results, client_aggregator, clients); +} + +/// Generates an incrementing u32 for each virtual client ID, starting at 0 +pub fn size_based_workload_generator<'a, Client>( + message_size: usize, + payload_request: Stream<(u32, Option>), Cluster<'a, Client>, Unbounded, NoOrder>, +) -> Stream<(u32, Vec), Cluster<'a, Client>, Unbounded, NoOrder> { + payload_request.map(q!(move |(virtual_id, payload)| { + if let Some(mut payload) = payload { + if let Some(last) = payload.last_mut() { + *last += 1; + return (virtual_id, payload); + } + } + + // Temp fix for macro stuff that isn't supported by stageleft I guess + let msg_size = message_size; + (virtual_id, vec![0; msg_size]) + })) +} \ No newline at end of file diff --git a/hydro_optimize_examples/src/simple_graphs.rs b/hydro_optimize_examples/src/simple_graphs.rs new file mode 100644 index 0000000..dba95be --- /dev/null +++ b/hydro_optimize_examples/src/simple_graphs.rs @@ -0,0 +1,1175 @@ +use 
hydro_lang::{ + live_collections::stream::NoOrder, + location::{Location, MemberId}, + nondet::nondet, + prelude::{Cluster, KeyedStream, Unbounded}, +}; +use sha2::{Digest, Sha256}; +use stageleft::q; + +pub struct Client {} +pub struct Server {} + +pub trait GraphFunction<'a>: + Fn( + &Cluster<'a, Server>, + KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> +{ +} + +impl<'a, F> GraphFunction<'a> for F where + F: Fn( + &Cluster<'a, Server>, + KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, + ) + -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> +{ +} + +fn sha256(n: u32) -> u32 { + let start_time = std::time::Instant::now(); + let mut sha_input = n; + + loop { + let mut sha = Sha256::new(); + sha.update(sha_input.to_be_bytes()); + let sha_output = sha.finalize(); + sha_input = sha_output[0].into(); + if start_time.elapsed().as_micros() >= n.into() { + break; + } + } + + sha_input +} + +// Note: H = high load, L = low load + +pub fn get_graph_function<'a>(name: &str) -> impl GraphFunction<'a> { + match name { + "noop" => noop, + "map_h_map_h" => map_h_map_h, + "map_h_map_h_split_up" => map_h_map_h_split_up, + "map_h_map_h_parallel" => map_h_map_h_parallel, + "map_h_map_h_map_h" => map_h_map_h_map_h, + "map_h_map_h_map_l" => map_h_map_h_map_l, + "map_h_map_l_map_h" => map_h_map_l_map_h, + "map_l_map_h_map_h" => map_l_map_h_map_h, + "map_h_map_l_map_l" => map_h_map_l_map_l, + "map_l_map_h_map_l" => map_l_map_h_map_l, + "map_l_map_l_map_h" => map_l_map_l_map_h, + "map_l_map_l_map_l" => map_l_map_l_map_l, + "map_l_first_map_l_second_union" => map_l_first_map_l_second_union, + "map_l_first_map_h_second_union" => map_l_first_map_h_second_union, + "map_h_first_map_l_second_union" => map_h_first_map_l_second_union, + "map_h_first_map_h_second_union" => map_h_first_map_h_second_union, + "map_l_map_l_first_payload_second_union" => 
map_l_map_l_first_payload_second_union, + "map_l_map_h_first_payload_second_union" => map_l_map_h_first_payload_second_union, + "map_h_map_l_first_payload_second_union" => map_h_map_l_first_payload_second_union, + "map_h_map_h_first_payload_second_union" => map_h_map_h_first_payload_second_union, + "map_l_first_payload_second_union_map_l" => map_l_first_payload_second_union_map_l, + "map_l_first_payload_second_union_map_h" => map_l_first_payload_second_union_map_h, + "map_h_first_payload_second_union_map_l" => map_h_first_payload_second_union_map_l, + "map_h_first_payload_second_union_map_h" => map_h_first_payload_second_union_map_h, + "map_l_first_map_l_second_anti_join" => map_l_first_map_l_second_anti_join, + "map_l_first_map_h_second_anti_join" => map_l_first_map_h_second_anti_join, + "map_h_first_map_l_second_anti_join" => map_h_first_map_l_second_anti_join, + "map_h_first_map_h_second_anti_join" => map_h_first_map_h_second_anti_join, + "map_l_map_l_first_payload_second_anti_join" => map_l_map_l_first_payload_second_anti_join, + "map_l_map_h_first_payload_second_anti_join" => map_l_map_h_first_payload_second_anti_join, + "map_h_map_l_first_payload_second_anti_join" => map_h_map_l_first_payload_second_anti_join, + "map_h_map_h_first_payload_second_anti_join" => map_h_map_h_first_payload_second_anti_join, + "map_l_first_payload_second_anti_join_map_l" => map_l_first_payload_second_anti_join_map_l, + "map_l_first_payload_second_anti_join_map_h" => map_l_first_payload_second_anti_join_map_h, + "map_h_first_payload_second_anti_join_map_l" => map_h_first_payload_second_anti_join_map_l, + "map_h_first_payload_second_anti_join_map_h" => map_h_first_payload_second_anti_join_map_h, + _ => unimplemented!(), + } +} + +pub fn noop<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads +} + +pub fn map_h_map_h<'a>( + _server: 
&Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) +} + +pub fn map_h_map_h_split_up<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + 
virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(10 + n % 2) + ))) +} + +pub fn map_h_map_h_parallel<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let batch0 = payloads.clone().filter(q!(|(virt_client_id, _)| virt_client_id % 2 == 0)); + let batch1 = payloads.filter(q!(|(virt_client_id, _)| virt_client_id % 2 == 1)); + let batch0out = batch0 + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))); + batch1 + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .interleave(batch0out) +} + +pub fn map_h_map_h_parallel_no_union<'a>( + _server: &Cluster<'a, Server>, + payloads1: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, + payloads2: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> (KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>) { + (payloads1 + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))), + payloads2 + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + )))) +} + +pub fn map_h_map_h_map_h<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n 
% 2) + ))) +} + +pub fn map_h_map_h_map_l<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) +} + +pub fn map_h_map_l_map_h<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) +} + +pub fn map_l_map_h_map_h<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) +} + +pub fn map_h_map_l_map_l<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) +} + +pub fn 
map_l_map_h_map_l<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) +} + +pub fn map_l_map_l_map_h<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(100 + n % 2) + ))) +} + +pub fn map_l_map_l_map_l<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(virt_client_id, n)| ( + virt_client_id, + self::sha256(n % 2 + 1) + ))) +} + +pub fn map_l_first_map_l_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let map_l1 = payloads + .clone() + .map(q!(|(_virt_client_id, n)| (None, self::sha256(n % 2 + 1)))); + let map_l2 = payloads.map(q!(|(virt_client_id, n)| ( + Some(virt_client_id), + self::sha256(n % 2 + 1) + ))); + map_l1 + .interleave(map_l2) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned 
payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_l_first_map_h_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let map_l1 = payloads + .clone() + .map(q!(|(_virt_client_id, n)| (None, self::sha256(n % 2 + 1)))); + let map_h2 = payloads.map(q!(|(virt_client_id, n)| ( + Some(virt_client_id), + self::sha256(100 + n % 2) + ))); + map_l1 + .interleave(map_h2) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_h_first_map_l_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let map_h1 = payloads + .clone() + .map(q!(|(_virt_client_id, n)| (None, self::sha256(100 + n % 2)))); + let map_l2 = payloads.map(q!(|(virt_client_id, n)| ( + Some(virt_client_id), + self::sha256(n % 2 + 1) + ))); + map_h1 + .interleave(map_l2) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_h_first_map_h_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let map_h1 = payloads + .clone() + .map(q!(|(_virt_client_id, n)| (None, self::sha256(100 + n % 2)))); + let map_h2 = payloads.map(q!(|(virt_client_id, n)| ( + 
Some(virt_client_id), + self::sha256(100 + n % 2) + ))); + map_h1 + .interleave(map_h2) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_l_map_l_first_payload_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(_virt_client_id, n)| (None, self::sha256(n % 2 + 1)))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_l_map_h_first_payload_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(n % 2 + 1) + ))) + .map(q!(|(_virt_client_id, n)| (None, self::sha256(100 + n % 2)))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_h_map_l_first_payload_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, 
NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(100 + n % 2) + ))) + .map(q!(|(_virt_client_id, n)| (None, self::sha256(n % 2 + 1)))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_h_map_h_first_payload_second_union<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(100 + n % 2) + ))) + .map(q!(|(_virt_client_id, n)| (None, self::sha256(100 + n % 2)))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, n)) + } else { + None + } + })) +} + +pub fn map_l_first_payload_second_union_map_l<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(n % 2 + 1) + ))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + let sha = self::sha256(n % 2 + 1); + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, sha)) + } else { + None + } + })) +} + +pub fn map_l_first_payload_second_union_map_h<'a>( + _server: &Cluster<'a, 
Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(n % 2 + 1) + ))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + let sha = self::sha256(100 + n % 2); + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, sha)) + } else { + None + } + })) +} + +pub fn map_h_first_payload_second_union_map_l<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(100 + n % 2) + ))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + let sha = self::sha256(n % 2 + 1); + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, sha)) + } else { + None + } + })) +} + +pub fn map_h_first_payload_second_union_map_h<'a>( + _server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + payloads + .clone() + .map(q!(|(_virt_client_id, n)| ( + None::, + self::sha256(100 + n % 2) + ))) + .interleave(payloads.map(q!(|(virt_client_id, n)| (Some(virt_client_id), n)))) + .filter_map(q!(|(virt_client_id_opt, n)| { + let sha = self::sha256(100 + n % 2); + // Since we cloned payloads, delete half the payloads so 1 input = 1 output + if let Some(virt_client_id) = virt_client_id_opt { + Some((virt_client_id, sha)) + } else { + None 
+ } + })) +} + +pub fn map_l_first_map_l_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let true_payloads = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + true, + n + ))); + let map_l1 = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(n % 2 + 1) + ))) + .interleave(true_payloads) // The actual payloads that will pass the anti_join + .batch(&tick, nondet); + let map_l2 = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(n % 2 + 1) + ))) + .batch(&tick, nondet); + map_l1 + .filter_not_in(map_l2) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, keep, n)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_l_first_map_h_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let true_payloads = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + true, + n + ))); + let map_l1 = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(n % 2 + 1) + ))) + .interleave(true_payloads) // The actual payloads that will pass the anti_join + .batch(&tick, nondet); + let map_h2 = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(100 + 
n % 2) + ))) + .batch(&tick, nondet); + map_l1 + .filter_not_in(map_h2) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, keep, n)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_h_first_map_l_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let true_payloads = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + true, + n + ))); + let map_h1 = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(100 + n % 2) + ))) + .interleave(true_payloads) // The actual payloads that will pass the anti_join + .batch(&tick, nondet); + let map_l2 = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(n % 2 + 1) + ))) + .batch(&tick, nondet); + map_h1 + .filter_not_in(map_l2) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, keep, n)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_h_first_map_h_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let true_payloads = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + true, + n + ))); + let map_h1 = payloads + .clone() + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(100 + n % 2) + ))) + 
.interleave(true_payloads) // The actual payloads that will pass the anti_join + .batch(&tick, nondet); + let map_h2 = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + false, + self::sha256(100 + n % 2) + ))) + .batch(&tick, nondet); + map_h1 + .filter_not_in(map_h2) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, keep, n)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_l_map_l_first_payload_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + false_payloads + .clone() + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(n % 2 + 1), + true + ))) + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(n % 2 + 1), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, n, keep)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_l_map_h_first_payload_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + false_payloads + .clone() + 
.map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(n % 2 + 1), + true + ))) + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(100 + n % 2), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, n, keep)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_h_map_l_first_payload_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + false_payloads + .clone() + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(100 + n % 2), + true + ))) + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(n % 2 + 1), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, n, keep)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_h_map_h_first_payload_second_anti_join<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false 
+ ))); + false_payloads + .clone() + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(100 + n % 2), + true + ))) + .map(q!(|(client_id, virt_client_id, n, _keep)| ( + client_id, + virt_client_id, + self::sha256(100 + n % 2), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client_id, virt_client_id, n, keep)| { + if keep { + Some((client_id, (virt_client_id, n))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_l_first_payload_second_anti_join_map_l<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + + false_payloads + .clone() + .map(q!(|(client, virt_client_id, n, _keep)| ( + client, + virt_client_id, + self::sha256(n % 2 + 1), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.clone().batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client, virt_client_id, n, keep)| { + if keep { + Some((client, (virt_client_id, self::sha256(n % 2 + 1)))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_l_first_payload_second_anti_join_map_h<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + + false_payloads + .clone() + .map(q!(|(client, 
virt_client_id, n, _keep)| ( + client, + virt_client_id, + self::sha256(n % 2 + 1), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.clone().batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client, virt_client_id, n, keep)| { + if keep { + Some((client, (virt_client_id, self::sha256(100 + n % 2)))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_h_first_payload_second_anti_join_map_l<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + + false_payloads + .clone() + .map(q!(|(client, virt_client_id, n, _keep)| ( + client, + virt_client_id, + self::sha256(100 + n % 2), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + .filter_not_in(false_payloads.clone().batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client, virt_client_id, n, keep)| { + if keep { + Some((client, (virt_client_id, self::sha256(n % 2 + 1)))) + } else { + None + } + })) + .into_keyed() +} + +pub fn map_h_first_payload_second_anti_join_map_h<'a>( + server: &Cluster<'a, Server>, + payloads: KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder>, +) -> KeyedStream, (u32, u32), Cluster<'a, Server>, Unbounded, NoOrder> { + let tick = server.tick(); + let nondet = nondet!(/** Test */); + + let false_payloads = payloads + .entries() + .map(q!(|(client_id, (virt_client_id, n))| ( + client_id, + virt_client_id, + n, + false + ))); + + false_payloads + .clone() + .map(q!(|(client, virt_client_id, n, _keep)| ( + client, + virt_client_id, + self::sha256(100 + n % 2), + true + ))) + .interleave(false_payloads.clone()) + .batch(&tick, nondet) + 
.filter_not_in(false_payloads.clone().batch(&tick, nondet)) + .all_ticks() + .filter_map(q!(|(client, virt_client_id, n, keep)| { + if keep { + Some((client, (virt_client_id, self::sha256(100 + n % 2)))) + } else { + None + } + })) + .into_keyed() +} diff --git a/hydro_optimize_examples/src/simple_graphs_bench.rs b/hydro_optimize_examples/src/simple_graphs_bench.rs new file mode 100644 index 0000000..57ae990 --- /dev/null +++ b/hydro_optimize_examples/src/simple_graphs_bench.rs @@ -0,0 +1,70 @@ +use hydro_lang::{prelude::{Cluster, Process}, nondet::nondet}; +use hydro_std::bench_client::{bench_client, print_bench_results}; + +use hydro_test::cluster::paxos_bench::inc_u32_workload_generator; +use stageleft::q; +use crate::simple_graphs::{Client, GraphFunction, Server, map_h_map_h_parallel_no_union}; +pub struct Aggregator; + +pub fn simple_graphs_bench<'a>( + num_clients_per_node: usize, + server: &Cluster<'a, Server>, + clients: &Cluster<'a, Client>, + client_aggregator: &Process<'a, Aggregator>, + graph: impl GraphFunction<'a>, +) { + let bench_results = bench_client( + clients, + inc_u32_workload_generator, + |payloads| { + graph( + server, + payloads + .broadcast_bincode(server, nondet!(/** Test */)) + .into(), + ) + .demux_bincode(clients) + .values() + }, + num_clients_per_node, + nondet!(/** bench */), + ); + + print_bench_results(bench_results, client_aggregator, clients); +} + +pub fn simple_graphs_bench_no_union<'a>( + num_clients_per_node: usize, + server: &Cluster<'a, Server>, + clients: &Cluster<'a, Client>, + client_aggregator: &Process<'a, Aggregator>, +) { + let bench_results = bench_client( + clients, + inc_u32_workload_generator, + |payloads| { + let payloads1 = payloads.clone().filter(q!(|(virt_client_id, _)| virt_client_id % 2 == 0)); + let payloads2 = payloads.filter(q!(|(virt_client_id, _)| virt_client_id % 2 == 1)); + let (batch0, batch1) = map_h_map_h_parallel_no_union( + server, + payloads1 + .broadcast_bincode(server, nondet!(/** Test */)) 
+ .into(), + payloads2 + .broadcast_bincode(server, nondet!(/** Test */)) + .into(), + ); + let clients_batch0 = batch0 + .demux_bincode(clients) + .values(); + batch1 + .demux_bincode(clients) + .values() + .interleave(clients_batch0) + }, + num_clients_per_node, + nondet!(/** bench */), + ); + + print_bench_results(bench_results, client_aggregator, clients); +} \ No newline at end of file diff --git a/hydro_optimize_examples/src/simple_kv_bench.rs b/hydro_optimize_examples/src/simple_kv_bench.rs new file mode 100644 index 0000000..da8e1f9 --- /dev/null +++ b/hydro_optimize_examples/src/simple_kv_bench.rs @@ -0,0 +1,142 @@ +use hydro_lang::{ + location::Location, + nondet::nondet, + prelude::{Cluster, Process}, +}; +use hydro_std::bench_client::{bench_client, print_bench_results}; + +use hydro_test::cluster::paxos_bench::inc_u32_workload_generator; +use stageleft::q; + +pub struct Kv; +pub struct Client; +pub struct Aggregator; + +pub fn simple_kv_bench<'a>( + num_clients_per_node: usize, + kv: &Process<'a, Kv>, + clients: &Cluster<'a, Client>, + client_aggregator: &Process<'a, Aggregator>, +) { + let bench_results = bench_client( + clients, + inc_u32_workload_generator, + |payloads| { + let k_tick = kv.tick(); + let k_payloads = payloads.send_bincode(kv).batch(&k_tick, nondet!(/** TODO: Actually can use atomic() here, but there's no way to exit atomic in KeyedStreams? */)); + + // Insert each payload into the KV store + k_payloads + .clone() + .values() + .assume_ordering(nondet!(/** Last writer wins. TODO: Technically, we only need to assume ordering over the keyed stream (ordering of values with different keys doesn't matter. 
But there's no .persist() for KeyedStreams) */)) + .persist() + .into_keyed() + .reduce(q!(|prev, new| { + *prev = new; + })) + .entries() + .all_ticks() + .assume_ordering(nondet!(/** for_each does nothing, just need to end on a HydroLeaf */)) + .assume_retries(nondet!(/** for_each does nothing, just need to end on a HydroLeaf */)) + .for_each(q!(|_| {})); // Do nothing, just need to end on a HydroLeaf + + // Send committed requests back to the original client + k_payloads.all_ticks().demux_bincode(clients).into() + }, + num_clients_per_node, + nondet!(/** bench */), + ); + + print_bench_results(bench_results, client_aggregator, clients); +} + +#[cfg(test)] +mod tests { + use dfir_lang::graph::WriteConfig; + use hydro_build_utils::insta; + use hydro_deploy::Deployment; + use hydro_lang::{ + compile::ir::dbg_dedup_tee, + deploy::{DeployCrateWrapper, HydroDeploy, TrybuildHost}, + prelude::FlowBuilder, + }; + use std::str::FromStr; + + use regex::Regex; + + #[cfg(stageleft_runtime)] + use crate::simple_kv_bench::simple_kv_bench; + + #[test] + fn simple_kv_ir() { + let builder = FlowBuilder::new(); + let kv = builder.process(); + let clients = builder.cluster(); + let client_aggregator = builder.process(); + + simple_kv_bench(1, &kv, &clients, &client_aggregator); + let built = builder.with_default_optimize::(); + + dbg_dedup_tee(|| { + insta::assert_debug_snapshot!(built.ir()); + }); + + let preview = built.preview_compile(); + insta::with_settings!({snapshot_suffix => "kv_mermaid"}, { + insta::assert_snapshot!( + preview.dfir_for(&kv).to_mermaid(&WriteConfig { + no_subgraphs: true, + no_pull_push: true, + no_handoffs: true, + op_text_no_imports: true, + ..WriteConfig::default() + }) + ); + }); + } + + #[tokio::test] + async fn simple_kv_some_throughput() { + let builder = FlowBuilder::new(); + let kv = builder.process(); + let clients = builder.cluster(); + let client_aggregator = builder.process(); + + simple_kv_bench(1, &kv, &clients, &client_aggregator); + let 
mut deployment = Deployment::new(); + + let nodes = builder + .with_process(&kv, TrybuildHost::new(deployment.Localhost())) + .with_cluster(&clients, vec![TrybuildHost::new(deployment.Localhost())]) + .with_process( + &client_aggregator, + TrybuildHost::new(deployment.Localhost()), + ) + .deploy(&mut deployment); + + deployment.deploy().await.unwrap(); + + let client_node = &nodes.get_process(&client_aggregator); + let client_out = client_node.stdout_filter("Throughput:").await; + + deployment.start().await.unwrap(); + + let re = Regex::new(r"Throughput: ([^ ]+) - ([^ ]+) - ([^ ]+) requests/s").unwrap(); + let mut found = 0; + let mut client_out = client_out; + while let Some(line) = client_out.recv().await { + if let Some(caps) = re.captures(&line) { + if let Ok(lower) = f64::from_str(&caps[1]) { + if lower > 0.0 { + println!("Found throughput lower-bound: {}", lower); + found += 1; + if found == 2 { + break; + } + } + } + } + } + } +} diff --git a/hydro_optimize_examples/src/web_submit.rs b/hydro_optimize_examples/src/web_submit.rs new file mode 100644 index 0000000..2ba935b --- /dev/null +++ b/hydro_optimize_examples/src/web_submit.rs @@ -0,0 +1,406 @@ +use std::collections::{HashMap, HashSet}; + +use hydro_lang::{ + live_collections::stream::NoOrder, + location::{Location, MemberId}, + nondet::nondet, + prelude::{Process, Stream, Unbounded}, +}; +use sha2::{Digest, Sha256}; +use stageleft::q; + +pub struct Server {} + +/// Implementation of WebSubmit https://github.com/ms705/websubmit-rs/tree/master. 
+/// We expose the following APIs: +/// - add_lecture (takes api_key, lecture_id, lecture, only approves if user is admin) +/// - add_question (takes api_key, question, question_id, lecture_id, only approves if user is admin) +/// - add_user (takes user_email, is_admin, hashes user's email + secret, stores API key in table, emails them the key, should only approve if user is admin but it's tautological so just approve everyone) +/// - get_users (takes api_key, only approves if caller is admin, returns user_id, user_email, user_is_admin) +/// - list_lectures (takes api_key, returns lecture_id, lecture) +/// - list_lecture_questions_all (takes api_key & lecture_id, returns question, question_id, optional answer joining on answer_id = question_id, only approves if user is admin) +/// - list_lecture_questions_user (takes api_key & lecture_id, returns question, question_id, optional answer joining on answer_id = question_id if this user wrote the answer) +/// - add_answer (takes api_key, question_id, answer) +/// +/// Any call with an invalid API key (either it does not exist or does not have the privileges required) will not receive a response. 
+#[expect(
+    clippy::too_many_arguments,
+    clippy::type_complexity,
+    reason = "internal Web Submit code // TODO"
+)]
+pub fn web_submit<'a, Client>(
+    server: &Process<'a, Server>,
+    add_lecture: Stream<
+        (MemberId<Client>, (String, u32, String)),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+    add_question: Stream<
+        (MemberId<Client>, (String, String, u32, u32)),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+    add_user: Stream<(MemberId<Client>, (String, bool)), Process<'a, Server>, Unbounded, NoOrder>,
+    get_users: Stream<(MemberId<Client>, String), Process<'a, Server>, Unbounded, NoOrder>,
+    list_lectures: Stream<(MemberId<Client>, String), Process<'a, Server>, Unbounded, NoOrder>,
+    list_lecture_questions_all: Stream<
+        (MemberId<Client>, (String, u32)),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+    list_lecture_questions_user: Stream<
+        (MemberId<Client>, (String, u32)),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+    add_answer: Stream<
+        (MemberId<Client>, (String, u32, String)),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+) -> (
+    Stream<(MemberId<Client>, ()), Process<'a, Server>, Unbounded, NoOrder>,
+    Stream<(MemberId<Client>, ()), Process<'a, Server>, Unbounded, NoOrder>,
+    Stream<(MemberId<Client>, ()), Process<'a, Server>, Unbounded, NoOrder>,
+    Stream<(MemberId<Client>, HashMap<String, bool>), Process<'a, Server>, Unbounded, NoOrder>,
+    Stream<(MemberId<Client>, HashMap<u32, String>), Process<'a, Server>, Unbounded, NoOrder>,
+    Stream<
+        (MemberId<Client>, HashMap<u32, (String, HashSet<String>)>),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+    Stream<
+        (MemberId<Client>, HashMap<u32, (String, Option<String>)>),
+        Process<'a, Server>,
+        Unbounded,
+        NoOrder,
+    >,
+    Stream<(MemberId<Client>, ()), Process<'a, Server>, Unbounded, NoOrder>,
+) {
+    let user_auth_tick = server.tick();
+    let lectures_tick = server.tick();
+    let question_answer_tick = server.tick();
+
+    // Add user
+    let add_user_with_api_key = add_user.map(q!(|(client_id, (email, is_admin))| {
+        let api_key = self::generate_api_key(email.clone());
+        (client_id, (email, is_admin, api_key))
+    }));
+    let users_this_tick_with_api_key =
add_user_with_api_key.batch( + &user_auth_tick, + nondet!(/** Snapshot current users to approve/deny access */), + ); + // Persisted users + let curr_users = users_this_tick_with_api_key + .clone() + .map(q!(|(_client_id, (email, is_admin, api_key))| ( + api_key, + (email, is_admin) + ))) + .persist(); + let curr_users_hashmap = curr_users.clone().fold_commutative_idempotent( + q!(|| HashMap::new()), + q!(|map, (_api_key, (email, is_admin))| { + map.insert(email, is_admin); + }), + ); + // Email the API key. Only done after the tick to ensure that once the client gets the email, the user has been added + users_this_tick_with_api_key + .clone() + .all_ticks() + .assume_ordering(nondet!(/** Email order doesn't matter */)) + .assume_retries(nondet!(/** At least once delivery is fine */)) + .for_each(q!(|(_client_id, (email, _is_admin, api_key))| { + self::send_email(api_key, email) + })); + // Send response back to client. Only done after the tick to ensure that once the client gets the response, the user has been added + let add_user_response = + users_this_tick_with_api_key.all_ticks().map(q!(|( + client_id, + (_email, _is_admin, _api_key), + )| (client_id, ()))); + + // Add lecture + let add_lecture_pre_join = + add_lecture.map(q!(|(client_id, (api_key, lecture_id, lecture))| { + (api_key, (client_id, lecture_id, lecture)) + })); + let lectures = add_lecture_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks() + .filter(q!(|( + _api_key, + ((_client_id, _lecture_id, _lecture), (_email, is_admin)), + )| *is_admin)); + let curr_lectures = + lectures.batch(&lectures_tick, nondet!(/** Snapshot of current lectures */)); + let curr_lectures_hashmap = curr_lectures.clone().persist().fold_commutative_idempotent( + q!(|| HashMap::new()), + q!( + |map, (_api_key, ((_client_id, lecture_id, lecture), (_email, _is_admin)))| { + map.insert(lecture_id, lecture); + } + ), + ); + 
// Only done after the lectures_tick to ensure that once the client gets the response, the lecture has been added + let add_lecture_response = curr_lectures.all_ticks().map(q!(|( + _api_key, + ((client_id, _lecture_id, _lecture), (_email, _is_admin)), + )| (client_id, ()))); + + // Add question + let add_question_pre_join = add_question.map(q!(|( + client_id, + (api_key, question, question_id, lecture_id), + )| { + (api_key, (client_id, question, question_id, lecture_id)) + })); + let add_question_auth = add_question_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks() + .filter(q!(|( + _api_key, + ((_client_id, _question, _question_id, _lecture_id), (_email, is_admin)), + )| *is_admin)); + let add_question_this_tick = add_question_auth.batch( + &question_answer_tick, + nondet!(/** Snapshot of current questions */), + ); + let curr_questions = add_question_this_tick + .clone() + .map(q!(|( + _api_key, + ((_client_id, question, question_id, lecture_id), (_email, _is_admin)), + )| (lecture_id, (question_id, question)))) + .persist(); + // Only done after the question_answer_tick to ensure that once the client gets the response, the question has been added + let add_question_response = add_question_this_tick.all_ticks().map(q!(|( + _api_key, + ((client_id, _question, _question_id, _lecture_id), (_email, _is_admin)), + )| (client_id, ()))); + + // Get users + let get_users_pre_join = get_users.map(q!(|(client_id, api_key)| (api_key, client_id))); + let get_users_response = get_users_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .filter_map(q!(|(_api_key, (client_id, (_email, is_admin)))| { + if is_admin { Some(client_id) } else { None } + })) + .cross_singleton(curr_users_hashmap) + .all_ticks(); + + // List lectures + let list_lectures_pre_join = 
list_lectures.map(q!(|(client_id, api_key)| (api_key, client_id))); + let list_lectures_auth = list_lectures_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks() + .map(q!(|(_api_key, (client_id, (_email, _is_admin)))| client_id)); + let list_lectures_response = list_lectures_auth + .batch( + &lectures_tick, + nondet!(/** Join with snapshot of current lectures */), + ) + .cross_singleton(curr_lectures_hashmap) + .all_ticks(); + + // Add answer + let add_answer_pre_join = add_answer.map(q!(|(client_id, (api_key, question_id, answer))| { + (api_key, (client_id, question_id, answer)) + })); + let add_answer_auth = add_answer_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks(); + let add_answer_this_tick = add_answer_auth.batch( + &question_answer_tick, + nondet!(/** Snapshot of current answers */), + ); + let curr_answers = add_answer_this_tick + .clone() + .map(q!(|( + api_key, + ((_client_id, question_id, answer), (_email, _is_admin)), + )| ((question_id, api_key), answer))) + .persist(); + // Only done after the question_answer_tick to ensure that once the client gets the response, the answer has been added + let add_answer_response = add_answer_this_tick.all_ticks().map(q!(|( + _api_key, + ((client_id, _question_id, _answer), (_email, _is_admin)), + )| (client_id, ()))); + + // List lecture questions all + let list_lecture_questions_all_pre_join = + list_lecture_questions_all.map(q!(|(client_id, (api_key, lecture_id))| { + (api_key, (client_id, lecture_id)) + })); + let list_lecture_questions_all_auth = list_lecture_questions_all_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks() + .filter_map(q!(|( + _api_key, + ((client_id, lecture_id), (_email, 
is_admin)), + )| { + if is_admin { + Some((lecture_id, client_id)) + } else { + None + } + })); + // Find all questions with that ID + let list_lecture_questions_all_question_only = list_lecture_questions_all_auth + .batch( + &question_answer_tick, + nondet!(/** Join with snapshot of current questions */), + ) + .join(curr_questions.clone()) + .map(q!(|(_lecture_id, (client_id, (question_id, question)))| ( + question_id, + (client_id, question) + ))); + // Don't need to join on api_key since we're getting all answers, regardless of who wrote them + let curr_answers_no_api_key = + curr_answers + .clone() + .map(q!(|((question_id, _api_key), answer)| ( + question_id, + answer + ))); + // Find all answers with the question ID + let list_lecture_questions_all_with_answer = list_lecture_questions_all_question_only + .clone() + .join(curr_answers_no_api_key.clone()) + .map(q!(|(question_id, ((client_id, question), answer))| { + (client_id, (question_id, question, Some(answer))) + })); + // Find all questions without answers + let list_lecture_questions_all_no_answer = list_lecture_questions_all_question_only + .anti_join(curr_answers_no_api_key.map(q!(|(question_id, _answer)| question_id))) + .map(q!(|(question_id, (client_id, question))| ( + client_id, + (question_id, question, None) + ))); + let list_lecture_questions_all_response = list_lecture_questions_all_with_answer + .chain(list_lecture_questions_all_no_answer) + .into_keyed() + .fold_commutative_idempotent( + q!(|| HashMap::new()), + q!(|map, (question_id, question, answer)| { + let (_question, set_of_answers) = + map.entry(question_id).or_insert((question, HashSet::new())); + if let Some(answer) = answer { + set_of_answers.insert(answer); + } + }), + ) + .entries() + .all_ticks(); + + // List lecture questions user + let list_lecture_questions_user_pre_join = + list_lecture_questions_user.map(q!(|(client_id, (api_key, lecture_id))| { + (api_key, (client_id, lecture_id)) + })); + let 
list_lecture_questions_user_auth = list_lecture_questions_user_pre_join + .batch( + &user_auth_tick, + nondet!(/** Compare against current users to approve/deny access */), + ) + .join(curr_users.clone()) + .all_ticks() + .map(q!(|( + api_key, + ((client_id, lecture_id), (_email, _is_admin)), + )| (lecture_id, (client_id, api_key)))); + let list_lecture_questions_user_question_only = list_lecture_questions_user_auth + .batch( + &question_answer_tick, + nondet!(/** Join with snapshot of current questions */), + ) + .join(curr_questions) + .map(q!(|( + _lecture_id, + ((client_id, api_key), (question_id, question)), + )| ( + (question_id, api_key), + (client_id, question) + ))); + // Find all answers with the question ID + let list_lecture_questions_user_with_answer = list_lecture_questions_user_question_only + .clone() + .join(curr_answers.clone()) + .map(q!(|( + (question_id, _api_key), + ((client_id, question), answer), + )| { + (client_id, (question_id, question, Some(answer))) + })); + // Find all questions without answers + let list_lecture_questions_user_no_answer = list_lecture_questions_user_question_only + .anti_join(curr_answers.map(q!(|(k, _)| k))) + .map(q!(|((question_id, _api_key), (client_id, question))| ( + client_id, + (question_id, question, None) + ))); + let list_lecture_questions_user_response = list_lecture_questions_user_with_answer + .chain(list_lecture_questions_user_no_answer) + .into_keyed() + .fold_commutative_idempotent( + q!(|| HashMap::new()), + q!(|map, (question_id, question, answer)| { + map.insert(question_id, (question, answer)); + }), + ) + .entries() + .all_ticks(); + + ( + add_lecture_response, + add_question_response, + add_user_response, + get_users_response, + list_lectures_response, + list_lecture_questions_all_response, + list_lecture_questions_user_response, + add_answer_response, + ) +} + +fn generate_api_key(email: String) -> String { + let secret = "There is no secret ingredient"; + let mut hasher = Sha256::new(); + 
hasher.update(email.as_bytes()); + hasher.update(secret.as_bytes()); + let hash = hasher.finalize(); + format!("{:x}", hash) +} + +fn send_email(_api_key: String, _email: String) {}