Skip to content

Commit 07d3b8c

Browse files
authored
Merge pull request JanKaul#265 from JanKaul/improve-cardinality-estimate
Improve cardinality estimate
2 parents f451462 + 7e5f162 commit 07d3b8c

File tree

2 files changed

+222
-12
lines changed

2 files changed

+222
-12
lines changed

datafusion_iceberg/src/statistics.rs

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use datafusion::{
44
scalar::ScalarValue,
55
};
66
use iceberg_rust::error::Error;
7+
use iceberg_rust::file_format::parquet::estimate_distinct_count;
78
use iceberg_rust::spec::{
89
manifest::{ManifestEntry, Status},
910
schema::Schema,
@@ -45,12 +46,16 @@ pub(crate) fn statistics_from_datafiles(
4546
.column_statistics
4647
.into_iter()
4748
.zip(column_stats)
48-
.map(|(acc, x)| ColumnStatistics {
49-
null_count: acc.null_count.add(&x.null_count),
50-
max_value: acc.max_value.max(&x.max_value),
51-
min_value: acc.min_value.min(&x.min_value),
52-
distinct_count: acc.distinct_count.add(&x.distinct_count),
53-
sum_value: acc.sum_value.add(&x.sum_value),
49+
.map(|(acc, x)| {
50+
let new_distinct_count = new_distinct_count(&acc, &x);
51+
52+
ColumnStatistics {
53+
null_count: acc.null_count.add(&x.null_count),
54+
max_value: acc.max_value.max(&x.max_value),
55+
min_value: acc.min_value.min(&x.min_value),
56+
distinct_count: new_distinct_count,
57+
sum_value: acc.sum_value.add(&x.sum_value),
58+
}
5459
})
5560
.collect(),
5661
}
@@ -134,3 +139,49 @@ fn convert_value_to_scalar_value(value: Value) -> Result<ScalarValue, Error> {
134139
)),
135140
}
136141
}
142+
143+
fn new_distinct_count(acc: &ColumnStatistics, x: &ColumnStatistics) -> Precision<usize> {
144+
match (
145+
&acc.distinct_count,
146+
&x.distinct_count,
147+
&acc.min_value,
148+
&acc.max_value,
149+
&x.min_value,
150+
&x.max_value,
151+
) {
152+
(
153+
Precision::Exact(old_count),
154+
Precision::Exact(new_count),
155+
Precision::Exact(ScalarValue::Int32(Some(old_min))),
156+
Precision::Exact(ScalarValue::Int32(Some(old_max))),
157+
Precision::Exact(ScalarValue::Int32(Some(new_min))),
158+
Precision::Exact(ScalarValue::Int32(Some(new_max))),
159+
) => {
160+
let estimated = estimate_distinct_count(
161+
&[old_min, old_max],
162+
&[new_min, new_max],
163+
*old_count as i64,
164+
*new_count as i64,
165+
);
166+
Precision::Inexact(*old_count + estimated as usize)
167+
}
168+
(
169+
Precision::Exact(old_count),
170+
Precision::Exact(new_count),
171+
Precision::Exact(ScalarValue::Int64(Some(old_min))),
172+
Precision::Exact(ScalarValue::Int64(Some(old_max))),
173+
Precision::Exact(ScalarValue::Int64(Some(new_min))),
174+
Precision::Exact(ScalarValue::Int64(Some(new_max))),
175+
) => {
176+
let estimated = estimate_distinct_count(
177+
&[old_min, old_max],
178+
&[new_min, new_max],
179+
*old_count as i64,
180+
*new_count as i64,
181+
);
182+
Precision::Inexact(*old_count + estimated as usize)
183+
}
184+
(Precision::Absent, Precision::Exact(_), _, _, _, _) => x.distinct_count,
185+
_ => acc.distinct_count.add(&x.distinct_count),
186+
}
187+
}

iceberg-rust/src/file_format/parquet.rs

Lines changed: 165 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use std::{
66
collections::{hash_map::Entry, HashMap},
7+
ops::Sub,
78
sync::Arc,
89
};
910

