@@ -44,13 +44,13 @@ impl Free for ArrowInitData {}
44
44
struct ArrowVTab ;
45
45
46
46
/// Takes the `FFI_ArrowSchema` located at `address` and returns it by value.
///
/// # Safety
/// The added body rebuilds a `Box` with `Box::from_raw` and moves the value out of it, so
/// `address` must come from `Box::into_raw` on a boxed `FFI_ArrowSchema` and must not be
/// used again after this call. The removed body copied the value out with `std::ptr::read`.
unsafe fn address_to_arrow_schema ( address : usize ) -> FFI_ArrowSchema {
47
- let ptr = address as * const FFI_ArrowSchema ;
48
- std :: ptr :: read ( ptr)
47
+ let ptr = address as * mut FFI_ArrowSchema ;
48
+ * Box :: from_raw ( ptr)
49
49
}
50
50
51
51
/// Takes the `FFI_ArrowArray` located at `address` and returns it by value.
///
/// # Safety
/// The added body rebuilds a `Box` with `Box::from_raw` and moves the value out of it, so
/// `address` must come from `Box::into_raw` on a boxed `FFI_ArrowArray` and must not be
/// used again after this call. The removed body copied the value out with `std::ptr::read`.
unsafe fn address_to_arrow_array ( address : usize ) -> FFI_ArrowArray {
52
- let ptr = address as * const FFI_ArrowArray ;
53
- std :: ptr :: read ( ptr)
52
+ let ptr = address as * mut FFI_ArrowArray ;
53
+ * Box :: from_raw ( ptr)
54
54
}
55
55
56
56
unsafe fn address_to_arrow_ffi ( array : usize , schema : usize ) -> ( FFI_ArrowArray , FFI_ArrowSchema ) {
@@ -446,28 +446,49 @@ fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
446
446
// }
447
447
// }
448
448
449
+ /// Converts a `RecordBatch` into the two `usize` query parameters bound to `arrow(?, ?)`.
450
+ ///
451
+ /// # Notes
452
+ /// The batch is wrapped in a `StructArray`, turned into `ArrayData`, and forwarded to `arrow_arraydata_to_query_params`.
453
+ /// It's recommended to always use the returned parameters with arrow()
454
+ pub fn arrow_recordbatch_to_query_params ( rb : RecordBatch ) -> [ usize ; 2 ] {
455
+ let data = ArrayData :: from ( StructArray :: from ( rb) ) ;
456
+ arrow_arraydata_to_query_params ( data)
457
+ }
458
+
459
+ /// Converts `ArrayData` into the two `usize` query parameters bound to `arrow(?, ?)`.
460
+ ///
461
+ /// Builds an `FFI_ArrowArray` and an `FFI_ArrowSchema` from `data` and forwards both to `arrow_ffi_to_query_params`; use the result with arrow().
462
+ /// # Panics
463
+ /// Panics with "Failed to convert schema" when `FFI_ArrowSchema::try_from` fails for the data type of `data`.
464
+ pub fn arrow_arraydata_to_query_params ( data : ArrayData ) -> [ usize ; 2 ] {
465
+ let array = FFI_ArrowArray :: new ( & data) ;
466
+ let schema = FFI_ArrowSchema :: try_from ( data. data_type ( ) ) . expect ( "Failed to convert schema" ) ;
467
+ arrow_ffi_to_query_params ( array, schema)
468
+ }
469
+
449
470
/// Moves the array and schema onto the heap and returns their addresses as `[array, schema]` for duckdb.
450
471
///
451
472
/// # Ownership
452
473
/// Each value is boxed and released with `Box::into_raw`, so it stays allocated until a `Box` is rebuilt from its address.
453
474
/// It's recommended to always use this function with arrow(). NOTE(review): presumably arrow() reclaims each address exactly once; confirm.
454
- pub unsafe fn arrow_ffi_to_query_params ( array : FFI_ArrowArray , schema : FFI_ArrowSchema ) -> [ usize ; 2 ] {
455
- let param = [ & array as * const _ as usize , & schema as * const _ as usize ] ;
456
- std :: mem :: forget ( array ) ;
457
- std :: mem :: forget ( schema ) ;
458
- param
475
+ pub fn arrow_ffi_to_query_params ( array : FFI_ArrowArray , schema : FFI_ArrowSchema ) -> [ usize ; 2 ] {
476
+ let arr = Box :: into_raw ( Box :: new ( array) ) ;
477
+ let sch = Box :: into_raw ( Box :: new ( schema ) ) ;
478
+
479
+ [ arr as * mut _ as usize , sch as * mut _ as usize ]
459
480
}
460
481
461
482
#[ cfg( test) ]
462
483
mod test {
463
- use super :: ArrowVTab ;
484
+ use super :: { arrow_recordbatch_to_query_params , ArrowVTab } ;
464
485
use crate :: { Connection , Result } ;
465
486
use arrow:: {
466
- array:: { ArrayData , Float64Array , StructArray } ,
467
- ffi :: { FFI_ArrowArray , FFI_ArrowSchema } ,
487
+ array:: { Float64Array , Int32Array } ,
488
+ datatypes :: { DataType , Field , Schema } ,
468
489
record_batch:: RecordBatch ,
469
490
} ;
470
- use std:: error:: Error ;
491
+ use std:: { error:: Error , sync :: Arc } ;
471
492
472
493
#[ test]
473
494
fn test_vtab_arrow ( ) -> Result < ( ) , Box < dyn Error > > {
@@ -478,12 +499,7 @@ mod test {
478
499
. prepare ( "SELECT * FROM read_parquet('./examples/int32_decimal.parquet');" ) ?
479
500
. query_arrow ( [ ] ) ?
480
501
. collect ( ) ;
481
- let data = ArrayData :: from ( StructArray :: from ( rbs. into_iter ( ) . next ( ) . unwrap ( ) ) ) ;
482
- let array = FFI_ArrowArray :: new ( & data) ;
483
- let schema = FFI_ArrowSchema :: try_from ( data. data_type ( ) ) . expect ( "Failed to convert schema" ) ;
484
- let param = [ & array as * const _ as usize , & schema as * const _ as usize ] ;
485
- std:: mem:: forget ( array) ;
486
- std:: mem:: forget ( schema) ;
502
+ let param = arrow_recordbatch_to_query_params ( rbs. into_iter ( ) . next ( ) . unwrap ( ) ) ;
487
503
let mut stmt = db. prepare ( "select sum(value) from arrow(?, ?)" ) ?;
488
504
let mut arr = stmt. query_arrow ( param) ?;
489
505
let rb = arr. next ( ) . expect ( "no record batch" ) ;
@@ -493,4 +509,25 @@ mod test {
493
509
assert_eq ! ( column. value( 0 ) , 300.0 ) ;
494
510
Ok ( ( ) )
495
511
}
512
+
513
// Registers ArrowVTab as "arrow", feeds it a one-column Int32 batch built in Rust,
// and checks that summing the column through duckdb yields a single value, 15.
+ #[ test]
514
+ fn test_vtab_arrow_rust_array ( ) -> Result < ( ) , Box < dyn Error > > {
515
+ let db = Connection :: open_in_memory ( ) ?;
516
+ db. register_table_function :: < ArrowVTab > ( "arrow" ) ?;
517
+
518
+ // Showcase: building a record batch from in-memory Rust data
519
+ // and passing it into duckdb takes only a few lines.
520
+ let schema = Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , false ) ] ) ;
521
+ let array = Int32Array :: from ( vec ! [ 1 , 2 , 3 , 4 , 5 ] ) ;
522
+ let rb = RecordBatch :: try_new ( Arc :: new ( schema) , vec ! [ Arc :: new( array) ] ) . expect ( "failed to create record batch" ) ;
523
+ let param = arrow_recordbatch_to_query_params ( rb) ;
524
+ let mut stmt = db. prepare ( "select sum(a)::int32 from arrow(?, ?)" ) ?;
525
+ let mut arr = stmt. query_arrow ( param) ?;
526
+ let rb = arr. next ( ) . expect ( "no record batch" ) ;
527
+ assert_eq ! ( rb. num_columns( ) , 1 ) ;
528
+ let column = rb. column ( 0 ) . as_any ( ) . downcast_ref :: < Int32Array > ( ) . unwrap ( ) ;
529
+ assert_eq ! ( column. len( ) , 1 ) ;
530
+ assert_eq ! ( column. value( 0 ) , 15 ) ;
531
+ Ok ( ( ) )
532
+ }
496
533
}
0 commit comments