Commit 065df97

Fix for Comet, and simplify a bit the base
1 parent 9f5d206 commit 065df97

9 files changed, +220 -55 lines changed

.baseline/checkstyle/checkstyle-suppressions.xml

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@

   <!-- Suppress checks for CometColumnReader -->
   <suppress files="org.apache.iceberg.spark.data.vectorized.CometColumnReader" checks="IllegalImport"/>
+  <!-- Suppress checks for CometDeletedColumnVector -->
+  <suppress files="org.apache.iceberg.spark.data.vectorized.CometDeletedColumnVector" checks="IllegalImport"/>

   <!-- Suppress TestClassNamingConvention for main source files -->
   <suppress files=".*[/\\]src[/\\]main[/\\].*" id="TestClassNamingConvention" />

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java

Lines changed: 0 additions & 13 deletions
@@ -22,7 +22,6 @@
 import java.util.Map;
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
-import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.parquet.column.page.PageReadStore;
@@ -37,12 +36,9 @@
  * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
 public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
-  private final boolean hasIsDeletedColumn;

   public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
     super(readers);
-    this.hasIsDeletedColumn =
-        readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
   }

   @Override
@@ -72,15 +68,6 @@ private class ColumnBatchLoader {
     ColumnarBatch loadDataToColumnBatch() {
       ColumnVector[] vectors = readDataToColumnVectors();

-      if (hasIsDeletedColumn) {
-        boolean[] isDeleted = new boolean[batchSize];
-        for (ColumnVector vector : vectors) {
-          if (vector instanceof DeletedColumnVector) {
-            ((DeletedColumnVector) vector).setValue(isDeleted);
-          }
-        }
-      }
-
       ColumnarBatch batch = new ColumnarBatch(vectors);
       batch.setNumRows(batchSize);
       return batch;
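Note: the removed block was the only consumer of DeletedVectorReader here; the reader itself used to allocate the isDeleted array and push it into any DeletedColumnVector. After this commit that wiring lives in BaseBatchReader's BatchDeleteFilter (see that diff below), which computes the flags once per batch and updates every vector implementing the new UpdatableDeletedColumnVector interface; a sketch of that dispatch follows the interface diff below.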

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java

Lines changed: 0 additions & 19 deletions
@@ -43,7 +43,6 @@
 class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {

   private final CometColumnReader[] readers;
-  private final boolean hasIsDeletedColumn;

   // The delegated BatchReader on the Comet side does the real work of loading a batch of rows.
   // The Comet BatchReader contains an array of ColumnReader. There is no need to explicitly call
@@ -57,8 +56,6 @@ class CometColumnarBatchReader implements VectorizedReader<ColumnarBatch> {
   CometColumnarBatchReader(List<VectorizedReader<?>> readers, Schema schema) {
     this.readers =
         readers.stream().map(CometColumnReader.class::cast).toArray(CometColumnReader[]::new);
-    this.hasIsDeletedColumn =
-        readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader);

     AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()];
     this.delegate = new BatchReader(abstractColumnReaders);
@@ -121,11 +118,6 @@ private class ColumnBatchLoader {
     ColumnarBatch loadDataToColumnBatch() {
       ColumnVector[] vectors = readDataToColumnVectors();

-      if (hasIsDeletedColumn) {
-        boolean[] isDeleted = new boolean[batchSize];
-        readDeletedColumn(vectors, isDeleted);
-      }
-
       ColumnarBatch batch = new ColumnarBatch(vectors);
       batch.setNumRows(batchSize);
       return batch;
@@ -141,16 +133,5 @@ ColumnVector[] readDataToColumnVectors() {

       return columnVectors;
     }
-
-    void readDeletedColumn(ColumnVector[] columnVectors, boolean[] isDeleted) {
-      for (int i = 0; i < readers.length; i++) {
-        if (readers[i] instanceof CometDeleteColumnReader) {
-          CometDeleteColumnReader deleteColumnReader = new CometDeleteColumnReader<>(isDeleted);
-          deleteColumnReader.setBatchSize(batchSize);
-          deleteColumnReader.delegate().readBatch(batchSize);
-          columnVectors[i] = deleteColumnReader.delegate().currentBatch();
-        }
-      }
-    }
   }
 }

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java

Lines changed: 13 additions & 8 deletions
@@ -21,6 +21,7 @@
 import org.apache.comet.parquet.MetadataColumnReader;
 import org.apache.comet.parquet.Native;
 import org.apache.comet.parquet.TypeUtil;
+import org.apache.comet.vector.CometVector;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.types.DataTypes;
@@ -46,30 +47,34 @@ public void setBatchSize(int batchSize) {
   }

   private static class DeleteColumnReader extends MetadataColumnReader {
-    private boolean[] isDeleted;
+    private CometDeletedColumnVector deletedVector;

     DeleteColumnReader() {
+      this(new boolean[0]);
+    }
+
+    DeleteColumnReader(boolean[] isDeleted) {
       super(
           DataTypes.BooleanType,
           TypeUtil.convertToParquet(
               new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())),
           false /* useDecimal128 = false */,
           false /* isConstant = false */);
-      this.isDeleted = new boolean[0];
-    }
-
-    DeleteColumnReader(boolean[] isDeleted) {
-      this();
-      this.isDeleted = isDeleted;
+      this.deletedVector = new CometDeletedColumnVector(isDeleted);
     }

     @Override
     public void readBatch(int total) {
       Native.resetBatch(nativeHandle);
       // set isDeleted on the native side to be consumed by native execution
-      Native.setIsDeleted(nativeHandle, isDeleted);
+      Native.setIsDeleted(nativeHandle, deletedVector.isDeleted());

       super.readBatch(total);
     }
+
+    @Override
+    public CometVector currentBatch() {
+      return deletedVector;
+    }
   }
 }

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeletedColumnVector.java

Lines changed: 155 additions & 0 deletions

@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.comet.shaded.arrow.vector.ValueVector;
+import org.apache.comet.vector.CometVector;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarArray;
+import org.apache.spark.sql.vectorized.ColumnarMap;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class CometDeletedColumnVector extends CometVector implements UpdatableDeletedColumnVector {
+  private boolean[] isDeleted;
+
+  public CometDeletedColumnVector(boolean[] isDeleted) {
+    super(SparkSchemaUtil.convert(Types.BooleanType.get()), false);
+    this.isDeleted = isDeleted;
+  }
+
+  @Override
+  public void setValue(boolean[] deleted) {
+    this.isDeleted = deleted;
+  }
+
+  boolean[] isDeleted() {
+    return isDeleted;
+  }
+
+  @Override
+  public void setNumNulls(int numNulls) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void setNumValues(int numValues) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public int numValues() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public ValueVector getValueVector() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public CometVector slice(int offset, int length) {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean hasNull() {
+    return false;
+  }
+
+  @Override
+  public int numNulls() {
+    return 0;
+  }
+
+  @Override
+  public boolean isNullAt(int rowId) {
+    return false;
+  }
+
+  @Override
+  public boolean getBoolean(int rowId) {
+    return isDeleted[rowId];
+  }
+
+  @Override
+  public byte getByte(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public short getShort(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getInt(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLong(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float getFloat(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double getDouble(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ColumnarArray getArray(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ColumnarMap getMap(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[] getBinary(int rowId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ColumnVector getChild(int ordinal) {
+    throw new UnsupportedOperationException();
+  }
+}
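A minimal usage sketch (illustrative only; assumes the Spark and Comet jars on the classpath, and same-package access for the package-private isDeleted()): the vector is a non-nullable boolean _deleted column whose reads come straight from the backing array.

boolean[] flags = new boolean[] {false, true, false};
CometDeletedColumnVector vector = new CometDeletedColumnVector(flags);
assert !vector.hasNull() && vector.numNulls() == 0; // never reports nulls
assert vector.getBoolean(1); // row 1 is marked deleted
vector.setValue(new boolean[] {true, false, false}); // the filter can swap flags between batches
assert vector.getBoolean(0);
boolean[] current = vector.isDeleted(); // same array the reader passes to Native.setIsDeleted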

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/DeletedColumnVector.java

Lines changed: 2 additions & 1 deletion
@@ -26,13 +26,14 @@
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;

-public class DeletedColumnVector extends ColumnVector {
+public class DeletedColumnVector extends ColumnVector implements UpdatableDeletedColumnVector {
   private boolean[] isDeleted;

   public DeletedColumnVector(Type type) {
     super(SparkSchemaUtil.convert(type));
   }

+  @Override
   public void setValue(boolean[] deleted) {
     this.isDeleted = deleted;
   }

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/UpdatableDeletedColumnVector.java

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+public interface UpdatableDeletedColumnVector {
+  void setValue(boolean[] isDeleted);
+}
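The interface gives the delete filter a single hook for both implementations: the Arrow-based DeletedColumnVector and the Comet-backed CometDeletedColumnVector. A minimal sketch of the dispatch it enables (DeletedVectorUpdater is a hypothetical helper name for illustration; the real logic lives inline in BatchDeleteFilter.filterBatch below):

import org.apache.spark.sql.vectorized.ColumnVector;

final class DeletedVectorUpdater {
  private DeletedVectorUpdater() {}

  // Push freshly computed per-row delete flags into every _deleted vector in
  // the batch, whichever implementation happens to be present.
  static void updateDeletedVectors(ColumnVector[] vectors, boolean[] isDeleted) {
    for (ColumnVector vector : vectors) {
      if (vector instanceof UpdatableDeletedColumnVector) {
        ((UpdatableDeletedColumnVector) vector).setValue(isDeleted);
      }
    }
  }
}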

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java

Lines changed: 13 additions & 6 deletions
@@ -39,7 +39,7 @@
 import org.apache.iceberg.spark.ParquetReaderType;
 import org.apache.iceberg.spark.data.vectorized.ColumnVectorWithFilter;
 import org.apache.iceberg.spark.data.vectorized.ColumnarBatchUtil;
-import org.apache.iceberg.spark.data.vectorized.DeletedColumnVector;
+import org.apache.iceberg.spark.data.vectorized.UpdatableDeletedColumnVector;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
@@ -95,9 +95,7 @@ protected CloseableIterable<ColumnarBatch> newBatchIterable(
           "Format: " + format + " not supported for batched reads");
     }

-    return deleteFilter == null
-        ? iterable
-        : CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
+    return CloseableIterable.transform(iterable, new BatchDeleteFilter(deleteFilter)::filterBatch);
   }

   private CloseableIterable<ColumnarBatch> newParquetIterable(
@@ -177,6 +175,10 @@ static class BatchDeleteFilter {
     }

     ColumnarBatch filterBatch(ColumnarBatch batch) {
+      if (!needDeletes()) {
+        return batch;
+      }
+
       ColumnVector[] vectors = new ColumnVector[batch.numCols()];
       for (int i = 0; i < batch.numCols(); i++) {
         vectors[i] = batch.column(i);
@@ -190,8 +192,8 @@ ColumnarBatch filterBatch(ColumnarBatch batch) {
       boolean[] isDeleted =
           ColumnarBatchUtil.buildIsDeleted(vectors, deletes, rowStartPosInBatch, numLiveRows);
       for (ColumnVector vector : vectors) {
-        if (vector instanceof DeletedColumnVector) {
-          ((DeletedColumnVector) vector).setValue(isDeleted);
+        if (vector instanceof UpdatableDeletedColumnVector) {
+          ((UpdatableDeletedColumnVector) vector).setValue(isDeleted);
         }
       }
     } else {
@@ -214,5 +216,10 @@ ColumnarBatch filterBatch(ColumnarBatch batch) {
       output.setNumRows(numLiveRows);
       return output;
     }
+
+    private boolean needDeletes() {
+      return hasIsDeletedColumn
+          || (deletes != null && (deletes.hasEqDeletes() || deletes.hasPosDeletes()));
+    }
   }
 }
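Note: with filterBatch short-circuiting on needDeletes(), the call site no longer needs its deleteFilter == null branch. When there is no _deleted metadata column and no equality or position deletes, filterBatch returns the input batch untouched (the same object, no copy), so unconditionally wrapping the iterable in BatchDeleteFilter is effectively free. Roughly, as a sketch of the equivalence (assuming hasIsDeletedColumn is derived from the projected schema, which this diff does not show):

// before: deleteFilter == null ? iterable : transform(iterable, filter::filterBatch)
// after:  transform(iterable, filter::filterBatch), where filterBatch is the
//         identity whenever needDeletes() is false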
