Skip to content

Commit

Permalink
Move statistics computation to IcebergTableProvider constructor methods
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jan 8, 2025
1 parent 1738308 commit 98bdd8a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 33 deletions.
58 changes: 32 additions & 26 deletions crates/integration_tests/tests/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::catalog::TableProvider;
use datafusion::common::stats::Precision;
use datafusion::common::{ColumnStatistics, ScalarValue, Statistics};
use iceberg::{Catalog, Result, TableIdent};
use iceberg_datafusion::compute_statistics;
use iceberg_datafusion::IcebergTableProvider;
use iceberg_integration_tests::set_test_fixture;

#[tokio::test]
Expand All @@ -35,32 +36,37 @@ async fn test_statistics() -> Result<()> {
.await
.unwrap();

let stats = compute_statistics(&table, None).await?;
let stats = IcebergTableProvider::try_new_from_table(table)
.await?
.statistics();

assert_eq!(stats, Statistics {
num_rows: Precision::Inexact(12),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))),
min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))),
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))),
min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))),
distinct_count: Precision::Absent,
},
],
});
assert_eq!(
stats,
Some(Statistics {
num_rows: Precision::Inexact(12),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Date32(Some(19428))),
min_value: Precision::Inexact(ScalarValue::Date32(Some(19417))),
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Int32(Some(12))),
min_value: Precision::Inexact(ScalarValue::Int32(Some(1))),
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Inexact(0),
max_value: Precision::Inexact(ScalarValue::Utf8View(Some("l".to_string()))),
min_value: Precision::Inexact(ScalarValue::Utf8View(Some("a".to_string()))),
distinct_count: Precision::Absent,
},
],
})
);

Ok(())
}
3 changes: 2 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ pub(crate) struct IcebergTableScan {
table: Table,
/// Snapshot of the table to scan.
snapshot_id: Option<i64>,
/// Statistics for the table; row count, and null count/min-max values per column.
/// Statistics for the scan; row count and null count/min-max values per column.
/// If not present defaults to empty (absent) statistics.
statistics: Statistics,
/// Stores certain, often expensive to compute,
/// plan properties used in query optimization.
Expand Down
24 changes: 19 additions & 5 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,21 @@ pub struct IcebergTableProvider {
table: Table,
/// Table snapshot id that will be queried via this provider.
snapshot_id: Option<i64>,
/// Statistics for the table: row count, plus null count and min/max values per column.
/// `None` when the statistics could not be computed while constructing the provider.
statistics: Option<Statistics>,
/// A reference-counted arrow `Schema`.
schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self {
let statistics = compute_statistics(&table, None).await.ok();
IcebergTableProvider {
table,
snapshot_id: None,
schema,
statistics,
}
}
/// Asynchronously tries to construct a new [`IcebergTableProvider`]
Expand All @@ -67,21 +72,25 @@ impl IcebergTableProvider {

let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

let statistics = compute_statistics(&table, None).await.ok();
Ok(IcebergTableProvider {
table,
snapshot_id: None,
schema,
statistics,
})
}

/// Asynchronously tries to construct a new [`IcebergTableProvider`]
/// using the given table. Can be used to create a table provider from an existing table regardless of the catalog implementation.
pub async fn try_new_from_table(table: Table) -> Result<Self> {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let statistics = compute_statistics(&table, None).await.ok();
Ok(IcebergTableProvider {
table,
snapshot_id: None,
schema,
statistics,
})
}

Expand All @@ -102,10 +111,12 @@ impl IcebergTableProvider {
})?;
let schema = snapshot.schema(table.metadata())?;
let schema = Arc::new(schema_to_arrow_schema(&schema)?);
let statistics = compute_statistics(&table, Some(snapshot_id)).await.ok();
Ok(IcebergTableProvider {
table,
snapshot_id: Some(snapshot_id),
schema,
statistics,
})
}
}
Expand All @@ -131,19 +142,22 @@ impl TableProvider for IcebergTableProvider {
filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let statistics = compute_statistics(&self.table, self.snapshot_id)
.await
.unwrap_or(Statistics::new_unknown(self.schema.as_ref()));
Ok(Arc::new(IcebergTableScan::new(
self.table.clone(),
self.snapshot_id,
self.schema.clone(),
statistics,
self.statistics
.clone()
.unwrap_or(Statistics::new_unknown(self.schema.as_ref())),
projection,
filters,
)))
}

/// Returns a clone of the statistics stored on this provider, or `None`
/// when the provider holds no statistics.
fn statistics(&self) -> Option<Statistics> {
self.statistics.clone()
}

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ impl TableProviderFactory for IcebergTableProviderFactory {
let schema = schema_to_arrow_schema(table.metadata().current_schema())
.map_err(to_datafusion_error)?;

Ok(Arc::new(IcebergTableProvider::new(table, Arc::new(schema))))
Ok(Arc::new(
IcebergTableProvider::new(table, Arc::new(schema)).await,
))
}
}

Expand Down

0 comments on commit 98bdd8a

Please sign in to comment.