Skip to content

Commit 0d4d3a5

Browse files
singhpk234sfc-gh-prsinghamogh-jahagirdar
authored
Core: Fix Async Planning handling in RESTCatalogAdapter for Remote Scan Planning (#14629)
Co-authored-by: Prashant Singh <[email protected]> Co-authored-by: Amogh Jahagirdar <[email protected]>
1 parent 06c1e0a commit 0d4d3a5

File tree

2 files changed

+81
-6
lines changed

2 files changed

+81
-6
lines changed

core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Optional;
3232
import java.util.Set;
3333
import java.util.UUID;
34+
import java.util.concurrent.CompletableFuture;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.Executors;
3637
import java.util.concurrent.atomic.AtomicBoolean;
@@ -661,7 +662,7 @@ public static PlanTableScanResponse planTableScan(
661662
tableScan = tableScan.caseSensitive(request.caseSensitive());
662663

663664
if (shouldPlanAsync.test(tableScan)) {
664-
String asyncPlanId = UUID.randomUUID().toString();
665+
String asyncPlanId = "async-" + UUID.randomUUID();
665666
asyncPlanFiles(tableScan, asyncPlanId, tasksPerPlanTask.applyAsInt(tableScan));
666667
return PlanTableScanResponse.builder()
667668
.withPlanId(asyncPlanId)
@@ -670,11 +671,12 @@ public static PlanTableScanResponse planTableScan(
670671
.build();
671672
}
672673

673-
String planId = UUID.randomUUID().toString();
674+
String planId = "sync-" + UUID.randomUUID();
674675
planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan));
675676
Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
676677
return PlanTableScanResponse.builder()
677678
.withPlanStatus(PlanStatus.COMPLETED)
679+
.withPlanId(planId)
678680
.withPlanTasks(IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second()))
679681
.withFileScanTasks(initial.first())
680682
.withDeleteFiles(
@@ -697,6 +699,11 @@ public static PlanTableScanResponse planTableScan(
697699
public static FetchPlanningResultResponse fetchPlanningResult(
698700
Catalog catalog, TableIdentifier ident, String planId) {
699701
Table table = catalog.loadTable(ident);
702+
PlanStatus status = IN_MEMORY_PLANNING_STATE.asyncPlanStatus(planId);
703+
if (status != PlanStatus.COMPLETED) {
704+
return FetchPlanningResultResponse.builder().withPlanStatus(status).build();
705+
}
706+
700707
Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
701708
return FetchPlanningResultResponse.builder()
702709
.withPlanStatus(PlanStatus.COMPLETED)
@@ -743,12 +750,11 @@ public static FetchScanTasksResponse fetchScanTasks(
743750
* @param planId the plan identifier to cancel
744751
*/
745752
public static void cancelPlanTableScan(String planId) {
746-
IN_MEMORY_PLANNING_STATE.removePlan(planId);
753+
IN_MEMORY_PLANNING_STATE.cancelPlan(planId);
747754
}
748755

749756
static void clearPlanningState() {
750757
InMemoryPlanningState.getInstance().clear();
751-
ASYNC_PLANNING_POOL.shutdown();
752758
}
753759

754760
/**
@@ -775,8 +781,22 @@ private static void planFilesFor(TableScan tableScan, String planId, int tasksPe
775781
}
776782
}
777783

784+
@SuppressWarnings("FutureReturnValueIgnored")
778785
private static void asyncPlanFiles(
779786
TableScan tableScan, String asyncPlanId, int tasksPerPlanTask) {
780-
ASYNC_PLANNING_POOL.execute(() -> planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask));
787+
IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
788+
CompletableFuture.runAsync(
789+
() -> {
790+
planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask);
791+
},
792+
ASYNC_PLANNING_POOL)
793+
.whenComplete(
794+
(result, exception) -> {
795+
if (exception != null) {
796+
IN_MEMORY_PLANNING_STATE.markAsyncPlanFailed(asyncPlanId);
797+
} else {
798+
IN_MEMORY_PLANNING_STATE.markAsyncPlanAsComplete(asyncPlanId);
799+
}
800+
});
781801
}
782802
}

core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iceberg.FileScanTask;
2626
import org.apache.iceberg.exceptions.NoSuchPlanIdException;
2727
import org.apache.iceberg.exceptions.NoSuchPlanTaskException;
28+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2829
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
2930
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
3031
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -43,10 +44,12 @@ class InMemoryPlanningState {
4344

4445
private final Map<String, List<FileScanTask>> planTaskToFileScanTasks;
4546
private final Map<String, String> planTaskToNext;
47+
private final Map<String, PlanStatus> asyncPlanningStates;
4648

4749
private InMemoryPlanningState() {
4850
this.planTaskToFileScanTasks = Maps.newConcurrentMap();
4951
this.planTaskToNext = Maps.newConcurrentMap();
52+
this.asyncPlanningStates = Maps.newConcurrentMap();
5053
}
5154

5255
static InMemoryPlanningState getInstance() {
@@ -68,6 +71,44 @@ void addNextPlanTask(String currentTask, String nextTask) {
6871
planTaskToNext.put(currentTask, nextTask);
6972
}
7073

74+
void addAsyncPlan(String plan) {
75+
PlanStatus existingStatus = asyncPlanningStates.get(plan);
76+
Preconditions.checkArgument(
77+
existingStatus == null, "Plan %s already exists with status %s", plan, existingStatus);
78+
asyncPlanningStates.put(plan, PlanStatus.SUBMITTED);
79+
}
80+
81+
PlanStatus asyncPlanStatus(String plan) {
82+
PlanStatus existingStatus = asyncPlanningStates.get(plan);
83+
if (existingStatus == null) {
84+
throw new NoSuchPlanIdException("Cannot find plan with id %s", plan);
85+
}
86+
87+
return asyncPlanningStates.get(plan);
88+
}
89+
90+
void markAsyncPlanAsComplete(String plan) {
91+
PlanStatus existingStatus = asyncPlanningStates.get(plan);
92+
Preconditions.checkArgument(existingStatus != null, "Cannot find plan %s", plan);
93+
Preconditions.checkArgument(
94+
existingStatus == PlanStatus.SUBMITTED,
95+
"Cannot mark plan %s as completed as it is %s",
96+
plan,
97+
existingStatus);
98+
asyncPlanningStates.put(plan, PlanStatus.COMPLETED);
99+
}
100+
101+
void markAsyncPlanFailed(String plan) {
102+
PlanStatus existingStatus = asyncPlanningStates.get(plan);
103+
Preconditions.checkArgument(existingStatus != null, "Cannot find plan %s", plan);
104+
Preconditions.checkArgument(
105+
existingStatus == PlanStatus.SUBMITTED,
106+
"Cannot mark plan %s as completed as it is %s",
107+
plan,
108+
existingStatus);
109+
asyncPlanningStates.put(plan, PlanStatus.FAILED);
110+
}
111+
71112
List<FileScanTask> fileScanTasksForPlanTask(String planTaskKey) {
72113
List<FileScanTask> tasks = planTaskToFileScanTasks.get(planTaskKey);
73114
if (tasks == null) {
@@ -120,13 +161,27 @@ Pair<List<FileScanTask>, String> initialScanTasksFor(String planId) {
120161
return Pair.of(initialEntry.getValue(), initialEntry.getKey());
121162
}
122163

123-
void removePlan(String planId) {
164+
void cancelPlan(String planId) {
124165
planTaskToNext.entrySet().removeIf(entry -> entry.getKey().contains(planId));
125166
planTaskToFileScanTasks.entrySet().removeIf(entry -> entry.getKey().contains(planId));
167+
// Clear the ongoing plan status in case the planID is an async one.
168+
if (asyncPlanningStates.containsKey(planId)) {
169+
PlanStatus existingStatus = asyncPlanningStates.get(planId);
170+
// No need to fail cancellation if the plan could not be found
171+
if (existingStatus == null) {
172+
return;
173+
}
174+
175+
// No need to fail cancellation if the plan already terminated
176+
if (existingStatus == PlanStatus.SUBMITTED) {
177+
asyncPlanningStates.put(planId, PlanStatus.CANCELLED);
178+
}
179+
}
126180
}
127181

128182
void clear() {
129183
planTaskToFileScanTasks.clear();
130184
planTaskToNext.clear();
185+
asyncPlanningStates.clear();
131186
}
132187
}

0 commit comments

Comments
 (0)