From 7a1cb12d048ffd111b99fc1d53c1e5817e8e9117 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 11 Mar 2024 02:56:23 -0700 Subject: [PATCH] Make plan_files as asynchronous stream (#243) --- crates/iceberg/src/scan.rs | 57 +++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 37fde8f51..bd0e6adcc 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -23,6 +23,7 @@ use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, Tab use crate::table::Table; use crate::{Error, ErrorKind}; use arrow_array::RecordBatch; +use async_stream::try_stream; use futures::stream::{iter, BoxStream}; use futures::StreamExt; @@ -143,37 +144,43 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result>; impl TableScan { /// Returns a stream of file scan tasks. pub async fn plan_files(&self) -> crate::Result { - let manifest_list = self - .snapshot - .load_manifest_list(&self.file_io, &self.table_metadata) + let snapshot = self.snapshot.clone(); + let table_metadata = self.table_metadata.clone(); + let file_io = self.file_io.clone(); + + Ok(try_stream! { + let manifest_list = snapshot + .clone() + .load_manifest_list(&file_io, &table_metadata) .await?; - // Generate data file stream - let mut file_scan_tasks = Vec::with_capacity(manifest_list.entries().len()); - for manifest_list_entry in manifest_list.entries().iter() { - // Data file - let manifest = manifest_list_entry.load_manifest(&self.file_io).await?; - - for manifest_entry in manifest.entries().iter().filter(|e| e.is_alive()) { - match manifest_entry.content_type() { - DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Delete files are not supported yet.", - )); - } - DataContentType::Data => { - file_scan_tasks.push(Ok(FileScanTask { - data_file: manifest_entry.clone(), - start: 0, - length: manifest_entry.file_size_in_bytes(), - })); + // Generate data file stream + let mut entries = iter(manifest_list.entries()); + while let Some(entry) = entries.next().await { + let manifest = entry.load_manifest(&file_io).await?; + + let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive())); + while let Some(manifest_entry) = manifest_entries.next().await { + match manifest_entry.content_type() { + DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { + yield Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete files are not supported yet.", + ))?; + } + DataContentType::Data => { + let scan_task: crate::Result = Ok(FileScanTask { + data_file: manifest_entry.clone(), + start: 0, + length: manifest_entry.file_size_in_bytes(), + }); + yield scan_task?; + } } } } } - - Ok(iter(file_scan_tasks).boxed()) + .boxed()) } pub async fn to_arrow(&self) -> crate::Result {