feat(datafusion): Expose DataFusion statistics on an IcebergTableScan #880

Open · wants to merge 9 commits into main
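
This PR teaches the DataFusion integration to surface Iceberg-derived statistics. A minimal sketch of the intended usage, assuming a `table: Table` already loaded from a catalog (the `with_computed_statistics` call mirrors the integration test below):

    use iceberg_datafusion::IcebergTableProvider;

    // Build a provider, then opt in to statistics computed from Iceberg metadata.
    let provider = IcebergTableProvider::try_new_from_table(table)
        .await?
        .with_computed_statistics()
        .await;

    // DataFusion's planner can now consult table-level statistics.
    let stats = provider.statistics();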
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

22 changes: 1 addition & 21 deletions crates/iceberg/src/scan.rs
@@ -206,27 +206,7 @@ impl<'a> TableScanBuilder<'a> {

    /// Build the table scan.
    pub fn build(self) -> Result<TableScan> {
-        let snapshot = match self.snapshot_id {
-            Some(snapshot_id) => self
-                .table
-                .metadata()
-                .snapshot_by_id(snapshot_id)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Snapshot with id {} not found", snapshot_id),
-                    )
-                })?
-                .clone(),
-            None => self
-                .table
-                .metadata()
-                .current_snapshot()
-                .ok_or_else(|| {
-                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
-                })?
-                .clone(),
-        };
+        let snapshot = self.table.snapshot(self.snapshot_id)?;

        let schema = snapshot.schema(self.table.metadata())?;

25 changes: 24 additions & 1 deletion crates/iceberg/src/table.rs
@@ -24,7 +24,7 @@ use crate::inspect::MetadataTable;
use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::scan::TableScanBuilder;
-use crate::spec::{TableMetadata, TableMetadataRef};
+use crate::spec::{SnapshotRef, TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};

/// Builder to create table scan.
@@ -201,6 +201,29 @@ impl Table {
        TableScanBuilder::new(self)
    }

+    /// Get the specified or latest snapshot for this table
+    pub fn snapshot(&self, snapshot_id: Option<i64>) -> Result<SnapshotRef> {
+        Ok(match snapshot_id {
+            Some(snapshot_id) => self
+                .metadata()
+                .snapshot_by_id(snapshot_id)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Snapshot with id {} not found", snapshot_id),
+                    )
+                })?
+                .clone(),
+            None => self
+                .metadata()
+                .current_snapshot()
+                .ok_or_else(|| {
+                    Error::new(ErrorKind::Unexpected, "Can't scan table without snapshots")
+                })?
+                .clone(),
+        })
+    }
+
    /// Creates a metadata table which provides table-like APIs for inspecting metadata.
    /// See [`MetadataTable`] for more details.
    pub fn inspect(&self) -> MetadataTable<'_> {
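
The new `Table::snapshot` helper above centralizes the lookup that `TableScanBuilder::build` previously inlined. A minimal usage sketch, assuming a `table: Table` is at hand:

    // Latest snapshot (errors if the table has no snapshots).
    let current = table.snapshot(None)?;
    // A specific snapshot by id (errors if the id is unknown).
    let pinned = table.snapshot(Some(current.snapshot_id()))?;
    assert_eq!(current.snapshot_id(), pinned.snapshot_id());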
103 changes: 90 additions & 13 deletions crates/integration_tests/tests/shared_tests/datafusion.rs
@@ -22,32 +22,29 @@ use arrow_schema::TimeUnit;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::assert_batches_eq;
use datafusion::catalog::TableProvider;
-use datafusion::error::DataFusionError;
+use datafusion::common::stats::Precision;
+use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};
use datafusion::logical_expr::{col, lit};
use datafusion::prelude::SessionContext;
-use iceberg::{Catalog, TableIdent};
+use iceberg::{Catalog, Result, TableIdent};
use iceberg_catalog_rest::RestCatalog;
use iceberg_datafusion::IcebergTableProvider;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::get_shared_containers;

#[tokio::test]
-async fn test_basic_queries() -> Result<(), DataFusionError> {
+async fn test_basic_queries() -> Result<()> {
    let fixture = get_shared_containers();
    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());

    let table = rest_catalog
        .load_table(&TableIdent::from_strs(["default", "types_test"]).unwrap())
-        .await
-        .unwrap();
+        .await?;

    let ctx = SessionContext::new();

-    let table_provider = Arc::new(
-        IcebergTableProvider::try_new_from_table(table)
-            .await
-            .unwrap(),
-    );
+    let table_provider = Arc::new(IcebergTableProvider::try_new_from_table(table).await?);

    let schema = table_provider.schema();

@@ -118,13 +115,15 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
        ])
    );

-    ctx.register_table("types_table", table_provider)?;
+    ctx.register_table("types_table", table_provider).unwrap();

    let batches = ctx
        .sql("SELECT * FROM types_table ORDER BY cbigint LIMIT 3")
-        .await?
+        .await
+        .unwrap()
        .collect()
-        .await?;
+        .await
+        .unwrap();
    let expected = [
        "+----------+----------+-----------+------+---------+--------+---------+----------+------------+---------------------+----------------------+---------+----------+",
        "| cboolean | ctinyint | csmallint | cint | cbigint | cfloat | cdouble | cdecimal | cdate | ctimestamp_ntz | ctimestamp | cstring | cbinary |",
@@ -137,3 +136,81 @@ async fn test_basic_queries() -> Result<(), DataFusionError> {
    assert_batches_eq!(expected, &batches);
    Ok(())
}
+
+#[tokio::test]
+async fn test_statistics() -> Result<()> {
+    let fixture = get_shared_containers();
+    let rest_catalog = RestCatalog::new(fixture.catalog_config.clone());
+
+    // Test table statistics
+    let table = rest_catalog
+        .load_table(&TableIdent::from_strs([
+            "default",
+            "test_positional_merge_on_read_double_deletes",
+        ])?)
+        .await?;
+
+    let table_provider = IcebergTableProvider::try_new_from_table(table)
+        .await?
+        .with_computed_statistics()
+        .await;
+
+    let table_stats = table_provider.statistics();
+
+    assert_eq!(
+        table_stats,
+        Some(Statistics {
+            num_rows: Precision::Inexact(12),
+            total_byte_size: Precision::Absent,
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Inexact(0),
+                    max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))),
+                    min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Inexact(0),
+                    max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
+                    min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
+                    distinct_count: Precision::Absent,
+                },
+                ColumnStatistics {
+                    null_count: Precision::Inexact(0),
+                    max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))),
+                    min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))),
+                    distinct_count: Precision::Absent,
+                },
+            ],
+        })
+    );
+
+    // Test plan statistics with filtering
+    let ctx = SessionContext::new();
+    let scan = table_provider
+        .scan(
+            &ctx.state(),
+            Some(&vec![1]),
+            &[col("number").gt(lit(4))],
+            None,
+        )
+        .await
+        .unwrap();
+
+    let plan_stats = scan.statistics().unwrap();
+
+    // The filter lowers the estimated row count and raises the column's
+    // estimated minimum value.
+    assert_eq!(plan_stats, Statistics {
+        num_rows: Precision::Inexact(8),
+        total_byte_size: Precision::Absent,
+        column_statistics: vec![ColumnStatistics {
+            null_count: Precision::Inexact(0),
+            max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
+            min_value: Precision::Inexact(ScalarValue::Int32(Some(5))),
+            distinct_count: Precision::Absent,
+        }],
+    });
+
+    Ok(())
+}
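
For intuition about the numbers asserted above: Iceberg data-file metadata already records per-file row counts, null counts, and lower/upper bounds, so table-level statistics fall out of folding those per-file values together. A self-contained sketch of that folding for a single column, using stand-in types (`FileColumnStats` and `fold_stats` are illustrative, not the PR's actual implementation):

    use datafusion::common::stats::Precision;
    use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};

    // Hypothetical per-data-file metrics for one column.
    struct FileColumnStats {
        record_count: usize,
        null_count: usize,
        min: ScalarValue,
        max: ScalarValue,
    }

    // Fold per-file metrics into one table-level Statistics value.
    fn fold_stats(files: &[FileColumnStats]) -> Statistics {
        let mut rows = 0usize;
        let mut nulls = 0usize;
        let mut min: Option<ScalarValue> = None;
        let mut max: Option<ScalarValue> = None;
        for f in files {
            rows += f.record_count;
            nulls += f.null_count;
            // Widen the running bounds with this file's bounds.
            min = Some(match min {
                Some(m) if m <= f.min => m,
                _ => f.min.clone(),
            });
            max = Some(match max {
                Some(m) if m >= f.max => m,
                _ => f.max.clone(),
            });
        }
        Statistics {
            // Inexact: delete files may remove rows at read time.
            num_rows: Precision::Inexact(rows),
            total_byte_size: Precision::Absent,
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Inexact(nulls),
                max_value: max.map(Precision::Inexact).unwrap_or(Precision::Absent),
                min_value: min.map(Precision::Inexact).unwrap_or(Precision::Absent),
                distinct_count: Precision::Absent,
            }],
        }
    }

This also suggests why everything is `Precision::Inexact`: positional deletes (as in the `double_deletes` table above) are not reflected in the per-file counts.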
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
@@ -37,6 +37,7 @@ async-trait = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
+log = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
3 changes: 3 additions & 0 deletions crates/integrations/datafusion/src/lib.rs
@@ -23,6 +23,9 @@ pub use error::*;

mod physical_plan;
mod schema;
+mod statistics;
mod table;

+pub use statistics::*;
pub use table::table_provider_factory::IcebergTableProviderFactory;
pub use table::*;
@@ -20,7 +20,7 @@ use std::vec;
use datafusion::logical_expr::{Expr, Operator};
use datafusion::scalar::ScalarValue;
use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
-use iceberg::spec::Datum;
+use iceberg::spec::{Datum, PrimitiveLiteral, PrimitiveType};

// A datafusion expression could be an Iceberg predicate, column, or literal.
enum TransformedResult {
@@ -196,20 +196,44 @@ const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
/// Convert a scalar value to an iceberg datum.
fn scalar_value_to_datum(value: &ScalarValue) -> Option<Datum> {
    match value {
        ScalarValue::Boolean(Some(v)) => Some(Datum::bool(*v)),
        ScalarValue::Int8(Some(v)) => Some(Datum::int(*v as i32)),
        ScalarValue::Int16(Some(v)) => Some(Datum::int(*v as i32)),
        ScalarValue::Int32(Some(v)) => Some(Datum::int(*v)),
        ScalarValue::Int64(Some(v)) => Some(Datum::long(*v)),
-        ScalarValue::Float32(Some(v)) => Some(Datum::double(*v as f64)),
+        ScalarValue::Float32(Some(v)) => Some(Datum::float(*v)),
        ScalarValue::Float64(Some(v)) => Some(Datum::double(*v)),
-        ScalarValue::Utf8(Some(v)) => Some(Datum::string(v.clone())),
-        ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
+        ScalarValue::Utf8(Some(v))
+        | ScalarValue::Utf8View(Some(v))
+        | ScalarValue::LargeUtf8(Some(v)) => Some(Datum::string(v.clone())),
        ScalarValue::Date32(Some(v)) => Some(Datum::date(*v)),
        ScalarValue::Date64(Some(v)) => Some(Datum::date((*v / MILLIS_PER_DAY) as i32)),
        _ => None,
    }
}

+/// Convert an iceberg datum to a datafusion scalar value.
+pub fn datum_to_scalar_value(datum: &Datum) -> Option<ScalarValue> {
+    match (datum.data_type(), datum.literal()) {
+        (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(v)) => {
+            Some(ScalarValue::Boolean(Some(*v)))
+        }
+        (PrimitiveType::Int, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Int32(Some(*v))),
+        (PrimitiveType::Long, PrimitiveLiteral::Long(v)) => Some(ScalarValue::Int64(Some(*v))),
+        (PrimitiveType::Float, PrimitiveLiteral::Float(v)) => {
+            Some(ScalarValue::Float32(Some(v.into_inner())))
+        }
+        (PrimitiveType::Double, PrimitiveLiteral::Double(v)) => {
+            Some(ScalarValue::Float64(Some(v.into_inner())))
+        }
+        (PrimitiveType::String, PrimitiveLiteral::String(v)) => {
+            Some(ScalarValue::Utf8View(Some(v.clone())))
+        }
+        (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => Some(ScalarValue::Date32(Some(*v))),
+        _ => None,
+    }
+}

#[cfg(test)]
mod tests {
    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
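
A quick round-trip sketch of the two conversion helpers (hypothetical test; `scalar_value_to_datum` is private, so it would live in this module's `tests`):

    use datafusion::scalar::ScalarValue;
    use iceberg::spec::Datum;

    #[test]
    fn datum_scalar_round_trip() {
        let datum = Datum::int(42);
        let scalar = datum_to_scalar_value(&datum);
        assert_eq!(scalar, Some(ScalarValue::Int32(Some(42))));
        // And back through the existing helper.
        assert_eq!(scalar_value_to_datum(&scalar.unwrap()), Some(datum));
    }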