Skip to content

Commit 08b1ce4

Browse files
ident check
1 parent a10c43f commit 08b1ce4

File tree

1 file changed

+194
-120
lines changed

1 file changed

+194
-120
lines changed

core/src/main/java/org/apache/iceberg/RESTTableScan.java

Lines changed: 194 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.apache.iceberg;
2020

21-
import java.time.Duration;
2221
import java.util.List;
2322
import java.util.Locale;
2423
import java.util.Map;
@@ -38,8 +37,12 @@
3837
import org.apache.iceberg.rest.responses.PlanTableScanResponse;
3938
import org.apache.iceberg.types.Types;
4039
import org.apache.iceberg.util.ParallelIterable;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
4142

4243
class RESTTableScan extends DataTableScan implements AutoCloseable {
44+
private static final Logger LOG = LoggerFactory.getLogger(RESTTableScan.class);
45+
4346
private final RESTClient client;
4447
private final String path;
4548
private final Supplier<Map<String, String>> headers;
@@ -50,7 +53,6 @@ class RESTTableScan extends DataTableScan implements AutoCloseable {
5053

5154
// Plan ID lifecycle management
5255
private final AtomicReference<String> activePlanId = new AtomicReference<>();
53-
private final Duration maxPlanningTimeout = Duration.ofMinutes(10); // Configurable timeout
5456

5557
// TODO revisit if this property should be configurable
5658
private static final int FETCH_PLANNING_SLEEP_DURATION_MS = 1000;
@@ -133,12 +135,6 @@ public CloseableIterable<FileScanTask> planFiles() {
133135
}
134136

135137
private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planTableScanRequest) {
136-
ParserContext context =
137-
ParserContext.builder()
138-
.add("specsById", table.specs())
139-
.add("caseSensitive", context().caseSensitive())
140-
.build();
141-
142138
PlanTableScanResponse response =
143139
client.post(
144140
resourcePaths.planTableScan(tableIdentifier),
@@ -147,146 +143,194 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
147143
headers.get(),
148144
ErrorHandlers.defaultErrorHandler(),
149145
stringStringMap -> {},
150-
context);
146+
createParserContext());
147+
148+
return handleInitialPlanStatus(response.planStatus(), response);
149+
}
150+
151+
private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
152+
activePlanId.set(planId);
153+
154+
try {
155+
while (true) {
156+
FetchPlanningResultResponse response =
157+
client.get(
158+
resourcePaths.fetchPlanningResult(tableIdentifier, planId),
159+
Map.of(),
160+
FetchPlanningResultResponse.class,
161+
headers.get(),
162+
ErrorHandlers.defaultErrorHandler(),
163+
createParserContext());
164+
165+
CloseableIterable<FileScanTask> result =
166+
handlePlanningStatus(response.planStatus(), planId, response);
167+
if (result != null) {
168+
return result;
169+
}
170+
}
171+
} catch (Exception e) {
172+
// Ensure cleanup on any exception
173+
cancelPlanningWithId(planId);
174+
throw e;
175+
}
176+
}
177+
178+
private CloseableIterable<FileScanTask> handlePlanningStatus(
179+
PlanStatus planStatus, String planId, FetchPlanningResultResponse response) {
151180

152-
PlanStatus planStatus = response.planStatus();
153181
switch (planStatus) {
154182
case COMPLETED:
183+
activePlanId.compareAndSet(planId, null);
155184
return getScanTasksIterable(response.planTasks(), response.fileScanTasks());
185+
186+
case SUBMITTED:
187+
handleSubmittedStatus(planId);
188+
return null; // Continue polling
189+
190+
case FAILED:
191+
activePlanId.compareAndSet(planId, null);
192+
throw new RuntimeException(
193+
"Received \"failed\" status from service when fetching a table scan");
194+
195+
case CANCELLED:
196+
activePlanId.compareAndSet(planId, null);
197+
throw new RuntimeException(
198+
String.format(
199+
Locale.ROOT,
200+
"Received \"cancelled\" status from service when fetching a table scan, planId: %s is invalid",
201+
planId));
202+
203+
default:
204+
throw new RuntimeException(
205+
String.format(
206+
Locale.ROOT, "Invalid planStatus during fetchPlanningResult: %s", planStatus));
207+
}
208+
}
209+
210+
private void handleSubmittedStatus(String planId) {
211+
try {
212+
Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS);
213+
} catch (InterruptedException e) {
214+
Thread.currentThread().interrupt();
215+
cancelPlanningWithId(planId);
216+
throw new RuntimeException("Interrupted while fetching plan status", e);
217+
}
218+
}
219+
220+
private CloseableIterable<FileScanTask> handleInitialPlanStatus(
221+
PlanStatus planStatus, PlanTableScanResponse response) {
222+
223+
switch (planStatus) {
224+
case COMPLETED:
225+
return getScanTasksIterable(response.planTasks(), response.fileScanTasks());
226+
156227
case SUBMITTED:
157228
return fetchPlanningResult(response.planId());
229+
158230
case FAILED:
159231
throw new IllegalStateException(
160232
"Received \"failed\" status from service when planning a table scan");
233+
161234
case CANCELLED:
162235
throw new IllegalStateException(
163236
"Received \"cancelled\" status from service when planning a table scan");
237+
164238
default:
165239
throw new RuntimeException(
166240
String.format(Locale.ROOT, "Invalid planStatus during planTableScan: %s", planStatus));
167241
}
168242
}
169243

