From e29eb6f976a7476b33c394934c836a2f97130796 Mon Sep 17 00:00:00 2001
From: ashahid
Date: Mon, 27 Jan 2025 19:53:38 -0800
Subject: [PATCH 1/6] SPARK-51016: Fix the computation of the isIndeterminate flag for a stage

---
 .../scala/org/apache/spark/Dependency.scala   |  1 +
 .../main/scala/org/apache/spark/rdd/RDD.scala |  4 ++
 .../sql/catalyst/expressions/Expression.scala |  3 ++
 .../expressions/namedExpressions.scala        | 16 +++++++-
 .../expressions/ExpressionEvalHelper.scala    |  2 +-
 .../expressions/NondeterministicSuite.scala   | 34 ++++++++++++++++
 .../exchange/ShuffleExchangeExec.scala        |  8 +++-
 ...rojectedOrderingAndPartitioningSuite.scala | 40 +++++++++++++++++--
 8 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 573608c4327e0..224fc597fcc11 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -83,6 +83,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val keyOrdering: Option[Ordering[K]] = None,
     val aggregator: Option[Aggregator[K, V, C]] = None,
     val mapSideCombine: Boolean = false,
+    val isInDeterministic: Boolean = false,
     val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
   extends Dependency[Product2[K, V]] with Logging {
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 80db818b77e42..978d80db67c72 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -2079,6 +2079,7 @@ abstract class RDD[T: ClassTag](
   private final lazy val _outputDeterministicLevel: DeterministicLevel.Value =
     getOutputDeterministicLevel
 
+
   /**
    * Returns the deterministic level of this RDD's output. Please refer to [[DeterministicLevel]]
    * for the definition.
@@ -2105,6 +2106,9 @@ abstract class RDD[T: ClassTag](
     val deterministicLevelCandidates = dependencies.map {
       // The shuffle is not really happening, treat it like narrow dependency and assume the output
      // deterministic level of current RDD is same as parent.
+      case dep: ShuffleDependency[_, _, _] if dep.isInDeterministic =>
+        DeterministicLevel.INDETERMINATE
+
      case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
        dep.rdd.outputDeterministicLevel
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 4c83f92509ecd..98c220eed0151 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -115,6 +115,9 @@ abstract class Expression extends TreeNode[Expression] {
    */
   lazy val deterministic: Boolean = children.forall(_.deterministic)
 
+  lazy val exprValHasIndeterministicCharacter: Boolean = !deterministic ||
+    this.references.exists(_.exprValHasIndeterministicCharacter)
+
   def nullable: Boolean
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f5f35050401ba..bda7ad654f4e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -110,6 +110,10 @@ abstract class Attribute extends LeafExpression with NamedExpression {
   @transient
   override lazy val references: AttributeSet = AttributeSet(this)
 
+  override lazy val exprValHasIndeterministicCharacter: Boolean =
+    metadata.contains(AttributeReference.KEY_HAS_INDETERMINISTIC_COMPONENT) &&
+      metadata.getBoolean(AttributeReference.KEY_HAS_INDETERMINISTIC_COMPONENT)
+
   def withNullability(newNullability: Boolean): Attribute
   def withQualifier(newQualifier: Seq[String]): Attribute
   def withName(newName: String): Attribute
@@ -193,7 +197,13 @@ case class Alias(child: Expression, name: String)(
 
   override def toAttribute: Attribute = {
     if (resolved) {
-      AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
+      val mdForAttrib = if (this.exprValHasIndeterministicCharacter) {
+        new MetadataBuilder().withMetadata(metadata).
+          putBoolean(AttributeReference.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build()
+      } else {
+        metadata
+      }
+      AttributeReference(name, child.dataType, child.nullable, mdForAttrib)(exprId, qualifier)
     } else {
       UnresolvedAttribute.quoted(name)
     }
@@ -385,6 +395,10 @@ case class AttributeReference(
   }
 }
 
+object AttributeReference {
+  val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent"
+}
+
 /**
  * A place holder used when printing expressions without debugging information such as the
  * expression id or the unresolved indicator.
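Note: the pieces above cooperate as follows — `exprValHasIndeterministicCharacter` marks any expression that is itself nondeterministic or that references a tainted attribute, and `Alias.toAttribute` persists that taint into attribute metadata so it survives across projection boundaries. A minimal sketch of the expected behavior (hypothetical REPL-style usage against catalyst internals with this patch applied; not code from the patch):

    import org.apache.spark.sql.catalyst.expressions._

    // MonotonicallyIncreasingID is nondeterministic, so an alias over it is tainted.
    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
    assert(alias.exprValHasIndeterministicCharacter)

    // toAttribute records the taint in the attribute metadata under
    // KEY_HAS_INDETERMINISTIC_COMPONENT, so it survives once only the
    // AttributeReference is visible downstream...
    val attr = alias.toAttribute
    assert(attr.exprValHasIndeterministicCharacter)

    // ...and it propagates through otherwise-deterministic parent
    // expressions via their references.
    assert(Multiply(attr, Literal(1000L)).exprValHasIndeterministicCharacter)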
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 184f5a2a9485d..2347410d45377 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
     case _ => expr.mapChildren(replace)
   }
 
-  private def prepareEvaluation(expression: Expression): Expression = {
+  def prepareEvaluation(expression: Expression): Expression = {
     val serializer = new JavaSerializer(new SparkConf()).newInstance()
     val resolver = ResolveTimeZone
     val expr = replace(resolver.resolveTimeZones(expression))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
index bf1c930c0bd0b..983810a5fdaec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, RangePartitioning}
 
 class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("MonotonicallyIncreasingID") {
@@ -31,4 +32,37 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("InputFileName") {
     checkEvaluation(InputFileName(), "")
   }
+
+  test("SPARK-51016: has Indeterministic Component") {
+    def assertIndeterminancyComponent(expression: Expression): Unit =
+      assert(prepareEvaluation(expression).exprValHasIndeterministicCharacter)
+
+    assertIndeterminancyComponent(MonotonicallyIncreasingID())
+    val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
+    assertIndeterminancyComponent(alias)
+    assertIndeterminancyComponent(alias.toAttribute)
+    assertIndeterminancyComponent(Multiply(alias.toAttribute, Literal(1000L)))
+    assertIndeterminancyComponent(
+      HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5))
+    assertIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5))
+    assertIndeterminancyComponent(
+      RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
+    assertIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
+  }
+
+  test("SPARK-51016: has Deterministic Component") {
+    def assertNoIndeterminancyComponent(expression: Expression): Unit =
+      assert(!prepareEvaluation(expression).exprValHasIndeterministicCharacter)
+
+    assertNoIndeterminancyComponent(Literal(1000L))
+    val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")()
+    assertNoIndeterminancyComponent(alias)
+    assertNoIndeterminancyComponent(alias.toAttribute)
+    assertNoIndeterminancyComponent(
+      HashPartitioning(Seq(Multiply(Literal(10L), Literal(100L))), 5))
+    assertNoIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5))
+    assertNoIndeterminancyComponent(
+      RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
+    assertNoIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 31a3f53eb7191..e5c12d31ee63f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -467,7 +467,10 @@ object ShuffleExchangeExec {
         }, isOrderSensitive = isOrderSensitive)
       }
     }
-
+    val isIndeterministic = newPartitioning match {
+      case expr: Expression => expr.exprValHasIndeterministicCharacter
+      case _ => false
+    }
     // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
     // are in the form of (partitionId, row) and every partitionId is in the expected range
     // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough.
@@ -476,6 +479,7 @@ object ShuffleExchangeExec {
       rddWithPartitionIds,
       new PartitionIdPassthrough(part.numPartitions),
       serializer,
+      isInDeterministic = isIndeterministic,
       shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
 
     dependency
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
index ec13d48d45f84..395c03b81055f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{DeterministicLevel, RDD}
+import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
+import org.apache.spark.sql.functions.{col, floor, isnull, rand, when}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{LongType, StringType}
 
 class ProjectedOrderingAndPartitioningSuite
   extends SharedSparkSession with AdaptiveSparkPlanHelper {
@@ -210,6 +213,37 @@ class ProjectedOrderingAndPartitioningSuite
     assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
     assert(outputOrdering.head.sameOrderExpressions.size == 0)
   }
+
+  test("SPARK-51016: ShuffleRDD using indeterministic join keys should be INDETERMINATE") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val outerDf = spark.createDataset(
+        Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
+        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
+
+      val innerDf = spark.createDataset(
+        Seq((1L, "11"), (2L, "22"), (3L, "33")))(
+        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
+
+      val leftOuter = outerDf.select(
+        col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).
+          cast(LongType)).
+          otherwise(col("pkLeftt")).as("pkLeft"))
+
+      val outerjoin = leftOuter.hint("shuffle_hash").
+        join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
+
+      outerjoin.collect()
+      val finalPlan = outerjoin.queryExecution.executedPlan
+      val shuffleHJExec = finalPlan.children(0).asInstanceOf[ShuffledHashJoinExec]
+      assert(shuffleHJExec.left.asInstanceOf[InputAdapter].execute().outputDeterministicLevel ==
+        DeterministicLevel.INDETERMINATE)
+
+      assert(shuffleHJExec.right.asInstanceOf[InputAdapter].execute().outputDeterministicLevel ==
+        DeterministicLevel.UNORDERED)
+
+      assert(shuffleHJExec.execute().outputDeterministicLevel == DeterministicLevel.INDETERMINATE)
+    }
+  }
 }
 
 private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode {

From e3388156375ce3a0a3e710be5bb6bd5f0027096c Mon Sep 17 00:00:00 2001
From: ashahid
Date: Mon, 27 Jan 2025 23:41:12 -0800
Subject: [PATCH 2/6] SPARK-51016: Fix the computation of the isIndeterminate flag for a stage

---
 .../scala/org/apache/spark/Dependency.scala   | 23 ++++++++++++++-----
 .../exchange/ShuffleExchangeExec.scala        |  7 ++++--
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 224fc597fcc11..2751d7b3b2e04 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -79,14 +79,25 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
 class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
-    val serializer: Serializer = SparkEnv.get.serializer,
-    val keyOrdering: Option[Ordering[K]] = None,
-    val aggregator: Option[Aggregator[K, V, C]] = None,
-    val mapSideCombine: Boolean = false,
-    val isInDeterministic: Boolean = false,
-    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
+    val serializer: Serializer,
+    val keyOrdering: Option[Ordering[K]],
+    val aggregator: Option[Aggregator[K, V, C]],
+    val mapSideCombine: Boolean,
+    val shuffleWriterProcessor: ShuffleWriteProcessor,
+    val isInDeterministic: Boolean)
   extends Dependency[Product2[K, V]] with Logging {
 
+  def this(
+      rdd: RDD[_ <: Product2[K, V]],
+      partitioner: Partitioner,
+      serializer: Serializer = SparkEnv.get.serializer,
+      keyOrdering: Option[Ordering[K]] = None,
+      aggregator: Option[Aggregator[K, V, C]] = None,
+      mapSideCombine: Boolean = false,
+      shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor
+  ) = this(rdd, partitioner, serializer, keyOrdering, aggregator, mapSideCombine,
+    shuffleWriterProcessor, false)
+
   if (mapSideCombine) {
     require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
   }
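Note: the constructor split above preserves source compatibility — the new secondary constructor carries the old parameter list and defaults, while `isInDeterministic` can only be set through the now default-free primary constructor. A sketch of the two call shapes (hypothetical caller; assumes an active SparkContext so SparkEnv.get is usable, and an assumed `pairsRdd: RDD[(Int, String)]`):

    import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkEnv}
    import org.apache.spark.shuffle.ShuffleWriteProcessor

    // Existing call sites keep compiling unchanged via the secondary
    // constructor (isInDeterministic defaults to false):
    val plain = new ShuffleDependency[Int, String, String](
      pairsRdd, new HashPartitioner(4))

    // Setting the flag requires spelling out the primary constructor:
    val tainted = new ShuffleDependency[Int, String, String](
      pairsRdd, new HashPartitioner(4), SparkEnv.get.serializer,
      None, None, false, new ShuffleWriteProcessor, isInDeterministic = true)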
specified!") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index e5c12d31ee63f..2e6261de40401 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -479,8 +479,11 @@ object ShuffleExchangeExec { rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), serializer, - isInDeterministic = isIndeterministic, - shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) + None, + None, + false, + shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics), + isIndeterministic) dependency } From 786053971edd6e6a5faaca6a5db1ccbb301bc350 Mon Sep 17 00:00:00 2001 From: ashahid Date: Tue, 28 Jan 2025 17:03:12 -0800 Subject: [PATCH 3/6] SPARK-51016. Fixing the code of isInDeterminate boolean for a Stage --- python/pyspark/pandas/internal.py | 6 ++++-- .../catalyst/expressions/namedExpressions.scala | 14 +++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py index 3f6831b600678..bda05017e135f 100644 --- a/python/pyspark/pandas/internal.py +++ b/python/pyspark/pandas/internal.py @@ -766,8 +766,9 @@ def __init__( for index_field, struct_field in zip(index_fields, struct_fields) ), (index_fields, struct_fields) else: + # TODO(SPARK-42965): For some reason, the metadata of StructField is different assert all( - index_field.struct_field == struct_field + _drop_metadata(index_field.struct_field) == _drop_metadata(struct_field) for index_field, struct_field in zip(index_fields, struct_fields) ), (index_fields, struct_fields) @@ -794,8 +795,9 @@ def __init__( for data_field, struct_field in zip(data_fields, struct_fields) ), (data_fields, struct_fields) else: + # TODO(SPARK-42965): For some reason, the metadata of StructField is different assert all( - data_field.struct_field == struct_field + _drop_metadata(data_field.struct_field) == _drop_metadata(struct_field) for data_field, struct_field in zip(data_fields, struct_fields) ), (data_fields, struct_fields) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 66987cdbf8d8d..421c2fd190a4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -112,8 +112,8 @@ abstract class Attribute extends LeafExpression with NamedExpression { override lazy val references: AttributeSet = AttributeSet(this) override lazy val exprValHasIndeterministicCharacter: Boolean = - metadata.contains(AttributeReference.KEY_HAS_INDETERMINISTIC_COMPONENT) && - metadata.getBoolean(AttributeReference.KEY_HAS_INDETERMINISTIC_COMPONENT) + metadata.contains(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) && + metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) def withNullability(newNullability: Boolean): Attribute def withQualifier(newQualifier: Seq[String]): Attribute @@ -127,6 +127,10 @@ abstract class Attribute extends LeafExpression with NamedExpression { } +object Attribute { + val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent" +} + /** * Used to 
assign a new name to a computation. * For example the SQL expression "1 + 1 AS a" could be represented as follows: @@ -200,7 +204,7 @@ case class Alias(child: Expression, name: String)( if (resolved) { val mdForAttrib = if (this.exprValHasIndeterministicCharacter) { new MetadataBuilder().withMetadata(metadata). - putBoolean(AttributeReference.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build() + putBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build() } else { metadata } @@ -396,10 +400,6 @@ case class AttributeReference( } } -object AttributeReference { - val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent" -} - /** * A place holder used when printing expressions without debugging information such as the * expression id or the unresolved indicator. From 3765d17da19806764fee2f5c3d4fb92bc7eae2ae Mon Sep 17 00:00:00 2001 From: ashahid Date: Mon, 10 Feb 2025 20:49:41 -0800 Subject: [PATCH 4/6] SPARK-51016: overriding the isDeterministic method in Shuffle stage --- .../main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index db09d19d0acff..d49c709f83c51 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -94,4 +94,6 @@ private[spark] class ShuffleMapStage( .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions) } + + override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic } From afc4ef20a57e70d4b349e10eaed563ac7afe61da Mon Sep 17 00:00:00 2001 From: ashahid Date: Tue, 11 Feb 2025 09:01:19 -0800 Subject: [PATCH 5/6] SPARK-51016: overriding the isDeterministic method in Shuffle stage. fix tests failure --- .../main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index d49c709f83c51..38da377512067 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -95,5 +95,5 @@ private[spark] class ShuffleMapStage( .getOrElse(0 until numPartitions) } - override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic + override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic || super.isIndeterminate } From 790aaf0fb6c5cbb04a78786ffd6f2b96107c850c Mon Sep 17 00:00:00 2001 From: ashahid Date: Tue, 11 Feb 2025 17:29:29 -0800 Subject: [PATCH 6/6] SPARK-51016: overriding the isDeterministic method in Shuffle stage. added bug test --- .../spark/scheduler/ShuffleMapStageTest.scala | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala diff --git a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala new file mode 100644 index 0000000000000..da3f619c4a2c6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
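Note: `Stage.isIndeterminate` previously derived only from the RDD lineage, i.e. `rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE`, which misses shuffles whose partitioning expressions are indeterministic. With PATCH 4 and PATCH 5 combined, a ShuffleMapStage also consults the flag carried by its shuffle dependency, so the DAGScheduler rolls back and fully re-runs such a stage (and its consumers) after a fetch failure instead of recomputing only the missing map outputs. Schematically (a paraphrase of what the override evaluates to, not code from the patch; ShuffleMapStage is private[spark]):

    import org.apache.spark.rdd.DeterministicLevel
    import org.apache.spark.scheduler.ShuffleMapStage

    // Equivalent predicate after the override: either the planner tagged the
    // shuffle as indeterministic, or the RDD lineage already was.
    def stageNeedsFullRetry(stage: ShuffleMapStage): Boolean =
      stage.shuffleDep.isInDeterministic ||
        stage.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE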
From 790aaf0fb6c5cbb04a78786ffd6f2b96107c850c Mon Sep 17 00:00:00 2001
From: ashahid
Date: Tue, 11 Feb 2025 17:29:29 -0800
Subject: [PATCH 6/6] SPARK-51016: Override the isIndeterminate method in ShuffleMapStage; add a regression test

---
 .../spark/scheduler/ShuffleMapStageTest.scala | 61 +++++++++++++++++++
 1 file changed, 61 insertions(+)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala
new file mode 100644
index 0000000000000..da3f619c4a2c6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.LongType
+
+class ShuffleMapStageTest extends SharedSparkSession {
+
+  test("SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val outerDf = spark.createDataset(
+        Seq((1L, "aa")))(
+        Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkLeftt", "strleft")
+
+      val innerDf = spark.createDataset(
+        Seq((1L, "11"), (2L, "22")))(
+        Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkRight", "strright")
+
+      val leftOuter = outerDf.select(
+        col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)).
+          cast(LongType)).
+          otherwise(col("pkLeftt")).as("pkLeft"))
+
+      val outerjoin = leftOuter.hint("shuffle_hash").
+        join(innerDf, col("pkLeft") === col("pkRight"), "left_outer")
+      val shuffleStages: Array[ShuffleMapStage] = Array.ofDim(2)
+      spark.sparkContext.addSparkListener(new SparkListener() {
+        var i = 0
+        override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
+          if (stageSubmitted.stageInfo.shuffleDepId.isDefined) {
+            shuffleStages(i) =
+              spark.sparkContext.dagScheduler.shuffleIdToMapStage(stageSubmitted.stageInfo.stageId)
+            i += 1
+          }
+        }
+      })
+      outerjoin.collect()
+      assert(shuffleStages.filter(_.isIndeterminate).size == 1)
+    }
+  }
+}
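Note: both new tests reduce to the common null-key salting pattern sketched below (hypothetical example; `df` and `other` are assumed DataFrames with a nullable join key `k`). Before this change, the shuffle that partitions on the salted key was treated as deterministic, so a partial stage retry after a fetch failure could re-draw the rand() salt for only some map tasks and silently drop or duplicate rows; with the change, the stage reports isIndeterminate and is retried as a whole:

    import org.apache.spark.sql.functions._

    // Salt null keys so they spread over partitions instead of skewing one reducer.
    val salted = df.withColumn(
      "kSalted",
      when(isnull(col("k")), floor(rand() * 10000000L)).otherwise(col("k")))

    // The join shuffles on an indeterministic expression: recomputing a subset
    // of its map tasks would produce a different partitioning of the same rows.
    val joined = salted.join(other, col("kSalted") === other("k"), "left_outer")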