Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable query cancellation for MSQE + cancel using client-provided id #14823

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f7a9488
add cancelClientQuery operation for SingleStageBroker (only numerical…
albertobastos Jan 15, 2025
39a4f94
avoid synchronized BiMap and checkstyle
albertobastos Jan 16, 2025
c969abd
Merge remote-tracking branch 'origin' into cancel-with-cqid
albertobastos Jan 16, 2025
8162bc6
Merge branch 'master' into cancel-with-cqid
albertobastos Jan 24, 2025
aa8c120
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 27, 2025
7a5f713
add cancel feature (with queryId and clientQueryId) to MSQE, some ref…
albertobastos Jan 27, 2025
a9d1e49
set and delete clientRequestId on MSQE
albertobastos Jan 27, 2025
97e7b5d
fix unimplemented method
albertobastos Jan 27, 2025
65f73a0
fix I/O parameter and related tests
albertobastos Jan 27, 2025
e3a9a5e
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 28, 2025
fe5c846
add clientRequestId on response test
albertobastos Jan 28, 2025
ae3260c
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 28, 2025
2eb506e
add sleep and random functions for further tests
albertobastos Jan 28, 2025
5dd5409
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 29, 2025
e9bbdac
override test server conf
albertobastos Jan 29, 2025
9dcc393
add missing superclass call
albertobastos Jan 29, 2025
5ac7d9e
add some cancel query test using internal sleep function with a trick
albertobastos Jan 29, 2025
a46659c
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 30, 2025
110fb16
bring master
albertobastos Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,45 @@ public String cancelQuery(
.build());
}

/**
* Cancels the running query that was tagged with the given client-provided query id.
*
* <p>The work is delegated to the request handler's {@code cancelQueryByClientId}. When the handler reports that a
* query was found, a plain-text confirmation is returned, with the collected server responses appended when
* {@code verbose} is set. An exception raised by the handler is converted into a 500 response; a {@code false}
* result from the handler is converted into a 404 response.
*
* @param clientQueryId id given to the query by the client
* @param timeoutMs timeout for servers to respond to the cancel request
* @param verbose whether to include the server responses in the returned message
* @return confirmation message naming the cancelled client query
* @throws WebApplicationException with status 500 if the handler throws, or 404 if no query is found for the id
*/
@DELETE
@Path("clientQuery/{clientQueryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for "
+ "the given clientQueryId on the requested broker. Query may continue to run for a short while after calling "
+ "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 404, message = "Query not found on the requested broker")
})
public String cancelClientQuery(
@ApiParam(value = "ClientQueryId given by the client", required = true)
@PathParam("clientQueryId") String clientQueryId,
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
Comment on lines +429 to +435
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the same endpoint we already have?

try {
// Server responses are only collected when the caller asked for them; the handler receives null otherwise.
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQueryByClientId(clientQueryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled client query: " + clientQueryId;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
}
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format(
"Failed to cancel client query: %s on the broker due to error: %s", clientQueryId, e.getMessage()))
.build());
}
// Reached only when the handler returned false, i.e. it found no query for the given client id.
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND).entity(
String.format("Client query: %s not found on the broker", clientQueryId))
.build());
}

@GET
@Path("queries")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,24 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
Expand All @@ -51,6 +57,7 @@
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
Expand All @@ -74,6 +81,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final QueryLogger _queryLogger;
@Nullable
protected final String _enableNullHandling;
// Queries tracked for cancellation: broker request id -> query text, as stored by maybeSaveQuery.
// Null when query cancellation is not enabled on this broker.
protected final Map<Long, String> _queriesById;
// Client-provided ids of tracked queries: broker request id -> client query id. An entry exists only for queries
// that carried a non-blank client query id. Null when query cancellation is not enabled on this broker.
protected final Map<Long, String> _clientQueryIds;
Comment on lines +84 to +85
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we document from what to what are we mapping here?


public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) {
Expand All @@ -90,6 +99,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryLogger = new QueryLogger(config);
_enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING);

boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
if (enableQueryCancellation) {
_queriesById = new ConcurrentHashMap<>();
_clientQueryIds = new ConcurrentHashMap<>();
} else {
_queriesById = null;
_clientQueryIds = null;
}
Comment on lines +103 to +111
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not something we introduced in this PR, but something I think we need to take care of in the future:

We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.

This is something we need to improve in the future

}

@Override
Expand Down Expand Up @@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq
@Nullable HttpHeaders httpHeaders, AccessControl accessControl)
throws Exception;

/**
* Engine-specific part of query cancellation. It is invoked by {@code cancelQuery} after that method has verified
* that query cancellation is enabled; {@code cancelQuery} stops tracking the query once this method returns or
* throws.
*
* @param queryId id of the query to cancel, exactly as received by {@code cancelQuery}
* @param timeoutMs timeout forwarded unchanged from {@code cancelQuery}
* @param executor executor forwarded unchanged from {@code cancelQuery}
* @param connMgr connection manager forwarded unchanged from {@code cancelQuery}
* @param serverResponses map forwarded unchanged from {@code cancelQuery}
* @return the value {@code cancelQuery} hands back to its caller
* @throws Exception propagated to the caller of {@code cancelQuery}
*/
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id.

HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception;

protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
// TODO: Add partial result flag to RequestContext
Expand Down Expand Up @@ -223,4 +245,65 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
statistics.setTraceInfo(response.getTraceInfo());
}

/**
 * Returns a copy of the queries currently tracked for cancellation, keyed by request id.
 *
 * @return a new, caller-owned map from request id to query text
 * @throws IllegalStateException if query cancellation is not enabled on this broker
 */
@Override
public Map<Long, String> getRunningQueries() {
  Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
  // Copy into a fresh map so callers can modify the result without touching the tracking state.
  Map<Long, String> snapshot = new HashMap<>();
  snapshot.putAll(_queriesById);
  return snapshot;
}
Comment on lines +250 to +253
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is safer and better practice to return an immutable view of the map instead. In the delegate we can create a copy we can modify.


/**
 * Cancels the query with the given id by delegating to {@code handleCancel}, then stops tracking it.
 *
 * @return whatever {@code handleCancel} returned
 * @throws IllegalStateException if query cancellation is not enabled on this broker
 * @throws Exception if {@code handleCancel} fails; the query is removed from tracking in that case as well
 */
@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
    Map<String, Integer> serverResponses)
    throws Exception {
  Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
  boolean cancelled;
  try {
    cancelled = handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses);
  } finally {
    // Runs on both the normal and the exceptional path.
    maybeRemoveQuery(queryId);
  }
  return cancelled;
}

/**
* Cancels the tracked query whose client-provided id equals {@code clientQueryId}.
*
* <p>The client id is resolved to a request id and the call is forwarded to {@code cancelQuery}. If no tracked query
* carries the given client id, a warning is logged and {@code false} is returned.
*
* @return the result of {@code cancelQuery} for the resolved request id, or {@code false} if the client id is unknown
* @throws IllegalStateException if query cancellation is not enabled on this broker
* @throws Exception if {@code cancelQuery} fails
*/
@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
// _clientQueryIds is keyed by request id, so the client id has to be looked up by value; the first match wins.
Optional<Long> requestId = _clientQueryIds.entrySet().stream()
.filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst();
if (requestId.isPresent()) {
return cancelQuery(requestId.get(), timeoutMs, executor, connMgr, serverResponses);
} else {
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId);
return false;
Comment on lines +277 to +278
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should't we throw here to notify the caller that the query id is incorrect?

}
}

/**
 * Starts tracking a query for cancellation, if cancellation is enabled, and reports its client-provided id.
 *
 * <p>When cancellation is enabled the query text is recorded under {@code requestId}. The client query id is read
 * from the query options; if it is not blank it is recorded under the same request id.
 *
 * @param requestId request id to track the query under
 * @param sqlNodeAndOptions source of the query options that may hold the client query id
 * @param query query text to record
 * @return the client query id read from the options (possibly {@code null} or blank), or {@code null} when query
 *         cancellation is not enabled
 */
protected String maybeSaveQuery(long requestId, SqlNodeAndOptions sqlNodeAndOptions, String query) {
  if (!isQueryCancellationEnabled()) {
    return null;
  }

  String clientRequestId = null;
  if (sqlNodeAndOptions.getOptions() != null) {
    clientRequestId = sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID);
  }

  _queriesById.put(requestId, query);
  if (StringUtils.isBlank(clientRequestId)) {
    LOGGER.debug("Keep track of running query: {}", requestId);
  } else {
    _clientQueryIds.put(requestId, clientRequestId);
    LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId);
  }
  return clientRequestId;
}
Comment on lines +282 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not aligned with what I would expect by its name. This method does two different things I think we should split into two methods:

  1. extracts the client request id
  2. optionally associates a request id to the client id.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact the second method could be called onQueryStart and SSE could use it to update _serversById


/**
* Stops tracking the query with the given request id. When query cancellation is enabled, the request id is removed
* from both {@code _queriesById} and {@code _clientQueryIds}; otherwise nothing happens.
*
* @param requestId request id of the query to stop tracking
*/
protected void maybeRemoveQuery(long requestId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the maybe prefix? Just call it remove. If query cancellation is not enabled, to remove it is a NOOP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact we can call this method something like onQueryFinish. The fact that what this class does here is to remove the query id from the different maps is not important for the caller.

if (isQueryCancellationEnabled()) {
_queriesById.remove(requestId);
_clientQueryIds.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
}

/**
 * Tells whether query cancellation is enabled, which is the case exactly when the tracking map
 * {@code _queriesById} is non-null.
 *
 * @return {@code true} if {@code _queriesById} is non-null
 */
protected boolean isQueryCancellationEnabled() {
  boolean trackingMapPresent = _queriesById != null;
  return trackingMapPresent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -142,7 +141,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected final boolean _enableQueryLimitOverride;
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final Map<Long, QueryServers> _serversById;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
Expand All @@ -160,9 +159,11 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
if (this.isQueryCancellationEnabled()) {
_serversById = new ConcurrentHashMap<>();
} else {
_serversById = null;
}

_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
Expand All @@ -172,9 +173,9 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
}

LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, query log max length: {}, "
+ "query log max rate: {}, query cancellation enabled: {}", getClass().getSimpleName(), _brokerId,
+ "query log max rate: {}", getClass().getSimpleName(), _brokerId,
_brokerTimeoutMs, _queryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
enableQueryCancellation);
this.isQueryCancellationEnabled());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you remove the `, query cancellation enabled: {}` part on purpose? It looks like we have an extra parameter now.

}

