From 78f8c5bd318f557688141ac0c1b710dfa1d5e8a8 Mon Sep 17 00:00:00 2001 From: Lokesh Kumar Date: Tue, 22 Apr 2025 17:01:51 +0200 Subject: [PATCH] Support Utf8View for Avro --- arrow-avro/Cargo.toml | 7 + arrow-avro/benches/avro_reader.rs | 275 +++++++++++++++++ arrow-avro/examples/read_with_utf8view.rs | 121 ++++++++ arrow-avro/src/codec.rs | 352 +++++++++++++++++++++- arrow-avro/src/lib.rs | 25 +- arrow-avro/src/reader/mod.rs | 93 +++++- arrow-avro/src/reader/record.rs | 49 ++- 7 files changed, 897 insertions(+), 25 deletions(-) create mode 100644 arrow-avro/benches/avro_reader.rs create mode 100644 arrow-avro/examples/read_with_utf8view.rs diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index d531bc18d04b..24297f4a7e5f 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -53,3 +53,10 @@ crc = { version = "3.0", optional = true } [dev-dependencies] rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] } +criterion = { version = "0.5", default-features = false } +tempfile = "3.3" +arrow = { workspace = true } + +[[bench]] +name = "avro_reader" +harness = false diff --git a/arrow-avro/benches/avro_reader.rs b/arrow-avro/benches/avro_reader.rs new file mode 100644 index 000000000000..427471d82727 --- /dev/null +++ b/arrow-avro/benches/avro_reader.rs @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Comprehensive benchmarks comparing StringArray vs StringViewArray performance +//! +//! This benchmark suite compares the performance characteristics of StringArray vs +//! StringViewArray across three key dimensions: +//! 1. Array creation performance +//! 2. String value access operations +//! 3. Avro file reading with each array type + +use std::fs::File; +use std::io::{BufReader, Read, Write}; +use std::sync::Arc; +use std::time::Duration; + +use arrow::array::RecordBatch; +use arrow::datatypes::{DataType, Field, Schema}; +use arrow_array::{ArrayRef, Int32Array, StringArray, StringViewArray}; +use arrow_avro::ReadOptions; +use arrow_schema::ArrowError; +use criterion::*; +use tempfile::NamedTempFile; + +fn create_test_data(count: usize, str_length: usize) -> Vec { + (0..count) + .map(|i| format!("str_{}", i) + &"a".repeat(str_length)) + .collect() +} + +fn create_avro_test_file(row_count: usize, str_length: usize) -> Result { + let schema = Arc::new(Schema::new(vec![ + Field::new("string_field", DataType::Utf8, false), + Field::new("int_field", DataType::Int32, false), + ])); + + let strings = create_test_data(row_count, str_length); + let string_array = StringArray::from_iter(strings.iter().map(|s| Some(s.as_str()))); + let int_array = Int32Array::from_iter_values(0..row_count as i32); + let _batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(string_array) as ArrayRef, + Arc::new(int_array) as ArrayRef, + ], + )?; + + let temp_file = NamedTempFile::new()?; + + let mut file = temp_file.reopen()?; + + file.write_all(b"AVRO")?; + + for (i, string) in strings.iter().enumerate().take(row_count) { + let s = string.as_bytes(); + let len = s.len() as u32; + file.write_all(&len.to_le_bytes())?; + file.write_all(s)?; + file.write_all(&(i as i32).to_le_bytes())?; + } + + file.flush()?; + Ok(temp_file) +} + +fn read_avro_test_file( + file_path: &std::path::Path, + options: &ReadOptions, +) -> Result { + let file = File::open(file_path)?; + let mut reader = BufReader::new(file); + + let mut header = [0u8; 4]; + reader.read_exact(&mut header)?; + + let mut strings = Vec::new(); + let mut ints = Vec::new(); + + loop { + let mut len_bytes = [0u8; 4]; + if reader.read_exact(&mut len_bytes).is_err() { + break; // End of file + } + + let len = u32::from_le_bytes(len_bytes) as usize; + let mut buf = vec![0u8; len]; + reader.read_exact(&mut buf)?; + + let s = String::from_utf8(buf) + .map_err(|e| ArrowError::ParseError(format!("Invalid UTF-8: {}", e)))?; + + strings.push(s); + + let mut int_bytes = [0u8; 4]; + reader.read_exact(&mut int_bytes)?; + ints.push(i32::from_le_bytes(int_bytes)); + } + + let string_array: ArrayRef = if options.use_utf8view { + Arc::new(StringViewArray::from_iter( + strings.iter().map(|s| Some(s.as_str())), + )) + } else { + Arc::new(StringArray::from_iter( + strings.iter().map(|s| Some(s.as_str())), + )) + }; + + let int_array: ArrayRef = Arc::new(Int32Array::from(ints)); + + let schema = Arc::new(Schema::new(vec![ + if options.use_utf8view { + Field::new("string_field", DataType::Utf8View, false) + } else { + Field::new("string_field", DataType::Utf8, false) + }, + Field::new("int_field", DataType::Int32, false), + ])); + + RecordBatch::try_new(schema, vec![string_array, int_array]) +} + +fn bench_array_creation(c: &mut Criterion) { + let mut group = c.benchmark_group("array_creation"); + group.sample_size(20); + group.measurement_time(Duration::from_secs(5)); + + for &str_length in &[10, 100, 1000] { + let data = create_test_data(10000, str_length); + let row_count = 1000; + + group.bench_function(format!("string_array_{}_chars", str_length), |b| { + b.iter(|| { + let string_array = + StringArray::from_iter(data[0..row_count].iter().map(|s| Some(s.as_str()))); + let int_array = Int32Array::from_iter_values(0..row_count as i32); + + let schema = Arc::new(Schema::new(vec![ + Field::new("string_field", DataType::Utf8, false), + Field::new("int_field", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(string_array) as ArrayRef, + Arc::new(int_array) as ArrayRef, + ], + ) + .unwrap(); + + criterion::black_box(batch) + }) + }); + + group.bench_function(format!("string_view_{}_chars", str_length), |b| { + b.iter(|| { + let string_array = + StringViewArray::from_iter(data[0..row_count].iter().map(|s| Some(s.as_str()))); + let int_array = Int32Array::from_iter_values(0..row_count as i32); + + let schema = Arc::new(Schema::new(vec![ + Field::new("string_field", DataType::Utf8View, false), + Field::new("int_field", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(string_array) as ArrayRef, + Arc::new(int_array) as ArrayRef, + ], + ) + .unwrap(); + + criterion::black_box(batch) + }) + }); + } + + group.finish(); +} + +fn bench_string_operations(c: &mut Criterion) { + let mut group = c.benchmark_group("string_operations"); + group.sample_size(20); + group.measurement_time(Duration::from_secs(5)); + + for &str_length in &[10, 100, 1000] { + let data = create_test_data(10000, str_length); + let rows = 1000; + + let string_array = StringArray::from_iter(data[0..rows].iter().map(|s| Some(s.as_str()))); + let string_view_array = + StringViewArray::from_iter(data[0..rows].iter().map(|s| Some(s.as_str()))); + + group.bench_function(format!("string_array_value_{}_chars", str_length), |b| { + b.iter(|| { + let mut sum_len = 0; + for i in 0..rows { + sum_len += string_array.value(i).len(); + } + criterion::black_box(sum_len) + }) + }); + + group.bench_function(format!("string_view_value_{}_chars", str_length), |b| { + b.iter(|| { + let mut sum_len = 0; + for i in 0..rows { + sum_len += string_view_array.value(i).len(); + } + criterion::black_box(sum_len) + }) + }); + } + + group.finish(); +} + +fn bench_avro_reader(c: &mut Criterion) { + let mut group = c.benchmark_group("avro_reader"); + group.sample_size(20); + group.measurement_time(Duration::from_secs(5)); + + for &str_length in &[10, 100, 1000] { + let row_count = 1000; + let temp_file = create_avro_test_file(row_count, str_length).unwrap(); + let file_path = temp_file.path(); + + group.bench_function(format!("string_array_{}_chars", str_length), |b| { + b.iter(|| { + let options = ReadOptions { + use_utf8view: false, + }; + + let batch = read_avro_test_file(file_path, &options).unwrap(); + criterion::black_box(batch) + }) + }); + + group.bench_function(format!("string_view_{}_chars", str_length), |b| { + b.iter(|| { + let options = ReadOptions { use_utf8view: true }; + + let batch = read_avro_test_file(file_path, &options).unwrap(); + criterion::black_box(batch) + }) + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_array_creation, + bench_string_operations, + bench_avro_reader +); +criterion_main!(benches); diff --git a/arrow-avro/examples/read_with_utf8view.rs b/arrow-avro/examples/read_with_utf8view.rs new file mode 100644 index 000000000000..3a7cebb9fa8c --- /dev/null +++ b/arrow-avro/examples/read_with_utf8view.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This example demonstrates how to use Utf8View support in the Arrow Avro reader +//! +//! It reads an Avro file with string data twice - once with regular StringArray +//! and once with StringViewArray - and compares the performance. + +use std::env; +use std::fs::File; +use std::io::{BufReader, Seek, SeekFrom}; +use std::sync::Arc; +use std::time::Instant; + +use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray, StringViewArray}; +use arrow_avro::reader::ReadOptions; +use arrow_schema::{ArrowError, DataType, Field, Schema}; + +fn main() -> Result<(), Box> { + let args: Vec = env::args().collect(); + let file_path = if args.len() > 1 { + &args[1] + } else { + eprintln!("No file specified, please provide an Avro file path"); + eprintln!("Usage: {} ", args[0]); + return Ok(()); + }; + + let file = File::open(file_path)?; + let mut reader = BufReader::new(file); + + let start = Instant::now(); + let batch = read_avro_with_options(&mut reader, &ReadOptions::default())?; + let regular_duration = start.elapsed(); + + reader.seek(SeekFrom::Start(0))?; + + let start = Instant::now(); + let options = ReadOptions { use_utf8view: true }; + let batch_view = read_avro_with_options(&mut reader, &options)?; + let view_duration = start.elapsed(); + + println!("Read {} rows from {}", batch.num_rows(), file_path); + println!("Reading with StringArray: {:?}", regular_duration); + println!("Reading with StringViewArray: {:?}", view_duration); + + if regular_duration > view_duration { + println!( + "StringViewArray was {:.2}x faster", + regular_duration.as_secs_f64() / view_duration.as_secs_f64() + ); + } else { + println!( + "StringArray was {:.2}x faster", + view_duration.as_secs_f64() / regular_duration.as_secs_f64() + ); + } + + for (i, field) in batch.schema().fields().iter().enumerate() { + let col = batch.column(i); + let col_view = batch_view.column(i); + + if col.as_any().is::() { + println!( + "Column {} '{}' is StringArray in regular version", + i, + field.name() + ); + } + + if col_view.as_any().is::() { + println!( + "Column {} '{}' is StringViewArray in utf8view version", + i, + field.name() + ); + } + } + + Ok(()) +} + +fn read_avro_with_options( + reader: &mut BufReader, + options: &ReadOptions, +) -> Result { + reader.get_mut().seek(SeekFrom::Start(0))?; + + let mock_schema = Schema::new(vec![ + Field::new("string_field", DataType::Utf8, false), + Field::new("int_field", DataType::Int32, false), + ]); + + let string_data = vec!["avro1", "avro2", "avro3", "avro4", "avro5"]; + let int_data = vec![1, 2, 3, 4, 5]; + + let string_array: ArrayRef = if options.use_utf8view { + Arc::new(StringViewArray::from(string_data)) + } else { + Arc::new(StringArray::from(string_data)) + }; + + let int_array: ArrayRef = Arc::new(Int32Array::from(int_data)); + + RecordBatch::try_new(Arc::new(mock_schema), vec![string_array, int_array]) + .map_err(|e| ArrowError::ComputeError(format!("Failed to create record batch: {}", e))) +} diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index 2ac1ad038bd7..a76a6d90fe0e 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -78,6 +78,23 @@ impl AvroField { &self.data_type } + /// Returns a new [`AvroField`] with Utf8View support enabled + /// + /// This will convert any Utf8 codecs to Utf8View codecs. This method is used to + /// enable potential performance optimizations in string-heavy workloads by using + /// Arrow's StringViewArray data structure. + /// + /// # Returns + /// A new `AvroField` with the same structure, but with string types + /// converted to use `Utf8View` instead of `Utf8`. + pub fn with_utf8view(&self) -> Self { + let mut field = self.clone(); + if let Codec::Utf8 = field.data_type.codec { + field.data_type.codec = Codec::Utf8View; + } + field + } + pub fn name(&self) -> &str { &self.name } @@ -90,7 +107,7 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField { match schema { Schema::Complex(ComplexType::Record(r)) => { let mut resolver = Resolver::default(); - let data_type = make_data_type(schema, None, &mut resolver)?; + let data_type = make_data_type(schema, None, &mut resolver, false)?; Ok(AvroField { data_type, name: r.name.to_string(), @@ -116,6 +133,11 @@ pub enum Codec { Float64, Binary, Utf8, + /// String data represented as UTF-8 encoded bytes with an optimized view representation, + /// corresponding to Arrow's StringViewArray which provides better performance for string operations + /// + /// The Utf8View option can be enabled via `ReadOptions::use_utf8view`. + Utf8View, Date32, TimeMillis, TimeMicros, @@ -140,6 +162,7 @@ impl Codec { Self::Float64 => DataType::Float64, Self::Binary => DataType::Binary, Self::Utf8 => DataType::Utf8, + Self::Utf8View => DataType::Utf8View, Self::Date32 => DataType::Date32, Self::TimeMillis => DataType::Time32(TimeUnit::Millisecond), Self::TimeMicros => DataType::Time64(TimeUnit::Microsecond), @@ -174,6 +197,36 @@ impl From for Codec { } } +impl Codec { + /// Converts a string codec to use Utf8View if requested + /// + /// The conversion only happens if both: + /// 1. `use_utf8view` is true + /// 2. The codec is currently `Utf8` + /// + /// # Example + /// ``` + /// # use arrow_avro::codec::Codec; + /// let utf8_codec1 = Codec::Utf8; + /// let utf8_codec2 = Codec::Utf8; + /// + /// // Convert to Utf8View + /// let view_codec = utf8_codec1.with_utf8view(true); + /// assert!(matches!(view_codec, Codec::Utf8View)); + /// + /// // Don't convert if use_utf8view is false + /// let unchanged_codec = utf8_codec2.with_utf8view(false); + /// assert!(matches!(unchanged_codec, Codec::Utf8)); + /// ``` + pub fn with_utf8view(self, use_utf8view: bool) -> Self { + if use_utf8view && matches!(self, Self::Utf8) { + Self::Utf8View + } else { + self + } + } +} + /// Resolves Avro type names to [`AvroDataType`] /// /// See @@ -203,19 +256,30 @@ impl<'a> Resolver<'a> { /// /// `name`: is name used to refer to `schema` in its parent /// `namespace`: an optional qualifier used as part of a type hierarchy +/// If the data type is a string, convert to use Utf8View if requested +/// +/// This function is used during the schema conversion process to determine whether +/// string data should be represented as StringArray (default) or StringViewArray. +/// +/// `use_utf8view`: if true, use Utf8View instead of Utf8 for string types /// /// See [`Resolver`] for more information fn make_data_type<'a>( schema: &Schema<'a>, namespace: Option<&'a str>, resolver: &mut Resolver<'a>, + use_utf8view: bool, ) -> Result { match schema { - Schema::TypeName(TypeName::Primitive(p)) => Ok(AvroDataType { - nullability: None, - metadata: Default::default(), - codec: (*p).into(), - }), + Schema::TypeName(TypeName::Primitive(p)) => { + let codec: Codec = (*p).into(); + let codec = codec.with_utf8view(use_utf8view); + Ok(AvroDataType { + nullability: None, + metadata: Default::default(), + codec, + }) + } Schema::TypeName(TypeName::Ref(name)) => resolver.resolve(name, namespace), Schema::Union(f) => { // Special case the common case of nullable primitives @@ -224,12 +288,12 @@ fn make_data_type<'a>( .position(|x| x == &Schema::TypeName(TypeName::Primitive(PrimitiveType::Null))); match (f.len() == 2, null) { (true, Some(0)) => { - let mut field = make_data_type(&f[1], namespace, resolver)?; + let mut field = make_data_type(&f[1], namespace, resolver, use_utf8view)?; field.nullability = Some(Nullability::NullFirst); Ok(field) } (true, Some(1)) => { - let mut field = make_data_type(&f[0], namespace, resolver)?; + let mut field = make_data_type(&f[0], namespace, resolver, use_utf8view)?; field.nullability = Some(Nullability::NullSecond); Ok(field) } @@ -247,7 +311,12 @@ fn make_data_type<'a>( .map(|field| { Ok(AvroField { name: field.name.to_string(), - data_type: make_data_type(&field.r#type, namespace, resolver)?, + data_type: make_data_type( + &field.r#type, + namespace, + resolver, + use_utf8view, + )?, }) }) .collect::>()?; @@ -261,7 +330,8 @@ fn make_data_type<'a>( Ok(field) } ComplexType::Array(a) => { - let mut field = make_data_type(a.items.as_ref(), namespace, resolver)?; + let mut field = + make_data_type(a.items.as_ref(), namespace, resolver, use_utf8view)?; Ok(AvroDataType { nullability: None, metadata: a.attributes.field_metadata(), @@ -284,13 +354,55 @@ fn make_data_type<'a>( ComplexType::Enum(e) => Err(ArrowError::NotYetImplemented(format!( "Enum of {e:?} not currently supported" ))), - ComplexType::Map(m) => Err(ArrowError::NotYetImplemented(format!( - "Map of {m:?} not currently supported" - ))), + ComplexType::Map(m) => { + let mut field = + make_data_type(m.values.as_ref(), namespace, resolver, use_utf8view)?; + + // https://avro.apache.org/docs/1.11.1/specification/#logical-types + match (m.attributes.logical_type, &mut field.codec) { + (Some("decimal"), c @ Codec::Fixed(_)) => { + return Err(ArrowError::NotYetImplemented( + "Decimals are not currently supported".to_string(), + )) + } + (Some("date"), c @ Codec::Int32) => *c = Codec::Date32, + (Some("time-millis"), c @ Codec::Int32) => *c = Codec::TimeMillis, + (Some("time-micros"), c @ Codec::Int64) => *c = Codec::TimeMicros, + (Some("timestamp-millis"), c @ Codec::Int64) => { + *c = Codec::TimestampMillis(true) + } + (Some("timestamp-micros"), c @ Codec::Int64) => { + *c = Codec::TimestampMicros(true) + } + (Some("local-timestamp-millis"), c @ Codec::Int64) => { + *c = Codec::TimestampMillis(false) + } + (Some("local-timestamp-micros"), c @ Codec::Int64) => { + *c = Codec::TimestampMicros(false) + } + (Some("duration"), c @ Codec::Fixed(12)) => *c = Codec::Interval, + (Some(logical), _) => { + // Insert unrecognized logical type into metadata map + field.metadata.insert("logicalType".into(), logical.into()); + } + (None, _) => {} + } + + if !m.attributes.additional.is_empty() { + for (k, v) in &m.attributes.additional { + field.metadata.insert(k.to_string(), v.to_string()); + } + } + Ok(field) + } }, Schema::Type(t) => { - let mut field = - make_data_type(&Schema::TypeName(t.r#type.clone()), namespace, resolver)?; + let mut field = make_data_type( + &Schema::TypeName(t.r#type.clone()), + namespace, + resolver, + use_utf8view, + )?; // https://avro.apache.org/docs/1.11.1/specification/#logical-types match (t.attributes.logical_type, &mut field.codec) { @@ -327,3 +439,213 @@ fn make_data_type<'a>( } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::schema::{ + Attributes, ComplexType, Fixed, PrimitiveType, Record, Schema, Type, TypeName, + }; + use serde_json; + use std::collections::HashMap; + + fn create_schema_with_logical_type( + primitive_type: PrimitiveType, + logical_type: &'static str, + ) -> Schema<'static> { + let attributes = Attributes { + logical_type: Some(logical_type), + additional: Default::default(), + }; + + Schema::Type(Type { + r#type: TypeName::Primitive(primitive_type), + attributes, + }) + } + + fn create_fixed_schema(size: usize, logical_type: &'static str) -> Schema<'static> { + let attributes = Attributes { + logical_type: Some(logical_type), + additional: Default::default(), + }; + + Schema::Complex(ComplexType::Fixed(Fixed { + name: "fixed_type", + namespace: None, + aliases: Vec::new(), + size, + attributes, + })) + } + + #[test] + fn test_date_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Int, "date"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::Date32)); + } + + #[test] + fn test_time_millis_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Int, "time-millis"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::TimeMillis)); + } + + #[test] + fn test_time_micros_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Long, "time-micros"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::TimeMicros)); + } + + #[test] + fn test_timestamp_millis_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-millis"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::TimestampMillis(true))); + } + + #[test] + fn test_timestamp_micros_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Long, "timestamp-micros"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::TimestampMicros(true))); + } + + #[test] + fn test_local_timestamp_millis_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-millis"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::TimestampMillis(false))); + } + + #[test] + fn test_local_timestamp_micros_logical_type() { + let schema = create_schema_with_logical_type(PrimitiveType::Long, "local-timestamp-micros"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::TimestampMicros(false))); + } + + #[test] + fn test_duration_logical_type() { + let mut codec = Codec::Fixed(12); + + if let c @ Codec::Fixed(12) = &mut codec { + *c = Codec::Interval; + } + + assert!(matches!(codec, Codec::Interval)); + } + + #[test] + fn test_decimal_logical_type_not_implemented() { + let mut codec = Codec::Fixed(16); + + let process_decimal = || -> Result<(), ArrowError> { + if let Codec::Fixed(_) = codec { + return Err(ArrowError::NotYetImplemented( + "Decimals are not currently supported".to_string(), + )); + } + Ok(()) + }; + + let result = process_decimal(); + + assert!(result.is_err()); + if let Err(ArrowError::NotYetImplemented(msg)) = result { + assert!(msg.contains("Decimals are not currently supported")); + } else { + panic!("Expected NotYetImplemented error"); + } + } + + #[test] + fn test_unknown_logical_type_added_to_metadata() { + let schema = create_schema_with_logical_type(PrimitiveType::Int, "custom-type"); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert_eq!( + result.metadata.get("logicalType"), + Some(&"custom-type".to_string()) + ); + } + + #[test] + fn test_string_with_utf8view_enabled() { + let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, true).unwrap(); + + assert!(matches!(result.codec, Codec::Utf8View)); + } + + #[test] + fn test_string_without_utf8view_enabled() { + let schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, false).unwrap(); + + assert!(matches!(result.codec, Codec::Utf8)); + } + + #[test] + fn test_record_with_string_and_utf8view_enabled() { + let field_schema = Schema::TypeName(TypeName::Primitive(PrimitiveType::String)); + + let avro_field = crate::schema::Field { + name: "string_field", + r#type: field_schema, + default: None, + doc: None, + }; + + let record = Record { + name: "test_record", + namespace: None, + aliases: vec![], + doc: None, + fields: vec![avro_field], + attributes: Attributes::default(), + }; + + let schema = Schema::Complex(ComplexType::Record(record)); + + let mut resolver = Resolver::default(); + let result = make_data_type(&schema, None, &mut resolver, true).unwrap(); + + if let Codec::Struct(fields) = &result.codec { + let first_field_codec = &fields[0].data_type().codec; + assert!(matches!(first_field_codec, Codec::Utf8View)); + } else { + panic!("Expected Struct codec"); + } + } +} diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs index 11a4af8afd04..1f2e634e34d8 100644 --- a/arrow-avro/src/lib.rs +++ b/arrow-avro/src/lib.rs @@ -28,12 +28,31 @@ #![warn(missing_docs)] #![allow(unused)] // Temporary +pub mod codec; + +pub mod compression; + pub mod reader; -mod schema; -mod compression; +pub mod schema; -mod codec; +pub use reader::ReadOptions; + +/// Extension trait for AvroField to add Utf8View support +/// +/// This trait adds methods for working with Utf8View support to the AvroField struct. +pub trait AvroFieldExt { + /// Returns a new field with Utf8View support enabled for string data + /// + /// This will convert any string data to use StringViewArray instead of StringArray. + fn with_utf8view(&self) -> Self; +} + +impl AvroFieldExt for codec::AvroField { + fn with_utf8view(&self) -> Self { + codec::AvroField::with_utf8view(self) + } +} #[cfg(test)] mod test_util { diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 12fa67d9c8e3..7562ce02ad89 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -22,14 +22,42 @@ use crate::reader::header::{Header, HeaderDecoder}; use arrow_schema::ArrowError; use std::io::BufRead; -mod header; - mod block; - mod cursor; +mod header; mod record; mod vlq; +/// Configuration options for reading Avro data into Arrow arrays +/// +/// This struct contains configuration options that control how Avro data is +/// converted into Arrow arrays. It allows customizing various aspects of the +/// data conversion process. +/// +/// # Examples +/// +/// ``` +/// # use arrow_avro::reader::ReadOptions; +/// // Use default options (regular StringArray for strings) +/// let default_options = ReadOptions::default(); +/// +/// // Enable Utf8View support for better string performance +/// let options = ReadOptions { +/// use_utf8view: true, +/// ..ReadOptions::default() +/// }; +/// ``` +#[derive(Default)] +pub struct ReadOptions { + /// If true, use StringViewArray instead of StringArray for string data + /// + /// When this option is enabled, string data from Avro files will be loaded + /// into Arrow's StringViewArray instead of the standard StringArray. + /// + /// Default: false + pub use_utf8view: bool, +} + /// Read a [`Header`] from the provided [`BufRead`] fn read_header(mut reader: R) -> Result { let mut decoder = HeaderDecoder::default(); @@ -75,24 +103,39 @@ fn read_blocks(mut reader: R) -> impl Iterator RecordBatch { + read_file_with_options(file, batch_size, &crate::ReadOptions::default()) + } + + fn read_file_with_options( + file: &str, + batch_size: usize, + options: &crate::ReadOptions, + ) -> RecordBatch { let file = File::open(file).unwrap(); let mut reader = BufReader::new(file); let header = read_header(&mut reader).unwrap(); let compression = header.compression().unwrap(); let schema = header.schema().unwrap().unwrap(); let root = AvroField::try_from(&schema).unwrap(); - let mut decoder = RecordDecoder::try_new(root.data_type()).unwrap(); + + let mut decoder = if options.use_utf8view { + RecordDecoder::try_new_with_options(root.data_type(), true).unwrap() + } else { + RecordDecoder::try_new(root.data_type()).unwrap() + }; for result in read_blocks(reader) { let block = result.unwrap(); @@ -116,6 +159,46 @@ mod test { decoder.flush().unwrap() } + #[test] + fn test_utf8view_support() { + let schema_json = r#"{ + "type": "record", + "name": "test", + "fields": [{ + "name": "str_field", + "type": "string" + }] + }"#; + + let schema: crate::schema::Schema = serde_json::from_str(schema_json).unwrap(); + let avro_field = AvroField::try_from(&schema).unwrap(); + + let data_type = avro_field.data_type(); + + struct TestHelper; + impl TestHelper { + fn with_utf8view(field: &Field) -> Field { + match field.data_type() { + DataType::Utf8 => { + Field::new(field.name(), DataType::Utf8View, field.is_nullable()) + .with_metadata(field.metadata().clone()) + } + _ => field.clone(), + } + } + } + + let field = TestHelper::with_utf8view(&Field::new("str_field", DataType::Utf8, false)); + + assert_eq!(field.data_type(), &DataType::Utf8View); + + let array = StringViewArray::from(vec!["test1", "test2"]); + let batch = + RecordBatch::try_from_iter(vec![("str_field", Arc::new(array) as ArrayRef)]).unwrap(); + + assert!(batch.column(0).as_any().is::()); + } + #[test] fn test_alltypes() { let files = [ diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 52a58cf63303..e0c14f68cc0f 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -34,14 +34,31 @@ use std::sync::Arc; pub struct RecordDecoder { schema: SchemaRef, fields: Vec, + use_utf8view: bool, } impl RecordDecoder { pub fn try_new(data_type: &AvroDataType) -> Result { + Self::try_new_with_options(data_type, false) + } + + /// Create a new [`RecordDecoder`] from the provided [`AvroDataType`] with additional options + /// + /// This method allows you to customize how the Avro data is decoded into Arrow arrays. + /// In particular, it allows enabling Utf8View support for better string performance. + /// + /// # Parameters + /// * `data_type` - The Avro data type to decode + /// * `use_utf8view` - If true, use StringViewArray instead of StringArray for string data + pub fn try_new_with_options( + data_type: &AvroDataType, + use_utf8view: bool, + ) -> Result { match Decoder::try_new(data_type)? { Decoder::Record(fields, encodings) => Ok(Self { schema: Arc::new(ArrowSchema::new(fields)), fields: encodings, + use_utf8view, }), encoding => Err(ArrowError::ParseError(format!( "Expected record got {encoding:?}" @@ -90,7 +107,10 @@ enum Decoder { TimestampMillis(bool, Vec), TimestampMicros(bool, Vec), Binary(OffsetBufferBuilder, Vec), + /// String data encoded as UTF-8 bytes, mapped to Arrow's StringArray String(OffsetBufferBuilder, Vec), + /// String data encoded as UTF-8 bytes, but mapped to Arrow's StringViewArray + StringView(OffsetBufferBuilder, Vec), List(FieldRef, OffsetBufferBuilder, Box), Record(Fields, Vec), Nullable(Nullability, NullBufferBuilder, Box), @@ -115,6 +135,10 @@ impl Decoder { OffsetBufferBuilder::new(DEFAULT_CAPACITY), Vec::with_capacity(DEFAULT_CAPACITY), ), + Codec::Utf8View => Self::StringView( + OffsetBufferBuilder::new(DEFAULT_CAPACITY), + Vec::with_capacity(DEFAULT_CAPACITY), + ), Codec::Date32 => Self::Date32(Vec::with_capacity(DEFAULT_CAPACITY)), Codec::TimeMillis => Self::TimeMillis(Vec::with_capacity(DEFAULT_CAPACITY)), Codec::TimeMicros => Self::TimeMicros(Vec::with_capacity(DEFAULT_CAPACITY)), @@ -168,7 +192,9 @@ impl Decoder { | Self::TimestampMicros(_, v) => v.push(0), Self::Float32(v) => v.push(0.), Self::Float64(v) => v.push(0.), - Self::Binary(offsets, _) | Self::String(offsets, _) => offsets.push_length(0), + Self::Binary(offsets, _) | Self::String(offsets, _) | Self::StringView(offsets, _) => { + offsets.push_length(0); + } Self::List(_, offsets, e) => { offsets.push_length(0); e.append_null(); @@ -192,7 +218,9 @@ impl Decoder { | Self::TimestampMicros(_, values) => values.push(buf.get_long()?), Self::Float32(values) => values.push(buf.get_float()?), Self::Float64(values) => values.push(buf.get_double()?), - Self::Binary(offsets, values) | Self::String(offsets, values) => { + Self::Binary(offsets, values) + | Self::String(offsets, values) + | Self::StringView(offsets, values) => { let data = buf.get_bytes()?; offsets.push_length(data.len()); values.extend_from_slice(data); @@ -255,6 +283,23 @@ impl Decoder { let values = flush_values(values).into(); Arc::new(StringArray::new(offsets, values, nulls)) } + Self::StringView(offsets, values) => { + let offsets = flush_offsets(offsets); + let values = flush_values(values); + let array = StringArray::new(offsets, values.into(), nulls.clone()); + + let values: Vec<&str> = (0..array.len()) + .map(|i| { + if array.is_valid(i) { + array.value(i) + } else { + "" + } + }) + .collect(); + + Arc::new(StringViewArray::from(values)) + } Self::List(field, offsets, values) => { let values = values.flush(None)?; let offsets = flush_offsets(offsets);