Skip to content

Commit 5f91620

Browse files
committed
Using Parquet.icebergSchema instead of keeping track of it in the ParquetFormatModel.ReadBuilderWrapper
1 parent 96bf9ec commit 5f91620

File tree

2 files changed

+66
-8
lines changed

2 files changed

+66
-8
lines changed

parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ public static class ReadBuilder implements InternalData.ReadBuilder {
11741174
private Schema schema = null;
11751175
private Expression filter = null;
11761176
private ReadSupport<?> readSupport = null;
1177-
private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
1177+
private BatchReaderFunction batchedReaderFunc = null;
11781178
private ReaderFunction readerFunction = null;
11791179
private boolean filterRecords = true;
11801180
private boolean caseSensitive = true;
@@ -1240,6 +1240,50 @@ public ReaderFunction withSchema(Schema expectedSchema) {
12401240
}
12411241
}
12421242

1243+
// Abstraction over the batched (vectorized) reader factory held by ReadBuilder.
// It yields a MessageType -> VectorizedReader function and may optionally be given a Schema first.
public interface BatchReaderFunction {
1244+
// Returns the function that builds a VectorizedReader from a MessageType.
Function<MessageType, VectorizedReader<?>> apply();
1245+
1246+
// Default behavior: the supplied schema is ignored and this same instance is returned.
// Implementations that need the schema override this method.
default BatchReaderFunction withSchema(Schema schema) {
1247+
return this;
1248+
}
1249+
}
1250+
1251+
// BatchReaderFunction backed by a one-argument reader function (MessageType only).
// It does not override withSchema, so the schema passed there has no effect on this class.
private static class UnaryBatchReaderFunction implements BatchReaderFunction {
1252+
// The caller-supplied function; stored once and never modified.
private final Function<MessageType, VectorizedReader<?>> readerFunc;
1253+
1254+
// Wraps the given function; no validation is performed on the argument here.
UnaryBatchReaderFunction(Function<MessageType, VectorizedReader<?>> readerFunc) {
1255+
this.readerFunc = readerFunc;
1256+
}
1257+
1258+
// Returns the wrapped function unchanged.
@Override
1259+
public Function<MessageType, VectorizedReader<?>> apply() {
1260+
return readerFunc;
1261+
}
1262+
}
1263+
1264+
// BatchReaderFunction backed by a two-argument reader function (Schema, MessageType).
// The Schema is supplied later through withSchema and bound as the first argument in apply().
private static class BinaryBatchReaderFunction implements BatchReaderFunction {
1265+
// The caller-supplied two-argument function; stored once and never modified.
private final BiFunction<Schema, MessageType, VectorizedReader<?>> readerFuncWithSchema;
1266+
// Mutable: null until withSchema is called, then holds the most recently supplied schema.
private Schema schema;
1267+
1268+
// Wraps the given function; the schema is left unset (null) at construction.
BinaryBatchReaderFunction(
1269+
BiFunction<Schema, MessageType, VectorizedReader<?>> readerFuncWithSchema) {
1270+
this.readerFuncWithSchema = readerFuncWithSchema;
1271+
}
1272+
1273+
// Adapts the two-argument function to a one-argument MessageType function.
// The precondition rejects the call when no non-null schema has been set via withSchema.
// NOTE(review): this validates object state rather than a method argument, so a state
// check may be the more conventional precondition here — confirm project convention.
@Override
1274+
public Function<MessageType, VectorizedReader<?>> apply() {
1275+
Preconditions.checkArgument(
1276+
schema != null, "Schema must be set for 2-argument reader function");
1277+
// NOTE(review): the lambda refers to the schema field, so it reads the field's value when
// the returned function is invoked, not the value at the time apply() was called.
return messageType -> readerFuncWithSchema.apply(schema, messageType);
1278+
}
1279+
1280+
// Stores the given schema on this instance (mutating it) and returns this same instance.
@Override
1281+
public BinaryBatchReaderFunction withSchema(Schema expectedSchema) {
1282+
this.schema = expectedSchema;
1283+
return this;
1284+
}
1285+
}
1286+
12431287
// Private constructor: stores the given InputFile in the builder's file field.
private ReadBuilder(InputFile file) {
12441288
this.file = file;
12451289
}
@@ -1314,14 +1358,27 @@ public ReadBuilder createReaderFunc(
13141358
return this;
13151359
}
13161360

