Skip to content

Commit

Permalink
Expose DataFusion statistics on an IcebergTableScan
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jan 6, 2025
1 parent 4603b64 commit 80b8d8c
Show file tree
Hide file tree
Showing 10 changed files with 252 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 1 addition & 21 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,27 +188,7 @@ impl<'a> TableScanBuilder<'a> {

/// Build the table scan.
pub fn build(self) -> Result<TableScan> {
let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
.metadata()
.snapshot_by_id(snapshot_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!("Snapshot with id {} not found", snapshot_id),
)
})?
.clone(),
None => self
.table
.metadata()
.current_snapshot()
.ok_or_else(|| {
Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
})?
.clone(),
};
let snapshot = self.table.snapshot(self.snapshot_id)?;

let schema = snapshot.schema(self.table.metadata())?;

Expand Down
25 changes: 24 additions & 1 deletion crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::metadata_scan::MetadataTable;
use crate::scan::TableScanBuilder;
use crate::spec::{TableMetadata, TableMetadataRef};
use crate::spec::{SnapshotRef, TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};

/// Builder to create table scan.
Expand Down Expand Up @@ -201,6 +201,29 @@ impl Table {
TableScanBuilder::new(self)
}

/// Resolve a snapshot of this table.
///
/// With `Some(id)` the snapshot with that id is looked up in the table metadata;
/// with `None` the metadata's current snapshot is used.
///
/// # Errors
///
/// * `ErrorKind::DataInvalid` when no snapshot with the requested id exists.
/// * `ErrorKind::Unexpected` when no id was given and the table has no current snapshot.
pub fn snapshot(&self, snapshot_id: Option<i64>) -> Result<SnapshotRef> {
    let metadata = self.metadata();

    let selected = if let Some(id) = snapshot_id {
        metadata.snapshot_by_id(id).ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Snapshot with id {} not found", id),
            )
        })?
    } else {
        metadata.current_snapshot().ok_or_else(|| {
            Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
        })?
    };

    // Both lookups hand back a borrowed reference; return an owned handle.
    Ok(selected.clone())
}

/// Creates a metadata table which provides table-like APIs for inspecting metadata.
/// See [`MetadataTable`] for more details.
pub fn metadata_table(self) -> MetadataTable {
Expand Down
2 changes: 2 additions & 0 deletions crates/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ rust-version = { workspace = true }
[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
datafusion = "44"
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg-datafusion = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
tokio = { workspace = true }
66 changes: 66 additions & 0 deletions crates/integration_tests/tests/datafusion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};
use iceberg::{Catalog, Result, TableIdent};
use iceberg_datafusion::compute_statistics;
use iceberg_integration_tests::set_test_fixture;

#[tokio::test]
async fn test_statistics() -> Result<()> {
let fixture = set_test_fixture("datafusion_statistics").await;

let catalog = fixture.rest_catalog;

let table = catalog
.load_table(
&TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
.unwrap(),
)
.await
.unwrap();

let stats = compute_statistics(&table, None).await?;

assert_eq!(stats, Statistics {
num_rows: Precision::Inexact(14),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))),
min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))),
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))),
min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))),
distinct_count: Precision::Absent,
},
],
});

Ok(())
}
3 changes: 3 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub use error::*;

mod physical_plan;
mod schema;
mod statistics;
mod table;

pub use statistics::*;
pub use table::table_provider_factory::IcebergTableProviderFactory;
pub use table::*;
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::vec;
use datafusion::logical_expr::{Expr, Operator};
use datafusion::scalar::ScalarValue;
use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
use iceberg::spec::Datum;
use iceberg::spec::{Datum, PrimitiveLiteral, PrimitiveType};

// A datafusion expression could be an Iceberg predicate, column, or literal.
enum TransformedResult {
Expand Down Expand Up @@ -195,20 +195,44 @@ const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
/// Convert a scalar value to an iceberg datum.
fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
match value {
ScalarValue::Boolean(Some(v)) => Some(Datum::bool(*v)),
ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)),
ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)),
ScalarValue::Float32(Some(v)) => Some(Datum::float(*v)),
ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
ScalarValue::Utf8(Some(v))
| ScalarValue::Utf8View(Some(v))
| ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
_ => None,
}
}

/// Convert an iceberg datum to a datafusion scalar value.
///
/// The datum's primitive type and its literal are matched together; any
/// combination without an arm below yields `None`.
pub fn datum_to_scalar_value(datum: &Datum) -> Option<ScalarValue> {
    match (datum.data_type(), datum.literal()) {
        // A boolean literal is paired with the boolean type (not the binary type).
        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(v)) => {
            Some(ScalarValue::Boolean(Some(*v)))
        }
        (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Int32(Some(*v))),
        (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(ScalarValue::Int64(Some(*v))),
        (PrimitiveType::Float, PrimitiveLiteral::Float(v)) => {
            Some(ScalarValue::Float32(Some(v.into_inner())))
        }
        (PrimitiveType::Double, PrimitiveLiteral::Double(v)) => {
            Some(ScalarValue::Float64(Some(v.into_inner())))
        }
        (PrimitiveType::String, PrimitiveLiteral::String(v)) => {
            Some(ScalarValue::Utf8View(Some(v.clone())))
        }
        // Dates use an Int literal; here it is passed straight through as Date32.
        (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Date32(Some(*v))),
        _ => None,
    }
}

