[WIP] clean failed shuffle disk #3109

Open · wants to merge 18 commits into main
@@ -0,0 +1,147 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.celeborn.spark

import java.util
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicReference

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.scheduler.{RunningStageManager, RunningStageManagerImpl}

import org.apache.celeborn.client.LifecycleManager
import org.apache.celeborn.common.internal.Logging

private[celeborn] object FailedShuffleCleaner extends Logging {

  private val lifecycleManager = new AtomicReference[LifecycleManager](null)
  // Celeborn shuffle ids pending cleanup
  private val shufflesToBeCleaned = new LinkedBlockingQueue[Int]()
  private val cleanedShuffleIds = new mutable.HashSet[Int]
  // Celeborn shuffle id -> ids of the stages referring to it
  private[celeborn] val celebornShuffleIdToReferringStages =
    new ConcurrentHashMap[Int, mutable.HashSet[Int]]()

  private val lock = new Object
  val RUNNING_STAGE_CHECKER_CLASS = "CELEBORN_TEST_RUNNING_STAGE_CHECKER_IMPL"

  private[celeborn] var runningStageManager: RunningStageManager = buildRunningStageChecker()

  // for testing
  private def buildRunningStageChecker(): RunningStageManager = {
    if (System.getProperty(RUNNING_STAGE_CHECKER_CLASS) == null) {
      new RunningStageManagerImpl
    } else {
      val className = System.getProperty(RUNNING_STAGE_CHECKER_CLASS)
      val clazz = Class.forName(className)
      clazz.getDeclaredConstructor().newInstance().asInstanceOf[RunningStageManager]
    }
  }

  // for testing
  def reset(): Unit = {
    lifecycleManager.set(null)
    shufflesToBeCleaned.clear()
    cleanedShuffleIds.clear()
    celebornShuffleIdToReferringStages.clear()
    runningStageManager = buildRunningStageChecker()
  }

  def addShuffleIdReferringStage(celebornShuffleId: Int, appShuffleIdentifier: String): Unit = {
    // this is only implemented/tested with Spark for now
    val Array(_, stageId, _) = appShuffleIdentifier.split('-')
    celebornShuffleIdToReferringStages.putIfAbsent(celebornShuffleId, new mutable.HashSet[Int]())
    lock.synchronized {
      celebornShuffleIdToReferringStages.get(celebornShuffleId).add(stageId.toInt)
    }
  }

  private def onlyCurrentStageReferred(celebornShuffleId: Int, stageId: Int): Boolean = {
    val ret = celebornShuffleIdToReferringStages.get(celebornShuffleId).size == 1 &&
      celebornShuffleIdToReferringStages.get(celebornShuffleId).contains(stageId)
    if (ret) {
      logInfo(s"only stage $stageId refers to shuffle $celebornShuffleId, adding it for cleanup")
    }
    ret
  }

  def addShuffleIdToBeCleaned(
      celebornShuffleId: Int,
      appShuffleIdentifier: String): Unit = {
    val Array(appShuffleId, stageId, _) = appShuffleIdentifier.split('-')
    // walk through every Celeborn shuffle id ever registered for this app shuffle
    // and queue the ones that are safe to clean
    lifecycleManager.get().getShuffleIdMapping.get(appShuffleId.toInt).foreach {
      case (_, (pastCelebornShuffleId, _)) =>
        if (!celebornShuffleIdToReferringStages.containsKey(pastCelebornShuffleId)
          || onlyCurrentStageReferred(pastCelebornShuffleId, stageId.toInt)
          || noRunningDownstreamStage(pastCelebornShuffleId)
          || !committedSuccessfully(pastCelebornShuffleId)) {
          shufflesToBeCleaned.put(pastCelebornShuffleId)
        }
    }
  }

  private def committedSuccessfully(celebornShuffleId: Int): Boolean = {
    val ret = !lifecycleManager.get().commitManager.getCommitHandler(celebornShuffleId)
      .isStageDataLost(celebornShuffleId)
    if (!ret) {
      logInfo(s"shuffle $celebornShuffleId failed to commit, adding it for cleanup")
    }
    ret
  }

  def setLifecycleManager(ref: LifecycleManager): Unit = {
    lifecycleManager.compareAndSet(null, ref)
  }

  private def noRunningDownstreamStage(shuffleId: Int): Boolean = {
    val allReferringStageIds = celebornShuffleIdToReferringStages.get(shuffleId)
    require(allReferringStageIds != null, s"no stage referring to shuffle $shuffleId")
    val ret =
      allReferringStageIds.count(stageId => runningStageManager.isRunningStage(stageId)) == 0
    if (ret) {
      logInfo(s"no running downstream stage refers to shuffle $shuffleId")
    } else {
      logInfo(s"at least one running downstream stage still refers to shuffle $shuffleId," +
        s" skipping it for cleanup")
    }
    ret
  }

  private val cleanerThread = new Thread() {
    override def run(): Unit = {
      while (true) {
        val allShuffleIds = new util.ArrayList[Int]
        shufflesToBeCleaned.drainTo(allShuffleIds)
        allShuffleIds.asScala.foreach { shuffleId =>
          if (!cleanedShuffleIds.contains(shuffleId)) {
            lifecycleManager.get().unregisterShuffle(shuffleId)
            logInfo(s"sent unregister shuffle request for shuffle $shuffleId (celeborn shuffle id)")
            cleanedShuffleIds += shuffleId
          }
        }
        Thread.sleep(1000)
      }
    }
  }

  cleanerThread.setName("shuffle cleaner thread")
  cleanerThread.setDaemon(true)
  cleanerThread.start()
}
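A note on the identifier format: the `split('-')` destructuring above assumes `appShuffleIdentifier` is composed as `appShuffleId-stageId-stageAttemptId`. A minimal, self-contained sketch of that parsing (the sample value is illustrative, not taken from this PR):

object AppShuffleIdentifierSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical identifier: appShuffleId = 3, stageId = 7, stage attempt = 0
    val appShuffleIdentifier = "3-7-0"
    val Array(appShuffleId, stageId, stageAttemptId) = appShuffleIdentifier.split('-')
    println(s"appShuffleId=$appShuffleId stageId=$stageId stageAttemptId=$stageAttemptId")
  }
}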
@@ -18,6 +18,7 @@
package org.apache.spark

object SparkContextHelper {

  def env: SparkEnv = {
    assert(SparkContext.getActive.isDefined)
    SparkContext.getActive.get.env
@@ -0,0 +1,30 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

trait RunningStageManager {
  def isRunningStage(stageId: Int): Boolean
}

class RunningStageManagerImpl extends RunningStageManager {
  private def dagScheduler = SparkContext.getActive.get.dagScheduler
  override def isRunningStage(stageId: Int): Boolean = {
    dagScheduler.runningStages.map(_.id).contains(stageId)
  }
}
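Because `FailedShuffleCleaner.buildRunningStageChecker()` reads the `CELEBORN_TEST_RUNNING_STAGE_CHECKER_IMPL` system property, a test can substitute its own implementation. A minimal sketch, assuming a stub class name of our choosing (`NoOpRunningStageManager` is hypothetical):

package org.apache.spark.scheduler

// Hypothetical test stub: reports that no stage is running, which makes every
// referred shuffle eligible for cleanup.
class NoOpRunningStageManager extends RunningStageManager {
  override def isRunningStage(stageId: Int): Boolean = false
}

Wiring it up in a test, before the cleaner is first used (or before calling `FailedShuffleCleaner.reset()`):

System.setProperty(
  FailedShuffleCleaner.RUNNING_STAGE_CHECKER_CLASS,
  classOf[NoOpRunningStageManager].getName)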
@@ -150,6 +150,16 @@ private void initializeLifecycleManager(String appId) {

      lifecycleManager.registerShuffleTrackerCallback(
          shuffleId -> SparkUtils.unregisterAllMapOutput(mapOutputTracker, shuffleId));
      if (lifecycleManager.conf().clientFetchCleanFailedShuffle()) {
        lifecycleManager.registerGetShuffleIdForWriterCallback(
            (celebornShuffleId, appShuffleIdentifier) ->
                SparkUtils.addWriterShuffleIdsToBeCleaned(
                    lifecycleManager, celebornShuffleId, appShuffleIdentifier));
        lifecycleManager.registerGetShuffleIdForReaderCallback(
            (celebornShuffleId, appShuffleIdentifier) ->
                SparkUtils.addShuffleIdRefCount(
                    lifecycleManager, celebornShuffleId, appShuffleIdentifier));
      }
    }
  }
}
@@ -55,12 +55,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.LifecycleManager;
import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.util.JavaUtils;
import org.apache.celeborn.reflect.DynConstructors;
import org.apache.celeborn.reflect.DynFields;
import org.apache.celeborn.reflect.DynMethods;
import org.apache.celeborn.spark.FailedShuffleCleaner;

public class SparkUtils {
  private static final Logger LOG = LoggerFactory.getLogger(SparkUtils.class);
@@ -462,4 +464,16 @@ public static void addSparkListener(SparkListener listener) {
      sparkContext.addSparkListener(listener);
    }
  }

  public static void addWriterShuffleIdsToBeCleaned(
      LifecycleManager lifecycleManager, int celebornShuffleId, String appShuffleIdentifier) {
    FailedShuffleCleaner.setLifecycleManager(lifecycleManager);
    FailedShuffleCleaner.addShuffleIdToBeCleaned(celebornShuffleId, appShuffleIdentifier);
  }

  public static void addShuffleIdRefCount(
      LifecycleManager lifecycleManager, int celebornShuffleId, String appShuffleIdentifier) {
    FailedShuffleCleaner.setLifecycleManager(lifecycleManager);
    FailedShuffleCleaner.addShuffleIdReferringStage(celebornShuffleId, appShuffleIdentifier);
  }
}
@@ -895,6 +895,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
        logInfo(s"reuse existing shuffleId $id for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
        id
      } else {
        // reaching this branch means the previous write stage is being re-run
        if (isBarrierStage) {
          // unregister previous shuffle(s) which are still valid
          val mapUpdates = shuffleIds.filter(_._2._2).map { kv =>
@@ -905,6 +906,8 @@
        }
        val newShuffleId = shuffleIdGenerator.getAndIncrement()
        logInfo(s"generate new shuffleId $newShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier")
        getShuffleIdForWriterCallback.foreach(callback =>
          callback.accept(newShuffleId, appShuffleIdentifier))
        shuffleIds.put(appShuffleIdentifier, (newShuffleId, true))
        newShuffleId
      }
@@ -918,11 +921,13 @@
      } else {
        shuffleIds.values.filter(v => v._2).map(v => v._1).toSeq.reverse.find(
          areAllMapTasksEnd) match {
          case Some(celebornShuffleId) =>
            getShuffleIdForReaderCallback.foreach(callback =>
              callback.accept(celebornShuffleId, appShuffleIdentifier))
            val pbGetShuffleIdResponse = {
              logDebug(
                s"get shuffleId $celebornShuffleId for appShuffleId $appShuffleId appShuffleIdentifier $appShuffleIdentifier isWriter $isWriter")
              PbGetShuffleIdResponse.newBuilder().setShuffleId(celebornShuffleId).build()
            }
            context.reply(pbGetShuffleIdResponse)
          case None =>
@@ -1805,6 +1810,17 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
    appShuffleTrackerCallback = Some(callback)
  }

  // expecting the Celeborn shuffle id and the application shuffle identifier
  @volatile private var getShuffleIdForWriterCallback: Option[BiConsumer[Integer, String]] = None
  def registerGetShuffleIdForWriterCallback(callback: BiConsumer[Integer, String]): Unit = {
    getShuffleIdForWriterCallback = Some(callback)
  }

  // expecting the Celeborn shuffle id and the application shuffle identifier
  @volatile private var getShuffleIdForReaderCallback: Option[BiConsumer[Integer, String]] = None
  def registerGetShuffleIdForReaderCallback(callback: BiConsumer[Integer, String]): Unit = {
    getShuffleIdForReaderCallback = Some(callback)
  }

  def registerAppShuffleDeterminate(appShuffleId: Int, determinate: Boolean): Unit = {
    appShuffleDeterminateMap.put(appShuffleId, determinate)
  }
@@ -1844,4 +1860,5 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
    case _ =>
  }

  def getShuffleIdMapping = shuffleIdMapping
}
@@ -63,7 +63,7 @@ class ReducePartitionCommitHandler(

  private val getReducerFileGroupRequest =
    JavaUtils.newConcurrentHashMap[Int, util.Set[RpcCallContext]]()
  private[celeborn] val dataLostShuffleSet = ConcurrentHashMap.newKeySet[Int]()
  private val stageEndShuffleSet = ConcurrentHashMap.newKeySet[Int]()
  private val inProcessStageEndShuffleSet = ConcurrentHashMap.newKeySet[Int]()
  private val shuffleMapperAttempts = JavaUtils.newConcurrentHashMap[Int, Array[Int]]()
@@ -984,6 +984,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
  def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
  def clientFetchMaxRetriesForEachReplica: Int = get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
  def clientStageRerunEnabled: Boolean = get(CLIENT_STAGE_RERUN_ENABLED)
  def clientFetchCleanFailedShuffle: Boolean = get(CLIENT_FETCH_CLEAN_FAILED_SHUFFLE)
  def clientFetchExcludeWorkerOnFailureEnabled: Boolean =
    get(CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED)
  def clientFetchExcludedWorkerExpireTimeout: Long =
@@ -4689,6 +4690,14 @@
      .booleanConf
      .createWithDefault(true)

  val CLIENT_FETCH_CLEAN_FAILED_SHUFFLE: ConfigEntry[Boolean] =
    buildConf("celeborn.client.spark.fetch.cleanFailedShuffle")
      .categories("client")
      .version("0.6.0")
      .doc("Whether to clean up the disk space occupied by shuffle data that cannot be fetched.")
      .booleanConf
      .createWithDefault(false)

  val CLIENT_FETCH_EXCLUDE_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
    buildConf("celeborn.client.fetch.excludeWorkerOnFailure.enabled")
      .categories("client")
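With Spark, Celeborn client settings are normally forwarded through the Spark configuration using a `spark.` prefix, so the new switch can be enabled like this (a usage sketch, assuming the standard prefixing convention):

import org.apache.spark.SparkConf

// Sketch: opt in to cleaning disk space held by shuffle data that failed to fetch.
val sparkConf = new SparkConf()
  .set("spark.celeborn.client.spark.fetch.cleanFailedShuffle", "true")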
1 change: 1 addition & 0 deletions docs/configuration/client.md
@@ -110,6 +110,7 @@ license: |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether to revise lost shuffles. | 0.6.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. | 0.3.1 | |
| celeborn.client.spark.fetch.cleanFailedShuffle | false | false | Whether to clean up the disk space occupied by shuffle data that cannot be fetched. | 0.6.0 | |
| celeborn.client.spark.push.dynamicWriteMode.enabled | false | false | Whether to dynamically switch push write mode based on conditions.If true, shuffle mode will be only determined by partition count | 0.5.0 | |
| celeborn.client.spark.push.dynamicWriteMode.partitionNum.threshold | 2000 | false | Threshold of shuffle partition number for dynamically switching push writer mode. When the shuffle partition number is greater than this value, use the sort-based shuffle writer for memory efficiency; otherwise use the hash-based shuffle writer for speed. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is true. | 0.5.0 | |
| celeborn.client.spark.push.sort.memory.maxMemoryFactor | 0.4 | false | the max portion of executor memory which can be used for SortBasedWriter buffer (only valid when celeborn.client.spark.push.sort.memory.useAdaptiveThreshold is enabled | 0.5.0 | |