diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index 14a8f6809f70..cd33e337be08 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -154,6 +154,7 @@ use crate::reader::map_array::MapArrayDecoder;
 use crate::reader::null_array::NullArrayDecoder;
 use crate::reader::primitive_array::PrimitiveArrayDecoder;
 use crate::reader::string_array::StringArrayDecoder;
+use crate::reader::string_view_array::StringViewArrayDecoder;
 use crate::reader::struct_array::StructArrayDecoder;
 use crate::reader::tape::{Tape, TapeDecoder};
 use crate::reader::timestamp_array::TimestampArrayDecoder;
@@ -167,6 +168,7 @@ mod primitive_array;
 mod schema;
 mod serializer;
 mod string_array;
+mod string_view_array;
 mod struct_array;
 mod tape;
 mod timestamp_array;
@@ -732,6 +734,7 @@ fn make_decoder(
         DataType::Decimal256(p, s) => Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
         DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
         DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
+        DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
         DataType::LargeUtf8 => Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
         DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
         DataType::LargeList(_) => Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
@@ -751,7 +754,7 @@ mod tests {
     use std::io::{BufReader, Cursor, Seek};
 
     use arrow_array::cast::AsArray;
-    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray};
+    use arrow_array::{Array, BooleanArray, Float64Array, ListArray, StringArray, StringViewArray};
     use arrow_buffer::{ArrowNativeType, Buffer};
     use arrow_cast::display::{ArrayFormatter, FormatOptions};
     use arrow_data::ArrayDataBuilder;
@@ -902,6 +905,145 @@ mod tests {
         assert_eq!(col2.value(4), "");
     }
 
+    #[test]
+    fn test_long_string_view_allocation() {
+        // The JSON input contains field "a" with different string lengths.
+        // According to the implementation in the decoder:
+        // - For a string, capacity is only increased if its length > 12 bytes.
+        // Therefore, for:
+        // Row 1: "short" (5 bytes) -> capacity += 0
+        // Row 2: "this is definitely long" (24 bytes) -> capacity += 24
+        // Row 3: "hello" (5 bytes) -> capacity += 0
+        // Row 4: "\nfoobar😀asfgÿ" (17 bytes) -> capacity += 17
+        // Expected total capacity = 24 + 17 = 41
+        let expected_capacity: usize = 41;
+
+        let buf = r#"
+        {"a": "short", "b": "dummy"}
+        {"a": "this is definitely long", "b": "dummy"}
+        {"a": "hello", "b": "dummy"}
+        {"a": "\nfoobar😀asfgÿ", "b": "dummy"}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8View, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]));
+
+        let batches = do_read(buf, 1024, false, false, schema);
+        assert_eq!(batches.len(), 1, "Expected one record batch");
+
+        // Get the first column ("a") as a StringViewArray.
+        let col_a = batches[0].column(0);
+        let string_view_array = col_a
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .expect("Column should be a StringViewArray");
+
+        // Retrieve the underlying data buffer from the array.
+        // The builder pre-allocates capacity based on the sum of lengths for long strings.
+        let data_buffer = string_view_array.to_data().buffers()[0].len();
+
+        // Check that the allocated capacity is at least what we expected.
+        // (The actual buffer may be larger than expected due to rounding or internal allocation strategies.)
+        assert!(
+            data_buffer >= expected_capacity,
+            "Data buffer length ({}) should be at least {}",
+            data_buffer,
+            expected_capacity
+        );
+
+        // Additionally, verify that the decoded values are correct.
+        assert_eq!(string_view_array.value(0), "short");
+        assert_eq!(string_view_array.value(1), "this is definitely long");
+        assert_eq!(string_view_array.value(2), "hello");
+        assert_eq!(string_view_array.value(3), "\nfoobar😀asfgÿ");
+    }
+
+    /// Test the memory capacity allocation logic when converting numeric types to strings.
+    #[test]
+    fn test_numeric_view_allocation() {
+        // For numeric types, the expected capacity calculation is as follows:
+        // Row 1: 123456789 -> Number converts to the string "123456789" (length 9), 9 <= 12, so no capacity is added.
+        // Row 2: 1000000000000 -> Treated as an I64 number; its string is "1000000000000" (length 13),
+        //        which is >12 and its absolute value is > 999_999_999_999, so 13 bytes are added.
+        // Row 3: 3.1415 -> F32 number, a fixed estimate of 10 bytes is added.
+        // Row 4: 2.718281828459045 -> F64 number, a fixed estimate of 10 bytes is added.
+        // Total expected capacity = 13 + 10 + 10 = 33 bytes.
+        let expected_capacity: usize = 33;
+
+        let buf = r#"
+        {"n": 123456789}
+        {"n": 1000000000000}
+        {"n": 3.1415}
+        {"n": 2.718281828459045}
+        "#;
+
+        let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Utf8View, true)]));
+
+        let batches = do_read(buf, 1024, true, false, schema);
+        assert_eq!(batches.len(), 1, "Expected one record batch");
+
+        let col_n = batches[0].column(0);
+        let string_view_array = col_n
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .expect("Column should be a StringViewArray");
+
+        // Check that the underlying data buffer capacity is at least the expected value.
+        let data_buffer = string_view_array.to_data().buffers()[0].len();
+        assert!(
+            data_buffer >= expected_capacity,
+            "Data buffer length ({}) should be at least {}",
+            data_buffer,
+            expected_capacity
+        );
+
+        // Verify that the converted string values are correct.
+        // Note: The format of the number converted to a string should match the actual implementation.
+        assert_eq!(string_view_array.value(0), "123456789");
+        assert_eq!(string_view_array.value(1), "1000000000000");
+        assert_eq!(string_view_array.value(2), "3.1415");
+        assert_eq!(string_view_array.value(3), "2.718281828459045");
+    }
+
+    #[test]
+    fn test_string_with_utf8view() {
+        let buf = r#"
+        {"a": "1", "b": "2"}
+        {"a": "hello", "b": "shoo"}
+        {"b": "\t😁foo", "a": "\nfoobar\ud83d\ude00\u0061\u0073\u0066\u0067\u00FF"}
+
+        {"b": null}
+        {"b": "", "a": null}
+
+        "#;
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8View, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]));
+
+        let batches = do_read(buf, 1024, false, false, schema);
+        assert_eq!(batches.len(), 1);
+
+        let col1 = batches[0].column(0).as_string_view();
+        assert_eq!(col1.null_count(), 2);
+        assert_eq!(col1.value(0), "1");
+        assert_eq!(col1.value(1), "hello");
+        assert_eq!(col1.value(2), "\nfoobar😀asfgÿ");
+        assert!(col1.is_null(3));
+        assert!(col1.is_null(4));
+        assert_eq!(col1.data_type(), &DataType::Utf8View);
+
+        let col2 = batches[0].column(1).as_string::<i64>();
+        assert_eq!(col2.null_count(), 1);
+        assert_eq!(col2.value(0), "2");
+        assert_eq!(col2.value(1), "shoo");
+        assert_eq!(col2.value(2), "\t😁foo");
+        assert!(col2.is_null(3));
+        assert_eq!(col2.value(4), "");
+    }
+
     #[test]
     fn test_complex() {
         let buf = r#"
diff --git a/arrow-json/src/reader/string_view_array.rs b/arrow-json/src/reader/string_view_array.rs
new file mode 100644
index 000000000000..8aeb1c805899
--- /dev/null
+++ b/arrow-json/src/reader/string_view_array.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::GenericByteViewBuilder;
+use arrow_array::types::StringViewType;
+use arrow_array::Array;
+use arrow_data::ArrayData;
+use arrow_schema::ArrowError;
+use std::fmt::Write;
+
+use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::ArrayDecoder;
+
+const TRUE: &str = "true";
+const FALSE: &str = "false";
+
+pub struct StringViewArrayDecoder {
+    coerce_primitive: bool,
+}
+
+impl StringViewArrayDecoder {
+    pub fn new(coerce_primitive: bool) -> Self {
+        Self { coerce_primitive }
+    }
+}
+
+impl ArrayDecoder for StringViewArrayDecoder {
+    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
+        let coerce = self.coerce_primitive;
+        let mut data_capacity = 0;
+        for &p in pos {
+            // note that StringView is different from StringArray in that only
+            // "long" strings (longer than 12 bytes) are stored in the buffer.
+            // "short" strings are inlined into a fixed length structure.
+            match tape.get(p) {
+                TapeElement::String(idx) => {
+                    let s = tape.get_string(idx);
+                    // Only increase capacity if the string length is greater than 12 bytes
+                    if s.len() > 12 {
+                        data_capacity += s.len();
+                    }
+                }
+                TapeElement::Null => {
+                    // Do not increase capacity for null values
+                }
+                // For booleans, do not increase capacity (both "true" and "false" are less than
+                // 12 bytes)
+                TapeElement::True if coerce => {}
+                TapeElement::False if coerce => {}
+                // For Number, use the same strategy as for strings
+                TapeElement::Number(idx) if coerce => {
+                    let s = tape.get_string(idx);
+                    if s.len() > 12 {
+                        data_capacity += s.len();
+                    }
+                }
+                // For I64, only add capacity if the absolute value is greater than 999,999,999,999
+                // (the largest number that can fit in 12 bytes)
+                TapeElement::I64(_) if coerce => {
+                    match tape.get(p + 1) {
+                        TapeElement::I32(_) => {
+                            let high = match tape.get(p) {
+                                TapeElement::I64(h) => h,
+                                _ => unreachable!(),
+                            };
+                            let low = match tape.get(p + 1) {
+                                TapeElement::I32(l) => l,
+                                _ => unreachable!(),
+                            };
+                            let val = ((high as i64) << 32) | (low as u32) as i64;
+                            if val.abs() > 999_999_999_999 {
+                                // Only allocate capacity based on the string representation if the number is large
+                                data_capacity += val.to_string().len();
+                            }
+                        }
+                        _ => unreachable!(),
+                    }
+                }
+                // For I32, do not increase capacity (the longest string representation is <= 12 bytes)
+                TapeElement::I32(_) if coerce => {}
+                // For F32 and F64, keep the existing estimate
+                TapeElement::F32(_) if coerce => {
+                    data_capacity += 10;
+                }
+                TapeElement::F64(_) if coerce => {
+                    data_capacity += 10;
+                }
+                _ => {
+                    return Err(tape.error(p, "string"));
+                }
+            }
+        }
+
+        let mut builder = GenericByteViewBuilder::<StringViewType>::with_capacity(data_capacity);
+        // Temporary buffer to avoid per-iteration allocation for numeric types
+        let mut tmp_buf = String::new();
+
+        for &p in pos {
+            match tape.get(p) {
+                TapeElement::String(idx) => {
+                    builder.append_value(tape.get_string(idx));
+                }
+                TapeElement::Null => {
+                    builder.append_null();
+                }
+                TapeElement::True if coerce => {
+                    builder.append_value(TRUE);
+                }
+                TapeElement::False if coerce => {
+                    builder.append_value(FALSE);
+                }
+                TapeElement::Number(idx) if coerce => {
+                    builder.append_value(tape.get_string(idx));
+                }
+                TapeElement::I64(high) if coerce => match tape.get(p + 1) {
+                    TapeElement::I32(low) => {
+                        let val = ((high as i64) << 32) | (low as u32) as i64;
+                        tmp_buf.clear();
+                        // Reuse the temporary buffer instead of allocating a new String
+                        write!(&mut tmp_buf, "{}", val).unwrap();
+                        builder.append_value(&tmp_buf);
+                    }
+                    _ => unreachable!(),
+                },
+                TapeElement::I32(n) if coerce => {
+                    tmp_buf.clear();
+                    write!(&mut tmp_buf, "{}", n).unwrap();
+                    builder.append_value(&tmp_buf);
+                }
+                TapeElement::F32(n) if coerce => {
+                    tmp_buf.clear();
+                    write!(&mut tmp_buf, "{}", n).unwrap();
+                    builder.append_value(&tmp_buf);
+                }
+                TapeElement::F64(high) if coerce => match tape.get(p + 1) {
+                    TapeElement::F32(low) => {
+                        let val = f64::from_bits(((high as u64) << 32) | (low as u64));
+                        tmp_buf.clear();
+                        write!(&mut tmp_buf, "{}", val).unwrap();
+                        builder.append_value(&tmp_buf);
+                    }
+                    _ => unreachable!(),
+                },
+                _ => unreachable!(),
+            }
+        }
+
+        let array = builder.finish();
+        Ok(array.into_data())
+    }
+}
diff --git a/arrow/benches/json_reader.rs b/arrow/benches/json_reader.rs
index c698a93fe869..e0e46d3d78db 100644
--- a/arrow/benches/json_reader.rs
+++ b/arrow/benches/json_reader.rs
@@ -64,6 +64,35 @@ fn small_bench_primitive(c: &mut Criterion) {
     do_bench(c, "small_bench_primitive", json_content, schema)
 }
 
