@@ -37,9 +37,11 @@ use crate::physical_planner::create_physical_sort_exprs;
37
37
use arrow:: datatypes:: SchemaRef ;
38
38
use arrow:: record_batch:: RecordBatch ;
39
39
use datafusion_catalog:: Session ;
40
- use datafusion_common:: { not_impl_err, plan_err, Constraints , DFSchema , SchemaExt } ;
40
+ use datafusion_common:: stats:: Precision ;
41
+ use datafusion_common:: { not_impl_err, plan_err, Constraints , DFSchema , SchemaExt , Statistics } ;
41
42
pub use datafusion_datasource:: memory:: MemorySourceConfig ;
42
43
pub use datafusion_datasource:: source:: DataSourceExec ;
44
+
43
45
use datafusion_execution:: TaskContext ;
44
46
use datafusion_expr:: dml:: InsertOp ;
45
47
use datafusion_expr:: SortExpr ;
@@ -67,11 +69,13 @@ pub struct MemTable {
67
69
/// Optional pre-known sort order(s). Must be `SortExpr`s.
68
70
/// Inserting data into this table removes the order.
69
71
pub sort_order : Arc < Mutex < Vec < Vec < SortExpr > > > > ,
72
+ num_rows : usize ,
70
73
}
71
74
72
75
impl MemTable {
73
76
/// Create a new in-memory table from the provided schema and record batches
74
77
pub fn try_new ( schema : SchemaRef , partitions : Vec < Vec < RecordBatch > > ) -> Result < Self > {
78
+ let mut num_rows = 0_usize ;
75
79
for batches in partitions. iter ( ) . flatten ( ) {
76
80
let batches_schema = batches. schema ( ) ;
77
81
if !schema. contains ( & batches_schema) {
@@ -81,6 +85,7 @@ impl MemTable {
81
85
) ;
82
86
return plan_err ! ( "Mismatch between schema and batches" ) ;
83
87
}
88
+ num_rows += batches. num_rows ( ) ;
84
89
}
85
90
86
91
Ok ( Self {
@@ -92,6 +97,7 @@ impl MemTable {
92
97
constraints : Constraints :: empty ( ) ,
93
98
column_defaults : HashMap :: new ( ) ,
94
99
sort_order : Arc :: new ( Mutex :: new ( vec ! [ ] ) ) ,
100
+ num_rows
95
101
} )
96
102
}
97
103
@@ -215,6 +221,12 @@ impl TableProvider for MemTable {
215
221
TableType :: Base
216
222
}
217
223
224
+ fn statistics ( & self ) -> Option < Statistics > {
225
+ let mut stats = Statistics :: new_unknown ( & self . schema ) ;
226
+ stats. num_rows = Precision :: Inexact ( self . num_rows ) ;
227
+ Some ( stats)
228
+ }
229
+
218
230
async fn scan (
219
231
& self ,
220
232
state : & dyn Session ,
0 commit comments