Conversation

@asolimando (Member)

Which issue does this PR close?

Related: #18628, #8227

(I am not sure if a new issue specifically scoped to this PR is needed; happy to create one if so.)

Rationale for this change

This work originates from a discussion in datafusion-distributed about improving the TaskEstimator API:
datafusion-contrib/datafusion-distributed#296 (comment)

We agreed that improved statistics support in DataFusion would benefit both projects. For distributed-datafusion, better cardinality estimation helps decide how to split computation across network boundaries.

This also benefits DataFusion directly, since cost-based optimization is already in place: for example, join cardinality estimation (joins/utils.rs:586-646) uses distinct_count via max_distinct_count to compute join selectivity.

Currently this field is always Absent when reading from Parquet, so this PR fills that gap.

What changes are included in this PR?

Commit 1 - Reading NDV from Parquet files:

  • Extract distinct_count from Parquet row group column statistics
  • Single row group with NDV -> Precision::Exact(ndv)
  • Multiple row groups with NDV -> Precision::Inexact(max) as conservative lower bound
  • No NDV available -> Precision::Absent
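
The three merge rules above can be sketched as follows, using a simplified stand-in for DataFusion's `Precision` enum (the names mirror the real type, but this is an illustration of the rules, not the actual implementation):

```rust
// Simplified stand-in for DataFusion's `Precision` type.
#[derive(Debug, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Merge the NDVs reported by each row group into one estimate:
/// - a single row group with an NDV -> Exact(ndv)
/// - several row groups -> Inexact(max): duplicates may exist across
///   row groups, so summing would overcount; max is a conservative
///   lower bound
/// - no NDV anywhere -> Absent
fn merge_row_group_ndv(ndvs: &[Option<u64>]) -> Precision {
    let present: Vec<u64> = ndvs.iter().filter_map(|v| *v).collect();
    match present.as_slice() {
        [] => Precision::Absent,
        [single] if ndvs.len() == 1 => Precision::Exact(*single as usize),
        _ => Precision::Inexact(*present.iter().max().unwrap() as usize),
    }
}

fn main() {
    assert_eq!(merge_row_group_ndv(&[Some(42)]), Precision::Exact(42));
    assert_eq!(
        merge_row_group_ndv(&[Some(10), Some(25), None]),
        Precision::Inexact(25)
    );
    assert_eq!(merge_row_group_ndv(&[None, None]), Precision::Absent);
}
```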

Commit 2 - Statistics propagation (can be split to a separate PR, if preferred):

  • Statistics::try_merge(): use max as conservative lower bound instead of discarding NDV
  • Projection: preserve NDV for single-column expressions as upper bound
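
The projection rule rests on the fact that an expression over a single column maps each input value to one output value, so NDV(f(col)) <= NDV(col). A minimal sketch, again with a simplified `Precision` stand-in rather than the real DataFusion type:

```rust
#[derive(Debug, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// NDV for a projected single-column expression `f(col)`: since `f` maps
/// each input value to exactly one output value, the column's NDV is a
/// valid (inexact) upper bound for the expression's NDV.
fn projected_ndv(input_ndv: Precision) -> Precision {
    match input_ndv {
        Precision::Exact(v) | Precision::Inexact(v) => Precision::Inexact(v),
        Precision::Absent => Precision::Absent,
    }
}

fn main() {
    // e.g. `upper(name)` has at most as many distinct values as `name`.
    assert_eq!(projected_ndv(Precision::Exact(100)), Precision::Inexact(100));
    assert_eq!(projected_ndv(Precision::Absent), Precision::Absent);
}
```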

I'm including the second commit to showcase how I intend to use the statistics, but these changes can be split into a follow-up PR to keep the review scope limited.

Are these changes tested?

Yes, 7 unit tests are added for NDV extraction:

  • Single/multiple row groups with NDV
  • Partial NDV availability across row groups
  • Multiple columns with different NDV values
  • Integration test reading a real Parquet file with distinct_count statistics (following the pattern in
    row_filter.rs:685-696, using parquet_to_arrow_schema to derive the schema from the file)

Are there any user-facing changes?

No breaking changes. Statistics consumers will now see populated distinct_count values when available in Parquet metadata.

Disclaimer: I used AI (Claude Code) to assist in translating my ideas into code, as I am still ramping up on the codebase and especially on Rust (guidance on both fronts is highly appreciated). I have a good understanding of the core concepts (statistics, CBO, etc.) and have carefully double-checked that the PR matches my intentions and understanding.

cc: @gabotechs @jayshrivastava @NGA-TRAN @gene-bordegaray

This change adds support for reading Number of Distinct Values (NDV)
statistics from Parquet file metadata when available.

Previously, `distinct_count` in `ColumnStatistics` was always set to
`Precision::Absent`. Now it is populated from parquet row group
column statistics when present:

- Single row group with NDV: `Precision::Exact(ndv)`
- Multiple row groups with NDV: `Precision::Inexact(max)` as lower bound
  (we can't accurately merge NDV since duplicates may exist across
  row groups; max is more conservative than sum for join cardinality
  estimation)
- No NDV available: `Precision::Absent`

This provides foundation for improved join cardinality estimation
and other statistics-based optimizations.

Relates to apache#15265
- Statistics merge: use max as conservative lower bound instead of
  discarding NDV (duplicates may exist across partitions)
- Projection: preserve NDV for single-column expressions as upper bound
github-actions bot added the physical-expr, common, and datasource labels (Jan 23, 2026).
@gene-bordegaray (Contributor) left a comment:
Have a few minor comments but this looks good 💯

Precision::Inexact(*v)
}
(Precision::Absent, Precision::Absent) => Precision::Absent,
};
Contributor:

I think this verbosity could be reduced to something like:

col_stats.distinct_count = col_stats.distinct_count.get_value()
    .max(item_col_stats.distinct_count.get_value())
    .map(|&v| Precision::Inexact(v))
    .unwrap_or(Precision::Absent);

or we could introduce some method like max_inexact() on Precision.
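
A possible shape for the suggested `max_inexact()` helper, sketched on a simplified stand-in for `Precision` (the method name and semantics are hypothetical, following the suggestion above):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Precision {
    Exact(usize),
    Inexact(usize),
    Absent,
}

impl Precision {
    fn get_value(&self) -> Option<&usize> {
        match self {
            Precision::Exact(v) | Precision::Inexact(v) => Some(v),
            Precision::Absent => None,
        }
    }

    /// Take the max of the two underlying values, demoting the result to
    /// Inexact; Absent only when both sides are Absent. Relies on
    /// `Option: Ord` (None compares less than Some).
    fn max_inexact(&self, other: &Self) -> Self {
        self.get_value()
            .max(other.get_value())
            .map(|&v| Precision::Inexact(v))
            .unwrap_or(Precision::Absent)
    }
}

fn main() {
    let a = Precision::Exact(10);
    let b = Precision::Inexact(25);
    assert_eq!(a.max_inexact(&b), Precision::Inexact(25));
    assert_eq!(a.max_inexact(&Precision::Absent), Precision::Inexact(10));
    assert_eq!(
        Precision::Absent.max_inexact(&Precision::Absent),
        Precision::Absent
    );
}
```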

Member Author:

Thanks a lot, this is very neat, addressed in db182e5!

is_max_value_exact: &mut [Option<bool>],
is_min_value_exact: &mut [Option<bool>],
column_byte_sizes: &[Precision<usize>],
distinct_counts: &[Precision<usize>],
Contributor:

A nit, but maybe these could be extracted into a struct that encapsulates these parameters as fields: either extend StatisticsAccumulators or create a new struct.

Member Author:

Adopted StatisticsAccumulators as suggested; it feels better, and I got rid of the "too many arguments" warning suppression. Addressed in 4833ef5.

use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;
use std::path::PathBuf;
Contributor:

nit: since these tests are in their own module, I think moving these to the ndv_test module level would be ok

Member Author:

Thanks for the suggestion, adopted in e36c46a

// TODO stats: estimate more statistics from expressions
// (expressions should compute their statistics themselves)
ColumnStatistics::new_unknown()
// TODO: expressions should compute their own statistics
@gene-bordegaray (Contributor), Jan 23, 2026:

noice, this is useful; thanks for thinking through the implications of using and propagating distinct counts. thank you 😄

Partition columns now preserve distinct_count as Inexact(1) when
merging statistics, reflecting that each partition file has a single
distinct partition value.
github-actions bot added the core label (Jan 23, 2026).
Use get_value().max() chain instead of verbose match statement for
merging NDV in Statistics::try_merge().

Encapsulate get_col_stats parameters by adding build_column_statistics()
method to StatisticsAccumulators, removing the standalone function.

Move imports to module level in ndv_tests since they're in their own
module anyway.
@gene-bordegaray (Contributor) left a comment:

great refactor, very clean

}

fn summarize_min_max_null_counts(
impl StatisticsAccumulators<'_> {
Contributor:

didn't know about this notation, an anonymous lifetime. cool 😄

@gabotechs (Contributor) left a comment:

Looking pretty good! I just have one comment to explore what it would take to be a bit more forward-looking and provide some more foundational tools for stats propagation across expressions. See #19957 (comment)

// TODO stats: estimate more statistics from expressions
// (expressions should compute their statistics themselves)
ColumnStatistics::new_unknown()
// TODO: expressions should compute their own statistics
Contributor:

I think this TODO comment is right on point: if we wanted to compute statistics for all expressions here, it would become unmanageable pretty soon.

Even if the estimation for expressions referencing a single column is an improvement, it would be awesome if we could be one step ahead and think about how proper stats propagation across expressions would look. That way, instead of special-handling one particular case, this PR could set the foundation on top of which more people can start contributing smarter stats calculations.

I see that some prior work has been done in order to improve stats propagation in expressions. Some context here:

By looking at the API for handling statistics in expressions shipped in #14699... I would not know how to use it in order to properly calculate NDV values. I get the impression that 1) we do not want to introduce yet another API for propagating stats across expressions and 2) the current "new" API shipped in #14699 is not suitable for propagating NDV values.

Contributor:

Do you have any thoughts about how the current Distribution-based API could be used for propagating NDV stats across expressions?

@asolimando (Member Author), Jan 26, 2026:

Good question! I have an idea of how this could evolve, based on my experience with Apache Calcite.

The idea is to make statistics propagation pluggable, with each relational operator having a default but configurable logic for how statistics propagate through it.

The default implementation would follow classic Selinger-style estimation (selectivity factors, independence assumptions), as seen in this PR. A nice intro to this can be found here. That's what most OSS databases implement by default.

Following DataFusion's philosophy as a customizable framework, users should be able to override and complement this logic when needed.

Proposed architecture:

  • StatisticsProvider: chain element that computes statistics for specific operators (returns Computed or Delegate)
  • StatisticsRegistry: chains providers, would live in SessionState
  • CardinalityEstimator: unified interface for metadata queries (row count, selectivity, NDV, ...) - similar to Calcite's RelMetadataProvider/RelMetadataQuery
  • ExtendedStatistics: Statistics with type-safe custom extensions for histograms, sketches, etc. (I am looking at type-erased maps for that but I am not sure that's the best way to implement it)
  • ExpressionAnalyzerRegistry+ExpressionAnalyzer: similar concept of chain for expression analyzers, equivalent to what detailed above for the operators, so that built-in and UDF can be covered

This follows the same chain-of-responsibility pattern that the extending-SQL work (https://datafusion.apache.org/blog/2026/01/12/extending-sql/) applied to custom syntax/relations. Built-in operators get default handling, custom ExecutionPlan nodes can plug in their own logic, and unknown cases delegate down the chain. To override the default estimation (e.g., with histogram-based approaches), you register your provider before the default one.

StatisticsV2 and the Distribution-based API are very advanced and interesting statistics, but I see them more as an extension via ExtendedStatistics than as taking over the default implementation. Your intuition about NDVs, for instance, is correct: the API deals with distributions, but as-is it can't answer questions about the number of distinct values.

If this sounds interesting and aligns with community interest, I can provide a more detailed design doc and an epic to break down the work.
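
A rough sketch of how the proposed StatisticsProvider/StatisticsRegistry chain could look; every name and signature here is hypothetical (none of these types exist in DataFusion today), and the 10% filter selectivity is just a placeholder Selinger-style default:

```rust
// Minimal stand-in for the statistics an operator reports.
struct Statistics {
    num_rows: Option<usize>,
}

enum EstimateResult {
    Computed(Statistics),
    Delegate, // let the next provider in the chain try
}

// Chain element that computes statistics for operators it understands.
trait StatisticsProvider {
    fn statistics_for(&self, operator: &str, inputs: &[Statistics]) -> EstimateResult;
}

/// Walks the chain until a provider computes a result; custom providers
/// registered earlier override the built-in defaults.
struct StatisticsRegistry {
    providers: Vec<Box<dyn StatisticsProvider>>,
}

impl StatisticsRegistry {
    fn estimate(&self, operator: &str, inputs: &[Statistics]) -> Option<Statistics> {
        for p in &self.providers {
            if let EstimateResult::Computed(s) = p.statistics_for(operator, inputs) {
                return Some(s);
            }
        }
        None
    }
}

/// Default Selinger-style provider: assume a filter keeps ~10% of rows.
struct DefaultProvider;
impl StatisticsProvider for DefaultProvider {
    fn statistics_for(&self, op: &str, inputs: &[Statistics]) -> EstimateResult {
        match (op, inputs) {
            ("filter", [input]) => EstimateResult::Computed(Statistics {
                num_rows: input.num_rows.map(|n| n / 10),
            }),
            _ => EstimateResult::Delegate,
        }
    }
}

fn main() {
    let registry = StatisticsRegistry {
        providers: vec![Box::new(DefaultProvider)],
    };
    let input = Statistics { num_rows: Some(1000) };
    let out = registry.estimate("filter", &[input]).unwrap();
    assert_eq!(out.num_rows, Some(100));
}
```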

Contributor:

Regarding StatisticsProvider, StatisticsRegistry, and CardinalityEstimator: do you think there is some overlap with the existing ExecutionPlan::partition_statistics() and PhysicalExpr::evaluate_statistics()? Would it be appropriate to extend the functionality shipped in those two methods, or are you considering StatisticsProvider, StatisticsRegistry, and CardinalityEstimator to be a separate mechanism for providing statistics?

@xudong963 self-requested a review (Jan 26, 2026, 13:48).