Skip to content

Commit

Permalink
Fix: limit is missing after removing SPM (#14569)
Browse files Browse the repository at this point in the history
* Fix: limit is missing after removing SPM

* fix test

* use limitexec
  • Loading branch information
xudong963 authored Feb 10, 2025
1 parent 17396a5 commit dee0dc7
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 4 deletions.
28 changes: 26 additions & 2 deletions datafusion/core/tests/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::physical_optimizer::test_utils::{
create_test_schema3, create_test_schema4, filter_exec, global_limit_exec,
hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec,
RequirementsTestExec,
sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
};

use datafusion_physical_plan::displayable;
Expand Down Expand Up @@ -1943,6 +1943,30 @@ async fn test_remove_unnecessary_spm1() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_spm2() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
let input = sort_preserving_merge_exec_with_fetch(
vec![sort_expr("non_nullable_col", &schema)],
source,
100,
);

let expected_input = [
"SortPreservingMergeExec: [non_nullable_col@1 ASC], fetch=100",
" DataSourceExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"LocalLimitExec: fetch=100",
" SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" DataSourceExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, input, true);

Ok(())
}

#[tokio::test]
async fn test_change_wrong_sorting() -> Result<()> {
let schema = create_test_schema()?;
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,15 @@ pub fn sort_preserving_merge_exec(
Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
}

pub fn sort_preserving_merge_exec_with_fetch(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
fetch: usize,
) -> Arc<dyn ExecutionPlan> {
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortPreservingMergeExec::new(sort_exprs, input).with_fetch(Some(fetch)))
}

pub fn union_exec(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
Arc::new(UnionExec::new(input))
}
Expand Down
9 changes: 7 additions & 2 deletions datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,13 @@ pub fn ensure_sorting(
&& child_node.plan.output_partitioning().partition_count() <= 1
{
// This `SortPreservingMergeExec` is unnecessary, input already has a
// single partition.
let child_node = requirements.children.swap_remove(0);
// single partition and no fetch is required.
let mut child_node = requirements.children.swap_remove(0);
if let Some(fetch) = plan.fetch() {
// Add the limit exec if the spm has a fetch
child_node.plan =
Arc::new(LocalLimitExec::new(Arc::clone(&child_node.plan), fetch));
}
return Ok(Transformed::yes(child_node));
}

Expand Down

0 comments on commit dee0dc7

Please sign in to comment.