-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53898][CORE] Shuffle cleanup should not clean MapOutputTrackerMaster.shuffleStatuses in local cluster #52606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
case _ => | ||
} | ||
// Shuffle cleanup should not clean up shuffle metadata on the driver | ||
assert(mapOutputTrackerMaster.shuffleStatuses.nonEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests fails this assert before the fix.
// through `ContextCleaner` when the shuffle is considered no longer referenced anywhere. | ||
// Otherwise, we might hit exceptions if there is any subsequent access (which still | ||
// reference that shuffle) to that shuffle metadata in `MapOutputTrackerMaster`. E.g., | ||
// an ongoing subquery could access the same shuffle metadata which could have been |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we should mention this, since the ideal behavior would be to terminate any subqueries when the main query completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There could be a race even if we terminating the subquery?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main query has ended already. The subqueries results are anyway not going to be used (and is also a waste of resource in allowing it to continue).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand we should cancel the running subquery and we should do it. My point is the running subquery could still access MapOutputTrackerMaster
even if we cancel it right after the main query ends due to the race between them. So I think it's fine to mention it here.
case RemoveShuffle(shuffleId) => | ||
doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) { | ||
if (mapOutputTracker != null) { | ||
if (mapOutputTracker != null && !mapOutputTracker.isInstanceOf[MapOutputTrackerMaster]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be in check be in unregistershuffle
if we dont expect this to be called in master.
There can be someone else calling this same method on local mode in future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking about that. That way requires us to pass isLocal
into MapOutputTrackerMaster
, involves more changes. But I also agree it's safer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So with local mode, we can't clean up shuffle files only?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think shuffle cleanup still happens in local.
But the shuffle metadata cleanup only happens from ContextCleaner
in driver.
cc: @cloud-fan |
// an ongoing subquery could access the same shuffle metadata which could have been | ||
// cleaned up after the main query completes. Note this currently only happens in local | ||
// cluster where both driver and executor use the `MapOutputTrackerMaster`. | ||
mapOutputTracker.unregisterShuffle(shuffleId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so for non-local cluster, mapOutputTracker
is at executors and unregisterShuffle
only unregister the shuffle at executor side?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On executors, it would call the MapOutputTrackerWorker#unregisterShuffle, where the shuffle status are not cleaned up unlike in MapOutputTrackerMaster#unregisterShuffle
In local, we just have MapoutputTrackerMaster and ends up cleaning the shufflestatuses
MapOutputTrackerMaster
def unregisterShuffle(shuffleId: Int): Unit = {
shuffleStatuses.remove(shuffleId).foreach { shuffleStatus =>
shuffleStatus.invalidateSerializedMapOutputStatusCache()
shuffleStatus.invalidateSerializedMergeOutputStatusCache()
}
}
MapOutputTrackerWorker
def unregisterShuffle(shuffleId: Int): Unit = {
mapStatuses.remove(shuffleId)
mergeStatuses.remove(shuffleId)
shufflePushMergerLocations.remove(shuffleId)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so how do we clean up shuffle files with local mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The shuffle files are cleaned by rpc RemoveShuffle
, sent from driver to executors and handled by BlockManagerStorageEndpoint
that deletes the files on the disk. And the rpc RemoveShuffle
nowadays can be raised in two ways: 1) ContextCleaner
2) Shuffle Cleanup feature at the end of a SQL query. These are the same for all the modes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, so shuffle files are already cleaned up before we reach here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One idea: shall we add a new method clearShuffleStatusCache
and call it here? The executor side shuffle status is more like a cache and the driver side one is single source of truth. MapOutputTrackerMaster#clearShuffleStatusCache
is noop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah, so shuffle files are already cleaned up before we reach here?
No. We're handling the RemoveShuffle
rpc in BlockManagerStorageEndpoint
right at this point. The files are deteleted at the line 72 by shuffleManager.unregisterShuffle(shuffleId)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One idea: shall we add a new method clearShuffleStatusCache and call it here? The executor side shuffle status is more like a cache and the driver side one is single source of truth.
Actaully we already do this in non-local mode. In non-local mode, we call MapOutputTrackerWorker. unregisterShuffle()
to clean up the statues (that's exactly what line 68 does). In local mode, we don't use MapOutputTrackerWorker
so there is no cached statuses to clean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea it's the same thing, but clearShuffleStatusCache
is more explicit and clear than skipping unregisterShuffle
if the instance is MapOutputTrackerWorker
.
What changes were proposed in this pull request?
This PR fixes a bug where
MapOutputTrackerMaster.shuffleStatuses
is mistakenly cleaned up by Shuffle Cleanup feature in local cluster. The fix is done by avoid invokingmapOutputTracker.unregisterShuffle()
inBlockManagerStorageEndpoint
whenmapOutputTracker
isMapOutputTrackerMaster
as it only happens in local cluster (non-local cluster should useMapOutputTrackerWorker
instead).Why are the changes needed?
MapOutputTrackerMaster.shuffleStatuses
should only be cleaned whenContextCleaner
considers the shuffle is no longer referenced anywhere. Otherwise, any subsequent access (which still reference that shuffle) to the same shuffle metadata inMapOutputTrackerMaster
can lead toSparkException
and crash theSparkContext
. Note this currently only happens in local cluster due to both driver and executor use theMapOutputTrackerMaster
. E.g., an ongoing subquery could access the same shuffle metadata which could have been removed after the main query completes. See the detailed discussion at #52213 (comment).Does this PR introduce any user-facing change?
No.
How was this patch tested?
Updated the existing tests.
Was this patch authored or co-authored using generative AI tooling?
No.