diff --git a/crates/iceberg/src/arrow/delete_file_manager.rs b/crates/iceberg/src/arrow/delete_file_manager.rs index 0808ab7e3..eb72ab550 100644 --- a/crates/iceberg/src/arrow/delete_file_manager.rs +++ b/crates/iceberg/src/arrow/delete_file_manager.rs @@ -19,10 +19,11 @@ use std::collections::HashMap; use std::future::Future; use std::ops::BitAndAssign; use std::pin::Pin; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, OnceLock, RwLock}; use std::task::{Context, Poll}; use futures::channel::oneshot; +use futures::future::join_all; use futures::{StreamExt, TryStreamExt}; use roaring::RoaringTreemap; @@ -41,11 +42,21 @@ use crate::{Error, ErrorKind, Result}; // state that can be awaited upon to get te EQ deletes. That way we can check to see if // a load of each Eq delete file is already in progress and avoid starting another one. #[derive(Debug, Clone)] -struct EqDelFuture {} +struct EqDelFuture { + result: OnceLock, +} impl EqDelFuture { pub fn new() -> (oneshot::Sender, Self) { - todo!() + let (tx, rx) = oneshot::channel(); + let result = OnceLock::new(); + + crate::runtime::spawn({ + let result = result.clone(); + async move { result.set(rx.await.unwrap()) } + }); + + (tx, Self { result }) } } @@ -53,7 +64,10 @@ impl Future for EqDelFuture { type Output = Predicate; fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - todo!() + match self.result.get() { + None => Poll::Pending, + Some(predicate) => Poll::Ready(predicate.clone()), + } } } @@ -189,6 +203,16 @@ impl DeleteFileManager { .try_collect::>() .await?; + // wait for all in-progress EQ deletes from other tasks + let _ = join_all(results.iter().filter_map(|i| { + if let ParsedDeleteFileContext::InProgEqDel(fut) = i { + Some(fut.clone()) + } else { + None + } + })) + .await; + let merged_delete_vectors = results .into_iter() .fold(HashMap::default(), Self::merge_delete_vectors);