Skip to content

Commit 02f6519

Browse files
committed
fix
1 parent 2b49385 commit 02f6519

File tree

3 files changed

+106
-44
lines changed

3 files changed

+106
-44
lines changed

kernel/src/engine/arrow_data.rs

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,14 @@ where
100100

101101
fn get(&self, row_index: usize, index: usize) -> String {
102102
let arry = self.value(row_index);
103-
let sarry = arry.as_string::<i32>();
104-
sarry.value(index).to_string()
103+
// Try both i32 (StringArray) and i64 (LargeStringArray) offsets
104+
if let Some(sarry) = arry.as_string_opt::<i32>() {
105+
sarry.value(index).to_string()
106+
} else if let Some(sarry) = arry.as_string_opt::<i64>() {
107+
sarry.value(index).to_string()
108+
} else {
109+
String::new()
110+
}
105111
}
106112

107113
fn materialize(&self, row_index: usize) -> Vec<String> {
@@ -118,13 +124,28 @@ impl EngineMap for MapArray {
118124
let offsets = self.offsets();
119125
let start_offset = offsets[row_index] as usize;
120126
let count = offsets[row_index + 1] as usize - start_offset;
121-
let keys = self.keys().as_string::<i32>();
122-
for (idx, map_key) in keys.iter().enumerate().skip(start_offset).take(count) {
123-
if let Some(map_key) = map_key {
124-
if key == map_key {
125-
// found the item
126-
let vals = self.values().as_string::<i32>();
127-
return Some(vals.value(idx));
127+
128+
// Try both i32 (StringArray) and i64 (LargeStringArray) offsets
129+
if let (Some(keys), Some(vals)) = (
130+
self.keys().as_string_opt::<i32>(),
131+
self.values().as_string_opt::<i32>(),
132+
) {
133+
for (idx, map_key) in keys.iter().enumerate().skip(start_offset).take(count) {
134+
if let Some(map_key) = map_key {
135+
if key == map_key {
136+
return Some(vals.value(idx));
137+
}
138+
}
139+
}
140+
} else if let (Some(keys), Some(vals)) = (
141+
self.keys().as_string_opt::<i64>(),
142+
self.values().as_string_opt::<i64>(),
143+
) {
144+
for (idx, map_key) in keys.iter().enumerate().skip(start_offset).take(count) {
145+
if let Some(map_key) = map_key {
146+
if key == map_key {
147+
return Some(vals.value(idx));
148+
}
128149
}
129150
}
130151
}
@@ -134,11 +155,25 @@ impl EngineMap for MapArray {
134155
fn materialize(&self, row_index: usize) -> HashMap<String, String> {
135156
let mut ret = HashMap::new();
136157
let map_val = self.value(row_index);
137-
let keys = map_val.column(0).as_string::<i32>();
138-
let values = map_val.column(1).as_string::<i32>();
139-
for (key, value) in keys.iter().zip(values.iter()) {
140-
if let (Some(key), Some(value)) = (key, value) {
141-
ret.insert(key.into(), value.into());
158+
159+
// Try both i32 (StringArray) and i64 (LargeStringArray) offsets
160+
if let (Some(keys), Some(values)) = (
161+
map_val.column(0).as_string_opt::<i32>(),
162+
map_val.column(1).as_string_opt::<i32>(),
163+
) {
164+
for (key, value) in keys.iter().zip(values.iter()) {
165+
if let (Some(key), Some(value)) = (key, value) {
166+
ret.insert(key.into(), value.into());
167+
}
168+
}
169+
} else if let (Some(keys), Some(values)) = (
170+
map_val.column(0).as_string_opt::<i64>(),
171+
map_val.column(1).as_string_opt::<i64>(),
172+
) {
173+
for (key, value) in keys.iter().zip(values.iter()) {
174+
if let (Some(key), Some(value)) = (key, value) {
175+
ret.insert(key.into(), value.into());
176+
}
142177
}
143178
}
144179
ret
@@ -277,19 +312,24 @@ impl ArrowEngineData {
277312
data_type: &DataType,
278313
col: &'a dyn Array,
279314
) -> DeltaResult<&'a dyn GetData<'a>> {
280-
use ArrowDataType::Utf8;
315+
use ArrowDataType::{LargeUtf8, Utf8, Utf8View};
316+
317+
// Helper to check if a type is a string type (Utf8, LargeUtf8, or Utf8View)
318+
let is_string_type = |dt: &ArrowDataType| matches!(dt, Utf8 | LargeUtf8 | Utf8View);
319+
281320
let col_as_list = || {
282321
if let Some(array) = col.as_list_opt::<i32>() {
283-
(array.value_type() == Utf8).then_some(array as _)
322+
is_string_type(&array.value_type()).then_some(array as _)
284323
} else if let Some(array) = col.as_list_opt::<i64>() {
285-
(array.value_type() == Utf8).then_some(array as _)
324+
is_string_type(&array.value_type()).then_some(array as _)
286325
} else {
287326
None
288327
}
289328
};
290329
let col_as_map = || {
291330
col.as_map_opt().and_then(|array| {
292-
(array.key_type() == &Utf8 && array.value_type() == &Utf8).then_some(array as _)
331+
(is_string_type(&array.key_type()) && is_string_type(&array.value_type()))
332+
.then_some(array as _)
293333
})
294334
};
295335
let result: Result<&'a dyn GetData<'a>, _> = match data_type {
@@ -299,7 +339,7 @@ impl ArrowEngineData {
299339
}
300340
&DataType::STRING => {
301341
debug!("Pushing string array for {}", ColumnName::new(path));
302-
col.as_string_opt().map(|a| a as _).ok_or("string")
342+
col.as_string_opt::<i64>().map(|a| a as _).ok_or("string")
303343
}
304344
&DataType::INTEGER => {
305345
debug!("Pushing int32 array for {}", ColumnName::new(path));

kernel/src/engine/arrow_get_data.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::arrow::array::{
2-
types::{GenericStringType, Int32Type, Int64Type},
3-
Array, BooleanArray, GenericByteArray, GenericListArray, MapArray, OffsetSizeTrait,
4-
PrimitiveArray,
2+
types::{Int32Type, Int64Type},
3+
Array, BooleanArray, GenericListArray, LargeStringArray, MapArray, OffsetSizeTrait,
4+
PrimitiveArray, StringArray,
55
};
66

77
use crate::{
@@ -41,7 +41,17 @@ impl GetData<'_> for PrimitiveArray<Int64Type> {
4141
}
4242
}
4343

44-
impl<'a> GetData<'a> for GenericByteArray<GenericStringType<i32>> {
44+
impl<'a> GetData<'a> for StringArray {
45+
fn get_str(&'a self, row_index: usize, _field_name: &str) -> DeltaResult<Option<&'a str>> {
46+
if self.is_valid(row_index) {
47+
Ok(Some(self.value(row_index)))
48+
} else {
49+
Ok(None)
50+
}
51+
}
52+
}
53+
54+
impl<'a> GetData<'a> for LargeStringArray {
4555
fn get_str(&'a self, row_index: usize, _field_name: &str) -> DeltaResult<Option<&'a str>> {
4656
if self.is_valid(row_index) {
4757
Ok(Some(self.value(row_index)))

kernel/src/engine/arrow_utils.rs

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ use crate::{
1818

1919
use crate::arrow::array::{
2020
cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray,
21-
MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
21+
GenericStringArray, LargeStringArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
22+
StringArray, StructArray,
2223
};
2324
use crate::arrow::buffer::NullBuffer;
2425
use crate::arrow::compute::concat_batches;
@@ -1012,24 +1013,35 @@ pub(crate) fn parse_json(
10121013
schema: SchemaRef,
10131014
) -> DeltaResult<Box<dyn EngineData>> {
10141015
let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into();
1015-
let json_strings = json_strings
1016-
.column(0)
1017-
.as_any()
1018-
.downcast_ref::<StringArray>()
1019-
.ok_or_else(|| {
1020-
Error::generic("Expected json_strings to be a StringArray, found something else")
1021-
})?;
1016+
let array_ref = json_strings.column(0);
10221017
let schema = Arc::new(ArrowSchema::try_from_kernel(schema.as_ref())?);
1023-
let result = parse_json_impl(json_strings, schema)?;
1024-
Ok(Box::new(ArrowEngineData::new(result)))
1018+
1019+
// Try LargeStringArray first
1020+
if let Some(large_strings) = array_ref.as_any().downcast_ref::<LargeStringArray>() {
1021+
let result = parse_json_impl(large_strings, schema)?;
1022+
return Ok(Box::new(ArrowEngineData::new(result)));
1023+
}
1024+
1025+
// Fall back to StringArray
1026+
if let Some(strings) = array_ref.as_any().downcast_ref::<StringArray>() {
1027+
let result = parse_json_impl(strings, schema)?;
1028+
return Ok(Box::new(ArrowEngineData::new(result)));
1029+
}
1030+
1031+
Err(Error::generic(
1032+
"Expected json_strings to be a StringArray or LargeStringArray, found something else",
1033+
))
10251034
}
10261035

10271036
// Raw arrow implementation of the json parsing. Separate from the public function for testing.
10281037
//
10291038
// NOTE: This code is really inefficient because arrow lacks the native capability to perform robust
10301039
// StringArray -> StructArray JSON parsing. See https://github.com/apache/arrow-rs/issues/6522. If
10311040
// that shortcoming gets fixed upstream, this method can simplify or hopefully even disappear.
1032-
fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaResult<RecordBatch> {
1041+
fn parse_json_impl<O: OffsetSizeTrait>(
1042+
json_strings: &GenericStringArray<O>,
1043+
schema: ArrowSchemaRef,
1044+
) -> DeltaResult<RecordBatch> {
10331045
if json_strings.is_empty() {
10341046
return Ok(RecordBatch::new_empty(schema));
10351047
}
@@ -1231,45 +1243,45 @@ mod tests {
12311243
ArrowField::new("c", ArrowDataType::Int32, true),
12321244
]));
12331245
let input: Vec<&str> = vec![];
1234-
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
1246+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone()).unwrap();
12351247
assert_eq!(result.num_rows(), 0);
12361248

12371249
let input: Vec<Option<&str>> = vec![Some("")];
1238-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1250+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12391251
result.expect_err("empty string");
12401252

12411253
let input: Vec<Option<&str>> = vec![Some(" \n\t")];
1242-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1254+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12431255
result.expect_err("empty string");
12441256

12451257
let input: Vec<Option<&str>> = vec![Some(r#""a""#)];
1246-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1258+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12471259
result.expect_err("invalid string");
12481260

12491261
let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#)];
1250-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1262+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12511263
result.expect_err("incomplete object");
12521264

12531265
let input: Vec<Option<&str>> = vec![Some("{}{}")];
1254-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1266+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12551267
assert!(matches!(
12561268
result.unwrap_err(),
12571269
Error::Generic(s) if s == "Malformed JSON: Multiple JSON objects"
12581270
));
12591271

12601272
let input: Vec<Option<&str>> = vec![Some(r#"{} { "a": 1"#)];
1261-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1273+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12621274
assert!(matches!(
12631275
result.unwrap_err(),
12641276
Error::Generic(s) if s == "Malformed JSON: Multiple JSON objects"
12651277
));
12661278

12671279
let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#), Some(r#", "b"}"#)];
1268-
let result = parse_json_impl(&input.into(), requested_schema.clone());
1280+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone());
12691281
result.expect_err("split object");
12701282

12711283
let input: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1, "b": "2", "c": 3}"#), None];
1272-
let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
1284+
let result = parse_json_impl::<i32>(&input.into(), requested_schema.clone()).unwrap();
12731285
assert_eq!(result.num_rows(), 3);
12741286
assert_eq!(result.column(0).null_count(), 2);
12751287
assert_eq!(result.column(1).null_count(), 2);
@@ -1288,7 +1300,7 @@ mod tests {
12881300
let json_string = format!(r#"{{"long_val": "{long_string}"}}"#);
12891301
let input: Vec<Option<&str>> = vec![Some(&json_string)];
12901302

1291-
let batch = parse_json_impl(&input.into(), schema.clone()).unwrap();
1303+
let batch = parse_json_impl::<i32>(&input.into(), schema.clone()).unwrap();
12921304
assert_eq!(batch.num_rows(), 1);
12931305
let long_col = batch.column(0).as_string::<i32>();
12941306
assert_eq!(long_col.value(0), long_string);

0 commit comments

Comments
 (0)