Skip to content

Commit 911e625

Browse files
JanKaulJan Kaul
authored andcommitted
Merge pull request JanKaul#218 from splitgraph/equality-delete-scan-fix
fix: skip adding another plan when no data file groups left during table scan
2 parents 66d0a4f + 0e05939 commit 911e625

File tree

2 files changed

+46
-32
lines changed

2 files changed

+46
-32
lines changed

datafusion_iceberg/src/table.rs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
statistics::manifest_statistics,
2525
};
2626
use datafusion::common::NullEquality;
27+
use datafusion::physical_plan::empty::EmptyExec;
2728
use datafusion::{
2829
arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaBuilder, SchemaRef},
2930
catalog::Session,
@@ -53,7 +54,6 @@ use datafusion::{
5354
scalar::ScalarValue,
5455
sql::parser::DFParserBuilder,
5556
};
56-
5757
use iceberg_rust::spec::{schema::Schema, view_metadata::ViewRepresentation};
5858
use iceberg_rust::{
5959
arrow::write::write_parquet_partitioned, catalog::tabular::Tabular, error::Error,
@@ -830,7 +830,7 @@ async fn table_scan(
830830
.await?;
831831

832832
// Create plan for partitions without delete files
833-
let file_groups = data_file_groups
833+
let file_groups: Vec<_> = data_file_groups
834834
.into_values()
835835
.map(|x| {
836836
x.into_iter()
@@ -853,24 +853,34 @@ async fn table_scan(
853853
.collect()
854854
})
855855
.collect();
856-
let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
857-
.with_file_groups(file_groups)
858-
.with_statistics(statistics)
859-
.with_projection(projection)
860-
.with_limit(limit)
861-
.with_table_partition_cols(table_partition_cols)
862-
.build();
863-
864-
let other_plan = ParquetFormat::default()
865-
.create_physical_plan(session, file_scan_config)
866-
.await?;
867856

868-
if plans.is_empty() {
869-
Ok(other_plan)
870-
} else {
857+
if !file_groups.is_empty() {
858+
let file_scan_config =
859+
FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
860+
.with_file_groups(file_groups)
861+
.with_statistics(statistics)
862+
.with_projection(projection.clone())
863+
.with_limit(limit)
864+
.with_table_partition_cols(table_partition_cols)
865+
.build();
866+
867+
let other_plan = ParquetFormat::default()
868+
.create_physical_plan(session, file_scan_config)
869+
.await?;
870+
871871
plans.push(other_plan);
872+
}
872873

873-
Ok(Arc::new(UnionExec::new(plans)))
874+
match plans.len() {
875+
0 => {
876+
let projected_schema = projection
877+
.map(|p| arrow_schema.project(&p))
878+
.transpose()?
879+
.unwrap_or(arrow_schema.as_ref().clone());
880+
Ok(Arc::new(EmptyExec::new(Arc::new(projected_schema))))
881+
}
882+
1 => Ok(plans.remove(0)),
883+
_ => Ok(Arc::new(UnionExec::new(plans))),
874884
}
875885
}
876886

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -108,21 +108,24 @@ pub async fn test_equality_delete() {
108108
.expect("Failed to insert values into table");
109109

110110
let batches = ctx
111-
.sql("select product_id, sum(amount) from warehouse.test.orders group by product_id order by product_id")
111+
.sql("select * from warehouse.test.orders order by id")
112112
.await
113113
.expect("Failed to create plan for select")
114114
.collect()
115115
.await
116116
.expect("Failed to execute select query");
117117

118118
let expected = [
119-
"+------------+-----------------------------------+",
120-
"| product_id | sum(warehouse.test.orders.amount) |",
121-
"+------------+-----------------------------------+",
122-
"| 1 | 7 |",
123-
"| 2 | 1 |",
124-
"| 3 | 3 |",
125-
"+------------+-----------------------------------+",
119+
"+----+-------------+------------+------------+--------+",
120+
"| id | customer_id | product_id | date | amount |",
121+
"+----+-------------+------------+------------+--------+",
122+
"| 1 | 1 | 1 | 2020-01-01 | 1 |",
123+
"| 2 | 2 | 1 | 2020-01-01 | 1 |",
124+
"| 3 | 3 | 1 | 2020-01-01 | 3 |",
125+
"| 4 | 1 | 2 | 2020-02-02 | 1 |",
126+
"| 5 | 1 | 1 | 2020-02-02 | 2 |",
127+
"| 6 | 3 | 3 | 2020-02-02 | 3 |",
128+
"+----+-------------+------------+------------+--------+",
126129
];
127130
assert_batches_eq!(expected, &batches);
128131

@@ -174,20 +177,21 @@ pub async fn test_equality_delete() {
174177
.unwrap();
175178

176179
let batches = ctx
177-
.sql("select product_id, sum(amount) from warehouse.test.orders group by product_id order by product_id")
180+
.sql("select * from warehouse.test.orders order by id")
178181
.await
179182
.expect("Failed to create plan for select")
180183
.collect()
181184
.await
182185
.expect("Failed to execute select query");
183186

184187
let expected = [
185-
"+------------+-----------------------------------+",
186-
"| product_id | sum(warehouse.test.orders.amount) |",
187-
"+------------+-----------------------------------+",
188-
"| 1 | 4 |",
189-
"| 3 | 3 |",
190-
"+------------+-----------------------------------+",
188+
"+----+-------------+------------+------------+--------+",
189+
"| id | customer_id | product_id | date | amount |",
190+
"+----+-------------+------------+------------+--------+",
191+
"| 2 | 2 | 1 | 2020-01-01 | 1 |",
192+
"| 3 | 3 | 1 | 2020-01-01 | 3 |",
193+
"| 6 | 3 | 3 | 2020-02-02 | 3 |",
194+
"+----+-------------+------------+------------+--------+",
191195
];
192196
assert_batches_eq!(expected, &batches);
193197
}

0 commit comments

Comments
 (0)