@Override
Expand Down Expand Up @@ -210,31 +211,33 @@ public void shutDown() {
}
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
QueryServers queryServers = _serversById.get(requestId);
return queryServers != null ? queryServers._servers : Collections.emptySet();
}

@Override
public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
protected void maybeRemoveQuery(long requestId) {
super.maybeRemoveQuery(requestId);
if (isQueryCancellationEnabled()) {
_serversById.remove(requestId);
}
}

@Override
protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
QueryServers queryServers = _serversById.get(queryId);
if (queryServers == null) {
return false;
}

// TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for
// details
String globalQueryId = getGlobalQueryId(requestId);
String globalQueryId = getGlobalQueryId(queryId);
List<Pair<String, String>> serverUrls = new ArrayList<>();
for (ServerInstance serverInstance : queryServers._servers) {
serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null));
Expand Down Expand Up @@ -801,7 +804,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}
BrokerResponseNative brokerResponse;
if (_queriesById != null) {
if (isQueryCancellationEnabled()) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
// potential failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
Expand All @@ -810,14 +813,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason to split this method is that we may want to set brokerResponse.setClientRequestId even if query cancellation is disabled.

if (isQueryCancellationEnabled()) {
_serversById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
}
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse.setClientRequestId(clientRequestId);
} finally {
_queriesById.remove(requestId);
maybeRemoveQuery(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,19 @@ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, Strin
boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;

/**
* Cancel a query as identified by the clientQueryId provided externally. This method is non-blocking so the query may
* still run for a while after calling this method. This cancel method can be called multiple times.
* @param clientQueryId the id assigned to the query by the client
* @param timeoutMs timeout to wait for servers to respond to the cancel requests
* @param executor to send cancel requests to servers in parallel
* @param connMgr to provide the http connections
* @param serverResponses to collect cancel responses from all servers if a map is provided; may be null when the
*                        caller does not need them
* @return true if there is a running query for the given clientQueryId, false otherwise
* @throws Exception if the cancellation fails
*/
boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,37 @@ public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, String

@Override
public Map<Long, String> getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
// running queries with those from singleStaged engine. Both engines share the same request Id generator, so
// the query will have unique ids across the two engines.
return _singleStageBrokerRequestHandler.getRunningQueries();
// Both engines share the same request Id generator, so the query will have unique ids across the two engines.
Map<Long, String> queries = _singleStageBrokerRequestHandler.getRunningQueries();
if (_multiStageBrokerRequestHandler != null) {
queries.putAll(_multiStageBrokerRequestHandler.getRunningQueries());
}
return queries;
}

/**
 * Cancels the query with the given id on whichever engine reports it.
 *
 * @return {@code true} if the multi-stage handler is present and reports the cancellation; otherwise the result of
 *         the single-stage handler
 * @throws Exception if either handler fails
 */
@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
    Map<String, Integer> serverResponses)
    throws Exception {
  // The multi-stage handler may be absent; when present it gets the first attempt.
  if (_multiStageBrokerRequestHandler != null) {
    boolean cancelledOnMultiStage =
        _multiStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
    if (cancelledOnMultiStage) {
      return true;
    }
  }
  // Not handled by the multi-stage engine: fall back to the single-stage engine.
  return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
}

/**
 * Cancels the query carrying the given client-provided id on whichever engine reports it.
 *
 * @return {@code true} if the multi-stage handler is present and reports the cancellation; otherwise the result of
 *         the single-stage handler
 * @throws Exception if either handler fails
 */
@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
    HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
    throws Exception {
  // The multi-stage handler may be absent; when present it gets the first attempt.
  if (_multiStageBrokerRequestHandler != null) {
    boolean cancelledOnMultiStage = _multiStageBrokerRequestHandler.cancelQueryByClientId(
        clientQueryId, timeoutMs, executor, connMgr, serverResponses);
    if (cancelledOnMultiStage) {
      return true;
    }
  }
  // Not handled by the multi-stage engine: fall back to the single-stage engine.
  return _singleStageBrokerRequestHandler.cancelQueryByClientId(
      clientQueryId, timeoutMs, executor, connMgr, serverResponses);
}

private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
throws Exception {
if (numRows == null) {
Expand Down
Loading
Loading