From 80b8d8c0d779924ad3972582c504e1226300a334 Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Mon, 6 Jan 2025 14:33:21 +0100
Subject: [PATCH 1/7] Expose DataFusion statistics on an IcebergTableScan

---
 Cargo.lock                                    |   2 +
 crates/iceberg/src/scan.rs                    |  22 +---
 crates/iceberg/src/table.rs                   |  25 +++-
 crates/integration_tests/Cargo.toml           |   2 +
 crates/integration_tests/tests/datafusion.rs  |  66 +++++++++++
 crates/integrations/datafusion/src/lib.rs     |   3 +
 .../src/physical_plan/expr_to_predicate.rs    |  32 ++++-
 .../datafusion/src/physical_plan/scan.rs      |   9 ++
 .../integrations/datafusion/src/statistics.rs | 112 ++++++++++++++++++
 .../integrations/datafusion/src/table/mod.rs  |   6 +-
 10 files changed, 252 insertions(+), 27 deletions(-)
 create mode 100644 crates/integration_tests/tests/datafusion.rs
 create mode 100644 crates/integrations/datafusion/src/statistics.rs

diff --git a/Cargo.lock b/Cargo.lock
index 04265323d..c8f154346 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3093,9 +3093,11 @@ version = "0.4.0"
 dependencies = [
  "arrow-array",
  "arrow-schema",
+ "datafusion",
  "futures",
  "iceberg",
  "iceberg-catalog-rest",
+ "iceberg-datafusion",
  "iceberg_test_utils",
  "parquet",
  "tokio",
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 5a97e74e7..1f957b520 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -188,27 +188,7 @@ impl<'a> TableScanBuilder<'a> {
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
-        let snapshot = match self.snapshot_id {
-            Some(snapshot_id) => self
-                .table
-                .metadata()
-                .snapshot_by_id(snapshot_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Snapshot with id {} not found", snapshot_id),
-                    )
-                })?
-                .clone(),
-            None => self
-                .table
-                .metadata()
-                .current_snapshot()
-                .ok_or_else(|| {
-                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
-                })?
-                .clone(),
-        };
+        let snapshot = self.table.snapshot(self.snapshot_id)?;

         let schema = snapshot.schema(self.table.metadata())?;
diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs
index fa5304855..c47790c4a 100644
--- a/crates/iceberg/src/table.rs
+++ b/crates/iceberg/src/table.rs
@@ -24,7 +24,7 @@ use crate::io::object_cache::ObjectCache;
 use crate::io::FileIO;
 use crate::metadata_scan::MetadataTable;
 use crate::scan::TableScanBuilder;
-use crate::spec::{TableMetadata, TableMetadataRef};
+use crate::spec::{SnapshotRef, TableMetadata, TableMetadataRef};
 use crate::{Error, ErrorKind, Result, TableIdent};

 /// Builder to create table scan.
@@ -201,6 +201,29 @@ impl Table {
         TableScanBuilder::new(self)
     }

+    /// Get the specified or latest snapshot for this table
+    pub fn snapshot(&self, snapshot_id: Option<i64>) -> Result<SnapshotRef> {
+        Ok(match snapshot_id {
+            Some(snapshot_id) => self
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
+                })?
+                .clone(),
+        })
+    }
+
     /// Creates a metadata table which provides table-like APIs for inspecting metadata.
     /// See [`MetadataTable`] for more details.
pub fn metadata_table(self) -> MetadataTable { diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index a047d7580..172a8d3a5 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -27,9 +27,11 @@ rust-version = { workspace = true } [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +datafusion = "44" futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-datafusion = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } tokio = { workspace = true } diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs new file mode 100644 index 000000000..9b1a18cee --- /dev/null +++ b/crates/integration_tests/tests/datafusion.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::common::stats::Precision; +use datafusion::common::{ColumnStatistics, ScalarValue, Statistics}; +use iceberg::{Catalog, Result, TableIdent}; +use iceberg_datafusion::compute_statistics; +use iceberg_integration_tests::set_test_fixture; + +#[tokio::test] +async fn test_statistics() -> Result<()> { + let fixture = set_test_fixture("datafusion_statistics").await; + + let catalog = fixture.rest_catalog; + + let table = catalog + .load_table( + &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"]) + .unwrap(), + ) + .await + .unwrap(); + + let stats = compute_statistics(&table, None).await?; + + assert_eq!(stats, Statistics { + num_rows: Precision::Inexact(14), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))), + min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Int32(Some(12))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))), + min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))), + distinct_count: Precision::Absent, + }, + ], + }); + + Ok(()) +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index b7b927fdd..d8765de47 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -23,6 +23,9 @@ pub use error::*; mod physical_plan; mod schema; 
+mod statistics;
 mod table;
+
+pub use statistics::*;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
index f438308e6..92dcc3165 100644
--- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
+++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -20,7 +20,7 @@ use std::vec;
 use datafusion::logical_expr::{Expr, Operator};
 use datafusion::scalar::ScalarValue;
 use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
-use iceberg::spec::Datum;
+use iceberg::spec::{Datum, PrimitiveLiteral, PrimitiveType};

 // A datafusion expression could be an Iceberg predicate, column, or literal.
 enum TransformedResult {
@@ -195,20 +195,44 @@ const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
 /// Convert a scalar value to an iceberg datum.
 fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
     match value {
+        ScalarValue::Boolean(Some(v)) => Some(Datum::bool(*v)),
         ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
         ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
         ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
         ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)),
-        ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)),
+        ScalarValue::Float32(Some(v)) => Some(Datum::float(*v)),
         ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
-        ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
-        ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
+        ScalarValue::Utf8(Some(v))
+        | ScalarValue::Utf8View(Some(v))
+        | ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
         ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
         ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
         _ => None,
     }
 }

+/// Convert an iceberg datum to a datafusion scalar value.
+pub fn datum_to_scalar_value(datum: &Datum) -> Option<ScalarValue> {
+    match (datum.data_type(), datum.literal()) {
+        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(v)) => {
+            Some(ScalarValue::Boolean(Some(*v)))
+        }
+        (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Int32(Some(*v))),
+        (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(ScalarValue::Int64(Some(*v))),
+        (PrimitiveType::Float, PrimitiveLiteral::Float(v)) => {
+            Some(ScalarValue::Float32(Some(v.into_inner())))
+        }
+        (PrimitiveType::Double, PrimitiveLiteral::Double(v)) => {
+            Some(ScalarValue::Float64(Some(v.into_inner())))
+        }
+        (PrimitiveType::String, PrimitiveLiteral::String(v)) => {
+            Some(ScalarValue::Utf8View(Some(v.clone())))
+        }
+        (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Date32(Some(*v))),
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use datafusion::arrow::datatypes::{DataType, Field, Schema};
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index f33437eec..66030298b 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -22,6 +22,7 @@ use std::vec;

 use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datafusion::common::Statistics;
 use datafusion::error::Result as DFResult;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::physical_expr::EquivalenceProperties;
@@ -44,6 +45,8 @@ pub(crate) struct IcebergTableScan {
     table: Table,
     /// Snapshot of the table to scan.
     snapshot_id: Option<i64>,
+    /// Statistics for the table; row count, and null count/min-max values per column.
+    statistics: Statistics,
     /// Stores certain, often expensive to compute,
     /// plan properties used in query optimization.
     plan_properties: PlanProperties,
@@ -59,6 +62,7 @@ impl IcebergTableScan {
         table: Table,
         snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
+        statistics: Statistics,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
     ) -> Self {
@@ -73,6 +77,7 @@ impl IcebergTableScan {
         Self {
             table,
             snapshot_id,
+            statistics,
             plan_properties,
             projection,
             predicates,
@@ -135,6 +140,10 @@ impl ExecutionPlan for IcebergTableScan {
             stream,
         )))
     }
+
+    fn statistics(&self) -> DFResult<Statistics> {
+        Ok(self.statistics.clone())
+    }
 }

 impl DisplayAs for IcebergTableScan {
diff --git a/crates/integrations/datafusion/src/statistics.rs b/crates/integrations/datafusion/src/statistics.rs
new file mode 100644
index 000000000..f5ba9bf87
--- /dev/null
+++ b/crates/integrations/datafusion/src/statistics.rs
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
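A quick sanity sketch of the conversion added in `expr_to_predicate.rs` above — hypothetically placed inside that module's `#[cfg(test)] mod tests` (shown truncated in the hunk) so the function is in scope; the exact `Datum` constructor signatures are assumptions based on the calls already made in this patch:

```rust
use datafusion::scalar::ScalarValue;
use iceberg::spec::Datum;

#[test]
fn datum_to_scalar_value_maps_primitives() {
    // Int and Date both carry a PrimitiveLiteral::Int; the Iceberg type
    // disambiguates them into Int32 vs Date32 scalars.
    assert_eq!(
        datum_to_scalar_value(&Datum::int(42)),
        Some(ScalarValue::Int32(Some(42)))
    );
    assert_eq!(
        datum_to_scalar_value(&Datum::date(19428)),
        Some(ScalarValue::Date32(Some(19428)))
    );
    // Strings come back as Utf8View, mirroring scalar_value_to_datum above.
    assert_eq!(
        datum_to_scalar_value(&Datum::string("a")),
        Some(ScalarValue::Utf8View(Some("a".to_string())))
    );
}
```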
+
+use std::collections::HashMap;
+
+use datafusion::common::stats::Precision;
+use datafusion::common::{ColumnStatistics, Statistics};
+use iceberg::spec::ManifestStatus;
+use iceberg::table::Table;
+use iceberg::Result;
+
+use crate::physical_plan::expr_to_predicate::datum_to_scalar_value;
+
+// Compute DataFusion table statistics for a given table/snapshot
+pub async fn compute_statistics(table: &Table, snapshot_id: Option<i64>) -> Result<Statistics> {
+    let file_io = table.file_io();
+    let metadata = table.metadata();
+    let snapshot = table.snapshot(snapshot_id)?;
+
+    let mut num_rows = 0;
+    let mut lower_bounds = HashMap::new();
+    let mut upper_bounds = HashMap::new();
+    let mut null_counts = HashMap::new();
+
+    let manifest_list = snapshot.load_manifest_list(file_io, metadata).await?;
+
+    // For each existing/added manifest in the snapshot aggregate the row count, as well as null
+    // count and min/max values.
+    for manifest_file in manifest_list.entries() {
+        let manifest = manifest_file.load_manifest(file_io).await?;
+        manifest.entries().iter().for_each(|manifest_entry| {
+            if manifest_entry.status() != ManifestStatus::Deleted {
+                let data_file = manifest_entry.data_file();
+                num_rows += data_file.record_count();
+                data_file.lower_bounds().iter().for_each(|(col_id, min)| {
+                    lower_bounds
+                        .entry(*col_id)
+                        .and_modify(|col_min| {
+                            if min < col_min {
+                                *col_min = min.clone()
+                            }
+                        })
+                        .or_insert(min.clone());
+                });
+                data_file.upper_bounds().iter().for_each(|(col_id, max)| {
+                    upper_bounds
+                        .entry(*col_id)
+                        .and_modify(|col_max| {
+                            if max > col_max {
+                                *col_max = max.clone()
+                            }
+                        })
+                        .or_insert(max.clone());
+                });
+                data_file
+                    .null_value_counts()
+                    .iter()
+                    .for_each(|(col_id, null_count)| {
+                        null_counts
+                            .entry(*col_id)
+                            .and_modify(|col_null_count| *col_null_count += *null_count)
+                            .or_insert(*null_count);
+                    });
+            }
+        })
+    }
+
+    // Construct the DataFusion `Statistics` object, leaving any missing info as `Precision::Absent`
+    let schema = snapshot.schema(metadata)?;
+    let col_stats = schema
+        .as_struct()
+        .fields()
+        .iter()
+        .map(|field| {
+            ColumnStatistics {
+                null_count: null_counts
+                    .get(&field.id)
+                    .map(|nc| Precision::Inexact(*nc as usize))
+                    .unwrap_or(Precision::Absent),
+                max_value: upper_bounds
+                    .get(&field.id)
+                    .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
+                    .unwrap_or(Precision::Absent),
+                min_value: lower_bounds
+                    .get(&field.id)
+                    .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
+                    .unwrap_or(Precision::Absent),
+                distinct_count: Precision::Absent, // will be picked up after #417
+            }
+        })
+        .collect();
+
+    Ok(Statistics {
+        num_rows: Precision::Inexact(num_rows as usize),
+        total_byte_size: Precision::Absent,
+        column_statistics: col_stats,
+    })
+}
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 00c9e1322..025099de3 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -16,7 +16,6 @@
 // under the License.

 pub mod table_provider_factory;
-
 use std::any::Any;
 use std::sync::Arc;
@@ -32,6 +31,7 @@ use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};

 use crate::physical_plan::scan::IcebergTableScan;
+use crate::{compute_statistics, to_datafusion_error};

 /// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
 /// managing access to a [`Table`].
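For reference, a minimal sketch of calling the new helper directly; `table` stands in for any already-loaded Iceberg table, and only the row count plus per-column null/min-max bounds come back populated — everything else stays `Precision::Absent`:

```rust
use datafusion::common::stats::Precision;
use iceberg::table::Table;
use iceberg_datafusion::compute_statistics;

async fn print_row_estimate(table: &Table) -> iceberg::Result<()> {
    // Pass None to aggregate over the current snapshot's manifests.
    let stats = compute_statistics(table, None).await?;
    if let Precision::Inexact(rows) = stats.num_rows {
        println!("~{rows} rows across live data files");
    }
    Ok(())
}
```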
@@ -130,10 +130,14 @@ impl TableProvider for IcebergTableProvider {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        let statistics = compute_statistics(&self.table, self.snapshot_id)
+            .await
+            .map_err(to_datafusion_error)?;
         Ok(Arc::new(IcebergTableScan::new(
             self.table.clone(),
             self.snapshot_id,
             self.schema.clone(),
+            statistics,
             projection,
             filters,
         )))

From bae11ea9d60de7721ba1ea86678de2a3913fb92a Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Mon, 6 Jan 2025 15:06:15 +0100
Subject: [PATCH 2/7] Default to unknown statistics upon encountering an error

---
 crates/integrations/datafusion/src/table/mod.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 025099de3..ff33d20f5 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use datafusion::catalog::Session;
+use datafusion::common::Statistics;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
@@ -30,8 +31,8 @@ use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};

+use crate::compute_statistics;
 use crate::physical_plan::scan::IcebergTableScan;
-use crate::{compute_statistics, to_datafusion_error};

 /// Represents a [`TableProvider`] for the Iceberg [`Catalog`],
 /// managing access to a [`Table`].
@@ -132,7 +133,7 @@ impl TableProvider for IcebergTableProvider {
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         let statistics = compute_statistics(&self.table, self.snapshot_id)
             .await
-            .map_err(to_datafusion_error)?;
+            .unwrap_or(Statistics::new_unknown(self.schema.as_ref()));
         Ok(Arc::new(IcebergTableScan::new(
             self.table.clone(),
             self.snapshot_id,

From 17383089d8d9aa39dae569ed3d453ac61c4b5f9b Mon Sep 17 00:00:00 2001
From: Marko Grujic
Date: Wed, 8 Jan 2025 21:27:55 +0100
Subject: [PATCH 3/7] Skip positional/equality delete files when computing
 stats

---
 crates/integration_tests/tests/datafusion.rs  |  2 +-
 .../integrations/datafusion/src/statistics.rs | 61 ++++++++++---------
 2 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs
index 9b1a18cee..20ee45a5f 100644
--- a/crates/integration_tests/tests/datafusion.rs
+++ b/crates/integration_tests/tests/datafusion.rs
@@ -38,7 +38,7 @@ async fn test_statistics() -> Result<()> {
     let stats = compute_statistics(&table, None).await?;

     assert_eq!(stats, Statistics {
-        num_rows: Precision::Inexact(14),
+        num_rows: Precision::Inexact(12),
         total_byte_size: Precision::Absent,
         column_statistics: vec![
             ColumnStatistics {
diff --git a/crates/integrations/datafusion/src/statistics.rs b/crates/integrations/datafusion/src/statistics.rs
index f5ba9bf87..c54dcdfe9 100644
--- a/crates/integrations/datafusion/src/statistics.rs
+++ b/crates/integrations/datafusion/src/statistics.rs
@@ -19,7 +19,7 @@ use std::collections::HashMap;

 use datafusion::common::stats::Precision;
 use datafusion::common::{ColumnStatistics, Statistics};
-use iceberg::spec::ManifestStatus;
+use iceberg::spec::{DataContentType, ManifestStatus};
 use iceberg::table::Table;
 use iceberg::Result;

@@ -43,38 +43,41 @@ pub async fn compute_statistics(table:
&Table, snapshot_id: Option) -> Resu for manifest_file in manifest_list.entries() { let manifest = manifest_file.load_manifest(file_io).await?; manifest.entries().iter().for_each(|manifest_entry| { + // Gather stats only for non-deleted data files if manifest_entry.status() != ManifestStatus::Deleted { let data_file = manifest_entry.data_file(); - num_rows += data_file.record_count(); - data_file.lower_bounds().iter().for_each(|(col_id, min)| { - lower_bounds - .entry(*col_id) - .and_modify(|col_min| { - if min < col_min { - *col_min = min.clone() - } - }) - .or_insert(min.clone()); - }); - data_file.upper_bounds().iter().for_each(|(col_id, max)| { - upper_bounds - .entry(*col_id) - .and_modify(|col_max| { - if max > col_max { - *col_max = max.clone() - } - }) - .or_insert(max.clone()); - }); - data_file - .null_value_counts() - .iter() - .for_each(|(col_id, null_count)| { - null_counts + if data_file.content_type() == DataContentType::Data { + num_rows += data_file.record_count(); + data_file.lower_bounds().iter().for_each(|(col_id, min)| { + lower_bounds .entry(*col_id) - .and_modify(|col_null_count| *col_null_count += *null_count) - .or_insert(*null_count); + .and_modify(|col_min| { + if min < col_min { + *col_min = min.clone() + } + }) + .or_insert(min.clone()); }); + data_file.upper_bounds().iter().for_each(|(col_id, max)| { + upper_bounds + .entry(*col_id) + .and_modify(|col_max| { + if max > col_max { + *col_max = max.clone() + } + }) + .or_insert(max.clone()); + }); + data_file + .null_value_counts() + .iter() + .for_each(|(col_id, null_count)| { + null_counts + .entry(*col_id) + .and_modify(|col_null_count| *col_null_count += *null_count) + .or_insert(*null_count); + }); + } } }) } From 98bdd8a309861dc0c27000d0c24949d8d4b3ec0f Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Wed, 8 Jan 2025 21:51:17 +0100 Subject: [PATCH 4/7] Move statistics computation to IcebergTableProvider constructor methods --- crates/integration_tests/tests/datafusion.rs | 58 ++++++++++--------- .../datafusion/src/physical_plan/scan.rs | 3 +- .../integrations/datafusion/src/table/mod.rs | 24 ++++++-- .../src/table/table_provider_factory.rs | 4 +- 4 files changed, 56 insertions(+), 33 deletions(-) diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs index 20ee45a5f..653497980 100644 --- a/crates/integration_tests/tests/datafusion.rs +++ b/crates/integration_tests/tests/datafusion.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. +use datafusion::catalog::TableProvider; use datafusion::common::stats::Precision; use datafusion::common::{ColumnStatistics, ScalarValue, Statistics}; use iceberg::{Catalog, Result, TableIdent}; -use iceberg_datafusion::compute_statistics; +use iceberg_datafusion::IcebergTableProvider; use iceberg_integration_tests::set_test_fixture; #[tokio::test] @@ -35,32 +36,37 @@ async fn test_statistics() -> Result<()> { .await .unwrap(); - let stats = compute_statistics(&table, None).await?; + let stats = IcebergTableProvider::try_new_from_table(table) + .await? 
+ .statistics(); - assert_eq!(stats, Statistics { - num_rows: Precision::Inexact(12), - total_byte_size: Precision::Absent, - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Inexact(0), - max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))), - min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))), - distinct_count: Precision::Absent, - }, - ColumnStatistics { - null_count: Precision::Inexact(0), - max_value: Precision::Inexact(ScalarValue::Int32(Some(12))), - min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), - distinct_count: Precision::Absent, - }, - ColumnStatistics { - null_count: Precision::Inexact(0), - max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))), - min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))), - distinct_count: Precision::Absent, - }, - ], - }); + assert_eq!( + stats, + Some(Statistics { + num_rows: Precision::Inexact(12), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))), + min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Int32(Some(12))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))), + min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))), + distinct_count: Precision::Absent, + }, + ], + }) + ); Ok(()) } diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 66030298b..e4bfd42e3 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -45,7 +45,8 @@ pub(crate) struct IcebergTableScan { table: Table, /// Snapshot of the table to scan. snapshot_id: Option, - /// Statistics for the table; row count, and null count/min-max values per column. + /// Statistics for the scan; row count and null count/min-max values per column. + /// If not present defaults to empty (absent) statistics. statistics: Statistics, /// Stores certain, often expensive to compute, /// plan properties used in query optimization. diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index ff33d20f5..6b9850e5f 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -42,16 +42,21 @@ pub struct IcebergTableProvider { table: Table, /// Table snapshot id that will be queried via this provider. snapshot_id: Option, + /// Statistics for the table; row count and null count/min-max values per column. + /// If not present defaults to `None`. + statistics: Option, /// A reference-counted arrow `Schema`. 
schema: ArrowSchemaRef, } impl IcebergTableProvider { - pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { + pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self { + let statistics = compute_statistics(&table, None).await.ok(); IcebergTableProvider { table, snapshot_id: None, schema, + statistics, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -67,10 +72,12 @@ impl IcebergTableProvider { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); + let statistics = compute_statistics(&table, None).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: None, schema, + statistics, }) } @@ -78,10 +85,12 @@ impl IcebergTableProvider { /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. pub async fn try_new_from_table(table: Table) -> Result { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); + let statistics = compute_statistics(&table, None).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: None, schema, + statistics, }) } @@ -102,10 +111,12 @@ impl IcebergTableProvider { })?; let schema = snapshot.schema(table.metadata())?; let schema = Arc::new(schema_to_arrow_schema(&schema)?); + let statistics = compute_statistics(&table, Some(snapshot_id)).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), schema, + statistics, }) } } @@ -131,19 +142,22 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - let statistics = compute_statistics(&self.table, self.snapshot_id) - .await - .unwrap_or(Statistics::new_unknown(self.schema.as_ref())); Ok(Arc::new(IcebergTableScan::new( self.table.clone(), self.snapshot_id, self.schema.clone(), - statistics, + self.statistics + .clone() + .unwrap_or(Statistics::new_unknown(self.schema.as_ref())), projection, filters, ))) } + fn statistics(&self) -> Option { + self.statistics.clone() + } + fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index 15a3fef68..c2aea23f0 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -129,7 +129,9 @@ impl TableProviderFactory for IcebergTableProviderFactory { let schema = schema_to_arrow_schema(table.metadata().current_schema()) .map_err(to_datafusion_error)?; - Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema)))) + Ok(Arc::new( + IcebergTableProvider::new(table, Arc::new(schema)).await, + )) } } From 979d07a95c3a63d5711ac28bfdbc7687b7e58384 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Thu, 9 Jan 2025 08:51:54 +0100 Subject: [PATCH 5/7] Make statistics computation opt-in for IcebergTableProvider --- Cargo.lock | 1 + crates/integration_tests/tests/datafusion.rs | 2 ++ crates/integrations/datafusion/Cargo.toml | 1 + .../integrations/datafusion/src/table/mod.rs | 24 ++++++++++++------- .../src/table/table_provider_factory.rs | 4 +--- 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8f154346..3b07ec8a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3074,6 +3074,7 @@ dependencies = [ "futures", "iceberg", "iceberg-catalog-memory", + "log", "tempfile", "tokio", ] diff --git a/crates/integration_tests/tests/datafusion.rs 
b/crates/integration_tests/tests/datafusion.rs index 653497980..1d4e8275d 100644 --- a/crates/integration_tests/tests/datafusion.rs +++ b/crates/integration_tests/tests/datafusion.rs @@ -38,6 +38,8 @@ async fn test_statistics() -> Result<()> { let stats = IcebergTableProvider::try_new_from_table(table) .await? + .with_computed_statistics() + .await .statistics(); assert_eq!( diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml index 81a94d839..496089f12 100644 --- a/crates/integrations/datafusion/Cargo.toml +++ b/crates/integrations/datafusion/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } datafusion = { version = "44" } futures = { workspace = true } iceberg = { workspace = true } +log = { workspace = true } tokio = { workspace = true } [dev-dependencies] diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 6b9850e5f..7e801a48a 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -50,13 +50,12 @@ pub struct IcebergTableProvider { } impl IcebergTableProvider { - pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self { - let statistics = compute_statistics(&table, None).await.ok(); + pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self { IcebergTableProvider { table, snapshot_id: None, schema, - statistics, + statistics: None, } } /// Asynchronously tries to construct a new [`IcebergTableProvider`] @@ -72,12 +71,11 @@ impl IcebergTableProvider { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - let statistics = compute_statistics(&table, None).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: None, schema, - statistics, + statistics: None, }) } @@ -85,12 +83,11 @@ impl IcebergTableProvider { /// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation. 
pub async fn try_new_from_table(table: Table) -> Result { let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - let statistics = compute_statistics(&table, None).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: None, schema, - statistics, + statistics: None, }) } @@ -111,14 +108,23 @@ impl IcebergTableProvider { })?; let schema = snapshot.schema(table.metadata())?; let schema = Arc::new(schema_to_arrow_schema(&schema)?); - let statistics = compute_statistics(&table, Some(snapshot_id)).await.ok(); Ok(IcebergTableProvider { table, snapshot_id: Some(snapshot_id), schema, - statistics, + statistics: None, }) } + + // Try to compute the underlying table statistics directly from the manifest/data files + pub async fn with_computed_statistics(mut self) -> Self { + let statistics = compute_statistics(&self.table, self.snapshot_id) + .await + .inspect_err(|err| log::warn!("Failed computing table statistics: {err}")) + .ok(); + self.statistics = statistics; + self + } } #[async_trait] diff --git a/crates/integrations/datafusion/src/table/table_provider_factory.rs b/crates/integrations/datafusion/src/table/table_provider_factory.rs index c2aea23f0..15a3fef68 100644 --- a/crates/integrations/datafusion/src/table/table_provider_factory.rs +++ b/crates/integrations/datafusion/src/table/table_provider_factory.rs @@ -129,9 +129,7 @@ impl TableProviderFactory for IcebergTableProviderFactory { let schema = schema_to_arrow_schema(table.metadata().current_schema()) .map_err(to_datafusion_error)?; - Ok(Arc::new( - IcebergTableProvider::new(table, Arc::new(schema)).await, - )) + Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema)))) } } From 89a1dfa97ef8fc569eb0e4dabcb54400d4b28f57 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 27 Jan 2025 15:02:50 +0100 Subject: [PATCH 6/7] Apply bounds to the provided input statistics when a filter is present in the plan --- crates/integration_tests/tests/datafusion.rs | 58 ++++++++++++------ .../datafusion/src/physical_plan/scan.rs | 47 +++++++++++++-- .../integrations/datafusion/src/statistics.rs | 59 +++++++++++++++++++ .../integrations/datafusion/src/table/mod.rs | 4 +- 4 files changed, 144 insertions(+), 24 deletions(-) diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs index 9eeae5e6b..93fb1f679 100644 --- a/crates/integration_tests/tests/datafusion.rs +++ b/crates/integration_tests/tests/datafusion.rs @@ -24,6 +24,7 @@ use datafusion::assert_batches_eq; use datafusion::catalog::TableProvider; use datafusion::common::stats::Precision; use datafusion::common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion::logical_expr::{col, lit}; use datafusion::prelude::SessionContext; use iceberg::{Catalog, Result, TableIdent}; use iceberg_datafusion::IcebergTableProvider; @@ -38,16 +39,11 @@ async fn test_basic_queries() -> Result<()> { let table = catalog .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap()) - .await - .unwrap(); + .await?; let ctx = SessionContext::new(); - let table_provider = Arc::new( - IcebergTableProvider::try_new_from_table(table) - .await - .unwrap(), - ); + let table_provider = Arc::new(IcebergTableProvider::try_new_from_table(table).await?); let schema = table_provider.schema(); @@ -146,22 +142,23 @@ async fn test_statistics() -> Result<()> { let catalog = fixture.rest_catalog; + // Test table statistics let table = catalog - .load_table( - &TableIdent::from_strs(["default", 
"test_positional_merge_on_read_double_deletes"]) - .unwrap(), - ) - .await - .unwrap(); + .load_table(&TableIdent::from_strs([ + "default", + "test_positional_merge_on_read_double_deletes", + ])?) + .await?; - let stats = IcebergTableProvider::try_new_from_table(table) + let table_provider = IcebergTableProvider::try_new_from_table(table) .await? .with_computed_statistics() - .await - .statistics(); + .await; + + let table_stats = table_provider.statistics(); assert_eq!( - stats, + table_stats, Some(Statistics { num_rows: Precision::Inexact(12), total_byte_size: Precision::Absent, @@ -188,5 +185,32 @@ async fn test_statistics() -> Result<()> { }) ); + // Test plan statistics with filtering + let ctx = SessionContext::new(); + let scan = table_provider + .scan( + &ctx.state(), + Some(&vec![1]), + &[col("number").gt(lit(4))], + None, + ) + .await + .unwrap(); + + let plan_stats = scan.statistics().unwrap(); + + // The estimate for the number of rows and the min value for the column are changed in response + // to the filtration + assert_eq!(plan_stats, Statistics { + num_rows: Precision::Inexact(8), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Int32(Some(12))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(5))), + distinct_count: Precision::Absent, + },], + }); + Ok(()) } diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index e4bfd42e3..6a88c8b62 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -22,10 +22,11 @@ use std::vec; use datafusion::arrow::array::RecordBatch; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::common::Statistics; +use datafusion::common::{Statistics, ToDFSchema}; use datafusion::error::Result as DFResult; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_expr::EquivalenceProperties; +use datafusion::logical_expr::utils::conjunction; +use datafusion::physical_expr::{create_physical_expr, EquivalenceProperties}; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; @@ -33,9 +34,10 @@ use datafusion::prelude::Expr; use futures::{Stream, TryStreamExt}; use iceberg::expr::Predicate; use iceberg::table::Table; +use log::warn; use super::expr_to_predicate::convert_filters_to_predicate; -use crate::to_datafusion_error; +use crate::{apply_bounds, to_datafusion_error}; /// Manages the scanning process of an Iceberg [`Table`], encapsulating the /// necessary details and computed properties required for execution planning. 
@@ -63,7 +65,7 @@ impl IcebergTableScan {
         table: Table,
         snapshot_id: Option<i64>,
         schema: ArrowSchemaRef,
-        statistics: Statistics,
+        statistics: Option<Statistics>,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
     ) -> Self {
@@ -71,6 +73,26 @@ impl IcebergTableScan {
             None => schema.clone(),
             Some(projection) => Arc::new(schema.project(projection).unwrap()),
         };
+
+        let statistics = statistics
+            .map(|stats| {
+                let stats = match projection {
+                    None => stats,
+                    Some(projection) => stats.project(Some(projection)),
+                };
+                Self::bound_statistics(stats.clone(), filters, output_schema.clone())
+            })
+            .transpose()
+            .inspect_err(|err| {
+                warn!(
+                    "Failed to bound input statistics, defaulting to none: {:?}",
+                    err
+                )
+            })
+            .ok()
+            .flatten()
+            .unwrap_or(Statistics::new_unknown(output_schema.as_ref()));
+
         let plan_properties = Self::compute_properties(output_schema.clone());
         let projection = get_column_names(schema.clone(), projection);
         let predicates = convert_filters_to_predicate(filters);
@@ -97,6 +119,23 @@ impl IcebergTableScan {
             Boundedness::Bounded,
         )
     }
+
+    /// Estimate the effective bounded statistics corresponding to the provided filter expressions
+    fn bound_statistics(
+        input_stats: Statistics,
+        filters: &[Expr],
+        schema: ArrowSchemaRef,
+    ) -> DFResult<Statistics> {
+        Ok(if let Some(filters) = conjunction(filters.to_vec()) {
+            let schema = schema.clone();
+            let df_schema = schema.clone().to_dfschema()?;
+            let predicate = create_physical_expr(&filters, &df_schema, &Default::default())?;
+
+            apply_bounds(input_stats, &predicate, schema)?
+        } else {
+            input_stats
+        })
+    }
 }

 impl ExecutionPlan for IcebergTableScan {
diff --git a/crates/integrations/datafusion/src/statistics.rs b/crates/integrations/datafusion/src/statistics.rs
index c54dcdfe9..0a7db6ff4 100644
--- a/crates/integrations/datafusion/src/statistics.rs
+++ b/crates/integrations/datafusion/src/statistics.rs
@@ -16,9 +16,13 @@
 // under the License.

 use std::collections::HashMap;
+use std::sync::Arc;

+use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::stats::Precision;
 use datafusion::common::{ColumnStatistics, Statistics};
+use datafusion::error::Result as DFResult;
+use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries, PhysicalExpr};
 use iceberg::spec::{DataContentType, ManifestStatus};
 use iceberg::table::Table;
 use iceberg::Result;
@@ -113,3 +117,58 @@ pub async fn compute_statistics(table: &Table, snapshot_id: Option<i64>) -> Result<Statistics> {
         column_statistics: col_stats,
     })
 }
+
+// Apply bounds to the provided input statistics.
+//
+// Adapted from `FilterExec::statistics_helper` in DataFusion.
+pub fn apply_bounds(
+    input_stats: Statistics,
+    predicate: &Arc<dyn PhysicalExpr>,
+    schema: SchemaRef,
+) -> DFResult<Statistics> {
+    let num_rows = input_stats.num_rows;
+    let total_byte_size = input_stats.total_byte_size;
+    let input_analysis_ctx =
+        AnalysisContext::try_from_statistics(&schema, &input_stats.column_statistics)?;
+
+    let analysis_ctx = analyze(predicate, input_analysis_ctx, &schema)?;
+
+    // Estimate (inexact) selectivity of predicate
+    let selectivity = analysis_ctx.selectivity.unwrap_or(1.0);
+    let num_rows = num_rows.with_estimated_selectivity(selectivity);
+    let total_byte_size = total_byte_size.with_estimated_selectivity(selectivity);
+
+    let column_statistics = analysis_ctx
+        .boundaries
+        .into_iter()
+        .enumerate()
+        .map(
+            |(
+                idx,
+                ExprBoundaries {
+                    interval,
+                    distinct_count,
+                    ..
+ }, + )| { + let (lower, upper) = interval.into_bounds(); + let (min_value, max_value) = if lower.eq(&upper) { + (Precision::Exact(lower), Precision::Exact(upper)) + } else { + (Precision::Inexact(lower), Precision::Inexact(upper)) + }; + ColumnStatistics { + null_count: input_stats.column_statistics[idx].null_count.to_inexact(), + max_value, + min_value, + distinct_count: distinct_count.to_inexact(), + } + }, + ) + .collect(); + Ok(Statistics { + num_rows, + total_byte_size, + column_statistics, + }) +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 7e801a48a..8d7336ccf 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -152,9 +152,7 @@ impl TableProvider for IcebergTableProvider { self.table.clone(), self.snapshot_id, self.schema.clone(), - self.statistics - .clone() - .unwrap_or(Statistics::new_unknown(self.schema.as_ref())), + self.statistics.clone(), projection, filters, ))) From ad8c213998063f38bd9eae2bb544229c8b528602 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Tue, 28 Jan 2025 13:13:16 +0100 Subject: [PATCH 7/7] Make datafusion integration tests run serially This way we can avoid container port races. --- Cargo.lock | 41 ++++++++++++++++++++ crates/integration_tests/Cargo.toml | 1 + crates/integration_tests/tests/datafusion.rs | 3 ++ 3 files changed, 45 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 13e9e388a..480e5483c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3113,6 +3113,7 @@ dependencies = [ "iceberg-datafusion", "iceberg_test_utils", "parquet", + "serial_test", "tokio", "uuid", ] @@ -5074,6 +5075,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scc" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28e1c91382686d21b5ac7959341fcb9780fa7c03773646995a87c950fa7be640" +dependencies = [ + "sdd", +] + [[package]] name = "schannel" version = "0.1.27" @@ -5116,6 +5126,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sdd" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478f121bb72bbf63c52c93011ea1791dca40140dfe13f8336c4c5ac952c33aa9" + [[package]] name = "seahash" version = "4.1.0" @@ -5251,6 +5267,31 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "serial_test" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" +dependencies = [ + "futures", + "log", + "once_cell", + "parking_lot", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "sha1" version = "0.10.6" diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 1d1bfd9c2..80979a0e8 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -34,5 +34,6 @@ iceberg-catalog-rest = { workspace = true } iceberg-datafusion = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } +serial_test = "*" tokio = { workspace = true } uuid = { workspace = true } diff --git a/crates/integration_tests/tests/datafusion.rs 
b/crates/integration_tests/tests/datafusion.rs index 93fb1f679..08c1dd279 100644 --- a/crates/integration_tests/tests/datafusion.rs +++ b/crates/integration_tests/tests/datafusion.rs @@ -30,8 +30,10 @@ use iceberg::{Catalog, Result, TableIdent}; use iceberg_datafusion::IcebergTableProvider; use iceberg_integration_tests::set_test_fixture; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use serial_test::serial; #[tokio::test] +#[serial] async fn test_basic_queries() -> Result<()> { let fixture = set_test_fixture("datafusion_basic_read").await; @@ -137,6 +139,7 @@ async fn test_basic_queries() -> Result<()> { } #[tokio::test] +#[serial] async fn test_statistics() -> Result<()> { let fixture = set_test_fixture("datafusion_statistics").await;
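Taken together, the series lets DataFusion's cost-based optimizer see Iceberg-derived estimates. A sketch of the end-to-end wiring — the table name `t` and the `number` column are assumptions borrowed from the test fixture, not part of the patch:

```rust
use std::sync::Arc;

use datafusion::error::Result as DFResult;
use datafusion::prelude::SessionContext;
use iceberg::table::Table;
use iceberg_datafusion::{to_datafusion_error, IcebergTableProvider};

async fn register_with_stats(table: Table) -> DFResult<()> {
    // Opt in to manifest-derived statistics before handing the provider to
    // DataFusion; join reordering and filter estimation can then use the
    // row counts and per-column bounds computed by the patches above.
    let provider = IcebergTableProvider::try_new_from_table(table)
        .await
        .map_err(to_datafusion_error)?
        .with_computed_statistics()
        .await;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(provider))?;
    ctx.sql("SELECT count(*) FROM t WHERE number > 4")
        .await?
        .show()
        .await
}
```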