diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs
index 8074af30b..0808ab7e3 100644
--- a/crates/iceberg/src/arrow/delete_file_manager.rs
+++ b/crates/iceberg/src/arrow/delete_file_manager.rs
@@ -15,50 +15,526 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
+use std::future::Future;
+use std::ops::BitAndAssign;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+use std::task::{Context, Poll};
+
+use futures::channel::oneshot;
+use futures::{StreamExt, TryStreamExt};
 use roaring::RoaringTreemap;
 
-use crate::expr::BoundPredicate;
+use crate::arrow::ArrowReader;
+use crate::expr::Predicate::AlwaysTrue;
+use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
-use crate::scan::FileScanTaskDeleteFile;
-use crate::spec::SchemaRef;
+use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskDeleteFile};
+use crate::spec::DataContentType;
 use crate::{Error, ErrorKind, Result};
 
-pub(crate) struct DeleteFileManager {}
+// Equality deletes may apply to more than one DataFile in a scan, and so
+// the same equality delete file may be present in more than one invocation of
+// DeleteFileManager::load_deletes in the same scan. We want to deduplicate these
+// to avoid having to load them twice, so we immediately store cloneable futures in the
+// state that can be awaited upon to get the EQ deletes. That way we can check to see if
+// a load of each EQ delete file is already in progress and avoid starting another one.
+#[derive(Debug, Clone)]
+struct EqDelFuture {}
+
+impl EqDelFuture {
+    pub fn new() -> (oneshot::Sender<Predicate>, Self) {
+        todo!()
+    }
+}
+
+impl Future for EqDelFuture {
+    type Output = Predicate;
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+        todo!()
+    }
+}
+
+#[derive(Debug, Default)]
+struct DeleteFileManagerState {
+    // delete vectors and positional deletes get merged when loaded into a single delete vector
+    // per data file
+    delete_vectors: HashMap<String, RoaringTreemap>,
+
+    // equality delete files are parsed into unbound `Predicate`s. We store them here as
+    // cloneable futures (see note below)
+    equality_deletes: HashMap<String, EqDelFuture>,
+}
+
+type StateRef = Arc<RwLock<DeleteFileManagerState>>;
+
+#[derive(Clone, Debug)]
+pub(crate) struct DeleteFileManager {
+    state: Arc<RwLock<DeleteFileManagerState>>,
+}
+
+// Intermediate context during processing of a delete file task.
+enum DeleteFileContext {
+    // TODO: Delete Vector loader from Puffin files
+    InProgEqDel(EqDelFuture),
+    PosDels(ArrowRecordBatchStream),
+    FreshEqDel {
+        batch_stream: ArrowRecordBatchStream,
+        sender: oneshot::Sender<Predicate>,
+    },
+}
+
+// Final result of the processing of a delete file task before
+// results are fully merged into the DeleteFileManager's state
+enum ParsedDeleteFileContext {
+    InProgEqDel(EqDelFuture),
+    DelVecs(HashMap<String, RoaringTreemap>),
+    EqDel,
+}
 
 #[allow(unused_variables)]
 impl DeleteFileManager {
+    pub(crate) fn new() -> DeleteFileManager {
+        Self {
+            state: Default::default(),
+        }
+    }
+
     pub(crate) async fn load_deletes(
-        delete_file_entries: Vec<FileScanTaskDeleteFile>,
+        &self,
+        delete_file_entries: &[FileScanTaskDeleteFile],
         file_io: FileIO,
         concurrency_limit_data_files: usize,
-    ) -> Result<DeleteFileManager> {
-        // TODO
+    ) -> Result<()> {
+        /*
+           * Create a single stream of all delete file tasks irrespective of type,
+             so that we can respect the combined concurrency limit
+           * We then process each in two phases: load and parse.
+           * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
+             stream the file contents out
+           * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
+             another concurrently processing data file scan task. If it is, we return a future
+             for the pre-existing task from the load phase. If not, we create such a future
+             and store it in the state to prevent other data file tasks from starting to load
+             the same equality delete file, and return a record batch stream from the load phase
+             as per the other delete file types - only this time it is accompanied by a one-shot
+             channel sender that we will eventually use to resolve the shared future that we stored
+             in the state.
+           * When this gets updated to add support for delete vectors, the load phase will return
+             a PuffinReader for them.
+           * The parse phase parses each record batch stream according to its associated data type.
+             The result of this is a map of data file paths to delete vectors for the positional
+             delete tasks (and in future for the delete vector tasks). For equality delete
+             file tasks, this results in an unbound Predicate.
+           * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
+             channel to store them in the right place in the delete file manager's state.
+           * The results of all of these futures are awaited on in parallel with the specified
+             level of concurrency and collected into a vec. We then combine all of the delete
+             vector maps that resulted from any positional delete or delete vector files into a
+             single map and persist it in the state.
 
-        if !delete_file_entries.is_empty() {
-            Err(Error::new(
-                ErrorKind::FeatureUnsupported,
-                "Reading delete files is not yet supported",
-            ))
-        } else {
-            Ok(DeleteFileManager {})
+
+           Conceptually, the data flow is like this:
+
+                                    FileScanTaskDeleteFile
+                                              |
+                   Already-loading EQ Delete  |  Everything Else
+                    +---------------------------------------------------+
+                    |                                                    |
+          [get existing future]               [load recordbatch stream / puffin]
+      DeleteFileContext::InProgEqDel                   DeleteFileContext
+                    |                                                    |
+                    |                                                    |
+                    |           +--------------------------------+------------------------------+
+                    |         Pos Del          Del Vec (Not yet Implemented)                  EQ Del
+                    |           |                                |                               |
+                    |  [parse pos del stream]          [parse del vec puffin]             [parse eq del]
+                    |  HashMap<String, RoaringTreemap>  HashMap<String, RoaringTreemap>  (Predicate, Sender)
+                    |           |                                |                               |
+                    |           |                                |                     [persist to state]
+                    |           |                                |                               ()
+                    |           |                                |                               |
+                    |           +--------------------------------+------------------------------+
+                    |                                            |
+                    |                                   [buffer unordered]
+                    |                                            |
+                    |                                  [combine del vectors]
+                    |                           HashMap<String, RoaringTreemap>
+                    |                                            |
+                    |                         [persist del vectors to state]
+                    |                                            ()
+                    |                                            |
+                    +-----------------------+--------------------+
+                                            |
+                                         [join!]
+        */
+
+        let stream_items = delete_file_entries
+            .iter()
+            .map(|t| (t.clone(), file_io.clone(), self.state.clone()))
+            .collect::<Vec<_>>();
+        // NOTE: removing the collect and just passing the iterator to futures::stream::iter
+        // results in an error 'implementation of `std::ops::FnOnce` is not general enough'
+
+        let task_stream = futures::stream::iter(stream_items.into_iter());
+
+        let results: Vec<ParsedDeleteFileContext> = task_stream
+            .map(move |(task, file_io, state_ref)| async {
+                Self::load_file_for_task(task, file_io, state_ref).await
+            })
+            .map(move |ctx| Ok(async { Self::parse_file_content_for_task(ctx.await?).await }))
+            .try_buffer_unordered(concurrency_limit_data_files)
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        let merged_delete_vectors = results
+            .into_iter()
+            .fold(HashMap::default(), Self::merge_delete_vectors);
+
+        self.state.write().unwrap().delete_vectors = merged_delete_vectors;
+
+        Ok(())
+    }
+
+    async fn load_file_for_task(
+        task: FileScanTaskDeleteFile,
+        file_io: FileIO,
+        state: StateRef,
+    ) -> Result<DeleteFileContext> {
+        match task.file_type {
+            DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
+                Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
+            )),
+
+            DataContentType::EqualityDeletes => {
+                let (sender, fut) = EqDelFuture::new();
+                {
+                    let mut state = state.write().unwrap();
+
+                    if let Some(existing) = state.equality_deletes.get(&task.file_path) {
+                        return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
+                    }
+
+                    state
+                        .equality_deletes
+                        .insert(task.file_path.to_string(), fut);
+                }
+
+                Ok(DeleteFileContext::FreshEqDel {
+                    batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?,
+                    sender,
+                })
+            }
+
+            DataContentType::Data => Err(Error::new(
+                ErrorKind::Unexpected,
+                "tasks with files of type Data not expected here",
+            )),
+        }
+    }
+
+    async fn parse_file_content_for_task(
+        ctx: DeleteFileContext,
+    ) -> Result<ParsedDeleteFileContext> {
+        match ctx {
+            DeleteFileContext::InProgEqDel(fut) => Ok(ParsedDeleteFileContext::InProgEqDel(fut)),
+            DeleteFileContext::PosDels(batch_stream) => {
+                let del_vecs =
+                    Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
+                Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
+            }
+            DeleteFileContext::FreshEqDel {
+                sender,
+                batch_stream,
+            } => {
+                let predicate =
+                    Self::parse_equality_deletes_record_batch_stream(batch_stream).await?;
+
+                sender
+                    .send(predicate)
+                    .map_err(|err| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "Could not send eq delete predicate to state",
+                        )
+                    })
+                    .map(|_| ParsedDeleteFileContext::EqDel)
+            }
+        }
+    }
+
+    fn merge_delete_vectors(
+        mut merged_delete_vectors: HashMap<String, RoaringTreemap>,
+        item: ParsedDeleteFileContext,
+    ) -> HashMap<String, RoaringTreemap> {
+        if let ParsedDeleteFileContext::DelVecs(del_vecs) = item {
+            del_vecs.into_iter().for_each(|(key, val)| {
+                let entry = merged_delete_vectors.entry(key).or_default();
+                entry.bitand_assign(val);
+            });
+        }
+
+        merged_delete_vectors
+    }
+
+    /// Loads a RecordBatchStream for a given datafile.
+    async fn parquet_to_batch_stream(
+        data_file_path: &str,
+        file_io: FileIO,
+    ) -> Result<ArrowRecordBatchStream> {
+        /*
+           Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
+           as that introduces a circular dependency.
+        */
+        let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
+            data_file_path,
+            file_io.clone(),
+            false,
+        )
+        .await?
+        .build()?
+        .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{}", e)));
+
+        Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
-    pub(crate) fn build_delete_predicate(
+    /// Parses a record batch stream coming from positional delete files
+    ///
+    /// Returns a map of data file path to a delete vector
+    async fn parse_positional_deletes_record_batch_stream(
+        stream: ArrowRecordBatchStream,
+    ) -> Result<HashMap<String, RoaringTreemap>> {
+        // TODO
+
+        Ok(HashMap::default())
+    }
+
+    /// Parses record batch streams from individual equality delete files
+    ///
+    /// Returns an unbound Predicate for each batch stream
+    async fn parse_equality_deletes_record_batch_stream(
+        streams: ArrowRecordBatchStream,
+    ) -> Result<Predicate> {
+        // TODO
+
+        Ok(AlwaysTrue)
+    }
+
+    /// Builds eq delete predicate for the provided task.
+    ///
+    /// Must await on load_deletes before calling this.
+    pub(crate) async fn build_delete_predicate_for_task(
         &self,
-        snapshot_schema: SchemaRef,
+        file_scan_task: &FileScanTask,
     ) -> Result<Option<BoundPredicate>> {
-        // TODO
+        // * Filter the task's deletes into just the Equality deletes
+        // * Retrieve the unbound predicate for each from self.state.equality_deletes
+        // * Logical-AND them all together to get a single combined `Predicate`
+        // * Bind the predicate to the task's schema to get a `BoundPredicate`
+
+        let mut combined_predicate = AlwaysTrue;
+        for delete in &file_scan_task.deletes {
+            if !is_equality_delete(delete) {
+                continue;
+            }
+
+            let predicate = {
+                let state = self.state.read().unwrap();
 
-        Ok(None)
+                let Some(predicate) = state.equality_deletes.get(&delete.file_path) else {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        format!(
+                            "Missing predicate for equality delete file '{}'",
+                            delete.file_path
+                        ),
+                    ));
+                };
+
+                predicate.clone()
+            };
+
+            combined_predicate = combined_predicate.and(predicate.await);
+        }
+
+        if combined_predicate == AlwaysTrue {
+            return Ok(None);
+        }
+
+        // TODO: handle case-insensitive case
+        let bound_predicate = combined_predicate.bind(file_scan_task.schema.clone(), false)?;
+        Ok(Some(bound_predicate))
     }
 
-    pub(crate) fn get_positional_delete_indexes_for_data_file(
+    /// Retrieve a delete vector for the data file associated with a given file scan task
+    ///
+    /// Should only be called after awaiting on load_deletes. Takes the vector to avoid a
+    /// clone since each item is specific to a single data file and won't need to be used again
+    pub(crate) fn get_delete_vector_for_task(
         &self,
-        data_file_path: &str,
+        file_scan_task: &FileScanTask,
     ) -> Option<RoaringTreemap> {
-        // TODO
+        self.state
+            .write()
+            .unwrap()
+            .delete_vectors
+            .remove(file_scan_task.data_file_path())
+    }
+}
+
+pub(crate) fn is_equality_delete(f: &FileScanTaskDeleteFile) -> bool {
+    matches!(f.file_type, DataContentType::EqualityDeletes)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::File;
+    use std::path::Path;
+    use std::sync::Arc;
+
+    use arrow_array::{Int64Array, RecordBatch, StringArray};
+    use arrow_schema::Schema as ArrowSchema;
+    use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::spec::{DataFileFormat, Schema};
+
+    type ArrowSchemaRef = Arc<ArrowSchema>;
+
+    const FIELD_ID_POSITIONAL_DELETE_FILE_PATH: u64 = 2147483546;
+    const FIELD_ID_POSITIONAL_DELETE_POS: u64 = 2147483545;
+
+    #[tokio::test]
+    async fn test_delete_file_manager_load_deletes() {
+        // Note that with the delete file parsing not yet in place, all we can test here is that
+        // the call to the loader does not fail.
+        let delete_file_manager = DeleteFileManager::new();
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path();
+        let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let file_scan_tasks = setup(table_location);
+
+        delete_file_manager
+            .load_deletes(&file_scan_tasks[0].deletes, file_io, 5)
+            .await
+            .unwrap();
+    }
+
+    fn setup(table_location: &Path) -> Vec<FileScanTask> {
+        let data_file_schema = Arc::new(Schema::builder().build().unwrap());
+        let positional_delete_schema = create_pos_del_schema();
+
+        let file_path_values = vec![format!("{}/1.parquet", table_location.to_str().unwrap()); 8];
+        let pos_values = vec![0, 1, 3, 5, 6, 8, 1022, 1023];
+
+        let file_path_col = Arc::new(StringArray::from_iter_values(file_path_values));
+        let pos_col = Arc::new(Int64Array::from_iter_values(pos_values));
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        for n in 1..=3 {
+            let positional_deletes_to_write =
+                RecordBatch::try_new(positional_delete_schema.clone(), vec![
+                    file_path_col.clone(),
+                    pos_col.clone(),
+                ])
+                .unwrap();
+
+            let file = File::create(format!(
+                "{}/pos-del-{}.parquet",
+                table_location.to_str().unwrap(),
+                n
+            ))
+            .unwrap();
+            let mut writer = ArrowWriter::try_new(
+                file,
+                positional_deletes_to_write.schema(),
+                Some(props.clone()),
+            )
+            .unwrap();
+
+            writer
+                .write(&positional_deletes_to_write)
+                .expect("Writing batch");
+
+            // writer must be closed to write footer
+            writer.close().unwrap();
+        }
+
+        let pos_del_1 = FileScanTaskDeleteFile {
+            file_path: format!("{}/pos-del-1.parquet", table_location.to_str().unwrap()),
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+        };
+
+        let pos_del_2 = FileScanTaskDeleteFile {
+            file_path: format!("{}/pos-del-2.parquet", table_location.to_str().unwrap()),
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+        };
+
+        let pos_del_3 = FileScanTaskDeleteFile {
+            file_path: format!("{}/pos-del-3.parquet", table_location.to_str().unwrap()),
+            file_type: DataContentType::PositionDeletes,
+            partition_spec_id: 0,
+        };
+
+        let file_scan_tasks = vec![
+            FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: "".to_string(),
+                data_file_content: DataContentType::Data,
+                data_file_format: DataFileFormat::Parquet,
+                schema: data_file_schema.clone(),
+                project_field_ids: vec![],
+                predicate: None,
+                deletes: vec![pos_del_1, pos_del_2.clone()],
+            },
+            FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: "".to_string(),
+                data_file_content: DataContentType::Data,
+                data_file_format: DataFileFormat::Parquet,
+                schema: data_file_schema.clone(),
+                project_field_ids: vec![],
+                predicate: None,
+                deletes: vec![pos_del_2, pos_del_3],
+            },
+        ];
+
+        file_scan_tasks
+    }
 
-        None
+    fn create_pos_del_schema() -> ArrowSchemaRef {
+        let fields = vec![
+            arrow_schema::Field::new("file_path", arrow_schema::DataType::Utf8, false)
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    FIELD_ID_POSITIONAL_DELETE_FILE_PATH.to_string(),
+                )])),
+            arrow_schema::Field::new("pos", arrow_schema::DataType::Int64, false).with_metadata(
+                HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    FIELD_ID_POSITIONAL_DELETE_POS.to_string(),
+                )]),
+            ),
+        ];
+        Arc::new(arrow_schema::Schema::new(fields))
     }
 }
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index fb4ec93cd..c7a2c13c2 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -63,6 +63,7 @@ pub struct ArrowReaderBuilder {
     concurrency_limit_data_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    delete_file_support_enabled: bool,
 }
 
 impl ArrowReaderBuilder {
@@ -76,6 +77,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            delete_file_support_enabled: false,
         }
     }
 
@@ -104,14 +106,22 @@ impl ArrowReaderBuilder {
         self
     }
 
+    /// Determines whether to enable delete file support.
+    pub fn with_delete_file_support_enabled(mut self, delete_file_support_enabled: bool) -> Self {
+        self.delete_file_support_enabled = delete_file_support_enabled;
+        self
+    }
+
     /// Build the ArrowReader.
     pub fn build(self) -> ArrowReader {
         ArrowReader {
             batch_size: self.batch_size,
+            delete_file_manager: DeleteFileManager::new(),
             file_io: self.file_io,
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            delete_file_support_enabled: self.delete_file_support_enabled,
         }
     }
 }
@@ -121,12 +131,14 @@ pub struct ArrowReader {
     batch_size: Option<usize>,
     file_io: FileIO,
+    delete_file_manager: DeleteFileManager,
 
     /// the maximum number of data files that can be fetched at the same time
     concurrency_limit_data_files: usize,
 
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    delete_file_support_enabled: bool,
 }
 
 impl ArrowReader {
@@ -134,22 +146,27 @@ impl ArrowReader {
     /// Returns a stream of Arrow RecordBatches containing the data from the files
     pub async fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         let file_io = self.file_io.clone();
+        let delete_file_manager = self.delete_file_manager.clone();
         let batch_size = self.batch_size;
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
+        let delete_file_support_enabled = self.delete_file_support_enabled;
 
         let stream = tasks
             .map_ok(move |task| {
                 let file_io = file_io.clone();
+                let delete_file_manager = delete_file_manager.clone();
 
                 Self::process_file_scan_task(
                     task,
                     batch_size,
                     file_io,
+                    delete_file_manager,
                     row_group_filtering_enabled,
                     row_selection_enabled,
                     concurrency_limit_data_files,
+                    delete_file_support_enabled,
                 )
             })
             .map_err(|err| {
@@ -161,23 +178,37 @@ impl ArrowReader {
         Ok(Box::pin(stream) as ArrowRecordBatchStream)
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
         file_io: FileIO,
+        delete_file_manager: DeleteFileManager,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
         concurrency_limit_data_files: usize,
+        delete_file_support_enabled: bool,
     ) -> Result<ArrowRecordBatchStream> {
+        if !delete_file_support_enabled && !task.deletes.is_empty() {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Delete file support is not enabled",
+            ));
+        }
+
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
 
         // concurrently retrieve delete files and create RecordBatchStreamBuilder
-        let (delete_file_manager, mut record_batch_stream_builder) = try_join!(
-            DeleteFileManager::load_deletes(
-                task.deletes.clone(),
+        let (_, mut record_batch_stream_builder) = try_join!(
+            delete_file_manager.load_deletes(
+                if delete_file_support_enabled {
+                    &task.deletes
+                } else {
+                    &[]
+                },
                 file_io.clone(),
-                concurrency_limit_data_files
+                concurrency_limit_data_files,
             ),
             Self::create_parquet_record_batch_stream_builder(
                 &task.data_file_path,
@@ -206,7 +237,9 @@ impl ArrowReader {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
         }
 
-        let delete_predicate = delete_file_manager.build_delete_predicate(task.schema.clone())?;
+        let delete_predicate = delete_file_manager
+            .build_delete_predicate_for_task(&task)
+            .await?;
 
         // In addition to the optional predicate supplied in the `FileScanTask`,
         // we also have an optional predicate resulting from equality delete files.
@@ -274,14 +307,13 @@ impl ArrowReader {
             }
         }
 
-        let positional_delete_indexes =
-            delete_file_manager.get_positional_delete_indexes_for_data_file(&task.data_file_path);
+        let positional_delete_indexes = delete_file_manager.get_delete_vector_for_task(&task);
 
         if let Some(positional_delete_indexes) = positional_delete_indexes {
             let delete_row_selection = Self::build_deletes_row_selection(
                 record_batch_stream_builder.metadata().row_groups(),
                 &selected_row_group_indices,
-                positional_delete_indexes,
+                positional_delete_indexes.clone(),
             )?;
 
             // merge the row selection from the delete files with the row selection
@@ -317,7 +349,7 @@ impl ArrowReader {
         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
 
-    async fn create_parquet_record_batch_stream_builder(
+    pub(crate) async fn create_parquet_record_batch_stream_builder(
         data_file_path: &str,
         file_io: FileIO,
         should_load_page_index: bool,
@@ -1305,7 +1337,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
 /// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
 /// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
 /// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
-struct ArrowFileReader<R: FileRead> {
+pub(crate) struct ArrowFileReader<R: FileRead> {
     meta: FileMetadata,
     r: R,
 }
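
Reviewer note (not part of the diff): a minimal sketch of how a caller might opt in to the new flag. It assumes the existing scan and reader APIs in the crate (`Table::reader_builder`, `TableScan::plan_files`); only `with_delete_file_support_enabled` comes from this change, and `read` is awaited because the hunk above shows it as `async` on this branch.

```rust
// Hypothetical usage sketch, not included in this PR.
use futures::TryStreamExt;
use iceberg::table::Table;
use iceberg::Result;

async fn read_with_deletes(table: &Table) -> Result<u64> {
    // Plan the scan as usual; each FileScanTask now carries its delete files.
    let tasks = table.scan().select_all().build()?.plan_files().await?;

    // The flag defaults to false, so existing callers keep the current
    // "deletes present => FeatureUnsupported" behaviour.
    let reader = table
        .reader_builder()
        .with_delete_file_support_enabled(true)
        .build();

    // `read` is async per the signature shown in the diff above.
    let mut batches = reader.read(tasks).await?;

    let mut rows = 0u64;
    while let Some(batch) = batches.try_next().await? {
        // Once the positional/equality delete parsing TODOs are filled in,
        // deleted rows will already be filtered out of these batches.
        rows += batch.num_rows() as u64;
    }
    Ok(rows)
}
```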