diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java index 66bc94aa3..c1dd399f3 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5Client.java @@ -128,6 +128,9 @@ public class Rest5Client implements Closeable { private final WarningsHandler warningsHandler; private final boolean compressionEnabled; private final boolean metaHeaderEnabled; + private final boolean enableSingleNodeRetry; + private final int maxSingleNodeRetryAttempts; + private final AtomicInteger singleNodeRetryAttempts = new AtomicInteger(0); Rest5Client( CloseableHttpAsyncClient client, @@ -138,7 +141,9 @@ public class Rest5Client implements Closeable { NodeSelector nodeSelector, boolean strictDeprecationMode, boolean compressionEnabled, - boolean metaHeaderEnabled + boolean metaHeaderEnabled, + boolean enableSingleNodeRetry, + int maxSingleNodeRetryAttempts ) { this.client = client; this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); @@ -148,6 +153,8 @@ public class Rest5Client implements Closeable { this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; this.compressionEnabled = compressionEnabled; this.metaHeaderEnabled = metaHeaderEnabled; + this.enableSingleNodeRetry = enableSingleNodeRetry; + this.maxSingleNodeRetryAttempts = maxSingleNodeRetryAttempts; setNodes(nodes); } @@ -290,6 +297,7 @@ public boolean isRunning() { * error */ public Response performRequest(Request request) throws IOException { + singleNodeRetryAttempts.set(0); InternalRequest internalRequest = new InternalRequest(request); return performRequest(nextNodes(), internalRequest, null); } @@ -308,8 +316,17 @@ private Response performRequest(final Iterator nodes, final InternalReques onFailure(context.node); Exception cause = extractAndWrapCause(e); addSuppressedException(previousException, cause); - if (isRetryableException(e) && nodes.hasNext()) { - return performRequest(nodes, request, cause); + if (isRetryableException(e)) { + if (nodes.hasNext()) { + return performRequest(nodes, request, cause); + // For single node scenario, we need to get a new iterator to check if the node can be retried + } else if (enableSingleNodeRetry) { + int attempts = singleNodeRetryAttempts.incrementAndGet(); + if (attempts <= maxSingleNodeRetryAttempts) { + // Try to get a new iterator with potentially retried nodes if single node retry is enabled + return performRequest(nextNodes(), request, cause); + } + } } if (cause instanceof IOException) { throw (IOException) cause; @@ -390,6 +407,7 @@ private ResponseOrResponseException convertResponse(InternalRequest request, Nod */ public Cancellable performRequestAsync(Request request, ResponseListener responseListener) { try { + singleNodeRetryAttempts.set(0); FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); InternalRequest internalRequest = new InternalRequest(request); @@ -418,10 +436,18 @@ public void completed(ClassicHttpResponse httpResponse) { context.node, httpResponse); if (responseOrResponseException.responseException == null) { listener.onSuccess(responseOrResponseException.response); - } else { + } + else { + listener.trackFailure(responseOrResponseException.responseException); if (nodes.hasNext()) { - listener.trackFailure(responseOrResponseException.responseException); performRequestAsync(nodes, request, listener); + } else if (enableSingleNodeRetry && singleNodeRetryAttempts.incrementAndGet() <= maxSingleNodeRetryAttempts) { + // Try to get a new iterator with potentially retried nodes if single node retry is enabled + try { + performRequestAsync(nextNodes(), request, listener); + } catch (IOException ioException) { + listener.onDefinitiveFailure(ioException); + } } else { listener.onDefinitiveFailure(responseOrResponseException.responseException); } @@ -437,9 +463,20 @@ public void failed(Exception failure) { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); onFailure(context.node); - if (isRetryableException(failure) && nodes.hasNext()) { + if (isRetryableException(failure)) { listener.trackFailure(failure); - performRequestAsync(nodes, request, listener); + if (nodes.hasNext()) { + performRequestAsync(nodes, request, listener); + } else if (enableSingleNodeRetry && singleNodeRetryAttempts.incrementAndGet() <= maxSingleNodeRetryAttempts) { + // Try to get a new iterator with potentially retried nodes if single node retry is enabled + try { + performRequestAsync(nextNodes(), request, listener); + } catch (IOException ioException) { + listener.onDefinitiveFailure(ioException); + } + } else { + listener.onDefinitiveFailure(failure); + } } else { listener.onDefinitiveFailure(failure); } @@ -472,7 +509,7 @@ public void cancelled() { */ private Iterator nextNodes() throws IOException { List nodes = this.nodes; - return selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector).iterator(); + return selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector, enableSingleNodeRetry).iterator(); } /** @@ -484,7 +521,8 @@ static Iterable selectNodes( List nodes, Map blacklist, AtomicInteger lastNodeIndex, - NodeSelector nodeSelector + NodeSelector nodeSelector, + boolean enableSingleNodeRetry ) throws IOException { /* * Sort the nodes into living and dead lists. @@ -493,7 +531,6 @@ static Iterable selectNodes( List deadNodes = null; if (!blacklist.isEmpty()) { deadNodes = new ArrayList<>(blacklist.size()); - new ArrayList<>(blacklist.size()); for (Node node : nodes) { DeadHostState deadness = blacklist.get(node.getHost()); if (deadness == null || deadness.shallBeRetried()) { @@ -508,6 +545,7 @@ static Iterable selectNodes( livingNodes.addAll(nodes); } + // If we have living nodes, use them if (!livingNodes.isEmpty()) { /* * Normal state: there is at least one living node. If the @@ -548,6 +586,22 @@ static Iterable selectNodes( return singletonList(Collections.min(selectedDeadNodes).node); } } + + // Special case: only one node and it's dead, but we need to try it anyway if enabled + if (enableSingleNodeRetry && nodes.size() == 1) { + Node singleNode = nodes.get(0); + try { + List singleNodeList = new ArrayList<>(1); + singleNodeList.add(singleNode); + nodeSelector.select(singleNodeList); + if (!singleNodeList.isEmpty()) { + return singletonList(singleNode); + } + } catch (Exception e) { + // Ignore, will throw the standard exception below + } + } + throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, living: " + livingNodes + " and dead: " + deadNodes); } @@ -991,4 +1045,4 @@ public InputStream asInput() { return new ByteArrayInputStream(this.buf, 0, this.count); } } -} +} \ No newline at end of file diff --git a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java index d35ffac3f..5eb9d5aa8 100644 --- a/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java +++ b/rest5-client/src/main/java/co/elastic/clients/transport/rest5_client/low_level/Rest5ClientBuilder.java @@ -91,6 +91,8 @@ public final class Rest5ClientBuilder { private boolean strictDeprecationMode = false; private boolean compressionEnabled = false; private boolean metaHeaderEnabled = true; + private boolean enableSingleNodeRetry = false; + private int maxSingleNodeRetryAttempts = 1; static { // Never fail on unknown version, even if an environment messed up their classpath enough that we @@ -301,6 +303,29 @@ public Rest5ClientBuilder setStrictDeprecationMode(boolean strictDeprecationMode return this; } + /** + * Whether the REST client should retry requests when only a single node is available. + * By default, this is disabled. + */ + public Rest5ClientBuilder setEnableSingleNodeRetry(boolean enableSingleNodeRetry) { + this.enableSingleNodeRetry = enableSingleNodeRetry; + return this; + } + + /** + * Sets the maximum number of retry attempts when single node retry is enabled. + * By default, this is 3. + * + * @throws IllegalArgumentException if {@code maxAttempts} is less than 0. + */ + public Rest5ClientBuilder setMaxSingleNodeRetryAttempts(int maxAttempts) { + if (maxAttempts < 0) { + throw new IllegalArgumentException("maxSingleNodeRetryAttempts must be >= 0"); + } + this.maxSingleNodeRetryAttempts = maxAttempts; + return this; + } + /** * Whether the REST client should compress requests using gzip content encoding and add the * "Accept-Encoding: gzip" @@ -394,7 +419,9 @@ public Rest5Client build() { nodeSelector, strictDeprecationMode, compressionEnabled, - metaHeaderEnabled + metaHeaderEnabled, + enableSingleNodeRetry, + maxSingleNodeRetryAttempts ); httpClient.start(); return restClient; @@ -483,4 +510,4 @@ private CloseableHttpAsyncClient createHttpClient() { } } -} +} \ No newline at end of file diff --git a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientMultipleHostsTests.java b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientMultipleHostsTests.java index 71dfd014b..376c74206 100644 --- a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientMultipleHostsTests.java +++ b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientMultipleHostsTests.java @@ -67,7 +67,7 @@ public Rest5Client createRestClient(NodeSelector nodeSelector) { } nodes = Collections.unmodifiableList(nodes); failureListener = new HostsTrackingFailureListener(); - return new Rest5Client(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false, false); + return new Rest5Client(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false, false, false, 1); } /** diff --git a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientSingleHostTests.java b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientSingleHostTests.java index 89f90f913..63815540c 100644 --- a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientSingleHostTests.java +++ b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientSingleHostTests.java @@ -80,6 +80,7 @@ import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.getHttpMethods; import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.getOkStatusCodes; import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.randomStatusCode; +import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.randomErrorRetryStatusCode; import static java.util.Collections.singletonList; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; @@ -130,7 +131,9 @@ public void createRestClient() { NodeSelector.ANY, strictDeprecationMode, false, - false + false, + false, + 1 ); } @@ -672,8 +675,8 @@ public void onFailure(Exception exception) { * normally the case for synchronous calls but {@link Rest5Client} performs * synchronous calls by performing asynchronous calls and blocking the * current thread until the call returns so it has to take special care - * to make sure that the caller shows up in the exception. We use this - * assertion to make sure that we don't break that "special care". + * to make sure that the caller shows up in the exception. We use + * this assertion to make sure that we don't break that "special care". */ private static void assertExceptionStackContainsCallingMethod(Throwable t) { // 0 is getStackTrace @@ -689,4 +692,41 @@ private static void assertExceptionStackContainsCallingMethod(Throwable t) { t.printStackTrace(new PrintWriter(stack)); fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); } -} + + /** + * Tests that single node retry works correctly when enabled with custom max attempts. + */ + @Test + public void testSingleNodeRetryWithCustomMaxAttempts() throws Exception { + // Create a client with single node retry enabled and max attempts = 3 + Rest5Client restClient = new Rest5Client( + httpClient, + defaultHeaders, + singletonList(node), + null, + failureListener, + NodeSelector.ANY, + strictDeprecationMode, + false, + false, + // single node retry is enabled + true, + // max attempts is 3 + 3 + ); + + // Test that requests are retried up to max attempts on retryable errors + for (String method : getHttpMethods()) { + int retryStatusCode = randomErrorRetryStatusCode(); + try { + performRequestSyncOrAsync(restClient, new Request(method, "/" + retryStatusCode)); + fail("request should have failed after max retry attempts"); + } catch (ResponseException e) { + assertEquals(retryStatusCode, e.getResponse().getStatusCode()); + // Verify that the failure listener was called multiple times (retries) + failureListener.assertCalled(singletonList(node)); + } + } + } + +} \ No newline at end of file diff --git a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientTests.java b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientTests.java index dc12c386a..1c86fa0e2 100644 --- a/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientTests.java +++ b/rest5-client/src/test/java/co/elastic/clients/transport/rest5_client/low_level/RestClientTests.java @@ -63,7 +63,7 @@ public void testCloseIsIdempotent() throws IOException { List nodes = singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class); Rest5Client restClient = new Rest5Client(closeableHttpAsyncClient, new Header[0], nodes, null, null, - null, false, false, false); + null, false, false, false, false, 1); restClient.close(); verify(closeableHttpAsyncClient, times(1)).close(); restClient.close(); @@ -304,9 +304,9 @@ public String toString() { * to being revived that the NodeSelector is ok with. */ assertEquals(singletonList(n1), Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(), - NodeSelector.ANY)); + NodeSelector.ANY, false)); assertEquals(singletonList(n2), Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(), - not1)); + not1, false)); /* * Try a NodeSelector that excludes all nodes. This should @@ -351,12 +351,12 @@ private void assertSelectLivingHosts( ) throws IOException { int iterations = 1000; AtomicInteger lastNodeIndex = new AtomicInteger(0); - assertEquals(expectedNodes, Rest5Client.selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector)); + assertEquals(expectedNodes, Rest5Client.selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector, false)); // Calling it again rotates the set of results for (int i = 1; i < iterations; i++) { Collections.rotate(expectedNodes, 1); assertEquals("iteration " + i, expectedNodes, Rest5Client.selectNodes(nodes, blacklist, - lastNodeIndex, nodeSelector)); + lastNodeIndex, nodeSelector, false)); } } @@ -371,7 +371,7 @@ private static String assertSelectAllRejected( NodeSelector nodeSelector ) { try { - Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(0), nodeSelector); + Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(0), nodeSelector, false); throw new AssertionError("expected selectHosts to fail"); } catch (IOException e) { return e.getMessage(); @@ -381,7 +381,7 @@ private static String assertSelectAllRejected( private static Rest5Client createRestClient() { List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); return new Rest5Client(mock(CloseableHttpAsyncClient.class), new Header[]{}, nodes, null, null, null - , false, false, false); + , false, false, false, false, 1); } @Test @@ -418,7 +418,7 @@ public void testIsRunning() { List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient client = mock(CloseableHttpAsyncClient.class); Rest5Client restClient = new Rest5Client(client, new Header[]{}, nodes, null, null, null, false, - false, false); + false, false, false, 1); when(client.getStatus()).thenReturn(IOReactorStatus.ACTIVE); assertTrue(restClient.isRunning()); @@ -440,7 +440,8 @@ private static void assertNodes(List nodes, AtomicInteger lastNodeIndex, i nodes, Collections.emptyMap(), lastNodeIndex, - NodeSelector.ANY + NodeSelector.ANY, + false ); List expectedNodes = nodes; int index = 0;