[SPARK-51016][SQL] Fix for incorrect results on retry for Left Outer Join with indeterministic join keys #49708

Closed · wants to merge 16 commits
22 changes: 17 additions & 5 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -79,13 +79,25 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)
val serializer: Serializer,
val keyOrdering: Option[Ordering[K]],
val aggregator: Option[Aggregator[K, V, C]],
val mapSideCombine: Boolean,
val shuffleWriterProcessor: ShuffleWriteProcessor,
val isInDeterministic: Boolean)
extends Dependency[Product2[K, V]] with Logging {

def this (
rdd: RDD[_ <: Product2[K, V]],
partitioner: Partitioner,
serializer: Serializer = SparkEnv.get.serializer,
keyOrdering: Option[Ordering[K]] = None,
aggregator: Option[Aggregator[K, V, C]] = None,
mapSideCombine: Boolean = false,
shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor
) = this(rdd, partitioner, serializer, keyOrdering, aggregator, mapSideCombine,
shuffleWriterProcessor, false)

if (mapSideCombine) {
require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
}
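
The central API change in Dependency.scala is the new `isInDeterministic` flag on `ShuffleDependency`'s primary constructor, with the previously defaulted signature preserved as a secondary constructor for existing callers. Below is a minimal sketch (not part of the PR) of the two call paths; the `buildDeps` helper, its argument names and the Int/String type parameters are illustrative, and the code assumes a Spark-internal package because `ShuffleWriteProcessor` is `private[spark]`.

```scala
import org.apache.spark.{Partitioner, ShuffleDependency, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriteProcessor

def buildDeps(rdd: RDD[(Int, String)], partitioner: Partitioner): Unit = {
  // New primary constructor: every argument is explicit, including the new flag.
  val flaggedDep = new ShuffleDependency[Int, String, String](
    rdd,
    partitioner,
    SparkEnv.get.serializer,
    keyOrdering = None,
    aggregator = None,
    mapSideCombine = false,
    shuffleWriterProcessor = new ShuffleWriteProcessor,
    isInDeterministic = true) // the output of this shuffle may differ across re-attempts

  // Existing call sites keep compiling through the secondary constructor,
  // which supplies the old defaults and sets isInDeterministic = false.
  val legacyDep = new ShuffleDependency[Int, String, String](rdd, partitioner)
}
```
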
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -2105,6 +2105,9 @@ abstract class RDD[T: ClassTag](
val deterministicLevelCandidates = dependencies.map {
// The shuffle is not really happening, treat it like narrow dependency and assume the output
// deterministic level of current RDD is same as parent.
case dep: ShuffleDependency[_, _, _] if dep.isInDeterministic =>
DeterministicLevel.INDETERMINATE

case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) =>
dep.rdd.outputDeterministicLevel

239 changes: 143 additions & 96 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -61,7 +61,12 @@ private[spark] class ResultStage(
*/
override def findMissingPartitions(): Seq[Int] = {
val job = activeJob.get
(0 until job.numPartitions).filter(id => !job.finished(id))
val allPartitions = (0 until job.numPartitions)
if (this.treatAllPartitionsMissing(this.latestInfo.attemptNumber())) {
allPartitions
} else {
allPartitions.filter(id => !job.finished(id))
}
}

override def toString: String = "ResultStage " + id
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -90,8 +90,13 @@ private[spark] class ShuffleMapStage(

/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
if (this.treatAllPartitionsMissing(this.latestInfo.attemptNumber())) {
0 until numPartitions
} else {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions)
}
}

override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic || super.isIndeterminate
}
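
The `ResultStage` and `ShuffleMapStage` overrides above both consult `treatAllPartitionsMissing` for the current attempt; the hooks that set and clear that marker live in Stage.scala below. The sketch that follows shows the intended contract only (`stage` stands for any scheduler-held `Stage` instance) — the actual call sites are in the DAGScheduler.scala diff, which is not rendered above, so the exact ordering is an assumption.

```scala
// Sketch of the contract, not the actual DAGScheduler code (that diff is collapsed above).
// When an indeterminate stage has to be rolled back and fully re-executed:
stage.markAttemptIdForAllPartitionsMissing(stage.latestInfo.attemptNumber())

// findMissingPartitions() for that same attempt now reports every partition,
// instead of only the partitions whose output was lost:
assert(stage.findMissingPartitions() == (0 until stage.numPartitions))

// makeNewStageAttempt(...) resets the marker (attemptIdAllPartitionsMissing = -1),
// so the freshly created attempt returns to normal missing-partition tracking.
```
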
68 changes: 62 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable.HashSet

import org.apache.spark.executor.TaskMetrics
@@ -63,6 +65,12 @@ private[scheduler] abstract class Stage(
val resourceProfileId: Int)
extends Logging {

@volatile
private var attemptIdAllPartitionsMissing: Int = -1

private val stageReattemptLock = new ReentrantReadWriteLock()
private val stageReadLock = stageReattemptLock.readLock()
private val stageWriteLock = stageReattemptLock.writeLock()
val numPartitions = rdd.partitions.length

/** Set of jobs that this stage belongs to. */
@@ -100,12 +108,21 @@ def makeNewStageAttempt(
def makeNewStageAttempt(
numPartitionsToCompute: Int,
taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
resourceProfileId = resourceProfileId)
nextAttemptId += 1
val writeLockTaken = this.acquireStageWriteLock()
try {
val metrics = new TaskMetrics
metrics.register(rdd.sparkContext)
_latestInfo = StageInfo.fromStage(
this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences,
resourceProfileId = resourceProfileId)
nextAttemptId += 1
// reset the marker so the new attempt no longer treats all partitions as missing
attemptIdAllPartitionsMissing = -1
} finally {
if (writeLockTaken) {
this.releaseStageWriteLock()
}
}
}

/** Forward the nextAttemptId if skipped and get visited for the first time. */
@@ -131,4 +148,43 @@ private[scheduler] abstract class Stage(
def isIndeterminate: Boolean = {
rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

def treatAllPartitionsMissing(attemptId: Int): Boolean =
this.attemptIdAllPartitionsMissing == attemptId

def markAttemptIdForAllPartitionsMissing(attemptId: Int): Unit =
this.attemptIdAllPartitionsMissing = attemptId

def acquireStageReadLock(): Unit = {
this.stageReadLock.lockInterruptibly()
val prevSet = Stage.threadHoldingReadLock.get()
Stage.threadHoldingReadLock.set(prevSet + this.id)
}

def releaseStageReadLock(): Unit = {
val prevSet = Stage.threadHoldingReadLock.get()
Stage.threadHoldingReadLock.set(prevSet - this.id)
this.stageReadLock.unlock()
}

def acquireStageWriteLock(): Boolean = {
if (Stage.threadHoldingReadLock.get().contains(this.id)) {
false
} else {
stageWriteLock.lockInterruptibly()
true
}
}

def releaseStageWriteLock(): Unit = {
stageWriteLock.unlock()
}
}

object Stage {
private val threadHoldingReadLock = new ThreadLocal[Set[Int]] {
override protected def initialValue(): Set[Int] = {
Set.empty[Int]
}
}
}
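
The new read/write lock and the `threadHoldingReadLock` ThreadLocal control how attempt-scoped state is read while `makeNewStageAttempt` replaces it. A minimal usage sketch, assuming scheduler-internal code that brackets its reads this way (`stage` is any `Stage` instance; the real callers are in the collapsed DAGScheduler.scala diff):

```scala
// Sketch only: reading attempt-scoped state under the stage read lock.
stage.acquireStageReadLock()
try {
  // findMissingPartitions() is evaluated against a stable latestInfo/attempt id.
  val missing = stage.findMissingPartitions()
  // ... decide what to resubmit ...
} finally {
  stage.releaseStageReadLock()
}

// makeNewStageAttempt() takes the write lock before swapping _latestInfo, but
// acquireStageWriteLock() returns false (and skips locking) when the calling thread
// already holds this stage's read lock — ReentrantReadWriteLock cannot upgrade a
// read lock to a write lock, so this check avoids a self-deadlock.
```
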
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3192,9 +3192,14 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val failedStages = scheduler.failedStages.toSeq
assert(failedStages.map(_.id) == Seq(1, 2))
// Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
// TODO (Asif): the original assertion (kept commented out below) appears to be wrong;
// since the ShuffleMapStage is indeterminate, all of its partitions need to be retried.
/* assert(failedStages.collect {
case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
}.head.findMissingPartitions() == Seq(0)) */
assert(failedStages.collect {
case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
}.head.findMissingPartitions() == Seq(0))
}.head.findMissingPartitions() == Seq(0, 1))
// The result stage is still waiting for its 2 tasks to complete
assert(failedStages.collect {
case stage: ResultStage => stage
@@ -4163,9 +4168,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val failedStages = scheduler.failedStages.toSeq
assert(failedStages.map(_.id) == Seq(1, 2))
// Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
// TODO (Asif): the original assertion (kept commented out below) appears to be wrong;
// since the ShuffleMapStage is indeterminate, all of its partitions need to be retried.
/*
assert(failedStages.collect {
case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
}.head.findMissingPartitions() == Seq(0))
*/
assert(failedStages.collect {
case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
}.head.findMissingPartitions() == Seq(0, 1))
// The result stage is still waiting for its 2 tasks to complete
assert(failedStages.collect {
case stage: ResultStage => stage
6 changes: 4 additions & 2 deletions python/pyspark/pandas/internal.py
@@ -766,8 +766,9 @@ def __init__(
for index_field, struct_field in zip(index_fields, struct_fields)
), (index_fields, struct_fields)
else:
# TODO(SPARK-42965): For some reason, the metadata of StructField is different
assert all(
index_field.struct_field == struct_field
_drop_metadata(index_field.struct_field) == _drop_metadata(struct_field)
for index_field, struct_field in zip(index_fields, struct_fields)
), (index_fields, struct_fields)

@@ -794,8 +795,9 @@ def __init__(
for data_field, struct_field in zip(data_fields, struct_fields)
), (data_fields, struct_fields)
else:
# TODO(SPARK-42965): For some reason, the metadata of StructField is different
assert all(
data_field.struct_field == struct_field
_drop_metadata(data_field.struct_field) == _drop_metadata(struct_field)
for data_field, struct_field in zip(data_fields, struct_fields)
), (data_fields, struct_fields)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -115,6 +115,9 @@ abstract class Expression extends TreeNode[Expression] {
*/
lazy val deterministic: Boolean = children.forall(_.deterministic)

/**
* Whether the value of this expression can vary across task (re-)attempts: either the
* expression itself is non-deterministic, or it references an attribute produced by a
* non-deterministic computation (tracked via attribute metadata, see Attribute below).
*/
lazy val exprValHasIndeterministicCharacter: Boolean = !deterministic ||
this.references.exists(_.exprValHasIndeterministicCharacter)

def nullable: Boolean

/**
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -111,6 +111,10 @@ abstract class Attribute extends LeafExpression with NamedExpression {
@transient
override lazy val references: AttributeSet = AttributeSet(this)

override lazy val exprValHasIndeterministicCharacter: Boolean =
metadata.contains(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) &&
metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT)

def withNullability(newNullability: Boolean): Attribute
def withQualifier(newQualifier: Seq[String]): Attribute
def withName(newName: String): Attribute
@@ -123,6 +127,10 @@ abstract class Attribute extends LeafExpression with NamedExpression {

}

object Attribute {
val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent"
}

/**
* Used to assign a new name to a computation.
* For example the SQL expression "1 + 1 AS a" could be represented as follows:
@@ -194,7 +202,13 @@ case class Alias(child: Expression, name: String)(

override def toAttribute: Attribute = {
if (resolved) {
AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
val mdForAttrib = if (this.exprValHasIndeterministicCharacter) {
new MetadataBuilder().withMetadata(metadata).
putBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build()
} else {
metadata
}
AttributeReference(name, child.dataType, child.nullable, mdForAttrib)(exprId, qualifier)
} else {
UnresolvedAttribute.quoted(name)
}
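
Together with the new `exprValHasIndeterministicCharacter` on `Expression`, the `Alias` change above is what carries indeterminism across projection boundaries: the flag is stamped into the output attribute's metadata and read back by `Attribute`. The snippet below is an illustrative sketch of that round trip, constructed directly without running the analyzer; it mirrors what the new NondeterministicSuite tests further down verify through `prepareEvaluation`.

```scala
import org.apache.spark.sql.catalyst.expressions._

// An alias over a non-deterministic computation ...
val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "bucket")()
val attr = alias.toAttribute

// ... yields an attribute whose metadata carries the marker ...
assert(attr.metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT))

// ... so expressions built on top of that attribute are flagged as well, even though
// the attribute itself evaluates deterministically.
assert(Multiply(attr, Literal(10L)).exprValHasIndeterministicCharacter)
```
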
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
case _ => expr.mapChildren(replace)
}

private def prepareEvaluation(expression: Expression): Expression = {
def prepareEvaluation(expression: Expression): Expression = {
val serializer = new JavaSerializer(new SparkConf()).newInstance()
val resolver = ResolveTimeZone
val expr = replace(resolver.resolveTimeZones(expression))
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, RangePartitioning}

class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
test("MonotonicallyIncreasingID") {
@@ -31,4 +32,37 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
test("InputFileName") {
checkEvaluation(InputFileName(), "")
}

test("SPARK-51016: has Indeterministic Component") {
def assertIndeterminancyComponent(expression: Expression): Unit =
assert(prepareEvaluation(expression).exprValHasIndeterministicCharacter)

assertIndeterminancyComponent(MonotonicallyIncreasingID())
val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")()
assertIndeterminancyComponent(alias)
assertIndeterminancyComponent(alias.toAttribute)
assertIndeterminancyComponent(Multiply(alias.toAttribute, Literal(1000L)))
assertIndeterminancyComponent(
HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5))
assertIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5))
assertIndeterminancyComponent(
RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
assertIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
}

test("SPARK-51016: has Deterministic Component") {
def assertNoIndeterminancyComponent(expression: Expression): Unit =
assert(!prepareEvaluation(expression).exprValHasIndeterministicCharacter)

assertNoIndeterminancyComponent(Literal(1000L))
val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")()
assertNoIndeterminancyComponent(alias)
assertNoIndeterminancyComponent(alias.toAttribute)
assertNoIndeterminancyComponent(
HashPartitioning(Seq(Multiply(Literal(10L), Literal(100L))), 5))
assertNoIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5))
assertNoIndeterminancyComponent(
RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5))
assertNoIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5))
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -30,7 +30,7 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
@@ -467,7 +467,10 @@ object ShuffleExchangeExec {
}, isOrderSensitive = isOrderSensitive)
}
}

val isIndeterministic = newPartitioning match {
case expr: Expression => expr.exprValHasIndeterministicCharacter
case _ => false
}
// Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
// are in the form of (partitionId, row) and every partitionId is in the expected range
// [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough.
@@ -476,7 +479,11 @@ object ShuffleExchangeExec {
rddWithPartitionIds,
new PartitionIdPassthrough(part.numPartitions),
serializer,
shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics))
None,
None,
false,
shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics),
isIndeterministic)

dependency
}
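
To connect the pieces end to end, here is an illustrative DataFrame-level shape of the scenario named in the PR title. `spark` is assumed to be an active SparkSession; actually reproducing the incorrect results would require injecting shuffle-fetch failures, which this sketch does not attempt.

```scala
import org.apache.spark.sql.functions._

// Left-side join keys derived from a non-deterministic expression.
val left = spark.range(0, 1000)
  .withColumn("k", monotonically_increasing_id() % 10)
val right = spark.range(0, 10).withColumnRenamed("id", "k")

// Per the changes in this PR, the alias over the non-deterministic key should tag its
// output attribute's metadata, HashPartitioning over that attribute should report
// exprValHasIndeterministicCharacter, and ShuffleExchangeExec should build the
// ShuffleDependency with isInDeterministic = true. A shuffle-fetch failure then
// re-runs the whole map stage, so a retried join no longer mixes rows produced by
// two different evaluations of `k`.
val joined = left.join(right, Seq("k"), "left_outer")
```
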