
Commit c26e427

feat: Support Utf8View in JSON reader (#7263)
* feat: Support Utf8View in JSON reader
* Add code
* Fix fmt
* Address comments
* Add benchmark
* Add benchmark
* Fix lint
* Clean up comments
1 parent d5339f3 commit c26e427

3 files changed (+338 -1 lines changed)


arrow-json/src/reader/mod.rs

+143 -1
@@ -154,6 +154,7 @@ use crate::reader::map_array::MapArrayDecoder;
 use crate::reader::null_array::NullArrayDecoder;
 use crate::reader::primitive_array::PrimitiveArrayDecoder;
 use crate::reader::string_array::StringArrayDecoder;
+use crate::reader::string_view_array::StringViewArrayDecoder;
 use crate::reader::struct_array::StructArrayDecoder;
 use crate::reader::tape::{Tape, TapeDecoder};
 use crate::reader::timestamp_array::TimestampArrayDecoder;
@@ -167,6 +168,7 @@ mod primitive_array;
 mod schema;
 mod serializer;
 mod string_array;
+mod string_view_array;
 mod struct_array;
 mod tape;
 mod timestamp_array;
@@ -732,6 +734,7 @@ fn make_decoder(
         DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
         DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
         DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
+        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
         DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
         DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
         DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
@@ -751,7 +754,7 @@ mod tests {
     use std::io::{BufReader, Cursor, Seek};

     use arrow_array::cast::AsArray;
-    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray};
+    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
     use arrow_buffer::{ArrowNativeType, Buffer};
     use arrow_cast::display::{ArrayFormatter, FormatOptions};
     use arrow_data::ArrayDataBuilder;
@@ -902,6 +905,145 @@ mod tests {
         assert_eq!(col2.value(4), "");
     }

+    #[test]
+    fn test_long_string_view_allocation() {
+        // The JSON input contains field "a" with different string lengths.
+        // According to the implementation in the decoder:
+        // - For a string, capacity is only increased if its length > 12 bytes.
+        // Therefore, for:
+        // Row 1: "short" (5 bytes) -> capacity += 0
+        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
+        // Row 3: "hello" (5 bytes) -> capacity += 0
+        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
+        // Expected total capacity = 24 + 17 = 41
+        let expected_capacity: usize = 41;
+
+        let buf = r#"
+        {"a": "short", "b": "dummy"}
+        {"a": "this is definitely long", "b": "dummy"}
+        {"a": "hello", "b": "dummy"}
+        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8View, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]));
+
+        let batches = do_read(buf, 1024, false, false, schema);
+        assert_eq!(batches.len(), 1, "Expected one record batch");
+
+        // Get the first column ("a") as a StringViewArray.
+        let col_a = batches[0].column(0);
+        let string_view_array = col_a
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .expect("Column should be a StringViewArray");
+
+        // Retrieve the underlying data buffer from the array.
+        // The builder pre-allocates capacity based on the sum of lengths for long strings.
+        let data_buffer = string_view_array.to_data().buffers()[0].len();
+
+        // Check that the allocated capacity is at least what we expected.
+        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
+        assert!(
+            data_buffer >= expected_capacity,
+            "Data buffer length ({}) should be at least {}",
+            data_buffer,
+            expected_capacity
+        );
+
+        // Additionally, verify that the decoded values are correct.
+        assert_eq!(string_view_array.value(0), "short");
+        assert_eq!(string_view_array.value(1), "this is definitely long");
+        assert_eq!(string_view_array.value(2), "hello");
+        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
+    }
+
+    /// Test the memory capacity allocation logic when converting numeric types to strings.
+    #[test]
+    fn test_numeric_view_allocation() {
+        // For numeric types, the expected capacity calculation is as follows:
+        // Row 1: 123456789 -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
+        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
+        //        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
+        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
+        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
+        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
+        let expected_capacity: usize = 33;
+
+        let buf = r#"
+        {"n": 123456789}
+        {"n": 1000000000000}
+        {"n": 3.1415}
+        {"n": 2.718281828459045}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));
+
+        let batches = do_read(buf, 1024, true, false, schema);
+        assert_eq!(batches.len(), 1, "Expected one record batch");
+
+        let col_n = batches[0].column(0);
+        let string_view_array = col_n
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .expect("Column should be a StringViewArray");
+
+        // Check that the underlying data buffer capacity is at least the expected value.
+        let data_buffer = string_view_array.to_data().buffers()[0].len();
+        assert!(
+            data_buffer >= expected_capacity,
+            "Data buffer length ({}) should be at least {}",
+            data_buffer,
+            expected_capacity
+        );
+
+        // Verify that the converted string values are correct.
+        // Note: The format of the number converted to a string should match the actual implementation.
+        assert_eq!(string_view_array.value(0), "123456789");
+        assert_eq!(string_view_array.value(1), "1000000000000");
+        assert_eq!(string_view_array.value(2), "3.1415");
+        assert_eq!(string_view_array.value(3), "2.718281828459045");
+    }
+
+    #[test]
+    fn test_string_with_uft8view() {
+        let buf = r#"
+        {"a": "1", "b": "2"}
+        {"a": "hello", "b": "shoo"}
+        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
+
+        {"b": null}
+        {"b": "", "a": null}
+
+        "#;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8View, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]));
+
+        let batches = do_read(buf, 1024, false, false, schema);
+        assert_eq!(batches.len(), 1);
+
+        let col1 = batches[0].column(0).as_string_view();
+        assert_eq!(col1.null_count(), 2);
+        assert_eq!(col1.value(0), "1");
+        assert_eq!(col1.value(1), "hello");
+        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
+        assert!(col1.is_null(3));
+        assert!(col1.is_null(4));
+        assert_eq!(col1.data_type(), &DataType::Utf8View);
+
+        let col2 = batches[0].column(1).as_string::<i64>();
+        assert_eq!(col2.null_count(), 1);
+        assert_eq!(col2.value(0), "2");
+        assert_eq!(col2.value(1), "shoo");
+        assert_eq!(col2.value(2), "\t😁foo");
+        assert!(col2.is_null(3));
+        assert_eq!(col2.value(4), "");
+    }
+
     #[test]
     fn test_complex() {
         let buf = r#"
arrow-json/src/reader/string_view_array.rs (new file)

+165

@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::GenericByteViewBuilder;
+use arrow_array::types::StringViewType;
+use arrow_array::Array;
+use arrow_data::ArrayData;
+use arrow_schema::ArrowError;
+use std::fmt::Write;
+
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::ArrayDecoder;
+
+const TRUE: &str = "true";
+const FALSE: &str = "false";
+
+pub struct StringViewArrayDecoder {
+    coerce_primitive: bool,
+}
+
+impl StringViewArrayDecoder {
+    pub fn new(coerce_primitive: bool) -> Self {
+        Self { coerce_primitive }
+    }
+}
+
+impl ArrayDecoder for StringViewArrayDecoder {
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let coerce = self.coerce_primitive;
+        let mut data_capacity = 0;
+        for &p in pos {
+            // note that StringView is different that StringArray in that only
+            // "long" strings (longer than 12 bytes) are stored in the buffer.
+            // "short" strings are inlined into a fixed length structure.
+            match tape.get(p) {
+                TapeElement::String(idx) => {
+                    let s = tape.get_string(idx);
+                    // Only increase capacity if the string length is greater than 12 bytes
+                    if s.len() > 12 {
+                        data_capacity += s.len();
+                    }
+                }
+                TapeElement::Null => {
+                    // Do not increase capacity for null values
+                }
+                // For booleans, do not increase capacity (both "true" and "false" are less than
+                // 12 bytes)
+                TapeElement::True if coerce => {}
+                TapeElement::False if coerce => {}
+                // For Number, use the same strategy as for strings
+                TapeElement::Number(idx) if coerce => {
+                    let s = tape.get_string(idx);
+                    if s.len() > 12 {
+                        data_capacity += s.len();
+                    }
+                }
+                // For I64, only add capacity if the absolute value is greater than 999,999,999,999
+                // (the largest number that can fit in 12 bytes)
+                TapeElement::I64(_) if coerce => {
+                    match tape.get(p + 1) {
+                        TapeElement::I32(_) => {
+                            let high = match tape.get(p) {
+                                TapeElement::I64(h) => h,
+                                _ => unreachable!(),
+                            };
+                            let low = match tape.get(p + 1) {
+                                TapeElement::I32(l) => l,
+                                _ => unreachable!(),
+                            };
+                            let val = ((high as i64) << 32) | (low as u32) as i64;
+                            if val.abs() > 999_999_999_999 {
+                                // Only allocate capacity based on the string representation if the number is large
+                                data_capacity += val.to_string().len();
+                            }
+                        }
+                        _ => unreachable!(),
+                    }
+                }
+                // For I32, do not increase capacity (the longest string representation is <= 12 bytes)
+                TapeElement::I32(_) if coerce => {}
+                // For F32 and F64, keep the existing estimate
+                TapeElement::F32(_) if coerce => {
+                    data_capacity += 10;
+                }
+                TapeElement::F64(_) if coerce => {
+                    data_capacity += 10;
+                }
+                _ => {
+                    return Err(tape.error(p, "string"));
+                }
+            }
+        }
+
+        let mut builder = GenericByteViewBuilder::<StringViewType>::with_capacity(data_capacity);
+        // Temporary buffer to avoid per-iteration allocation for numeric types
+        let mut tmp_buf = String::new();
+
+        for &p in pos {
+            match tape.get(p) {
+                TapeElement::String(idx) => {
+                    builder.append_value(tape.get_string(idx));
+                }
+                TapeElement::Null => {
+                    builder.append_null();
+                }
+                TapeElement::True if coerce => {
+                    builder.append_value(TRUE);
+                }
+                TapeElement::False if coerce => {
+                    builder.append_value(FALSE);
+                }
+                TapeElement::Number(idx) if coerce => {
+                    builder.append_value(tape.get_string(idx));
+                }
+                TapeElement::I64(high) if coerce => match tape.get(p + 1) {
+                    TapeElement::I32(low) => {
+                        let val = ((high as i64) << 32) | (low as u32) as i64;
+                        tmp_buf.clear();
+                        // Reuse the temporary buffer instead of allocating a new String
+                        write!(&mut tmp_buf, "{}", val).unwrap();
+                        builder.append_value(&tmp_buf);
+                    }
+                    _ => unreachable!(),
+                },
+                TapeElement::I32(n) if coerce => {
+                    tmp_buf.clear();
+                    write!(&mut tmp_buf, "{}", n).unwrap();
+                    builder.append_value(&tmp_buf);
+                }
+                TapeElement::F32(n) if coerce => {
+                    tmp_buf.clear();
+                    write!(&mut tmp_buf, "{}", n).unwrap();
+                    builder.append_value(&tmp_buf);
+                }
+                TapeElement::F64(high) if coerce => match tape.get(p + 1) {
+                    TapeElement::F32(low) => {
+                        let val = f64::from_bits(((high as u64) << 32) | (low as u64));
+                        tmp_buf.clear();
+                        write!(&mut tmp_buf, "{}", val).unwrap();
+                        builder.append_value(&tmp_buf);
+                    }
+                    _ => unreachable!(),
+                },
+                _ => unreachable!(),
+            }
+        }
+
+        let array = builder.finish();
+        Ok(array.into_data())
+    }
+}
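
The capacity estimate in the first pass leans on the StringView layout: values of at most 12 bytes are inlined in the fixed-size view structs, while only longer values consume space in the variadic data buffers, which is why short strings contribute nothing to data_capacity. A small illustrative sketch, not part of this commit, assuming arrow-array's StringViewBuilder alias and its data_buffers accessor:

use arrow_array::builder::StringViewBuilder;

fn main() {
    let long = "this is definitely long"; // more than 12 bytes, stored out of line
    let mut builder = StringViewBuilder::new();
    builder.append_value("short"); // 12 bytes or fewer: inlined in the view struct
    builder.append_value(long);
    let array = builder.finish();

    assert_eq!(array.value(0), "short");
    assert_eq!(array.value(1), long);

    // Only the long value occupies space in the variadic data buffers.
    let data_bytes: usize = array.data_buffers().iter().map(|b| b.len()).sum();
    assert_eq!(data_bytes, long.len());
}
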

arrow/benches/json_reader.rs

+30
@@ -64,6 +64,35 @@ fn small_bench_primitive(c: &mut Criterion) {
     do_bench(c, "small_bench_primitive", json_content, schema)
 }

+fn small_bench_primitive_with_utf8view(c: &mut Criterion) {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8View, true),
+        Field::new("c2", DataType::Float64, true),
+        Field::new("c3", DataType::UInt32, true),
+        Field::new("c4", DataType::Boolean, true),
+    ]));
+
+    let json_content = r#"
+    {"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
+    {"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
+    {"c1": null, "c2": 3, "c3": 125, "c4": null}
+    {"c2": -35, "c3": 100.0, "c4": true}
+    {"c1": "fifteen", "c2": null, "c4": true}
+    {"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
+    {"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
+    {"c1": null, "c2": 3, "c3": 125, "c4": null}
+    {"c2": -35, "c3": 100.0, "c4": true}
+    {"c1": "fifteen", "c2": null, "c4": true}
+    "#;
+
+    do_bench(
+        c,
+        "small_bench_primitive_with_utf8view",
+        json_content,
+        schema,
+    )
+}
+
 fn large_bench_primitive(c: &mut Criterion) {
     let schema = Arc::new(Schema::new(vec![
         Field::new("c1", DataType::Utf8, true),
@@ -142,6 +171,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     small_bench_primitive(c);
     large_bench_primitive(c);
     small_bench_list(c);
+    small_bench_primitive_with_utf8view(c);
 }

 criterion_group!(benches, criterion_benchmark);
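
The new case runs alongside the existing JSON reader benchmarks. Assuming the standard arrow-rs Criterion setup, something like cargo bench -p arrow --bench json_reader from the repository root should report small_bench_primitive_with_utf8view next to small_bench_primitive, giving a direct Utf8 versus Utf8View comparison on the same input.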
