@@ -685,24 +685,33 @@ public static PlanTableScanResponse planTableScan(
685685 }
686686
687687 String planId = "sync-" + UUID .randomUUID ();
688- planFilesFor (
689- configuredScan ,
690- planId ,
691- table .uuid ().toString (),
692- tasksPerPlanTask .applyAsInt (configuredScan ));
693- Pair <List <FileScanTask >, String > initial = IN_MEMORY_PLANNING_STATE .initialScanTasksFor (planId );
694- return PlanTableScanResponse .builder ()
695- .withPlanStatus (PlanStatus .COMPLETED )
696- .withPlanId (planId )
697- .withPlanTasks (IN_MEMORY_PLANNING_STATE .nextPlanTask (initial .second ()))
698- .withFileScanTasks (initial .first ())
699- .withDeleteFiles (
700- initial .first ().stream ()
701- .flatMap (task -> task .deletes ().stream ())
702- .distinct ()
703- .collect (Collectors .toList ()))
704- .withSpecsById (table .specs ())
705- .build ();
688+ Pair <List <FileScanTask >, String > initial =
689+ planFilesFor (
690+ configuredScan ,
691+ planId ,
692+ table .uuid ().toString (),
693+ tasksPerPlanTask .applyAsInt (configuredScan ));
694+ List <String > nextPlanTasks =
695+ initial .second () == null
696+ ? Collections .emptyList ()
697+ : IN_MEMORY_PLANNING_STATE .nextPlanTask (initial .second ());
698+ PlanTableScanResponse .Builder builder =
699+ PlanTableScanResponse .builder ()
700+ .withPlanStatus (PlanStatus .COMPLETED )
701+ .withPlanId (planId )
702+ .withFileScanTasks (initial .first ())
703+ .withDeleteFiles (
704+ initial .first ().stream ()
705+ .flatMap (task -> task .deletes ().stream ())
706+ .distinct ()
707+ .collect (Collectors .toList ()))
708+ .withSpecsById (table .specs ());
709+
710+ if (!nextPlanTasks .isEmpty ()) {
711+ builder .withPlanTasks (nextPlanTasks );
712+ }
713+
714+ return builder .build ();
706715 }
707716
708717 /**
@@ -807,22 +816,39 @@ static void clearPlanningState() {
807816 * @param planId the unique identifier for this plan
808817 * @param tableId the uuid of the table being scanned
809818 * @param tasksPerPlanTask number of file scan tasks to group per plan task
819+ * @return the initial file scan tasks and the first plan task key
810820 */
811- private static void planFilesFor (
821+ private static Pair < List < FileScanTask >, String > planFilesFor (
812822 Scan <?, FileScanTask , ?> scan , String planId , String tableId , int tasksPerPlanTask ) {
813- Iterable <List <FileScanTask >> taskGroupings =
814- Iterables .partition (scan .planFiles (), tasksPerPlanTask );
823+ Iterable <FileScanTask > planTasks = scan .planFiles ();
824+ String planTaskPrefix = planId + "-" + tableId + "-" ;
825+
826+ // Handle empty table scans
827+ if (!planTasks .iterator ().hasNext ()) {
828+ String planTaskKey = planTaskPrefix + "0" ;
829+ // Add empty scan to planning state so async calls know the scan completed
830+ IN_MEMORY_PLANNING_STATE .addPlanTask (planTaskKey , Collections .emptyList ());
831+ return Pair .of (Collections .emptyList (), planTaskKey );
832+ }
833+
834+ Iterable <List <FileScanTask >> taskGroupings = Iterables .partition (planTasks , tasksPerPlanTask );
815835 int planTaskSequence = 0 ;
816836 String previousPlanTask = null ;
837+ String firstPlanTaskKey = null ;
838+ List <FileScanTask > initialFileScanTasks = null ;
817839 for (List <FileScanTask > taskGrouping : taskGroupings ) {
818- String planTaskKey = String . format ( "%s-%s-%s" , planId , tableId , planTaskSequence ++) ;
840+ String planTaskKey = planTaskPrefix + planTaskSequence ++;
819841 IN_MEMORY_PLANNING_STATE .addPlanTask (planTaskKey , taskGrouping );
820842 if (previousPlanTask != null ) {
821843 IN_MEMORY_PLANNING_STATE .addNextPlanTask (previousPlanTask , planTaskKey );
844+ } else {
845+ firstPlanTaskKey = planTaskKey ;
846+ initialFileScanTasks = taskGrouping ;
822847 }
823848
824849 previousPlanTask = planTaskKey ;
825850 }
851+ return Pair .of (initialFileScanTasks , firstPlanTaskKey );
826852 }
827853
828854 @ SuppressWarnings ("FutureReturnValueIgnored" )
0 commit comments