#[cfg(test)]
mod tests {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
Expand Down
9 changes: 9 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::vec;

use datafusion::arrow::array::RecordBatch;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::common::Statistics;
use datafusion::error::Result as DFResult;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
Expand All @@ -44,6 +45,8 @@ pub(crate) struct IcebergTableScan {
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
/// Statistics for the table; row count, and null count/min-max values per column.
statistics: Statistics,
/// Stores certain, often expensive to compute,
/// plan properties used in query optimization.
plan_properties: PlanProperties,
Expand All @@ -59,6 +62,7 @@ impl IcebergTableScan {
table: Table,
snapshot_id: Option<i64>,
schema: ArrowSchemaRef,
statistics: Statistics,
projection: Option<&Vec<usize>>,
filters: &[Expr],
) -> Self {
Expand All @@ -73,6 +77,7 @@ impl IcebergTableScan {
Self {
table,
snapshot_id,
statistics,
plan_properties,
projection,
predicates,
Expand Down Expand Up @@ -135,6 +140,10 @@ impl ExecutionPlan for IcebergTableScan {
stream,
)))
}

/// Returns a clone of the `statistics` value held by this scan.
fn statistics(&self) -> DFResult<Statistics> {
    let stats = self.statistics.clone();
    Ok(stats)
}
}

impl DisplayAs for IcebergTableScan {
Expand Down
112 changes: 112 additions & 0 deletions crates/integrations/datafusion/src/statistics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, Statistics};
use iceberg::spec::ManifestStatus;
use iceberg::table::Table;
use iceberg::Result;

use crate::physical_plan::expr_to_predicate::datum_to_scalar_value;

// Compute DataFusion table statistics for a given table/snapshot
pub async fn compute_statistics(table: &Table, snapshot_id: Option<i64>) -> Result<Statistics> {
let file_io = table.file_io();
let metadata = table.metadata();
let snapshot = table.snapshot(snapshot_id)?;

let mut num_rows = 0;
let mut lower_bounds = HashMap::new();
let mut upper_bounds = HashMap::new();
let mut null_counts = HashMap::new();

let manifest_list = snapshot.load_manifest_list(file_io, metadata).await?;

// For each existing/added manifest in the snapshot aggregate the row count, as well as null
// count and min/max values.
for manifest_file in manifest_list.entries() {
let manifest = manifest_file.load_manifest(file_io).await?;
manifest.entries().iter().for_each(|manifest_entry| {
if manifest_entry.status() != ManifestStatus::Deleted {
let data_file = manifest_entry.data_file();
num_rows += data_file.record_count();
data_file.lower_bounds().iter().for_each(|(col_id, min)| {
lower_bounds
.entry(*col_id)
.and_modify(|col_min| {
if min < col_min {
*col_min = min.clone()
}
})
.or_insert(min.clone());
});
data_file.upper_bounds().iter().for_each(|(col_id, max)| {
upper_bounds
.entry(*col_id)
.and_modify(|col_max| {
if max > col_max {
*col_max = max.clone()
}
})
.or_insert(max.clone());
});
data_file
.null_value_counts()
.iter()
.for_each(|(col_id, null_count)| {
null_counts
.entry(*col_id)
.and_modify(|col_null_count| *col_null_count += *null_count)
.or_insert(*null_count);
});
}
})
}

// Construct the DataFusion `Statistics` object, leaving any missing info as `Precision::Absent`
let schema = snapshot.schema(metadata)?;
let col_stats = schema
.as_struct()
.fields()
.iter()
.map(|field| {
ColumnStatistics {
null_count: null_counts
.get(&field.id)
.map(|nc| Precision::Inexact(*nc as usize))
.unwrap_or(Precision::Absent),
max_value: upper_bounds
.get(&field.id)
.and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
.unwrap_or(Precision::Absent),
min_value: lower_bounds
.get(&field.id)
.and_then(|datum| datum_to_scalar_value(datum).map(Precision::Inexact))
.unwrap_or(Precision::Absent),
distinct_count: Precision::Absent, // will be picked up after #417
}
})
.collect();

Ok(Statistics {
num_rows: Precision::Inexact(num_rows as usize),
total_byte_size: Precision::Absent,
column_statistics: col_stats,
})
}
Loading

0 comments on commit 80b8d8c

Please sign in to comment.