diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 2f31a290e398..1d2737a0c629 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -74,7 +74,7 @@ ring = { version = "0.17", default-features = false, features = ["std"], optiona
 
 [dev-dependencies]
 base64 = { version = "0.22", default-features = false, features = ["std"] }
-criterion = { version = "0.5", default-features = false }
+criterion = { version = "0.5", default-features = false, features = ["async_futures"] }
 snap = { version = "1.0", default-features = false }
 tempfile = { version = "3.0", default-features = false }
 brotli = { version = "7.0", default-features = false, features = ["std"] }
@@ -211,6 +211,10 @@ name = "arrow_statistics"
 required-features = ["arrow"]
 harness = false
 
+[[bench]]
+name = "arrow_reader_row_filter"
+required-features = ["arrow", "async"]
+harness = false
 
 [[bench]]
 name = "compression"
diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs
new file mode 100644
index 000000000000..e59ba59cc69d
--- /dev/null
+++ b/parquet/benches/arrow_reader_row_filter.rs
@@ -0,0 +1,565 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmark for evaluating row filters and projections on a Parquet file.
//!
//! # Background:
//!
//! As described in [Efficient Filter Pushdown in Parquet], evaluating
//! pushdown filters is a two-step process:
//!
//! 1. Build a filter mask by decoding and evaluating filter functions on
//!    the filter column(s).
//!
//! 2. Decode the rows that match the filter mask from the projected columns.
//!
//! The performance depends on factors such as the number of rows selected,
//! the clustering of results (which affects the efficiency of the filter mask),
//! and whether the same column is used for both filtering and projection.
//!
//! This benchmark helps measure the performance of these operations.
//!
//! [Efficient Filter Pushdown in Parquet]: https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/
//!
//! The benchmark creates an in-memory Parquet file with 100K rows and four columns:
//! - int64: random integers in `[0, 100)` generated with a fixed seed, with one
//!   randomly chosen row overwritten with 9999 to support a point lookup.
//! - float64: random floating-point values in `[0.0, 100.0)` generated with a fixed seed.
//! - utf8View: random strings with some empty values and occasional constant "const" values.
//! - ts: timestamps generated as `i % 10_000`, so values repeat in 10,000-row blocks
//!   to model clustered data.
//!
//! The filter scenarios (see [`FilterType`]) are predicates over these columns,
//! chosen to mimic different selectivity and clustering patterns:
//! - Point Lookup: `int64 == 9999` selects exactly one row.
//! - Selective Unclustered: `float64 > 99.0` selects roughly 1% of rows, scattered
//!   throughout the file.
//! - Moderately Selective Clustered: `ts >= 9000` selects 10% of rows, clustered at the
//!   end of each 10,000-row block.
//! - Moderately Selective Unclustered: `int64 > 90` selects roughly 10% of rows,
//!   scattered throughout the file.
//! - Unselective Unclustered: `float64 <= 99.0` selects roughly 99% of rows, scattered
//!   throughout the file.
//! - Unselective Clustered: `ts < 9000` selects 90% of rows, clustered at the start of
//!   each 10,000-row block.
//! - Composite: `float64 > 99.0 AND ts >= 9000`.
//! - Utf8ViewNonEmpty: `utf8View <> ''`, modeling ClickBench-style string predicates.

use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, TimestampMillisecondArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::{eq, gt, lt, lt_eq, neq};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow_array::builder::StringViewBuilder;
use arrow_array::StringViewArray;
use arrow_cast::pretty::pretty_format_batches;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt};
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::basic::Compression;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
use parquet::file::properties::WriterProperties;
use rand::{rngs::StdRng, Rng, SeedableRng};
use std::ops::Range;
use std::sync::Arc;

/// Generates a random string. Has a 50% chance to generate a short string (3–11 characters)
/// or a long string (13–20 characters).
fn random_string(rng: &mut StdRng) -> String {
    let charset = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    let is_long = rng.random_bool(0.5);
    let len = if is_long {
        rng.random_range(13..21)
    } else {
        rng.random_range(3..12)
    };
    (0..len)
        .map(|_| charset[rng.random_range(0..charset.len())] as char)
        .collect()
}

/// Creates an int64 array of a given size with random integers in `[0, 100)`.
/// It then overwrites a single random index with 9999 to serve as the unique value
/// for the point lookup.
fn create_int64_array(size: usize) -> ArrayRef {
    let mut rng = StdRng::seed_from_u64(42);
    let mut values: Vec<i64> = (0..size).map(|_| rng.random_range(0..100)).collect();
    let unique_index = rng.random_range(0..size);
    values[unique_index] = 9999; // Unique value for point lookup
    Arc::new(Int64Array::from(values)) as ArrayRef
}

/// Creates a float64 array of a given size with random floats in `[0.0, 100.0)`.
fn create_float64_array(size: usize) -> ArrayRef {
    let mut rng = StdRng::seed_from_u64(43);
    let values: Vec<f64> = (0..size).map(|_| rng.random_range(0.0..100.0)).collect();
    Arc::new(Float64Array::from(values)) as ArrayRef
}
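
// The following helper is an illustrative sketch only (it is not called by the
// benchmark): it spot-checks the selectivity claimed in the module docs, namely that
// the "selective unclustered" predicate `float64 > 99.0` matches roughly 1% of rows.
// The function name and the printed summary are assumptions added for illustration.
#[allow(dead_code)]
fn check_float64_selectivity(size: usize) {
    let array = create_float64_array(size);
    // Same comparison kernel the benchmark's filter uses below
    let mask = gt(&array, &Float64Array::new_scalar(99.0)).unwrap();
    let selected = mask.true_count();
    println!(
        "float64 > 99.0 selects {selected} of {size} rows (~{:.2}%)",
        100.0 * selected as f64 / size as f64
    );
}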
/// Creates a utf8View array of a given size with random strings; this column is also
/// used by the `Utf8ViewNonEmpty` filter. Despite its name, `null_density` controls the
/// fraction of empty strings (the values targeted by `utf8View <> ''`), not nulls.
fn create_utf8_view_array(size: usize, null_density: f32) -> ArrayRef {
    let mut builder = StringViewBuilder::with_capacity(size);
    let mut rng = StdRng::seed_from_u64(44);
    for _ in 0..size {
        let choice = rng.random_range(0..100);
        if choice < (null_density * 100.0) as u32 {
            builder.append_value("");
        } else if choice < 25 {
            builder.append_value("const");
        } else {
            builder.append_value(random_string(&mut rng));
        }
    }
    Arc::new(builder.finish()) as ArrayRef
}

/// Creates a ts (timestamp) array of a given size. Each value is computed as `i % 10_000`,
/// which produces repeating 10,000-row blocks to model clustered patterns.
fn create_ts_array(size: usize) -> ArrayRef {
    let values: Vec<i64> = (0..size).map(|i| (i % 10_000) as i64).collect();
    Arc::new(TimestampMillisecondArray::from(values)) as ArrayRef
}

/// Creates a RecordBatch with `size` rows and 4 columns: int64, float64, utf8View, and ts.
fn create_record_batch(size: usize) -> RecordBatch {
    let fields = vec![
        Field::new("int64", DataType::Int64, false),
        Field::new("float64", DataType::Float64, false),
        Field::new("utf8View", DataType::Utf8View, true),
        Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        ),
    ];
    let schema = Arc::new(Schema::new(fields));

    let int64_array = create_int64_array(size);
    let float64_array = create_float64_array(size);
    let utf8_array = create_utf8_view_array(size, 0.2);
    let ts_array = create_ts_array(size);

    let arrays: Vec<ArrayRef> = vec![int64_array, float64_array, utf8_array, ts_array];
    RecordBatch::try_new(schema, arrays).unwrap()
}

/// Writes the RecordBatch to an in-memory buffer, returning the buffer
fn write_parquet_file() -> Vec<u8> {
    let batch = create_record_batch(100_000);
    println!("Batch created with {} rows", 100_000);
    println!(
        "First 100 rows:\n{}",
        pretty_format_batches(&[batch.clone().slice(0, 100)]).unwrap()
    );
    let schema = batch.schema();
    let props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .build();
    let mut buffer = vec![];
    {
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }
    buffer
}

/// ProjectionCase defines the projection mode for the benchmark:
/// either projecting all columns or excluding the column that is used for filtering.
#[derive(Clone)]
enum ProjectionCase {
    AllColumns,
    ExcludeFilterColumn,
}

impl std::fmt::Display for ProjectionCase {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ProjectionCase::AllColumns => write!(f, "all_columns"),
            ProjectionCase::ExcludeFilterColumn => write!(f, "exclude_filter_column"),
        }
    }
}
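
// Illustrative sketch (not used by the benchmark itself): shows how each
// `ProjectionCase` maps to a set of projected column indices. The parameter names
// (`num_columns`, `filter_cols`) are hypothetical; the benchmark below inlines the
// same logic when it builds its `ProjectionMask`s.
#[allow(dead_code)]
fn projected_indices(
    case: &ProjectionCase,
    num_columns: usize,
    filter_cols: &[usize],
) -> Vec<usize> {
    match case {
        // Project every column, including the one(s) used by the filter
        ProjectionCase::AllColumns => (0..num_columns).collect(),
        // Project everything except the filter column(s)
        ProjectionCase::ExcludeFilterColumn => (0..num_columns)
            .filter(|i| !filter_cols.contains(i))
            .collect(),
    }
}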
/// FilterType encapsulates the different filter comparisons.
/// The variants correspond to the different filter patterns.
#[derive(Clone, Copy, Debug)]
enum FilterType {
    /// "Point Lookup": selects a single row
    /// ```text
    /// ┌───────────────┐        ┌───────────────┐
    /// │               │        │               │
    /// │               │        │      ...      │
    /// │               │        │               │
    /// │               │        │               │
    /// │      ...      │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │               │        │               │
    /// │               │        │      ...      │
    /// │               │        │               │
    /// │               │        │               │
    /// └───────────────┘        └───────────────┘
    /// ```
    /// (1 RowSelection of 1 row)
    PointLookup,
    /// selective (1%) unclustered filter
    /// ```text
    /// ┌───────────────┐        ┌───────────────┐
    /// │      ...      │        │               │
    /// │               │        │               │
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │               │
    /// │               │        │      ...      │
    /// │               │        │               │
    /// │               │        │               │
    /// │      ...      │        │               │
    /// │               │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │               │        │               │
    /// └───────────────┘        └───────────────┘
    /// ```
    /// (roughly 1,000 single-row RowSelections scattered across the file)
    SelectiveUnclustered,
    /// moderately selective (10%) clustered filter
    /// ```text
    /// ┌───────────────┐        ┌───────────────┐
    /// │               │        │               │
    /// │               │        │               │
    /// │               │        │      ...      │
    /// │               │        │               │
    /// │      ...      │        │               │
    /// │               │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │               │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │               │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │               │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// └───────────────┘        └───────────────┘
    /// ```
    /// (10 RowSelections of 1,000 rows each)
    ModeratelySelectiveClustered,
    /// moderately selective (10%) unclustered filter
    /// ```text
    /// ┌───────────────┐        ┌───────────────┐
    /// │      ...      │        │               │
    /// │               │        │               │
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │               │
    /// │               │        │               │
    /// │               │        │      ...      │
    /// │      ...      │        │               │
    /// │               │        │               │
    /// │               │        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// └───────────────┘        └───────────────┘
    /// ```
    /// (roughly 10,000 selected rows scattered across the file)
    ModeratelySelectiveUnclustered,
    /// unselective (99%) unclustered filter
    /// ```text
    /// ┌───────────────┐        ┌───────────────┐
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │               │
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// └───────────────┘        └───────────────┘
    /// ```
    /// (roughly 99,000 selected rows, i.e. nearly the whole file)
    UnselectiveUnclustered,
    /// unselective (90%) clustered filter
    /// ```text
    /// ┌───────────────┐        ┌───────────────┐
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
    /// │               │        │               │
    /// └───────────────┘        └───────────────┘
    /// ```
    /// (10 RowSelections of 9,000 rows each)
    UnselectiveClustered,
    /// [`Self::SelectiveUnclustered`] `AND`
    /// [`Self::ModeratelySelectiveClustered`]
    Composite,
    /// `utf8View <> ''` modeling [ClickBench] [Q21-Q27]
    ///
    /// [ClickBench]: https://github.com/ClickHouse/ClickBench
    /// [Q21-Q27]: https://github.com/apache/datafusion/blob/b7177234e65cbbb2dcc04c252f6acd80bb026362/benchmarks/queries/clickbench/queries.sql#L22-L28
    Utf8ViewNonEmpty,
}

impl std::fmt::Display for FilterType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            FilterType::PointLookup => "int64 == 9999",
            FilterType::SelectiveUnclustered => "float64 > 99.0",
            FilterType::ModeratelySelectiveClustered => "ts >= 9000",
            FilterType::ModeratelySelectiveUnclustered => "int64 > 90",
            FilterType::UnselectiveUnclustered => "float64 <= 99.0",
            FilterType::UnselectiveClustered => "ts < 9000",
            FilterType::Composite => "float64 > 99.0 AND ts >= 9000",
            FilterType::Utf8ViewNonEmpty => "utf8View <> ''",
        };
        write!(f, "{}", s)
    }
}
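
// Illustrative sketch (not used by the benchmark): converts a boolean filter mask into
// a `RowSelection` so the "N RowSelections of M rows" annotations in the diagrams above
// can be checked empirically. Assumes `RowSelection::from_filters` and the
// `From<RowSelection> for Vec<RowSelector>` conversion; the imports are local to the
// sketch.
#[allow(dead_code)]
fn selection_shape(mask: &BooleanArray) -> (usize, usize) {
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    let selection = RowSelection::from_filters(&[mask.clone()]);
    let selectors: Vec<RowSelector> = selection.into();
    // Number of contiguous selected runs and the total number of selected rows
    let runs = selectors.iter().filter(|s| !s.skip).count();
    let rows = selectors.iter().filter(|s| !s.skip).map(|s| s.row_count).sum();
    (runs, rows)
}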
impl FilterType {
    /// Applies the specified filter on the given RecordBatch and returns a BooleanArray mask.
    fn filter_batch(&self, batch: &RecordBatch) -> arrow::error::Result<BooleanArray> {
        match self {
            // Point Lookup on int64 column: int64 == 9999
            FilterType::PointLookup => {
                let array = batch.column(batch.schema().index_of("int64")?);
                let scalar = Int64Array::new_scalar(9999);
                eq(array, &scalar)
            }
            // Selective Unclustered on float64 column: float64 > 99.0
            FilterType::SelectiveUnclustered => {
                let array = batch.column(batch.schema().index_of("float64")?);
                let scalar = Float64Array::new_scalar(99.0);
                gt(array, &scalar)
            }
            // Moderately Selective Clustered on ts column: ts >= 9000 (implemented as > 8999)
            FilterType::ModeratelySelectiveClustered => {
                let array = batch.column(batch.schema().index_of("ts")?);
                gt(array, &TimestampMillisecondArray::new_scalar(8999))
            }
            // Moderately Selective Unclustered on int64 column: int64 > 90
            FilterType::ModeratelySelectiveUnclustered => {
                let array = batch.column(batch.schema().index_of("int64")?);
                let scalar = Int64Array::new_scalar(90);
                gt(array, &scalar)
            }
            // Unselective Unclustered on float64 column: float64 <= 99.0,
            // i.e. NOT (float64 > 99.0)
            FilterType::UnselectiveUnclustered => {
                let array = batch.column(batch.schema().index_of("float64")?);
                lt_eq(array, &Float64Array::new_scalar(99.0))
            }
            // Unselective Clustered on ts column: ts < 9000
            FilterType::UnselectiveClustered => {
                let array = batch.column(batch.schema().index_of("ts")?);
                lt(array, &TimestampMillisecondArray::new_scalar(9000))
            }
            // Composite filter: logical AND of (float64 > 99.0) and (ts >= 9000)
            FilterType::Composite => {
                let mask1 = FilterType::SelectiveUnclustered.filter_batch(batch)?;
                let mask2 = FilterType::ModeratelySelectiveClustered.filter_batch(batch)?;
                and(&mask1, &mask2)
            }
            // Utf8ViewNonEmpty: selects rows where the utf8View column is not an empty string.
            FilterType::Utf8ViewNonEmpty => {
                let array = batch.column(batch.schema().index_of("utf8View")?);
                let scalar = StringViewArray::new_scalar("");
                neq(array, &scalar)
            }
        }
    }

    /// Return the indexes in the batch's schema that are used for filtering.
    fn filter_projection(&self) -> &'static [usize] {
        match self {
            FilterType::PointLookup => &[0],
            FilterType::SelectiveUnclustered => &[1],
            FilterType::ModeratelySelectiveClustered => &[3],
            FilterType::ModeratelySelectiveUnclustered => &[0],
            FilterType::UnselectiveUnclustered => &[1],
            FilterType::UnselectiveClustered => &[3],
            FilterType::Composite => &[1, 3], // float64 and ts columns as representatives for composite
            FilterType::Utf8ViewNonEmpty => &[2],
        }
    }
}
/// Benchmark filters and projections by reading the Parquet file.
/// This benchmark iterates over all individual filter types and both projection cases.
/// It measures the time to read and filter the Parquet file according to each scenario.
fn benchmark_filters_and_projections(c: &mut Criterion) {
    // create the parquet file in memory so it can be shared between scenarios
    let parquet_file = Bytes::from(write_parquet_file());
    let filter_types = vec![
        FilterType::PointLookup,
        FilterType::SelectiveUnclustered,
        FilterType::ModeratelySelectiveClustered,
        FilterType::ModeratelySelectiveUnclustered,
        FilterType::UnselectiveUnclustered,
        FilterType::UnselectiveClustered,
        FilterType::Utf8ViewNonEmpty,
        FilterType::Composite,
    ];
    let projection_cases = vec![
        ProjectionCase::AllColumns,
        ProjectionCase::ExcludeFilterColumn,
    ];

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    let mut group = c.benchmark_group("arrow_reader_row_filter");

    for filter_type in filter_types {
        for proj_case in &projection_cases {
            // All indices corresponding to the four columns.
            let all_indices = vec![0, 1, 2, 3];
            let filter_col = filter_type.filter_projection().to_vec();
            // For the projection, either select all columns or exclude the filter column(s).
            let output_projection: Vec<usize> = match proj_case {
                ProjectionCase::AllColumns => all_indices.clone(),
                ProjectionCase::ExcludeFilterColumn => all_indices
                    .into_iter()
                    .filter(|i| !filter_col.contains(i))
                    .collect(),
            };

            let reader = InMemoryReader::try_new(&parquet_file).unwrap();
            let metadata = Arc::clone(reader.metadata());

            let schema_descr = metadata.file_metadata().schema_descr();
            let projection_mask = ProjectionMask::roots(schema_descr, output_projection.clone());
            let pred_mask = ProjectionMask::roots(schema_descr, filter_col.clone());

            let benchmark_name = format!("{filter_type:?}/{proj_case}");

            // run the benchmark for the async reader
            let bench_id = BenchmarkId::new(benchmark_name.clone(), "async");
            let rt_captured = rt.handle().clone();
            group.bench_function(bench_id, |b| {
                b.iter(|| {
                    let reader = reader.clone();
                    let pred_mask = pred_mask.clone();
                    let projection_mask = projection_mask.clone();
                    // RowFilter is not Clone, so it must be rebuilt on each iteration
                    let filter = ArrowPredicateFn::new(pred_mask, move |batch: RecordBatch| {
                        Ok(filter_type.filter_batch(&batch).unwrap())
                    });
                    let row_filter = RowFilter::new(vec![Box::new(filter)]);

                    rt_captured.block_on(async {
                        benchmark_async_reader(reader, projection_mask, row_filter).await;
                    })
                });
            });

            // run the benchmark for the sync reader
            let bench_id = BenchmarkId::new(benchmark_name, "sync");
            group.bench_function(bench_id, |b| {
                b.iter(|| {
                    let reader = reader.clone();
                    let pred_mask = pred_mask.clone();
                    let projection_mask = projection_mask.clone();
                    // RowFilter is not Clone, so it must be rebuilt on each iteration
                    let filter = ArrowPredicateFn::new(pred_mask, move |batch: RecordBatch| {
                        Ok(filter_type.filter_batch(&batch).unwrap())
                    });
                    let row_filter = RowFilter::new(vec![Box::new(filter)]);

                    benchmark_sync_reader(reader, projection_mask, row_filter)
                });
            });
        }
    }
}

/// Use async API
async fn benchmark_async_reader(
    reader: InMemoryReader,
    projection_mask: ProjectionMask,
    row_filter: RowFilter,
) {
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .with_batch_size(8192)
        .with_projection(projection_mask)
        .with_row_filter(row_filter)
        .build()
        .unwrap();
    while let Some(b) = stream.next().await {
        b.unwrap(); // consume the batches, no buffering
    }
}
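
// Illustrative sketch (not wired into the benchmark): reads through the same async path
// but returns how many rows survive the filter, which is useful for verifying that each
// `FilterType` has the selectivity documented above. Uses `futures::TryStreamExt`, an
// extra import beyond what the benchmark itself needs.
#[allow(dead_code)]
async fn count_filtered_rows(
    reader: InMemoryReader,
    projection_mask: ProjectionMask,
    row_filter: RowFilter,
) -> usize {
    use futures::TryStreamExt;

    let stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .unwrap()
        .with_batch_size(8192)
        .with_projection(projection_mask)
        .with_row_filter(row_filter)
        .build()
        .unwrap();
    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
    batches.iter().map(|b| b.num_rows()).sum()
}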
/// Use sync API
fn benchmark_sync_reader(
    reader: InMemoryReader,
    projection_mask: ProjectionMask,
    row_filter: RowFilter,
) {
    let reader = ParquetRecordBatchReaderBuilder::try_new(reader.into_inner())
        .unwrap()
        .with_batch_size(8192)
        .with_projection(projection_mask)
        .with_row_filter(row_filter)
        .build()
        .unwrap();
    for b in reader {
        b.unwrap(); // consume the batches, no buffering
    }
}

/// Adapter that reads asynchronously from in-memory bytes and always loads the
/// metadata with page indexes.
#[derive(Debug, Clone)]
struct InMemoryReader {
    inner: Bytes,
    metadata: Arc<ParquetMetaData>,
}

impl InMemoryReader {
    fn try_new(inner: &Bytes) -> parquet::errors::Result<Self> {
        let mut metadata_reader = ParquetMetaDataReader::new().with_page_indexes(true);
        metadata_reader.try_parse(inner)?;
        let metadata = metadata_reader.finish().map(Arc::new)?;

        Ok(Self {
            // cloning Bytes is cheap -- it just increments a refcount
            inner: inner.clone(),
            metadata,
        })
    }

    fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    fn into_inner(self) -> Bytes {
        self.inner
    }
}

impl AsyncFileReader for InMemoryReader {
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let data = self.inner.slice(range.start as usize..range.end as usize);
        async move { Ok(data) }.boxed()
    }

    fn get_metadata<'a>(
        &'a mut self,
        _options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let metadata = Arc::clone(&self.metadata);
        async move { Ok(metadata) }.boxed()
    }
}

criterion_group!(benches, benchmark_filters_and_projections);
criterion_main!(benches);
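
// To run this benchmark locally, an invocation along these lines should work from the
// `parquet` crate directory (the exact command is an assumption; "arrow" and "async" are
// the `required-features` declared for this bench target in Cargo.toml):
//
//   cargo bench --features "arrow async" --bench arrow_reader_row_filter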