Skip to content

Commit 52b3957

Browse files
Address review feedback
1 parent d752e0a commit 52b3957

File tree

2 files changed

+19
-16
lines changed

2 files changed

+19
-16
lines changed

core/src/main/java/org/apache/iceberg/RestTableScan.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.iceberg.types.Types;
3737
import org.apache.iceberg.util.ParallelIterable;
3838

39-
public class RestTableScan extends DataTableScan {
39+
class RestTableScan extends DataTableScan {
4040
private final RESTClient client;
4141
private final String path;
4242
private final Supplier<Map<String, String>> headers;
@@ -48,7 +48,7 @@ public class RestTableScan extends DataTableScan {
4848
// TODO revisit if this property should be configurable
4949
private static final int FETCH_PLANNING_SLEEP_DURATION_MS = 1000;
5050

51-
public RestTableScan(
51+
RestTableScan(
5252
Table table,
5353
Schema schema,
5454
TableScanContext context,
@@ -85,6 +85,9 @@ protected TableScan newRefinedScan(
8585

8686
@Override
8787
public CloseableIterable<FileScanTask> planFiles() {
88+
Long startSnapshotId = context().fromSnapshotId();
89+
Long endSnapshotId = context().toSnapshotId();
90+
Long snapshotId = snapshotId();
8891
List<String> selectedColumns =
8992
schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
9093

@@ -96,10 +99,6 @@ public CloseableIterable<FileScanTask> planFiles() {
9699
.collect(Collectors.toList());
97100
}
98101

99-
Long startSnapshotId = context().fromSnapshotId();
100-
Long endSnapshotId = context().toSnapshotId();
101-
Long snapshotId = snapshotId();
102-
103102
PlanTableScanRequest.Builder planTableScanRequestBuilder =
104103
new PlanTableScanRequest.Builder()
105104
.withSelect(selectedColumns)
@@ -127,12 +126,12 @@ public CloseableIterable<FileScanTask> planFiles() {
127126
}
128127

129128
private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planTableScanRequest) {
130-
// we need to inject specById map here and also the caseSensitive
131129
ParserContext context =
132130
ParserContext.builder()
133131
.add("specsById", table.specs())
134132
.add("caseSensitive", context().caseSensitive())
135133
.build();
134+
136135
PlanTableScanResponse response =
137136
client.post(
138137
resourcePaths.planTableScan(tableIdentifier),
@@ -146,13 +145,15 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
146145
PlanStatus planStatus = response.planStatus();
147146
switch (planStatus) {
148147
case COMPLETED:
149-
// List<FileScanTask> fileScanTasks = bindFileScanTasksWithSpec(response.fileScanTasks());
150148
return getScanTasksIterable(response.planTasks(), response.fileScanTasks());
151149
case SUBMITTED:
152150
return fetchPlanningResult(response.planId());
153151
case FAILED:
154-
throw new RuntimeException(
152+
throw new IllegalStateException(
155153
"Received \"failed\" status from service when planning a table scan");
154+
case CANCELLED:
155+
throw new IllegalStateException(
156+
"Received \"cancelled\" status from service when planning a table scan");
156157
default:
157158
throw new RuntimeException(
158159
String.format("Invalid planStatus during planTableScan: %s", planStatus));
@@ -181,9 +182,10 @@ private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
181182
PlanStatus planStatus = response.planStatus();
182183
switch (planStatus) {
183184
case COMPLETED:
184-
// List<FileScanTask> fileScanTasks = bindFileScanTasksWithSpec(response.fileScanTasks());
185185
return getScanTasksIterable(response.planTasks(), response.fileScanTasks());
186186
case SUBMITTED:
187+
// TODO: Think more about whether we should use a backoff strategy here.
188+
// For now, we will just sleep for a fixed duration before checking the status again.
187189
try {
188190
Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS);
189191
} catch (InterruptedException e) {

core/src/main/java/org/apache/iceberg/ScanTasksIterable.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.iceberg;
2020

2121
import java.io.IOException;
22+
import java.util.ArrayDeque;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.concurrent.ExecutorService;
@@ -43,7 +44,7 @@ public class ScanTasksIterable implements CloseableIterable<FileScanTask> {
4344
// parallelizing on this where a planTask produces a list of file scan tasks, as
4445
// well more planTasks.
4546
private final String planTask;
46-
private final List<FileScanTask> fileScanTasks;
47+
private final ArrayDeque<FileScanTask> fileScanTasks;
4748
private final ExecutorService executorService;
4849
private final Map<Integer, PartitionSpec> specsById;
4950
private final boolean caseSensitive;
@@ -78,7 +79,7 @@ public ScanTasksIterable(
7879
Map<Integer, PartitionSpec> specsById,
7980
boolean caseSensitive) {
8081
this.planTask = null;
81-
this.fileScanTasks = fileScanTasks;
82+
this.fileScanTasks = new ArrayDeque<>(fileScanTasks);
8283
this.client = client;
8384
this.resourcePaths = resourcePaths;
8485
this.tableIdentifier = tableIdentifier;
@@ -111,14 +112,14 @@ private static class ScanTasksIterator implements CloseableIterator<FileScanTask
111112
private final TableIdentifier tableIdentifier;
112113
private final Supplier<Map<String, String>> headers;
113114
private String planTask;
114-
private final List<FileScanTask> fileScanTasks;
115+
private final ArrayDeque<FileScanTask> fileScanTasks;
115116
private final ExecutorService executorService;
116117
private final Map<Integer, PartitionSpec> specsById;
117118
private final boolean caseSensitive;
118119

119120
ScanTasksIterator(
120121
String planTask,
121-
List<FileScanTask> fileScanTasks,
122+
ArrayDeque<FileScanTask> fileScanTasks,
122123
RESTClient client,
123124
ResourcePaths resourcePaths,
124125
TableIdentifier tableIdentifier,
@@ -131,7 +132,7 @@ private static class ScanTasksIterator implements CloseableIterator<FileScanTask
131132
this.tableIdentifier = tableIdentifier;
132133
this.headers = headers;
133134
this.planTask = planTask;
134-
this.fileScanTasks = fileScanTasks != null ? fileScanTasks : Lists.newArrayList();
135+
this.fileScanTasks = fileScanTasks != null ? fileScanTasks : new ArrayDeque<>();
135136
this.executorService = executorService;
136137
this.specsById = specsById;
137138
this.caseSensitive = caseSensitive;
@@ -158,7 +159,7 @@ public boolean hasNext() {
158159

159160
@Override
160161
public FileScanTask next() {
161-
return fileScanTasks.remove(0);
162+
return fileScanTasks.removeFirst();
162163
}
163164

164165
private void fetchScanTasks(String withPlanTask) {

0 commit comments

Comments
 (0)