Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix missing required columns of CursorBuildSpec from RowFilterPolicy #17664

Next Next commit
fix missing required columns of CursorBuildSpec from RowFilterPolicy
changes:
* RowFilterPolicy.visit now uses new method FilteredCursorFactory.addFilter to re-use the CursorBuildSpec transform of FilteredCursorFactory of adding a filter and its required columns to a CursorBuildSpec
* Added javadoc to CursorFactory, CursorHolder, and CursorBuildSpec to clarify usage
clintropolis committed Jan 24, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit b4b792fd60a51f15982a133d9e76acb91621fc35
Original file line number Diff line number Diff line change
@@ -39,6 +39,11 @@ public interface Policy
/**
* Apply this policy to a {@link CursorBuildSpec} to seamlessly enforce policies for cursor-based queries. The
* application must encapsulate 100% of the requirements of this policy.
* <p>
* Any transforms done to the spec must be sure to update {@link CursorBuildSpec#getPhysicalColumns()} and
* {@link CursorBuildSpec#getVirtualColumns()} as needed to ensure the underlying
* {@link org.apache.druid.segment.CursorHolder} that is being restricted has accurate information about the set of
* required columns.
*/
CursorBuildSpec visit(CursorBuildSpec spec);

Original file line number Diff line number Diff line change
@@ -23,12 +23,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.FilteredCursorFactory;

import javax.annotation.Nonnull;
import java.util.Arrays;
import java.util.Objects;

/**
@@ -58,12 +56,7 @@ public DimFilter getRowFilter()
@Override
public CursorBuildSpec visit(CursorBuildSpec spec)
{
CursorBuildSpec.CursorBuildSpecBuilder builder = CursorBuildSpec.builder(spec);
final Filter filter = spec.getFilter();
final Filter policyFilter = this.rowFilter.toFilter();

builder.setFilter(Filters.and(Arrays.asList(policyFilter, filter)));
return builder.build();
return FilteredCursorFactory.addFilter(spec, rowFilter);
}

@Override
120 changes: 112 additions & 8 deletions processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.vector.VectorCursor;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;

@@ -34,6 +35,23 @@
import java.util.List;
import java.util.Set;

/**
* Defines the plan for how the reader will scan, filter, transform, group and aggregate, and or order the data from a
* {@link CursorFactory} so that an appropriate {@link CursorHolder} can be constructed. The {@link CursorBuildSpec}
* includes physical and virtual columns will be read from the data, a {@link Filter} so that the {@link Cursor} and
* {@link VectorCursor} only provide matching rows, and details on how the scanned, transformed, and filtered data will
* be grouped, aggegated, and ordered if applicable to allow {@link CursorHolder} construction to provide optimized
* {@link Cursor} or {@link VectorCursor} such as providing cursors for pre-aggregated data with
* {@link org.apache.druid.segment.projections.Projections}.
*
* @see #getFilter()
* @see #getInterval()
* @see #getPhysicalColumns()
* @see #getVirtualColumns()
* @see #getGroupingColumns()
* @see #getAggregators()
* @see #getPreferredOrdering()
*/
public class CursorBuildSpec
{
public static final CursorBuildSpec FULL_SCAN = builder().build();
@@ -114,7 +132,7 @@ public Interval getInterval()

/**
* Set of physical columns required from a cursor. If null, and {@link #groupingColumns} is null or empty and
* {@link #aggregators} is null or empty, then a {@link CursorHolder} must assume that ALL columns are required
* {@link #aggregators} is null or empty, then a {@link CursorHolder} must assume that ALL columns are required.
*/
@Nullable
public Set<String> getPhysicalColumns()
@@ -235,6 +253,7 @@ public static class CursorBuildSpecBuilder
private List<OrderBy> preferredOrdering = Collections.emptyList();

private QueryContext queryContext = QueryContext.empty();

@Nullable
private QueryMetrics<?> queryMetrics;

@@ -257,7 +276,17 @@ private CursorBuildSpecBuilder(CursorBuildSpec buildSpec)
}

