Suspend Index throttling when relocating #128797

Open

wants to merge 40 commits into base: main
Changes from all commits

Commits (40)
dfe639f
pause indexing and race condition diags
ankikuma May 23, 2025
2601960
commit
ankikuma May 30, 2025
ec91a19
commit
ankikuma May 30, 2025
3ddb78b
refresh branch
ankikuma May 30, 2025
f12949e
commit
ankikuma Jun 2, 2025
90670f3
commit
ankikuma Jun 2, 2025
45e3799
commit
ankikuma Jun 3, 2025
cd43ab3
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 3, 2025
bf91cab
commit
ankikuma Jun 3, 2025
e642fea
address review comments
ankikuma Jun 4, 2025
a249357
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
a11d2fd
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
82a37f5
test failure
ankikuma Jun 4, 2025
560a035
remove commented code
ankikuma Jun 4, 2025
1bb089e
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
048f944
minor changes
ankikuma Jun 4, 2025
3e044bf
modified testRelocationWhileIndexingRandom
ankikuma Jun 4, 2025
7abedf2
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 4, 2025
76f59b6
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 4, 2025
4afc8c5
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 5, 2025
c5cce6e
update index settings
ankikuma Jun 5, 2025
8485a8e
test
ankikuma Jun 5, 2025
6315189
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 5, 2025
5e81c31
address comments
ankikuma Jun 5, 2025
661fa12
test
ankikuma Jun 5, 2025
77058ad
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 5, 2025
1d872c1
test
ankikuma Jun 5, 2025
129622a
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma Jun 5, 2025
610bc0f
old changes
ankikuma Jun 5, 2025
e50c200
pull changes
ankikuma Jun 5, 2025
343d9f2
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 5, 2025
94f212a
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma Jun 5, 2025
6a2bf2b
fix test
ankikuma Jun 6, 2025
ee22887
[CI] Auto commit changes from spotless
elasticsearchmachine Jun 6, 2025
8e9d456
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jun 6, 2025
371dfbe
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 9, 2025
c9438b4
fix test + throttle only for primary
ankikuma Jul 11, 2025
31fe364
[CI] Auto commit changes from spotless
elasticsearchmachine Jul 11, 2025
9729ebf
Merge remote-tracking branch 'upstream/main' into 05192025/UnpauseInd…
ankikuma Jul 11, 2025
1134a5e
Merge branch '05192025/UnpauseIndexingForPermits' of github.com:ankik…
ankikuma Jul 11, 2025
@@ -11,6 +11,7 @@

import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.tests.util.English;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -36,12 +37,15 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.plugins.Plugin;
@@ -161,10 +165,88 @@ public void testSimpleRelocationNoIndexing() {
assertHitCount(prepareSearch("test").setSize(0), 20);
}

// Tests that relocation can suspend index throttling in order to acquire the
// indexing permits it needs to complete.
public void testSimpleRelocationWithIndexingPaused() throws Exception {
logger.info("--> starting [node1] ...");
// Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate
// index throttling for a shard on this node, it will pause indexing for that shard until throttling
// is deactivated.
final String node_1 = internalCluster().startNode(
Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true)
);

logger.info("--> creating test index ...");
prepareCreate("test", indexSettings(1, 0)).get();

logger.info("--> index docs");
int numDocs = between(1, 10);
for (int i = 0; i < numDocs; i++) {
prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get();
}
logger.info("--> flush so we have an actual index");
indicesAdmin().prepareFlush().get();

logger.info("--> verifying count");
indicesAdmin().prepareRefresh().get();
assertHitCount(prepareSearch("test").setSize(0), numDocs);

logger.info("--> start another node");
final String node_2 = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

// Activate index throttling on "test" index primary shard
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1);
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
shard.activateThrottling();
// Verify that indexing is paused for the throttled shard
Engine engine = shard.getEngineOrNull();
assertThat(engine != null && engine.isThrottled(), equalTo(true));

// Try to index a document into the "test" index which is currently throttled
logger.info("--> Try to index a doc while indexing is paused");
IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20);
var future = indexRequestBuilder.execute();
expectThrows(ElasticsearchException.class, () -> future.actionGet(500, TimeUnit.MILLISECONDS));
// Verify that the new document has not been indexed, indicating that the indexing thread is paused.
Contributor:

Can we wait for the thread to be blocked on the condition here?

Contributor Author:

I made a change to wait for the future to complete, which times out. Were you thinking of something different?

logger.info("--> verifying count is unchanged...");
indicesAdmin().prepareRefresh().get();
assertHitCount(prepareSearch("test").setSize(0), numDocs);
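
Purely for illustration (not part of this PR's diff): an alternative to the timeout-and-count check above, closer to what the reviewer suggests, would be to poll until the shard actually reports the throttled/paused state and then assert that the pending write has still not completed. The sketch below uses only accessors visible in this test; a pause-state accessor on Engine is an assumption and is therefore only noted in a comment.

// Sketch only: wait for the paused state instead of racing a fixed 500 ms timeout.
// assertBusy comes from ESTestCase; `shard` and `future` are the variables defined above.
assertBusy(() -> {
    Engine e = shard.getEngineOrNull();
    assertThat(e != null && e.isThrottled(), equalTo(true));
    // A stronger check would assert the pause state itself, e.g. a hypothetical
    // Engine#isIndexingPaused(); the PR adds IndexThrottle#isIndexingPaused, but it is
    // not shown to be exposed on Engine here.
});
assertThat(future.isDone(), equalTo(false)); // the paused indexing request is still pending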

logger.info("--> relocate the shard from node1 to node2");
updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", node_2), "test");
ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test");

// Relocation suspends throttling for the paused shard, allowing the indexing thread to proceed and release
// the indexing permit it holds, which in turn lets relocation acquire all the permits and proceed.
clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));

logger.info("--> verifying shard primary has relocated ...");
indicesService = internalCluster().getInstance(IndicesService.class, node_2);
shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
assertThat(shard.routingEntry().primary(), equalTo(true));
engine = shard.getEngineOrNull();
assertThat(engine != null && engine.isThrottled(), equalTo(false));
logger.info("--> verifying count after relocation ...");
future.actionGet();
indicesAdmin().prepareRefresh().get();
assertHitCount(prepareSearch("test").setSize(0), numDocs + 1);
}

public void testRelocationWhileIndexingRandom() throws Exception {
int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
int numberOfReplicas = randomBoolean() ? 0 : 1;
int numberOfNodes = numberOfReplicas == 0 ? 2 : 3;
boolean throttleIndexing = randomBoolean();

logger.info(
"testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})",
@@ -173,16 +255,22 @@ public void testRelocationWhileIndexingRandom() throws Exception {
numberOfNodes
);

// Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless
String[] nodes = new String[numberOfNodes];
logger.info("--> starting [node1] ...");
nodes[0] = internalCluster().startNode();
nodes[0] = internalCluster().startNode(
Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())
);

logger.info("--> creating test index ...");
prepareCreate("test", indexSettings(1, numberOfReplicas)).get();

// Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless
for (int i = 2; i <= numberOfNodes; i++) {
logger.info("--> starting [node{}] ...", i);
nodes[i - 1] = internalCluster().startNode();
nodes[i - 1] = internalCluster().startNode(
Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean())
);
if (i != numberOfNodes) {
ClusterHealthResponse healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT)
.setWaitForEvents(Priority.LANGUID)
@@ -200,17 +288,37 @@ public void testRelocationWhileIndexingRandom() throws Exception {
logger.info("--> {} docs indexed", numDocs);

logger.info("--> starting relocations...");
int nodeShiftBased = numberOfReplicas; // if we have replicas shift those

// When we have a replica, the primary is on node 0 and the replica is on node 1. We cannot move the primary
// to a node containing the replica, so relocation of the primary needs to happen between nodes 0 and 2.
// When there is no replica, we only have 2 nodes and the primary relocates back and forth between nodes 0 and 1.
for (int i = 0; i < numberOfRelocations; i++) {
int fromNode = (i % 2);
int toNode = fromNode == 0 ? 1 : 0;
fromNode += nodeShiftBased;
toNode += nodeShiftBased;
if (numberOfReplicas == 1) {
fromNode = fromNode == 1 ? 2 : 0;
toNode = toNode == 1 ? 2 : 0;
}

numDocs = scaledRandomIntBetween(200, 1000);

// Throttle indexing on primary shard
if (throttleIndexing) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]);
IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0);
// Activate index throttling on "test" index primary shard
logger.info("--> activate throttling for shard on node {}...", nodes[fromNode]);
shard.activateThrottling();
// Verify that indexing is throttled for this shard
Engine engine = shard.getEngineOrNull();
assertThat(engine != null && engine.isThrottled(), equalTo(true));
}
logger.debug("--> Allow indexer to index [{}] documents", numDocs);
indexer.continueIndexing(numDocs);
logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]);

ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode]));

if (rarely()) {
logger.debug("--> flushing");
indicesAdmin().prepareFlush().get();
@@ -219,11 +327,13 @@ public void testRelocationWhileIndexingRandom() throws Exception {
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.setWaitForGreenStatus()
.get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
indexer.pauseIndexing();
logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode);
}

logger.info("--> done relocations");
logger.info("--> waiting for indexing threads to stop ...");
indexer.stopAndAwaitStopped();
26 changes: 22 additions & 4 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -483,7 +486,10 @@ protected static final class IndexThrottle {
private final Condition pauseCondition = pauseIndexingLock.newCondition();
private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
private volatile AtomicBoolean suspendThrottling = new AtomicBoolean();
private final boolean pauseWhenThrottled; // Should throttling pause indexing ?

// Should throttling pause indexing? This is decided by the
// IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE setting for this node.
private final boolean pauseWhenThrottled;
private volatile ReleasableLock lock = NOOP_LOCK;

public IndexThrottle(boolean pause) {
@@ -514,7 +517,6 @@ public Releasable acquireThrottle() {
/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";

startOfThrottleNS = System.nanoTime();
if (pauseWhenThrottled) {
lock = pauseLockReference;
@@ -562,10 +564,14 @@ boolean isThrottled() {
return lock != NOOP_LOCK;
}

boolean isIndexingPaused() {
return (lock == pauseLockReference);
}

/** Suspend throttling to allow another task such as relocation to acquire all indexing permits */
public void suspendThrottle() {
if (pauseWhenThrottled) {
try (Releasable releasableLock = pauseLockReference.acquire()) {
try (Releasable ignored = pauseLockReference.acquire()) {
suspendThrottling.setRelease(true);
pauseCondition.signalAll();
}
@@ -575,7 +581,7 @@ public void suspendThrottle() {
/** Reverse what was done in {@link #suspendThrottle()} */
public void resumeThrottle() {
if (pauseWhenThrottled) {
try (Releasable releasableLock = pauseLockReference.acquire()) {
try (Releasable ignored = pauseLockReference.acquire()) {
suspendThrottling.setRelease(false);
pauseCondition.signalAll();
}
@@ -2281,6 +2287,18 @@ public interface Warmer {
*/
public abstract void deactivateThrottling();

/**
* If indexing is throttled to the point where it is paused completely,
* another task trying to get indexing permits might want to suspend throttling,
* letting one thread pass at a time, so that it does not get starved.
*/
public abstract void suspendThrottling();

/**
* Reverses a previous {@link #suspendThrottling} call.
*/
public abstract void resumeThrottling();

/**
* This method replays translog to restore the Lucene index which might be reverted previously.
* This ensures that all acknowledged writes are restored correctly when this engine is promoted.
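
For orientation, here is a minimal, self-contained model of the pause/suspend interplay implied by the IndexThrottle fields above: while a shard is pause-throttled, an indexing thread waits on the condition while still holding its indexing permit, so a task that needs all permits (relocated() waits up to 30 minutes for them) could stall behind it; suspending the throttle wakes the waiter so it can finish and release its permit. This is an illustrative sketch under those assumptions, not the actual Engine.IndexThrottle implementation (its acquire/wait loop is not part of this diff).

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Toy model only: names loosely mirror IndexThrottle; behavior is simplified.
class PausableThrottleModel {
    private final ReentrantLock pauseLock = new ReentrantLock();
    private final Condition pauseCondition = pauseLock.newCondition();
    private boolean throttled = false; // toggled by activate()/deactivate()
    private boolean suspended = false; // toggled by suspend()/resume()

    /** Indexing path: blocks while the shard is pause-throttled and not suspended. */
    void acquire() throws InterruptedException {
        pauseLock.lock();
        try {
            while (throttled && suspended == false) {
                pauseCondition.await();
            }
        } finally {
            pauseLock.unlock();
        }
    }

    void activate() {
        update(() -> throttled = true);
    }

    void deactivate() {
        update(() -> throttled = false);
    }

    void suspend() { // lets a paused writer finish and release its indexing permit
        update(() -> suspended = true);
    }

    void resume() {
        update(() -> suspended = false);
    }

    private void update(Runnable stateChange) {
        pauseLock.lock();
        try {
            stateChange.run();
            pauseCondition.signalAll();
        } finally {
            pauseLock.unlock();
        }
    }
}

In the actual change, IndexShard#blockOperations plays the caller role: it invokes suspendThrottling() before asking for all permits and resumeThrottling() once they have been acquired and released (or acquisition fails), as shown in the IndexShard.java diff below.
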
@@ -2862,6 +2862,16 @@ public void deactivateThrottling() {
}
}

@Override
public void suspendThrottling() {
throttle.suspendThrottle();
}

@Override
public void resumeThrottling() {
throttle.resumeThrottle();
}

@Override
public boolean isThrottled() {
return throttle.isThrottled();
@@ -500,6 +500,12 @@ public void activateThrottling() {}
@Override
public void deactivateThrottling() {}

@Override
public void suspendThrottling() {}

@Override
public void resumeThrottling() {}

@Override
public void trimUnreferencedTranslogFiles() {}

63 changes: 59 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -810,7 +810,7 @@ public void relocated(
) throws IllegalIndexShardStateException, IllegalStateException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
indexShardOperationPermits.blockOperations(new ActionListener<>() {
blockOperations(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
boolean success = false;
@@ -888,8 +888,13 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
}, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by
// CancellableThreads and we want to be able to interrupt it
},
30L,
TimeUnit.MINUTES,
// Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it
EsExecutors.DIRECT_EXECUTOR_SERVICE
);

}
}

@@ -2750,6 +2755,7 @@ public IndexEventListener getIndexEventListener() {
* setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread.
*/
public void activateThrottling() {
assert shardRouting.primary() : "only primaries can be throttled: " + shardRouting;
try {
getEngine().activateThrottling();
} catch (AlreadyClosedException ex) {
@@ -2765,6 +2771,22 @@ public void deactivateThrottling() {
}
}

private void suspendThrottling() {
try {
getEngine().suspendThrottling();
} catch (AlreadyClosedException ex) {
// ignore
}
}

private void resumeThrottling() {
try {
getEngine().resumeThrottling();
} catch (AlreadyClosedException ex) {
// ignore
}
}

private void handleRefreshException(Exception e) {
if (e instanceof AlreadyClosedException) {
// ignore
@@ -3822,6 +3844,39 @@ private ActionListener<Releasable> wrapPrimaryOperationPermitListener(final Acti
});
}

/**
* Immediately delays operations and uses the {@code executor} to wait for in-flight operations to finish and then acquires all
* permits. When all permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are
* started. Delayed operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in
* this case the {@code onFailure} handler will be invoked after delayed operations are released.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed. This listener should not throw.
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param executor executor on which to wait for in-flight operations to finish and acquire all permits
*/
public void blockOperations(
final ActionListener<Releasable> onAcquired,
final long timeout,
final TimeUnit timeUnit,
final Executor executor
) {
// In case indexing is paused on the shard, suspend throttling so that any currently paused task can
// go ahead and release the indexing permit it holds.
suspendThrottling();
try {
indexShardOperationPermits.blockOperations(
ActionListener.runAfter(onAcquired, this::resumeThrottling),
timeout,
timeUnit,
executor
);
} catch (IndexShardClosedException e) {
resumeThrottling();
throw e;
}
}

private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
@@ -3832,7 +3887,7 @@ private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, l
onPermitAcquired.onFailure(e);
});
try {
indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic());
} catch (Exception e) {
forceRefreshes.close();
throw e;