Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 48 additions & 22 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -685,24 +685,33 @@ public static PlanTableScanResponse planTableScan(
}

String planId = "sync-" + UUID.randomUUID();
planFilesFor(
configuredScan,
planId,
table.uuid().toString(),
tasksPerPlanTask.applyAsInt(configuredScan));
Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
return PlanTableScanResponse.builder()
.withPlanStatus(PlanStatus.COMPLETED)
.withPlanId(planId)
.withPlanTasks(IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second()))
.withFileScanTasks(initial.first())
.withDeleteFiles(
initial.first().stream()
.flatMap(task -> task.deletes().stream())
.distinct()
.collect(Collectors.toList()))
.withSpecsById(table.specs())
.build();
Pair<List<FileScanTask>, String> initial =
planFilesFor(
configuredScan,
planId,
table.uuid().toString(),
tasksPerPlanTask.applyAsInt(configuredScan));
List<String> nextPlanTasks =
initial.second() == null
? Collections.emptyList()
: IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second());
PlanTableScanResponse.Builder builder =
PlanTableScanResponse.builder()
.withPlanStatus(PlanStatus.COMPLETED)
.withPlanId(planId)
.withFileScanTasks(initial.first())
.withDeleteFiles(
initial.first().stream()
.flatMap(task -> task.deletes().stream())
.distinct()
.collect(Collectors.toList()))
.withSpecsById(table.specs());

if (!nextPlanTasks.isEmpty()) {
builder.withPlanTasks(nextPlanTasks);
}

return builder.build();
}

/**
Expand Down Expand Up @@ -807,22 +816,39 @@ static void clearPlanningState() {
* @param planId the unique identifier for this plan
* @param tableId the uuid of the table being scanned
* @param tasksPerPlanTask number of file scan tasks to group per plan task
* @return the initial file scan tasks and the first plan task key
*/
private static void planFilesFor(
private static Pair<List<FileScanTask>, String> planFilesFor(
Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
Iterable<List<FileScanTask>> taskGroupings =
Iterables.partition(scan.planFiles(), tasksPerPlanTask);
Iterable<FileScanTask> planTasks = scan.planFiles();
String planTaskPrefix = planId + "-" + tableId + "-";

// Handle empty table scans
if (!planTasks.iterator().hasNext()) {
String planTaskKey = planTaskPrefix + "0";
// Add empty scan to planning state so async calls know the scan completed
IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList());
return Pair.of(Collections.emptyList(), planTaskKey);
}

Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask);
int planTaskSequence = 0;
String previousPlanTask = null;
String firstPlanTaskKey = null;
List<FileScanTask> initialFileScanTasks = null;
for (List<FileScanTask> taskGrouping : taskGroupings) {
String planTaskKey = String.format("%s-%s-%s", planId, tableId, planTaskSequence++);
String planTaskKey = planTaskPrefix + planTaskSequence++;
IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
if (previousPlanTask != null) {
IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
} else {
firstPlanTaskKey = planTaskKey;
initialFileScanTasks = taskGrouping;
}

previousPlanTask = planTaskKey;
}
return Pair.of(initialFileScanTasks, firstPlanTaskKey);
}

@SuppressWarnings("FutureReturnValueIgnored")
Expand Down