Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ public class Rest5Client implements Closeable {
private final WarningsHandler warningsHandler;
private final boolean compressionEnabled;
private final boolean metaHeaderEnabled;
private final boolean enableSingleNodeRetry;
private final int maxSingleNodeRetryAttempts;
private final AtomicInteger singleNodeRetryAttempts = new AtomicInteger(0);

Rest5Client(
CloseableHttpAsyncClient client,
Expand All @@ -138,7 +141,9 @@ public class Rest5Client implements Closeable {
NodeSelector nodeSelector,
boolean strictDeprecationMode,
boolean compressionEnabled,
boolean metaHeaderEnabled
boolean metaHeaderEnabled,
boolean enableSingleNodeRetry,
int maxSingleNodeRetryAttempts
) {
this.client = client;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
Expand All @@ -148,6 +153,8 @@ public class Rest5Client implements Closeable {
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
this.metaHeaderEnabled = metaHeaderEnabled;
this.enableSingleNodeRetry = enableSingleNodeRetry;
this.maxSingleNodeRetryAttempts = maxSingleNodeRetryAttempts;
setNodes(nodes);
}

Expand Down Expand Up @@ -290,6 +297,7 @@ public boolean isRunning() {
* error
*/
public Response performRequest(Request request) throws IOException {
singleNodeRetryAttempts.set(0);
InternalRequest internalRequest = new InternalRequest(request);
return performRequest(nextNodes(), internalRequest, null);
}
Expand All @@ -308,8 +316,17 @@ private Response performRequest(final Iterator<Node> nodes, final InternalReques
onFailure(context.node);
Exception cause = extractAndWrapCause(e);
addSuppressedException(previousException, cause);
if (isRetryableException(e) && nodes.hasNext()) {
return performRequest(nodes, request, cause);
if (isRetryableException(e)) {
if (nodes.hasNext()) {
return performRequest(nodes, request, cause);
// For single node scenario, we need to get a new iterator to check if the node can be retried
} else if (enableSingleNodeRetry) {
int attempts = singleNodeRetryAttempts.incrementAndGet();
if (attempts <= maxSingleNodeRetryAttempts) {
// Try to get a new iterator with potentially retried nodes if single node retry is enabled
return performRequest(nextNodes(), request, cause);
}
}
}
if (cause instanceof IOException) {
throw (IOException) cause;
Expand Down Expand Up @@ -390,6 +407,7 @@ private ResponseOrResponseException convertResponse(InternalRequest request, Nod
*/
public Cancellable performRequestAsync(Request request, ResponseListener responseListener) {
try {
singleNodeRetryAttempts.set(0);
FailureTrackingResponseListener failureTrackingResponseListener =
new FailureTrackingResponseListener(responseListener);
InternalRequest internalRequest = new InternalRequest(request);
Expand Down Expand Up @@ -418,10 +436,18 @@ public void completed(ClassicHttpResponse httpResponse) {
context.node, httpResponse);
if (responseOrResponseException.responseException == null) {
listener.onSuccess(responseOrResponseException.response);
} else {
}
else {
listener.trackFailure(responseOrResponseException.responseException);
if (nodes.hasNext()) {
listener.trackFailure(responseOrResponseException.responseException);
performRequestAsync(nodes, request, listener);
} else if (enableSingleNodeRetry && singleNodeRetryAttempts.incrementAndGet() <= maxSingleNodeRetryAttempts) {
// Try to get a new iterator with potentially retried nodes if single node retry is enabled
try {
performRequestAsync(nextNodes(), request, listener);
} catch (IOException ioException) {
listener.onDefinitiveFailure(ioException);
}
} else {
listener.onDefinitiveFailure(responseOrResponseException.responseException);
}
Expand All @@ -437,9 +463,20 @@ public void failed(Exception failure) {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node,
failure);
onFailure(context.node);
if (isRetryableException(failure) && nodes.hasNext()) {
if (isRetryableException(failure)) {
listener.trackFailure(failure);
performRequestAsync(nodes, request, listener);
if (nodes.hasNext()) {
performRequestAsync(nodes, request, listener);
} else if (enableSingleNodeRetry && singleNodeRetryAttempts.incrementAndGet() <= maxSingleNodeRetryAttempts) {
// Try to get a new iterator with potentially retried nodes if single node retry is enabled
try {
performRequestAsync(nextNodes(), request, listener);
} catch (IOException ioException) {
listener.onDefinitiveFailure(ioException);
}
} else {
listener.onDefinitiveFailure(failure);
}
} else {
listener.onDefinitiveFailure(failure);
}
Expand Down Expand Up @@ -472,7 +509,7 @@ public void cancelled() {
*/
private Iterator<Node> nextNodes() throws IOException {
List<Node> nodes = this.nodes;
return selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector).iterator();
return selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector, enableSingleNodeRetry).iterator();
}

/**
Expand All @@ -484,7 +521,8 @@ static Iterable<Node> selectNodes(
List<Node> nodes,
Map<HttpHost, DeadHostState> blacklist,
AtomicInteger lastNodeIndex,
NodeSelector nodeSelector
NodeSelector nodeSelector,
boolean enableSingleNodeRetry
) throws IOException {
/*
* Sort the nodes into living and dead lists.
Expand All @@ -493,7 +531,6 @@ static Iterable<Node> selectNodes(
List<DeadNode> deadNodes = null;
if (!blacklist.isEmpty()) {
deadNodes = new ArrayList<>(blacklist.size());
new ArrayList<>(blacklist.size());
for (Node node : nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
if (deadness == null || deadness.shallBeRetried()) {
Expand All @@ -508,6 +545,7 @@ static Iterable<Node> selectNodes(
livingNodes.addAll(nodes);
}

// If we have living nodes, use them
if (!livingNodes.isEmpty()) {
/*
* Normal state: there is at least one living node. If the
Expand Down Expand Up @@ -548,6 +586,22 @@ static Iterable<Node> selectNodes(
return singletonList(Collections.min(selectedDeadNodes).node);
}
}

// Special case: only one node and it's dead, but we need to try it anyway if enabled
if (enableSingleNodeRetry && nodes.size() == 1) {
Node singleNode = nodes.get(0);
try {
List<Node> singleNodeList = new ArrayList<>(1);
singleNodeList.add(singleNode);
nodeSelector.select(singleNodeList);
if (!singleNodeList.isEmpty()) {
return singletonList(singleNode);
}
} catch (Exception e) {
// Ignore, will throw the standard exception below
}
}

throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, living: " + livingNodes + " and dead: " + deadNodes);
}

Expand Down Expand Up @@ -991,4 +1045,4 @@ public InputStream asInput() {
return new ByteArrayInputStream(this.buf, 0, this.count);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public final class Rest5ClientBuilder {
private boolean strictDeprecationMode = false;
private boolean compressionEnabled = false;
private boolean metaHeaderEnabled = true;
private boolean enableSingleNodeRetry = false;
private int maxSingleNodeRetryAttempts = 1;

static {
// Never fail on unknown version, even if an environment messed up their classpath enough that we
Expand Down Expand Up @@ -301,6 +303,29 @@ public Rest5ClientBuilder setStrictDeprecationMode(boolean strictDeprecationMode
return this;
}

/**
* Whether the REST client should retry requests when only a single node is available.
* By default, this is disabled.
*/
public Rest5ClientBuilder setEnableSingleNodeRetry(boolean enableSingleNodeRetry) {
this.enableSingleNodeRetry = enableSingleNodeRetry;
return this;
}

/**
* Sets the maximum number of retry attempts when single node retry is enabled.
* By default, this is 3.
*
* @throws IllegalArgumentException if {@code maxAttempts} is less than 0.
*/
public Rest5ClientBuilder setMaxSingleNodeRetryAttempts(int maxAttempts) {
if (maxAttempts < 0) {
throw new IllegalArgumentException("maxSingleNodeRetryAttempts must be >= 0");
}
this.maxSingleNodeRetryAttempts = maxAttempts;
return this;
}

/**
* Whether the REST client should compress requests using gzip content encoding and add the
* "Accept-Encoding: gzip"
Expand Down Expand Up @@ -394,7 +419,9 @@ public Rest5Client build() {
nodeSelector,
strictDeprecationMode,
compressionEnabled,
metaHeaderEnabled
metaHeaderEnabled,
enableSingleNodeRetry,
maxSingleNodeRetryAttempts
);
httpClient.start();
return restClient;
Expand Down Expand Up @@ -483,4 +510,4 @@ private CloseableHttpAsyncClient createHttpClient() {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Rest5Client createRestClient(NodeSelector nodeSelector) {
}
nodes = Collections.unmodifiableList(nodes);
failureListener = new HostsTrackingFailureListener();
return new Rest5Client(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false, false);
return new Rest5Client(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false, false, false, false, 1);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.getHttpMethods;
import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.getOkStatusCodes;
import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.randomStatusCode;
import static co.elastic.clients.transport.rest5_client.low_level.RestClientTestUtil.randomErrorRetryStatusCode;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -130,7 +131,9 @@ public void createRestClient() {
NodeSelector.ANY,
strictDeprecationMode,
false,
false
false,
false,
1
);
}

Expand Down Expand Up @@ -672,8 +675,8 @@ public void onFailure(Exception exception) {
* normally the case for synchronous calls but {@link Rest5Client} performs
* synchronous calls by performing asynchronous calls and blocking the
* current thread until the call returns so it has to take special care
* to make sure that the caller shows up in the exception. We use this
* assertion to make sure that we don't break that "special care".
* to make sure that the caller shows up in the exception. We use
* this assertion to make sure that we don't break that "special care".
*/
private static void assertExceptionStackContainsCallingMethod(Throwable t) {
// 0 is getStackTrace
Expand All @@ -689,4 +692,41 @@ private static void assertExceptionStackContainsCallingMethod(Throwable t) {
t.printStackTrace(new PrintWriter(stack));
fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack);
}
}

/**
* Tests that single node retry works correctly when enabled with custom max attempts.
*/
@Test
public void testSingleNodeRetryWithCustomMaxAttempts() throws Exception {
// Create a client with single node retry enabled and max attempts = 3
Rest5Client restClient = new Rest5Client(
httpClient,
defaultHeaders,
singletonList(node),
null,
failureListener,
NodeSelector.ANY,
strictDeprecationMode,
false,
false,
// single node retry is enabled
true,
// max attempts is 3
3
);

// Test that requests are retried up to max attempts on retryable errors
for (String method : getHttpMethods()) {
int retryStatusCode = randomErrorRetryStatusCode();
try {
performRequestSyncOrAsync(restClient, new Request(method, "/" + retryStatusCode));
fail("request should have failed after max retry attempts");
} catch (ResponseException e) {
assertEquals(retryStatusCode, e.getResponse().getStatusCode());
// Verify that the failure listener was called multiple times (retries)
failureListener.assertCalled(singletonList(node));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testCloseIsIdempotent() throws IOException {
List<Node> nodes = singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class);
Rest5Client restClient = new Rest5Client(closeableHttpAsyncClient, new Header[0], nodes, null, null,
null, false, false, false);
null, false, false, false, false, 1);
restClient.close();
verify(closeableHttpAsyncClient, times(1)).close();
restClient.close();
Expand Down Expand Up @@ -304,9 +304,9 @@ public String toString() {
* to being revived that the NodeSelector is ok with.
*/
assertEquals(singletonList(n1), Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(),
NodeSelector.ANY));
NodeSelector.ANY, false));
assertEquals(singletonList(n2), Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(),
not1));
not1, false));

/*
* Try a NodeSelector that excludes all nodes. This should
Expand Down Expand Up @@ -351,12 +351,12 @@ private void assertSelectLivingHosts(
) throws IOException {
int iterations = 1000;
AtomicInteger lastNodeIndex = new AtomicInteger(0);
assertEquals(expectedNodes, Rest5Client.selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector));
assertEquals(expectedNodes, Rest5Client.selectNodes(nodes, blacklist, lastNodeIndex, nodeSelector, false));
// Calling it again rotates the set of results
for (int i = 1; i < iterations; i++) {
Collections.rotate(expectedNodes, 1);
assertEquals("iteration " + i, expectedNodes, Rest5Client.selectNodes(nodes, blacklist,
lastNodeIndex, nodeSelector));
lastNodeIndex, nodeSelector, false));
}
}

Expand All @@ -371,7 +371,7 @@ private static String assertSelectAllRejected(
NodeSelector nodeSelector
) {
try {
Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(0), nodeSelector);
Rest5Client.selectNodes(nodes, blacklist, new AtomicInteger(0), nodeSelector, false);
throw new AssertionError("expected selectHosts to fail");
} catch (IOException e) {
return e.getMessage();
Expand All @@ -381,7 +381,7 @@ private static String assertSelectAllRejected(
private static Rest5Client createRestClient() {
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
return new Rest5Client(mock(CloseableHttpAsyncClient.class), new Header[]{}, nodes, null, null, null
, false, false, false);
, false, false, false, false, 1);
}

@Test
Expand Down Expand Up @@ -418,7 +418,7 @@ public void testIsRunning() {
List<Node> nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200)));
CloseableHttpAsyncClient client = mock(CloseableHttpAsyncClient.class);
Rest5Client restClient = new Rest5Client(client, new Header[]{}, nodes, null, null, null, false,
false, false);
false, false, false, 1);

when(client.getStatus()).thenReturn(IOReactorStatus.ACTIVE);
assertTrue(restClient.isRunning());
Expand All @@ -440,7 +440,8 @@ private static void assertNodes(List<Node> nodes, AtomicInteger lastNodeIndex, i
nodes,
Collections.<HttpHost, DeadHostState>emptyMap(),
lastNodeIndex,
NodeSelector.ANY
NodeSelector.ANY,
false
);
List<Node> expectedNodes = nodes;
int index = 0;
Expand Down