feat: Read Parquet data file with projection
Commit abc1c2a (parent 0914f7a), committed by viirya on Mar 10, 2024
Showing 2 changed files with 101 additions and 21 deletions.
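For context, this is how the new projection path is meant to be used end to end, mirroring the test added below in scan.rs. A minimal sketch, assuming an Iceberg table whose Parquet data files carry Int64 columns "x", "y" and "z" as in the test fixture; the Table import path and the helper name are illustrative, not part of this commit:

// Illustrative sketch, not part of this commit. Only "x" and "z" are selected,
// so the Arrow batches coming back contain exactly two columns.
use futures::TryStreamExt;
use iceberg::table::Table;

async fn read_projected(table: &Table) -> iceberg::Result<()> {
    // Build a scan restricted to two columns and stream it as Arrow batches.
    let scan = table.scan().select(["x", "z"]).build()?;
    let batches: Vec<_> = scan.to_arrow().await?.try_collect().await?;
    assert_eq!(batches[0].num_columns(), 2);
    Ok(())
}

Selecting a column that is not in the table schema is rejected at build() time with a DataInvalid error, per the validation in scan.rs below.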
58 changes: 49 additions & 9 deletions crates/iceberg/src/arrow.rs
@@ -17,17 +17,22 @@

//! Parquet file data reader
use crate::{Error, ErrorKind};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use async_stream::try_stream;
use futures::stream::StreamExt;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use std::sync::Arc;

use crate::io::FileIO;
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::SchemaRef;

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
column_names: Vec<String>,
file_io: FileIO,
schema: SchemaRef,
}
@@ -37,6 +42,7 @@ impl ArrowReaderBuilder {
pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
ArrowReaderBuilder {
batch_size: None,
column_names: vec![],
file_io,
schema,
}
@@ -49,10 +55,17 @@ impl ArrowReaderBuilder {
self
}

/// Sets the desired column projection.
pub fn with_column_projection(mut self, column_names: Vec<String>) -> Self {
self.column_names = column_names;
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
column_names: self.column_names,
schema: self.schema,
file_io: self.file_io,
}
@@ -62,6 +75,7 @@ impl ArrowReaderBuilder {
/// Reads data from Parquet files
pub struct ArrowReader {
batch_size: Option<usize>,
column_names: Vec<String>,
#[allow(dead_code)]
schema: SchemaRef,
file_io: FileIO,
@@ -75,17 +89,18 @@ impl ArrowReader {

Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {

let projection_mask = self.get_arrow_projection_mask(&task);

let parquet_reader = file_io
.new_input(task.data_file().file_path())?
.reader()
.await?;

let mut batch_stream_builder = ParquetRecordBatchStreamBuilder::new(parquet_reader)
.await?
.with_projection(projection_mask);
.await?;

let metadata = batch_stream_builder.metadata();
let parquet_schema = batch_stream_builder.schema();
let projection_mask = self.get_arrow_projection_mask(metadata, parquet_schema)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

if let Some(batch_size) = self.batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
@@ -101,8 +116,33 @@ impl ArrowReader {
.boxed())
}

fn get_arrow_projection_mask(&self, _task: &FileScanTask) -> ProjectionMask {
// TODO: full implementation
ProjectionMask::all()
fn get_arrow_projection_mask(
&self,
metadata: &Arc<ParquetMetaData>,
parquet_schema: &ArrowSchemaRef,
) -> crate::Result<ProjectionMask> {
if self.column_names.is_empty() {
Ok(ProjectionMask::all())
} else {
let mut indices = vec![];
for column_name in &self.column_names {
match parquet_schema.index_of(column_name) {
Ok(index) => indices.push(index),
Err(_) => {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Column {} not found in table. Schema: {}",
column_name, parquet_schema
),
));
}
}
}
Ok(ProjectionMask::roots(
metadata.file_metadata().schema_descr(),
indices,
))
}
}
}
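The core of the new get_arrow_projection_mask can also be sketched standalone against the parquet and arrow-schema crates; the helper name, free-standing signature, and String error type below are illustrative assumptions, not part of the commit:

use std::sync::Arc;

use arrow_schema::SchemaRef as ArrowSchemaRef;
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

// Illustrative sketch: resolve top-level column names against the Arrow schema
// reported by the Parquet reader, then build a root-level projection mask.
fn projection_mask_for(
    column_names: &[String],
    metadata: &Arc<ParquetMetaData>,
    parquet_schema: &ArrowSchemaRef,
) -> Result<ProjectionMask, String> {
    if column_names.is_empty() {
        // No explicit projection: read every column.
        return Ok(ProjectionMask::all());
    }
    let mut indices = Vec::with_capacity(column_names.len());
    for name in column_names {
        // index_of maps a top-level field name to its position in the schema.
        let idx = parquet_schema
            .index_of(name)
            .map_err(|e| format!("column {name} not found: {e}"))?;
        indices.push(idx);
    }
    // roots() selects whole top-level columns, including any nested children.
    Ok(ProjectionMask::roots(
        metadata.file_metadata().schema_descr(),
        indices,
    ))
}

The resulting mask is handed to ParquetRecordBatchStreamBuilder::with_projection, as in the reader change above, so only the selected columns are decoded.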
64 changes: 52 additions & 12 deletions crates/iceberg/src/scan.rs
@@ -108,7 +108,10 @@ impl<'a> TableScanBuilder<'a> {
if schema.field_by_name(column_name).is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Column {} not found in table.", column_name),
format!(
"Column {} not found in table. Schema: {}",
column_name, schema
),
));
}
}
@@ -180,6 +183,9 @@ impl TableScan {
let mut arrow_reader_builder =
ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());

arrow_reader_builder =
arrow_reader_builder.with_column_projection(self.column_names.clone());

if let Some(batch_size) = self.batch_size {
arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
}
@@ -383,18 +389,29 @@ mod tests {

// prepare data
let schema = {
let fields =
vec![
arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string(),
)])),
];
let fields = vec![
arrow_schema::Field::new("x", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string(),
)])),
arrow_schema::Field::new("y", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
arrow_schema::Field::new("z", arrow_schema::DataType::Int64, false)
.with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"2".to_string(),
)])),
];
Arc::new(arrow_schema::Schema::new(fields))
};
let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
let col2 = Arc::new(Int64Array::from_iter_values(vec![2; 1024])) as ArrayRef;
let col3 = Arc::new(Int64Array::from_iter_values(vec![3; 1024])) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(), vec![col1, col2, col3]).unwrap();

// Write the Parquet files
let props = WriterProperties::builder()
@@ -524,9 +541,32 @@ mod tests {

let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

let col = batches[0].column_by_name("col").unwrap();
let col = batches[0].column_by_name("x").unwrap();

let int64_arr = col.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);
}

#[tokio::test]
async fn test_open_parquet_with_projection() {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

// Create table scan for current snapshot and plan files
let table_scan = fixture.table.scan().select(["x", "z"]).build().unwrap();

let batch_stream = table_scan.to_arrow().await.unwrap();

let batches: Vec<_> = batch_stream.try_collect().await.unwrap();

assert_eq!(batches[0].num_columns(), 2);

let col1 = batches[0].column_by_name("x").unwrap();
let int64_arr = col1.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 1);

let col2 = batches[0].column_by_name("z").unwrap();
let int64_arr = col2.as_any().downcast_ref::<Int64Array>().unwrap();
assert_eq!(int64_arr.value(0), 3);
}
}
