Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/celeborn
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 838c823163030037f3223e3e9ae3afd62d153a3e
Choose a base ref
..
head repository: apache/celeborn
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 27f88d3042b42f1c7b7e678cde243155bbdc065f
Choose a head ref
Original file line number Diff line number Diff line change
@@ -353,18 +353,34 @@ public void switchServingState() {
} else {
pausePushDataCounter.increment();
if (lastState == ServingState.PUSH_AND_REPLICATE_PAUSED) {
logger.info("Trigger action: RESUME REPLICATE");
resumeReplicate();
} else if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
pausePushDataStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
}
pausePush();
}
trimAllListeners();
break;
case PUSH_AND_REPLICATE_PAUSED:
if (canResumeByPinnedMemory()) {
resumeByPinnedMemory(servingState);
} else {
pausePush();
pauseReplicate();
pausePushDataAndReplicateCounter.increment();
if (lastState == ServingState.NONE_PAUSED) {
logger.info("Trigger action: PAUSE PUSH");
pausePushDataAndReplicateStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
}
logger.info("Trigger action: PAUSE REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
}
trimAllListeners();
break;
@@ -605,21 +621,6 @@ private void resumeReplicate() {
memoryPressureListener.onResume(TransportModuleConstants.REPLICATE_MODULE));
}

private void pausePush() {
logger.info("Trigger action: PAUSE PUSH");
pausePushDataStartTime = System.currentTimeMillis();
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.PUSH_MODULE));
}

private void pauseReplicate() {
logger.info("Trigger action: PAUSE REPLICATE");
memoryPressureListeners.forEach(
memoryPressureListener ->
memoryPressureListener.onPause(TransportModuleConstants.REPLICATE_MODULE));
}

public interface MemoryPressureListener {
void onPause(String moduleName);

Original file line number Diff line number Diff line change
@@ -173,50 +173,32 @@ class MemoryManagerSuite extends CelebornFunSuite {
memoryManager.registerMemoryListener(replicateListener)

// NONE PAUSED -> PAUSE PUSH
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(pushThreshold + 1)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
memoryManager.switchServingState()
assert(pushListener.isPause)
assert(!replicateListener.isPause)

// RESUME
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
memoryManager.switchServingState()
assert(!pushListener.isPause)

// PAUSE PUSH -> PAUSE PUSH AND REPLICATE
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(replicateThreshold + 1)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(replicateThreshold + 1)
memoryManager.switchServingState()
assert(pushListener.isPause)
assert(replicateListener.isPause)

// RESUME
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
memoryManager.switchServingState()
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
assert(memoryManager.servingState == ServingState.PUSH_PAUSED)

// KEEP PAUSE PUSH AND REPLICATE
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(replicateThreshold + 1)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(0L)
memoryManager.switchServingState()
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
assert(memoryManager.servingState == ServingState.NONE_PAUSED)

// PAUSE PUSH AND REPLICATE -> PAUSE PUSH
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(pushThreshold + 1)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(pushThreshold + 1)
// NONE PAUSED -> PAUSE PUSH AND REPLICATE
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(replicateThreshold + 1)
memoryManager.switchServingState()
assert(pushListener.isPause)
assert(!pushListener.isPause)
assert(!replicateListener.isPause)
assert(memoryManager.servingState == ServingState.PUSH_AND_REPLICATE_PAUSED)

// PAUSE PUSH -> NONE PAUSED
Mockito.when(memoryManager.getNettyPinnedDirectMemory).thenReturn(0L)
Mockito.when(memoryManager.getMemoryUsage).thenReturn(0L)
memoryManager.switchServingState()
assert(!pushListener.isPause)
assert(!replicateListener.isPause)

assert(memoryManager.servingState == ServingState.NONE_PAUSED)
MemoryManager.reset()
}