1317-
public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
1361+
// Registers a one-argument (MessageType only) batched reader function on this builder.
// Rejected by the preconditions below if a batched reader function or a ReaderFunction has
// already been set. Returns this builder for chaining.
public ReadBuilder createBatchedReaderFunc(
1362+
Function<MessageType, VectorizedReader<?>> newReaderFunction) {
1363+
Preconditions.checkArgument(
1364+
this.batchedReaderFunc == null,
1365+
"Cannot set batched reader function: batched reader function already set");
1366+
Preconditions.checkArgument(
1367+
this.readerFunction == null,
1368+
"Cannot set batched reader function: ReaderFunction already set");
1369+
// Wrap the plain function so it can be stored in the BatchReaderFunction-typed field.
this.batchedReaderFunc = new UnaryBatchReaderFunction(newReaderFunction);
1370+
return this;
1371+
}
1372+
1373+
// Registers a two-argument (Schema, MessageType) batched reader function on this builder.
// Rejected by the preconditions below if a batched reader function or a ReaderFunction has
// already been set. Returns this builder for chaining.
// This span mixes diff context, one removed line (marker 1324-) and added lines (markers N+).
public ReadBuilder createBatchedReaderFunc(
1374+
BiFunction<Schema, MessageType, VectorizedReader<?>> newReaderFunction) {
13181375
Preconditions.checkArgument(
13191376
this.batchedReaderFunc == null,
13201377
"Cannot set batched reader function: batched reader function already set");
13211378
Preconditions.checkArgument(
13221379
this.readerFunction == null,
13231380
"Cannot set batched reader function: ReaderFunction already set");
1324-
this.batchedReaderFunc = func;
1381+
// Wrap the two-argument function; BinaryBatchReaderFunction binds the schema later.
this.batchedReaderFunc = new BinaryBatchReaderFunction(newReaderFunction);
13251382
return this;
13261383
}
13271384

@@ -1441,11 +1498,13 @@ public <D> CloseableIterable<D> build() {
14411498
}
14421499

14431500
if (batchedReaderFunc != null) {
1501+
Function<MessageType, VectorizedReader<?>> readBuilder =
1502+
batchedReaderFunc.withSchema(schema).apply();
14441503
return new VectorizedParquetReader<>(
14451504
file,
14461505
schema,
14471506
options,
1448-
batchedReaderFunc,
1507+
readBuilder,
14491508
mapping,
14501509
filter,
14511510
reuseContainers,

parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ private static class ReadBuilderWrapper<D, S> implements ReadBuilder<D, S> {
245245
private final ReaderFunction<D> readerFunction;
246246
private final BatchReaderFunction<D> batchReaderFunction;
247247
private final Map<String, String> config = Maps.newHashMap();
248-
private Schema icebergSchema;
249248
private Map<Integer, ?> idToConstant = ImmutableMap.of();
250249

251250
private ReadBuilderWrapper(
@@ -265,7 +264,6 @@ public ReadBuilder<D, S> split(long newStart, long newLength) {
265264

266265
// Forwards the projection schema to the wrapped internal builder and returns this wrapper.
// The assignment under marker 268- is the line this commit removes, so the wrapper no longer
// keeps its own copy of the schema.
@Override
267266
public ReadBuilder<D, S> project(Schema schema) {
268-
this.icebergSchema = schema;
269267
internal.project(schema);
270268
return this;
271269
}
@@ -318,12 +316,13 @@ public CloseableIterable<D> build() {
318316
if (readerFunction != null) {
319317
return internal
320318
.createReaderFunc(
321-
messageType -> readerFunction.read(icebergSchema, messageType, idToConstant))
319+
(icebergSchema, messageType) ->
320+
readerFunction.read(icebergSchema, messageType, idToConstant))
322321
.build();
323322
} else if (batchReaderFunction != null) {
324323
return internal
325324
.createBatchedReaderFunc(
326-
messageType ->
325+
(icebergSchema, messageType) ->
327326
batchReaderFunction.read(icebergSchema, messageType, idToConstant, config))
328327
.build();
329328
} else {

0 commit comments

Comments
 (0)