@@ -408,3 +408,172 @@ where
408
408
Ok ( ( ) )
409
409
}
410
410
}
411
+
412
+ #[ cfg( test) ]
413
+ mod tests {
414
+ use std:: collections:: BTreeMap ;
415
+ use std:: sync:: Arc ;
416
+
417
+ use crate :: aggregates:: group_values:: single_group_by:: primitive:: GroupValuesPrimitive ;
418
+ use crate :: aggregates:: group_values:: GroupValues ;
419
+ use arrow:: array:: { AsArray , Int64Array , NullBufferBuilder , UInt32Array } ;
420
+ use arrow:: datatypes:: { DataType , UInt32Type } ;
421
+ use datafusion_expr:: EmitTo ;
422
+ use datafusion_functions_aggregate_common:: aggregate:: groups_accumulator:: {
423
+ BlockedGroupIndexOperations , GroupIndexOperations ,
424
+ } ;
425
+
426
+ #[ test]
427
+ fn test_flat_primitive_group_values ( ) {
428
+ // Will cover such insert cases:
429
+ // 1.1 Non-null row + distinct
430
+ // 1.2 Null row + distinct
431
+ // 1.3 Non-null row + non-distinct
432
+ // 1.4 Null row + non-distinct
433
+ //
434
+ // Will cover such emit cases:
435
+ // 2.1 Emit first n
436
+ // 2.2 Emit all
437
+ // 2.3 Insert again + emit
438
+ let mut group_values = GroupValuesPrimitive :: < UInt32Type > :: new ( DataType :: UInt32 ) ;
439
+ let mut group_indices = vec ! [ ] ;
440
+
441
+ let data1 = Arc :: new ( UInt32Array :: from ( vec ! [
442
+ Some ( 1 ) ,
443
+ None ,
444
+ Some ( 1 ) ,
445
+ None ,
446
+ Some ( 2 ) ,
447
+ Some ( 3 ) ,
448
+ ] ) ) ;
449
+ let data2 = Arc :: new ( UInt32Array :: from ( vec ! [ Some ( 3 ) , None , Some ( 4 ) , Some ( 5 ) ] ) ) ;
450
+
451
+ // Insert case 1.1, 1.3, 1.4 + Emit case 2.1
452
+ group_values
453
+ . intern ( & vec ! [ data1. clone( ) as _] , & mut group_indices)
454
+ . unwrap ( ) ;
455
+
456
+ let mut expected = BTreeMap :: new ( ) ;
457
+ for ( & group_index, value) in group_indices. iter ( ) . zip ( data1. iter ( ) ) {
458
+ expected. insert ( group_index, value) ;
459
+ }
460
+ let mut expected = expected. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
461
+ let last_group_index = expected. len ( ) - 1 ;
462
+ let last_value = expected. last ( ) . unwrap ( ) . 1 ;
463
+ expected. pop ( ) ;
464
+
465
+ let emit_result = group_values. emit ( EmitTo :: First ( 3 ) ) . unwrap ( ) ;
466
+ let actual = emit_result[ 0 ]
467
+ . as_primitive :: < UInt32Type > ( )
468
+ . iter ( )
469
+ . enumerate ( )
470
+ . map ( |( group_idx, val) | {
471
+ assert ! ( group_idx < last_group_index) ;
472
+ ( group_idx, val)
473
+ } )
474
+ . collect :: < Vec < _ > > ( ) ;
475
+
476
+ assert_eq ! ( expected, actual) ;
477
+
478
+ // Insert case 1.1~1.3 + Emit case 2.2~2.3
479
+ group_values
480
+ . intern ( & vec ! [ data2. clone( ) as _] , & mut group_indices)
481
+ . unwrap ( ) ;
482
+
483
+ let mut expected = BTreeMap :: new ( ) ;
484
+ for ( & group_index, value) in group_indices. iter ( ) . zip ( data2. iter ( ) ) {
485
+ if group_index == 0 {
486
+ assert_eq ! ( last_value, value) ;
487
+ }
488
+ expected. insert ( group_index, value) ;
489
+ }
490
+ let expected = expected. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
491
+
492
+ let emit_result = group_values. emit ( EmitTo :: All ) . unwrap ( ) ;
493
+ let actual = emit_result[ 0 ]
494
+ . as_primitive :: < UInt32Type > ( )
495
+ . iter ( )
496
+ . enumerate ( )
497
+ . collect :: < Vec < _ > > ( ) ;
498
+
499
+ assert_eq ! ( expected, actual) ;
500
+ }
501
+
502
+ #[ test]
503
+ fn test_blocked_primitive_group_values ( ) {
504
+ // Will cover such insert cases:
505
+ // 1.1 Non-null row + distinct
506
+ // 1.2 Null row + distinct
507
+ // 1.3 Non-null row + non-distinct
508
+ // 1.4 Null row + non-distinct
509
+ //
510
+ // Will cover such emit cases:
511
+ // 2.1 Emit block
512
+ // 2.2 Insert again + emit block
513
+ //
514
+ let mut group_values = GroupValuesPrimitive :: < UInt32Type > :: new ( DataType :: UInt32 ) ;
515
+ let block_size = 2 ;
516
+ group_values. alter_block_size ( Some ( block_size) ) . unwrap ( ) ;
517
+ let mut group_indices = vec ! [ ] ;
518
+
519
+ let data1 = Arc :: new ( UInt32Array :: from ( vec ! [
520
+ Some ( 1 ) ,
521
+ None ,
522
+ Some ( 1 ) ,
523
+ None ,
524
+ Some ( 2 ) ,
525
+ Some ( 3 ) ,
526
+ ] ) ) ;
527
+ let data2 = Arc :: new ( UInt32Array :: from ( vec ! [ Some ( 3 ) , None , Some ( 4 ) ] ) ) ;
528
+
529
+ // Insert case 1.1, 1.3, 1.4 + Emit case 2.1
530
+ group_values
531
+ . intern ( & vec ! [ data1. clone( ) as _] , & mut group_indices)
532
+ . unwrap ( ) ;
533
+
534
+ let mut expected = BTreeMap :: new ( ) ;
535
+ for ( & packed_index, value) in group_indices. iter ( ) . zip ( data1. iter ( ) ) {
536
+ let block_id = BlockedGroupIndexOperations :: get_block_id ( packed_index as u64 ) ;
537
+ let block_offset =
538
+ BlockedGroupIndexOperations :: get_block_offset ( packed_index as u64 ) ;
539
+ let flatten_index = block_id as usize * block_size + block_offset as usize ;
540
+ expected. insert ( flatten_index, value) ;
541
+ }
542
+ let expected = expected. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
543
+
544
+ let emit_result1 = group_values. emit ( EmitTo :: NextBlock ) . unwrap ( ) ;
545
+ assert_eq ! ( emit_result1[ 0 ] . len( ) , block_size) ;
546
+ let emit_result2 = group_values. emit ( EmitTo :: NextBlock ) . unwrap ( ) ;
547
+ assert_eq ! ( emit_result2[ 0 ] . len( ) , block_size) ;
548
+ let iter1 = emit_result1[ 0 ] . as_primitive :: < UInt32Type > ( ) . iter ( ) ;
549
+ let iter2 = emit_result2[ 0 ] . as_primitive :: < UInt32Type > ( ) . iter ( ) ;
550
+ let actual = iter1. chain ( iter2) . enumerate ( ) . collect :: < Vec < _ > > ( ) ;
551
+
552
+ assert_eq ! ( actual, expected) ;
553
+
554
+ // Insert case 1.1~1.2 + Emit case 2.2
555
+ group_values
556
+ . intern ( & vec ! [ data2. clone( ) as _] , & mut group_indices)
557
+ . unwrap ( ) ;
558
+
559
+ let mut expected = BTreeMap :: new ( ) ;
560
+ for ( & packed_index, value) in group_indices. iter ( ) . zip ( data2. iter ( ) ) {
561
+ let block_id = BlockedGroupIndexOperations :: get_block_id ( packed_index as u64 ) ;
562
+ let block_offset =
563
+ BlockedGroupIndexOperations :: get_block_offset ( packed_index as u64 ) ;
564
+ let flatten_index = block_id as usize * block_size + block_offset as usize ;
565
+ expected. insert ( flatten_index, value) ;
566
+ }
567
+ let expected = expected. into_iter ( ) . collect :: < Vec < _ > > ( ) ;
568
+
569
+ let emit_result1 = group_values. emit ( EmitTo :: NextBlock ) . unwrap ( ) ;
570
+ assert_eq ! ( emit_result1[ 0 ] . len( ) , block_size) ;
571
+ let emit_result2 = group_values. emit ( EmitTo :: NextBlock ) . unwrap ( ) ;
572
+ assert_eq ! ( emit_result2[ 0 ] . len( ) , 1 ) ;
573
+ let iter1 = emit_result1[ 0 ] . as_primitive :: < UInt32Type > ( ) . iter ( ) ;
574
+ let iter2 = emit_result2[ 0 ] . as_primitive :: < UInt32Type > ( ) . iter ( ) ;
575
+ let actual = iter1. chain ( iter2) . enumerate ( ) . collect :: < Vec < _ > > ( ) ;
576
+
577
+ assert_eq ! ( actual, expected) ;
578
+ }
579
+ }
0 commit comments