diff --git a/docs/changelog/119897.yaml b/docs/changelog/119897.yaml
new file mode 100644
index 0000000000000..87c5890f9fde1
--- /dev/null
+++ b/docs/changelog/119897.yaml
@@ -0,0 +1,5 @@
+pr: 119897
+summary: Fix ESQL async get while task is being cancelled
+area: ES|QL
+type: bug
+issues: []
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java
index 217274f963aec..2236304d8c4d0 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java
@@ -121,7 +121,7 @@ private void getSearchResponseFromTask(
     ) {
         try {
             final Task task = store.getTaskAndCheckAuthentication(taskManager, searchId, asyncTaskClass);
-            if (task == null || task.isCancelled()) {
+            if (task == null || (updateInitialResultsInStore && task.isCancelled())) {
                 getSearchResponseFromIndex(searchId, request, nowInMillis, listener);
                 return;
             }
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
index f85de51101af5..163cbc8491298 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java
@@ -10,9 +10,11 @@
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
@@ -34,8 +36,11 @@
 import static org.elasticsearch.test.hamcrest.OptionalMatchers.isEmpty;
 import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
 import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 
 /**
@@ -112,6 +117,64 @@ public void testBasicAsyncExecution() throws Exception {
         }
     }
 
+    /**
+     * Verifies that an async get issued while the query task is flagged as cancelled but has not
+     * yet finished still reports the query as running, and that once the query is allowed to
+     * proceed the get fails with a {@link TaskCancelledException}.
+     */
+    public void testGetAsyncWhileQueryTaskIsBeingCancelled() throws Exception {
+        try (var initialResponse = sendAsyncQuery()) {
+            assertThat(initialResponse.asyncExecutionId(), isPresent());
+            assertThat(initialResponse.isRunning(), is(true));
+            String id = initialResponse.asyncExecutionId().get();
+            // ensure we have started Lucene operators
+            assertBusy(() -> {
+                var tasks = client().admin()
+                    .cluster()
+                    .prepareListTasks()
+                    .setActions(DriverTaskRunner.ACTION_NAME)
+                    .setDetailed(true)
+                    .get()
+                    .getTasks()
+                    .stream()
+                    .filter(t -> t.description().contains("_LuceneSourceOperator"))
+                    .toList();
+                assertThat(tasks.size(), greaterThanOrEqualTo(1));
+            });
+            client().admin().cluster().prepareCancelTasks().setActions(EsqlQueryAction.NAME + "[a]").get();
+            assertBusy(() -> {
+                List<TaskInfo> tasks = getEsqlQueryTasks().stream().filter(TaskInfo::cancelled).toList();
+                assertThat(tasks, not(empty()));
+            });
+            // get the result while the query is being cancelled
+            {
+                var getResultsRequest = new GetAsyncResultRequest(id);
+                getResultsRequest.setWaitForCompletionTimeout(timeValueMillis(10));
+                getResultsRequest.setKeepAlive(randomKeepAlive());
+                var future = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
+                try (var resp = future.get()) {
+                    assertThat(resp.asyncExecutionId(), isPresent());
+                    assertThat(resp.asyncExecutionId().get(), equalTo(id));
+                    assertThat(resp.isRunning(), is(true));
+                }
+            }
+            // release the permits to allow the query to proceed
+            scriptPermits.release(numberOfDocs());
+            // get the result after the cancellation is done
+            {
+                var getResultsRequest = new GetAsyncResultRequest(id);
+                getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(10));
+                getResultsRequest.setKeepAlive(randomKeepAlive());
+                var future = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
+                TaskCancelledException error = expectThrows(TaskCancelledException.class, future::actionGet);
+                assertThat(error.getMessage(), equalTo("by user request"));
+            }
+            assertTrue(deleteAsyncId(id).isAcknowledged());
+        } finally {
+            scriptPermits.drainPermits();
+        }
+    }
+
     public void testAsyncCancellation() throws Exception {
         try (var initialResponse = sendAsyncQuery()) {
             assertThat(initialResponse.asyncExecutionId(), isPresent());