-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable query cancellation for MSQE + cancel using client-provided id #14823
base: master
Are you sure you want to change the base?
Conversation
} | ||
String clientQueryId = extractClientQueryId(sqlNodeAndOptions); | ||
if (StringUtils.isBlank(clientQueryId)) { | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(nit) in general we don't recommend returning NULL as a coding practice
Is ClientQueryID a new concept? Is it same as requestID ? How does the support added here improve the existing Query Cancellation (which is also exposed to user IIRC) ? |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #14823 +/- ##
============================================
+ Coverage 61.75% 63.66% +1.91%
- Complexity 207 1472 +1265
============================================
Files 2436 2710 +274
Lines 133233 152083 +18850
Branches 20636 23484 +2848
============================================
+ Hits 82274 96829 +14555
- Misses 44911 47978 +3067
- Partials 6048 7276 +1228
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Hi Siddharth, AFAIK, the current cancellation feature depends on the internal requestId generated by the broker itself. That request id is not returned until the query completes, so an external user requires first to ask for the active running queries, determine from the responded array the requestId assigned to the one he's interested in (just comparing the query body) and finally use the cancel operation to abort it. That's two back-and-forth trips between the user and the cluster. With a client-provided requestId he can skip one step, going straight to the cancel operation using his own ID to abort the query. |
As some extra context, the endgame of this is enable on UI a "Cancel" button the customer can use to abort an ongoing query. Using a query id provided by the customer or the UI itself, that can be done without need of any internal id retrieval. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm adding several comments but I wasn't able to read the whole PR.
Although I'm asking for changes, it is a good PR overall. We just need to finish the last mile.
} catch (InterruptedException e) { | ||
//TODO: handle interruption | ||
//Thread.currentThread().interrupt(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to fix this TODO before merging
public String cancelClientQuery( | ||
@ApiParam(value = "ClientQueryId given by the client", required = true) | ||
@PathParam("clientQueryId") String clientQueryId, | ||
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") | ||
@DefaultValue("3000") int timeoutMs, | ||
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") | ||
boolean verbose) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use the same endpoint we already have?
boolean enableQueryCancellation = | ||
Boolean.parseBoolean(config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); | ||
if (enableQueryCancellation) { | ||
_queriesById = new ConcurrentHashMap<>(); | ||
_clientQueryIds = new ConcurrentHashMap<>(); | ||
} else { | ||
_queriesById = null; | ||
_clientQueryIds = null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not something we introduced in this PR, but something I think we need to take care of in the future:
We use BaseBrokerRequestHandler as the root/common state for the broker, probably for historical reasons. But that is not true. A single broker may have SSE, MSE, GRPC and even TSE queries running at the same time. It would be a better design to have a shared state between them instead of the trick we do with the delegate.
This is something we need to improve in the future
@@ -179,6 +198,9 @@ protected abstract BrokerResponse handleRequest(long requestId, String query, Sq | |||
@Nullable HttpHeaders httpHeaders, AccessControl accessControl) | |||
throws Exception; | |||
|
|||
protected abstract boolean handleCancel(long queryId, int timeoutMs, Executor executor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need javadoc here to explain how it should work. At least we should say that queryId may be a client or pinot generated id.
LOGGER.warn("Query cancellation cannot be performed due to unknown client query id: {}", clientQueryId); | ||
return false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should't we throw here to notify the caller that the query id is incorrect?
@@ -810,14 +813,17 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO | |||
// can always list the running queries and cancel query again until it ends. Just that such race | |||
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to | |||
// servers takes time, but will address later if needed. | |||
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); | |||
LOGGER.debug("Keep track of running query: {}", requestId); | |||
String clientRequestId = maybeSaveQuery(requestId, sqlNodeAndOptions, query); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another reason to split this method is that we may want to set brokerRespose.setClientRequestId
even if query cancellation is disabled.
public Map<Long, String> getRunningQueries() { | ||
Preconditions.checkState(isQueryCancellationEnabled(), "Query cancellation is not enabled on broker"); | ||
return new HashMap<>(_queriesById); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is safer and better practice to return an immutable view of the map instead. In the delegate we can create a copy we can modify.
@@ -547,6 +547,17 @@ public static long now() { | |||
return System.currentTimeMillis(); | |||
} | |||
|
|||
@ScalarFunction | |||
public static long sleep(long millis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like the sleep trick. I mean, to have a function that wait some MS or until some epoch per row is great for tests. But given we didn't find a way to make evaluation lazy (sleep with constant arguments is executed at optimization phase) we had to call sleep as sleep(col + constant)
. That is the trick I don't like.
Instead, I suggest including an option or something like that that can be understood by different parts of the code so we can sleep whenever we want (in the broker, in the leaf operator or in SSE).
The sleep function can also be used to create attacks (see https://www.sqlinjection.net/time-based/). I understand the sleep function is not the topic of this PR but just a utility to test the PR, so I don't think it is correct to block the PR until we have a perfect sleep function. Therefore my suggestion is to at least change the implementation so this function only works if tests are enabled. We can do that by using this horrible Java trick:
boolean assertEnabled = false;
assert assertEnabled = true;
if (assertEnabled) {
Thread.sleep(millis);
}
public void testSleepFunction() { | ||
long startTime = System.currentTimeMillis(); | ||
testFunction("sleep(500)", Collections.emptyList(), new GenericRow(), result -> { | ||
assertTrue((long) result >= 500); | ||
}); | ||
long endTime = System.currentTimeMillis(); | ||
assertTrue(endTime - startTime >= 500); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we can reduce time to something in the order of tens of millis
String clientRequestId = UUID.randomUUID().toString(); | ||
// tricky query: use sleep with some column data to avoid Calcite from optimizing it on compile time | ||
String sqlQuery = | ||
"SET " + CommonConstants.Broker.Request.QueryOptionKey.CLIENT_QUERY_ID + "='" + clientRequestId + "'; " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: directly use the option name instead of the const to make it easier to read.
clientQueryId
query option that can be used when using theclientQuery/{clientQueryId}
endpoint.sleep(ms)
function, as for today only recommended for testing purposes.Some refactor involved to reuse as much as possible cancellation logic between SSQE and MSQE.