-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable query cancellation for MSQE + cancel using client-provided id #14823
base: master
Are you sure you want to change the base?
Changes from all commits
f7a9488
39a4f94
c969abd
8162bc6
aa8c120
7a5f713
a9d1e49
97e7b5d
65f73a0
e3a9a5e
fe5c846
ae3260c
2eb506e
5dd5409
e9bbdac
9dcc393
5ac7d9e
a46659c
110fb16
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,18 +19,24 @@ | |
package org.apache.pinot.broker.requesthandler; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.google.common.base.Preconditions; | ||
import com.google.common.collect.Maps; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Executor; | ||
import javax.annotation.Nullable; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
import javax.ws.rs.WebApplicationException; | ||
import javax.ws.rs.core.HttpHeaders; | ||
import javax.ws.rs.core.MultivaluedMap; | ||
import javax.ws.rs.core.Response; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.apache.hc.client5.http.io.HttpClientConnectionManager; | ||
import org.apache.pinot.broker.api.AccessControl; | ||
import org.apache.pinot.broker.api.RequesterIdentity; | ||
import org.apache.pinot.broker.broker.AccessControlFactory; | ||
|
@@ -51,6 +57,7 @@ | |
import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory; | ||
import org.apache.pinot.spi.exception.BadQueryRequestException; | ||
import org.apache.pinot.spi.trace.RequestContext; | ||
import org.apache.pinot.spi.utils.CommonConstants; | ||
import org.apache.pinot.spi.utils.CommonConstants.Broker; | ||
import org.apache.pinot.sql.parsers.SqlNodeAndOptions; | ||
import org.slf4j.Logger; | ||
|
@@ -74,6 +81,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { | |
protected final QueryLogger _queryLogger; | ||
@Nullable | ||
protected final String _enableNullHandling; | ||
protected final Map<Long, String> _queriesById; | ||
protected final Map<Long, String> _clientQueryIds; | ||
Comment on lines
+84
to
+85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we document from what to what are we mapping here? |
||
|
||
public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, | ||
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) { | ||
|
@@ -90,6 +99,16 @@ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok | |
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); | ||
_queryLogger = new QueryLogger(config); | ||
_enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING); | ||
|
||
boolean enableQueryCancellation = | ||
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); | ||
if (enableQueryCancellation) { | ||
_queriesById = new ConcurrentHashMap<>(); | ||
_clientQueryIds = new ConcurrentHashMap<>(); | ||
} else { | ||
_queriesById = null; | ||
_clientQueryIds = null; | ||
} | ||
Comment on lines
+103
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not something we introduced in this PR, but something I think we need to take care of in the future: We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate. This is something we need to improve in the future |
||
} | ||
|
||
@Override | ||
|
@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq | |
@Nullable HttpHeaders httpHeaders, AccessControl accessControl) | ||
throws Exception; | ||
|
||
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id. |
||
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) throws Exception; | ||
|
||
protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { | ||
statistics.setNumRowsResultSet(response.getNumRowsResultSet()); | ||
// TODO: Add partial result flag to RequestContext | ||
|
@@ -223,4 +245,65 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons | |
statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments()); | ||
statistics.setTraceInfo(response.getTraceInfo()); | ||
} | ||
|
||
@Override | ||
public Map<Long, String> getRunningQueries() { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
return new HashMap<>(_queriesById); | ||
} | ||
Comment on lines
+250
to
+253
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is safer and better practice to return an immutable view of the map instead. In the delegate we can create a copy we can modify. |
||
|
||
@Override | ||
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, | ||
Map<String, Integer> serverResponses) | ||
throws Exception { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
try { | ||
return handleCancel(queryId, timeoutMs, executor, connMgr, serverResponses); | ||
} finally { | ||
maybeRemoveQuery(queryId); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean cancelQueryByClientId(String clientQueryId, int timeoutMs, Executor executor, | ||
HttpClientConnectionManager connMgr, Map<String, Integer> serverResponses) | ||
throws Exception { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
Optional<Long> requestId = _clientQueryIds.entrySet().stream() | ||
.filter(e -> clientQueryId.equals(e.getValue())).map(Map.Entry::getKey).findFirst(); | ||
if (requestId.isPresent()) { | ||
return cancelQuery(requestId.get(), timeoutMs, executor, connMgr, serverResponses); | ||
} else { | ||
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); | ||
return false; | ||
Comment on lines
+277
to
+278
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should't we throw here to notify the caller that the query id is incorrect? |
||
} | ||
} | ||
|
||
protected String maybeSaveQuery(long requestId, SqlNodeAndOptions sqlNodeAndOptions, String query) { | ||
if (isQueryCancellationEnabled()) { | ||
String clientRequestId = sqlNodeAndOptions.getOptions() != null | ||
? sqlNodeAndOptions.getOptions().get(Broker.Request.QueryOptionKey.CLIENT_QUERY_ID) : null; | ||
_queriesById.put(requestId, query); | ||
if (StringUtils.isNotBlank(clientRequestId)) { | ||
_clientQueryIds.put(requestId, clientRequestId); | ||
LOGGER.debug("Keep track of running query: {} (with client id {})", requestId, clientRequestId); | ||
} else { | ||
LOGGER.debug("Keep track of running query: {}", requestId); | ||
} | ||
return clientRequestId; | ||
} | ||
return null; | ||
} | ||
Comment on lines
+282
to
+296
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method is not aligned with what I would expect by its name. This method does two different things I think we should split into two methods:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact the second method could be called |
||
|
||
protected void maybeRemoveQuery(long requestId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact we can call this method something like |
||
if (isQueryCancellationEnabled()) { | ||
_queriesById.remove(requestId); | ||
_clientQueryIds.remove(requestId); | ||
LOGGER.debug("Remove track of running query: {}", requestId); | ||
} | ||
} | ||
|
||
protected boolean isQueryCancellationEnabled() { | ||
return _queriesById != null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,6 @@ | |
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.stream.Collectors; | ||
import javax.annotation.Nullable; | ||
import javax.annotation.concurrent.ThreadSafe; | ||
import javax.ws.rs.WebApplicationException; | ||
|
@@ -142,9 +141,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ | |
protected final boolean _enableQueryLimitOverride; | ||
protected final boolean _enableDistinctCountBitmapOverride; | ||
protected final int _queryResponseLimit; | ||
protected final Map<Long, QueryServers> _serversById; | ||
// if >= 0, then overrides default limit of 10, otherwise setting is ignored | ||
protected final int _defaultQueryLimit; | ||
protected final Map<Long, QueryServers> _queriesById; | ||
protected final boolean _enableMultistageMigrationMetric; | ||
protected ExecutorService _multistageCompileExecutor; | ||
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue; | ||
|
@@ -162,11 +161,15 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro | |
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false); | ||
_queryResponseLimit = | ||
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT); | ||
if (this.isQueryCancellationEnabled()) { | ||
_serversById = new ConcurrentHashMap<>(); | ||
} else { | ||
_serversById = null; | ||
} | ||
_defaultQueryLimit = config.getProperty(Broker.CONFIG_OF_BROKER_DEFAULT_QUERY_LIMIT, | ||
Broker.DEFAULT_BROKER_QUERY_LIMIT); | ||
boolean enableQueryCancellation = | ||
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); | ||
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null; | ||
|
||
_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, | ||
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC); | ||
|
@@ -215,31 +218,33 @@ public void shutDown() { | |
} | ||
} | ||
|
||
@Override | ||
public Map<Long, String> getRunningQueries() { | ||
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); | ||
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); | ||
} | ||
|
||
@VisibleForTesting | ||
Set<ServerInstance> getRunningServers(long requestId) { | ||
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); | ||
QueryServers queryServers = _queriesById.get(requestId); | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
QueryServers queryServers = _serversById.get(requestId); | ||
return queryServers != null ? queryServers._servers : Collections.emptySet(); | ||
} | ||
|
||
@Override | ||
public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, | ||
protected void maybeRemoveQuery(long requestId) { | ||
super.maybeRemoveQuery(requestId); | ||
if (isQueryCancellationEnabled()) { | ||
_serversById.remove(requestId); | ||
} | ||
} | ||
|
||
@Override | ||
protected boolean handleCancel(long queryId, int timeoutMs, Executor executor, HttpClientConnectionManager connMgr, | ||
Map<String, Integer> serverResponses) | ||
throws Exception { | ||
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); | ||
QueryServers queryServers = _queriesById.get(requestId); | ||
QueryServers queryServers = _serversById.get(queryId); | ||
if (queryServers == null) { | ||
return false; | ||
} | ||
|
||
// TODO: Use different global query id for OFFLINE and REALTIME table after releasing 0.12.0. See QueryIdUtils for | ||
// details | ||
String globalQueryId = getGlobalQueryId(requestId); | ||
String globalQueryId = getGlobalQueryId(queryId); | ||
List<Pair<String, String>> serverUrls = new ArrayList<>(); | ||
for (ServerInstance serverInstance : queryServers._servers) { | ||
serverUrls.add(Pair.of(String.format("%s/query/%s", serverInstance.getAdminEndpoint(), globalQueryId), null)); | ||
|
@@ -810,7 +815,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |
} | ||
} | ||
BrokerResponseNative brokerResponse; | ||
if (_queriesById != null) { | ||
if (isQueryCancellationEnabled()) { | ||
// Start to track the running query for cancellation just before sending it out to servers to avoid any | ||
// potential failures that could happen before sending it out, like failures to calculate the routing table etc. | ||
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and | ||
|
@@ -819,14 +824,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |
// can always list the running queries and cancel query again until it ends. Just that such race | ||
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to | ||
// servers takes time, but will address later if needed. | ||
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); | ||
LOGGER.debug("Keep track of running query: {}", requestId); | ||
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another reason to split this method is that we may want to set |
||
if (isQueryCancellationEnabled()) { | ||
_serversById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); | ||
} | ||
try { | ||
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, | ||
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, | ||
requestContext); | ||
brokerResponse.setClientRequestId(clientRequestId); | ||
} finally { | ||
_queriesById.remove(requestId); | ||
maybeRemoveQuery(requestId); | ||
LOGGER.debug("Remove track of running query: {}", requestId); | ||
} | ||
} else { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use the same endpoint we already have?