Enable querying policy-enabled table in MSQ, and use RestrictedDataSource as a base in DataSourceAnalysis. #17666
…le policy restriction in MSQ.
Had a first pass and have some questions and thoughts.
Also, maybe you could try to avoid reformatting entire files; all of these unrelated formatting changes make review harder than it should be. I know it's just the tooling doing it to adhere to the style rules, but my preference at least would be to make these cosmetic changes in a standalone PR as you notice them, to keep reviews simple.
   */
  private <FactoryType extends FrameProcessorFactory<ProcessorReturnType, ManagerReturnType, ExtraInfoType>, ProcessorReturnType, ManagerReturnType, ExtraInfoType> void makeAndRunWorkProcessors()
      throws IOException
  private <FactoryT extends FrameProcessorFactory<ProcessorReturnT, ManagerReturnT, ExtraInfoT>, ProcessorReturnT, ManagerReturnT, ExtraInfoT>
nit: why change these names? just seems to add extra noise to the PR
final int channelSize = outputChannels.getAllChannels().size();
final int parallelismBoundedByChannelSize = channelSize == 0 ? parallelism : Math.min(parallelism, channelSize);
final int maxOutstandingProcessors = Math.max(1, parallelismBoundedByChannelSize);
nit: the old code seemed clearer and had comments that were nice
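For reference while weighing this nit, the bound computed by those three lines can be sketched in isolation. The class and method names below are hypothetical and not part of the PR; only the arithmetic is taken from the snippet above.

```java
public class ParallelismBound
{
  // Hypothetical helper that mirrors the three lines under review.
  static int maxOutstandingProcessors(int parallelism, int channelSize)
  {
    // With zero output channels, the channel count imposes no bound.
    final int boundedByChannelSize = channelSize == 0 ? parallelism : Math.min(parallelism, channelSize);
    // Always allow at least one outstanding processor.
    return Math.max(1, boundedByChannelSize);
  }

  public static void main(String[] args)
  {
    System.out.println(maxOutstandingProcessors(4, 0)); // no channels: bounded by parallelism only
    System.out.println(maxOutstandingProcessors(4, 2)); // fewer channels than parallelism
    System.out.println(maxOutstandingProcessors(0, 0)); // floor of one
  }
}
```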
workerToTaskIds.compute(i, (workerId, taskIds) -> {
  if (taskIds == null) {
    taskIds = new ArrayList<>();
  }
  taskIds.add(task.getId());
  return taskIds;
});
workerToTaskIds.computeIfAbsent(i, (unused) -> (new ArrayList<>())).add(task.getId());
this isn't equivalent, previously it would always add the taskId to the worker, now it only adds if the worker isn't there, is that ok?
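A reference point for this question: under the documented contract of `java.util.Map`, `computeIfAbsent` returns the existing value when the key is already mapped and the newly computed value otherwise, so a chained `.add(...)` runs in both cases. The sketch below compares the two forms using plain strings as stand-in task ids; the class name, method names, and sample data are hypothetical and not from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentDemo
{
  // Old form from the diff: create the list if missing, then always add.
  static Map<Integer, List<String>> assignWithCompute(int[] workers, String[] taskIds)
  {
    final Map<Integer, List<String>> workerToTaskIds = new HashMap<>();
    for (int n = 0; n < workers.length; n++) {
      final String taskId = taskIds[n];
      workerToTaskIds.compute(workers[n], (workerId, ids) -> {
        if (ids == null) {
          ids = new ArrayList<>();
        }
        ids.add(taskId);
        return ids;
      });
    }
    return workerToTaskIds;
  }

  // New form from the diff: computeIfAbsent returns the current (existing or
  // freshly created) list, and add() is called on that returned list.
  static Map<Integer, List<String>> assignWithComputeIfAbsent(int[] workers, String[] taskIds)
  {
    final Map<Integer, List<String>> workerToTaskIds = new HashMap<>();
    for (int n = 0; n < workers.length; n++) {
      workerToTaskIds.computeIfAbsent(workers[n], unused -> new ArrayList<>()).add(taskIds[n]);
    }
    return workerToTaskIds;
  }

  public static void main(String[] args)
  {
    final int[] workers = {0, 1, 0};
    final String[] taskIds = {"task-a", "task-b", "task-c"};
    // Worker 0 appears twice, which exercises the "key already present" path.
    System.out.println(assignWithCompute(workers, taskIds).equals(assignWithComputeIfAbsent(workers, taskIds)));
  }
}
```

The two forms differ only if the mapping function is meant to do more than create the list, which is not the case in the lines shown.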
@@ -198,6 +198,7 @@ private Function<SegmentReference, SegmentReference> createSegmentMapFunction()

  DataSource inlineChannelData(final DataSource originalDataSource)
  {
    // TODO: need to handle RestrictedInputNumberDataSource here
We typically don't leave TODO comments in the code; either do the thing, or leave a longer comment explaining the problem that needs to be addressed in the future.
 * join tree.
 */
@JsonTypeName("restrictedInputNumber")
public class RestrictedInputNumberDataSource implements DataSource
should this be InputNumberRestrictedDataSource instead?
 * Computes a native druid query, must be called from the constructor. The returned query will be one of following:
 * <ul>
 * <li> {@link GroupByQuery}
 * <li> {@link WindowOperatorQuery}
 * <li> {@link TimeBoundaryQuery}
 * <li> {@link TimeseriesQuery}
 * <li> {@link TopNQuery}
 * <li> {@link ScanQuery}
 * </ul>
nit: afaik we do not publish javadocs, so these could just be a plain list
/**
 * Returns an updated {@link DruidQuery} based on the policy restrictions on tables.
 */
public DruidQuery withPolicies(Map<String, Optional<Policy>> policyMap)
This method seems off to me: why isn't dataSource updated as well? I think calling this produces a DruidQuery that is in a somewhat strange state, and I think it only works because of how MSQTaskQueryMaker operates, since that only really modifies the query part of the DruidQuery.
I'm not sure what would be better to do here, since it is maybe odd that MSQ uses DruidQuery directly for things while it is filled with planner state. I need to think about it a bit.
if (query.getDataSource().getAnalysis().isJoin()) {
  // Joins may require significant computation to compute the segmentMapFn. Offload it to a processor.
  return new SimpleSegmentMapFnProcessor(query);
} else {
  // Non-joins are expected to have cheap-to-compute segmentMapFn. Do the computation in the factory thread,
  // without offloading to a processor.
  return null;
}
why this change? I guess the result is that above we always need a ChainedProcessorManager, is that necessary?
@@ -1187,6 +1149,11 @@ public OutputChannels getOutputChannels()
    {
      return outputChannels;
    }

    public ListenableFuture<OutputChannels> waitResultReadyAndGetSanityCheckedChannels()
I think this could just be called waitResultReady
/**
 * Verifies there is exactly one channel per partition.
 */
public OutputChannels sanityCheck()
maybe we should call this verify() or verifySingleChannel()?
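To make the naming discussion concrete, here is a rough sketch of what a check with this contract might look like. It is not the PR's implementation: it models each channel by its partition number only, uses a hypothetical class and method name, and only rejects duplicate partition numbers, since the set of expected partitions is not visible in the snippet.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SingleChannelCheck
{
  // Hypothetical stand-in for the check: each list element is the partition
  // number of one output channel. Throws if any partition has more than one
  // channel; returns the input unchanged so calls can be chained.
  static List<Integer> verifySingleChannel(List<Integer> channelPartitionNumbers)
  {
    final Set<Integer> seen = new HashSet<>();
    for (int partitionNumber : channelPartitionNumbers) {
      // Set.add returns false when the element was already present.
      if (!seen.add(partitionNumber)) {
        throw new IllegalStateException(
            "Expected one channel for partition [" + partitionNumber + "] but found more than one"
        );
      }
    }
    return channelPartitionNumbers;
  }

  public static void main(String[] args)
  {
    System.out.println(verifySingleChannel(List.of(0, 1, 2)).size());
  }
}
```

Returning the checked object mirrors the `public OutputChannels sanityCheck()` signature shown in the diff, which lets a caller verify and use the channels in one expression.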
Description
This PR enables querying policy-enabled tables in MSQ.
Key changed/added classes in this PR
- DataSourceAnalysis: getBaseTableDataSource can now return the base of a RestrictedDataSource. This is a more robust solution than using the underlying table as the base.
- DruidQuery: can now also be created by withPolicies, which just applies policy restrictions to the original query.
- MSQTaskQueryMaker: applies restrictions to the DruidQuery instead of throwing a permission error.
- DataSourcePlan: can handle RestrictedDataSource.
- RestrictedInputNumberDataSource: basically wraps an InputNumberDataSource with a policy; its SegmentMapFn can be used to create a RestrictedSegment.
- RunWorkOrder: a few refactors to make the code clearer, no behavior change.
- ShufflePipelineBuilder.build(): it was not clear before that the channel future should only be returned when the resultFuture is ready. Also, the sanity check is moved to OutputChannels.
- BaseLeafFrameProcessorFactory.makeProcessors(): previously, makeSegmentMapFnProcessor could return null. After this PR, it cannot. The difference is that the SegmentMapFn from query.dataSource is now handled in ChainedProcessorManager for all queries (previously only for join queries).
This PR has: