Merged
363 changes: 357 additions & 6 deletions datafusion/common/src/stats.rs
@@ -665,7 +665,7 @@ impl Statistics {
max_value: cs.max_value.clone(),
min_value: cs.min_value.clone(),
sum_value: cs.sum_value.clone(),
-distinct_count: Precision::Absent,
+distinct_count: cs.distinct_count,
byte_size: cs.byte_size,
})
.collect();
@@ -679,11 +679,24 @@ impl Statistics {
let item_cs = &stat.column_statistics[col_idx];

col_stats.null_count = col_stats.null_count.add(&item_cs.null_count);
col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
col_stats.sum_value =
precision_add(&col_stats.sum_value, &item_cs.sum_value);

// NDV must be computed before min/max update (needs pre-merge ranges)
col_stats.distinct_count = match (
col_stats.distinct_count.get_value(),
item_cs.distinct_count.get_value(),
) {
(Some(&l), Some(&r)) => Precision::Inexact(
estimate_ndv_with_overlap(col_stats, item_cs, l, r)
.unwrap_or_else(|| usize::max(l, r)),
),
_ => Precision::Absent,
};

col_stats.min_value = col_stats.min_value.min(&item_cs.min_value);
col_stats.max_value = col_stats.max_value.max(&item_cs.max_value);
col_stats.sum_value =
precision_add(&col_stats.sum_value, &item_cs.sum_value);
col_stats.byte_size = col_stats.byte_size.add(&item_cs.byte_size);
}
}

@@ -695,6 +708,96 @@ impl Statistics {
}
}

/// Estimates the combined number of distinct values (NDV) when merging two
/// column statistics, using range overlap to avoid double-counting shared values.
///
/// Assumes values are distributed uniformly within each input's
/// `[min, max]` range (the standard assumption when only summary
/// statistics are available). Under uniformity the fraction of an input's
/// distinct values that land in a sub-range equals the fraction of
/// the range that sub-range covers.
///
/// The combined value space is split into three disjoint regions:
///
/// ```text
/// |-- only A --|-- overlap --|-- only B --|
/// ```
///
/// * **Only in A/B** - values outside the other input's range
/// contribute `(1 - overlap_a) * NDV_a` and `(1 - overlap_b) * NDV_b`.
/// * **Overlap** - both inputs may produce values here. We take
/// `max(overlap_a * NDV_a, overlap_b * NDV_b)` rather than the
/// sum because values in the same sub-range are likely shared
/// (the smaller set is assumed to be a subset of the larger).
///
/// The estimate falls within `[max(NDV_a, NDV_b), NDV_a + NDV_b]`,
/// from full overlap down to no overlap.
///
/// ```text
/// NDV = max(overlap_a * NDV_a, overlap_b * NDV_b) [intersection]
/// + (1 - overlap_a) * NDV_a [only in A]
/// + (1 - overlap_b) * NDV_b [only in B]
/// ```
///
/// Returns `None` when min/max are absent or distance is unsupported
/// (e.g. strings), in which case the caller should fall back to a simpler
/// estimate.
pub fn estimate_ndv_with_overlap(
Contributor:
This being moved from union.rs makes me wonder if this should be generalized to all operators.

Say rather than just merging two columns we were to merge three: A, B, C. This can result in different outcomes with the same columns being merged, since the column stats are smeared after the first merge.

Example:

- A = [0,100], NDV=80
- B = [50,150], NDV=60
- C = [100,200], NDV=50

Scenarios:
- (A+B)+C = 135
- A+(B+C) = 137

I know that while in union.rs this problem still existed but it was more tightly scoped.

Is there a way we could make this more explicit in the documentation / plan follow up work to handle merging?

I would think that preserving the shape of the distinct values in some way after merging is what we want. This might require more plumbing, and we could come back to it.

Let me know your thoughts.
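The merge-order sensitivity in the example above can be reproduced with a small standalone sketch. `merge_ndv` below is a hypothetical helper that mirrors the estimator's arithmetic on plain `(min, max, ndv)` tuples, assuming non-constant (non-zero-width) ranges; it is not part of the DataFusion API:

```rust
/// Hypothetical simplified version of the overlap-based NDV merge,
/// operating on (min, max, ndv) tuples with non-zero-width ranges.
fn merge_ndv(a: (f64, f64, f64), b: (f64, f64, f64)) -> (f64, f64, f64) {
    let (a_min, a_max, ndv_a) = a;
    let (b_min, b_max, ndv_b) = b;
    // Width of the shared sub-range (zero when the ranges are disjoint)
    let overlap = (a_max.min(b_max) - a_min.max(b_min)).max(0.0);
    let frac_a = overlap / (a_max - a_min);
    let frac_b = overlap / (b_max - b_min);
    // max(...) for the shared region, plus the parts unique to each input
    let ndv = (frac_a * ndv_a).max(frac_b * ndv_b)
        + (1.0 - frac_a) * ndv_a
        + (1.0 - frac_b) * ndv_b;
    // The merged column spans the union of both ranges
    (a_min.min(b_min), a_max.max(b_max), ndv)
}

fn main() {
    let a = (0.0, 100.0, 80.0); // A = [0,100], NDV=80
    let b = (50.0, 150.0, 60.0); // B = [50,150], NDV=60
    let c = (100.0, 200.0, 50.0); // C = [100,200], NDV=50
    let left = merge_ndv(merge_ndv(a, b), c); // (A+B)+C
    let right = merge_ndv(a, merge_ndv(b, c)); // A+(B+C)
    println!("{} vs {}", left.2.round(), right.2.round()); // prints "135 vs 137"
}
```

The two orders disagree because each pairwise merge smears its inputs into a single uniform range before the next merge, which is exactly the loss of shape described above.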

Member Author:
I have filed #20966 as it's indeed a nice improvement and I think this can be made associative if working with lists of column statistics, rather than merging pair-wise.

I wouldn't consider this a problem for the current PR as the error is still reasonably bounded due to the uniform distribution of NDVs that our formulas rely on anyway.

Contributor:
nice nice, this makes sense.

I will also have a peek and some thoughts on that issue. Thank you for all the thoughtful responses and explanations 💯

Member Author:
Thanks to you and to all other reviewers, great discussions!

left: &ColumnStatistics,
right: &ColumnStatistics,
ndv_left: usize,
ndv_right: usize,
) -> Option<usize> {
let left_min = left.min_value.get_value()?;
let left_max = left.max_value.get_value()?;
let right_min = right.min_value.get_value()?;
let right_max = right.max_value.get_value()?;

let range_left = left_max.distance(left_min)?;
let range_right = right_max.distance(right_min)?;

// Constant columns (range == 0) can't use the proportional overlap
// formula below, so check interval overlap directly instead.
if range_left == 0 || range_right == 0 {
let overlaps = left_min <= right_max && right_min <= left_max;
return Some(if overlaps {
usize::max(ndv_left, ndv_right)
} else {
ndv_left + ndv_right
});
}

let overlap_min = if left_min >= right_min {
left_min
} else {
right_min
};
let overlap_max = if left_max <= right_max {
left_max
} else {
right_max
};

// Disjoint ranges: no overlap, NDVs are additive
if overlap_min > overlap_max {
return Some(ndv_left + ndv_right);
}

let overlap_range = overlap_max.distance(overlap_min)? as f64;

let overlap_left = overlap_range / range_left as f64;
let overlap_right = overlap_range / range_right as f64;

let intersection = f64::max(
overlap_left * ndv_left as f64,
overlap_right * ndv_right as f64,
);
let only_left = (1.0 - overlap_left) * ndv_left as f64;
let only_right = (1.0 - overlap_right) * ndv_right as f64;

Some((intersection + only_left + only_right).round() as usize)
}
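The `[max, sum]` bounds stated in the doc comment can be checked numerically. The sketch below is a hypothetical simplification that assumes two equal-width ranges, so both overlap fractions collapse to a single `frac`; sweeping that fraction shows the estimate interpolating between `max(NDV_a, NDV_b)` (full overlap) and `NDV_a + NDV_b` (disjoint):

```rust
// Simplified estimate for two equal-width ranges sharing a fraction
// `frac` of their span; mirrors the three-region formula.
fn estimate(frac: f64, ndv_a: f64, ndv_b: f64) -> f64 {
    (frac * ndv_a).max(frac * ndv_b) // shared region
        + (1.0 - frac) * ndv_a       // only in A
        + (1.0 - frac) * ndv_b       // only in B
}

fn main() {
    let (ndv_a, ndv_b) = (80.0, 60.0);
    for step in 0..=10 {
        let frac = step as f64 / 10.0;
        let ndv = estimate(frac, ndv_a, ndv_b);
        // Never below max(NDV_a, NDV_b), never above NDV_a + NDV_b
        assert!(ndv >= ndv_a.max(ndv_b) - 1e-9 && ndv <= ndv_a + ndv_b + 1e-9);
    }
    // Endpoints: full overlap -> max, disjoint -> sum
    assert_eq!(estimate(1.0, ndv_a, ndv_b), 80.0);
    assert_eq!(estimate(0.0, ndv_a, ndv_b), 140.0);
}
```

With equal widths the formula reduces to `frac * max(NDV_a, NDV_b) + (1 - frac) * (NDV_a + NDV_b)`, a straight line between the two endpoints.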

/// Creates an estimate of the number of rows in the output using the given
/// optional value and exactness flag.
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
@@ -1361,6 +1464,253 @@ mod tests {
);
}

#[test]
fn test_try_merge_distinct_count_overlap() {
// Create statistics with known distinct counts
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.with_total_byte_size(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_null_count(Precision::Exact(0))
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(1))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_distinct_count(Precision::Exact(5)),
);

let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(15))
.with_total_byte_size(Precision::Exact(150))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_null_count(Precision::Exact(0))
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(20))))
.with_distinct_count(Precision::Exact(7)),
);

// Merge statistics
let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged_stats =
Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();

// Verify the results
assert_eq!(merged_stats.num_rows, Precision::Exact(25));
assert_eq!(merged_stats.total_byte_size, Precision::Exact(250));

let col_stats = &merged_stats.column_statistics[0];
assert_eq!(col_stats.null_count, Precision::Exact(0));
assert_eq!(
col_stats.min_value,
Precision::Exact(ScalarValue::Int32(Some(1)))
);
assert_eq!(
col_stats.max_value,
Precision::Exact(ScalarValue::Int32(Some(20)))
);
// Overlap-based NDV: ranges [1,10] and [5,20], overlap [5,10]
// range_left=9, range_right=15, overlap=5
// overlap_left=5*(5/9)=2.78, overlap_right=7*(5/15)=2.33
// result = max(2.78, 2.33) + (5-2.78) + (7-2.33) = 9.67 -> 10
assert_eq!(col_stats.distinct_count, Precision::Inexact(10));
}

#[test]
fn test_try_merge_ndv_disjoint_ranges() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(20))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(30))))
.with_distinct_count(Precision::Exact(8)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// No overlap -> sum of NDVs
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(13)
);
}

#[test]
fn test_try_merge_ndv_identical_ranges() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
.with_distinct_count(Precision::Exact(50)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
.with_distinct_count(Precision::Exact(30)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// Full overlap -> max(50, 30) = 50
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(50)
);
}

#[test]
fn test_try_merge_ndv_partial_overlap() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(0))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(100))))
.with_distinct_count(Precision::Exact(80)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(100))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(50))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(150))))
.with_distinct_count(Precision::Exact(60)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// overlap=[50,100], range_left=100, range_right=100, overlap_range=50
// overlap_left=80*(50/100)=40, overlap_right=60*(50/100)=30
// result = max(40,30) + (80-40) + (60-30) = 40 + 40 + 30 = 110
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(110)
);
}

#[test]
fn test_try_merge_ndv_missing_min_max() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown().with_distinct_count(Precision::Exact(8)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// No min/max -> fallback to max(5, 8)
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(8)
);
}

#[test]
fn test_try_merge_ndv_non_numeric_types() {
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
"aaa".to_string(),
))))
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
"zzz".to_string(),
))))
.with_distinct_count(Precision::Exact(5)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Utf8(Some(
"bbb".to_string(),
))))
.with_max_value(Precision::Exact(ScalarValue::Utf8(Some(
"yyy".to_string(),
))))
.with_distinct_count(Precision::Exact(8)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
// distance() unsupported for strings -> fallback to max
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(8)
);
}

#[test]
fn test_try_merge_ndv_constant_columns() {
// Same constant: [5,5]+[5,5] -> max
let stats1 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_distinct_count(Precision::Exact(1)),
);
let stats2 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_distinct_count(Precision::Exact(1)),
);

let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
let merged = Statistics::try_merge_iter([&stats1, &stats2], &schema).unwrap();
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(1)
);

// Different constants: [5,5]+[10,10] -> sum
let stats3 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(5))))
.with_distinct_count(Precision::Exact(1)),
);
let stats4 = Statistics::default()
.with_num_rows(Precision::Exact(10))
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_max_value(Precision::Exact(ScalarValue::Int32(Some(10))))
.with_distinct_count(Precision::Exact(1)),
);

let merged = Statistics::try_merge_iter([&stats3, &stats4], &schema).unwrap();
assert_eq!(
merged.column_statistics[0].distinct_count,
Precision::Inexact(2)
);
}

#[test]
fn test_with_fetch_basic_preservation() {
// Test that column statistics and byte size are preserved (as inexact) when applying fetch
@@ -2005,8 +2355,9 @@ mod tests {
Precision::Exact(ScalarValue::Int64(Some(3500)))
);
assert_eq!(col_stats.byte_size, Precision::Exact(480));
-// distinct_count is always Absent after merge (can't accurately merge NDV)
-assert_eq!(col_stats.distinct_count, Precision::Absent);
+// Overlap-based NDV merge (pairwise left-to-right):
+// stats1+stats2: [10,100]+[5,200] -> NDV=16, then +stats3: [5,200]+[1,150] -> NDV=29
+assert_eq!(col_stats.distinct_count, Precision::Inexact(29));
}

#[test]