Skip to content

Commit

Permalink
Fix Duplicated filters within (filter(TableScan)) plan for unparser (apache#13422)
Browse files Browse the repository at this point in the history

* Eliminate duplicated filter within (filter(TableScan)) plan (apache#51)

* Eliminate duplicated filter within (filter(TableScan)) plan

* Updates

* fix

* add test

* fix

* Preserve the filter order when eliminating duplicated filter (apache#56)

* Use IndexSet instead of Vec
  • Loading branch information
Sevenannn authored Nov 27, 2024
1 parent a00e394 commit 6a4bf0f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
14 changes: 11 additions & 3 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion_expr::{
expr, utils::grouping_set_to_exprlist, Aggregate, Expr, LogicalPlan,
LogicalPlanBuilder, Projection, SortExpr, Unnest, Window,
};
use indexmap::IndexSet;
use sqlparser::ast;

use super::{
Expand Down Expand Up @@ -310,7 +311,7 @@ pub(crate) fn unproject_sort_expr(
pub(crate) fn try_transform_to_simple_table_scan_with_filters(
plan: &LogicalPlan,
) -> Result<Option<(LogicalPlan, Vec<Expr>)>> {
let mut filters: Vec<Expr> = vec![];
let mut filters: IndexSet<Expr> = IndexSet::new();
let mut plan_stack = vec![plan];
let mut table_alias = None;

Expand All @@ -321,7 +322,9 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
plan_stack.push(alias.input.as_ref());
}
LogicalPlan::Filter(filter) => {
filters.push(filter.predicate.clone());
if !filters.contains(&filter.predicate) {
filters.insert(filter.predicate.clone());
}
plan_stack.push(filter.input.as_ref());
}
LogicalPlan::TableScan(table_scan) => {
Expand All @@ -347,7 +350,11 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
})
.collect::<Result<Vec<_>, DataFusionError>>()?;

filters.extend(table_scan_filters);
for table_scan_filter in table_scan_filters {
if !filters.contains(&table_scan_filter) {
filters.insert(table_scan_filter);
}
}

let mut builder = LogicalPlanBuilder::scan(
table_scan.table_name.clone(),
Expand All @@ -360,6 +367,7 @@ pub(crate) fn try_transform_to_simple_table_scan_with_filters(
}

let plan = builder.build()?;
let filters = filters.into_iter().collect();

return Ok(Some((plan, filters)));
}
Expand Down
30 changes: 30 additions & 0 deletions datafusion/sql/tests/cases/plan_to_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,36 @@ fn test_join_with_table_scan_filters() -> Result<()> {

assert_eq!(sql.to_string(), expected_sql);

let right_plan_with_filter_schema = table_scan_with_filters(
Some("right_table"),
&schema_right,
None,
vec![
col("right_table.age").gt(lit(10)),
col("right_table.age").lt(lit(11)),
],
)?
.build()?;
let right_plan_with_duplicated_filter =
LogicalPlanBuilder::from(right_plan_with_filter_schema.clone())
.filter(col("right_table.age").gt(lit(10)))?
.build()?;

let join_plan_duplicated_filter = LogicalPlanBuilder::from(left_plan)
.join(
right_plan_with_duplicated_filter,
datafusion_expr::JoinType::Inner,
(vec!["left.id"], vec!["right_table.id"]),
Some(col("left.id").gt(lit(5))),
)?
.build()?;

let sql = plan_to_sql(&join_plan_duplicated_filter)?;

let expected_sql = r#"SELECT * FROM left_table AS "left" JOIN right_table ON "left".id = right_table.id AND (("left".id > 5) AND (("left"."name" LIKE 'some_name' AND (right_table.age > 10)) AND (right_table.age < 11)))"#;

assert_eq!(sql.to_string(), expected_sql);

Ok(())
}

Expand Down

0 comments on commit 6a4bf0f

Please sign in to comment.