diff --git a/arrow-select/src/lib.rs b/arrow-select/src/lib.rs index 8dd881145a8a..33c1ee8ddb0a 100644 --- a/arrow-select/src/lib.rs +++ b/arrow-select/src/lib.rs @@ -29,6 +29,7 @@ pub mod concat; pub mod dictionary; pub mod filter; pub mod interleave; +pub mod merge; pub mod nullif; pub mod take; pub mod union_extract; diff --git a/arrow-select/src/merge.rs b/arrow-select/src/merge.rs new file mode 100644 index 000000000000..9cac04048ccd --- /dev/null +++ b/arrow-select/src/merge.rs @@ -0,0 +1,386 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`merge`] and [`merge_n`]: Combine values from two or more arrays + +use crate::filter::{SlicesIterator, prep_null_mask_filter}; +use crate::zip::zip; +use arrow_array::{Array, ArrayRef, BooleanArray, Datum, make_array, new_empty_array}; +use arrow_data::ArrayData; +use arrow_data::transform::MutableArrayData; +use arrow_schema::ArrowError; + +/// An index for the [merge_n] function. +/// +/// This trait allows the indices argument for [merge_n] to be stored using a more +/// compact representation than `usize` when the input arrays are small. +/// If the number of input arrays is less than 256 for instance, the indices can be stored as `u8`. 
+/// +/// Implementation must ensure that all values which return `None` from [MergeIndex::index] are +/// considered equal by the [PartialEq] and [Eq] implementations. +pub trait MergeIndex: PartialEq + Eq + Copy { + /// Returns the index value as an `Option`. + /// + /// `None` values returned by this function indicate holes in the index array and will result + /// in null values in the array created by [merge_n]. + fn index(&self) -> Option; +} + +impl MergeIndex for usize { + fn index(&self) -> Option { + Some(*self) + } +} + +impl MergeIndex for Option { + fn index(&self) -> Option { + *self + } +} + +/// Merges elements by index from a list of [`Array`], creating a new [`Array`] from +/// those values. +/// +/// Each element in `indices` is the index of an array in `values`. The `indices` array is processed +/// sequentially. The first occurrence of index value `n` will be mapped to the first +/// value of the array at index `n`. The second occurrence to the second value, and so on. +/// An index value where `MergeIndex::index` returns `None` is interpreted as a null value. +/// +/// # Implementation notes +/// +/// This algorithm is similar in nature to both [zip] and +/// [interleave](crate::interleave::interleave), but there are some important differences. +/// +/// In contrast to [zip], this function supports multiple input arrays. Instead of +/// a boolean selection vector, an index array is used to take values from the input arrays, and a special +/// marker value can be used to indicate null values. +/// +/// In contrast to [interleave](crate::interleave::interleave), this function does not use pairs of +/// indices. The values in `indices` serve the same purpose as the first value in the pairs passed +/// to `interleave`. +/// The index in the array is implicit and is derived from the number of times a particular array +/// index occurs. 
+/// The more constrained indexing mechanism used by this algorithm makes it easier to copy values +/// in contiguous slices. In the example below, the two subsequent elements from array `2` can be +/// copied in a single operation from the source array instead of copying them one by one. +/// Long spans of null values are also especially cheap because they do not need to be represented +/// in an input array. +/// +/// # Safety +/// +/// This function does not check that the number of occurrences of any particular array index matches +/// the length of the corresponding input array. If an array contains more values than required, the +/// spurious values will be ignored. If an array contains fewer values than necessary, this function +/// will panic. +/// +/// # Example +/// +/// ```text +/// ┌───────────┐ ┌─────────┐ ┌─────────┐ +/// │┌─────────┐│ │ None │ │ NULL │ +/// ││ A ││ ├─────────┤ ├─────────┤ +/// │└─────────┘│ │ 1 │ │ B │ +/// │┌─────────┐│ ├─────────┤ ├─────────┤ +/// ││ B ││ │ 0 │ merge(values, indices) │ A │ +/// │└─────────┘│ ├─────────┤ ─────────────────────────▶ ├─────────┤ +/// │┌─────────┐│ │ None │ │ NULL │ +/// ││ C ││ ├─────────┤ ├─────────┤ +/// │├─────────┤│ │ 2 │ │ C │ +/// ││ D ││ ├─────────┤ ├─────────┤ +/// │└─────────┘│ │ 2 │ │ D │ +/// └───────────┘ └─────────┘ └─────────┘ +/// values indices result +/// +/// ``` +pub fn merge_n(values: &[&dyn Array], indices: &[impl MergeIndex]) -> Result { + let data_type = values[0].data_type(); + + for array in values.iter().skip(1) { + if array.data_type() != data_type { + return Err(ArrowError::InvalidArgumentError(format!( + "It is not possible to merge arrays of different data types ({} and {})", + data_type, + array.data_type() + ))); + } + } + + if indices.is_empty() { + return Ok(new_empty_array(data_type)); + } + + #[cfg(debug_assertions)] + for ix in indices { + if let Some(index) = ix.index() { + assert!( + index < values.len(), + "Index out of bounds: {} >= {}", + index, + values.len() 
+ ); + } + } + + let data: Vec = values.iter().map(|a| a.to_data()).collect(); + let data_refs = data.iter().collect(); + + let mut mutable = MutableArrayData::new(data_refs, true, indices.len()); + + // This loop extends the mutable array by taking slices from the partial results. + // + // take_offsets keeps track of how many values have been taken from each array. + let mut take_offsets = vec![0; values.len() + 1]; + let mut start_row_ix = 0; + loop { + let array_ix = indices[start_row_ix]; + + // Determine the length of the slice to take. + let mut end_row_ix = start_row_ix + 1; + while end_row_ix < indices.len() && indices[end_row_ix] == array_ix { + end_row_ix += 1; + } + let slice_length = end_row_ix - start_row_ix; + + // Extend mutable with either nulls or with values from the array. + match array_ix.index() { + None => mutable.extend_nulls(slice_length), + Some(index) => { + let start_offset = take_offsets[index]; + let end_offset = start_offset + slice_length; + mutable.extend(index, start_offset, end_offset); + take_offsets[index] = end_offset; + } + } + + if end_row_ix == indices.len() { + break; + } else { + // Set the start_row_ix for the next slice. + start_row_ix = end_row_ix; + } + } + + Ok(make_array(mutable.freeze())) +} + +/// Merges two arrays in the order specified by a boolean mask. +/// +/// This algorithm is a variant of [zip] that does not require the truthy and +/// falsy arrays to have the same length. +/// +/// When truthy or falsy are [Scalar](arrow_array::Scalar), the single +/// scalar value is repeated whenever the mask array contains true or false respectively. 
+/// +/// # Example +/// +/// ```text +/// truthy +/// ┌─────────┐ mask +/// │ A │ ┌─────────┐ ┌─────────┐ +/// ├─────────┤ │ true │ │ A │ +/// │ C │ ├─────────┤ ├─────────┤ +/// ├─────────┤ │ true │ │ C │ +/// │ NULL │ ├─────────┤ ├─────────┤ +/// ├─────────┤ │ false │ merge(mask, truthy, falsy) │ B │ +/// │ D │ ├─────────┤ ─────────────────────────▶ ├─────────┤ +/// └─────────┘ │ true │ │ NULL │ +/// falsy ├─────────┤ ├─────────┤ +/// ┌─────────┐ │ false │ │ E │ +/// │ B │ ├─────────┤ ├─────────┤ +/// ├─────────┤ │ true │ │ D │ +/// │ E │ └─────────┘ └─────────┘ +/// └─────────┘ +/// ``` +pub fn merge( + mask: &BooleanArray, + truthy: &dyn Datum, + falsy: &dyn Datum, +) -> Result { + let (truthy_array, truthy_is_scalar) = truthy.get(); + let (falsy_array, falsy_is_scalar) = falsy.get(); + + if truthy_is_scalar && falsy_is_scalar { + // When both truthy and falsy are scalars, we can use `zip` since the result is the same + // and zip has optimized code for scalars. + return zip(mask, truthy, falsy); + } + + if truthy_array.data_type() != falsy_array.data_type() { + return Err(ArrowError::InvalidArgumentError( + "arguments need to have the same data type".into(), + )); + } + + if truthy_is_scalar && truthy_array.len() != 1 { + return Err(ArrowError::InvalidArgumentError( + "scalar arrays must have 1 element".into(), + )); + } + if falsy_is_scalar && falsy_array.len() != 1 { + return Err(ArrowError::InvalidArgumentError( + "scalar arrays must have 1 element".into(), + )); + } + + let falsy = falsy_array.to_data(); + let truthy = truthy_array.to_data(); + + let mut mutable = MutableArrayData::new(vec![&truthy, &falsy], false, truthy.len()); + + // the SlicesIterator slices only the true values. 
So the gaps left by this iterator we need to + // fill with falsy values + + // keep track of how much is filled + let mut filled = 0; + let mut falsy_offset = 0; + let mut truthy_offset = 0; + + // Ensure nulls are treated as false + let mask_buffer = match mask.null_count() { + 0 => mask.values().clone(), + _ => prep_null_mask_filter(mask).into_parts().0, + }; + + SlicesIterator::from(&mask_buffer).for_each(|(start, end)| { + // the gap needs to be filled with falsy values + if start > filled { + if falsy_is_scalar { + for _ in filled..start { + // Copy the first item from the 'falsy' array into the output buffer. + mutable.extend(1, 0, 1); + } + } else { + let falsy_length = start - filled; + let falsy_end = falsy_offset + falsy_length; + mutable.extend(1, falsy_offset, falsy_end); + falsy_offset = falsy_end; + } + } + // fill with truthy values + if truthy_is_scalar { + for _ in start..end { + // Copy the first item from the 'truthy' array into the output buffer. + mutable.extend(0, 0, 1); + } + } else { + let truthy_length = end - start; + let truthy_end = truthy_offset + truthy_length; + mutable.extend(0, truthy_offset, truthy_end); + truthy_offset = truthy_end; + } + filled = end; + }); + // the remaining part is falsy + if filled < mask.len() { + if falsy_is_scalar { + for _ in filled..mask.len() { + // Copy the first item from the 'falsy' array into the output buffer. 
+ mutable.extend(1, 0, 1); + } + } else { + let falsy_length = mask.len() - filled; + let falsy_end = falsy_offset + falsy_length; + mutable.extend(1, falsy_offset, falsy_end); + } + } + + let data = mutable.freeze(); + Ok(make_array(data)) +} + +#[cfg(test)] +mod tests { + use crate::merge::{MergeIndex, merge, merge_n}; + use arrow_array::cast::AsArray; + use arrow_array::{Array, BooleanArray, StringArray}; + + #[derive(PartialEq, Eq, Copy, Clone)] + struct CompactMergeIndex { + index: u8, + } + + impl MergeIndex for CompactMergeIndex { + fn index(&self) -> Option { + if self.index == u8::MAX { + None + } else { + Some(self.index as usize) + } + } + } + + #[test] + fn test_merge() { + let a1 = StringArray::from(vec![Some("A"), Some("B"), Some("E"), None]); + let a2 = StringArray::from(vec![Some("C"), Some("D")]); + + let indices = BooleanArray::from(vec![true, false, true, false, true, true]); + + let merged = merge(&indices, &a1, &a2).unwrap(); + let merged = merged.as_string::(); + + assert_eq!(merged.len(), indices.len()); + assert!(merged.is_valid(0)); + assert_eq!(merged.value(0), "A"); + assert!(merged.is_valid(1)); + assert_eq!(merged.value(1), "C"); + assert!(merged.is_valid(2)); + assert_eq!(merged.value(2), "B"); + assert!(merged.is_valid(3)); + assert_eq!(merged.value(3), "D"); + assert!(merged.is_valid(4)); + assert_eq!(merged.value(4), "E"); + assert!(!merged.is_valid(5)); + } + + #[test] + fn test_merge_n() { + let a1 = StringArray::from(vec![Some("A")]); + let a2 = StringArray::from(vec![Some("B"), None, None]); + let a3 = StringArray::from(vec![Some("C"), Some("D")]); + + let indices = vec![ + CompactMergeIndex { index: u8::MAX }, + CompactMergeIndex { index: 1 }, + CompactMergeIndex { index: 0 }, + CompactMergeIndex { index: u8::MAX }, + CompactMergeIndex { index: 2 }, + CompactMergeIndex { index: 2 }, + CompactMergeIndex { index: 1 }, + CompactMergeIndex { index: 1 }, + ]; + + let arrays = [a1, a2, a3]; + let array_refs = arrays.iter().map(|a| a 
as &dyn Array).collect::>(); + let merged = merge_n(&array_refs, &indices).unwrap(); + let merged = merged.as_string::(); + + assert_eq!(merged.len(), indices.len()); + assert!(!merged.is_valid(0)); + assert!(merged.is_valid(1)); + assert_eq!(merged.value(1), "B"); + assert!(merged.is_valid(2)); + assert_eq!(merged.value(2), "A"); + assert!(!merged.is_valid(3)); + assert!(merged.is_valid(4)); + assert_eq!(merged.value(4), "C"); + assert!(merged.is_valid(5)); + assert_eq!(merged.value(5), "D"); + assert!(!merged.is_valid(6)); + assert!(!merged.is_valid(7)); + } +} diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 743628c8c7d1..51dcc8396d91 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -177,6 +177,11 @@ name = "interleave_kernels" harness = false required-features = ["test_utils"] +[[bench]] +name = "merge_kernels" +harness = false +required-features = ["test_utils"] + [[bench]] name = "zip_kernels" harness = false diff --git a/arrow/benches/merge_kernels.rs b/arrow/benches/merge_kernels.rs new file mode 100644 index 000000000000..d52bbccd625b --- /dev/null +++ b/arrow/benches/merge_kernels.rs @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use criterion::measurement::WallTime; +use criterion::{BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main}; +use rand::distr::{Distribution, StandardUniform}; +use rand::prelude::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint; +use std::sync::Arc; + +use arrow::array::*; +use arrow::datatypes::*; +use arrow::util::bench_util::*; +use arrow_select::merge::merge; + +trait InputGenerator { + fn name(&self) -> &str; + + /// Return an ArrayRef containing a single null value + fn generate_scalar_with_null_value(&self) -> ArrayRef; + + /// Generate a `number_of_scalars` unique scalars + fn generate_non_null_scalars(&self, seed: u64, number_of_scalars: usize) -> Vec; + + /// Generate an array with the specified length and null percentage + fn generate_array(&self, seed: u64, array_length: usize, null_percentage: f32) -> ArrayRef; +} + +struct GeneratePrimitive { + description: String, + _marker: std::marker::PhantomData, +} + +impl InputGenerator for GeneratePrimitive +where + T: ArrowPrimitiveType, + StandardUniform: Distribution, +{ + fn name(&self) -> &str { + self.description.as_str() + } + + fn generate_scalar_with_null_value(&self) -> ArrayRef { + new_null_array(&T::DATA_TYPE, 1) + } + + fn generate_non_null_scalars(&self, seed: u64, number_of_scalars: usize) -> Vec { + let rng = StdRng::seed_from_u64(seed); + + rng.sample_iter::(StandardUniform) + .take(number_of_scalars) + .map(|v: T::Native| { + Arc::new(PrimitiveArray::::new_scalar(v).into_inner()) as ArrayRef + }) + .collect() + } + + fn generate_array(&self, seed: u64, array_length: usize, null_percentage: f32) -> ArrayRef { + Arc::new(create_primitive_array_with_seed::( + array_length, + null_percentage, + seed, + )) + } +} + +struct GenerateBytes { + range_length: std::ops::Range, + description: String, + + _marker: std::marker::PhantomData, +} + +impl InputGenerator for GenerateBytes +where + Byte: ByteArrayType, +{ + fn name(&self) -> &str { + self.description.as_str() + } + + 
fn generate_scalar_with_null_value(&self) -> ArrayRef { + new_null_array(&Byte::DATA_TYPE, 1) + } + + fn generate_non_null_scalars(&self, seed: u64, number_of_scalars: usize) -> Vec { + let array = self.generate_array(seed, number_of_scalars, 0.0); + + (0..number_of_scalars).map(|i| array.slice(i, 1)).collect() + } + + fn generate_array(&self, seed: u64, array_length: usize, null_percentage: f32) -> ArrayRef { + let is_binary = + Byte::DATA_TYPE == DataType::Binary || Byte::DATA_TYPE == DataType::LargeBinary; + if is_binary { + Arc::new(create_binary_array_with_len_range_and_prefix_and_seed::< + Byte::Offset, + >( + array_length, + null_percentage, + self.range_length.start, + self.range_length.end - 1, + &[], + seed, + )) + } else { + Arc::new(create_string_array_with_len_range_and_prefix_and_seed::< + Byte::Offset, + >( + array_length, + null_percentage, + self.range_length.start, + self.range_length.end - 1, + "", + seed, + )) + } + } +} + +fn mask_cases(len: usize) -> Vec<(&'static str, BooleanArray)> { + vec![ + ("all_true", create_boolean_array(len, 0.0, 1.0)), + ("99pct_true", create_boolean_array(len, 0.0, 0.99)), + ("90pct_true", create_boolean_array(len, 0.0, 0.9)), + ("50pct_true", create_boolean_array(len, 0.0, 0.5)), + ("10pct_true", create_boolean_array(len, 0.0, 0.1)), + ("1pct_true", create_boolean_array(len, 0.0, 0.01)), + ("all_false", create_boolean_array(len, 0.0, 0.0)), + ("50pct_nulls", create_boolean_array(len, 0.5, 0.5)), + ] +} + +fn bench_merge_on_input_generator(c: &mut Criterion, input_generator: &impl InputGenerator) { + const ARRAY_LEN: usize = 8192; + + let mut group = + c.benchmark_group(format!("merge_{ARRAY_LEN}_from_{}", input_generator.name()).as_str()); + + let null_scalar = input_generator.generate_scalar_with_null_value(); + let [non_null_scalar_1, non_null_scalar_2]: [_; 2] = input_generator + .generate_non_null_scalars(42, 2) + .try_into() + .unwrap(); + + // For simplicity, we generate arrays with length ARRAY_LEN. 
Not all input values will be used. + let array_1_10pct_nulls = input_generator.generate_array(42, ARRAY_LEN, 0.1); + let array_2_10pct_nulls = input_generator.generate_array(18, ARRAY_LEN, 0.1); + + let masks = mask_cases(ARRAY_LEN); + + // Benchmarks for different scalar combinations + for (description, truthy, falsy) in &[ + ("null_vs_non_null_scalar", &null_scalar, &non_null_scalar_1), + ( + "non_null_scalar_vs_null_scalar", + &non_null_scalar_1, + &null_scalar, + ), + ("non_nulls_scalars", &non_null_scalar_1, &non_null_scalar_2), + ] { + bench_merge_input_on_all_masks( + description, + &mut group, + &masks, + &Scalar::new(truthy), + &Scalar::new(falsy), + ); + } + + bench_merge_input_on_all_masks( + "array_vs_non_null_scalar", + &mut group, + &masks, + &array_1_10pct_nulls, + &non_null_scalar_1, + ); + + bench_merge_input_on_all_masks( + "non_null_scalar_vs_array", + &mut group, + &masks, + &array_1_10pct_nulls, + &non_null_scalar_1, + ); + + bench_merge_input_on_all_masks( + "array_vs_array", + &mut group, + &masks, + &array_1_10pct_nulls, + &array_2_10pct_nulls, + ); + + group.finish(); +} + +fn bench_merge_input_on_all_masks( + description: &str, + group: &mut BenchmarkGroup, + masks: &[(&str, BooleanArray)], + truthy: &impl Datum, + falsy: &impl Datum, +) { + for (mask_description, mask) in masks { + let id = BenchmarkId::new(description, mask_description); + group.bench_with_input(id, mask, |b, mask| { + b.iter(|| hint::black_box(merge(mask, truthy, falsy))) + }); + } +} + +fn add_benchmark(c: &mut Criterion) { + // Primitive + bench_merge_on_input_generator( + c, + &GeneratePrimitive:: { + description: "i32".to_string(), + _marker: std::marker::PhantomData, + }, + ); + + // Short strings + bench_merge_on_input_generator( + c, + &GenerateBytes::> { + description: "short strings (3..10)".to_string(), + range_length: 3..10, + _marker: std::marker::PhantomData, + }, + ); + + // Long strings + bench_merge_on_input_generator( + c, + &GenerateBytes::> { + 
description: "long strings (100..400)".to_string(), + range_length: 100..400, + _marker: std::marker::PhantomData, + }, + ); + + // Short Bytes + bench_merge_on_input_generator( + c, + &GenerateBytes::> { + description: "short bytes (3..10)".to_string(), + range_length: 3..10, + _marker: std::marker::PhantomData, + }, + ); + + // Long Bytes + bench_merge_on_input_generator( + c, + &GenerateBytes::> { + description: "long bytes (100..400)".to_string(), + range_length: 100..400, + _marker: std::marker::PhantomData, + }, + ); +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/arrow/src/compute/kernels.rs b/arrow/src/compute/kernels.rs index ff8d4a5ad97c..466f24205339 100644 --- a/arrow/src/compute/kernels.rs +++ b/arrow/src/compute/kernels.rs @@ -22,7 +22,7 @@ pub use arrow_cast::cast; pub use arrow_cast::parse as cast_utils; pub use arrow_ord::{cmp, partition, rank, sort}; pub use arrow_select::{ - coalesce, concat, filter, interleave, nullif, take, union_extract, window, zip, + coalesce, concat, filter, interleave, merge, nullif, take, union_extract, window, zip, }; pub use arrow_string::{concat_elements, length, regexp, substring};