Skip to content

Commit

Permalink
Make plan_files as asynchronous stream (#243)
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya authored Mar 11, 2024
1 parent b8919cd commit 7a1cb12
Showing 1 changed file with 32 additions and 25 deletions.
57 changes: 32 additions & 25 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, Tab
use crate::table::Table;
use crate::{Error, ErrorKind};
use arrow_array::RecordBatch;
use async_stream::try_stream;
use futures::stream::{iter, BoxStream};
use futures::StreamExt;

Expand Down Expand Up @@ -143,37 +144,43 @@ pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;
impl TableScan {
/// Returns a stream of file scan tasks.
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
let manifest_list = self
.snapshot
.load_manifest_list(&self.file_io, &self.table_metadata)
let snapshot = self.snapshot.clone();
let table_metadata = self.table_metadata.clone();
let file_io = self.file_io.clone();

Ok(try_stream! {
let manifest_list = snapshot
.clone()
.load_manifest_list(&file_io, &table_metadata)
.await?;

// Generate data file stream
let mut file_scan_tasks = Vec::with_capacity(manifest_list.entries().len());
for manifest_list_entry in manifest_list.entries().iter() {
// Data file
let manifest = manifest_list_entry.load_manifest(&self.file_io).await?;

for manifest_entry in manifest.entries().iter().filter(|e| e.is_alive()) {
match manifest_entry.content_type() {
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"Delete files are not supported yet.",
));
}
DataContentType::Data => {
file_scan_tasks.push(Ok(FileScanTask {
data_file: manifest_entry.clone(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
}));
// Generate data file stream
let mut entries = iter(manifest_list.entries());
while let Some(entry) = entries.next().await {
let manifest = entry.load_manifest(&file_io).await?;

let mut manifest_entries = iter(manifest.entries().iter().filter(|e| e.is_alive()));
while let Some(manifest_entry) = manifest_entries.next().await {
match manifest_entry.content_type() {
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
yield Err(Error::new(
ErrorKind::FeatureUnsupported,
"Delete files are not supported yet.",
))?;
}
DataContentType::Data => {
let scan_task: crate::Result<FileScanTask> = Ok(FileScanTask {
data_file: manifest_entry.clone(),
start: 0,
length: manifest_entry.file_size_in_bytes(),
});
yield scan_task?;
}
}
}
}
}

Ok(iter(file_scan_tasks).boxed())
.boxed())
}

pub async fn to_arrow(&self) -> crate::Result<ArrowRecordBatchStream> {
Expand Down

0 comments on commit 7a1cb12

Please sign in to comment.