170-
private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
171-
// Track the active plan ID for cleanup
172-
activePlanId.set(planId);
244+
public CloseableIterable<FileScanTask> getScanTasksIterable(
245+
List<String> planTasks, List<FileScanTask> fileScanTasks) {
246+
247+
if (isInputEmpty(planTasks, fileScanTasks)) {
248+
return CloseableIterable.empty();
249+
}
173250

174-
long startTime = System.currentTimeMillis();
175-
long timeoutMs = maxPlanningTimeout.toMillis();
251+
validateDependencies();
176252

177253
try {
178-
// we need to inject specById map here and also the caseSensitive
179-
ParserContext context =
180-
ParserContext.builder()
181-
.add("specsById", table.specs())
182-
.add("caseSensitive", context().caseSensitive())
183-
.build();
254+
List<ScanTasksIterable> iterables = Lists.newArrayList();
184255

185-
while (true) {
186-
// Check timeout
187-
long elapsed = System.currentTimeMillis() - startTime;
188-
if (elapsed > timeoutMs) {
189-
cancelPlanningWithId(planId);
190-
throw new RuntimeException(
191-
String.format(Locale.ROOT, "Plan %s timed out after %d ms", planId, elapsed));
192-
}
256+
addFileScanTaskIterables(fileScanTasks, iterables);
257+
addPlanTaskIterables(planTasks, iterables);
258+
259+
return combineIterables(iterables);
193260

194-
FetchPlanningResultResponse response =
195-
client.get(
196-
resourcePaths.fetchPlanningResult(tableIdentifier, planId),
197-
Map.of(),
198-
FetchPlanningResultResponse.class,
199-
headers.get(),
200-
ErrorHandlers.defaultErrorHandler(),
201-
context);
202-
203-
PlanStatus planStatus = response.planStatus();
204-
switch (planStatus) {
205-
case COMPLETED:
206-
// Clear plan ID on successful completion
207-
activePlanId.compareAndSet(planId, null);
208-
return getScanTasksIterable(response.planTasks(), response.fileScanTasks());
209-
case SUBMITTED:
210-
try {
211-
Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS);
212-
} catch (InterruptedException e) {
213-
Thread.currentThread().interrupt();
214-
// Cancel plan on interruption
215-
cancelPlanningWithId(planId);
216-
throw new RuntimeException("Interrupted while fetching plan status", e);
217-
}
218-
break;
219-
case FAILED:
220-
// Clear plan ID on failure (server handles cleanup)
221-
activePlanId.compareAndSet(planId, null);
222-
throw new RuntimeException(
223-
"Received \"failed\" status from service when fetching a table scan");
224-
case CANCELLED:
225-
// Clear plan ID on cancellation
226-
activePlanId.compareAndSet(planId, null);
227-
throw new RuntimeException(
228-
String.format(
229-
Locale.ROOT,
230-
"Received \"cancelled\" status from service when fetching a table scan, planId: %s is invalid",
231-
planId));
232-
default:
233-
throw new RuntimeException(
234-
String.format(
235-
Locale.ROOT, "Invalid planStatus during fetchPlanningResult: %s", planStatus));
236-
}
237-
}
238261
} catch (Exception e) {
239-
// Ensure cleanup on any exception
240-
cancelPlanningWithId(planId);
241-
throw e;
262+
LOG.error("Failed to create scan tasks iterable", e);
263+
throw new RuntimeException("Failed to create scan tasks iterable", e);
242264
}
243265
}
244266

