diff --git a/Cargo.lock b/Cargo.lock index 04265323d..c8f154346 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3093,9 +3093,11 @@ version = "0.4.0" dependencies = [ "arrow-array", "arrow-schema", + "datafusion", "futures", "iceberg", "iceberg-catalog-rest", + "iceberg-datafusion", "iceberg_test_utils", "parquet", "tokio", diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5a97e74e7..1f957b520 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -188,27 +188,7 @@ impl<'a> TableScanBuilder<'a> { /// Build the table scan. pub fn build(self) -> Result { - let snapshot = match self.snapshot_id { - Some(snapshot_id) => self - .table - .metadata() - .snapshot_by_id(snapshot_id) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Snapshot with id {} not found", snapshot_id), - ) - })? - .clone(), - None => self - .table - .metadata() - .current_snapshot() - .ok_or_else(|| { - Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots") - })? - .clone(), - }; + let snapshot = self.table.snapshot(self.snapshot_id)?; let schema = snapshot.schema(self.table.metadata())?; diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index fa5304855..c47790c4a 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -24,7 +24,7 @@ use crate::io::object_cache::ObjectCache; use crate::io::FileIO; use crate::metadata_scan::MetadataTable; use crate::scan::TableScanBuilder; -use crate::spec::{TableMetadata, TableMetadataRef}; +use crate::spec::{SnapshotRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. @@ -201,6 +201,29 @@ impl Table { TableScanBuilder::new(self) } + /// Get the specified or latest snapshot for this table + pub fn snapshot(&self, snapshot_id: Option) -> Result { + Ok(match snapshot_id { + Some(snapshot_id) => self + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", snapshot_id), + ) + })? + .clone(), + None => self + .metadata() + .current_snapshot() + .ok_or_else(|| { + Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots") + })? + .clone(), + }) + } + /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. pub fn metadata_table(self) -> MetadataTable { diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index a047d7580..172a8d3a5 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -27,9 +27,11 @@ rust-version = { workspace = true } [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +datafusion = "44" futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-datafusion = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } tokio = { workspace = true } diff --git a/crates/integration_tests/tests/datafusion.rs b/crates/integration_tests/tests/datafusion.rs new file mode 100644 index 000000000..9b1a18cee --- /dev/null +++ b/crates/integration_tests/tests/datafusion.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::common::stats::Precision; +use datafusion::common::{ColumnStatistics, ScalarValue, Statistics}; +use iceberg::{Catalog, Result, TableIdent}; +use iceberg_datafusion::compute_statistics; +use iceberg_integration_tests::set_test_fixture; + +#[tokio::test] +async fn test_statistics() -> Result<()> { + let fixture = set_test_fixture("datafusion_statistics").await; + + let catalog = fixture.rest_catalog; + + let table = catalog + .load_table( + &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"]) + .unwrap(), + ) + .await + .unwrap(); + + let stats = compute_statistics(&table, None).await?; + + assert_eq!(stats, Statistics { + num_rows: Precision::Inexact(14), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))), + min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Int32(Some(12))), + min_value: Precision::Inexact(ScalarValue::Int32(Some(1))), + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Inexact(0), + max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))), + min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))), + distinct_count: Precision::Absent, + }, + ], + }); + + Ok(()) +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index b7b927fdd..d8765de47 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -23,6 +23,9 @@ pub use error::*; mod physical_plan; mod schema; +mod statistics; mod table; + +pub use statistics::*; pub use table::table_provider_factory::IcebergTableProviderFactory; pub use table::*; diff --git a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs index f438308e6..92dcc3165 100644 --- a/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs +++ b/crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs @@ -20,7 +20,7 @@ use std::vec; use datafusion::logical_expr::{Expr, Operator}; use datafusion::scalar::ScalarValue; use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression}; -use iceberg::spec::Datum; +use iceberg::spec::{Datum, PrimitiveLiteral, PrimitiveType}; // A datafusion expression could be an Iceberg predicate, column, or literal. enum TransformedResult { @@ -195,20 +195,44 @@ const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000; /// Convert a scalar value to an iceberg datum. fn scalar_value_to_datum(value: &ScalarValue) -> Option { match value { + ScalarValue::Boolean(Some(v)) => Some(Datum::bool(*v)), ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)), ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)), ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)), ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)), - ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)), + ScalarValue::Float32(Some(v)) => Some(Datum::float(*v)), ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)), - ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())), - ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())), + ScalarValue::Utf8(Some(v)) + | ScalarValue::Utf8View(Some(v)) + | ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())), ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)), ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)), _ => None, } } +/// Convert an iceberg datum to a datafusion scalar value. +pub fn datum_to_scalar_value(datum: &Datum) -> Option { + match (datum.data_type(), datum.literal()) { + (PrimitiveType::Binary, PrimitiveLiteral::Boolean(v)) => { + Some(ScalarValue::Boolean(Some(*v))) + } + (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Int32(Some(*v))), + (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(ScalarValue::Int64(Some(*v))), + (PrimitiveType::Float, PrimitiveLiteral::Float(v)) => { + Some(ScalarValue::Float32(Some(v.into_inner()))) + } + (PrimitiveType::Double, PrimitiveLiteral::Double(v)) => { + Some(ScalarValue::Float64(Some(v.into_inner()))) + } + (PrimitiveType::String, PrimitiveLiteral::String(v)) => { + Some(ScalarValue::Utf8View(Some(v.clone()))) + } + (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Date32(Some(*v))), + _ => None, + } +} + #[cfg(test)] mod tests { use datafusion::arrow::datatypes::{DataType, Field, Schema}; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index f33437eec..66030298b 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -22,6 +22,7 @@ use std::vec; use datafusion::arrow::array::RecordBatch; use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::common::Statistics; use datafusion::error::Result as DFResult; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::EquivalenceProperties; @@ -44,6 +45,8 @@ pub(crate) struct IcebergTableScan { table: Table, /// Snapshot of the table to scan. snapshot_id: Option, + /// Statistics for the table; row count, and null count/min-max values per column. + statistics: Statistics, /// Stores certain, often expensive to compute, /// plan properties used in query optimization. plan_properties: PlanProperties, @@ -59,6 +62,7 @@ impl IcebergTableScan { table: Table, snapshot_id: Option, schema: ArrowSchemaRef, + statistics: Statistics, projection: Option<&Vec>, filters: &[Expr], ) -> Self { @@ -73,6 +77,7 @@ impl IcebergTableScan { Self { table, snapshot_id, + statistics, plan_properties, projection, predicates, @@ -135,6 +140,10 @@ impl ExecutionPlan for IcebergTableScan { stream, ))) } + + fn statistics(&self) -> DFResult { + Ok(self.statistics.clone()) + } } impl DisplayAs for IcebergTableScan { diff --git a/crates/integrations/datafusion/src/statistics.rs b/crates/integrations/datafusion/src/statistics.rs new file mode 100644 index 000000000..f5ba9bf87 --- /dev/null +++ b/crates/integrations/datafusion/src/statistics.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use datafusion::common::stats::Precision; +use datafusion::common::{ColumnStatistics, Statistics}; +use iceberg::spec::ManifestStatus; +use iceberg::table::Table; +use iceberg::Result; + +use crate::physical_plan::expr_to_predicate::datum_to_scalar_value; + +// Compute DataFusion table statistics for a given table/snapshot +pub async fn compute_statistics(table: &Table, snapshot_id: Option) -> Result { + let file_io = table.file_io(); + let metadata = table.metadata(); + let snapshot = table.snapshot(snapshot_id)?; + + let mut num_rows = 0; + let mut lower_bounds = HashMap::new(); + let mut upper_bounds = HashMap::new(); + let mut null_counts = HashMap::new(); + + let manifest_list = snapshot.load_manifest_list(file_io, metadata).await?; + + // For each existing/added manifest in the snapshot aggregate the row count, as well as null + // count and min/max values. + for manifest_file in manifest_list.entries() { + let manifest = manifest_file.load_manifest(file_io).await?; + manifest.entries().iter().for_each(|manifest_entry| { + if manifest_entry.status() != ManifestStatus::Deleted { + let data_file = manifest_entry.data_file(); + num_rows += data_file.record_count(); + data_file.lower_bounds().iter().for_each(|(col_id, min)| { + lower_bounds + .entry(*col_id) + .and_modify(|col_min| { + if min < col_min { + *col_min = min.clone() + } + }) + .or_insert(min.clone()); + }); + data_file.upper_bounds().iter().for_each(|(col_id, max)| { + upper_bounds + .entry(*col_id) + .and_modify(|col_max| { + if max > col_max { + *col_max = max.clone() + } + }) + .or_insert(max.clone()); + }); + data_file + .null_value_counts() + .iter() + .for_each(|(col_id, null_count)| { + null_counts + .entry(*col_id) + .and_modify(|col_null_count| *col_null_count += *null_count) + .or_insert(*null_count); + }); + } + }) + } + + // Construct the DataFusion `Statistics` object, leaving any missing info as `Precision::Absent` + let schema = snapshot.schema(metadata)?; + let col_stats = schema + .as_struct() + .fields() + .iter() + .map(|field| { + ColumnStatistics { + null_count: null_counts + .get(&field.id) + .map(|nc| Precision::Inexact(*nc as usize)) + .unwrap_or(Precision::Absent), + max_value: upper_bounds + .get(&field.id) + .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact)) + .unwrap_or(Precision::Absent), + min_value: lower_bounds + .get(&field.id) + .and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact)) + .unwrap_or(Precision::Absent), + distinct_count: Precision::Absent, // will be picked up after #417 + } + }) + .collect(); + + Ok(Statistics { + num_rows: Precision::Inexact(num_rows as usize), + total_byte_size: Precision::Absent, + column_statistics: col_stats, + }) +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 00c9e1322..025099de3 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -16,7 +16,6 @@ // under the License. pub mod table_provider_factory; - use std::any::Any; use std::sync::Arc; @@ -32,6 +31,7 @@ use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; use crate::physical_plan::scan::IcebergTableScan; +use crate::{compute_statistics, to_datafusion_error}; /// Represents a [`TableProvider`] for the Iceberg [`Catalog`], /// managing access to a [`Table`]. @@ -130,10 +130,14 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { + let statistics = compute_statistics(&self.table, self.snapshot_id) + .await + .map_err(to_datafusion_error)?; Ok(Arc::new(IcebergTableScan::new( self.table.clone(), self.snapshot_id, self.schema.clone(), + statistics, projection, filters, )))