Skip to content

feat: Support Utf8View in JSON reader #7263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 143 additions & 1 deletion arrow-json/src/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ use crate::reader::map_array::MapArrayDecoder;
use crate::reader::null_array::NullArrayDecoder;
use crate::reader::primitive_array::PrimitiveArrayDecoder;
use crate::reader::string_array::StringArrayDecoder;
use crate::reader::string_view_array::StringViewArrayDecoder;
use crate::reader::struct_array::StructArrayDecoder;
use crate::reader::tape::{Tape, TapeDecoder};
use crate::reader::timestamp_array::TimestampArrayDecoder;
Expand All @@ -167,6 +168,7 @@ mod primitive_array;
mod schema;
mod serializer;
mod string_array;
mod string_view_array;
mod struct_array;
mod tape;
mod timestamp_array;
Expand Down Expand Up @@ -732,6 +734,7 @@ fn make_decoder(
DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
Expand All @@ -751,7 +754,7 @@ mod tests {
use std::io::{BufReader, Cursor, Seek};

use arrow_array::cast::AsArray;
use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray};
use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_data::ArrayDataBuilder;
Expand Down Expand Up @@ -902,6 +905,145 @@ mod tests {
assert_eq!(col2.value(4), "");
}

#[test]
fn test_long_string_view_allocation() {
// The JSON input contains field "a" with different string lengths.
// According to the implementation in the decoder:
// - For a string, capacity is only increased if its length > 12 bytes.
// Therefore, for:
// Row 1: "short" (5 bytes) -> capacity += 0
// Row 2: "this is definitely long" (24 bytes) -> capacity += 24
// Row 3: "hello" (5 bytes) -> capacity += 0
// Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
// Expected total capacity = 24 + 17 = 41
let expected_capacity: usize = 41;

let buf = r#"
{"a": "short", "b": "dummy"}
{"a": "this is definitely long", "b": "dummy"}
{"a": "hello", "b": "dummy"}
{"a": "\nfoobar😀asfgÿ", "b": "dummy"}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8View, true),
Field::new("b", DataType::LargeUtf8, true),
]));

let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1, "Expected one record batch");

// Get the first column ("a") as a StringViewArray.
let col_a = batches[0].column(0);
let string_view_array = col_a
.as_any()
.downcast_ref::<StringViewArray>()
.expect("Column should be a StringViewArray");

// Retrieve the underlying data buffer from the array.
// The builder pre-allocates capacity based on the sum of lengths for long strings.
let data_buffer = string_view_array.to_data().buffers()[0].len();

// Check that the allocated capacity is at least what we expected.
// (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
assert!(
data_buffer >= expected_capacity,
"Data buffer length ({}) should be at least {}",
data_buffer,
expected_capacity
);

// Additionally, verify that the decoded values are correct.
assert_eq!(string_view_array.value(0), "short");
assert_eq!(string_view_array.value(1), "this is definitely long");
assert_eq!(string_view_array.value(2), "hello");
assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
}

/// Test the memory capacity allocation logic when converting numeric types to strings.
#[test]
fn test_numeric_view_allocation() {
// For numeric types, the expected capacity calculation is as follows:
// Row 1: 123456789 -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
// Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
// which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
// Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
// Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
// Total expected capacity = 13 + 10 + 10 = 33 bytes.
let expected_capacity: usize = 33;

let buf = r#"
{"n": 123456789}
{"n": 1000000000000}
{"n": 3.1415}
{"n": 2.718281828459045}
"#;

let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));

let batches = do_read(buf, 1024, true, false, schema);
assert_eq!(batches.len(), 1, "Expected one record batch");

let col_n = batches[0].column(0);
let string_view_array = col_n
.as_any()
.downcast_ref::<StringViewArray>()
.expect("Column should be a StringViewArray");

// Check that the underlying data buffer capacity is at least the expected value.
let data_buffer = string_view_array.to_data().buffers()[0].len();
assert!(
data_buffer >= expected_capacity,
"Data buffer length ({}) should be at least {}",
data_buffer,
expected_capacity
);