+fn small_bench_primitive_with_utf8view(c: &mut Criterion) {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8View, true),
+        Field::new("c2", DataType::Float64, true),
+        Field::new("c3", DataType::UInt32, true),
+        Field::new("c4", DataType::Boolean, true),
+    ]));
+
+    let json_content = r#"
+    {"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
+    {"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
+    {"c1": null, "c2": 3, "c3": 125, "c4": null}
+    {"c2": -35, "c3": 100.0, "c4": true}
+    {"c1": "fifteen", "c2": null, "c4": true}
+    {"c1": "eleven", "c2": 6.2222222225, "c3": 5.0, "c4": false}
+    {"c1": "twelve", "c2": -55555555555555.2, "c3": 3}
+    {"c1": null, "c2": 3, "c3": 125, "c4": null}
+    {"c2": -35, "c3": 100.0, "c4": true}
+    {"c1": "fifteen", "c2": null, "c4": true}
+    "#;
+
+    do_bench(
+        c,
+        "small_bench_primitive_with_utf8view",
+        json_content,
+        schema,
+    )
+}
+
 fn large_bench_primitive(c: &mut Criterion) {
     let schema = Arc::new(Schema::new(vec![
         Field::new("c1", DataType::Utf8, true),
@@ -142,6 +171,7 @@ fn criterion_benchmark(c: &mut Criterion) {
     small_bench_primitive(c);
    large_bench_primitive(c);
     small_bench_list(c);
+    small_bench_primitive_with_utf8view(c);
 }
 
 criterion_group!(benches, criterion_benchmark);