@@ -89,18 +90,66 @@ pub fn parquet_to_datafile(
8990
.and_modify(|x| *x += null_count as i64)
9091
.or_insert(null_count as i64);
9192
}
92-
if let Some(distinct_count) = statistics.distinct_count_opt() {
93-
distinct_counts
94-
.entry(id)
95-
.and_modify(|x| *x += distinct_count as i64)
96-
.or_insert(distinct_count as i64);
97-
}
93+
9894
let data_type = &schema
9995
.fields()
10096
.get(id as usize)
10197
.ok_or_else(|| Error::Schema(column_name.to_string(), "".to_string()))?
10298
.field_type;
10399

100+
if let (Some(distinct_count), Some(min_bytes), Some(max_bytes)) = (
101+
statistics.distinct_count_opt(),
102+
statistics.min_bytes_opt(),
103+
statistics.max_bytes_opt(),
104+
) {
105+
let min = Value::try_from_bytes(min_bytes, data_type)?;
106+
let max = Value::try_from_bytes(max_bytes, data_type)?;
107+
let current_min = lower_bounds.get(&id);
108+
let current_max = upper_bounds.get(&id);
109+
match (min, max, current_min, current_max) {
110+
(
111+
Value::Int(min),
112+
Value::Int(max),
113+
Some(Value::Int(current_min)),
114+
Some(Value::Int(current_max)),
115+
) => {
116+
distinct_counts
117+
.entry(id)
118+
.and_modify(|x| {
119+
*x += estimate_distinct_count(
120+
&[current_min, current_max],
121+
&[&min, &max],
122+
*x,
123+
distinct_count as i64,
124+
);
125+
})
126+
.or_insert(distinct_count as i64);
127+
}
128+
(
129+
Value::LongInt(min),
130+
Value::LongInt(max),
131+
Some(Value::LongInt(current_min)),
132+
Some(Value::LongInt(current_max)),
133+
) => {
134+
distinct_counts
135+
.entry(id)
136+
.and_modify(|x| {
137+
*x += estimate_distinct_count(
138+
&[current_min, current_max],
139+
&[&min, &max],
140+
*x,
141+
distinct_count as i64,
142+
);
143+
})
144+
.or_insert(distinct_count as i64);
145+
}
146+
(_, _, None, None) => {
147+
distinct_counts.entry(id).or_insert(distinct_count as i64);
148+
}
149+
_ => (),
150+
}
151+
}
152+
104153
if let Some(min_bytes) = statistics.min_bytes_opt() {
105154
if let Type::Primitive(_) = &data_type {
106155
let new = Value::try_from_bytes(min_bytes, data_type)?;
@@ -275,3 +324,113 @@ pub fn thrift_size<T: TSerializable>(metadata: &T) -> Result<usize, Error> {
275324
metadata.write_to_out_protocol(&mut protocol)?;
276325
Ok(buffer.bytes_written())
277326
}
327+
328+
/// Computes the intersection of two closed ranges given as `[min, max]`.
///
/// Returns the inclusive bounds `(start, end)` of the intersection, or `None`
/// when the two ranges have no value in common. Only comparisons are
/// performed, so this cannot overflow regardless of how wide the ranges are.
fn range_overlap<T: Ord + Copy>(old_range: &[&T; 2], new_range: &[&T; 2]) -> Option<(T, T)> {
    let overlap_start = (*old_range[0]).max(*new_range[0]);
    let overlap_end = (*old_range[1]).min(*new_range[1]);
    if overlap_start <= overlap_end {
        Some((overlap_start, overlap_end))
    } else {
        None
    }
}

/// Helper trait to convert numeric types to f64 for statistical calculations.
///
/// This trait provides a uniform interface for converting integer types to f64,
/// which is necessary for the statistical estimation algorithms. The conversion
/// may be lossy for very large i64 values (beyond 2^53), but this is acceptable
/// for statistical approximations.
pub trait ToF64 {
    /// Converts the value to f64.
    ///
    /// # Note
    ///
    /// For i64 values larger than 2^53, precision may be lost in the conversion.
    /// This is acceptable for statistical calculations where exact precision is
    /// not required.
    fn to_f64(self) -> f64;
}

impl ToF64 for i32 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

impl ToF64 for i64 {
    fn to_f64(self) -> f64 {
        self as f64
    }
}

/// Number of integer values in the closed interval `[min, max]`, as an `f64`.
///
/// `T::default()` is used as zero. When both bounds lie on the same side of
/// zero their difference always fits in `T`, so it is computed exactly before
/// converting. When the bounds straddle zero the difference may not fit in `T`
/// (e.g. `[i32::MIN, i32::MAX]`), so each bound is converted first and the
/// subtraction is done in floating point.
///
/// The result is non-positive only for an inverted range (`min > max`).
fn inclusive_width<T>(min: T, max: T) -> f64
where
    T: Ord + Sub<Output = T> + Copy + Default + ToF64,
{
    let zero = T::default();
    let span = if (min < zero) == (max < zero) {
        (max - min).to_f64()
    } else {
        max.to_f64() - min.to_f64()
    };
    span + 1.0
}

/// Estimates the number of new distinct values when merging two sets of statistics.
///
/// This function assumes uniform distribution of distinct values within their respective ranges
/// and uses an independence approximation to estimate overlap probability.
///
/// # Algorithm
///
/// Ranges are closed integer intervals, so the size of `[min, max]` is
/// `max - min + 1` (a single-valued range has size 1, never 0). The estimation
/// is split into two parts:
/// 1. **Non-overlapping region**: All values in the new range that fall outside the old range
///    are guaranteed to be new.
/// 2. **Overlapping region**: Uses the independence approximation:
///    - P(specific value not covered) = ((R-1)/R)^k
///    - where R is the overlap size and k is the expected number of old values in the overlap
///    - Expected new values = n2_overlap × P(not covered)
///
/// All range arithmetic is overflow-free, including ranges spanning the whole
/// domain of `T`. Negative counts are treated as 0. If the ranges do not
/// intersect (or the new range is inverted), every new value is counted as new.
///
/// # Parameters
///
/// * `old_range` - [min, max] of the existing value range
/// * `new_range` - [min, max] of the new value range
/// * `old_distinct_count` - Number of distinct values in the old range
/// * `new_distinct_count` - Number of distinct values in the new range
///
/// # Returns
///
/// Estimated number of new distinct values to add to the running total
///
/// # Example
///
/// ```ignore
/// // Old range [0, 1000] with 100 distinct values
/// // New range [500, 1500] with 50 distinct values
/// let new_count = estimate_distinct_count(&[&0, &1000], &[&500, &1500], 100, 50);
/// ```
pub fn estimate_distinct_count<T>(
    old_range: &[&T; 2],
    new_range: &[&T; 2],
    old_distinct_count: i64,
    new_distinct_count: i64,
) -> i64
where
    T: Ord + Sub<Output = T> + Copy + Default + ToF64,
{
    let n1 = (old_distinct_count as f64).max(0.0);
    let n2 = (new_distinct_count as f64).max(0.0);

    let new_width = inclusive_width(*new_range[0], *new_range[1]);
    let old_width = inclusive_width(*old_range[0], *old_range[1]);

    // Without a usable intersection there is nothing to deduplicate against:
    // every value of the new set is counted as new.
    let (overlap_start, overlap_end) = match range_overlap(old_range, new_range) {
        Some(bounds) if new_width > 0.0 && old_width > 0.0 => bounds,
        _ => return n2.round() as i64,
    };
    // The intersection is non-empty, hence at least one value wide.
    let overlap_width = inclusive_width(overlap_start, overlap_end);

    // Fraction of each range covered by the overlap. The overlap is a subset
    // of both ranges; `min(1.0)` only guards against float rounding.
    let new_share = (overlap_width / new_width).min(1.0);
    let old_share = (overlap_width / old_width).min(1.0);

    // Values outside overlap are definitely new
    let outside_overlap = (1.0 - new_share) * n2;

    // For overlap region: estimate how many new values exist
    // using independence approximation: P(value not covered) = ((R-1)/R)^k
    // Expected new values in overlap = n2_overlap * ((R-1)/R)^(n1_overlap)
    let n2_overlap = new_share * n2;
    let expected_n1_in_overlap = old_share * n1;
    let prob_not_covered = ((overlap_width - 1.0) / overlap_width).powf(expected_n1_in_overlap);
    let new_in_overlap = n2_overlap * prob_not_covered;

    (outside_overlap + new_in_overlap).round() as i64
}

0 commit comments

Comments
 (0)