@@ -73,11 +73,37 @@ impl MinMaxStatistics {
73
73
) -> Result < Self > {
74
74
use datafusion_common:: ScalarValue ;
75
75
76
- let statistics = statistics. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
76
+ let sort_columns = sort_columns_from_physical_sort_exprs ( sort_order) . ok_or (
77
+ DataFusionError :: Plan ( "sort expression must be on column" . to_string ( ) ) ,
78
+ ) ?;
79
+
80
+ let projection = sort_columns. iter ( ) . map ( |c| c. index ( ) ) . collect :: < Vec < _ > > ( ) ;
81
+
82
+ // Project the schema & sort order down to just the relevant columns
83
+
84
+ let projected_schema = Arc :: new ( schema. project ( & projection) ?) ;
85
+
86
+ let projected_sort_order = LexOrdering {
87
+ inner : sort_columns
88
+ . iter ( )
89
+ . zip ( sort_order. iter ( ) )
90
+ . enumerate ( )
91
+ . map ( |( i, ( col, sort) ) | PhysicalSortExpr {
92
+ expr : Arc :: new ( Column :: new ( col. name ( ) , i) ) ,
93
+ options : sort. options ,
94
+ } )
95
+ . collect :: < Vec < _ > > ( ) ,
96
+ } ;
77
97
78
- // Helper function to get min/max statistics for a given column of projected_schema
98
+ let projected_statistics = statistics
99
+ . into_iter ( )
100
+ . cloned ( )
101
+ . map ( |s| s. project ( Some ( & projection) ) )
102
+ . collect :: < Vec < _ > > ( ) ;
103
+
104
+ // Helper function to get min/max statistics
79
105
let get_min_max = |i : usize | -> Result < ( Vec < ScalarValue > , Vec < ScalarValue > ) > {
80
- Ok ( statistics
106
+ Ok ( projected_statistics
81
107
. iter ( )
82
108
. map ( |s| {
83
109
s. column_statistics [ i]
@@ -94,31 +120,11 @@ impl MinMaxStatistics {
94
120
. unzip ( ) )
95
121
} ;
96
122
97
- let sort_columns = sort_columns_from_physical_sort_exprs ( sort_order) . ok_or (
98
- DataFusionError :: Plan ( "sort expression must be on column" . to_string ( ) ) ,
99
- ) ?;
100
-
101
- // Project the schema & sort order down to just the relevant columns
102
- let min_max_schema = Arc :: new (
103
- schema
104
- . project ( & ( sort_columns. iter ( ) . map ( |c| c. index ( ) ) . collect :: < Vec < _ > > ( ) ) ) ?,
105
- ) ;
106
- let min_max_sort_order = LexOrdering {
107
- inner : sort_columns
108
- . iter ( )
109
- . zip ( sort_order. iter ( ) )
110
- . enumerate ( )
111
- . map ( |( i, ( col, sort) ) | PhysicalSortExpr {
112
- expr : Arc :: new ( Column :: new ( col. name ( ) , i) ) ,
113
- options : sort. options ,
114
- } )
115
- . collect :: < Vec < _ > > ( ) ,
116
- } ;
117
-
118
123
let ( min_values, max_values) : ( Vec < _ > , Vec < _ > ) = sort_columns
119
124
. iter ( )
120
- . map ( |c| {
121
- let ( min, max) = get_min_max ( c. index ( ) ) . map_err ( |e| {
125
+ . enumerate ( )
126
+ . map ( |( i, c) | {
127
+ let ( min, max) = get_min_max ( i) . map_err ( |e| {
122
128
e. context ( format ! ( "get min/max for column: '{}'" , c. name( ) ) )
123
129
} ) ?;
124
130
Ok ( (
@@ -132,14 +138,14 @@ impl MinMaxStatistics {
132
138
. unzip ( ) ;
133
139
134
140
Self :: new (
135
- & min_max_sort_order ,
136
- & min_max_schema ,
137
- RecordBatch :: try_new ( Arc :: clone ( & min_max_schema ) , min_values) . map_err (
141
+ & projected_sort_order ,
142
+ & projected_schema ,
143
+ RecordBatch :: try_new ( Arc :: clone ( & projected_schema ) , min_values) . map_err (
138
144
|e| {
139
145
DataFusionError :: ArrowError ( e, Some ( "\n create min batch" . to_string ( ) ) )
140
146
} ,
141
147
) ?,
142
- RecordBatch :: try_new ( Arc :: clone ( & min_max_schema ) , max_values) . map_err (
148
+ RecordBatch :: try_new ( Arc :: clone ( & projected_schema ) , max_values) . map_err (
143
149
|e| {
144
150
DataFusionError :: ArrowError ( e, Some ( "\n create max batch" . to_string ( ) ) )
145
151
} ,
0 commit comments