diff --git a/src/distributed_planner/network_boundary.rs b/src/distributed_planner/network_boundary.rs index 2fd25cf..8053a53 100644 --- a/src/distributed_planner/network_boundary.rs +++ b/src/distributed_planner/network_boundary.rs @@ -35,6 +35,9 @@ pub trait NetworkBoundary: ExecutionPlan { input_tasks: usize, ) -> datafusion::common::Result>; + /// Returns the input tasks assigned to this [NetworkBoundary]. + fn input_task_count(&self) -> usize; + /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this /// information to perform any internal transformations necessary for distributed execution. /// diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index 9db7f59..5eea8d0 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -176,6 +176,13 @@ impl NetworkBoundary for NetworkCoalesceExec { } })) } + + fn input_task_count(&self) -> usize { + match self { + Self::Pending(v) => v.input_tasks, + Self::Ready(v) => v.input_stage.tasks.len(), + } + } } impl DisplayAs for NetworkCoalesceExec { diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs index 5769bfc..175617a 100644 --- a/src/execution_plans/network_shuffle.rs +++ b/src/execution_plans/network_shuffle.rs @@ -203,6 +203,13 @@ impl NetworkBoundary for NetworkShuffleExec { })) } + fn input_task_count(&self) -> usize { + match self { + Self::Pending(v) => v.input_tasks, + Self::Ready(v) => v.input_stage.tasks.len(), + } + } + fn with_input_stage( &self, input_stage: Stage,