Analysis to support SortPreservingMerge --> ProgressiveEval #15191

Open
Tracked by #6672
alamb opened this issue Mar 12, 2025 · 13 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Mar 12, 2025

This is a potential design to support converting SortPreservingMerge --> ProgressiveEval.

It is largely copy/paste from an internal design I wrote for a project at InfluxData

We are planning to propose upstreaming what we do, and @wiedld is working on this

I purposely wrote it in markdown to make it easier to copy/paste the diagrams and explanation into code.

Background:

📖 The following description uses DataFusion's definition of a partition, not the IOx one (how data is divided across files)

What is SortPreservingMerge?

SortPreservingMerge is a DataFusion operator that merges data row by row from multiple sorted input partitions. In order to produce any output rows, the SortPreservingMerge must open all of its inputs (e.g. it must open several parquet files). Here is a diagram:

┌─────────────────────────┐
│ ┌───┬───┬───┬───┐       │
│ │ A │ B │ C │ D │ ...   │──┐
│ └───┴───┴───┴───┘       │  │
└─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
  Partition 1                │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
                             ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
                             │  │                   │    │ └───┴───╩═══╩───┴───╩═══╝     │
┌─────────────────────────┐  │  └───────────────────┘    └───────────────────────────────┘
│ ╔═══╦═══╗               │  │
│ ║ B ║ E ║     ...       │──┘
│ ╚═══╩═══╝               │
└─────────────────────────┘
  Partition 2


 Input Partitions                                          Output Partition
   (sorted)                                                  (sorted)
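The merge logic above can be sketched in std-only Rust. This is a toy illustration, not the real operator: the actual SortPreservingMerge works on streams of Arrow record batches, while this sketch merges in-memory vectors with a min-heap.

```rust
// Toy sketch of a sort-preserving merge over already-sorted inputs.
// Illustrates only the row-level merging logic of SortPreservingMerge.
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn sort_preserving_merge<T: Ord + Clone>(partitions: Vec<Vec<T>>) -> Vec<T> {
    // Heap entries: (next value, partition index, offset within partition).
    // Reverse turns the max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(v) = part.first() {
            heap.push(Reverse((v.clone(), p, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, p, i))) = heap.pop() {
        out.push(v);
        // Refill from the partition we just consumed from
        if let Some(next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next.clone(), p, i + 1)));
        }
    }
    out
}

fn main() {
    // Partition 1: A B C D, Partition 2: B E (as in the diagram)
    let merged = sort_preserving_merge(vec![
        vec!["A", "B", "C", "D"],
        vec!["B", "E"],
    ]);
    assert_eq!(merged, vec!["A", "B", "B", "C", "D", "E"]);
}
```

Note that every output row requires a heap operation, i.e. per-row comparisons across partitions.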

What is ProgressiveEval?

ProgressiveEval is a special operator. See the blog post Making Most Recent Value Queries Hundreds of Times Faster for more details.

ProgressiveEval outputs, in order:

  1. all rows from its first input partition
  2. all rows from its second input partition
  3. all rows from its third input partition
  4. and so on.

Note that ProgressiveEval only starts [2 (configurable)] inputs at a time. Here is a diagram:

┌─────────────────────────┐
│ ┌───┬───┬───┬───┐       │
│ │ A │ B │ C │ D │       │──┐
│ └───┴───┴───┴───┘       │  │
└─────────────────────────┘  │  ┌───────────────────┐    ┌───────────────────────────────┐
  Stream 1                   │  │                   │    │ ┌───┬───╦═══╦───┬───╦═══╗     │
                             ├─▶│  ProgressiveEval  │───▶│ │ A │ B ║ C ║ D │ M ║ N ║ ... │
                             │  │                   │    │ └───┴─▲─╩═══╩───┴───╩═══╝     │
┌─────────────────────────┐  │  └───────────────────┘    └─┬─────┴───────────────────────┘
│ ╔═══╦═══╗               │  │
│ ║ M ║ N ║               │──┘                             │
│ ╚═══╩═══╝               │                Output is all rows from Stream 1 followed by all rows from Stream 2, etc
└─────────────────────────┘
  Stream 2


 Input Streams                                             Output stream
 (in some order)                                           (in same order)
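In contrast, ProgressiveEval's output logic is just concatenation. A std-only toy sketch (the real operator streams its inputs lazily and only starts a configurable number of them at a time, which this sketch does not model):

```rust
// Toy sketch of ProgressiveEval's output logic: emit all rows from
// input 1, then all rows from input 2, etc. No per-row comparisons.
fn progressive_eval<T>(partitions: Vec<Vec<T>>) -> Vec<T> {
    partitions.into_iter().flatten().collect()
}

fn main() {
    // Stream 1: A B C D, Stream 2: M N (as in the diagram)
    let out = progressive_eval(vec![vec!["A", "B", "C", "D"], vec!["M", "N"]]);
    assert_eq!(out, vec!["A", "B", "C", "D", "M", "N"]);
}
```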

Why is ProgressiveEval better than SortPreservingMerge?

When possible, ProgressiveEval should be used instead of SortPreservingMerge because:

  1. It is faster and less CPU intensive (it doesn't need to compare individual rows across partitions)
  2. It is more efficient with limit (as it does not start all the input streams at once).

Under what conditions can SortPreservingMerge be converted to ProgressiveEval?

In order to convert a SortPreservingMerge (SPM) to a ProgressiveEval, the plans must still produce the same results. We know all input partitions to the SPM are sorted on the sort expressions (this is required for correctness), and the output of the SPM will also be sorted on these expressions.

We define the "Lexical Space" as the space of all possible values of the sort expressions. For example, given data with a sort order of A ASC, B ASC (A ascending, B ascending), then the lexical space is all the unique combinations of (A, B). The "range" of an input in this lexical space is the minimum and maximum sort key values.

For example, for data like

a b
1 100
1 200
1 300
2 100
2 200
3 50

The lexical range is min --> max: (1,100) --> (3,50)
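Because each input partition is already sorted on the sort key, its lexical range is simply its first and last sort-key tuples. A minimal sketch with integer keys (the real code works with ScalarValue ranges):

```rust
// Sketch: the lexical range of a partition sorted on (A ASC, B ASC)
// is (first row's key, last row's key). Rust tuples compare
// lexicographically, matching the lexical-space definition.
fn lexical_range(rows: &[(i32, i32)]) -> Option<((i32, i32), (i32, i32))> {
    Some((*rows.first()?, *rows.last()?))
}

fn main() {
    let rows = [(1, 100), (1, 200), (1, 300), (2, 100), (2, 200), (3, 50)];
    // min --> max: (1,100) --> (3,50); (3,50) is the max even though
    // 50 < 100, because the first column dominates the comparison
    assert_eq!(lexical_range(&rows), Some(((1, 100), (3, 50))));
}
```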

Using a ProgressiveEval instead of a SortPreservingMerge requires:

  1. The input partitions' lexical ranges do not overlap
  2. The partitions are ordered in increasing key space

When this is the case, concatenating such partitions together produces the same output as a sorted stream, and thus the output of ProgressiveEval and SortPreservingMerge is the same.
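This equivalence can be checked directly on the example partitions below (a toy with integer keys, standing in for sorted Arrow streams):

```rust
// Sketch: with non-overlapping, ordered input partitions, simple
// concatenation (ProgressiveEval) already yields sorted output,
// i.e. the same result SortPreservingMerge would produce.
fn main() {
    // The three partitions from the example below, sorted on (a ASC, b ASC)
    let p1 = vec![(1, 100), (1, 200), (2, 100)];
    let p2 = vec![(2, 200)];
    let p3 = vec![(2, 300), (3, 100)];

    // ProgressiveEval: concatenate partitions in order
    let concat: Vec<_> = [p1, p2, p3].concat();

    // SortPreservingMerge: for this toy, equivalent to a full sort
    let mut merged = concat.clone();
    merged.sort();

    assert_eq!(concat, merged); // identical outputs
}
```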

Example: ProgressiveEval can be used instead of a SortPreservingMerge.

In the following example, the input streams have non-overlapping lexical ranges in order, and thus SortPreservingMerge and ProgressiveEval produce the same output:

  • Partition 1: (1,100) --> (2,200)
  • Partition 2: (2,200) --> (2,200)
  • Partition 3: (2,300) --> (3,100)
                                      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                       
                                           ┏━━━━┳━━━━┓     │                Input streams:        
                                      │    ┃ A  ┃ B  ┃                      ✅ Non overlapping     
                                           ┣────╋────┫     │                ✅ Ordered             
                                      │    │ 1  │100 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 1  │200 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 2  │100 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 2  │200 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 2  │300 │                                            
                                           ├────┼────┤     │                                      
                                      │    │ 3  │100 │                                            
                                           └────┴────┘     │  Output                              
                                      └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                       
                                                ▲ ▲                                               
                     ┌──────────────────────────┘ └──────────────────────────┐                    
                     │                                                       │                    
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓              ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃          SortPreservingMerge           ┃              ┃            ProgressiveEval             ┃
┃         exprs = [a ASC, b ASC]         ┃              ┃                                        ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛              ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                     ▲                                                       ▲                    
                     └──────────────────────────┐ ┌──────────────────────────┘                    
                                                │ │                                               
                          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                         
                                                                                                  
                          │  ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓  │                         
                             ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃                            
                          │  ┣────╋────┫    ┣────╋────┫    ┣────╋────┫  │                         
                             │ 1  │100 │    │ 2  │200 │    │ 2  │300 │                            
                          │  ├────┼────┤    └────┴────┘    ├────┼────┤  │                         
                             │ 1  │200 │                   │ 3  │100 │                            
                          │  ├────┼────┤                   └────┴────┘  │                         
                             │ 2  │100 │                                                          
                          │  └────┴────┘                                │                         
                                                                                                  
                          │  Partition 1    Partition 2    Partition 3  │                         
                           ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                          
                                                                            Input                 

Counter Example 1: Out of order Partitions

In the following example, the input partitions still have non-overlapping lexical ranges, but they are NOT in order. Therefore the output of the ProgressiveEval (which concatenates the streams) is different from that of SortPreservingMerge, and thus in this case we can NOT use ProgressiveEvalExec:

          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
               ┏━━━━┳━━━━┓     │                                       ┏━━━━┳━━━━┓     │                   
          │    ┃ A  ┃ B  ┃                                        │    ┃ A  ┃ B  ┃       Input streams:    
               ┣────╋────┫     │                                       ┣────╋────┫     │ ✅ Non overlapping 
          │    │ 1  │100 │                                        │    │ 1  │100 │       ❌ Ordered         
               ├────┼────┤     │    Partition 3     Partition 3        ├────┼────┤     │                   
          │    │ 1  │200 │         comes before     comes after   │    │ 1  │200 │                         
               ├────┼────┤     │         │               │             ├────┼────┤     │                   
          │    │ 2  │100 │                                        │    │ 2  │100 │                         
               ╠════╬════╣     │         │               │             ├────┼────┤     │                   
          │    ║ 2  ║200 ║◀ ─ ─ ─ ─ ─ ─ ─                         │    │ 2  │300 │                         
               ╠────╬────╣     │                         │             ├────┼────┤     │                   
          │    │ 2  │300 │                                        │    │ 3  │100 │                         
               ├────┼────┤     │   Output                │             ╠────╬────╣     │                   
          │    │ 3  │100 │        (same as                ─ ─ ─ ─ ┼ ─ ▶║ 2  ║200 ║                         
               └────┴────┘     │   above)                              ╚════╩════╝     │   Output          
          └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
                     ▲                                                       ▲                             
                     │                                                       │                             
                     │                                                       │                             
                     │                                                       │                             
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓              ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓         
┃          SortPreservingMerge           ┃              ┃            ProgressiveEval             ┃         
┃         exprs = [a ASC, b ASC]         ┃              ┃                                        ┃         
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛              ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛         
                     ▲                                                       ▲                             
                     └──────────────────────────┐ ┌──────────────────────────┘                             
                                                │ │                                                        
                          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                                  
                                                                                                           
                          │  ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓  │                                  
                             ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃                                     
                          │  ┣────╋────┫    ┣────╋────┫    ┣────╋────┫  │                                  
                             │ 1  │100 │    │ 2  │300 │    │ 2  │200 │                                     
                          │  ├────┼────┤    ├────┼────┤    └────┴────┘  │                                  
                             │ 1  │200 │    │ 3  │100 │                                                    
                          │  ├────┼────┤    └────┴────┘                 │                                  
                             │ 2  │100 │                                      Input                        
                          │  └────┴────┘                                │                                  
                                                                           Note Partition 2 and            
                          │  Partition 1    Partition 2    Partition 3  │   Partition 3 are in             
                           ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─      different order              

Counter Example 2: Overlapping Partitions

When partitions overlap in lexical range, it is even clearer that the outputs of the two operators differ. When ProgressiveEval appends the input streams together, the result is not sorted, as shown in the following figure:

          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
               ┏━━━━┳━━━━┓     │                                       ┏━━━━┳━━━━┓     │                   
          │    ┃ A  ┃ B  ┃                 Partition 3            │    ┃ A  ┃ B  ┃       Input streams:    
               ┣────╋────┫     │          comes before                 ┣────╋────┫     │ ❌ Non overlapping 
          │    │ 1  │100 │                      │                 │    │ 1  │100 │       ✅ Ordered         
               ├────┼────┤     │                                       ├────┼────┤     │                   
          │    │ 1  │200 │                      │                 │    │ 1  │200 │                         
               ├────┼────┤     │                                       ├────┼────┤     │                   
          │    │ 2  │200 │                      │        ┌ ─ ─ ─ ─│─ ─▶│ 2  │250 │                         
               ├────┼────┤     │                                       ├────┼────┤     │                   
          │    │ 2  │250 │◀ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘        │        │    │ 2  │200 │                         
               ├────┼────┤     │                    Partition 3        ├────┼────┤     │                   
          │    │ 2  │300 │                          comes after   │    │ 2  │300 │                         
               ├────┼────┤     │                    Partition 2        ├────┼────┤     │                   
          │    │ 3  │100 │                                        │    │ 3  │100 │                         
               └────┴────┘     │                                       └────┴────┘     │   Output          
          └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                    
                     ▲                                                       ▲                             
                     │                                                       │                             
                     │                                                       │                             
                     │                                                       │                             
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓              ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓         
┃          SortPreservingMerge           ┃              ┃            ProgressiveEval             ┃         
┃         exprs = [a ASC, b ASC]         ┃              ┃                                        ┃         
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛              ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛         
                     ▲                                                       ▲                             
                     └──────────────────────────┐ ┌──────────────────────────┘                             
                                                │ │                                                        
                          ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐                                  
                                                                                                           
                          │  ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓    ┏━━━━┳━━━━┓  │                                  
                             ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃    ┃ A  ┃ B  ┃                                     
                          │  ┣────╋────┫    ┣────╋────┫    ┣━━━━╋────┫  │                                  
                             │ 1  │100 │    │ 2  │200 │    ┃ 2  │300 │                                     
                          │  ├────┼────┤    └────┴────┘    ┣────┼────┤  │        Input                     
                             │ 1  │200 │                   │ 3  │100 │                                     
                          │  ├────┼────┤                   └────┴────┘  │   Partition 3 overlaps           
                             │ 2  │250 │◀─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ with partition 1's            
                          │  └────┴────┘                                │       lexical range              
                                                                                                           
                          │  Partition 1    Partition 2    Partition 3  │                                  
                           ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─                                   

Proposed Algorithm

Step 1: find min/max values for each sort key column for each input partition to a SortPreservingMerge.

We can not get this information reliably from DataFusion statistics yet due to

However, internally at InfluxData we have a special analysis that works for time columns.

We can also use EquivalenceProperties::constants to determine min/max values for constants (which is what is needed for OrderUnionSortedInputsForConstants).

If we can't determine all min/max values ==> no transform

Step 2: Determine if the Lexical Space overlaps

The algorithm converts the mins/maxes to Arrow arrays and then calls the rank kernel on them:

Details

/// Sort the given vector by the given value ranges.
/// Return None if:
///    . the number of plans is not the same as the number of value ranges
///    . the value ranges overlap
/// Return the sorted plans and the sorted value ranges
fn sort_by_value_ranges<T>(
    plans: Vec<T>,
    value_ranges: Vec<(ScalarValue, ScalarValue)>,
    sort_options: SortOptions,
) -> Result<Option<SortedVecAndValueRanges<T>>> {
    if plans.len() != value_ranges.len() {
        trace!(
            plans.len = plans.len(),
            value_ranges.len = value_ranges.len(),
            "--------- number of plans is not the same as the number of value ranges"
        );
        return Ok(None);
    }

    if overlap(&value_ranges)? {
        trace!("--------- value ranges overlap");
        return Ok(None);
    }

    // get the min value of each value range
    let min_iter = value_ranges.iter().map(|(min, _)| min.clone());
    let mins = ScalarValue::iter_to_array(min_iter)?;

    // rank the min values
    let ranks = rank(&*mins, Some(sort_options))?;

    // no need to sort if it is already sorted
    if ranks.iter().zip(ranks.iter().skip(1)).all(|(a, b)| a <= b) {
        return Ok(Some(SortedVecAndValueRanges::new(plans, value_ranges)));
    }

    // sort the plans by the ranks of their min values
    let mut plan_rank_zip: Vec<(T, u32)> = plans
        .into_iter()
        .zip(ranks.iter().copied())
        .collect::<Vec<_>>();
    plan_rank_zip.sort_by(|(_, min1), (_, min2)| min1.cmp(min2));
    let plans = plan_rank_zip
        .into_iter()
        .map(|(plan, _)| plan)
        .collect::<Vec<_>>();

    // Sort the value ranges by the ranks of their min values
    let mut value_range_rank_zip: Vec<((ScalarValue, ScalarValue), u32)> =
        value_ranges.into_iter().zip(ranks).collect::<Vec<_>>();
    value_range_rank_zip.sort_by(|(_, min1), (_, min2)| min1.cmp(min2));
    let value_ranges = value_range_rank_zip
        .into_iter()
        .map(|(value_range, _)| value_range)
        .collect::<Vec<_>>();

    Ok(Some(SortedVecAndValueRanges::new(plans, value_ranges)))
}

We will need to order the inputs by minimum value and then ensure:

  1. All maxes are in ascending order
  2. min_{i+1} >= max_{i} for each adjacent pair of ranges (so no range overlaps the one before it)

If the lexical ranges overlap ==> no transform
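This check can be sketched with integer sort keys (the real code works on ScalarValue ranges and uses the Arrow rank kernel, as in sort_by_value_ranges above):

```rust
// Sketch of the Step 2 check: order ranges by their minimum sort key,
// then verify min_{i+1} >= max_i for every adjacent pair. Equal
// boundary values are allowed, matching the worked example where
// partition 1 ends at (2,200) and partition 2 begins at (2,200).
fn ordered_and_non_overlapping(mut ranges: Vec<((i32, i32), (i32, i32))>) -> bool {
    ranges.sort_by_key(|&(min, _)| min);
    ranges.windows(2).all(|w| {
        let (_, max_prev) = w[0];
        let (min_next, _) = w[1];
        max_prev <= min_next
    })
}

fn main() {
    // Disjoint ranges, given out of order: sortable, so OK
    assert!(ordered_and_non_overlapping(vec![
        ((2, 300), (3, 100)),
        ((1, 100), (2, 200)),
    ]));
    // Overlapping ranges: no transform possible
    assert!(!ordered_and_non_overlapping(vec![
        ((1, 100), (2, 250)),
        ((2, 200), (3, 100)), // overlaps the first range
    ]));
}
```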

Step 3: Reorder inputs, if needed and possible

If the input partitions are non-overlapping, attempt to reorder the input partitions if needed.

It is possible to reorder input partitions for certain plans such as UnionExec and ParquetExec, but it is not possible to reorder the input partitions for others (like RepartitionExec).

If we cannot reorder the partitions into increasing lexical order ==> no transform

@alamb alamb added the enhancement New feature or request label Mar 12, 2025
@alamb alamb changed the title Analysis to suppor SortPreservingMerge --> ProgressiveEval Analysis to supportSortPreservingMerge --> ProgressiveEval Mar 12, 2025
@suremarc
Contributor

suremarc commented Mar 13, 2025

Thanks for the writeup. The core idea makes sense, but I have a couple of comments on the design; in particular, I think we can do better in a few ways.

1. Some but not all partitions overlap

As I understand it, the current implementation gives up if any two partitions are overlapping. This is fine if your target use case involves queries with no overlapping partitions, but we can do better.
Basically, if we take the proposed algorithm and add another step at the end where we apply "first fit", then instead of producing only one lexical ordering or failing fast, we can produce multiple groups of "chained" input partitions, each ordered lexically by the sort key within itself, and we can optimize the query to use the minimum number of chains possible.

Let me give an example:

SELECT * FROM recent_table_1
WHERE time > now() - INTERVAL 1 DAY
UNION ALL
SELECT * FROM recent_table_2
WHERE time > now() - INTERVAL 1 DAY
UNION ALL
SELECT * FROM historic_table
WHERE time <= now() - INTERVAL 1 DAY
ORDER BY time ASC

Incidentally this is the use case I am targeting. Anyway, this query would result in at least 3 partitions, two of which are overlapping in time. If we could generalize ProgressiveEval to have multiple partition groupings, we could do something like this:

SortPreservingMergeExec: time ASC
  ProgressiveEval: partitions=[2, 0], [1]
    UnionExec: partitions=[0, 1, 2]
      TableExec: recent_table_1
      TableExec: recent_table_2
      TableExec: historic_table

This would concatenate partitions 2 and 0, and partition 1 remains unchanged. Then a final SortPreservingMergeExec is required to merge these into one sorted stream.

The "first fit" algorithm has actually already been implemented in FileScanConfig::split_groups_by_statistics, which uses the MinMaxStatistics helper to analyze min/maxes; however, it was written to be used in ParquetExec (now deprecated and replaced with DataSourceExec, I believe).

I believe this change could be retrofitted onto ProgressiveEval without too much additional complexity. The analysis is already implemented; we just need to use it. Or we can take the implementation from InfluxDB and just add the final "first fit" step.
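The "first fit" grouping described here can be sketched over integer ranges (a std-only toy; the real implementation in FileScanConfig::split_groups_by_statistics works on file statistics via MinMaxStatistics):

```rust
// Toy sketch of "first fit" chaining: after sorting ranges by min,
// append each range to the first existing chain whose last range it
// does not overlap; otherwise start a new chain. Each chain can then
// be concatenated by ProgressiveEval, and the chains merged by a
// final SortPreservingMergeExec.
fn first_fit(mut ranges: Vec<(i32, i32)>) -> Vec<Vec<(i32, i32)>> {
    ranges.sort_by_key(|r| r.0);
    let mut chains: Vec<Vec<(i32, i32)>> = Vec::new();
    for r in ranges {
        match chains.iter_mut().find(|c| c.last().unwrap().1 <= r.0) {
            Some(chain) => chain.push(r),
            None => chains.push(vec![r]),
        }
    }
    chains
}

fn main() {
    // recent_table_1 and recent_table_2 overlap each other in time;
    // historic_table strictly precedes both (hypothetical time ranges)
    let chains = first_fit(vec![(10, 20), (10, 20), (0, 9)]);
    assert_eq!(chains.len(), 2);
    assert_eq!(chains[0], vec![(0, 9), (10, 20)]); // historic + one recent table
    assert_eq!(chains[1], vec![(10, 20)]);         // the other recent table
}
```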

2. Scanning non-overlapping Parquet files

I see that one of the target use cases for ProgressiveEval is #6672. I am a little curious to see the implementation, because the way I see it, ProgressiveEval will solve some but not all instances of this problem.

Here is the issue I am worried about. When you use ListingTable you have a config option called target_partitions. By default it is set to the number of available cpu cores on the system. If the number of files exceeds target_partitions then it will start to merge files into the same partitions with no guarantees on ordering. Let me demonstrate using datafusion-cli:

> SET datafusion.execution.target_partitions=2;
0 row(s) fetched. 
Elapsed 0.000 seconds.

> CREATE EXTERNAL TABLE t1 (id INT, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);
0 row(s) fetched. 
Elapsed 0.002 seconds.

> INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched. 
Elapsed 0.004 seconds.

> EXPLAIN SELECT * FROM t1 ORDER BY id ASC;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                           |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST                                                                                                                                                                                                                                                                     |
|               |   TableScan: t1 projection=[id, date]                                                                                                                                                                                                                                                          |
| physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST]                                                                                                                                                                                                                                                 |
|               |   SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                           |
|               |     DataSourceExec: file_groups={2 groups: [[./data/date=2025-03-01/9nxILoicy2uUAt7r.parquet, ./data/date=2025-03-02/9nxILoicy2uUAt7r.parquet], [./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet, ./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]}, projection=[id, date], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.002 seconds.

> EXPLAIN SELECT * FROM t1 WHERE date > '2025-03-02' ORDER BY id ASC;
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                              |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST                                                                                                                                                                                                        |
|               |   TableScan: t1 projection=[id, date], full_filters=[t1.date > Date32("2025-03-02")]                                                                                                                                              |
| physical_plan | SortPreservingMergeExec: [id@0 ASC NULLS LAST]                                                                                                                                                                                    |
|               |   DataSourceExec: file_groups={2 groups: [[./data/date=2025-03-03/9nxILoicy2uUAt7r.parquet], [./data/date=2025-03-04/9nxILoicy2uUAt7r.parquet]]}, projection=[id, date], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                   |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.001 seconds.

When there are more than 2 files in the scan, the partitions get merged in no particular order. In particular, any ordering on id ASC is lost; our partitions end up having ids of [4, 3] and [2, 1]. If ProgressiveEval were to run on this plan, it would see that the input partitions are not ordered and give up.

IMO, for this use case, ProgressiveEval is trying to solve a problem it does not have total control over. The files are not visible above the DataSourceExec, and it has no ability to reorder the files, even though we know we could avoid a sort here. There is also the potential issue of RepartitionExec and other nodes sitting in between ProgressiveEval and DataSourceExec.

I don't mean to toot my own horn too much, but this exact use case is what FileScanConfig::split_groups_by_statistics was written to solve. We can solve the problem locally at the DataSourceExec, which I think is the right place to do it. It's still gated behind a feature flag; I have not been able to dedicate the time to set up benchmarks for ListingTable, which I think is required to take this feature out of experimental status and ship it.
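To make the idea concrete, here is a simplified, std-only sketch (not the actual DataFusion implementation; the file names and statistics are made up): given min/max statistics for the sort key of each file, files can be greedily assigned to groups so that no two files within a group overlap, letting each resulting partition report a valid output ordering.

```rust
// Sketch of the idea behind FileScanConfig::split_groups_by_statistics:
// place files into groups such that files within a group do not overlap
// on the sort key, so each group (partition) stays sorted.

#[derive(Debug, Clone)]
struct FileStats {
    name: &'static str,
    min: i64, // min of the sort key in this file (hypothetical statistics)
    max: i64, // max of the sort key in this file
}

fn split_groups_by_stats(mut files: Vec<FileStats>) -> Vec<Vec<FileStats>> {
    // Sort files by the minimum value of the sort key
    files.sort_by_key(|f| f.min);
    let mut groups: Vec<Vec<FileStats>> = Vec::new();
    for file in files {
        // Put the file in the first group whose last file ends before it starts
        match groups
            .iter_mut()
            .find(|g| g.last().map_or(true, |last| last.max <= file.min))
        {
            Some(group) => group.push(file),
            // The file overlaps every existing group: start a new one
            None => groups.push(vec![file]),
        }
    }
    groups
}

fn main() {
    let files = vec![
        FileStats { name: "1.parquet", min: 0, max: 10 },
        FileStats { name: "2.parquet", min: 5, max: 15 },
        FileStats { name: "3.parquet", min: 11, max: 20 },
    ];
    for (i, group) in split_groups_by_stats(files).iter().enumerate() {
        let names: Vec<_> = group.iter().map(|f| f.name).collect();
        println!("group {}: {:?}", i, names);
    }
}
```

With these inputs, "1.parquet" and "3.parquet" land in one group (10 <= 11, no overlap), while "2.parquet" overlaps both and starts a second group.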

Conclusion

Basically I think the design as-is is good enough to include in DataFusion, though I would like to see it generalized a bit, and I also think it may not completely solve #6672. That said, I think it will solve other problems, including optimizing queries with non-overlapping unions in them.

I apologize if I come off as a bit overbearing 😅 but this issue is near and dear to my heart. Eliminating sorts has been one of the most important things in my team's project, and it sounds like InfluxDB has been dealing with the same issue.

@alamb
Contributor Author

alamb commented Mar 17, 2025

Incidentally this is the use case I am targeting. Anyway, this query would result in at least 3 partitions, two of which are overlapping in time. If we could generalize ProgressiveEval to have multiple partition groupings, we could do something like this:

This is a neat idea -- basically a cascade of progressive evals to avoid some (but not all) merging

I see that one of the target use cases for ProgressiveEval is #6672. I am a little curious to see the implementation, because the way I see it, ProgressiveEval will solve some but not all instances of this problem.

Our progressive eval implementation is here https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query/src/provider/progressive_eval.rs

(there is a PR to add it to Datafusion here: #10490)

Here is the issue I am worried about. When you use ListingTable you have a config option called target_partitions. By default it is set to the number of available cpu cores on the system. If the number of files exceeds target_partitions then it will start to merge files into the same partitions with no guarantees on ordering. Let me demonstrate using datafusion-cli:

If the number of files exceeds target_partitions then it will start to merge files into the same partitions with no guarantees on ordering.

I think it uses FileGroupPartitioner, which maintains the same ordering... but maybe it is a different ordering than the one you have in mind

You can define an order using WITH ORDER like this:

DataFusion CLI v46.0.1
> copy (values (4, '2025-03-01')) to '/tmp/test/1.parquet';
+-------+
| count |
+-------+
| 1     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.

> copy (values (3, '2025-03-02')) to '/tmp/test/2.parquet';
+-------+
| count |
+-------+
| 1     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.

> create external table test stored as parquet location '/tmp/test' with order (column2 ASC);
0 row(s) fetched.
Elapsed 0.006 seconds.

Then the sort is not needed:

> explain select * from test order by column2 asc;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                  |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: test.column2 ASC NULLS LAST                                                                                                                                                                                                                                                                     |
|               |   TableScan: test projection=[column1, column2]                                                                                                                                                                                                                                                       |
| physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST]                                                                                                                                                                                                                                                   |
|               |   DataSourceExec: file_groups={16 groups: [[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], [tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                       |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.002 seconds.

Though DataFusion will add the sort when needed:

> explain select * from test order by column2 desc;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                    |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: test.column2 DESC NULLS FIRST                                                                                                                                                                                                                                                                     |
|               |   TableScan: test projection=[column1, column2]                                                                                                                                                                                                                                                         |
| physical_plan | SortPreservingMergeExec: [column2@1 DESC]                                                                                                                                                                                                                                                               |
|               |   SortExec: expr=[column2@1 DESC], preserve_partitioning=[true]                                                                                                                                                                                                                                         |
|               |     DataSourceExec: file_groups={16 groups: [[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], [tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                         |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.012 seconds.

I apologize if I come off as a bit overbearing 😅 but this issue is near and dear to my heart. Eliminating sorts has been one of the most important things in my team's project, and it sounds like InfluxDB has been dealing with the same issue.

Not at all! Yes, we have done a lot of this (as have @berkaysynnada, @ozankabak, and @akurmustafa at Synnada). It is a very important optimization.

@alamb
Contributor Author

alamb commented Mar 17, 2025

I don't mean to tout my own horn too much, but in fact this exact use case is what FileScanConfig::split_groups_by_statistics was written to solve. We can solve the problem locally at the DataSourceExec, which I think is the right place to do it. It's still gated behind a feature flag, I have not been able to dedicate the time to set up benchmarks for ListingTable which I think is required to take this feature out of being experimental and ship it.

I agree -- in my mind this is all related -- when trying to take maximum advantage of pre-existing orderings I do think the optimizer should be more careful.

@suremarc
Contributor

I think it uses FileGroupPartitioner that maintains the same ordering... But maybe it is a different ordering than you have in mind

AFAICT FileGroupPartitioner is able to maintain the same ordering as what it is given, yes. But the initial ordering for the partitions as determined by ListingTable is ultimately just sorting by the object store paths (see here), which may not match the table order. This is the case in my (somewhat contrived) example above, where the table order is id ASC but id decreases as date increases.

For a more realistic example where the table order and object store path order don't match, consider a horizontally partitioned table, something like this:

DataFusion CLI v46.0.0
> SET datafusion.execution.target_partitions=2;
0 row(s) fetched. 
Elapsed 0.001 seconds.

> CREATE EXTERNAL TABLE t1 (time TIMESTAMP, date DATE, shard INT) STORED AS PARQUET LOCATION '/tmp/data/' PARTITIONED BY (date, shard) WITH ORDER (time ASC);
0 row(s) fetched. 
Elapsed 0.003 seconds.

> INSERT INTO t1 VALUES 
('2025-03-01 00:00:01', '2025-03-01', 0), 
('2025-03-01 00:00:00', '2025-03-01', 1), 
('2025-03-02 00:00:00', '2025-03-02', 0), 
('2025-03-02 00:00:02', '2025-03-02', 1);
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched. 
Elapsed 0.011 seconds.

> EXPLAIN SELECT * FROM t1 ORDER BY time ASC;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                            |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: t1.time ASC NULLS LAST                                                                                                                                                                                                                                                                                                                    |
|               |   TableScan: t1 projection=[time, date, shard]                                                                                                                                                                                                                                                                                                  |
| physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST]                                                                                                                                                                                                                                                                                                |
|               |   SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                                                                                                                                                                                          |
|               |     DataSourceExec: file_groups={2 groups: [[tmp/data/date=2025-03-01/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-01/shard=1/8eTZY2WyyhnV7Klv.parquet], [tmp/data/date=2025-03-02/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-02/shard=1/8eTZY2WyyhnV7Klv.parquet]]}, projection=[time, date, shard], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                                                                 |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched. 
Elapsed 0.001 seconds.

Sorting by the object store paths gives us a lexsort by date and then shard, which leads to a loss of ordering within partitions. ProgressiveEval will not be able to fix this alone unless it is able to manipulate DataSourceExec's files.
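To make the failure concrete, here is a tiny std-only sketch. The paths and timestamps are copied from the EXPLAIN output above; the concatenation is a simplification of how the files in one file group are scanned back to back.

```rust
// Each entry is (object store path, time values stored in that file).
// Files within one file group are scanned back to back, so the group's
// output is simply the concatenation of each file's rows.
fn group_output<'a>(group: &[(&str, Vec<&'a str>)]) -> Vec<&'a str> {
    group.iter().flat_map(|(_, times)| times.iter().copied()).collect()
}

fn main() {
    // The first file group from the plan above:
    let group1 = vec![
        ("tmp/data/date=2025-03-01/shard=0/8eTZY2WyyhnV7Klv.parquet",
         vec!["2025-03-01 00:00:01"]),
        ("tmp/data/date=2025-03-01/shard=1/8eTZY2WyyhnV7Klv.parquet",
         vec!["2025-03-01 00:00:00"]),
    ];
    let output = group_output(&group1);
    let mut sorted = output.clone();
    sorted.sort();
    // Path order put shard=0 first, but its row has the LATER timestamp,
    // so the group's output is not in time order.
    assert_ne!(output, sorted);
    println!("group output: {:?}", output);
}
```

This is why DataFusion has to insert the SortExec: the path-ordered grouping cannot claim `time ASC` within each partition.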

Then the sort is not needed:

explain select * from test order by column2 asc;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                  |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: test.column2 ASC NULLS LAST                                                                                                                                                                                                                                                                     |
|               |   TableScan: test projection=[column1, column2]                                                                                                                                                                                                                                                       |
| physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST]                                                                                                                                                                                                                                                   |
|               |   DataSourceExec: file_groups={16 groups: [[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], [tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
|               |                                                                                                                                                                                                                                                                                                       |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.002 seconds.

The only reason it is not needed here is that there are fewer files than target_partitions, so this will not work if we increase the number of files or reduce target_partitions. If we set target_partitions to 1, a sort is required:

> SET datafusion.execution.target_partitions=1;
0 row(s) fetched. 
Elapsed 0.000 seconds.

> EXPLAIN SELECT * FROM test ORDER BY column2 ASC;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                  |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: test.column2 ASC NULLS LAST                                                                                                     |
|               |   TableScan: test projection=[column1, column2]                                                                                       |
| physical_plan | SortExec: expr=[column2@1 ASC NULLS LAST], preserve_partitioning=[false]                                                              |
|               |   DataSourceExec: file_groups={1 group: [[tmp/test/1.parquet, tmp/test/2.parquet]]}, projection=[column1, column2], file_type=parquet |
|               |                                                                                                                                       |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------+

target_partitions is usually the number of CPU cores, so typically 16 or so, but the problem will still appear for tables with more than 16 files (which describes most tables I have worked with).
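To restate the threshold numerically (a hypothetical helper, not a DataFusion API): once the file count exceeds target_partitions, at least one group must hold two or more files, and within-group ordering is no longer guaranteed.

```rust
// Largest group size when `num_files` files are spread over
// `target_partitions` groups as evenly as possible (ceiling division).
fn max_files_per_group(num_files: usize, target_partitions: usize) -> usize {
    (num_files + target_partitions - 1) / target_partitions
}

fn main() {
    // 2 files, 16 partitions: one file per group, per-file orderings survive.
    assert_eq!(max_files_per_group(2, 16), 1);
    // 17 files, 16 partitions: some group now holds 2 files.
    assert_eq!(max_files_per_group(17, 16), 2);
    println!("ok");
}
```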

@xudong963
Member

The only reason it is not needed here is because there are fewer files than target_partitions, so this will not work if we increase the number of files or reduce target_partitions. If we set target_partitions to 1 then it requires a sort:

I reread the codebase, and also think so.

FileScanConfig::split_groups_by_statistics can definitely solve the problem; then we can remove the unnecessary SortExec, which will be a significant gain!

One question: Is there something that makes it difficult to turn on split_groups_by_statistics by default?

@leoyvens
Contributor

@xudong963 see #10336; that flag groups Parquet scanning into a single partition. ProgressiveEval, as proposed here, preserves the partitioning of the input scan and runs 2 inputs at a time, so it would preserve some parallelism.

@xudong963
Member

@xudong963 see #10336, that flag groups Parquet scanning to a single partition. ProgressiveEval here is proposed to preserve the partitioning of the input scan, and to run 2 inputs at a time, so would preserve some parallelism.

Got it, thank you

@xudong963
Member

Hi @alamb @wiedld , how's it going? Can I do something to help?

@suremarc
Contributor

The only reason it is not needed here is because there are fewer files than target_partitions, so this will not work if we increase the number of files or reduce target_partitions. If we set target_partitions to 1 then it requires a sort:

I reread the codebase, and also think so.

FileScanConfig::split_groups_by_statistics definitely can solve the problem, then we can remove unnecessary SortExec which will be significant gains!

One question: Is there something that makes it difficult to turn on split_groups_by_statistics by default?

I left my full answer on that issue as I don't want to take over this issue too much, but TL;DR we need benchmarks for tables with large numbers of files.

@alamb
Contributor Author

alamb commented Mar 27, 2025

Hi @alamb @wiedld , how's it going? Can I do something to help?

We are a bit blocked on some of the overlap analysis -- I am going to try to pitch in and see if we can push something forward.
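For context, the core of that overlap analysis can be sketched in a few lines (an assumed shape, not the implementation being worked on): given the (min, max) range of the sort key for each input partition, ProgressiveEval is applicable only when the partitions are already ordered and pairwise non-overlapping.

```rust
// Sketch: check that input partitions' sort-key ranges are ordered and
// non-overlapping -- the precondition for emitting partitions one after
// another without merging. `ranges[i]` is the (min, max) of partition i.
fn can_progressively_eval(ranges: &[(i64, i64)]) -> bool {
    // Each partition must end no later than the next one starts.
    ranges.windows(2).all(|w| w[0].1 <= w[1].0)
}

fn main() {
    // Ordered, non-overlapping partitions: ProgressiveEval applies.
    assert!(can_progressively_eval(&[(0, 10), (10, 20), (25, 30)]));
    // Overlapping partitions: must fall back to SortPreservingMerge.
    assert!(!can_progressively_eval(&[(0, 10), (5, 20)]));
    println!("ok");
}
```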

@xudong963
Member

@alamb Fyi: here is a POC: xudong963#4

The PR glues together three PRs:

@alamb
Contributor Author

alamb commented Mar 28, 2025

@alamb Fyi: here is a POC: xudong963#4

@wiedld - can you please review the above PRs and work with @xudong963 to make sure our implementations are aligned?

@alamb
Contributor Author

alamb commented Apr 1, 2025

@wiedld has created a PR with a version of the lexical range analysis for review
