From b4b792fd60a51f15982a133d9e76acb91621fc35 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Jan 2025 15:51:56 -0800 Subject: [PATCH 1/7] fix missing required columns of CursorBuildSpec from RowFilterPolicy changes: * RowFilterPolicy.visit now uses new method FilteredCursorFactory.addFilter to re-use the CursorBuildSpec transform of FilteredCursorFactory of adding a filter and its required columns to a CursorBuildSpec * Added javadoc to CursorFactory, CursorHolder, and CursorBuildSpec to clarify usage --- .../org/apache/druid/query/policy/Policy.java | 5 + .../druid/query/policy/RowFilterPolicy.java | 11 +- .../apache/druid/segment/CursorBuildSpec.java | 120 ++++++++++++++++-- .../apache/druid/segment/CursorFactory.java | 4 + .../apache/druid/segment/CursorHolder.java | 23 +++- .../druid/segment/FilteredCursorFactory.java | 58 +++++---- .../query/policy/RowFilterPolicyTest.java | 10 +- 7 files changed, 184 insertions(+), 47 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/policy/Policy.java b/processing/src/main/java/org/apache/druid/query/policy/Policy.java index 11a1aff4fbb8..16d54f753af3 100644 --- a/processing/src/main/java/org/apache/druid/query/policy/Policy.java +++ b/processing/src/main/java/org/apache/druid/query/policy/Policy.java @@ -39,6 +39,11 @@ public interface Policy /** * Apply this policy to a {@link CursorBuildSpec} to seamlessly enforce policies for cursor-based queries. The * application must encapsulate 100% of the requirements of this policy. + *

+ * Any transforms done to the spec must be sure to update {@link CursorBuildSpec#getPhysicalColumns()} and + * {@link CursorBuildSpec#getVirtualColumns()} as needed to ensure the underlying + * {@link org.apache.druid.segment.CursorHolder} that is being restricted has accurate information about the set of + * required columns. */ CursorBuildSpec visit(CursorBuildSpec spec); diff --git a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java index 97620381344a..755b6cb4e28c 100644 --- a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java +++ b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java @@ -23,12 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.query.filter.DimFilter; -import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.filter.Filters; +import org.apache.druid.segment.FilteredCursorFactory; import javax.annotation.Nonnull; -import java.util.Arrays; import java.util.Objects; /** @@ -58,12 +56,7 @@ public DimFilter getRowFilter() @Override public CursorBuildSpec visit(CursorBuildSpec spec) { - CursorBuildSpec.CursorBuildSpecBuilder builder = CursorBuildSpec.builder(spec); - final Filter filter = spec.getFilter(); - final Filter policyFilter = this.rowFilter.toFilter(); - - builder.setFilter(Filters.and(Arrays.asList(policyFilter, filter))); - return builder.build(); + return FilteredCursorFactory.addFilter(spec, rowFilter); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java index da1cd06b9b00..e0a16af017d3 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java @@ -26,6 +26,7 @@ import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.vector.VectorCursor; import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; @@ -34,6 +35,23 @@ import java.util.List; import java.util.Set; +/** + * Defines the plan for how the reader will scan, filter, transform, group and aggregate, and or order the data from a + * {@link CursorFactory} so that an appropriate {@link CursorHolder} can be constructed. The {@link CursorBuildSpec} + * includes physical and virtual columns will be read from the data, a {@link Filter} so that the {@link Cursor} and + * {@link VectorCursor} only provide matching rows, and details on how the scanned, transformed, and filtered data will + * be grouped, aggegated, and ordered if applicable to allow {@link CursorHolder} construction to provide optimized + * {@link Cursor} or {@link VectorCursor} such as providing cursors for pre-aggregated data with + * {@link org.apache.druid.segment.projections.Projections}. + * + * @see #getFilter() + * @see #getInterval() + * @see #getPhysicalColumns() + * @see #getVirtualColumns() + * @see #getGroupingColumns() + * @see #getAggregators() + * @see #getPreferredOrdering() + */ public class CursorBuildSpec { public static final CursorBuildSpec FULL_SCAN = builder().build(); @@ -114,7 +132,7 @@ public Interval getInterval() /** * Set of physical columns required from a cursor. If null, and {@link #groupingColumns} is null or empty and - * {@link #aggregators} is null or empty, then a {@link CursorHolder} must assume that ALL columns are required + * {@link #aggregators} is null or empty, then a {@link CursorHolder} must assume that ALL columns are required. */ @Nullable public Set getPhysicalColumns() @@ -235,6 +253,7 @@ public static class CursorBuildSpecBuilder private List preferredOrdering = Collections.emptyList(); private QueryContext queryContext = QueryContext.empty(); + @Nullable private QueryMetrics queryMetrics; @@ -257,7 +276,17 @@ private CursorBuildSpecBuilder(CursorBuildSpec buildSpec) } /** - * @see CursorBuildSpec#getFilter() + * @see CursorBuildSpec#getFilter() for usage. + */ + @Nullable + public Filter getFilter() + { + return filter; + } + + /** + * @see CursorBuildSpec#getFilter() for usage. All {@link Filter#getRequiredColumns()} must be explicitly added to + * {@link #virtualColumns} if virtual or, if set to a non-null value, {@link #physicalColumns}. */ public CursorBuildSpecBuilder setFilter(@Nullable Filter filter) { @@ -266,7 +295,15 @@ public CursorBuildSpecBuilder setFilter(@Nullable Filter filter) } /** - * @see CursorBuildSpec#getInterval() + * @see CursorBuildSpec#getInterval() for usage. + */ + public Interval getInterval() + { + return interval; + } + + /** + * @see CursorBuildSpec#getInterval() for usage. */ public CursorBuildSpecBuilder setInterval(Interval interval) { @@ -275,7 +312,20 @@ public CursorBuildSpecBuilder setInterval(Interval interval) } /** - * @see CursorBuildSpec#getPhysicalColumns() + * @see CursorBuildSpec#getPhysicalColumns() for usage. + */ + @Nullable + public Set getPhysicalColumns() + { + return physicalColumns; + } + + /** + * @see CursorBuildSpec#getPhysicalColumns() for usage. The backing value is not automatically populated by calls to + * {@link #setFilter(Filter)}, {@link #setVirtualColumns(VirtualColumns)}, {@link #setAggregators(List)}, or + * {@link #setPreferredOrdering(List)}, so this must be explicitly set for all required physical columns. If set to + * null, and {@link #groupingColumns} is null or empty and {@link #aggregators} is null or empty, then a + * {@link CursorHolder} must assume that ALL columns are required */ public CursorBuildSpecBuilder setPhysicalColumns(@Nullable Set physicalColumns) { @@ -284,7 +334,16 @@ public CursorBuildSpecBuilder setPhysicalColumns(@Nullable Set physicalC } /** - * @see CursorBuildSpec#getVirtualColumns() + * @see CursorBuildSpec#getVirtualColumns() for usage. All {@link VirtualColumn#requiredColumns()} must be + * explicitly added to {@link #physicalColumns} if it is set to a non-null value. + */ + public VirtualColumns getVirtualColumns() + { + return virtualColumns; + } + + /** + * @see CursorBuildSpec#getVirtualColumns() for usage. */ public CursorBuildSpecBuilder setVirtualColumns(VirtualColumns virtualColumns) { @@ -293,7 +352,16 @@ public CursorBuildSpecBuilder setVirtualColumns(VirtualColumns virtualColumns) } /** - * @see CursorBuildSpec#getGroupingColumns() + * @see CursorBuildSpec#getGroupingColumns() for usage. + */ + @Nullable + public List getGroupingColumns() + { + return groupingColumns; + } + + /** + * @see CursorBuildSpec#getGroupingColumns() for usage. */ public CursorBuildSpecBuilder setGroupingColumns(@Nullable List groupingColumns) { @@ -302,7 +370,17 @@ public CursorBuildSpecBuilder setGroupingColumns(@Nullable List grouping } /** - * @see CursorBuildSpec#getAggregators() + * @see CursorBuildSpec#getAggregators() for usage. + */ + @Nullable + public List getAggregators() + { + return aggregators; + } + + /** + * @see CursorBuildSpec#getAggregators() for usage. All {@link AggregatorFactory#requiredFields()} must be + * explicitly added to {@link #virtualColumns} if virtual or, if set to a non-null value, {@link #physicalColumns}. */ public CursorBuildSpecBuilder setAggregators(@Nullable List aggregators) { @@ -311,7 +389,16 @@ public CursorBuildSpecBuilder setAggregators(@Nullable List a } /** - * @see CursorBuildSpec#getPreferredOrdering() + * @see CursorBuildSpec#getPreferredOrdering() for usage. + */ + public List getPreferredOrdering() + { + return preferredOrdering; + } + + /** + * @see CursorBuildSpec#getPreferredOrdering() for usage. All {@link OrderBy#getColumnName()} must be explicitly + * added to {@link #virtualColumns} if virtual or, if set to a non-null value, {@link #physicalColumns}. */ public CursorBuildSpecBuilder setPreferredOrdering(List preferredOrdering) { @@ -319,6 +406,14 @@ public CursorBuildSpecBuilder setPreferredOrdering(List preferredOrderi return this; } + /** + * @see CursorBuildSpec#getQueryContext() + */ + public QueryContext getQueryContext() + { + return queryContext; + } + /** * @see CursorBuildSpec#getQueryContext() */ @@ -328,6 +423,15 @@ public CursorBuildSpecBuilder setQueryContext(QueryContext queryContext) return this; } + /** + * @see CursorBuildSpec#getQueryMetrics() + */ + @Nullable + public QueryMetrics getQueryMetrics() + { + return queryMetrics; + } + /** * @see CursorBuildSpec#getQueryMetrics() */ diff --git a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java index b2547ebe38c0..2effd1b5fa2f 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java @@ -26,6 +26,10 @@ public interface CursorFactory extends ColumnInspector { + /** + * Creates a {@link CursorHolder} for a given {@link CursorBuildSpec} which describes how the reader is going to + * scan, filter, transform, group and aggregate, and/or order the data. + */ CursorHolder makeCursorHolder(CursorBuildSpec spec); /** diff --git a/processing/src/main/java/org/apache/druid/segment/CursorHolder.java b/processing/src/main/java/org/apache/druid/segment/CursorHolder.java index 808e1eb53846..3ce8d813c6c3 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorHolder.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorHolder.java @@ -31,6 +31,22 @@ import java.util.Collections; import java.util.List; +/** + * Provides {@link Cursor} and if available, {@link VectorCursor} which readers can use to scan a set of rows defined + * originally from a {@link CursorBuildSpec} which describes how data is to be scanned, transformed, filter, grouped and + * aggregated, and/or ordered. + *

+ * If {@link #canVectorize()} then {@link #asVectorCursor()} will return a non-null value allowing for processing rows + * in batches instead of individually. + *

+ * If {@link #isPreAggregated()} is true, readers which are aggregating must call + * {@link #getAggregatorsForPreAggregated()} to get the updated set of {@link AggregatorFactory} to correctly process + * results + *

+ * {@link #getOrdering()} defines how the data is ordered in the {@link Cursor} or {@link VectorCursor}, allowing + * readers to compare to their own desired ordering and potentially skip ordering their own results if it is + * pre-ordered. + */ public interface CursorHolder extends Closeable { /** @@ -60,8 +76,9 @@ default boolean canVectorize() } /** - * Returns true if the {@link Cursor} or {@link VectorCursor} contains pre-aggregated columns for all - * {@link AggregatorFactory} specified in {@link CursorBuildSpec#getAggregators()}. + * Returns true if the {@link Cursor} or {@link VectorCursor} contains pre-aggregated columns for all grouping columns + * specified in {@link CursorBuildSpec#getGroupingColumns()} and all {@link AggregatorFactory} specified in + * {@link CursorBuildSpec#getAggregators()}. *

* If this method returns true, {@link ColumnSelectorFactory} and * {@link org.apache.druid.segment.vector.VectorColumnSelectorFactory} created from {@link Cursor} and @@ -90,7 +107,7 @@ default List getAggregatorsForPreAggregated() /** * Returns cursor ordering, which may or may not match {@link CursorBuildSpec#getPreferredOrdering()}. If returns * an empty list then the cursor has no defined ordering. - * + *

* Cursors associated with this holder return rows in this ordering, using the natural comparator for the column type. * Includes {@link ColumnHolder#TIME_COLUMN_NAME} if appropriate. */ diff --git a/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java index 5a120f7ebb8b..15e0bd0c468e 100644 --- a/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java @@ -45,32 +45,10 @@ public FilteredCursorFactory(CursorFactory delegate, @Nullable DimFilter filter) @Override public CursorHolder makeCursorHolder(CursorBuildSpec spec) { - final CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder = CursorBuildSpec.builder(spec); - final Filter newFilter; - final Set physicalColumns; - if (filter != null) { - if (spec.getFilter() == null) { - newFilter = filter.toFilter(); - } else { - newFilter = Filters.and(Arrays.asList(spec.getFilter(), filter.toFilter())); - } - if (spec.getPhysicalColumns() != null) { - physicalColumns = new HashSet<>(spec.getPhysicalColumns()); - for (String column : filter.getRequiredColumns()) { - if (!spec.getVirtualColumns().exists(column)) { - physicalColumns.add(column); - } - } - } else { - physicalColumns = null; - } - } else { - newFilter = spec.getFilter(); - physicalColumns = spec.getPhysicalColumns(); + if (filter == null) { + return delegate.makeCursorHolder(spec); } - buildSpecBuilder.setFilter(newFilter) - .setPhysicalColumns(physicalColumns); - return delegate.makeCursorHolder(buildSpecBuilder.build()); + return delegate.makeCursorHolder(addFilter(CursorBuildSpec.builder(spec), filter).build()); } @Override @@ -85,4 +63,34 @@ public ColumnCapabilities getColumnCapabilities(String column) { return delegate.getColumnCapabilities(column); } + + /** + * Adds a {@link Filter} from a {@link DimFilter} and its required physical columns to a + * {@link CursorBuildSpec.CursorBuildSpecBuilder}. If the {@link Filter} requires virtual columns, they must already + * be added to the {@link CursorBuildSpec.CursorBuildSpecBuilder} or they will be considered physical columns. + */ + public static CursorBuildSpec.CursorBuildSpecBuilder addFilter( + CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder, + DimFilter filter + ) + { + final Filter newFilter; + final Set physicalColumns; + if (buildSpecBuilder.getFilter() == null) { + newFilter = filter.toFilter(); + } else { + newFilter = Filters.and(Arrays.asList(buildSpecBuilder.getFilter(), filter.toFilter())); + } + if (buildSpecBuilder.getPhysicalColumns() != null) { + physicalColumns = new HashSet<>(buildSpecBuilder.getPhysicalColumns()); + for (String column : filter.getRequiredColumns()) { + if (!buildSpecBuilder.getVirtualColumns().exists(column)) { + physicalColumns.add(column); + } + } + } else { + physicalColumns = null; + } + return buildSpecBuilder.setFilter(newFilter).setPhysicalColumns(physicalColumns); + } } diff --git a/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java b/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java index 193843dbfcad..1d30d9d5ba04 100644 --- a/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java +++ b/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.query.filter.EqualityFilter; @@ -82,12 +83,17 @@ public void testVisit() public void testVisit_combineFilters() { Filter filter = new EqualityFilter("col0", ColumnType.STRING, "val0", null); - CursorBuildSpec spec = CursorBuildSpec.builder().setFilter(filter).build(); + CursorBuildSpec spec = CursorBuildSpec.builder() + .setFilter(filter). + setPhysicalColumns(filter.getRequiredColumns()) + .build(); DimFilter policyFilter = new EqualityFilter("col", ColumnType.STRING, "val", null); final RowFilterPolicy policy = RowFilterPolicy.from(policyFilter); Filter expected = new AndFilter(ImmutableList.of(policyFilter.toFilter(), filter)); - Assert.assertEquals(expected, policy.visit(spec).getFilter()); + final CursorBuildSpec transformed = policy.visit(spec); + Assert.assertEquals(expected, transformed.getFilter()); + Assert.assertEquals(ImmutableSet.of("col", "col0"), transformed.getPhysicalColumns()); } } From d5e883cb58ee5b0f3a93243e64dda71254fe7215 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Jan 2025 15:57:33 -0800 Subject: [PATCH 2/7] fix formatting --- .../org/apache/druid/query/policy/RowFilterPolicyTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java b/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java index 1d30d9d5ba04..265b7322092a 100644 --- a/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java +++ b/processing/src/test/java/org/apache/druid/query/policy/RowFilterPolicyTest.java @@ -84,8 +84,8 @@ public void testVisit_combineFilters() { Filter filter = new EqualityFilter("col0", ColumnType.STRING, "val0", null); CursorBuildSpec spec = CursorBuildSpec.builder() - .setFilter(filter). - setPhysicalColumns(filter.getRequiredColumns()) + .setFilter(filter) + .setPhysicalColumns(filter.getRequiredColumns()) .build(); DimFilter policyFilter = new EqualityFilter("col", ColumnType.STRING, "val", null); From b6ae92c4f7651a46e65a0df4990ffceeb3dd3230 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Jan 2025 16:05:41 -0800 Subject: [PATCH 3/7] move FilteredCursorFactory.addFilter to CursorBuildSpecBuilder --- .../druid/query/policy/RowFilterPolicy.java | 2 +- .../apache/druid/segment/CursorBuildSpec.java | 33 +++++++++++++++++ .../druid/segment/FilteredCursorFactory.java | 37 +------------------ 3 files changed, 35 insertions(+), 37 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java index 755b6cb4e28c..833cda9ba124 100644 --- a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java +++ b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java @@ -56,7 +56,7 @@ public DimFilter getRowFilter() @Override public CursorBuildSpec visit(CursorBuildSpec spec) { - return FilteredCursorFactory.addFilter(spec, rowFilter); + return CursorBuildSpec.builder(spec).addFilter(rowFilter.toFilter()).build(); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java index e0a16af017d3..e19f50773fd8 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java @@ -26,12 +26,15 @@ import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.filter.Filters; import org.apache.druid.segment.vector.VectorCursor; import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -294,6 +297,36 @@ public CursorBuildSpecBuilder setFilter(@Nullable Filter filter) return this; } + /** + * @see CursorBuildSpec#getFilter() for usage. Adds a {@link Filter} to the builder, if {@link #filter} is already + * set, the existing and new filters will be combined with an {@link org.apache.druid.segment.filter.AndFilter}. + * If {@link #physicalColumns} is set, {@link Filter#getRequiredColumns()} which are not present in + * {@link #virtualColumns} will be added to the existing set of {@link #physicalColumns}. + */ + public CursorBuildSpecBuilder addFilter( + Filter filterToAdd + ) + { + final Filter newFilter; + final Set newPhysicalColumns; + if (filter == null) { + newFilter = filterToAdd; + } else { + newFilter = Filters.and(Arrays.asList(filter, filterToAdd)); + } + if (physicalColumns != null) { + newPhysicalColumns = new HashSet<>(physicalColumns); + for (String column : filterToAdd.getRequiredColumns()) { + if (!virtualColumns.exists(column)) { + physicalColumns.add(column); + } + } + } else { + newPhysicalColumns = null; + } + return setFilter(newFilter).setPhysicalColumns(newPhysicalColumns); + } + /** * @see CursorBuildSpec#getInterval() for usage. */ diff --git a/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java index 15e0bd0c468e..852615060c98 100644 --- a/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java @@ -20,15 +20,10 @@ package org.apache.druid.segment; import org.apache.druid.query.filter.DimFilter; -import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.RowSignature; -import org.apache.druid.segment.filter.Filters; import javax.annotation.Nullable; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; public class FilteredCursorFactory implements CursorFactory { @@ -48,7 +43,7 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec) if (filter == null) { return delegate.makeCursorHolder(spec); } - return delegate.makeCursorHolder(addFilter(CursorBuildSpec.builder(spec), filter).build()); + return delegate.makeCursorHolder(CursorBuildSpec.builder(spec).addFilter(filter.toFilter()).build()); } @Override @@ -63,34 +58,4 @@ public ColumnCapabilities getColumnCapabilities(String column) { return delegate.getColumnCapabilities(column); } - - /** - * Adds a {@link Filter} from a {@link DimFilter} and its required physical columns to a - * {@link CursorBuildSpec.CursorBuildSpecBuilder}. If the {@link Filter} requires virtual columns, they must already - * be added to the {@link CursorBuildSpec.CursorBuildSpecBuilder} or they will be considered physical columns. - */ - public static CursorBuildSpec.CursorBuildSpecBuilder addFilter( - CursorBuildSpec.CursorBuildSpecBuilder buildSpecBuilder, - DimFilter filter - ) - { - final Filter newFilter; - final Set physicalColumns; - if (buildSpecBuilder.getFilter() == null) { - newFilter = filter.toFilter(); - } else { - newFilter = Filters.and(Arrays.asList(buildSpecBuilder.getFilter(), filter.toFilter())); - } - if (buildSpecBuilder.getPhysicalColumns() != null) { - physicalColumns = new HashSet<>(buildSpecBuilder.getPhysicalColumns()); - for (String column : filter.getRequiredColumns()) { - if (!buildSpecBuilder.getVirtualColumns().exists(column)) { - physicalColumns.add(column); - } - } - } else { - physicalColumns = null; - } - return buildSpecBuilder.setFilter(newFilter).setPhysicalColumns(physicalColumns); - } } From 649f7b4c48d96ecf0b56dc38f5ff87c472549d80 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Jan 2025 16:07:36 -0800 Subject: [PATCH 4/7] unused import --- .../main/java/org/apache/druid/query/policy/RowFilterPolicy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java index 833cda9ba124..4d761ab1a205 100644 --- a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java +++ b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import org.apache.druid.query.filter.DimFilter; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.FilteredCursorFactory; import javax.annotation.Nonnull; import java.util.Objects; From 8100be39b946ffc57adda1f0697fdee63c5c9621 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Jan 2025 16:12:39 -0800 Subject: [PATCH 5/7] rename addFilter to andFilter to make role more obvious --- .../druid/query/policy/RowFilterPolicy.java | 2 +- .../apache/druid/segment/CursorBuildSpec.java | 62 ++++++++++--------- .../druid/segment/FilteredCursorFactory.java | 2 +- 3 files changed, 34 insertions(+), 32 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java index 4d761ab1a205..a83c89d4268d 100644 --- a/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java +++ b/processing/src/main/java/org/apache/druid/query/policy/RowFilterPolicy.java @@ -55,7 +55,7 @@ public DimFilter getRowFilter() @Override public CursorBuildSpec visit(CursorBuildSpec spec) { - return CursorBuildSpec.builder(spec).addFilter(rowFilter.toFilter()).build(); + return CursorBuildSpec.builder(spec).andFilter(rowFilter.toFilter()).build(); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java index e19f50773fd8..00e8a242e7e6 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java @@ -297,36 +297,6 @@ public CursorBuildSpecBuilder setFilter(@Nullable Filter filter) return this; } - /** - * @see CursorBuildSpec#getFilter() for usage. Adds a {@link Filter} to the builder, if {@link #filter} is already - * set, the existing and new filters will be combined with an {@link org.apache.druid.segment.filter.AndFilter}. - * If {@link #physicalColumns} is set, {@link Filter#getRequiredColumns()} which are not present in - * {@link #virtualColumns} will be added to the existing set of {@link #physicalColumns}. - */ - public CursorBuildSpecBuilder addFilter( - Filter filterToAdd - ) - { - final Filter newFilter; - final Set newPhysicalColumns; - if (filter == null) { - newFilter = filterToAdd; - } else { - newFilter = Filters.and(Arrays.asList(filter, filterToAdd)); - } - if (physicalColumns != null) { - newPhysicalColumns = new HashSet<>(physicalColumns); - for (String column : filterToAdd.getRequiredColumns()) { - if (!virtualColumns.exists(column)) { - physicalColumns.add(column); - } - } - } else { - newPhysicalColumns = null; - } - return setFilter(newFilter).setPhysicalColumns(newPhysicalColumns); - } - /** * @see CursorBuildSpec#getInterval() for usage. */ @@ -474,6 +444,38 @@ public CursorBuildSpecBuilder setQueryMetrics(@Nullable QueryMetrics queryMet return this; } + + + /** + * Adds a {@link Filter} to the builder, if {@link #filter} is already set, the existing and new filters will be + * combined with an {@link org.apache.druid.segment.filter.AndFilter}. If {@link #physicalColumns} is set, + * {@link Filter#getRequiredColumns()} which are not present in {@link #virtualColumns} will be added to the + * existing set of {@link #physicalColumns}. + */ + public CursorBuildSpecBuilder andFilter( + Filter filterToAdd + ) + { + final Filter newFilter; + final Set newPhysicalColumns; + if (filter == null) { + newFilter = filterToAdd; + } else { + newFilter = Filters.and(Arrays.asList(filter, filterToAdd)); + } + if (physicalColumns != null) { + newPhysicalColumns = new HashSet<>(physicalColumns); + for (String column : filterToAdd.getRequiredColumns()) { + if (!virtualColumns.exists(column)) { + physicalColumns.add(column); + } + } + } else { + newPhysicalColumns = null; + } + return setFilter(newFilter).setPhysicalColumns(newPhysicalColumns); + } + public CursorBuildSpec build() { return new CursorBuildSpec( diff --git a/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java index 852615060c98..007543f76019 100644 --- a/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/FilteredCursorFactory.java @@ -43,7 +43,7 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec) if (filter == null) { return delegate.makeCursorHolder(spec); } - return delegate.makeCursorHolder(CursorBuildSpec.builder(spec).addFilter(filter.toFilter()).build()); + return delegate.makeCursorHolder(CursorBuildSpec.builder(spec).andFilter(filter.toFilter()).build()); } @Override From 535f40c2a0203e207a25f7fc1c9c5e3f1e1d3711 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 24 Jan 2025 17:34:18 -0800 Subject: [PATCH 6/7] fix mistake in moving function --- .../src/main/java/org/apache/druid/segment/CursorBuildSpec.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java index 00e8a242e7e6..3e754e232fce 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java @@ -467,7 +467,7 @@ public CursorBuildSpecBuilder andFilter( newPhysicalColumns = new HashSet<>(physicalColumns); for (String column : filterToAdd.getRequiredColumns()) { if (!virtualColumns.exists(column)) { - physicalColumns.add(column); + newPhysicalColumns.add(column); } } } else { From 71136092e0c41416283d511a371032d515dcbddf Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 27 Jan 2025 13:48:56 -0800 Subject: [PATCH 7/7] more test --- .../apache/druid/segment/CursorBuildSpec.java | 39 +++ .../druid/segment/CursorBuildSpecTest.java | 295 ++++++++++++++++++ 2 files changed, 334 insertions(+) create mode 100644 processing/src/test/java/org/apache/druid/segment/CursorBuildSpecTest.java diff --git a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java index 3e754e232fce..aca3743cf6c9 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorBuildSpec.java @@ -20,6 +20,7 @@ package org.apache.druid.segment; import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.OrderBy; import org.apache.druid.query.QueryContext; @@ -36,6 +37,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Objects; import java.util.Set; /** @@ -240,6 +242,42 @@ public boolean isCompatibleOrdering(List ordering) return true; } + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) { + return false; + } + CursorBuildSpec that = (CursorBuildSpec) o; + return isAggregate == that.isAggregate && + Objects.equals(filter, that.filter) && + Objects.equals(interval, that.interval) && + Objects.equals(groupingColumns, that.groupingColumns) && + Objects.equals(virtualColumns, that.virtualColumns) && + Objects.equals(aggregators, that.aggregators) && + Objects.equals(preferredOrdering, that.preferredOrdering) && + Objects.equals(queryContext, that.queryContext) && + Objects.equals(physicalColumns, that.physicalColumns) && + Objects.equals(queryMetrics, that.queryMetrics); + } + + @Override + public int hashCode() + { + return Objects.hash( + filter, + interval, + groupingColumns, + virtualColumns, + aggregators, + preferredOrdering, + queryContext, + isAggregate, + physicalColumns, + queryMetrics + ); + } + public static class CursorBuildSpecBuilder { @Nullable @@ -456,6 +494,7 @@ public CursorBuildSpecBuilder andFilter( Filter filterToAdd ) { + DruidException.conditionalDefensive(filterToAdd != null, "filterToAdd must not be null"); final Filter newFilter; final Set newPhysicalColumns; if (filter == null) { diff --git a/processing/src/test/java/org/apache/druid/segment/CursorBuildSpecTest.java b/processing/src/test/java/org/apache/druid/segment/CursorBuildSpecTest.java new file mode 100644 index 000000000000..7e19858c5a34 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/CursorBuildSpecTest.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.error.DruidException; +import org.apache.druid.query.OrderBy; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.query.filter.EqualityFilter; +import org.apache.druid.query.filter.RangeFilter; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.filter.AndFilter; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.junit.Assert; +import org.junit.Test; + +public class CursorBuildSpecTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(CursorBuildSpec.class).usingGetClass().verify(); + } + + @Test + public void testIsCompatibleOrdering() + { + // test specified preferred ordering by the query + CursorBuildSpec spec1 = CursorBuildSpec.builder() + .setPhysicalColumns(ImmutableSet.of(ColumnHolder.TIME_COLUMN_NAME, "x", "y")) + .setPreferredOrdering( + ImmutableList.of( + OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME), + OrderBy.ascending("x") + ) + ) + .build(); + // fail if cursor isn't fully ordered by the preferred ordering of the spec + Assert.assertFalse(spec1.isCompatibleOrdering(ImmutableList.of(OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME)))); + // pass if the cursor ordering exactly matches + Assert.assertTrue( + spec1.isCompatibleOrdering( + ImmutableList.of(OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME), OrderBy.ascending("x")) + ) + ); + // pass if the cursor ordering includes additional ordering not specified by the spec preferred ordering + Assert.assertTrue( + spec1.isCompatibleOrdering( + ImmutableList.of( + OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME), + OrderBy.ascending("x"), + OrderBy.descending("y") + ) + ) + ); + // fail if the cursor ordering is different + Assert.assertFalse( + spec1.isCompatibleOrdering( + ImmutableList.of( + OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME), + OrderBy.descending("y"), + OrderBy.ascending("x") + ) + ) + ); + + // test no specified preferred ordering by the reader + CursorBuildSpec spec2 = CursorBuildSpec.builder() + .setPhysicalColumns(ImmutableSet.of(ColumnHolder.TIME_COLUMN_NAME, "x", "y")) + .build(); + Assert.assertTrue( + spec2.isCompatibleOrdering( + ImmutableList.of(OrderBy.ascending(ColumnHolder.TIME_COLUMN_NAME), OrderBy.ascending("x")) + ) + ); + } + + @Test + public void testBuilderAndFilterNoExistingFilterWithPhysicalColumns() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder() + .setPhysicalColumns(ImmutableSet.of("x", "y")) + .setVirtualColumns( + VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "y + 2", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ); + + builder.andFilter(new EqualityFilter("z", ColumnType.STRING, "hello", null)); + + Assert.assertEquals(new EqualityFilter("z", ColumnType.STRING, "hello", null), builder.getFilter()); + Assert.assertEquals( + ImmutableSet.of("x", "y", "z"), + builder.getPhysicalColumns() + ); + } + + @Test + public void testBuilderAndFilterNoExistingFilterWithPhysicalColumnsNoNewReferences() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder() + .setPhysicalColumns(ImmutableSet.of("x", "y")) + .setVirtualColumns( + VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "y + 2", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ); + + builder.andFilter(new EqualityFilter("x", ColumnType.STRING, "hello", null)); + + Assert.assertEquals(new EqualityFilter("x", ColumnType.STRING, "hello", null), builder.getFilter()); + Assert.assertEquals( + ImmutableSet.of("x", "y"), + builder.getPhysicalColumns() + ); + } + + @Test + public void testBuilderAndFilterExistingFilterWithPhysicalColumns() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder() + .setFilter(new EqualityFilter("x", ColumnType.STRING, "foo", null)) + .setPhysicalColumns(ImmutableSet.of("x", "y")) + .setVirtualColumns( + VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "y + 2", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ); + + builder.andFilter(new EqualityFilter("z", ColumnType.STRING, "hello", null)); + + Assert.assertEquals( + new AndFilter( + ImmutableList.of( + new EqualityFilter("x", ColumnType.STRING, "foo", null), + new EqualityFilter("z", ColumnType.STRING, "hello", null) + ) + ), + builder.getFilter() + ); + Assert.assertEquals( + ImmutableSet.of("x", "y", "z"), + builder.getPhysicalColumns() + ); + } + + @Test + public void testBuilderAndFilterNoExistingFilterNoPhysicalColumns() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder(); + + builder.andFilter(new EqualityFilter("z", ColumnType.STRING, "hello", null)); + + Assert.assertEquals(new EqualityFilter("z", ColumnType.STRING, "hello", null), builder.getFilter()); + Assert.assertNull( + builder.getPhysicalColumns() + ); + } + + @Test + public void testBuilderAndFilterExistingFilterNoPhysicalColumns() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder().setFilter(new EqualityFilter("x", ColumnType.STRING, "foo", null)); + + builder.andFilter(new EqualityFilter("z", ColumnType.STRING, "hello", null)); + + Assert.assertEquals( + new AndFilter( + ImmutableList.of( + new EqualityFilter("x", ColumnType.STRING, "foo", null), + new EqualityFilter("z", ColumnType.STRING, "hello", null) + ) + ), + builder.getFilter() + ); + Assert.assertNull( + builder.getPhysicalColumns() + ); + } + + @Test + public void testBuilderAndFilterNoExistingFilterUsingVirtualColumn() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder() + .setPhysicalColumns(ImmutableSet.of("x", "y")) + .setVirtualColumns( + VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "y + 2", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ); + builder.andFilter(new RangeFilter("v0", ColumnType.LONG, 1L, 3L, true, true, null)); + + Assert.assertEquals( + new RangeFilter("v0", ColumnType.LONG, 1L, 3L, true, true, null), + builder.getFilter() + ); + Assert.assertEquals( + ImmutableSet.of("x", "y"), + builder.getPhysicalColumns() + ); + } + + @Test + public void testBuilderAndFilterExistingFilterNewFilterUsingVirtualColumn() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = + CursorBuildSpec.builder() + .setFilter(new EqualityFilter("x", ColumnType.STRING, "foo", null)) + .setPhysicalColumns(ImmutableSet.of("x", "y")) + .setVirtualColumns( + VirtualColumns.create( + new ExpressionVirtualColumn( + "v0", + "y + 2", + ColumnType.LONG, + TestExprMacroTable.INSTANCE + ) + ) + ); + builder.andFilter(new RangeFilter("v0", ColumnType.LONG, 1L, 3L, true, true, null)); + + Assert.assertEquals( + new AndFilter( + ImmutableList.of( + new EqualityFilter("x", ColumnType.STRING, "foo", null), + new RangeFilter("v0", ColumnType.LONG, 1L, 3L, true, true, null) + ) + ), + builder.getFilter() + ); + Assert.assertEquals( + ImmutableSet.of("x", "y"), + builder.getPhysicalColumns() + ); + } + + @Test + public void testAndFilterNull() + { + CursorBuildSpec.CursorBuildSpecBuilder builder = CursorBuildSpec.builder(); + + Throwable t = Assert.assertThrows( + DruidException.class, + () -> builder.andFilter(null) + ); + + Assert.assertEquals("filterToAdd must not be null", t.getMessage()); + } +}