Skip to content

Commit

Permalink
remove style change in DruidQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
cecemei committed Feb 4, 2025
1 parent cbc38c8 commit c231fa4
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -1517,15 +1517,15 @@ private WindowOperatorQuery toWindowQuery()
operators = windowing.getOperators();
} else {
operators = ImmutableList.<OperatorFactory>builder()
.add(new ScanOperatorFactory(
null,
null,
null,
null,
virtualColumns,
null))
.addAll(windowing.getOperators())
.build();
.add(new ScanOperatorFactory(
null,
null,
null,
null,
virtualColumns,
null))
.addAll(windowing.getOperators())
.build();
}
// if planning in native set to null
// if planning in MSQ set to empty list
Expand All @@ -1544,6 +1544,7 @@ private WindowOperatorQuery toWindowQuery()
pushLeafOperator ? null : ImmutableList.of()
);
}

/**
* Create an OperatorQuery which runs an order on top of a scan.
*/
Expand All @@ -1555,29 +1556,35 @@ private WindowOperatorQuery toScanAndSortQuery()
|| (sorting.getProjection() != null && !sorting.getProjection().getVirtualColumns().isEmpty())) {
return null;
}

ScanQuery scan = toScanQuery(false);
if (scan == null) {
return null;
}

if (dataSource.isConcrete()) {
// Currently only non-time orderings of subqueries are allowed.
setPlanningErrorOrderByNonTimeIsUnsupported();
return null;
}

QueryDataSource newDataSource = new QueryDataSource(scan);
List<ColumnWithDirection> sortColumns = getColumnWithDirectionsFromOrderBys(sorting.getOrderBys());
RowSignature signature = getOutputRowSignature();
List<OperatorFactory> operators = new ArrayList<>();

operators.add(new NaiveSortOperatorFactory(sortColumns));


final Projection projection = sorting.getProjection();

final org.apache.druid.query.operator.OffsetLimit offsetLimit = sorting.getOffsetLimit().isNone()
? null
: sorting.getOffsetLimit().toOperatorOffsetLimit();
? null
: sorting.getOffsetLimit().toOperatorOffsetLimit();

final List<String> projectedColumns = projection == null
? null
: projection.getOutputRowSignature().getColumnNames();
? null
: projection.getOutputRowSignature().getColumnNames();

if (offsetLimit != null || projectedColumns != null) {
operators.add(
Expand All @@ -1591,6 +1598,7 @@ private WindowOperatorQuery toScanAndSortQuery()
)
);
}

return new WindowOperatorQuery(
newDataSource,
new LegacySegmentSpec(Intervals.ETERNITY),
Expand All @@ -1600,23 +1608,25 @@ private WindowOperatorQuery toScanAndSortQuery()
null
);
}

private void setPlanningErrorOrderByNonTimeIsUnsupported()
{
List<String> orderByColumnNames = sorting.getOrderBys()
.stream().map(OrderByColumnSpec::getDimension)
.collect(Collectors.toList());
.stream().map(OrderByColumnSpec::getDimension)
.collect(Collectors.toList());
plannerContext.setPlanningError(
"SQL query requires ordering a table by non-time column [%s], which is not supported.",
orderByColumnNames
);
}

private ArrayList<ColumnWithDirection> getColumnWithDirectionsFromOrderBys(List<OrderByColumnSpec> orderBys)
{
ArrayList<ColumnWithDirection> ordering = new ArrayList<>();
for (OrderByColumnSpec orderBySpec : orderBys) {
Direction direction = orderBySpec.getDirection() == OrderByColumnSpec.Direction.ASCENDING
? ColumnWithDirection.Direction.ASC
: ColumnWithDirection.Direction.DESC;
? ColumnWithDirection.Direction.ASC
: ColumnWithDirection.Direction.DESC;
ordering.add(new ColumnWithDirection(orderBySpec.getDimension(), direction));
}
return ordering;
Expand All @@ -1634,11 +1644,13 @@ private ScanQuery toScanQuery(final boolean considerSorting)
// Scan cannot GROUP BY or do windows.
return null;
}

if (outputRowSignature.size() == 0) {
// Should never do a scan query without any columns that we're interested in. This is probably a planner bug.
this.plannerContext.setPlanningError("Cannot convert to Scan query without any columns.");
return null;
}

final Pair<DataSource, Filtration> dataSourceFiltrationPair = getFiltration(
dataSource,
filter,
Expand All @@ -1647,19 +1659,25 @@ private ScanQuery toScanQuery(final boolean considerSorting)
);
final DataSource newDataSource = dataSourceFiltrationPair.lhs;
final Filtration filtration = dataSourceFiltrationPair.rhs;

final List<OrderBy> orderByColumns;
long scanOffset = 0L;
long scanLimit = 0L;

if (considerSorting && sorting != null) {
scanOffset = sorting.getOffsetLimit().getOffset();

if (sorting.getOffsetLimit().hasLimit()) {
final long limit = sorting.getOffsetLimit().getLimit();

if (limit == 0) {
// Can't handle zero limit (the Scan query engine would treat it as unlimited).
return null;
}

scanLimit = limit;
}

orderByColumns = sorting.getOrderBys().stream().map(
orderBy ->
new OrderBy(
Expand All @@ -1669,6 +1687,7 @@ private ScanQuery toScanQuery(final boolean considerSorting)
: Order.ASCENDING
)
).collect(Collectors.toList());

} else {
orderByColumns = Collections.emptyList();
}
Expand All @@ -1685,13 +1704,17 @@ private ScanQuery toScanQuery(final boolean considerSorting)
setPlanningErrorOrderByNonTimeIsUnsupported();
}
return null;

}
}

// Deduplicate column list
final Set<String> scanColumns = new LinkedHashSet<>(outputRowSignature.getColumnNames());
orderByColumns.forEach(column -> scanColumns.add(column.getColumnName()));

final VirtualColumns virtualColumns = getVirtualColumns(true);
final ImmutableList<String> scanColumnsList = ImmutableList.copyOf(scanColumns);

return new ScanQuery(
newDataSource,
filtration.getQuerySegmentSpec(),
Expand Down

0 comments on commit c231fa4

Please sign in to comment.