Skip to content

Commit 10f395d

Browse files
committed
Merge remote-tracking branch 'upstream/main'
2 parents 9d8829a + 36a4c55 commit 10f395d

File tree

10 files changed

+506
-97
lines changed

10 files changed

+506
-97
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,12 +663,13 @@ pub mod tests {
663663
arrow::array::{Float64Array, Int64Array},
664664
common::tree_node::{TransformedResult, TreeNode},
665665
execution::SessionStateBuilder,
666-
prelude::SessionContext,
666+
prelude::{SessionConfig, SessionContext},
667667
};
668668
use datafusion_iceberg::{
669669
catalog::catalog::IcebergCatalog,
670670
planner::{iceberg_transform, IcebergQueryPlanner},
671671
};
672+
use futures::StreamExt;
672673
use iceberg_rust::{
673674
catalog::{namespace::Namespace, Catalog},
674675
object_store::{Bucket, ObjectStoreBuilder},
@@ -734,7 +735,17 @@ pub mod tests {
734735
.unwrap(),
735736
);
736737

738+
let mut config = SessionConfig::default();
739+
740+
config.options_mut().execution.minimum_parallel_output_files = 1;
741+
config
742+
.options_mut()
743+
.execution
744+
.parquet
745+
.maximum_parallel_row_group_writers = 4;
746+
737747
let state = SessionStateBuilder::new()
748+
.with_config(config)
738749
.with_default_features()
739750
.with_query_planner(Arc::new(IcebergQueryPlanner::new()))
740751
.build();
@@ -801,7 +812,7 @@ pub mod tests {
801812
L_RECEIPTDATE DATE NOT NULL,
802813
L_SHIPINSTRUCT VARCHAR NOT NULL,
803814
L_SHIPMODE VARCHAR NOT NULL,
804-
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem' PARTITIONED BY ( \"month(L_SHIPDATE)\" );";
815+
L_COMMENT VARCHAR NOT NULL ) STORED AS ICEBERG LOCATION 's3://warehouse/tpch/lineitem';";
805816

806817
let plan = ctx.state().create_logical_plan(sql).await.unwrap();
807818

@@ -886,6 +897,21 @@ pub mod tests {
886897
.unwrap();
887898

888899
assert_eq!(std::str::from_utf8(&version_hint).unwrap(), "1");
900+
901+
let files = object_store.list(None).collect::<Vec<_>>().await;
902+
903+
assert_eq!(
904+
files
905+
.iter()
906+
.filter(|x| x
907+
.as_ref()
908+
.unwrap()
909+
.location
910+
.extension()
911+
.is_some_and(|x| x == "parquet"))
912+
.count(),
913+
1
914+
);
889915
}
890916

891917
#[tokio::test]

datafusion_iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ derive_builder = { workspace = true }
1919
futures = { workspace = true }
2020
iceberg-rust = { path = "../iceberg-rust", version = "0.8.0" }
2121
itertools = { workspace = true }
22+
lru = { workspace = true }
2223
object_store = { workspace = true }
2324
pin-project-lite = "0.2.16"
2425
regex = { workspace = true }

datafusion_iceberg/src/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
Error type for iceberg
33
*/
44

5-
use datafusion::error::DataFusionError;
5+
use datafusion::{arrow::array::RecordBatch, error::DataFusionError};
66
use iceberg_rust::error::Error as IcebergError;
77
use thiserror::Error;
88

@@ -54,6 +54,15 @@ pub enum Error {
5454
/// parse int error
5555
#[error(transparent)]
5656
ParseInt(#[from] std::num::ParseIntError),
57+
/// Tokio error
58+
#[error(transparent)]
59+
TokioSend(
60+
#[from]
61+
tokio::sync::mpsc::error::SendError<(
62+
object_store::path::Path,
63+
tokio::sync::mpsc::Receiver<RecordBatch>,
64+
)>,
65+
),
5766
/// parse int error
5867
#[error(transparent)]
5968
DeriveBuilder(#[from] derive_builder::UninitializedFieldError),

0 commit comments

Comments
 (0)