Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,10 @@ impl ArrowEngineData {
debug!("Pushing string array for {}", ColumnName::new(path));
col.as_string_opt().map(|a| a as _).ok_or("string")
}
&DataType::BINARY => {
debug!("Pushing binary array for {}", ColumnName::new(path));
col.as_binary_opt().map(|a| a as _).ok_or("binary")
}
&DataType::INTEGER => {
debug!("Pushing int32 array for {}", ColumnName::new(path));
col.as_primitive_opt::<Int32Type>()
Expand Down Expand Up @@ -734,4 +738,138 @@ mod tests {

Ok(())
}

#[test]
fn test_binary_column_extraction() -> DeltaResult<()> {
use crate::arrow::array::BinaryArray;
use crate::engine_data::{GetData, RowVisitor};
use crate::schema::ColumnName;
use std::sync::LazyLock;

// Create a RecordBatch with binary data
let binary_data: Vec<Option<&[u8]>> = vec![
Some(b"hello"),
Some(b"world"),
None,
Some(b"\x00\x01\x02\x03"),
];
let binary_array = BinaryArray::from(binary_data.clone());

let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"data",
ArrowDataType::Binary,
true,
)]));

let batch = RecordBatch::try_new(schema, vec![Arc::new(binary_array)])?;
let arrow_data = ArrowEngineData::new(batch);

// Create a visitor to extract binary data
struct BinaryVisitor {
values: Vec<Option<Vec<u8>>>,
}

impl RowVisitor for BinaryVisitor {
fn selected_column_names_and_types(
&self,
) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES: LazyLock<Vec<ColumnName>> =
LazyLock::new(|| vec![ColumnName::new(["data"])]);
static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::BINARY]);
(&NAMES, &TYPES)
}

fn visit<'a>(
&mut self,
row_count: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<()> {
assert_eq!(getters.len(), 1);
let getter = getters[0];

for i in 0..row_count {
self.values
.push(getter.get_binary(i, "data")?.map(|b| b.to_vec()));
}
Ok(())
}
}

let mut visitor = BinaryVisitor { values: vec![] };
arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor)?;
Comment on lines +798 to +799
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also do a test where we apply BinaryVisitor on something like ints or long?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, missed this one. I've added the test


// Verify the extracted values
assert_eq!(visitor.values.len(), 4);
assert_eq!(visitor.values[0].as_deref(), Some(b"hello".as_ref()));
assert_eq!(visitor.values[1].as_deref(), Some(b"world".as_ref()));
assert_eq!(visitor.values[2], None);
assert_eq!(
visitor.values[3].as_deref(),
Some(b"\x00\x01\x02\x03".as_ref())
);

Ok(())
}

#[test]
fn test_binary_column_extraction_type_mismatch() -> DeltaResult<()> {
use crate::engine_data::{GetData, RowVisitor};
use crate::schema::ColumnName;
use std::sync::LazyLock;

// Create a RecordBatch with Int32 data (not binary)
let data: Vec<Option<i32>> = vec![Some(123)];
let int_array = Int32Array::from(data);

let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"data",
ArrowDataType::Int32,
true,
)]));

let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)])?;
let arrow_data = ArrowEngineData::new(batch);

// Create a visitor that tries to extract binary data from an int column
struct BinaryVisitor {
values: Vec<Option<Vec<u8>>>,
}

impl RowVisitor for BinaryVisitor {
fn selected_column_names_and_types(
&self,
) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES: LazyLock<Vec<ColumnName>> =
LazyLock::new(|| vec![ColumnName::new(["data"])]);
static TYPES: LazyLock<Vec<DataType>> = LazyLock::new(|| vec![DataType::BINARY]);
(&NAMES, &TYPES)
}

fn visit<'a>(
&mut self,
row_count: usize,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<()> {
assert_eq!(getters.len(), 1);
let getter = getters[0];

for i in 0..row_count {
self.values
.push(getter.get_binary(i, "data")?.map(|b| b.to_vec()));
}
Ok(())
}
}

let mut visitor = BinaryVisitor { values: vec![] };
let result = arrow_data.visit_rows(&[ColumnName::new(["data"])], &mut visitor);

// Verify that we get a type mismatch error
assert_result_error_with_message(
result,
"Type mismatch on data: expected binary, got Int32",
);

Ok(())
}
}
12 changes: 11 additions & 1 deletion kernel/src/engine/arrow_get_data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::arrow::array::{
types::{GenericStringType, Int32Type, Int64Type},
types::{GenericBinaryType, GenericStringType, Int32Type, Int64Type},
Array, BooleanArray, GenericByteArray, GenericListArray, MapArray, OffsetSizeTrait,
PrimitiveArray,
};
Expand Down Expand Up @@ -51,6 +51,16 @@ impl<'a> GetData<'a> for GenericByteArray<GenericStringType<i32>> {
}
}

