core/src/main/scala/org/apache/spark/MapOutputTracker.scala (4 additions, 1 deletion)

@@ -942,7 +942,10 @@ private[spark] class MapOutputTrackerMaster(
     }
   }
 
-  /** Unregister shuffle data */
+  /**
+   * Unregister shuffle metadata. This currently should only be called through
+   * `ContextCleaner` when the shuffle is considered no longer referenced anywhere.
+   */
   def unregisterShuffle(shuffleId: Int): Unit = {
     shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
       shuffleStatus.invalidateSerializedMapOutputStatusCache()

core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala

@@ -19,7 +19,7 @@ package org.apache.spark.storage

 import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future}
 
-import org.apache.spark.{MapOutputTracker, SparkEnv}
+import org.apache.spark.{MapOutputTracker, MapOutputTrackerMaster, SparkEnv}
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys.{BLOCK_ID, BROADCAST_ID, RDD_ID, SHUFFLE_ID}
 import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv}
@@ -57,7 +57,14 @@ class BlockManagerStorageEndpoint(

     case RemoveShuffle(shuffleId) =>
       doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) {
-        if (mapOutputTracker != null) {
+        if (mapOutputTracker != null && !mapOutputTracker.isInstanceOf[MapOutputTrackerMaster]) {

Contributor:

Should this check be in `unregisterShuffle`, if we don't expect it to be called on the master? Someone else could end up calling this same method in local mode in the future.

Member Author:

I was thinking about that. That approach requires passing `isLocal` into `MapOutputTrackerMaster`, which involves more changes. But I also agree it's safer.
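
For illustration, one possible reading of that alternative is sketched below as a toy model. The `isLocal` flag and the `fromContextCleaner` parameter are both hypothetical and exist in neither Spark nor this PR:

import scala.collection.concurrent.TrieMap

// Toy model of the suggested alternative: the master tracker itself refuses
// unregistration requests that do not come from ContextCleaner when running
// in local mode (where the executor shares this tracker instance).
class TrackerMasterSketch(isLocal: Boolean) {
  private val shuffleStatuses = TrieMap.empty[Int, String]

  def registerShuffle(id: Int): Unit = shuffleStatuses.put(id, s"status-$id")

  def unregisterShuffle(id: Int, fromContextCleaner: Boolean): Unit = {
    // In local mode, a RemoveShuffle RPC must not drop the authoritative
    // metadata; only ContextCleaner may do so.
    if (!isLocal || fromContextCleaner) {
      shuffleStatuses.remove(id)
    }
  }
}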

Contributor:

So in local mode, we can't clean up shuffle files?

Contributor:

I think shuffle file cleanup still happens in local mode, but the shuffle metadata cleanup only happens through `ContextCleaner` on the driver.

+          // SPARK-53898: `MapOutputTrackerMaster.unregisterShuffle()` should only be called
+          // through `ContextCleaner` when the shuffle is considered no longer referenced anywhere.
+          // Otherwise, we might hit exceptions if there is any subsequent access (which still
+          // references that shuffle) to that shuffle metadata in `MapOutputTrackerMaster`. E.g.,
+          // an ongoing subquery could access the same shuffle metadata which could have been

Contributor:

I am not sure if we should mention this, since the ideal behavior would be to terminate any subqueries when the main query completes.

Member Author:

There could be a race even if we terminate the subquery?

Contributor:

The main query has already ended. The subquery's results are not going to be used anyway (and letting it continue is also a waste of resources).

Member Author (@Ngone51, Oct 15, 2025):

I understand we should cancel the running subquery, and we should do it. My point is that the running subquery could still access `MapOutputTrackerMaster` even if we cancel it right after the main query ends, due to the race between them. So I think it's fine to mention it here.

+          // cleaned up after the main query completes. Note this currently only happens in local
+          // cluster where both driver and executor use the `MapOutputTrackerMaster`.
           mapOutputTracker.unregisterShuffle(shuffleId)

Contributor:

So for a non-local cluster, `mapOutputTracker` is on the executors, and `unregisterShuffle` only unregisters the shuffle on the executor side?

Contributor:

On executors, it would call `MapOutputTrackerWorker#unregisterShuffle`, where the `shuffleStatuses` are not cleaned up, unlike in `MapOutputTrackerMaster#unregisterShuffle`.

In local mode, we just have `MapOutputTrackerMaster`, and we end up cleaning the `shuffleStatuses`.

MapOutputTrackerMaster:

  def unregisterShuffle(shuffleId: Int): Unit = {
    shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
      shuffleStatus.invalidateSerializedMapOutputStatusCache()
      shuffleStatus.invalidateSerializedMergeOutputStatusCache()
    }
  }

MapOutputTrackerWorker:

  def unregisterShuffle(shuffleId: Int): Unit = {
    mapStatuses.remove(shuffleId)
    mergeStatuses.remove(shuffleId)
    shufflePushMergerLocations.remove(shuffleId)
  }

Contributor:

So how do we clean up shuffle files in local mode?

Member Author:

The shuffle files are cleaned by the RemoveShuffle RPC, which is sent from the driver to the executors and handled by BlockManagerStorageEndpoint, which deletes the files on disk. The RemoveShuffle RPC can currently be raised in two ways: 1) ContextCleaner, and 2) the shuffle cleanup feature at the end of a SQL query. This is the same for all modes.
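
For reference, a rough sketch of the second trigger path, mirroring the line the test suite below uses; `removeShuffle` is the existing `ShuffleDriverComponents` API, and everything else here is commentary:

// Path 2, as exercised by cleanupShuffles() in the test below: explicitly ask
// the driver components to remove a shuffle. This sends RemoveShuffle to the
// executors, whose BlockManagerStorageEndpoint deletes the on-disk files via
// shuffleManager.unregisterShuffle(shuffleId).
spark.sparkContext.shuffleDriverComponents.removeShuffle(shuffleId, true)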

Contributor:

Ah, so shuffle files are already cleaned up before we reach here?

Contributor:

One idea: shall we add a new method `clearShuffleStatusCache` and call it here? The executor-side shuffle status is more like a cache, while the driver-side one is the single source of truth. `MapOutputTrackerMaster#clearShuffleStatusCache` would be a no-op.

Member Author (@Ngone51, Oct 22, 2025):

> Ah, so shuffle files are already cleaned up before we reach here?

No. We're handling the RemoveShuffle RPC in BlockManagerStorageEndpoint right at this point. The files are deleted at line 72 by `shuffleManager.unregisterShuffle(shuffleId)`.

Member Author:

> One idea: shall we add a new method clearShuffleStatusCache and call it here? The executor side shuffle status is more like a cache and the driver side one is single source of truth.

Actually, we already do this in non-local mode. In non-local mode, we call `MapOutputTrackerWorker.unregisterShuffle()` to clean up the statuses (that's exactly what line 68 does). In local mode, we don't use `MapOutputTrackerWorker`, so there are no cached statuses to clean.

Contributor (@cloud-fan, Oct 22, 2025):

Yea, it's the same thing, but `clearShuffleStatusCache` is more explicit and clear than skipping `unregisterShuffle` if the instance is `MapOutputTrackerMaster`.
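
A minimal, self-contained sketch of what that proposal could look like; the class names and shapes below are illustrative only, and no such method exists in this PR:

import scala.collection.concurrent.TrieMap

// Illustrative model of the clearShuffleStatusCache idea: the RPC handler
// would call it unconditionally instead of type-checking the tracker.
abstract class TrackerSketch {
  def clearShuffleStatusCache(shuffleId: Int): Unit
}

class MasterSketch extends TrackerSketch {
  val shuffleStatuses = TrieMap.empty[Int, String] // single source of truth
  // Driver-side statuses must survive a RemoveShuffle RPC, so this is a no-op.
  override def clearShuffleStatusCache(shuffleId: Int): Unit = ()
}

class WorkerSketch extends TrackerSketch {
  val mapStatuses = TrieMap.empty[Int, String] // executor-side cache only
  override def clearShuffleStatusCache(shuffleId: Int): Unit = {
    mapStatuses.remove(shuffleId)
  }
}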

         }
         val shuffleManager = SparkEnv.get.shuffleManager

sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala

@@ -20,6 +20,7 @@ import scala.collection.mutable
 import scala.io.Source
 import scala.util.Try
 
+import org.apache.spark.MapOutputTrackerMaster
 import org.apache.spark.sql.{AnalysisException, ExtendedExplainGenerator, FastOperator, SaveMode}
 import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedFunction, UnresolvedRelation}
@@ -320,11 +321,15 @@ class QueryExecutionSuite extends SharedSparkSession {

   private def cleanupShuffles(): Unit = {
     val blockManager = spark.sparkContext.env.blockManager
+    val mapOutputTrackerMaster =
+      spark.sparkContext.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
     blockManager.diskBlockManager.getAllBlocks().foreach {
       case ShuffleIndexBlockId(shuffleId, _, _) =>
         spark.sparkContext.shuffleDriverComponents.removeShuffle(shuffleId, true)
       case _ =>
     }
+    // Shuffle cleanup should not clean up shuffle metadata on the driver
+    assert(mapOutputTrackerMaster.shuffleStatuses.nonEmpty)

Member Author:

The test fails this assert before the fix.

   }
 
   test("SPARK-53413: Cleanup shuffle dependencies for commands") {