From 95e4eaa75172f084b5ceade2f4d1b8b57d7bfbaf Mon Sep 17 00:00:00 2001 From: Gabriel Musat Mestre Date: Thu, 23 Oct 2025 13:29:38 +0200 Subject: [PATCH] Return input task count in NetworkBoundary --- src/distributed_planner/network_boundary.rs | 3 +++ src/execution_plans/network_coalesce.rs | 7 +++++++ src/execution_plans/network_shuffle.rs | 7 +++++++ 3 files changed, 17 insertions(+) diff --git a/src/distributed_planner/network_boundary.rs b/src/distributed_planner/network_boundary.rs index 2fd25cf..8053a53 100644 --- a/src/distributed_planner/network_boundary.rs +++ b/src/distributed_planner/network_boundary.rs @@ -35,6 +35,9 @@ pub trait NetworkBoundary: ExecutionPlan { input_tasks: usize, ) -> datafusion::common::Result>; + /// Returns the input tasks assigned to this [NetworkBoundary]. + fn input_task_count(&self) -> usize; + /// Called when a [Stage] is correctly formed. The [NetworkBoundary] can use this /// information to perform any internal transformations necessary for distributed execution. /// diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index 9db7f59..5eea8d0 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -176,6 +176,13 @@ impl NetworkBoundary for NetworkCoalesceExec { } })) } + + fn input_task_count(&self) -> usize { + match self { + Self::Pending(v) => v.input_tasks, + Self::Ready(v) => v.input_stage.tasks.len(), + } + } } impl DisplayAs for NetworkCoalesceExec { diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs index 5769bfc..175617a 100644 --- a/src/execution_plans/network_shuffle.rs +++ b/src/execution_plans/network_shuffle.rs @@ -203,6 +203,13 @@ impl NetworkBoundary for NetworkShuffleExec { })) } + fn input_task_count(&self) -> usize { + match self { + Self::Pending(v) => v.input_tasks, + Self::Ready(v) => v.input_stage.tasks.len(), + } + } + fn with_input_stage( &self, input_stage: Stage,