/**
* @see CursorBuildSpec#getFilter()
* @see CursorBuildSpec#getFilter() for usage.
*/
@Nullable
public Filter getFilter()
{
return filter;
}

/**
* @see CursorBuildSpec#getFilter() for usage. All {@link Filter#getRequiredColumns()} must be explicitly added to
* {@link #virtualColumns} if virtual or, if set to a non-null value, {@link #physicalColumns}.
*/
public CursorBuildSpecBuilder setFilter(@Nullable Filter filter)
{
@@ -266,7 +295,15 @@ public CursorBuildSpecBuilder setFilter(@Nullable Filter filter)
}

/**
* @see CursorBuildSpec#getInterval()
* @see CursorBuildSpec#getInterval() for usage.
*/
public Interval getInterval()
{
return interval;
}

/**
* @see CursorBuildSpec#getInterval() for usage.
*/
public CursorBuildSpecBuilder setInterval(Interval interval)
{
@@ -275,7 +312,20 @@ public CursorBuildSpecBuilder setInterval(Interval interval)
}

/**
* @see CursorBuildSpec#getPhysicalColumns()
* @see CursorBuildSpec#getPhysicalColumns() for usage.
*/
@Nullable
public Set<String> getPhysicalColumns()
{
return physicalColumns;
}