// Verify that the converted string values are correct.
// Note: The format of the number converted to a string should match the actual implementation.
assert_eq!(string_view_array.value(0), "123456789");
assert_eq!(string_view_array.value(1), "1000000000000");
assert_eq!(string_view_array.value(2), "3.1415");
assert_eq!(string_view_array.value(3), "2.718281828459045");
}

#[test]
fn test_string_with_uft8view() {
let buf = r#"
{"a": "1", "b": "2"}
{"a": "hello", "b": "shoo"}
{"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}

{"b": null}
{"b": "", "a": null}

"#;
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Utf8View, true),
Field::new("b", DataType::LargeUtf8, true),
]));

let batches = do_read(buf, 1024, false, false, schema);
assert_eq!(batches.len(), 1);

let col1 = batches[0].column(0).as_string_view();
assert_eq!(col1.null_count(), 2);
assert_eq!(col1.value(0), "1");
assert_eq!(col1.value(1), "hello");
assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this value has more than 12 bytes so it will exercise the longer string view path

assert!(col1.is_null(3));
assert!(col1.is_null(4));
assert_eq!(col1.data_type(), &DataType::Utf8View);

let col2 = batches[0].column(1).as_string::<i64>();
assert_eq!(col2.null_count(), 1);
assert_eq!(col2.value(0), "2");
assert_eq!(col2.value(1), "shoo");
assert_eq!(col2.value(2), "\t😁foo");
assert!(col2.is_null(3));
assert_eq!(col2.value(4), "");
}

