Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1792][FOLLOWUP] Keep resume for a while after resumeByPinnedMemory #3099

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerDirectMemoryPressureCheckIntervalMs: Long = get(WORKER_DIRECT_MEMORY_CHECK_INTERVAL)
def workerPinnedMemoryCheckEnabled: Boolean = get(WORKER_PINNED_MEMORY_CHECK_ENABLED)
def workerPinnedMemoryCheckIntervalMs: Long = get(WORKER_PINNED_MEMORY_CHECK_INTERVAL)
// Value of `celeborn.worker.monitor.resumeByPinnedMemory.keepTime`, in milliseconds
// (the entry is declared with `timeConf(TimeUnit.MILLISECONDS)`).
def workerResumeByPinnedMemoryKeepTime: Long = get(WORKER_RESUME_BY_PINNED_MEMORY_KEEP_TIME)
def workerDirectMemoryReportIntervalSecond: Long = get(WORKER_DIRECT_MEMORY_REPORT_INTERVAL)
def workerDirectMemoryTrimChannelWaitInterval: Long =
get(WORKER_DIRECT_MEMORY_TRIM_CHANNEL_WAIT_INTERVAL)
Expand Down Expand Up @@ -3740,6 +3741,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")

// Worker-side time setting, stored in milliseconds, with a default of "1s".
// It is read through `CelebornConf.workerResumeByPinnedMemoryKeepTime`.
// NOTE(review): the `.doc` text also appears verbatim in docs/configuration/worker.md,
// so keep the two in sync when rewording.
val WORKER_RESUME_BY_PINNED_MEMORY_KEEP_TIME: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.resumeByPinnedMemory.keepTime")
.categories("worker")
.doc("Time of worker to stay in resume state after resumeByPinnedMemory, " +
"prevent workers from repeatedly switching between resume and pause states")
.version("0.6.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

val WORKER_DIRECT_MEMORY_REPORT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.monitor.memory.report.interval")
.withAlternative("celeborn.worker.memory.reportInterval")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ license: |
| celeborn.worker.monitor.memory.trimFlushWaitInterval | 1s | false | Wait time after worker trigger StorageManger to flush data. | 0.3.0 | |
| celeborn.worker.monitor.pinnedMemory.check.enabled | true | false | If true, MemoryManager will check worker should resume by pinned memory used. | 0.6.0 | |
| celeborn.worker.monitor.pinnedMemory.check.interval | 10s | false | Interval of worker direct pinned memory checking, only takes effect when celeborn.network.memory.allocator.pooled and celeborn.worker.monitor.pinnedMemory.check.enabled are enabled. | 0.6.0 | |
| celeborn.worker.monitor.resumeByPinnedMemory.keepTime | 1s | false | Time of worker to stay in resume state after resumeByPinnedMemory, prevent workers from repeatedly switching between resume and pause states | 0.6.0 | |
| celeborn.worker.partition.initial.readBuffersMax | 1024 | false | Max number of initial read buffers | 0.3.0 | |
| celeborn.worker.partition.initial.readBuffersMin | 1 | false | Min number of initial read buffers | 0.3.0 | |
| celeborn.worker.partitionSorter.directMemoryRatioThreshold | 0.1 | false | Max ratio of partition sorter's memory for sorting, when reserved memory is higher than max partition sorter memory, partition sorter will stop sorting. If this value is set to 0, partition files sorter will skip memory check and ServingState check. | 0.2.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class MemoryManager {
private boolean pinnedMemoryCheckEnabled;
private long pinnedMemoryCheckInterval;
private long pinnedMemoryLastCheckTime = 0;
private long workerResumeByPinnedMemoryKeepTime;

@VisibleForTesting
public static MemoryManager initialize(CelebornConf conf) {
Expand Down Expand Up @@ -134,6 +135,7 @@ private MemoryManager(CelebornConf conf, StorageManager storageManager, Abstract
long checkInterval = conf.workerDirectMemoryPressureCheckIntervalMs();
this.pinnedMemoryCheckEnabled = conf.workerPinnedMemoryCheckEnabled();
this.pinnedMemoryCheckInterval = conf.workerPinnedMemoryCheckIntervalMs();
this.workerResumeByPinnedMemoryKeepTime = conf.workerResumeByPinnedMemoryKeepTime();
long reportInterval = conf.workerDirectMemoryReportIntervalSecond();
double readBufferTargetRatio = conf.readBufferTargetRatio();
long readBufferTargetUpdateInterval = conf.readBufferTargetUpdateInterval();
Expand Down Expand Up @@ -338,6 +340,8 @@ public void switchServingState() {
case PUSH_PAUSED:
if (canResumeByPinnedMemory()) {
resumeByPinnedMemory(servingState);
} else if (keepResumeByPinnedMemory()) {
// do nothing, keep resume for a while
} else {
pausePushDataCounter.increment();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
Expand All @@ -363,6 +367,8 @@ public void switchServingState() {
case PUSH_AND_REPLICATE_PAUSED:
if (canResumeByPinnedMemory()) {
resumeByPinnedMemory(servingState);
} else if (keepResumeByPinnedMemory()) {
// do nothing, keep resume for a while
} else {
pausePushDataAndReplicateCounter.increment();
logger.info("Trigger action: PAUSE PUSH");
Expand Down Expand Up @@ -610,6 +616,12 @@ && getPinnedMemory() / (double) (maxDirectMemory) < pinnedMemoryResumeRatio) {
}
}

/**
 * Tells whether the worker is still inside the "keep resume" window.
 *
 * <p>The window is open while the pinned memory check is enabled and fewer than
 * {@code workerResumeByPinnedMemoryKeepTime} milliseconds have elapsed since
 * {@code pinnedMemoryLastCheckTime}.
 *
 * <p>NOTE(review): presumably {@code pinnedMemoryLastCheckTime} is refreshed when a resume by
 * pinned memory is granted; confirm against {@code canResumeByPinnedMemory()}.
 *
 * @return {@code true} if the caller should keep the current resume state, {@code false} otherwise
 */
private boolean keepResumeByPinnedMemory() {
  if (!pinnedMemoryCheckEnabled) {
    // Feature disabled: never hold the resume state.
    return false;
  }
  long elapsedSinceLastCheck = System.currentTimeMillis() - pinnedMemoryLastCheckTime;
  return elapsedSinceLastCheck < workerResumeByPinnedMemoryKeepTime;
}

private void resumePush() {
logger.info("Trigger action: RESUME PUSH");
memoryPressureListeners.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.celeborn.service.deploy.memory

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.DurationInt

import org.mockito.{Mockito, MockitoSugar}
Expand Down Expand Up @@ -216,6 +218,71 @@ class MemoryManagerSuite extends CelebornFunSuite {
MemoryManager.reset()
}

// Verifies the "keep resume" window: once channels have been resumed by pinned memory, they stay
// resumed for the configured keep time (1s here) even if pinned memory rises again, and are only
// paused after that time has elapsed. The scenario is run for both PUSH_PAUSED and
// PUSH_AND_REPLICATE_PAUSED.
test("[CELEBORN-1792] Test MemoryManager keep resume a while by pinned memory") {
val conf = new CelebornConf()
// Presumably a long direct-memory check interval keeps the periodic check from running during
// the test, and a zero pinned-memory check interval lets every call re-evaluate pinned memory.
conf.set(CelebornConf.WORKER_DIRECT_MEMORY_CHECK_INTERVAL.key, "300s")
conf.set(CelebornConf.WORKER_PINNED_MEMORY_CHECK_INTERVAL.key, "0")
conf.set(CelebornConf.WORKER_RESUME_BY_PINNED_MEMORY_KEEP_TIME.key, "1s")
MemoryManager.reset()
// Spy so that getNettyPinnedDirectMemory and getMemoryUsage can be stubbed below.
val memoryManager = MockitoSugar.spy(MemoryManager.initialize(conf))
val maxDirectorMemory = memoryManager.maxDirectMemory
val pushThreshold =
(conf.workerDirectMemoryRatioToPauseReceive * maxDirectorMemory).longValue()
val replicateThreshold =
(conf.workerDirectMemoryRatioToPauseReplicate * maxDirectorMemory).longValue()
val channelsLimiter = new MockChannelsLimiter()
memoryManager.registerMemoryListener(channelsLimiter)

// NONE PAUSED -> PAUSE PUSH
// Memory usage is above the push threshold but pinned memory is zero, so the state becomes
// PUSH_PAUSED while the channels are resumed.
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
memoryManager.switchServingState()
assert(channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.PUSH_PAUSED)

// KEEP PAUSE PUSH, BUT CHANNELS KEEP RESUME
// Pinned memory is now high, yet the call happens inside the keep time, so channels stay resumed.
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(pushThreshold + 1)
memoryManager.switchServingState()
assert(channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.PUSH_PAUSED)

// Wait for the keep time to elapse; the next switch is expected to pause the channels.
// NOTE(review): the sleep equals the configured keep time exactly, so this relies on the strict
// `<` comparison in keepResumeByPinnedMemory; consider a small margin if it proves flaky.
TimeUnit.SECONDS.sleep(1)
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(pushThreshold + 1)
memoryManager.switchServingState()
assert(!channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.PUSH_PAUSED)

// Memory usage drops to zero: back to NONE_PAUSED with channels resumed.
Mockito.when(memoryManager.getMemoryUsage).thenReturn(0L)
memoryManager.switchServingState()
assert(channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.NONE_PAUSED)

// NONE PAUSED -> PAUSE PUSH AND REPLICATE
// Same scenario as above, this time crossing the replicate threshold.
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(replicateThreshold + 1)
memoryManager.switchServingState()
assert(channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.PUSH_AND_REPLICATE_PAUSED)

// KEEP PAUSE PUSH AND REPLICATE, BUT CHANNELS KEEP RESUME
// Pinned memory is high again but the keep time has not elapsed, so channels stay resumed.
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(replicateThreshold + 1)
memoryManager.switchServingState()
assert(channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.PUSH_AND_REPLICATE_PAUSED)

// Wait for the keep time to elapse; the next switch is expected to pause the channels.
TimeUnit.SECONDS.sleep(1)
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(replicateThreshold + 1)
memoryManager.switchServingState()
assert(!channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.PUSH_AND_REPLICATE_PAUSED)

// Memory usage drops to zero: back to NONE_PAUSED with channels resumed.
Mockito.when(memoryManager.getMemoryUsage).thenReturn(0L)
memoryManager.switchServingState()
assert(channelsLimiter.isResume)
assert(memoryManager.servingState == ServingState.NONE_PAUSED)
// Reset the MemoryManager so later tests start from a fresh instance.
MemoryManager.reset()
}

class MockMemoryPressureListener(
val belongModuleName: String,
var isPause: Boolean = false) extends MemoryPressureListener {
Expand All @@ -235,4 +302,19 @@ class MemoryManagerSuite extends CelebornFunSuite {
// do nothing
}
}

/**
 * Test listener that records whether the most recent pause/resume callback was a resume.
 *
 * @param isResume true after `onResume`, false after `onPause`; starts as false by default
 */
class MockChannelsLimiter(var isResume: Boolean = false) extends MemoryPressureListener {

  /** Records that the listener was paused; the module name is ignored. */
  override def onPause(moduleName: String): Unit = {
    isResume = false
  }

  /** Records that the listener was resumed; the module name is ignored. */
  override def onResume(moduleName: String): Unit = {
    isResume = true
  }

  /** Trimming is irrelevant for this mock, so this is a no-op. */
  override def onTrim(): Unit = ()
}

}
Loading