/**
* @see CursorBuildSpec#getPhysicalColumns() for usage. The backing value is not automatically populated by calls to
* {@link #setFilter(Filter)}, {@link #setVirtualColumns(VirtualColumns)}, {@link #setAggregators(List)}, or
* {@link #setPreferredOrdering(List)}, so this must be explicitly set for all required physical columns. If set to
* null, and {@link #groupingColumns} is null or empty and {@link #aggregators} is null or empty, then a
* {@link CursorHolder} must assume that ALL columns are required
*/
public CursorBuildSpecBuilder setPhysicalColumns(@Nullable Set<String> physicalColumns)
{
@@ -284,7 +334,16 @@ public CursorBuildSpecBuilder setPhysicalColumns(@Nullable Set<String> physicalC
}

/**
* @see CursorBuildSpec#getVirtualColumns()
* @see CursorBuildSpec#getVirtualColumns() for usage. All {@link VirtualColumn#requiredColumns()} must be
* explicitly added to {@link #physicalColumns} if it is set to a non-null value.
*/
public VirtualColumns getVirtualColumns()
{
return virtualColumns;
}

/**
* @see CursorBuildSpec#getVirtualColumns() for usage.
*/
public CursorBuildSpecBuilder setVirtualColumns(VirtualColumns virtualColumns)
{
@@ -293,7 +352,16 @@ public CursorBuildSpecBuilder setVirtualColumns(VirtualColumns virtualColumns)
}

/**
* @see CursorBuildSpec#getGroupingColumns()
* @see CursorBuildSpec#getGroupingColumns() for usage.
*/
@Nullable
public List<String> getGroupingColumns()
{
return groupingColumns;
}

/**
* @see CursorBuildSpec#getGroupingColumns() for usage.
*/
public CursorBuildSpecBuilder setGroupingColumns(@Nullable List<String> groupingColumns)
{
@@ -302,7 +370,17 @@ public CursorBuildSpecBuilder setGroupingColumns(@Nullable List<String> grouping
}

/**
* @see CursorBuildSpec#getAggregators()
* @see CursorBuildSpec#getAggregators() for usage.
*/
@Nullable
public List<AggregatorFactory> getAggregators()
{
return aggregators;
}

/**
* @see CursorBuildSpec#getAggregators() for usage. All {@link AggregatorFactory#requiredFields()} must be
* explicitly added to {@link #virtualColumns} if virtual or, if set to a non-null value, {@link #physicalColumns}.
*/
public CursorBuildSpecBuilder setAggregators(@Nullable List<AggregatorFactory> aggregators)
{
@@ -311,14 +389,31 @@ public CursorBuildSpecBuilder setAggregators(@Nullable List<AggregatorFactory> a
}

/**
* @see CursorBuildSpec#getPreferredOrdering()
* @see CursorBuildSpec#getPreferredOrdering() for usage.
*/
public List<OrderBy> getPreferredOrdering()
{
return preferredOrdering;
}

/**
* @see CursorBuildSpec#getPreferredOrdering() for usage. All {@link OrderBy#getColumnName()} must be explicitly
* added to {@link #virtualColumns} if virtual or, if set to a non-null value, {@link #physicalColumns}.
*/
public CursorBuildSpecBuilder setPreferredOrdering(List<OrderBy> preferredOrdering)
{
this.preferredOrdering = preferredOrdering;
return this;
}

/**
* @see CursorBuildSpec#getQueryContext()
*/
public QueryContext getQueryContext()
{
return queryContext;
}

/**
* @see CursorBuildSpec#getQueryContext()
*/
@@ -328,6 +423,15 @@ public CursorBuildSpecBuilder setQueryContext(QueryContext queryContext)
return this;
}

/**
* @see CursorBuildSpec#getQueryMetrics()
*/
@Nullable
public QueryMetrics<?> getQueryMetrics()
{
return queryMetrics;
}

/**
* @see CursorBuildSpec#getQueryMetrics()
*/
Original file line number Diff line number Diff line change
@@ -26,6 +26,10 @@

public interface CursorFactory extends ColumnInspector
{
/**
* Creates a {@link CursorHolder} for a given {@link CursorBuildSpec} which describes how the reader is going to
* scan, filter, transform, group and aggregate, and/or order the data.
*/
CursorHolder makeCursorHolder(CursorBuildSpec spec);

/**
Original file line number Diff line number Diff line change
@@ -31,6 +31,22 @@
import java.util.Collections;
import java.util.List;

/**
* Provides {@link Cursor} and if available, {@link VectorCursor} which readers can use to scan a set of rows defined
* originally from a {@link CursorBuildSpec} which describes how data is to be scanned, transformed, filter, grouped and
* aggregated, and/or ordered.
* <p>
* If {@link #canVectorize()} then {@link #asVectorCursor()} will return a non-null value allowing for processing rows
* in batches instead of individually.
* <p>
* If {@link #isPreAggregated()} is true, readers which are aggregating must call
* {@link #getAggregatorsForPreAggregated()} to get the updated set of {@link AggregatorFactory} to correctly process
* results
* <p>
* {@link #getOrdering()} defines how the data is ordered in the {@link Cursor} or {@link VectorCursor}, allowing
* readers to compare to their own desired ordering and potentially skip ordering their own results if it is
* pre-ordered.
*/
public interface CursorHolder extends Closeable
{
/**
@@ -60,8 +76,9 @@ default boolean canVectorize()
}

/**
* Returns true if the {@link Cursor} or {@link VectorCursor} contains pre-aggregated columns for all
* {@link AggregatorFactory} specified in {@link CursorBuildSpec#getAggregators()}.
* Returns true if the {@link Cursor} or {@link VectorCursor} contains pre-aggregated columns for all grouping columns
* specified in {@link CursorBuildSpec#getGroupingColumns()} and all {@link AggregatorFactory} specified in
* {@link CursorBuildSpec#getAggregators()}.
* <p>
* If this method returns true, {@link ColumnSelectorFactory} and
* {@link org.apache.druid.segment.vector.VectorColumnSelectorFactory} created from {@link Cursor} and
@@ -90,7 +107,7 @@ default List<AggregatorFactory> getAggregatorsForPreAggregated()
/**
* Returns cursor ordering, which may or may not match {@link CursorBuildSpec#getPreferredOrdering()}. If returns
* an empty list then the cursor has no defined ordering.
*
* <p>
* Cursors associated with this holder return rows in this ordering, using the natural comparator for the column type.
* Includes {@link ColumnHolder#TIME_COLUMN_NAME} if appropriate.
*/
Original file line number Diff line number Diff line change
@@ -45,32 +45,10 @@ public FilteredCursorFactory(CursorFactory delegate, @Nullable DimFilter filter)
@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
final CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder = CursorBuildSpec.builder(spec);
final Filter newFilter;
final Set<String> physicalColumns;
if (filter != null) {
if (spec.getFilter() == null) {
newFilter = filter.toFilter();
} else {
newFilter = Filters.and(Arrays.asList(spec.getFilter(), filter.toFilter()));
}
if (spec.getPhysicalColumns() != null) {
physicalColumns = new HashSet<>(spec.getPhysicalColumns());
for (String column : filter.getRequiredColumns()) {
if (!spec.getVirtualColumns().exists(column)) {
physicalColumns.add(column);
}
}
} else {
physicalColumns = null;
}
} else {
newFilter = spec.getFilter();
physicalColumns = spec.getPhysicalColumns();
if (filter == null) {
return delegate.makeCursorHolder(spec);
}
buildSpecBuilder.setFilter(newFilter)
.setPhysicalColumns(physicalColumns);
return delegate.makeCursorHolder(buildSpecBuilder.build());
return delegate.makeCursorHolder(addFilter(CursorBuildSpec.builder(spec), filter).build());
}

@Override
@@ -85,4 +63,34 @@ public ColumnCapabilities getColumnCapabilities(String column)
{
return delegate.getColumnCapabilities(column);
}

/**
* Adds a {@link Filter} from a {@link DimFilter} and its required physical columns to a
* {@link CursorBuildSpec.CursorBuildSpecBuilder}. If the {@link Filter} requires virtual columns, they must already
* be added to the {@link CursorBuildSpec.CursorBuildSpecBuilder} or they will be considered physical columns.
*/
public static CursorBuildSpec.CursorBuildSpecBuilder addFilter(
CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder,
DimFilter filter
)
{
final Filter newFilter;
final Set<String> physicalColumns;
if (buildSpecBuilder.getFilter() == null) {
newFilter = filter.toFilter();
} else {
newFilter = Filters.and(Arrays.asList(buildSpecBuilder.getFilter(), filter.toFilter()));
}
if (buildSpecBuilder.getPhysicalColumns() != null) {
physicalColumns = new HashSet<>(buildSpecBuilder.getPhysicalColumns());
for (String column : filter.getRequiredColumns()) {
if (!buildSpecBuilder.getVirtualColumns().exists(column)) {
physicalColumns.add(column);
}
}
} else {
physicalColumns = null;
}
return buildSpecBuilder.setFilter(newFilter).setPhysicalColumns(physicalColumns);
}
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.EqualityFilter;
@@ -82,12 +83,17 @@ public void testVisit()
public void testVisit_combineFilters()
{
Filter filter = new EqualityFilter("col0", ColumnType.STRING, "val0", null);
CursorBuildSpec spec = CursorBuildSpec.builder().setFilter(filter).build();
CursorBuildSpec spec = CursorBuildSpec.builder()
.setFilter(filter).
setPhysicalColumns(filter.getRequiredColumns())
.build();

DimFilter policyFilter = new EqualityFilter("col", ColumnType.STRING, "val", null);
final RowFilterPolicy policy = RowFilterPolicy.from(policyFilter);

Filter expected = new AndFilter(ImmutableList.of(policyFilter.toFilter(), filter));
Assert.assertEquals(expected, policy.visit(spec).getFilter());
final CursorBuildSpec transformed = policy.visit(spec);
Assert.assertEquals(expected, transformed.getFilter());
Assert.assertEquals(ImmutableSet.of("col", "col0"), transformed.getPhysicalColumns());
}
}