#[test]
fn test_complex() {
let buf = r#"
Expand Down
165 changes: 165 additions & 0 deletions arrow-json/src/reader/string_view_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::GenericByteViewBuilder;
use arrow_array::types::StringViewType;
use arrow_array::Array;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use std::fmt::Write;

use crate::reader::tape::{Tape, TapeElement};
use crate::reader::ArrayDecoder;

const TRUE: &str = "true";
const FALSE: &str = "false";

pub struct StringViewArrayDecoder {
coerce_primitive: bool,
}

impl StringViewArrayDecoder {
pub fn new(coerce_primitive: bool) -> Self {
Self { coerce_primitive }
}
}

impl ArrayDecoder for StringViewArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
let coerce = self.coerce_primitive;
let mut data_capacity = 0;
for &p in pos {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringView is different that StringArray in that only "long" strings (longer than 12 bytes) contributed to the data

Thus I think these calculations should be adjusted:

  1. only increase capacity for strings if the data is over 12 bytes
  2. don't increase for boolean
  3. For I32 probably we can use zero as well (as the longest such value is -2147483647 whcih is less than 12 bytes)
  4. For I64 maybe we could be more sophisticated and only add data length if the value is over 999999999999 etc.
  5. For F32 and F64 I am not sure what hte maximum length of a string representation is so we should probably keep the existing estimate

More details on the layout are here: https://docs.rs/arrow/latest/arrow/array/struct.GenericByteViewArray.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good explain and guide, thank you @alamb!

// note that StringView is different that StringArray in that only
// "long" strings (longer than 12 bytes) are stored in the buffer.
// "short" strings are inlined into a fixed length structure.
match tape.get(p) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a little context on the rationale here would be helpful:

Suggested change
match tape.get(p) {
// note that StringView is different that StringArray in that only
// "long" strings (longer than 12 bytes) are stored in the buffer.
// "short" strings are inlined into a fixed length structure.
match tape.get(p) {

TapeElement::String(idx) => {
let s = tape.get_string(idx);
// Only increase capacity if the string length is greater than 12 bytes
if s.len() > 12 {
data_capacity += s.len();
}
}
TapeElement::Null => {
// Do not increase capacity for null values
}
// For booleans, do not increase capacity (both "true" and "false" are less than
// 12 bytes)
TapeElement::True if coerce => {}
TapeElement::False if coerce => {}
// For Number, use the same strategy as for strings
TapeElement::Number(idx) if coerce => {
let s = tape.get_string(idx);
if s.len() > 12 {
data_capacity += s.len();
}
}
// For I64, only add capacity if the absolute value is greater than 999,999,999,999
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// For I64, only add capacity if the absolute value is greater than 999,999,999,999
// For I64, only add capacity if the absolute value is greater than 999,999,999,999
// (the largest number that can fit in 12 bytes)

// (the largest number that can fit in 12 bytes)
TapeElement::I64(_) if coerce => {
match tape.get(p + 1) {
TapeElement::I32(_) => {
let high = match tape.get(p) {
TapeElement::I64(h) => h,
_ => unreachable!(),
};
let low = match tape.get(p + 1) {
TapeElement::I32(l) => l,
_ => unreachable!(),
};
let val = ((high as i64) << 32) | (low as u32) as i64;
if val.abs() > 999_999_999_999 {
// Only allocate capacity based on the string representation if the number is large
data_capacity += val.to_string().len();
}
}
_ => unreachable!(),
}
}
// For I32, do not increase capacity (the longest string representation is <= 12 bytes)
TapeElement::I32(_) if coerce => {}
// For F32 and F64, keep the existing estimate
TapeElement::F32(_) if coerce => {
data_capacity += 10;
}
TapeElement::F64(_) if coerce => {
data_capacity += 10;
}
_ => {
return Err(tape.error(p, "string"));
}
}
}

let mut builder = GenericByteViewBuilder::<StringViewType>::with_capacity(data_capacity);
// Temporary buffer to avoid per-iteration allocation for numeric types
let mut tmp_buf = String::new();

for &p in pos {
match tape.get(p) {
TapeElement::String(idx) => {
builder.append_value(tape.get_string(idx));
}
TapeElement::Null => {
builder.append_null();
}
TapeElement::True if coerce => {
builder.append_value(TRUE);
}
TapeElement::False if coerce => {
builder.append_value(FALSE);
}
TapeElement::Number(idx) if coerce => {
builder.append_value(tape.get_string(idx));
}
TapeElement::I64(high) if coerce => match tape.get(p + 1) {
TapeElement::I32(low) => {
let val = ((high as i64) << 32) | (low as u32) as i64;
tmp_buf.clear();
// Reuse the temporary buffer instead of allocating a new String
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

write!(&mut tmp_buf, "{}", val).unwrap();
builder.append_value(&tmp_buf);
}
_ => unreachable!(),
},
TapeElement::I32(n) if coerce => {
tmp_buf.clear();
write!(&mut tmp_buf, "{}", n).unwrap();
builder.append_value(&tmp_buf);
}
TapeElement::F32(n) if coerce => {
tmp_buf.clear();
write!(&mut tmp_buf, "{}", n).unwrap();
builder.append_value(&tmp_buf);
}
TapeElement::F64(high) if coerce => match tape.get(p + 1) {
TapeElement::F32(low) => {
let val = f64::from_bits(((high as u64) << 32) | (low as u64));
tmp_buf.clear();
write!(&mut tmp_buf, "{}", val).unwrap();
builder.append_value(&tmp_buf);
}
_ => unreachable!(),
},
_ => unreachable!(),
}
}

let array = builder.finish();
Ok(array.into_data())
}
}
30 changes: 30 additions & 0 deletions arrow/benches/json_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ fn small_bench_primitive(c: &mut Criterion) {
do_bench(c, "small_bench_primitive", json_content, schema)
}

fn small_bench_primitive_with_utf8view(c: &mut Criterion) {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8View, true),
Field::new("c2", DataType::Float64, true),
Field::new("c3", DataType::UInt32, true),
Field::new("c4", DataType::Boolean, true),
]));

let json_content = r#"
{"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
{"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
{"c1": null, "c2": 3, "c3": 125, "c4": null}
{"c2": -35, "c3": 100.0, "c4": true}
{"c1": "fifteen", "c2": null, "c4": true}
{"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
{"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
{"c1": null, "c2": 3, "c3": 125, "c4": null}
{"c2": -35, "c3": 100.0, "c4": true}
{"c1": "fifteen", "c2": null, "c4": true}
"#;

do_bench(
c,
"small_bench_primitive_with_utf8view",
json_content,
schema,
)
}

fn large_bench_primitive(c: &mut Criterion) {
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Expand Down Expand Up @@ -142,6 +171,7 @@ fn criterion_benchmark(c: &mut Criterion) {
small_bench_primitive(c);
large_bench_primitive(c);
small_bench_list(c);
small_bench_primitive_with_utf8view(c);
}

criterion_group!(benches, criterion_benchmark);
Expand Down
Loading