From a9c9add1ad1dea707ab483a586b2432602227f2c Mon Sep 17 00:00:00 2001 From: Lukas Rupprecht Date: Fri, 31 Jan 2025 07:58:58 -0800 Subject: [PATCH 1/2] makes DataSkippingStatsTracker aware of codegen configs --- .../sql/delta/stats/DataSkippingStatsTracker.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala index eaff49795ba..34643e153bb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala @@ -87,14 +87,11 @@ class DeltaTaskStatisticsTracker( // This projection combines the intermediate results stored by aggBuffer with the values of the // currently processed row and updates aggBuffer in place. - private val updateStats: MutableProjection = GenerateMutableProjection.generate( - expressions = JoinedProjection.bind( - aggBufferAttrs, - dataCols, - aggregates.flatMap(_.updateExpressions)), - inputSchema = Nil, - useSubexprElimination = true - ) + private val updateStats: MutableProjection = { + val expressions = JoinedProjection.bind( + aggBufferAttrs, dataCols, aggregates.flatMap(_.updateExpressions)) + MutableProjection.create(expressions, Nil) + } // This executes the whole statsColExpr in order to compute the final stats value for the file. // In order to evaluate it, we have to replace its aggregate functions with the corresponding From 0b9391244e6fb7981bf540115c4a72b3d48e7d17 Mon Sep 17 00:00:00 2001 From: Lukas Rupprecht Date: Tue, 4 Feb 2025 11:55:37 -0800 Subject: [PATCH 2/2] addresses comment --- .../spark/sql/delta/stats/DataSkippingStatsTracker.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala index 34643e153bb..5e80c400177 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingStatsTracker.scala @@ -88,8 +88,8 @@ class DeltaTaskStatisticsTracker( // This projection combines the intermediate results stored by aggBuffer with the values of the // currently processed row and updates aggBuffer in place. private val updateStats: MutableProjection = { - val expressions = JoinedProjection.bind( - aggBufferAttrs, dataCols, aggregates.flatMap(_.updateExpressions)) + val aggs = aggregates.flatMap(_.updateExpressions) + val expressions = JoinedProjection.bind(aggBufferAttrs, dataCols, aggs) MutableProjection.create(expressions, Nil) }