feat(datafusion): Expose DataFusion statistics on an IcebergTableScan #880

Open · wants to merge 9 commits into base: main from datafusion-statistics
Conversation

@gruuya (Contributor) commented Jan 6, 2025:

Closes #869.

Provide detailed statistics via DataFusion's ExecutionPlan::statistics for more efficient join planning.

The statistics are accumulated from the snapshot's manifests and converted to the corresponding DataFusion struct.

@gruuya force-pushed the datafusion-statistics branch from 5a30c42 to 80b8d8c on January 6, 2025 13:41
// For each existing/added manifest in the snapshot, aggregate the row count, as well as the null
// count and min/max values.
for manifest_file in manifest_list.entries() {
    let manifest = manifest_file.load_manifest(file_io).await?;
liurenjie1024 (Contributor) commented:
There are two problems with this approach:

  1. It may be quite slow for large tables.
  2. The values are incorrect for tables with deletions, and may be quite different.

Also, Iceberg has table-level statistics: https://iceberg.apache.org/spec/#table-statistics. But currently these only contain the NDV for each column. Should we consider reading these table statistics?

cc @Fokko @Xuanwo @sdd what do you think?

gruuya (Contributor, Author) replied:

Thanks for the feedback, I greatly appreciate it.

Regarding 1, I agree completely (and think it should be done during table instantiation), I mainly wanted to get some validation on the general approach first. (Perhaps it might also be an optional call via something like IcebergTableProvider::with_statistics, which would be chained after one of the existing construction methods.)

As for point 2, is it not sufficient that I aggregate stats only for manifest_entry.status() != ManifestStatus::Deleted below? Put another way, is it possible for ManifestStatus::Existing | ManifestStatus::Added entries to contain some misleading stats?

Finally, while I think exposing the spec (puffin) statistics should definitely be implemented, it seems that this is not always available (it may be opt-in for some external writers such as pyiceberg/spark?), so the best course of action for starters is to gather the stats from the manifest (entries) by default.

@ZENOTME (Contributor) commented Jan 8, 2025:

As for point 2, is it not sufficient that I aggregate stats only for manifest_entry.status() != ManifestStatus::Deleted below? Put another way, is it possible for ManifestStatus::Existing | ManifestStatus::Added entries to contain some misleading stats?

I think what @liurenjie1024 means is the delete files. 🤔 https://iceberg.apache.org/spec/#delete-formats

gruuya (Contributor, Author) replied:

Ah yes, good point; I've omitted the equality/position delete files from the stats computation process, so only pure data files are included now.

I assume that data files with associated row-level deletes don't get their lower/upper bounds updated at commit time. I think that's OK; it just means that the real lower/upper range is strictly contained/nested within the reported lower/upper range (because the rows holding the actual min/max might have been removed by row-level deletes), so the latter is more conservative. This information is conveyed by using the Inexact variant for the bounds. (Perhaps the row count could be made Exact by subtracting rows from row-deletion files, but I think that could be done separately.)

I've also moved the statistics computation into IcebergTableProvider constructors.

Contributor:

I assume that data files with associated row-level deletes don't get their lower/upper bounds updated at commit time.

Sorry, I don't get why it's not updated. If a row-level deletion deletes the row with the smallest value, shouldn't this be updated?

For point 1, I agree that adding an option to enable it or not would be useful.

For point 2, there are many problems, not only deletions.

For example, for variable-length columns the stored values may be truncated, so it's not appropriate to use them to calculate upper/lower bounds.

Also, deletions change not only the lower/upper bounds but also the row count, NDV, etc.

gruuya (Contributor, Author) replied:

Sorry, I don't get why it's not updated. If a row-level deletion deletes the row with the smallest value, shouldn't this be updated?

Oh, so what I was getting at is that I'm not sure whether an existing data file's stats (upper/lower bounds) are updated when a row-level deletion occurs that targets that file. Ideally yes, but I guess that is not a given. In other words, I'm wondering whether there is an equivalent to Delta's tight/wide bounds in Iceberg:

In the presence of Deletion Vectors the statistics may be somewhat outdated, i.e. not reflecting deleted rows yet. The flag stats.tightBounds indicates whether we have tight bounds (i.e. the min/maxValue exists in the valid state of the file) or wide bounds (i.e. the minValue is <= all valid values in the file, and the maxValue >= all valid values in the file). These upper/lower bounds are sufficient information for data skipping. Note, stats.tightBounds should be treated as true when it is not explicitly present in the statistics.


For example, for variable-length columns the stored values may be truncated

Good point; manifests with truncated upper/lower bounds for potentially large (e.g. string) columns would be misleading, so arguably we should just skip providing statistics for those columns for now.

Also, deletions change not only the lower/upper bounds but also the row count, NDV, etc.

True, though you can still get good estimates for those, and again it's more useful to provide them as hints to DataFusion than not.

/// A reference-counted arrow `Schema`.
schema: ArrowSchemaRef,
}

impl IcebergTableProvider {
-    pub(crate) fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+    pub(crate) async fn new(table: Table, schema: ArrowSchemaRef) -> Self {
+        let statistics = compute_statistics(&table, None).await.ok();
gruuya (Contributor, Author) commented:

Maybe some people don't care about stats (e.g. they don't really do a lot of joins). I assume a friendlier API in that case would be to make the stats computation opt-in via something like

--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -119,6 +119,12 @@ impl IcebergTableProvider {
             statistics,
         })
     }
