Skip to content

Commit 73c9e38

Browse files
infvg authored and zhouyuan committed
Enable enhanced tests for spark 4.0 & fix failures
Co-authored-by: Yuan <yuanzhou@apache.org>
1 parent 999ba5f commit 73c9e38

File tree

5 files changed

+82
-5
lines changed

5 files changed

+82
-5
lines changed

.github/workflows/velox_backend_enhanced.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ jobs:
298298
java -version
299299
$MVN_CMD clean test -Pspark-4.0 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Piceberg \
300300
-Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark40/spark_home/" \
301-
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
301+
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTest
302302
- name: Upload test report
303303
if: always()
304304
uses: actions/upload-artifact@v4

backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ case class IcebergColumnarBatchDataWriter(
4444
}
4545

4646
override def write(batch: ColumnarBatch): Unit = {
47+
// Pass the original batch to native code
48+
// The native code will use the schema (writeSchema) we provided during initialization
49+
// to determine which columns to write, effectively filtering out metadata columns
50+
// like __row_operation, _file, _pos that Spark 4.0 adds
4751
val batchHandle = ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
4852
jniWrapper.write(writer, batchHandle)
4953
}

backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/AbstractIcebergWriteExec.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,23 @@ import org.apache.spark.sql.types.StructType
2424
import org.apache.iceberg.spark.source.IcebergWriteUtil
2525
import org.apache.iceberg.types.TypeUtil
2626

27+
import scala.collection.JavaConverters._
28+
2729
abstract class AbstractIcebergWriteExec extends IcebergWriteExec {
2830

2931
// the writer factory works for both batch and streaming
3032
private def createIcebergDataWriteFactory(schema: StructType): IcebergDataWriteFactory = {
3133
val writeSchema = IcebergWriteUtil.getWriteSchema(write)
3234
val nestedField = TypeUtil.visit(writeSchema, new IcebergNestedFieldVisitor)
35+
// Filter out metadata columns from the Spark output schema and reorder to match Iceberg schema
36+
// Spark 4.0 may include metadata columns in the output schema during UPDATE operations,
37+
// but these should not be written to the Iceberg table
38+
val schemaFieldMap = schema.fields.map(f => f.name -> f).toMap
39+
val filteredFields =
40+
writeSchema.columns().asScala.flatMap(icebergCol => schemaFieldMap.get(icebergCol.name()))
41+
val filteredSchema = StructType(filteredFields.toArray)
3342
IcebergDataWriteFactory(
34-
schema,
43+
filteredSchema,
3544
getFileFormat(IcebergWriteUtil.getFileFormat(write)),
3645
IcebergWriteUtil.getDirectory(write),
3746
getCodec,

backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,4 +383,59 @@ class VeloxIcebergSuite extends IcebergSuite {
383383
}
384384
}
385385
}
386+
387+
test("iceberg read cow table - update after schema evolution") {
388+
withTable("iceberg_cow_update_evolved_tb") {
389+
spark.sql("""
390+
|create table iceberg_cow_update_evolved_tb (
391+
| id int,
392+
| name string,
393+
| age int
394+
|) using iceberg
395+
|tblproperties (
396+
| 'format-version' = '2',
397+
| 'write.delete.mode' = 'copy-on-write',
398+
| 'write.update.mode' = 'copy-on-write',
399+
| 'write.merge.mode' = 'copy-on-write'
400+
|)
401+
|""".stripMargin)
402+
403+
spark.sql("""
404+
|alter table iceberg_cow_update_evolved_tb
405+
|add columns (salary decimal(10, 2))
406+
|""".stripMargin)
407+
408+
spark.sql("""
409+
|insert into table iceberg_cow_update_evolved_tb values
410+
| (1, 'Name1', 23, 3400.00),
411+
| (2, 'Name2', 30, 5500.00),
412+
| (3, 'Name3', 35, 6500.00)
413+
|""".stripMargin)
414+
415+
val df = spark.sql("""
416+
|update iceberg_cow_update_evolved_tb
417+
|set name = 'Name4'
418+
|where id = 1
419+
|""".stripMargin)
420+
421+
assert(
422+
df.queryExecution.executedPlan
423+
.asInstanceOf[CommandResultExec]
424+
.commandPhysicalPlan
425+
.isInstanceOf[VeloxIcebergReplaceDataExec])
426+
427+
checkAnswer(
428+
spark.sql("""
429+
|select id, name, age, salary
430+
|from iceberg_cow_update_evolved_tb
431+
|order by id
432+
|""".stripMargin),
433+
Seq(
434+
Row(1, "Name4", 23, new java.math.BigDecimal("3400.00")),
435+
Row(2, "Name2", 30, new java.math.BigDecimal("5500.00")),
436+
Row(3, "Name3", 35, new java.math.BigDecimal("6500.00"))
437+
)
438+
)
439+
}
440+
}
386441
}

cpp/velox/compute/iceberg/IcebergWriter.cc

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,10 @@ std::shared_ptr<IcebergInsertTableHandle> createIcebergInsertTableHandle(
154154
nestedField.children[i]));
155155
}
156156
}
157-
157+
158158
auto fileNameGenerator = std::make_shared<const GlutenIcebergFileNameGenerator>(
159159
partitionId, taskId, operationId, fileFormat);
160-
160+
161161
std::shared_ptr<const connector::hive::LocationHandle> locationHandle =
162162
std::make_shared<connector::hive::LocationHandle>(
163163
outputDirectoryPath, outputDirectoryPath, connector::hive::LocationHandle::TableType::kExisting);
@@ -212,7 +212,16 @@ IcebergWriter::IcebergWriter(
212212
}
213213

214214
void IcebergWriter::write(const VeloxColumnarBatch& batch) {
215-
dataSink_->appendData(batch.getRowVector());
215+
auto inputRowVector = batch.getRowVector();
216+
217+
auto outputRowVector = std::make_shared<RowVector>(
218+
pool_.get(),
219+
rowType_,
220+
inputRowVector->nulls(),
221+
inputRowVector->size(),
222+
inputRowVector->children());
223+
224+
dataSink_->appendData(outputRowVector);
216225
}
217226

218227
std::vector<std::string> IcebergWriter::commit() {

0 commit comments

Comments (0)