|
26 | 26 | import java.time.OffsetDateTime; |
27 | 27 | import java.time.ZoneOffset; |
28 | 28 | import java.util.Collections; |
| 29 | +import java.util.Iterator; |
29 | 30 | import java.util.List; |
30 | 31 | import java.util.Map; |
31 | 32 | import java.util.Optional; |
@@ -672,20 +673,30 @@ public static PlanTableScanResponse planTableScan( |
672 | 673 | } |
673 | 674 |
|
674 | 675 | String planId = "sync-" + UUID.randomUUID(); |
675 | | - planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan)); |
676 | | - Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId); |
677 | | - return PlanTableScanResponse.builder() |
678 | | - .withPlanStatus(PlanStatus.COMPLETED) |
679 | | - .withPlanId(planId) |
680 | | - .withPlanTasks(IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second())) |
681 | | - .withFileScanTasks(initial.first()) |
682 | | - .withDeleteFiles( |
683 | | - initial.first().stream() |
684 | | - .flatMap(task -> task.deletes().stream()) |
685 | | - .distinct() |
686 | | - .collect(Collectors.toList())) |
687 | | - .withSpecsById(table.specs()) |
688 | | - .build(); |
| 676 | + Pair<List<FileScanTask>, String> initial = |
| 677 | + planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan)); |
| 678 | + List<String> nextPlanTasks = |
| 679 | + initial.second() == null |
| 680 | + ? Collections.emptyList() |
| 681 | + : IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second()); |
| 682 | + |
| 683 | + PlanTableScanResponse.Builder builder = |
| 684 | + PlanTableScanResponse.builder() |
| 685 | + .withPlanStatus(PlanStatus.COMPLETED) |
| 686 | + .withPlanId(planId) |
| 687 | + .withFileScanTasks(initial.first()) |
| 688 | + .withDeleteFiles( |
| 689 | + initial.first().stream() |
| 690 | + .flatMap(task -> task.deletes().stream()) |
| 691 | + .distinct() |
| 692 | + .collect(Collectors.toList())) |
| 693 | + .withSpecsById(table.specs()); |
| 694 | + |
| 695 | + if (!nextPlanTasks.isEmpty()) { |
| 696 | + builder.withPlanTasks(nextPlanTasks); |
| 697 | + } |
| 698 | + |
| 699 | + return builder.build(); |
689 | 700 | } |
690 | 701 |
|
691 | 702 | /** |
@@ -763,28 +774,45 @@ static void clearPlanningState() { |
763 | 774 | * @param tableScan the table scan to plan |
764 | 775 | * @param planId the unique identifier for this plan |
765 | 776 | * @param tasksPerPlanTask number of file scan tasks to group per plan task |
| 777 | + * @return the initial file scan tasks and the first plan task key |
766 | 778 | */ |
767 | | - private static void planFilesFor(TableScan tableScan, String planId, int tasksPerPlanTask) { |
| 779 | + private static Pair<List<FileScanTask>, String> planFilesFor( |
| 780 | + TableScan tableScan, String planId, int tasksPerPlanTask) { |
768 | 781 | Iterable<List<FileScanTask>> taskGroupings = |
769 | 782 | Iterables.partition(tableScan.planFiles(), tasksPerPlanTask); |
| 783 | + Iterator<List<FileScanTask>> taskIterator = taskGroupings.iterator(); |
| 784 | + String planTaskKeyPrefix = planId + "-" + tableScan.table().uuid() + "-"; |
| 785 | + |
| 786 | + if (!taskIterator.hasNext()) { |
| 787 | + // Handle empty table scans |
| 788 | + String emptyPlanTaskKey = planTaskKeyPrefix + 0; |
| 789 | + IN_MEMORY_PLANNING_STATE.addPlanTask(emptyPlanTaskKey, Collections.emptyList()); |
| 790 | + return Pair.of(Collections.emptyList(), null); |
| 791 | + } |
| 792 | + |
770 | 793 | int planTaskSequence = 0; |
771 | 794 | String previousPlanTask = null; |
772 | | - String planTaskKeyPrefix = planId + "-" + tableScan.table().uuid() + "-"; |
773 | | - for (List<FileScanTask> taskGrouping : taskGroupings) { |
| 795 | + String firstPlanTaskKey = null; |
| 796 | + List<FileScanTask> initialFileScanTasks = null; |
| 797 | + |
| 798 | + do { |
| 799 | + List<FileScanTask> taskGrouping = taskIterator.next(); |
774 | 800 | String planTaskKey = planTaskKeyPrefix + planTaskSequence++; |
775 | 801 | IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping); |
| 802 | + |
| 803 | + if (firstPlanTaskKey == null) { |
| 804 | + firstPlanTaskKey = planTaskKey; |
| 805 | + initialFileScanTasks = taskGrouping; |
| 806 | + } |
| 807 | + |
776 | 808 | if (previousPlanTask != null) { |
777 | 809 | IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey); |
778 | 810 | } |
779 | 811 |
|
780 | 812 | previousPlanTask = planTaskKey; |
781 | | - } |
| 813 | + } while (taskIterator.hasNext()); |
782 | 814 |
|
783 | | - // If a scan produces no file tasks, create a single empty plan task |
784 | | - if (planTaskSequence == 0) { |
785 | | - String planTaskKey = planTaskKeyPrefix + planTaskSequence; |
786 | | - IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList()); |
787 | | - } |
| 815 | + return Pair.of(initialFileScanTasks, firstPlanTaskKey); |
788 | 816 | } |
789 | 817 |
|
790 | 818 | @SuppressWarnings("FutureReturnValueIgnored") |
|
0 commit comments