impl<'a> GetData<'a> for GenericByteArray<GenericBinaryType<i32>> {
fn get_binary(&'a self, row_index: usize, _field_name: &str) -> DeltaResult<Option<&'a [u8]>> {
if self.is_valid(row_index) {
Ok(Some(self.value(row_index)))
} else {
Ok(None)
}
}
}

impl<'a, OffsetSize> GetData<'a> for GenericListArray<OffsetSize>
where
OffsetSize: OffsetSizeTrait,
Expand Down
79 changes: 78 additions & 1 deletion kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{AsAny, DeltaResult, Error};
///
/// A value of `true` in the selection vector means the corresponding row is selected (i.e., not deleted),
/// while `false` means the row is logically deleted and should be ignored. If the selection vector is shorter
/// then the number of rows in `data` then all rows not covered by the selection vector are assumed to be selected.
/// than the number of rows in `data` then all rows not covered by the selection vector are assumed to be selected.
///
/// Interpreting unselected (`false`) rows will result in incorrect/undefined behavior.
pub struct FilteredEngineData {
Expand Down Expand Up @@ -196,6 +196,7 @@ pub trait GetData<'a> {
(get_int, i32),
(get_long, i64),
(get_str, &'a str),
(get_binary, &'a [u8]),
(get_list, ListItem<'a>),
(get_map, MapItem<'a>)
);
Expand All @@ -217,6 +218,7 @@ impl<'a> GetData<'a> for () {
(get_int, i32),
(get_long, i64),
(get_str, &'a str),
(get_binary, &'a [u8]),
(get_list, ListItem<'a>),
(get_map, MapItem<'a>)
);
Expand Down Expand Up @@ -251,6 +253,7 @@ impl_typed_get_data!(
(get_int, i32),
(get_long, i64),
(get_str, &'a str),
(get_binary, &'a [u8]),
(get_list, ListItem<'a>),
(get_map, MapItem<'a>)
);
Expand Down Expand Up @@ -513,6 +516,80 @@ mod tests {
}
}

#[test]

fn test_get_binary_some_value() {
use crate::arrow::array::BinaryArray;

// Use Arrow's BinaryArray implementation
let binary_data: Vec<Option<&[u8]>> = vec![Some(b"hello"), Some(b"world"), None];
let binary_array = BinaryArray::from(binary_data);

// Cast to dyn GetData to use TypedGetData trait
let getter: &dyn GetData<'_> = &binary_array;

// Test getting first row
let result: Option<&[u8]> = getter.get_opt(0, "binary_field").unwrap();
assert_eq!(result, Some(b"hello".as_ref()));

// Test getting second row
let result: Option<&[u8]> = getter.get_opt(1, "binary_field").unwrap();
assert_eq!(result, Some(b"world".as_ref()));

// Test getting None value
let result: Option<&[u8]> = getter.get_opt(2, "binary_field").unwrap();
assert_eq!(result, None);
}

#[test]
fn test_get_binary_required() {
use crate::arrow::array::BinaryArray;

let binary_data: Vec<Option<&[u8]>> = vec![Some(b"hello")];
let binary_array = BinaryArray::from(binary_data);

// Cast to dyn GetData to use TypedGetData trait
let getter: &dyn GetData<'_> = &binary_array;

// Test using get() for required field
let result: &[u8] = getter.get(0, "binary_field").unwrap();
assert_eq!(result, b"hello");
}

#[test]
fn test_get_binary_required_missing() {
use crate::arrow::array::BinaryArray;

let binary_data: Vec<Option<&[u8]>> = vec![None];
let binary_array = BinaryArray::from(binary_data);

// Cast to dyn GetData to use TypedGetData trait
let getter: &dyn GetData<'_> = &binary_array;

// Test using get() for missing required field should error
let result: DeltaResult<&[u8]> = getter.get(0, "binary_field");
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("Data missing for field"));
}
}

#[test]
fn test_get_binary_empty_bytes() {
use crate::arrow::array::BinaryArray;

let binary_data: Vec<Option<&[u8]>> = vec![Some(b"")];
let binary_array = BinaryArray::from(binary_data);

// Cast to dyn GetData to use TypedGetData trait
let getter: &dyn GetData<'_> = &binary_array;

// Test getting empty bytes
let result: Option<&[u8]> = getter.get_opt(0, "binary_field").unwrap();
assert_eq!(result, Some([].as_ref()));
assert_eq!(result.unwrap().len(), 0);
}

#[test]
fn test_from_engine_data() {
let data = get_engine_data(3);
Expand Down
Loading