diff --git a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java index febe1e9c536c5..df50d7fb0603e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/recovery/RelocationIT.java @@ -11,6 +11,7 @@ import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.tests.util.English; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; @@ -36,12 +37,15 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndexingMemoryController; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.plugins.Plugin; @@ -161,10 +165,88 @@ public void testSimpleRelocationNoIndexing() { assertHitCount(prepareSearch("test").setSize(0), 20); } + // This tests that relocation can successfully suspend index throttling to grab + // indexing permits required for relocation to succeed. + public void testSimpleRelocationWithIndexingPaused() throws Exception { + logger.info("--> starting [node1] ..."); + // Start node with PAUSE_INDEXING_ON_THROTTLE setting set to true. This means that if we activate + // index throttling for a shard on this node, it will pause indexing for that shard until throttling + // is deactivated. + final String node_1 = internalCluster().startNode( + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), true) + ); + + logger.info("--> creating test index ..."); + prepareCreate("test", indexSettings(1, 0)).get(); + + logger.info("--> index docs"); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value" + i).get(); + } + logger.info("--> flush so we have an actual index"); + indicesAdmin().prepareFlush().get(); + + logger.info("--> verifying count"); + indicesAdmin().prepareRefresh().get(); + assertHitCount(prepareSearch("test").setSize(0), numDocs); + + logger.info("--> start another node"); + final String node_2 = internalCluster().startNode(); + ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNodes("2") + .get(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + // Activate index throttling on "test" index primary shard + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node_1); + IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + shard.activateThrottling(); + // Verify that indexing is paused for the throttled shard + Engine engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(true)); + + // Try to index a document into the "test" index which is currently throttled + logger.info("--> Try to index a doc while indexing is paused"); + IndexRequestBuilder indexRequestBuilder = prepareIndex("test").setId(Integer.toString(20)).setSource("field", "value" + 20); + var future = indexRequestBuilder.execute(); + expectThrows(ElasticsearchException.class, () -> future.actionGet(500, TimeUnit.MILLISECONDS)); + // Verify that the new document has not been indexed indicating that the indexing thread is paused. + logger.info("--> verifying count is unchanged..."); + indicesAdmin().prepareRefresh().get(); + assertHitCount(prepareSearch("test").setSize(0), numDocs); + + logger.info("--> relocate the shard from node1 to node2"); + updateIndexSettings(Settings.builder().put("index.routing.allocation.include._name", node_2), "test"); + ensureGreen(ACCEPTABLE_RELOCATION_TIME, "test"); + + // Relocation will suspend throttling for the paused shard, allow the indexing thread to proceed, thereby releasing + // the indexing permit it holds, in turn allowing relocation to acquire the permits and proceed. + clusterHealthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) + .setWaitForEvents(Priority.LANGUID) + .setWaitForNoRelocatingShards(true) + .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .get(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> verifying shard primary has relocated ..."); + indicesService = internalCluster().getInstance(IndicesService.class, node_2); + shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + assertThat(shard.routingEntry().primary(), equalTo(true)); + engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(false)); + logger.info("--> verifying count after relocation ..."); + future.actionGet(); + indicesAdmin().prepareRefresh().get(); + assertHitCount(prepareSearch("test").setSize(0), numDocs + 1); + } + public void testRelocationWhileIndexingRandom() throws Exception { int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4); int numberOfReplicas = randomBoolean() ? 0 : 1; int numberOfNodes = numberOfReplicas == 0 ? 2 : 3; + boolean throttleIndexing = randomBoolean(); logger.info( "testRelocationWhileIndexingRandom(numRelocations={}, numberOfReplicas={}, numberOfNodes={})", @@ -173,16 +255,22 @@ public void testRelocationWhileIndexingRandom() throws Exception { numberOfNodes ); + // Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless String[] nodes = new String[numberOfNodes]; logger.info("--> starting [node1] ..."); - nodes[0] = internalCluster().startNode(); + nodes[0] = internalCluster().startNode( + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean()) + ); logger.info("--> creating test index ..."); prepareCreate("test", indexSettings(1, numberOfReplicas)).get(); + // Randomly use pause throttling vs lock throttling, to verify that relocations proceed regardless for (int i = 2; i <= numberOfNodes; i++) { logger.info("--> starting [node{}] ...", i); - nodes[i - 1] = internalCluster().startNode(); + nodes[i - 1] = internalCluster().startNode( + Settings.builder().put(IndexingMemoryController.PAUSE_INDEXING_ON_THROTTLE.getKey(), randomBoolean()) + ); if (i != numberOfNodes) { ClusterHealthResponse healthResponse = clusterAdmin().prepareHealth(TEST_REQUEST_TIMEOUT) .setWaitForEvents(Priority.LANGUID) @@ -200,17 +288,37 @@ public void testRelocationWhileIndexingRandom() throws Exception { logger.info("--> {} docs indexed", numDocs); logger.info("--> starting relocations..."); - int nodeShiftBased = numberOfReplicas; // if we have replicas shift those + + // When we have a replica, the primary is on node 0 and replica is on node 1. We cannot move primary + // to a node containing the replica, so relocation of primary needs to happen between node 0 and 2. + // When there is no replica, we only have 2 nodes and primary relocates back and forth between node 0 and 1. for (int i = 0; i < numberOfRelocations; i++) { int fromNode = (i % 2); int toNode = fromNode == 0 ? 1 : 0; - fromNode += nodeShiftBased; - toNode += nodeShiftBased; + if (numberOfReplicas == 1) { + fromNode = fromNode == 1 ? 2 : 0; + toNode = toNode == 1 ? 2 : 0; + } + numDocs = scaledRandomIntBetween(200, 1000); + + // Throttle indexing on primary shard + if (throttleIndexing) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes[fromNode]); + IndexShard shard = indicesService.indexServiceSafe(resolveIndex("test")).getShard(0); + // Activate index throttling on "test" index primary shard + logger.info("--> activate throttling for shard on node {}...", nodes[fromNode]); + shard.activateThrottling(); + // Verify that indexing is throttled for this shard + Engine engine = shard.getEngineOrNull(); + assertThat(engine != null && engine.isThrottled(), equalTo(true)); + } logger.debug("--> Allow indexer to index [{}] documents", numDocs); indexer.continueIndexing(numDocs); logger.info("--> START relocate the shard from {} to {}", nodes[fromNode], nodes[toNode]); + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand("test", 0, nodes[fromNode], nodes[toNode])); + if (rarely()) { logger.debug("--> flushing"); indicesAdmin().prepareFlush().get(); @@ -219,11 +327,13 @@ public void testRelocationWhileIndexingRandom() throws Exception { .setWaitForEvents(Priority.LANGUID) .setWaitForNoRelocatingShards(true) .setTimeout(ACCEPTABLE_RELOCATION_TIME) + .setWaitForGreenStatus() .get(); assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); indexer.pauseIndexing(); logger.info("--> DONE relocate the shard from {} to {}", fromNode, toNode); } + logger.info("--> done relocations"); logger.info("--> waiting for indexing threads to stop ..."); indexer.stopAndAwaitStopped(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 1fdd1b52cfd51..5a6a72abdb89a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -483,7 +483,10 @@ protected static final class IndexThrottle { private final Condition pauseCondition = pauseIndexingLock.newCondition(); private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock); private volatile AtomicBoolean suspendThrottling = new AtomicBoolean(); - private final boolean pauseWhenThrottled; // Should throttling pause indexing ? + + // Should throttling pause indexing ? This is decided by the + // IndexingMemoryController#PAUSE_INDEXING_ON_THROTTLE setting for this node. + private final boolean pauseWhenThrottled; private volatile ReleasableLock lock = NOOP_LOCK; public IndexThrottle(boolean pause) { @@ -514,7 +517,6 @@ public Releasable acquireThrottle() { /** Activate throttling, which switches the lock to be a real lock */ public void activate() { assert lock == NOOP_LOCK : "throttling activated while already active"; - startOfThrottleNS = System.nanoTime(); if (pauseWhenThrottled) { lock = pauseLockReference; @@ -562,10 +564,14 @@ boolean isThrottled() { return lock != NOOP_LOCK; } + boolean isIndexingPaused() { + return (lock == pauseLockReference); + } + /** Suspend throttling to allow another task such as relocation to acquire all indexing permits */ public void suspendThrottle() { if (pauseWhenThrottled) { - try (Releasable releasableLock = pauseLockReference.acquire()) { + try (Releasable ignored = pauseLockReference.acquire()) { suspendThrottling.setRelease(true); pauseCondition.signalAll(); } @@ -575,7 +581,7 @@ public void suspendThrottle() { /** Reverse what was done in {@link #suspendThrottle()} */ public void resumeThrottle() { if (pauseWhenThrottled) { - try (Releasable releasableLock = pauseLockReference.acquire()) { + try (Releasable ignored = pauseLockReference.acquire()) { suspendThrottling.setRelease(false); pauseCondition.signalAll(); } @@ -2281,6 +2287,18 @@ public interface Warmer { */ public abstract void deactivateThrottling(); + /** + * If indexing is throttled to the point where it is paused completely, + * another task trying to get indexing permits might want to pause throttling + * by letting one thread pass at a time so that it does not get starved. + */ + public abstract void suspendThrottling(); + + /** + * Reverses a previous {@link #suspendThrottling} call. + */ + public abstract void resumeThrottling(); + /** * This method replays translog to restore the Lucene index which might be reverted previously. * This ensures that all acknowledged writes are restored correctly when this engine is promoted. diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1ca795f69257f..b6a9dcfc4ba6e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2862,6 +2862,16 @@ public void deactivateThrottling() { } } + @Override + public void suspendThrottling() { + throttle.suspendThrottle(); + } + + @Override + public void resumeThrottling() { + throttle.resumeThrottle(); + } + @Override public boolean isThrottled() { return throttle.isThrottled(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 2388154494ad4..800854dcedb0a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -500,6 +500,12 @@ public void activateThrottling() {} @Override public void deactivateThrottling() {} + @Override + public void suspendThrottling() {} + + @Override + public void resumeThrottling() {} + @Override public void trimUnreferencedTranslogFiles() {} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8dc7440c5dccf..53937953bee1a 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -810,7 +810,7 @@ public void relocated( ) throws IllegalIndexShardStateException, IllegalStateException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) { - indexShardOperationPermits.blockOperations(new ActionListener<>() { + blockOperations(new ActionListener<>() { @Override public void onResponse(Releasable releasable) { boolean success = false; @@ -888,8 +888,13 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - }, 30L, TimeUnit.MINUTES, EsExecutors.DIRECT_EXECUTOR_SERVICE); // Wait on current thread because this execution is wrapped by - // CancellableThreads and we want to be able to interrupt it + }, + 30L, + TimeUnit.MINUTES, + // Wait on current thread because this execution is wrapped by CancellableThreads and we want to be able to interrupt it + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + } } @@ -2750,6 +2755,7 @@ public IndexEventListener getIndexEventListener() { * setting is set to true, throttling will pause indexing completely. Otherwise, indexing will be throttled to one thread. */ public void activateThrottling() { + assert shardRouting.primary() : "only primaries can be throttled: " + shardRouting; try { getEngine().activateThrottling(); } catch (AlreadyClosedException ex) { @@ -2765,6 +2771,22 @@ public void deactivateThrottling() { } } + private void suspendThrottling() { + try { + getEngine().suspendThrottling(); + } catch (AlreadyClosedException ex) { + // ignore + } + } + + private void resumeThrottling() { + try { + getEngine().resumeThrottling(); + } catch (AlreadyClosedException ex) { + // ignore + } + } + private void handleRefreshException(Exception e) { if (e instanceof AlreadyClosedException) { // ignore @@ -3822,6 +3844,39 @@ private ActionListener wrapPrimaryOperationPermitListener(final Acti }); } + /** + * Immediately delays operations and uses the {@code executor} to wait for in-flight operations to finish and then acquires all + * permits. When all permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are + * started. Delayed operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in + * this case the {@code onFailure} handler will be invoked after delayed operations are released. + * + * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed. This listener should not throw. + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument + * @param executor executor on which to wait for in-flight operations to finish and acquire all permits + */ + public void blockOperations( + final ActionListener onAcquired, + final long timeout, + final TimeUnit timeUnit, + final Executor executor + ) { + // In case indexing is paused on the shard, suspend throttling so that any currently paused task can + // go ahead and release the indexing permit it holds. + suspendThrottling(); + try { + indexShardOperationPermits.blockOperations( + ActionListener.runAfter(onAcquired, this::resumeThrottling), + timeout, + timeUnit, + executor + ); + } catch (IndexShardClosedException e) { + resumeThrottling(); + throw e; + } + } + private void asyncBlockOperations(ActionListener onPermitAcquired, long timeout, TimeUnit timeUnit) { final Releasable forceRefreshes = refreshListeners.forceRefreshes(); final ActionListener wrappedListener = ActionListener.wrap(r -> { @@ -3832,7 +3887,7 @@ private void asyncBlockOperations(ActionListener onPermitAcquired, l onPermitAcquired.onFailure(e); }); try { - indexShardOperationPermits.blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); + blockOperations(wrappedListener, timeout, timeUnit, threadPool.generic()); } catch (Exception e) { forceRefreshes.close(); throw e;