245-
public CloseableIterable<FileScanTask> getScanTasksIterable(
246-
List<String> planTasks, List<FileScanTask> fileScanTasks) {
247-
List<ScanTasksIterable> iterableOfScanTaskIterables = Lists.newArrayList();
248-
if (fileScanTasks != null) {
249-
// add this to the list for below if planTasks will also be present
250-
ScanTasksIterable scanTasksIterable =
251-
new ScanTasksIterable(
252-
fileScanTasks,
253-
client,
254-
resourcePaths,
255-
tableIdentifier,
256-
headers,
257-
planExecutor(),
258-
table.specs(),
259-
isCaseSensitive());
260-
iterableOfScanTaskIterables.add(scanTasksIterable);
267+
private boolean isInputEmpty(List<String> planTasks, List<FileScanTask> fileScanTasks) {
268+
boolean isEmpty =
269+
(planTasks == null || planTasks.isEmpty())
270+
&& (fileScanTasks == null || fileScanTasks.isEmpty());
271+
if (isEmpty) {
272+
LOG.debug("Both planTasks and fileScanTasks are null or empty, returning empty iterable");
273+
}
274+
return isEmpty;
275+
}
276+
277+
private void validateDependencies() {
278+
if (client == null) {
279+
throw new IllegalStateException("RESTClient is null");
261280
}
262-
if (planTasks != null) {
263-
// Use parallel iterable since planTasks are present
264-
for (String planTask : planTasks) {
265-
ScanTasksIterable iterable =
266-
new ScanTasksIterable(
267-
planTask,
268-
client,
269-
resourcePaths,
270-
tableIdentifier,
271-
headers,
272-
planExecutor(),
273-
table.specs(),
274-
isCaseSensitive());
275-
iterableOfScanTaskIterables.add(iterable);
281+
if (resourcePaths == null) {
282+
throw new IllegalStateException("ResourcePaths is null");
283+
}
284+
if (tableIdentifier == null) {
285+
throw new IllegalStateException("TableIdentifier is null");
286+
}
287+
}
288+
289+
private void addFileScanTaskIterables(
290+
List<FileScanTask> fileScanTasks, List<ScanTasksIterable> iterables) {
291+
if (fileScanTasks != null && !fileScanTasks.isEmpty()) {
292+
LOG.debug("Creating ScanTasksIterable for {} file scan tasks", fileScanTasks.size());
293+
ScanTasksIterable scanTasksIterable = createScanTasksIterable(fileScanTasks);
294+
iterables.add(scanTasksIterable);
295+
}
296+
}
297+
298+
private void addPlanTaskIterables(List<String> planTasks, List<ScanTasksIterable> iterables) {
299+
if (planTasks == null || planTasks.isEmpty()) {
300+
return;
301+
}
302+
303+
LOG.debug("Creating ScanTasksIterables for {} plan tasks", planTasks.size());
304+
305+
for (String planTask : planTasks) {
306+
if (planTask == null || planTask.trim().isEmpty()) {
307+
LOG.warn("Skipping null or empty plan task");
308+
continue;
309+
}
310+
311+
try {
312+
ScanTasksIterable iterable = createScanTasksIterable(planTask);
313+
iterables.add(iterable);
314+
} catch (Exception e) {
315+
LOG.error("Failed to create ScanTasksIterable for plan task: {}", planTask, e);
316+
throw new RuntimeException(
317+
"Failed to create ScanTasksIterable for plan task: " + planTask, e);
276318
}
277-
return new ParallelIterable<>(iterableOfScanTaskIterables, planExecutor());
278-
// another idea is to keep concatenating to the original parallel iterable???
279319
}
280-
// use a single scanTasks iterable since no need to parallelize since no planTasks
281-
return new ScanTasksIterable(
282-
fileScanTasks,
283-
client,
284-
resourcePaths,
285-
tableIdentifier,
286-
headers,
287-
planExecutor(),
288-
table.specs(),
289-
isCaseSensitive());
320+
}
321+
322+
private CloseableIterable<FileScanTask> combineIterables(List<ScanTasksIterable> iterables) {
323+
324+
if (iterables.isEmpty()) {
325+
LOG.warn("No valid iterables found, returning empty iterable");
326+
return CloseableIterable.empty();
327+
}
328+
329+
if (iterables.size() == 1) {
330+
return iterables.get(0);
331+
}
332+
333+
return new ParallelIterable<>(iterables, planExecutor());
290334
}
291335

292336
/**
@@ -310,7 +354,6 @@ private void cancelPlanningWithId(String planId) {
310354
}
311355

312356
try {
313-
// Call the cancel endpoint - this is a DELETE request that returns 204 (no content)
314357
client.delete(
315358
resourcePaths.cancelPlanning(tableIdentifier, planId),
316359
null, // 204 response has no content
@@ -322,7 +365,7 @@ private void cancelPlanningWithId(String planId) {
322365
} catch (Exception e) {
323366
// Log but don't throw - cancel is best effort for cleanup
324367
// The server will eventually clean up abandoned plans
325-
System.err.println("Warning: Failed to cancel plan " + planId + ": " + e.getMessage());
368+
LOG.warn("Failed to cancel plan {}: {}", planId, e.getMessage(), e);
326369
}
327370
}
328371

@@ -334,4 +377,35 @@ private void cancelPlanningWithId(String planId) {
334377
public void close() {
335378
cancelPlanning();
336379
}
380+
381+
private ParserContext createParserContext() {
382+
return ParserContext.builder()
383+
.add("specsById", table.specs())
384+
.add("caseSensitive", context().caseSensitive())
385+
.build();
386+
}
387+
388+
private ScanTasksIterable createScanTasksIterable(List<FileScanTask> fileScanTasks) {
389+
return new ScanTasksIterable(
390+
fileScanTasks,
391+
client,
392+
resourcePaths,
393+
tableIdentifier,
394+
headers,
395+
planExecutor(),
396+
table.specs(),
397+
isCaseSensitive());
398+
}
399+
400+
private ScanTasksIterable createScanTasksIterable(String planTask) {
401+
return new ScanTasksIterable(
402+
planTask,
403+
client,
404+
resourcePaths,
405+
tableIdentifier,
406+
headers,
407+
planExecutor(),
408+
table.specs(),
409+
isCaseSensitive());
410+
}
337411
}

0 commit comments

Comments
 (0)