Enable query cancellation for MSQE + cancel using client-provided id #14823

Open
wants to merge 31 commits into
base: master
Changes from 2 commits
Commits
f7a9488
add cancelClientQuery operation for SingleStageBroker (only numerical…
albertobastos Jan 15, 2025
39a4f94
avoid synchronized BiMap and checkstyle
albertobastos Jan 16, 2025
c969abd
Merge remote-tracking branch 'origin' into cancel-with-cqid
albertobastos Jan 16, 2025
8162bc6
Merge branch 'master' into cancel-with-cqid
albertobastos Jan 24, 2025
aa8c120
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 27, 2025
7a5f713
add cancel feature (with queryId and clientQueryId) to MSQE, some ref…
albertobastos Jan 27, 2025
a9d1e49
set and delete clientRequestId on MSQE
albertobastos Jan 27, 2025
97e7b5d
fix unimplemented method
albertobastos Jan 27, 2025
65f73a0
fix I/O parameter and related tests
albertobastos Jan 27, 2025
e3a9a5e
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 28, 2025
fe5c846
add clientRequestId on response test
albertobastos Jan 28, 2025
ae3260c
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 28, 2025
2eb506e
add sleep and random functions for further tests
albertobastos Jan 28, 2025
5dd5409
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 29, 2025
e9bbdac
override test server conf
albertobastos Jan 29, 2025
9dcc393
add missing superclass call
albertobastos Jan 29, 2025
5ac7d9e
add some cancel query test using internal sleep function with a trick
albertobastos Jan 29, 2025
a46659c
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 30, 2025
110fb16
bring master
albertobastos Jan 30, 2025
47fb9bb
reuse same broker endpoint for internal and client-based cancellation
albertobastos Jan 31, 2025
52998d3
add javadoc
albertobastos Jan 31, 2025
e2678af
add mapping comments
albertobastos Jan 31, 2025
c201cf4
refactor base broker methods
albertobastos Jan 31, 2025
c60b953
return immutable view instead of copy
albertobastos Jan 31, 2025
6042ad2
enable sleep(ms) function only during testing
albertobastos Jan 31, 2025
b458a5b
reduce unit test wait time
albertobastos Jan 31, 2025
9d0f335
replace constant with literal on test
albertobastos Jan 31, 2025
1e00bf6
linter
albertobastos Jan 31, 2025
d3061ba
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Jan 31, 2025
a0e1e83
remove embarassing npe
albertobastos Jan 31, 2025
d0e6393
Merge branch 'master' of github.com:albertobastos/pinot into cancel-w…
albertobastos Feb 3, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,45 @@ public String cancelQuery(
.build());
}

@DELETE
@Path("clientQuery/{clientQueryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Cancel a query as identified by the clientQueryId", notes = "No effect if no query exists for "
+ "the given clientQueryId on the requested broker. Query may continue to run for a short while after calling "
+ "cancel as it's done in a non-blocking manner. The cancel method can be called multiple times.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 404, message = "Query not found on the requested broker")
})
public String cancelClientQuery(
@ApiParam(value = "ClientQueryId given by the client", required = true)
@PathParam("clientQueryId") String clientQueryId,
@ApiParam(value = "Timeout for servers to respond to the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
Contributor

Could we use the same endpoint we already have?

Contributor Author

We could extend the existing endpoint with a @QueryParam that indicates whether the provided id is internal or client-based, defaulting to internal.

The only drawback is that internal ids are long whereas client ids are String, so type validation could no longer be done by Jersey and would have to be done by the method itself.

47fb9bb786

The Controller scenario is different, though. There the existing endpoint is DELETE /query/{brokerId}/{queryId}, but for client-id-based cancellations we do not want to have to know the exact broker the query landed on, so we need an endpoint such as DELETE /clientQuery/{clientQueryId}. I can't see how to unify these two.
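
The trade-off described above can be sketched roughly as follows. This is only an illustration, not the code from commit 47fb9bb786: the `CancelTargetSketch` class and the `isClientId` flag are hypothetical names. Since the path parameter would arrive as a String, the method has to validate internal ids as long by hand.

```java
/** Hypothetical holder for a parsed cancel target; illustrative only, not part of Pinot. */
final class CancelTargetSketch {
  final Long internalId;      // non-null when the id is an internal (numeric) request id
  final String clientQueryId; // non-null when the id was supplied by the client

  private CancelTargetSketch(Long internalId, String clientQueryId) {
    this.internalId = internalId;
    this.clientQueryId = clientQueryId;
  }

  /**
   * Parses the raw path parameter. The isClientId flag stands in for the proposed
   * query parameter; its name is an assumption made for this sketch.
   */
  static CancelTargetSketch parse(String rawId, boolean isClientId) {
    if (isClientId) {
      return new CancelTargetSketch(null, rawId);
    }
    try {
      // Jersey no longer enforces the numeric type here, so validate it manually.
      return new CancelTargetSketch(Long.parseLong(rawId), null);
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid internal query id: " + rawId, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("123", false).internalId);
    System.out.println(parse("my-dashboard-query", true).clientQueryId);
  }
}
```

In a real resource method the `IllegalArgumentException` would presumably be mapped to a 400 response; that mapping is left out here.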

try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQueryByClientId(clientQueryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled client query: " + clientQueryId;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
}
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format(
"Failed to cancel client query: %s on the broker due to error: %s", clientQueryId, e.getMessage()))
.build());
}
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND).entity(
String.format("Client query: %s not found on the broker", clientQueryId))
.build());
}
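
For reference, a client call against the endpoint as it appears in this revision might be built like this. It is a sketch under assumptions: the broker base URL is a placeholder, and the resource is assumed to be mounted at the broker root, which the diff does not show. Only the `clientQuery/{clientQueryId}` path and the `timeoutMs` and `verbose` parameters come from the code above. Requires Java 11+.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Illustrative helper, not part of Pinot: builds (but does not send) the cancel request. */
final class CancelClientQueryRequestSketch {

  static HttpRequest build(String brokerBaseUrl, String clientQueryId, int timeoutMs, boolean verbose) {
    // URLEncoder targets form encoding and emits '+' for spaces; a path segment needs %20.
    String encodedId = URLEncoder.encode(clientQueryId, StandardCharsets.UTF_8).replace("+", "%20");
    URI uri = URI.create(brokerBaseUrl + "/clientQuery/" + encodedId
        + "?timeoutMs=" + timeoutMs + "&verbose=" + verbose);
    return HttpRequest.newBuilder(uri).DELETE().build();
  }

  public static void main(String[] args) {
    // The host and port below are placeholders for a local broker.
    HttpRequest request = build("http://localhost:8099", "my-query-1", 3000, true);
    System.out.println(request.method() + " " + request.uri());
  }
}
```

Per the resource method above, a 200 means the query was found and cancellation was requested, and a 404 means no such client query is tracked on that broker.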

@GET
@Path("queries")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
Expand Down Expand Up @@ -141,6 +142,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final Map<Long, String> _clientQueryIds;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
Expand All @@ -160,7 +162,13 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
if (enableQueryCancellation) {
_queriesById = new ConcurrentHashMap<>();
_clientQueryIds = new ConcurrentHashMap<>();
} else {
_queriesById = null;
_clientQueryIds = null;
}

_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
Expand Down Expand Up @@ -210,13 +218,13 @@ public void shutDown() {

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
return queryServers != null ? queryServers._servers : Collections.emptySet();
}
Expand All @@ -225,7 +233,12 @@ Set<ServerInstance> getRunningServers(long requestId) {
public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
return cancelQueryByRequestId(requestId, timeoutMs, executor, connMgr, serverResponses);
}

private boolean cancelQueryByRequestId(long requestId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception {
QueryServers queryServers = _queriesById.get(requestId);
if (queryServers == null) {
return false;
Expand Down Expand Up @@ -275,6 +288,21 @@ public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, Htt
return true;
}

@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker");
Optional<Long> requestId = _clientQueryIds.entrySet().stream()
.filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst();
if (requestId.isPresent()) {
return cancelQueryByRequestId(requestId.get(), timeoutMs, executor, connMgr, serverResponses);
} else {
LOGGER.warn("query cancellation cannot be performed due to unknown client query id: {}", clientQueryId);
return false;
}
}
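
The lookup in cancelQueryByClientId is a reverse search over a requestId-to-clientQueryId map. A standalone sketch of the same technique (the class and method names are hypothetical) makes its behaviour easier to check in isolation:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: reverse lookup of a request id by its client query id. */
final class ClientIdLookupSketch {

  static Optional<Long> findRequestId(Map<Long, String> clientQueryIds, String clientQueryId) {
    // Linear scan over the values; cost grows with the number of tracked queries.
    return clientQueryIds.entrySet().stream()
        .filter(e -> clientQueryId.equals(e.getValue()))
        .map(Map.Entry::getKey)
        .findFirst();
  }

  public static void main(String[] args) {
    Map<Long, String> tracked = new ConcurrentHashMap<>();
    tracked.put(42L, "dashboard-refresh");
    System.out.println(findRequestId(tracked, "dashboard-refresh").isPresent());
    System.out.println(findRequestId(tracked, "unknown").isPresent());
  }
}
```

Two things follow from this shape: the scan is linear in the number of running queries, and if two running queries share the same client id, which one `findFirst` returns is not defined, because `ConcurrentHashMap` gives no ordering guarantee.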

@Override
protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions,
JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext,
Expand Down Expand Up @@ -797,7 +825,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}
BrokerResponseNative brokerResponse;
if (_queriesById != null) {
if (isQueryCancellationEnabled()) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any
// potential failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
Expand All @@ -807,13 +835,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
String cqid = maybeSaveClientQueryId(requestId, sqlNodeAndOptions);
LOGGER.debug("Keep track of running query: {}", requestId);
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
brokerResponse.setClientRequestId(cqid);
} finally {
_queriesById.remove(requestId);
maybeRemoveClientQueryId(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
Expand Down Expand Up @@ -865,6 +896,48 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO
}
}

private String maybeSaveClientQueryId(long requestId, SqlNodeAndOptions sqlNodeAndOptions) {
if (!isQueryCancellationEnabled()) {
return null;
}
String clientQueryId = extractClientQueryId(sqlNodeAndOptions);
if (StringUtils.isBlank(clientQueryId)) {
return null;
Contributor

(nit) in general we don't recommend returning NULL as a coding practice

}
String prev = _clientQueryIds.put(requestId, clientQueryId);
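// put() returns null when there was no previous mapping, so only a different non-null value is an override
if (prev != null && !prev.equals(clientQueryId)) {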
LOGGER.warn("client query id override for id {} (old: {}, new: {})", requestId, prev, clientQueryId);
} else {
LOGGER.info("client query id stored for requestId {}: {}", requestId, clientQueryId);
}
return clientQueryId;
}

private boolean maybeRemoveClientQueryId(long requestId) {
if (!isQueryCancellationEnabled()) {
return false;
}
// we protected insertion with isBlank, so null is enough to assume that no entry exists
String clientQueryId = _clientQueryIds.remove(requestId);
if (clientQueryId != null) {
LOGGER.debug("client query id {} removed for requestId {}", clientQueryId, requestId);
return true;
} else {
return false;
}
}

private String extractClientQueryId(SqlNodeAndOptions sqlNodeAndOptions) {
if (sqlNodeAndOptions.getOptions() == null) {
return null;
}
return sqlNodeAndOptions.getOptions().get(QueryOptionKey.CLIENT_QUERY_ID);
}

private boolean isQueryCancellationEnabled() {
return _queriesById != null;
}
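
One possible way to address the reviewer's nit about returning null is to have the bookkeeping helpers return Optional. The sketch below is an assumption about how that could look, not the PR's code: the class name is hypothetical, and `String.isBlank()` (Java 11+) stands in for `StringUtils.isBlank` to avoid an external dependency.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: tracks client query ids per request id without returning null. */
final class ClientQueryIdRegistrySketch {
  private final Map<Long, String> clientQueryIds = new ConcurrentHashMap<>();

  /** Stores the id when it is non-blank; returns the stored id, or empty if nothing was stored. */
  Optional<String> save(long requestId, String clientQueryId) {
    if (clientQueryId == null || clientQueryId.isBlank()) {
      return Optional.empty();
    }
    clientQueryIds.put(requestId, clientQueryId);
    return Optional.of(clientQueryId);
  }

  /** Removes the entry for the request; returns the removed id, or empty if none was tracked. */
  Optional<String> remove(long requestId) {
    return Optional.ofNullable(clientQueryIds.remove(requestId));
  }

  public static void main(String[] args) {
    ClientQueryIdRegistrySketch registry = new ClientQueryIdRegistrySketch();
    System.out.println(registry.save(1L, "my-query").isPresent());
    System.out.println(registry.remove(1L).isPresent());
  }
}
```

A caller that still needs a nullable value, for example to pass to setClientRequestId, could unwrap with `orElse(null)` at that single boundary.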

@VisibleForTesting
static String addRoutingPolicyInErrMsg(String errorMessage, String realtimeRoutingPolicy,
String offlineRoutingPolicy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,19 @@ default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang, Strin
boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;

/**
* Cancel a query as identified by the clientQueryId provided externally. This method is non-blocking so the query may
* still run for a while after calling this method. This cancel method can be called multiple times.
* @param clientQueryId the Id assigned to the query by the client
* @param timeoutMs timeout to wait for servers to respond to the cancel requests
* @param executor to send cancel requests to servers in parallel
* @param connMgr to provide the http connections
* @param serverResponses to collect cancel responses from all servers if a map is provided
* @return true if there is a running query for the given clientQueryId.
*/
boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses);
}

@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
// TODO: add support for the multi-stage engine: try to cancel the query on the multi-stage engine first; if
// not found, fall back to the single-stage engine.
return _singleStageBrokerRequestHandler.cancelQueryByClientId(
clientQueryId, timeoutMs, executor, connMgr, serverResponses);
}
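
The fallback order described in the TODO could be sketched as below. This is a simplified illustration with hypothetical names: each handler is reduced to a `Predicate<String>` that reports whether it found and cancelled the query, whereas the real cancelQueryByClientId takes more arguments and declares `throws Exception`.

```java
import java.util.List;
import java.util.function.Predicate;

/** Illustrative only: tries each engine's cancel handler in order and stops at the first match. */
final class FallbackCancelSketch {

  static boolean cancelOnFirstMatch(List<Predicate<String>> handlers, String clientQueryId) {
    for (Predicate<String> handler : handlers) {
      if (handler.test(clientQueryId)) {
        return true; // this engine knew the query; later handlers are not consulted
      }
    }
    return false; // no engine is tracking this client query id
  }

  public static void main(String[] args) {
    Predicate<String> multiStage = id -> false;           // stand-in: engine does not know the id
    Predicate<String> singleStage = id -> "q1".equals(id); // stand-in: engine tracks "q1"
    System.out.println(cancelOnFirstMatch(List.of(multiStage, singleStage), "q1"));
  }
}
```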

private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
throws Exception {
if (numRows == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,14 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
throw new UnsupportedOperationException();
}

@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
// TODO: Support query cancellation for multi-stage engine
throw new UnsupportedOperationException();
}

/**
* Returns the string representation of the Set of Strings with a limit on the number of elements.
* @param setOfStrings Set of strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
return false;
}

@Override
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor,
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses)
throws Exception {
// TODO: Implement this.
return false;
}

private RangeTimeSeriesRequest buildRangeTimeSeriesRequest(String language, String queryParamString)
throws URISyntaxException {
List<NameValuePair> pairs = URLEncodedUtils.parse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@ default int getExceptionsSize() {
*/
void setRequestId(String requestId);

/**
* Returns the client request ID of the query (if any).
*/
String getClientRequestId();

/**
* Sets the (optional) client request ID of the query.
*/
void setClientRequestId(String clientRequestId);

/**
* Returns the broker ID that handled the query.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "timeUsedMs",
"requestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
"minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer",
"numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs",
"offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter",
"numEntriesScannedPostFilter", "numServersQueried", "numServersResponded", "numSegmentsQueried",
"numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed",
"numConsumingSegmentsMatched", "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker",
"numSegmentsPrunedByServer", "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue",
"brokerReduceTimeMs", "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
"realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
"realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
"explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried"
Expand All @@ -72,6 +72,7 @@ public class BrokerResponseNative implements BrokerResponse {
private boolean _numGroupsLimitReached = false;
private long _timeUsedMs = 0L;
private String _requestId;
private String _clientRequestId;
private String _brokerId;
private long _numDocsScanned = 0L;
private long _totalDocs = 0L;
Expand Down Expand Up @@ -227,6 +228,16 @@ public void setRequestId(String requestId) {
_requestId = requestId;
}

@Override
public String getClientRequestId() {
return _clientRequestId;
}

@Override
public void setClientRequestId(String clientRequestId) {
_clientRequestId = clientRequestId;
}

@Override
public String getBrokerId() {
return _brokerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "maxRowsInJoinReached",
"maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId", "brokerId",
"numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried",
"numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"maxRowsInWindowReached", "timeUsedMs", "stageStats", "maxRowsInOperator", "requestId", "clientRequestId",
"brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
"minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer", "numSegmentsPrunedInvalid",
"numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs", "offlineThreadCpuTimeNs",
Expand Down Expand Up @@ -182,6 +182,17 @@ public String getRequestId() {
return _requestId;
}

@Override
public String getClientRequestId() {
// TODO: support cqid for MSQE
return null;
}

@Override
public void setClientRequestId(String clientRequestId) {
// TODO: support cqid for MSQE
}

@Override
public void setRequestId(String requestId) {
_requestId = requestId;
Expand Down