diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/BUILD.bazel b/examples/rust/cycle-benchmark-orchstrator-feo/BUILD.bazel new file mode 100644 index 0000000..f279d59 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/BUILD.bazel @@ -0,0 +1,40 @@ +load("@cargo//:defs.bzl", "all_crate_deps") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library") + +rust_library( + name = "libcycle_benchmark_rust", + srcs = [ + "src/activities.rs", + "src/config.rs", + "src/lib.rs", + ], + crate_name = "cycle_benchmark", + visibility = ["//visibility:public"], + deps = [ + "//feo:libfeo_recording_rust", + "//feo-com:libfeo_com_rust", + "//feo-log:libfeo_log_rust", + "//feo-logger:libfeo_logger_rust", + "//feo-time:libfeo_time_rust", + "//feo-tracing:libfeo_tracing_rust", + "@cargo//:postcard", + "@cargo//:serde", + "@cargo//:serde_json", +], +) + +rust_binary( + name = "cycle_bench", + srcs = [ + "src/bin/cycle_bench.rs", + ], + visibility = ["//visibility:public"], + deps = [ + ":libcycle_benchmark_rust", + "//feo:libfeo_recording_rust", + "//feo-log:libfeo_log_rust", + "//feo-logger:libfeo_logger_rust", + "//feo-time:libfeo_time_rust", + "//feo-tracing:libfeo_tracing_rust", + ], +) diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/Cargo.toml b/examples/rust/cycle-benchmark-orchstrator-feo/Cargo.toml new file mode 100644 index 0000000..9cd8bab --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "cycle-benchmark" +version = "0.1.0" +edition = "2021" + +[[bin]] +path = "src/bin/cycle_bench.rs" +name = "cycle_bench" +required-features = [] + +[dependencies] +feo = { workspace = true } +feo-log = { workspace = true } +feo-logger = { workspace = true } +feo-time = { workspace = true } +feo-tracing = { workspace = true } +tracing = { workspace = true } +postcard = { workspace = true, features = ["experimental-derive"] } +serde = { workspace = true } +serde_json = { workspace = true } + +async_runtime = { workspace = true } +orchestration = { workspace = true } +logging_tracing = { workspace = true } +foundation = { workspace = true } + +[build-dependencies] + +[features] +default = [] diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/README.md b/examples/rust/cycle-benchmark-orchstrator-feo/README.md new file mode 100644 index 0000000..cb29919 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/README.md @@ -0,0 +1,23 @@ +# Cycle time benchmark + +A simple configurable benchmarking application to measure the FEO cycle time in various signalling and +single- or multi-agent setups. + +## Running + +Adapt the config file `config/cycle_bench.json` to reflect your desired setup. Then run one or more +instances of the application in different terminals: + +```sh +cargo run --release --bin cycle_bench -- [TARGET_CYCLE_TIME] +``` + +The first command line parameter specifies the agent ID of the process to be started. The second parameter defines +the the target FEO cycle time for the scheduler. It is only relevant for the primary agent and defaults to 5ms if +not given. + +If the specified agent ID is equal to the primary ID defined in the config file, the above command will start the +primary agent. If it is equal to one of the secondary agent IDs (i.e. an agent ID in the config that is not +equal to the primary ID), it will start a secondary agent. Finally, if the agent ID matches one of the +recorder IDs from the config file, a recorder will be started. + diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench.json b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench.json new file mode 100644 index 0000000..d15dac1 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench.json @@ -0,0 +1,28 @@ +{ + "cycle_time_ms": 0, + "primary_agent": 1000, + "agent_assignments": { + "1000": + [10, + [ 1, 11, 21, + 101, + 31, 41, 51, + 102, + 61, 71, 81, + 103]] + }, + "activity_graph": + [ [[1], true], + [[11], true], + [[21], true], + [[101], false], + [[31], true], + [[41], true], + [[51], true], + [[102], false], + [[61], true], + [[71], true], + [[81], true], + [[103], false] + ] +} \ No newline at end of file diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_4.json b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_4.json new file mode 100644 index 0000000..495a4f0 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_4.json @@ -0,0 +1,34 @@ +{ + "cycle_time_ms": 0, + "primary_agent": 1000, + "agent_assignments": { + "1000": + [3, + [ 1, 2, 3, 4, 5, + 11, 12, 13, 14, 15, + 21, 22, 23, 24, 25, + 101, + 31, 32, 33, 34, 35, + 41, 42, 43, 44, 45, + 51, 52, 53, 54, 55, + 102, + 61, 62, 63, 64, 65, + 71, 72, 73, 74, 75, + 81, 82, 83, 84, 85, + 103]] + }, + "activity_graph": + [ [[1, 2, 3, 4, 5], true], + [[11, 12, 13, 14, 15], true], + [[21, 22, 23, 24, 25], true], + [[101], false], + [[31, 32, 33, 34, 35], true], + [[41, 42, 43, 44, 45], true], + [[51, 52, 53, 54, 55], true], + [[102], false], + [[61, 62, 63, 64, 65], true], + [[71, 72, 73, 74, 75], true], + [[81, 82, 83, 84, 85], true], + [[103], false] + ] +} \ No newline at end of file diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_5.json b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_5.json new file mode 100644 index 0000000..d15dac1 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_5.json @@ -0,0 +1,28 @@ +{ + "cycle_time_ms": 0, + "primary_agent": 1000, + "agent_assignments": { + "1000": + [10, + [ 1, 11, 21, + 101, + 31, 41, 51, + 102, + 61, 71, 81, + 103]] + }, + "activity_graph": + [ [[1], true], + [[11], true], + [[21], true], + [[101], false], + [[31], true], + [[41], true], + [[51], true], + [[102], false], + [[61], true], + [[71], true], + [[81], true], + [[103], false] + ] +} \ No newline at end of file diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_6.json b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_6.json new file mode 100644 index 0000000..e458522 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/config/cycle_bench_6.json @@ -0,0 +1,31 @@ +{ + "cycle_time_ms": 0, + "primary_agent": 1000, + "agent_assignments": { + "1000": + [10, + [ 1, 21, + 101, + 31, 51, + 102, + 61, 81, + 103]], + "1001": + [10, + [ 11, 41, 71]] + }, + "activity_graph": + [ [[1], true], + [[11], true], + [[21], true], + [[101], false], + [[31], true], + [[41], true], + [[51], true], + [[102], false], + [[61], true], + [[71], true], + [[81], true], + [[103], false] + ] +} \ No newline at end of file diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/src/activities.rs b/examples/rust/cycle-benchmark-orchstrator-feo/src/activities.rs new file mode 100644 index 0000000..0390236 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/src/activities.rs @@ -0,0 +1,70 @@ +// Copyright 2025 Accenture. +// +// SPDX-License-Identifier: Apache-2.0 + +use crate::runtime_adapters::{get_activity_name, ActivityAdapterTrait}; +use feo::activity::{Activity, ActivityId}; +use feo_tracing::{instrument, tracing, Level}; +use orchestration::prelude::ActionResult; +use std::future::Future; +use std::sync::{Arc, Mutex}; +use tracing::event; + +/// This is a dummy activity that does nothing. +#[derive(Debug)] +pub struct DummyActivity { + /// ID of the activity + activity_id: ActivityId, + activity_name: &'static str, +} + +impl DummyActivity { + pub fn build(activity_id: &ActivityId) -> Self { + Self { + activity_id: *activity_id, + activity_name: get_activity_name(activity_id.into()), + } + } +} + +impl Activity for DummyActivity { + fn id(&self) -> ActivityId { + self.activity_id + } + + #[instrument(name = "Activity startup")] + fn startup(&mut self) {} + + #[instrument(name = "Activity step")] + fn step(&mut self) { + event!(Level::INFO, id = self.activity_name); + } + + #[instrument(name = "Activity shutdown")] + fn shutdown(&mut self) {} +} + +impl ActivityAdapterTrait for DummyActivity { + type T = DummyActivity; + + fn step_runtime(instance: Arc>) -> impl Future + Send { + async move { + instance.lock().unwrap().step(); + Ok(()) + } + } + + fn start(&mut self) -> ActionResult { + self.startup(); + Ok(()) + } + + fn stop(&mut self) -> ActionResult { + self.shutdown(); + Ok(()) + } + + fn get_named_id(&self) -> &'static str { + self.activity_name + } +} diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/src/bin/cycle_bench.rs b/examples/rust/cycle-benchmark-orchstrator-feo/src/bin/cycle_bench.rs new file mode 100644 index 0000000..57d86f1 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/src/bin/cycle_bench.rs @@ -0,0 +1,236 @@ +// Copyright 2025 Accenture. +// +// SPDX-License-Identifier: Apache-2.0 + +use async_runtime::runtime::runtime::AsyncRuntimeBuilder; +use async_runtime::scheduler::execution_engine::ExecutionEngineBuilder; +use cycle_benchmark::activities::DummyActivity; +use cycle_benchmark::config::ApplicationConfig; +use cycle_benchmark::runtime_adapters::{ + activity_into_invokes, get_activity_name, get_agent_name, init_activity_ids, init_agent_ids, + ActivityDetails, GlobalOrchestrator, LocalFeoAgent, +}; +use feo::prelude::ActivityId; +use feo_log::info; +use feo_time::Duration; +use foundation::threading::thread_wait_barrier::ThreadWaitBarrier; +use logging_tracing::prelude::*; +use orchestration::prelude::Event; +use std::collections::HashMap; +use std::string::ToString; +use std::sync::{Arc, Mutex}; +// TODO: number of threads + +const DEFAULT_FEO_CYCLE_TIME: Duration = Duration::from_millis(5); + +// Finish the program after this number of cycles +const NUM_FEO_CYCLES: usize = 1000000; + +// log level +const LOG_LEVEL: Level = Level::ERROR; // same as for tracing + +fn main() { + // Uncomment one or both of the following lines for benchmarking with logging/tracing + // feo_logger::init(feo_log::LevelFilter::Debug, true, true); // TODO: not working in orchestrator? + // feo_tracing::init(feo_tracing::LevelFilter::INFO); + + // // Initialize LogMode with AppScope + // let mut logger = TracingLibraryBuilder::new() + // .global_log_level(LOG_LEVEL) + // .enable_tracing(TraceScope::SystemScope) + // .enable_logging(true) + // .build(); + // + // logger.init_log_trace(); + + let params = Params::from_args(); + let app_config = ApplicationConfig::load(); + + // initialize static maps of agent names and activity names for orchestrator + let activity_names: HashMap = app_config + .all_activities() + .iter() + .map(|id| (*id, id.to_string())) + .collect(); + let agent_names: HashMap = app_config + .all_agents() + .iter() + .map(|id| (*id, id.to_string())) + .collect(); + init_activity_ids(activity_names); + init_agent_ids(agent_names); + + if params.agent_id == app_config.primary() { + run_as_primary(params, app_config); + } else if app_config.secondaries().contains(¶ms.agent_id) { + run_as_secondary(params, app_config); + } else { + eprintln!( + "ERROR: Agent id {} not defined in system configuration", + params.agent_id + ); + } +} + +fn run_as_primary(params: Params, app_config: ApplicationConfig) { + println!("Starting primary agent {}", params.agent_id); + + let agent_id = params.agent_id; + let (local_activity_sequence, concurrency) = app_config.sequence_and_concurrency(agent_id); + + println!("Agent activity sequence: {:?}", local_activity_sequence); + println!("Agent concurrency: {:?}", concurrency); + + let mut runtime = AsyncRuntimeBuilder::new() + .with_engine( + ExecutionEngineBuilder::new() + .task_queue_size(256) + .workers(app_config.num_threads(agent_id)), + ) + .build() + .unwrap(); + + Event::get_instance() + .lock() + .unwrap() + .create_polling_thread(); + + // Since runtime `enter_engine` is now not blocking, we do it manually here. + let waiter = Arc::new(ThreadWaitBarrier::new(1)); + let notifier = waiter.get_notifier().unwrap(); + + let activity_graph_names: Vec<(Vec<&'static str>, bool)> = app_config + .activity_graph() + .into_iter() + .map(|(ids, parallel)| { + let ids: Vec<&'static str> = ids.iter().map(|id| get_activity_name(*id)).collect(); + (ids, parallel) + }) + .collect(); + + println!("activity graph: {:?}", activity_graph_names); + + let agent_names: Vec = app_config + .all_agents() + .iter() + .map(|id| get_agent_name(*id).to_string()) + .collect(); + + runtime + .enter_engine(async move { + // VEC of activities(s) which has to be executed in sequence, TRUE: if the activities(s) can be executed concurrently. + + let local_agent_program = async_runtime::spawn(async move { + let acts = activities(&app_config, &agent_id); + let mut agent = LocalFeoAgent::new(acts, get_agent_name(agent_id)); + let mut program = agent.create_program(); + println!("{:?}", program); + + program.run().await; + }); + + let global_orch = + GlobalOrchestrator::new(agent_names, params.feo_cycle_time, NUM_FEO_CYCLES); + + let execution_structure = activity_graph_names; + global_orch.run(&execution_structure).await; + local_agent_program.await; + + notifier.ready(); + }) + .unwrap_or_default(); + + waiter + .wait_for_all(Duration::new(2000, 0)) + .unwrap_or_default(); +} + +fn run_as_secondary(params: Params, app_config: ApplicationConfig) { + println!("Starting secondary agent {}", params.agent_id); + + let agent_id = params.agent_id; + let (activity_sequence, concurrency) = app_config.sequence_and_concurrency(agent_id); + + println!("Agent activity sequence: {:?}", activity_sequence); + println!("Agent concurrency: {:?}", concurrency); + + let mut runtime = AsyncRuntimeBuilder::new() + .with_engine( + ExecutionEngineBuilder::new() + .task_queue_size(256) + .workers(2), + ) + .build() + .unwrap(); + + Event::get_instance() + .lock() + .unwrap() + .create_polling_thread(); + + // Since runtime `enter_engine` is now not blocking, we do it manually here. + let waiter = Arc::new(ThreadWaitBarrier::new(1)); + let notifier = waiter.get_notifier().unwrap(); + + runtime + .enter_engine(async move { + let acts = activities(&app_config, &agent_id); + let mut agent = LocalFeoAgent::new(acts, get_agent_name(agent_id)); + let mut program = agent.create_program(); + println!("{:?}", program); + + program.run().await; + info!("Finished"); + notifier.ready(); + }) + .unwrap_or_default(); + + waiter + .wait_for_all(Duration::new(2000, 0)) + .unwrap_or_default(); +} + +fn activities(cfg: &ApplicationConfig, agent_id: &usize) -> Vec { + cfg.activities(*agent_id) + .iter() + .map(|id| { + activity_into_invokes(&Arc::new(Mutex::new(DummyActivity::build( + &ActivityId::from(*id), + )))) + }) + .collect() +} + +/// Parameters of the primary +struct Params { + /// Agent ID + agent_id: usize, + + /// Cycle time in milli seconds + feo_cycle_time: Duration, +} + +impl Params { + fn from_args() -> Self { + let args: Vec = std::env::args().collect(); + + // First argument is the ID of this agent + let agent_id = args + .get(1) + .and_then(|x| x.parse::().ok()) + .expect("Missing or invalid agent id"); + + // Second argument is the cycle time in milli seconds, e.g. 30 or 2500, + // only needed for primary agent, ignored for secondaries + let feo_cycle_time = args + .get(2) + .and_then(|x| x.parse::().ok()) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_FEO_CYCLE_TIME); + + Self { + agent_id, + feo_cycle_time, + } + } +} diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/src/config.rs b/examples/rust/cycle-benchmark-orchstrator-feo/src/config.rs new file mode 100644 index 0000000..a700c00 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/src/config.rs @@ -0,0 +1,178 @@ +// Copyright 2025 Accenture. +// +// SPDX-License-Identifier: Apache-2.0 + +use serde::Deserialize; +use serde_json; +use std::collections::{HashMap, HashSet}; +use std::fs::File; +use std::io::BufReader; +use std::path::Path; + +/// Agent assignments +/// +/// For each agent id, a number of threads and set of activity ids running on that agent. +pub type AgentAssignments = HashMap)>; + +/// Graph of activity dependencies +/// +/// A sequence of activity chains with parallel-execution flags +pub type ActivityGraph = Vec<(Vec, bool)>; + +/// Path of the config file relative to this file +static CONFIG_PATH: &str = "../config/cycle_bench.json"; + +/// Configuration of the benchmark application +#[derive(Deserialize)] +pub struct ApplicationConfig { + /// FEO cycle time in ms + cycle_time_ms: u64, + /// ID of primary agent + primary_agent: usize, + /// Agent assignments + /// + /// For each agent id, a number of threads and set of activity ids running on that agent. + agent_assignments: AgentAssignments, + /// Graph of activity dependencies + /// + /// A sequence of activity chains with parallel-execution flags + activity_graph: ActivityGraph, +} + +impl ApplicationConfig { + pub fn load() -> Self { + application_config() + } + + pub fn cycle_time(&self) -> u64 { + self.cycle_time_ms + } + + pub fn primary(&self) -> usize { + self.primary_agent + } + + pub fn secondaries(&self) -> Vec { + self.agent_assignments + .keys() + .cloned() + .filter(|id| *id != self.primary_agent) + .collect() + } + + pub fn all_agents(&self) -> Vec { + self.agent_assignments.keys().cloned().collect() + } + + pub fn num_threads(&self, agent_id: usize) -> usize { + self.agent_assignments + .get(&agent_id) + .expect("agent id missing in config") + .0 + } + + pub fn agent_assignments(&self) -> AgentAssignments { + self.agent_assignments.clone() + } + + pub fn activity_graph(&self) -> ActivityGraph { + self.activity_graph.clone() + } + + pub fn activities(&self, agent_id: usize) -> HashSet { + self.agent_assignments() + .get(&agent_id) + .expect("agent id missing in config") + .1 + .clone() + } + + pub fn all_activities(&self) -> HashSet { + self.agent_assignments + .values() + .flat_map(|(_, activity_id)| activity_id.iter()) + .copied() + .collect() + } + + pub fn sequence_and_concurrency(&self, agent_id: usize) -> (Vec, Vec) { + // determine activities on this agent + let activities = self.activities(agent_id); + + let mut sequence: Vec = vec![]; + let mut concurrency: Vec = vec![]; + for (program, _) in self.activity_graph() { + let mut is_first: bool = true; + for id in program { + if activities.contains(&id) { + sequence.push(id); + concurrency.push(is_first); // first program entry true, then false + if is_first { + is_first = false; + } + } + } + } + (sequence, concurrency) + } +} + +fn application_config() -> ApplicationConfig { + let config_file = Path::new(file!()) + .parent() + .unwrap() + .join(CONFIG_PATH) + .canonicalize() + .unwrap(); + println!("Reading configuration from {}", config_file.display()); + + let file = + File::open(config_file).unwrap_or_else(|e| panic!("failed to open config file: {e}")); + let reader = BufReader::new(file); + + // Read the JSON file to an instance of `RawConfig`. + let config: ApplicationConfig = serde_json::from_reader(reader) + .unwrap_or_else(|e| panic!("failed to parse config file: {e}")); + + check_consistency(&config); + config +} + +fn check_consistency(config: &ApplicationConfig) { + // do consistency check wrt primary agent id + assert!( + config.agent_assignments.contains_key(&config.primary_agent), + "Primary agent ID not listed in agent assignments" + ); + + // do basic consistency checks wrt activity ids + let vec_activities_agents: Vec = config + .agent_assignments + .values() + .flat_map(|(_, activity_id)| activity_id.iter()) + .copied() + .collect(); + + let mut all_activities_agents: HashSet = Default::default(); + for aid in vec_activities_agents { + let is_new = all_activities_agents.insert(aid); + assert!(is_new, "duplicate activity {aid} in agent assignments"); + } + + let vec_activities_graph: Vec = config + .activity_graph() + .iter() + .flat_map(|(aids, _)| aids.iter().copied()) + .collect(); + + let mut all_activities_graph: HashSet = Default::default(); + for aid in vec_activities_graph { + let is_new = all_activities_graph.insert(aid); + assert!(is_new, "duplicate activity {aid} in activity graph"); + } + + assert_eq!( + all_activities_agents, all_activities_graph, + "Set of activities assigned to agents does not match set of activities in activity graph" + ); +} diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/src/lib.rs b/examples/rust/cycle-benchmark-orchstrator-feo/src/lib.rs new file mode 100644 index 0000000..b6ef06e --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/src/lib.rs @@ -0,0 +1,10 @@ +// Copyright 2025 Accenture. +// +// SPDX-License-Identifier: Apache-2.0 + +//! Signalling-benchmark + +pub mod activities; +pub mod config; + +pub mod runtime_adapters; diff --git a/examples/rust/cycle-benchmark-orchstrator-feo/src/runtime_adapters.rs b/examples/rust/cycle-benchmark-orchstrator-feo/src/runtime_adapters.rs new file mode 100644 index 0000000..c92d464 --- /dev/null +++ b/examples/rust/cycle-benchmark-orchstrator-feo/src/runtime_adapters.rs @@ -0,0 +1,344 @@ +// Copyright (c) 2025 Qorix GmbH +// +// This program and the accompanying materials are made available under the +// terms of the Apache License, Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: Apache-2.0 +// + +// +// Well known issues: +// - currently activity must be hidden behind Mutex - subject to be lifted +// - !Send issues due to iceoryx +// - ... +// +use feo::activity::ActivityId; +use feo::prelude::AgentId; +use logging_tracing::prelude::*; +use orchestration::prelude::*; +use orchestration::program::{Program, ProgramBuilder}; +use std::collections::HashMap; +use std::sync::OnceLock; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +pub struct ActivityDetails { + binded_hooks: ( + Option>, + Option>, + Option>, + ), + + name: &'static str, +} + +static ACTIVITY_NAMES: OnceLock> = OnceLock::new(); +static AGENT_NAMES: OnceLock> = OnceLock::new(); + +pub fn init_activity_ids(names: HashMap) { + ACTIVITY_NAMES.set(names).unwrap(); +} + +pub fn init_agent_ids(names: HashMap) { + AGENT_NAMES.set(names).unwrap(); +} + +pub fn get_activity_name(id: usize) -> &'static str { + ACTIVITY_NAMES + .get() + .expect("static ids uninitialized") + .get(&id) + .expect("id out of bounds") +} + +pub fn get_agent_name(id: usize) -> &'static str { + AGENT_NAMES + .get() + .expect("static ids uninitialized") + .get(&id) + .expect("id out of bounds") +} + +/// +/// Returns startup, step, shutdown for activity as invoke actions +/// +pub fn activity_into_invokes(obj: &Arc>) -> ActivityDetails +where + T: 'static + Send + ActivityAdapterTrait, +{ + let start = Invoke::from_arc(obj.clone(), T::start); + let step = Invoke::from_arc_mtx(obj.clone(), T::step_runtime); + let stop = Invoke::from_arc(obj.clone(), T::stop); + ActivityDetails { + binded_hooks: (Some(start), Some(step), Some(stop)), + name: obj.lock().unwrap().get_named_id(), + } +} + +/// +/// Responsible to react on request coming from primary process +/// +pub struct LocalFeoAgent { + activities: Vec, + agent_name: &'static str, +} + +impl LocalFeoAgent { + pub fn new(activities: Vec, agent_name: &'static str) -> Self { + Self { + activities, + agent_name, + } + } + + pub fn create_program(&mut self) -> Program { + let mut program = ProgramBuilder::new("local"); + + program = program.with_startup_hook(self.create_startup()); + program = program.with_body(self.create_body()); + program = program.with_shutdown_notification(self.create_shutdown_notification()); + program = program.with_shutdown_hook(self.create_shutdown()); + + program.build() + } + + fn create_startup(&mut self) -> Box { + let mut seq = Sequence::new() + .with_step(Trigger::new(format!("{}_alive", self.agent_name).as_str())) + .with_step(Sync::new( + format!("{}_waiting_startup", self.agent_name).as_str(), + )); + + let mut concurrent = Concurrency::new(); + + // startups from al activities + for e in &mut self.activities { + concurrent = concurrent.with_branch(e.binded_hooks.0.take().unwrap()); + } + + seq = seq.with_step(concurrent); + seq.with_step(Trigger::new( + format!("{}_startup_done", self.agent_name).as_str(), + )) + } + + fn create_body(&mut self) -> Box { + let mut concurrent = Concurrency::new(); + + for e in &mut self.activities { + concurrent = concurrent.with_branch( + Sequence::new() + .with_step(Sync::new(format!("{}_start", e.name).as_str())) + .with_step(e.binded_hooks.1.take().unwrap()) + .with_step(Trigger::new(format!("{}_done", e.name).as_str())), + ); + } + + concurrent + } + + fn create_shutdown_notification(&mut self) -> Box { + let seq = Sequence::new().with_step(Sync::new( + format!("{}_waiting_shutdown", self.agent_name).as_str(), + )); + + seq + } + + fn create_shutdown(&mut self) -> Box { + let mut seq = Sequence::new(); + + let mut concurrent = Concurrency::new(); + + // shutdown from all activities + for e in &mut self.activities { + concurrent = concurrent.with_branch(e.binded_hooks.2.take().unwrap()); + } + + seq = seq.with_step(concurrent); + seq.with_step(Trigger::new( + format!("{}_shutdown_done", self.agent_name).as_str(), + )) + } +} + +/// +/// Responsible for controlling Task Chain execution across processes according to provided configuration +/// +pub struct GlobalOrchestrator { + agents: Vec, + cycle: Duration, + num_iters: usize, +} + +impl GlobalOrchestrator { + pub fn new(agents: Vec, cycle: Duration, num_iters: usize) -> Self { + Self { + agents, + cycle, + num_iters, + } + } + + pub async fn run(&self, graph: &Vec<(Vec<&str>, bool)>) { + let mut program = ProgramBuilder::new("main") + .with_startup_hook(self.startup()) + .with_body(self.generate_body(&graph)) + .with_shutdown_notification(self.orch_shutdown_notification()) + .with_shutdown_hook(self.shutdown()) + .with_cycle_time(self.cycle) + .build(); + + info!("Executor starts syncing with agents and execution of activity chain 20 times for demo..."); + info!("{:?}", program); + + program.run_n(self.num_iters).await; + + info!("Done"); + } + + fn sync_to_agents(&self) -> Box { + let mut top = Concurrency::new_with_id(NamedId::new_static("sync_to_agents")); + + for name in &self.agents { + let sub_sequence = Sync::new(format!("{}_alive", name).as_str()); + + top = top.with_branch(sub_sequence); + } + + top + } + + fn release_agents(&self) -> Box { + let mut top = Sequence::new_with_id(NamedId::new_static("release_agents")); + + for name in &self.agents { + let sub_sequence = Trigger::new(format!("{}_waiting_startup", name).as_str()); + + top = top.with_step(sub_sequence); + } + + top + } + + fn wait_startup_completed(&self) -> Box { + let mut top = Sequence::new_with_id(NamedId::new_static("wait_startup_completed")); + + for name in &self.agents { + let sub_sequence = Sync::new(format!("{}_startup_done", name).as_str()); + + top = top.with_step(sub_sequence); + } + + top + } + + fn startup(&self) -> Box { + let seq = Sequence::new_with_id(NamedId::new_static("startup")) + .with_step(self.sync_to_agents()) + .with_step(self.release_agents()) + .with_step(self.wait_startup_completed()); + + seq + } + + fn shutdown_agents(&self) -> Box { + let mut top = Sequence::new_with_id(NamedId::new_static("shutdown_agents")); + + for name in &self.agents { + let sub_sequence = Trigger::new(format!("{}_waiting_shutdown", name).as_str()); + + top = top.with_step(sub_sequence); + } + + top + } + + fn wait_shutdown_completed(&self) -> Box { + let mut top = Sequence::new_with_id(NamedId::new_static("wait_shutdown_completed")); + + for name in &self.agents { + let sub_sequence = Sync::new(format!("{}_shutdown_done", name).as_str()); + + top = top.with_step(sub_sequence); + } + + top + } + + fn shutdown(&self) -> Box { + let seq = Sequence::new_with_id(NamedId::new_static("shutdown")) + .with_step(self.shutdown_agents()) + .with_step(self.wait_shutdown_completed()); + + seq + } + + // This can be used to stop orchestration from another application for demo. + fn orch_shutdown_notification(&self) -> Box { + let seq = Sequence::new_with_id(NamedId::new_static("shutdown")) + .with_step(Sync::new("qorix_orch_shutdown_event")); + + seq + } + + // Converts a dependency graph into an execution sequence. + fn generate_body(&self, execution_structure: &Vec<(Vec<&str>, bool)>) -> Box { + let mut sequence = Sequence::new(); // The overall execution sequence + let mut concurrency_action = Concurrency::new(); + + let mut concurrent_block_added = false; + + for task_group in execution_structure { + if task_group.1 == false { + // Add the concurrency block into sequence + if concurrent_block_added { + sequence = sequence.with_step(concurrency_action); + concurrency_action = Concurrency::new(); + concurrent_block_added = false; + } + // sequence + let action = self.generate_step(task_group.0.clone()); + sequence = sequence.with_step(action); + } else { + // concurrency block + let action = self.generate_step(task_group.0.clone()); + concurrency_action = concurrency_action.with_branch(action); + concurrent_block_added = true; + } + } + if concurrent_block_added { + sequence = sequence.with_step(concurrency_action); + } + sequence + } + + fn generate_step(&self, names: Vec<&str>) -> Box { + let mut sequence = Sequence::new(); + for name in names { + sequence = sequence + .with_step(Trigger::new(format!("{}_start", name).as_str())) + .with_step(Sync::new(format!("{}_done", name).as_str())); + } + return sequence; + } +} + +pub trait ActivityAdapterTrait: Send { + type T; // Activity Type + + /// + /// This let you use async context in step function so You are free now to use non blocking sleep, non blocking wait on IO etc. + /// There is no problem to create trait with plain `fn` but then async context is lost for activity + /// + fn step_runtime( + instance: Arc>, + ) -> impl std::future::Future + Send; + + fn start(&mut self) -> ActionResult; + + fn stop(&mut self) -> ActionResult; + + fn get_named_id(&self) -> &'static str; +}