+
+    pub async fn with_computed_statistics(mut self) -> Self {
+        let statistics = compute_statistics(&self.table, self.snapshot_id).await.ok();
+        self.statistics = statistics;
+        self
+    }
 }

Contributor:

I don't see why statistics matter for joins; I think you are referring to the join-reordering algorithm in the query optimizer? From my experience, complex table statistics don't help much with join reordering. For example, if the joined table has many filters, how would you estimate correct statistics after filtering? Histograms may help for single-column filters, but not for complex filters. Also, cardinality estimation in joins doesn't work well.

gruuya (Contributor, Author) replied:

I've made the statistics computation opt-in along the above lines now.

I think you are referring to the join-reordering algorithm in the query optimizer?

Yes, that is correct.

From my experience, complex table statistics don't help much with join reordering.

I think there are cases where it can help significantly, see apache/datafusion#7949 and apache/datafusion#7950 for instance.

For example, if the joined table has many filters, how would you estimate correct statistics after filtering? Histograms may help for single-column filters, but not for complex filters. Also, cardinality estimation in joins doesn't work well.

Yeah, admittedly the entire procedure is based on a number of heuristics and can be quite guesstimative in nature. Still, I think there's considerable value to be extracted even if only hints are provided; cc @alamb, who knows a lot more about the potential pitfalls and upsides here than me.

@gruuya force-pushed the datafusion-statistics branch from 4d73dab to 979d07a on January 9, 2025 08:02
Comment on lines 155 to 157
self.statistics
    .clone()
    .unwrap_or(Statistics::new_unknown(self.schema.as_ref())),
gruuya (Contributor, Author) commented:

One major missing aspect is applying the filters (and perhaps projections) here to the base statistics, to further narrow the upper/lower bounds.

gruuya (Contributor, Author) commented:

Marking this as Draft until I add support for this.

@gruuya gruuya marked this pull request as draft January 22, 2025 08:12
@gruuya force-pushed the datafusion-statistics branch from 918af65 to 3aebce8 on January 27, 2025 14:20
…t in the plan
@gruuya force-pushed the datafusion-statistics branch from 3aebce8 to 89a1dfa on January 27, 2025 14:22
This way we can avoid container port races.
@gruuya gruuya marked this pull request as ready for review January 29, 2025 10:31
@gruuya (Contributor, Author) commented Jan 29, 2025:

I think this PR is ready for review again; @ZENOTME @liurenjie1024 can you take a look and share additional feedback?

If this proposal makes sense I can start adding more tests. I also think some actual benchmarking is in order.

@Xuanwo (Member) left a comment:

Hi, thank you @gruuya for working on this. Most changes look good to me. Waiting for @liurenjie1024 to take another look.

@@ -34,5 +34,6 @@ iceberg-catalog-rest = { workspace = true }
iceberg-datafusion = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
parquet = { workspace = true }
serial_test = "*"
Member:

Hi, it's better to pin a specific version here.

gruuya (Contributor, Author) replied:

Agreed, though with #924 merged this is no longer needed, so I've removed it now.

@gruuya (Contributor, Author) commented Feb 8, 2025:

Hi, thank you @gruuya for working on this. Most changes look good to me. Waiting for @liurenjie1024 to take another look.

Thank you for taking a look!

I also think some actual benchmarking is in order.

FWIW, we also decided to run some baseline benchmarking, and the results might suggest that there are other things worth investigating when it comes to speeding up scanning (other than just exposing statistics):
[screenshot: per-query benchmark timings]
The above was measured against a local MinIO without the changes in this PR. However, since query 1 is also kind of slow and it doesn't involve joins, it seems the problem is not addressable by this PR alone (i.e. it is a systemic issue).

We haven't done a deep dive on it, but the comparison with the other iceberg-rust implementation is illuminating.

It's our guess that this distinction might arise due to the scanning primitives used. JanKaul/iceberg-rust leverages DataFusion's ParquetExec, which is highly optimized at this point, and probably benefits from a more favorable work distribution (e.g. a combination of record batches spread more evenly across different partition streams, scanning multiple ranges from the same Parquet files in parallel Tokio tasks, more efficient pruning, etc.) than get_batch_stream.

This ultimately leads to DataFusion inserting an expensive RepartitionExec node on top of the IcebergTableScan. Hence the missing statistics might be a second-order effect only.

@Xuanwo (Member) commented Feb 8, 2025:

It's our guess that this distinction might arise due to the scanning primitives used. JanKaul/iceberg-rust leverages DataFusion's ParquetExec, which is highly optimized at this point, and probably benefits from a more favorable work distribution (e.g. a combination of record batches spread more evenly across different partition streams, scanning multiple ranges from the same Parquet files in parallel Tokio tasks, more efficient pruning, etc.) than get_batch_stream.

Hi, I believe that's highly possible. The existing get_batch_stream is designed for simple workloads, and I'm guessing query engines need to build their own partition distribution logic instead.

@gruuya (Contributor, Author) commented Feb 9, 2025:

The existing get_batch_stream is designed for simple workloads, and I'm guessing query engines need to build their own partition distribution logic instead.

Got it, makes sense. I'm wondering whether it would be beneficial and desirable for IcebergTableScan, being a DataFusion-oriented scanning/planning API, to have a specialized implementation relying on DataFusion primitives (i.e. ParquetExec) to squeeze out as much perf as possible; basically something along the lines of table_scan from JanKaul/iceberg-rust.
