
Commit 30010ca
fix
viirya committed Mar 10, 2024
1 parent abc1c2a commit 30010ca
Showing 2 changed files with 32 additions and 21 deletions.
36 changes: 17 additions & 19 deletions crates/iceberg/src/arrow.rs
@@ -32,7 +32,7 @@ use crate::spec::SchemaRef;
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
-    column_names: Vec<String>,
+    columns: Vec<usize>,
     file_io: FileIO,
     schema: SchemaRef,
 }
@@ -42,7 +42,7 @@ impl ArrowReaderBuilder {
     pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
         ArrowReaderBuilder {
             batch_size: None,
-            column_names: vec![],
+            columns: vec![],
             file_io,
             schema,
         }
@@ -56,16 +56,16 @@ impl ArrowReaderBuilder {
     }
 
     /// Sets the desired column projection.
-    pub fn with_column_projection(mut self, column_names: Vec<String>) -> Self {
-        self.column_names = column_names;
+    pub fn with_column_projection(mut self, columns: Vec<usize>) -> Self {
+        self.columns = columns;
         self
     }
 
     /// Build the ArrowReader.
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
-            column_names: self.column_names,
+            columns: self.columns,
             schema: self.schema,
             file_io: self.file_io,
         }
@@ -75,7 +75,7 @@ impl ArrowReaderBuilder {
 /// Reads data from Parquet files
 pub struct ArrowReader {
     batch_size: Option<usize>,
-    column_names: Vec<String>,
+    columns: Vec<usize>,
     #[allow(dead_code)]
     schema: SchemaRef,
     file_io: FileIO,
@@ -121,23 +121,21 @@ impl ArrowReader {
         metadata: &Arc<ParquetMetaData>,
         parquet_schema: &ArrowSchemaRef,
     ) -> crate::Result<ProjectionMask> {
-        if self.column_names.is_empty() {
+        if self.columns.is_empty() {
             Ok(ProjectionMask::all())
         } else {
             let mut indices = vec![];
-            for column_name in &self.column_names {
-                match parquet_schema.index_of(column_name) {
-                    Ok(index) => indices.push(index),
-                    Err(_) => {
-                        return Err(Error::new(
-                            ErrorKind::DataInvalid,
-                            format!(
-                                "Column {} not found in table. Schema: {}",
-                                column_name, parquet_schema
-                            ),
-                        ));
-                    }
+            for col in &self.columns {
+                if *col > parquet_schema.fields().len() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Column index {} out of range. Schema: {}",
+                            col, parquet_schema
+                        ),
+                    ));
                 }
+                indices.push(*col - 1);
             }
             Ok(ProjectionMask::roots(
                 metadata.file_metadata().schema_descr(),
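
Not part of the commit: a minimal standalone sketch of the index handling that the rewritten get_arrow_projection_mask performs. The `columns` vector carries 1-based positions (field ids), which are bounds-checked against the schema width and shifted to the 0-based root indices that ProjectionMask::roots expects. The function name, the sample field count, and the extra guard against index 0 (which the committed code does not have) are illustrative assumptions, not the crate's API.

    // Sketch only: mirrors the bounds check and 1-based -> 0-based shift in the hunk above.
    fn to_root_indices(columns: &[usize], num_fields: usize) -> Result<Vec<usize>, String> {
        let mut indices = Vec::with_capacity(columns.len());
        for col in columns {
            // Reject positions outside 1..=num_fields; 0 would underflow below.
            if *col == 0 || *col > num_fields {
                return Err(format!(
                    "Column index {} out of range (schema has {} fields)",
                    col, num_fields
                ));
            }
            indices.push(*col - 1); // 1-based field id -> 0-based root column index
        }
        Ok(indices)
    }

    fn main() {
        let num_fields = 3; // e.g. a table with top-level fields id, name, ts
        println!("{:?}", to_root_indices(&[1, 3], num_fields)); // Ok([0, 2])
        println!("{:?}", to_root_indices(&[4], num_fields));    // Err: out of range
    }
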
17 changes: 15 additions & 2 deletions crates/iceberg/src/scan.rs
@@ -183,8 +183,21 @@ impl TableScan {
         let mut arrow_reader_builder =
             ArrowReaderBuilder::new(self.file_io.clone(), self.schema.clone());
 
-        arrow_reader_builder =
-            arrow_reader_builder.with_column_projection(self.column_names.clone());
+        let mut field_ids = vec![];
+        for column_name in &self.column_names {
+            let field_id = self.schema.field_id_by_name(column_name).ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Column {} not found in table. Schema: {}",
+                        column_name, self.schema
+                    ),
+                )
+            })?;
+            field_ids.push(field_id as usize);
+        }
+
+        arrow_reader_builder = arrow_reader_builder.with_column_projection(field_ids);
 
         if let Some(batch_size) = self.batch_size {
            arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
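
Not part of the commit: a standalone sketch of the name-to-field-id resolution that TableScan now performs before handing indices to with_column_projection. A plain HashMap stands in for Schema::field_id_by_name, and the column names and ids are made up for illustration.

    use std::collections::HashMap;

    // Sketch only: resolve column names to field ids, failing on unknown names --
    // the same shape of lookup the scan.rs hunk adds via field_id_by_name.
    fn resolve_field_ids(
        column_names: &[&str],
        name_to_id: &HashMap<&str, i32>,
    ) -> Result<Vec<usize>, String> {
        column_names
            .iter()
            .map(|name| {
                name_to_id
                    .get(name)
                    .map(|id| *id as usize)
                    .ok_or_else(|| format!("Column {} not found in table", name))
            })
            .collect()
    }

    fn main() {
        let name_to_id = HashMap::from([("id", 1), ("name", 2), ("ts", 3)]);
        // The resulting Vec<usize> is what with_column_projection(columns: Vec<usize>) now takes.
        println!("{:?}", resolve_field_ids(&["id", "ts"], &name_to_id));  // Ok([1, 3])
        println!("{:?}", resolve_field_ids(&["missing"], &name_to_id));   // Err(...)
    }
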
