From d96443e43f58b2b69f60a63046075aea33a1b6a9 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Fri, 15 Aug 2025 15:45:11 -0700 Subject: [PATCH 1/5] Add memory catalog loader impl --- crates/iceberg/src/catalog/memory/catalog.rs | 267 +++++++++++++----- crates/iceberg/src/catalog/memory/mod.rs | 2 +- crates/iceberg/src/io/mod.rs | 31 ++ crates/iceberg/src/lib.rs | 20 +- crates/iceberg/src/writer/mod.rs | 39 ++- .../datafusion/src/physical_plan/commit.rs | 28 +- .../datafusion/src/physical_plan/write.rs | 21 +- .../tests/integration_datafusion_test.rs | 27 +- 8 files changed, 321 insertions(+), 114 deletions(-) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 12d18b9f36..f1591acfd6 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -24,17 +24,84 @@ use futures::lock::{Mutex, MutexGuard}; use itertools::Itertools; use super::namespace_state::NamespaceState; -use crate::io::FileIO; +use crate::io::{FileIO, FileIOBuilder}; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ - Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, + TableCommit, TableCreation, TableIdent, }; +/// Memory catalog io type +pub const MEMORY_CATALOG_IO_TYPE: &str = "io_type"; +/// Memory catalog warehouse location +pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse"; + /// namespace `location` property const LOCATION: &str = "location"; +/// Builder for [`MemoryCatalog`]. +#[derive(Debug)] +pub struct MemoryCatalogBuilder(MemoryCatalogConfig); + +impl Default for MemoryCatalogBuilder { + fn default() -> Self { + Self(MemoryCatalogConfig { + name: None, + io_type: None, + warehouse: None, + props: HashMap::new(), + }) + } +} + +impl CatalogBuilder for MemoryCatalogBuilder { + type C = MemoryCatalog; + + fn load( + mut self, + name: impl Into, + props: HashMap, + ) -> impl Future> + Send { + self.0.name = Some(name.into()); + + if props.contains_key(MEMORY_CATALOG_IO_TYPE) { + self.0.io_type = props.get(MEMORY_CATALOG_IO_TYPE).cloned() + } + + if props.contains_key(MEMORY_CATALOG_WAREHOUSE) { + self.0.warehouse = props.get(MEMORY_CATALOG_WAREHOUSE).cloned() + } + + // Collect other remaining properties + self.0.props = props + .into_iter() + .filter(|(k, _)| k != MEMORY_CATALOG_IO_TYPE && k != MEMORY_CATALOG_WAREHOUSE) + .collect(); + + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else { + Ok(MemoryCatalog::new(self.0)) + } + }; + + std::future::ready(result) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct MemoryCatalogConfig { + name: Option, + io_type: Option, + warehouse: Option, + props: HashMap, +} + /// Memory catalog implementation. #[derive(Debug)] pub struct MemoryCatalog { @@ -45,11 +112,20 @@ pub struct MemoryCatalog { impl MemoryCatalog { /// Creates a memory catalog. - pub fn new(file_io: FileIO, warehouse_location: Option) -> Self { + fn new(config: MemoryCatalogConfig) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io, - warehouse_location, + file_io: match config.io_type { + Some(scheme) => FileIOBuilder::new(scheme) + .with_props(config.props) + .build() + .unwrap(), + None => FileIOBuilder::new_fs_io() + .with_props(config.props) + .build() + .unwrap(), + }, + warehouse_location: config.warehouse, } } @@ -340,10 +416,15 @@ mod tests { temp_dir.path().to_str().unwrap().to_string() } - fn new_memory_catalog() -> impl Catalog { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + async fn new_memory_catalog() -> impl Catalog { let warehouse_location = temp_path(); - MemoryCatalog::new(file_io, Some(warehouse_location)) + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]), + ) + .await + .unwrap() } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -453,14 +534,14 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_empty_vector() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); } #[tokio::test] async fn test_list_namespaces_returns_single_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -471,7 +552,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -484,7 +565,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_only_top_level_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("b".into()); @@ -503,7 +584,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_no_namespaces_under_parent() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -519,7 +600,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_namespace_under_parent() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("c".into()); @@ -546,7 +627,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".to_string()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -578,7 +659,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_false() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -592,7 +673,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_true() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -601,7 +682,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_empty_properties() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); assert_eq!( @@ -620,7 +701,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_properties() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties: HashMap = HashMap::new(); @@ -642,7 +723,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_throws_error_if_namespace_already_exists() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -666,7 +747,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let parent_namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &parent_namespace_ident).await; @@ -688,7 +769,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -711,7 +792,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -733,7 +814,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident_a).await; @@ -767,7 +848,7 @@ mod tests { #[tokio::test] async fn test_get_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties: HashMap = HashMap::new(); @@ -785,7 +866,7 @@ mod tests { #[tokio::test] async fn test_get_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -798,7 +879,7 @@ mod tests { #[tokio::test] async fn test_get_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -817,7 +898,7 @@ mod tests { #[tokio::test] async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = NamespaceIdent::new("b".into()); @@ -836,7 +917,7 @@ mod tests { #[tokio::test] async fn test_update_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -856,7 +937,7 @@ mod tests { #[tokio::test] async fn test_update_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -877,7 +958,7 @@ mod tests { #[tokio::test] async fn test_update_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -904,7 +985,7 @@ mod tests { #[tokio::test] async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await; let non_existent_namespace_ident = NamespaceIdent::new("def".into()); @@ -923,7 +1004,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -934,7 +1015,7 @@ mod tests { #[tokio::test] async fn test_drop_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -953,7 +1034,7 @@ mod tests { #[tokio::test] async fn test_drop_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -988,7 +1069,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); assert_eq!( @@ -1006,7 +1087,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = @@ -1026,7 +1107,7 @@ mod tests { #[tokio::test] async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1046,7 +1127,7 @@ mod tests { #[tokio::test] async fn test_create_table_with_location() { let tmp_dir = TempDir::new().unwrap(); - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1084,9 +1165,17 @@ mod tests { #[tokio::test] async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); let mut namespace_properties = HashMap::new(); @@ -1126,9 +1215,17 @@ mod tests { #[tokio::test] async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); let mut namespace_properties = HashMap::new(); @@ -1179,9 +1276,17 @@ mod tests { #[tokio::test] async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); // note: no location specified in namespace_properties @@ -1220,9 +1325,17 @@ mod tests { #[tokio::test] async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); catalog @@ -1268,8 +1381,10 @@ mod tests { #[tokio::test] async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - let catalog = MemoryCatalog::new(file_io, None); + let catalog = MemoryCatalogBuilder::default() + .load("memory", HashMap::from([])) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1298,7 +1413,7 @@ mod tests { #[tokio::test] async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; let table_name = "tbl1"; @@ -1330,7 +1445,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_empty_vector() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1339,7 +1454,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_a_single_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1353,7 +1468,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_multiple_tables() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1369,7 +1484,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_tables_from_correct_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("n1".into()); let namespace_ident_2 = NamespaceIdent::new("n2".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -1397,7 +1512,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_table_under_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1413,7 +1528,7 @@ mod tests { #[tokio::test] async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); @@ -1432,7 +1547,7 @@ mod tests { #[tokio::test] async fn test_drop_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1443,7 +1558,7 @@ mod tests { #[tokio::test] async fn test_drop_table_drops_table_under_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1461,7 +1576,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); let non_existent_table_ident = @@ -1482,7 +1597,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_table_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1503,7 +1618,7 @@ mod tests { #[tokio::test] async fn test_table_exists_returns_true() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1514,7 +1629,7 @@ mod tests { #[tokio::test] async fn test_table_exists_returns_false() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1529,7 +1644,7 @@ mod tests { #[tokio::test] async fn test_table_exists_under_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1550,7 +1665,7 @@ mod tests { #[tokio::test] async fn test_table_exists_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); let non_existent_table_ident = @@ -1571,7 +1686,7 @@ mod tests { #[tokio::test] async fn test_rename_table_in_same_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1590,7 +1705,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let src_namespace_ident = NamespaceIdent::new("a".into()); let dst_namespace_ident = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; @@ -1616,7 +1731,7 @@ mod tests { #[tokio::test] async fn test_rename_table_src_table_is_same_as_dst_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); @@ -1634,7 +1749,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_nested_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -1661,7 +1776,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = @@ -1686,7 +1801,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); create_namespace(&catalog, &src_namespace_ident).await; @@ -1710,7 +1825,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1728,7 +1843,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_table_already_exists() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1751,7 +1866,7 @@ mod tests { #[tokio::test] async fn test_register_table() { // Create a catalog and namespace - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("test_namespace".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1794,7 +1909,7 @@ mod tests { #[tokio::test] async fn test_update_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let table = create_table_with_namespace(&catalog).await; @@ -1829,7 +1944,7 @@ mod tests { #[tokio::test] async fn test_update_table_fails_if_table_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; diff --git a/crates/iceberg/src/catalog/memory/mod.rs b/crates/iceberg/src/catalog/memory/mod.rs index bcad16dda7..b8d9e4a8ee 100644 --- a/crates/iceberg/src/catalog/memory/mod.rs +++ b/crates/iceberg/src/catalog/memory/mod.rs @@ -20,4 +20,4 @@ mod catalog; mod namespace_state; -pub use catalog::MemoryCatalog; +pub use catalog::*; diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 5eb5964345..814ad3d403 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -101,3 +101,34 @@ pub use storage_s3::*; pub(crate) fn is_truthy(value: &str) -> bool { ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) } + +/// All supported File IO types +#[derive(Debug)] +pub enum FileIOType { + /// In-Memory FileIO with scheme prefix 'memory' + Memory, + /// Local file system FileIO with scheme prefix 'file' + Fs, + /// S3 FileIO with scheme prefix 's3', 's3a' + S3, + /// GCS FileIO with scheme prefix 'gs', 'gcs' + Gcs, + /// Object Storage Service FileIO with scheme prefix 'oss' + Oss, + /// Azdls FileIO with scheme prefix 'abfss', 'abfs', 'wasbs', 'wasb' + Azdls, +} + +impl FileIOType { + /// Get string representation of file io type + pub const fn as_str(&self) -> &str { + match self { + FileIOType::Memory => "memory", + FileIOType::S3 => "s3", + FileIOType::Fs => "file", + FileIOType::Gcs => "gcs", + FileIOType::Oss => "oss", + FileIOType::Azdls => "azdls", + } + } +} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 612a5db601..727a6af06e 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -22,16 +22,28 @@ //! ## Scan A Table //! //! ```rust, no_run +//! use std::collections::HashMap; +//! //! use futures::TryStreamExt; +//! use iceberg::io::FileIOType::Memory; //! use iceberg::io::{FileIO, FileIOBuilder}; -//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent}; +//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder}; +//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! //! #[tokio::main] //! async fn main() -> Result<()> { -//! // Build your file IO. -//! let file_io = FileIOBuilder::new("memory").build()?; //! // Connect to a catalog. -//! let catalog = MemoryCatalog::new(file_io, None); +//! let catalog = MemoryCatalogBuilder::default() +//! .load( +//! "memory", +//! HashMap::from([ +//! ( +//! MEMORY_CATALOG_IO_TYPE.to_string(), +//! Memory.as_str().to_string(), +//! ), // specify the file io type +//! ]), +//! ) +//! .await?; //! // Load table from catalog. //! let table = catalog //! .load_table(&TableIdent::from_strs(["hello", "world"])?) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 2076d7707d..cf583d9630 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -40,10 +40,12 @@ //! //! # Simple example for the data file writer used parquet physical format: //! ```rust, no_run +//! use std::collections::HashMap; //! use std::sync::Arc; //! //! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; //! use async_trait::async_trait; +//! use iceberg::io::FileIOType::Memory; //! use iceberg::io::{FileIO, FileIOBuilder}; //! use iceberg::spec::DataFile; //! use iceberg::transaction::Transaction; @@ -53,14 +55,23 @@ //! DefaultFileNameGenerator, DefaultLocationGenerator, //! }; //! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; -//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent}; +//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! use parquet::file::properties::WriterProperties; //! #[tokio::main] //! async fn main() -> Result<()> { -//! // Build your file IO. -//! let file_io = FileIOBuilder::new("memory").build()?; //! // Connect to a catalog. -//! let catalog = MemoryCatalog::new(file_io, None); +//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder}; +//! let catalog = MemoryCatalogBuilder::default() +//! .load( +//! "memory", +//! HashMap::from([ +//! ( +//! MEMORY_CATALOG_IO_TYPE.to_string(), +//! Memory.as_str().to_string(), +//! ), // specify the file io type +//! ]), +//! ) +//! .await?; //! // Add customized code to create a table first. //! //! // Load table from catalog. @@ -98,10 +109,13 @@ //! //! # Custom writer to record latency //! ```rust, no_run +//! use std::collections::HashMap; //! use std::time::Instant; //! //! use arrow_array::RecordBatch; //! use iceberg::io::FileIOBuilder; +//! use iceberg::io::FileIOType::Memory; +//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder}; //! use iceberg::spec::DataFile; //! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; //! use iceberg::writer::file_writer::ParquetWriterBuilder; @@ -109,7 +123,7 @@ //! DefaultFileNameGenerator, DefaultLocationGenerator, //! }; //! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; -//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent}; +//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! use parquet::file::properties::WriterProperties; //! //! #[derive(Clone)] @@ -160,11 +174,18 @@ //! //! #[tokio::main] //! async fn main() -> Result<()> { -//! // Build your file IO. -//! use iceberg::NamespaceIdent; -//! let file_io = FileIOBuilder::new("memory").build()?; //! // Connect to a catalog. -//! let catalog = MemoryCatalog::new(file_io, None); +//! let catalog = MemoryCatalogBuilder::default() +//! .load( +//! "memory", +//! HashMap::from([ +//! ( +//! MEMORY_CATALOG_IO_TYPE.to_string(), +//! Memory.as_str().to_string(), +//! ), // specify the file io type +//! ]), +//! ) +//! .await?; //! //! // Add customized code to create a table first. //! diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 1ce8fa8d82..0772bca493 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -271,12 +271,13 @@ mod tests { use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::StreamExt; - use iceberg::io::FileIOBuilder; + use iceberg::io::FileIOType::Memory; + use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{ DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema, Struct, Type, }; - use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, TableCreation, TableIdent}; + use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; use super::*; use crate::physical_plan::DATA_FILES_COL_NAME; @@ -374,11 +375,24 @@ mod tests { #[tokio::test] async fn test_iceberg_commit_exec() -> Result<(), Box> { // Create a memory catalog with in-memory file IO - let file_io = FileIOBuilder::new("memory").build()?; - let catalog = Arc::new(MemoryCatalog::new( - file_io, - Some("memory://root".to_string()), - )); + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([ + ( + MEMORY_CATALOG_IO_TYPE.to_string(), + Memory.as_str().to_string(), + ), + ( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://root".to_string(), + ), + ]), + ) + .await + .unwrap(), + ); // Create a namespace let namespace = NamespaceIdent::new("test_namespace".to_string()); diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 4b8ef1ab11..e36df02553 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -328,11 +328,12 @@ mod tests { use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::{StreamExt, stream}; - use iceberg::io::FileIOBuilder; + use iceberg::io::FileIOType::Fs; + use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{ DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json, }; - use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation}; + use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use tempfile::TempDir; @@ -425,9 +426,17 @@ mod tests { } /// Helper function to create a memory catalog - fn get_iceberg_catalog() -> MemoryCatalog { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - MemoryCatalog::new(file_io, Some(temp_path())) + async fn get_iceberg_catalog() -> MemoryCatalog { + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([ + (MEMORY_CATALOG_IO_TYPE.to_string(), Fs.as_str().to_string()), + (MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path()), + ]), + ) + .await + .unwrap() } /// Helper function to create a test table schema @@ -458,7 +467,7 @@ mod tests { #[tokio::test] async fn test_iceberg_write_exec() -> Result<()> { // 1. Set up test environment - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_namespace".to_string()); // Create namespace diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 1491e4dbff..3ae7175344 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -26,10 +26,10 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use expect_test::expect; -use iceberg::io::FileIOBuilder; +use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; use iceberg::test_utils::check_record_batches; -use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation}; +use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use tempfile::TempDir; @@ -38,9 +38,14 @@ fn temp_path() -> String { temp_dir.path().to_str().unwrap().to_string() } -fn get_iceberg_catalog() -> MemoryCatalog { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - MemoryCatalog::new(file_io, Some(temp_path())) +async fn get_iceberg_catalog() -> MemoryCatalog { + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path())]), + ) + .await + .unwrap() } fn get_struct_type() -> StructType { @@ -86,7 +91,7 @@ fn get_table_creation( #[tokio::test] async fn test_provider_plan_stream_schema() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -139,7 +144,7 @@ async fn test_provider_plan_stream_schema() -> Result<()> { #[tokio::test] async fn test_provider_list_table_names() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -171,7 +176,7 @@ async fn test_provider_list_table_names() -> Result<()> { #[tokio::test] async fn test_provider_list_schema_names() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_provider_list_schema_names".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -196,7 +201,7 @@ async fn test_provider_list_schema_names() -> Result<()> { #[tokio::test] async fn test_table_projection() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("ns".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -264,7 +269,7 @@ async fn test_table_projection() -> Result<()> { #[tokio::test] async fn test_table_predict_pushdown() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("ns".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -309,7 +314,7 @@ async fn test_table_predict_pushdown() -> Result<()> { #[tokio::test] async fn test_metadata_table() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("ns".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; From c85744a24b145d8fc6e77803fe95f25e3db1cae5 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 25 Aug 2025 15:54:43 -0700 Subject: [PATCH 2/5] Remove file io type --- crates/iceberg/src/catalog/memory/catalog.rs | 95 +++++++------------ crates/iceberg/src/io/mod.rs | 31 ------ crates/iceberg/src/lib.rs | 13 ++- crates/iceberg/src/writer/mod.rs | 25 +++-- .../datafusion/src/physical_plan/commit.rs | 17 +--- .../datafusion/src/physical_plan/write.rs | 8 +- 6 files changed, 56 insertions(+), 133 deletions(-) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index f1591acfd6..405d636b3b 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -24,7 +24,7 @@ use futures::lock::{Mutex, MutexGuard}; use itertools::Itertools; use super::namespace_state::NamespaceState; -use crate::io::{FileIO, FileIOBuilder}; +use crate::io::FileIO; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ @@ -32,8 +32,8 @@ use crate::{ TableCommit, TableCreation, TableIdent, }; -/// Memory catalog io type -pub const MEMORY_CATALOG_IO_TYPE: &str = "io_type"; +// /// Memory catalog io type +// pub const MEMORY_CATALOG_IO_TYPE: &str = "io_type"; /// Memory catalog warehouse location pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse"; @@ -48,8 +48,7 @@ impl Default for MemoryCatalogBuilder { fn default() -> Self { Self(MemoryCatalogConfig { name: None, - io_type: None, - warehouse: None, + warehouse: "".to_string(), props: HashMap::new(), }) } @@ -65,18 +64,21 @@ impl CatalogBuilder for MemoryCatalogBuilder { ) -> impl Future> + Send { self.0.name = Some(name.into()); - if props.contains_key(MEMORY_CATALOG_IO_TYPE) { - self.0.io_type = props.get(MEMORY_CATALOG_IO_TYPE).cloned() - } + // if props.contains_key(MEMORY_CATALOG_IO_TYPE) { + // self.0.io_type = props.get(MEMORY_CATALOG_IO_TYPE).cloned() + // } if props.contains_key(MEMORY_CATALOG_WAREHOUSE) { - self.0.warehouse = props.get(MEMORY_CATALOG_WAREHOUSE).cloned() + self.0.warehouse = props + .get(MEMORY_CATALOG_WAREHOUSE) + .cloned() + .unwrap_or_default() } // Collect other remaining properties self.0.props = props .into_iter() - .filter(|(k, _)| k != MEMORY_CATALOG_IO_TYPE && k != MEMORY_CATALOG_WAREHOUSE) + .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE) .collect(); let result = { @@ -85,6 +87,11 @@ impl CatalogBuilder for MemoryCatalogBuilder { ErrorKind::DataInvalid, "Catalog name is required", )) + } else if self.0.warehouse.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog warehouse is required", + )) } else { Ok(MemoryCatalog::new(self.0)) } @@ -97,8 +104,7 @@ impl CatalogBuilder for MemoryCatalogBuilder { #[derive(Clone, Debug)] pub(crate) struct MemoryCatalogConfig { name: Option, - io_type: Option, - warehouse: Option, + warehouse: String, props: HashMap, } @@ -107,7 +113,7 @@ pub(crate) struct MemoryCatalogConfig { pub struct MemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, - warehouse_location: Option, + warehouse_location: String, } impl MemoryCatalog { @@ -115,16 +121,11 @@ impl MemoryCatalog { fn new(config: MemoryCatalogConfig) -> Self { Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io: match config.io_type { - Some(scheme) => FileIOBuilder::new(scheme) - .with_props(config.props) - .build() - .unwrap(), - None => FileIOBuilder::new_fs_io() - .with_props(config.props) - .build() - .unwrap(), - }, + file_io: FileIO::from_path(&config.warehouse) + .unwrap() + .with_props(config.props) + .build() + .unwrap(), warehouse_location: config.warehouse, } } @@ -264,22 +265,9 @@ impl Catalog for MemoryCatalog { None => { let namespace_properties = root_namespace_state.get_properties(namespace_ident)?; let location_prefix = match namespace_properties.get(LOCATION) { - Some(namespace_location) => Ok(namespace_location.clone()), - None => match self.warehouse_location.clone() { - Some(warehouse_location) => Ok(format!( - "{}/{}", - warehouse_location, - namespace_ident.join("/") - )), - None => Err(Error::new( - ErrorKind::Unexpected, - format!( - "Cannot create table {:?}. No default path is set, please specify a location when creating a table.", - &table_ident - ), - )), - }, - }?; + Some(namespace_location) => namespace_location.clone(), + None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")), + }; let location = format!("{}/{}", location_prefix, table_ident.name()); @@ -1383,32 +1371,13 @@ mod tests { { let catalog = MemoryCatalogBuilder::default() .load("memory", HashMap::from([])) - .await - .unwrap(); - - let namespace_ident = NamespaceIdent::new("a".into()); - create_namespace(&catalog, &namespace_ident).await; - - let table_name = "tbl1"; - let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + .await; + assert!(catalog.is_err()); assert_eq!( - catalog - .create_table( - &namespace_ident, - TableCreation::builder() - .name(table_name.into()) - .schema(simple_table_schema()) - .build(), - ) - .await - .unwrap_err() - .to_string(), - format!( - "Unexpected => Cannot create table {:?}. No default path is set, please specify a location when creating a table.", - &expected_table_ident - ) - ) + catalog.unwrap_err().to_string(), + "DataInvalid => Catalog warehouse is required" + ); } #[tokio::test] diff --git a/crates/iceberg/src/io/mod.rs b/crates/iceberg/src/io/mod.rs index 814ad3d403..5eb5964345 100644 --- a/crates/iceberg/src/io/mod.rs +++ b/crates/iceberg/src/io/mod.rs @@ -101,34 +101,3 @@ pub use storage_s3::*; pub(crate) fn is_truthy(value: &str) -> bool { ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) } - -/// All supported File IO types -#[derive(Debug)] -pub enum FileIOType { - /// In-Memory FileIO with scheme prefix 'memory' - Memory, - /// Local file system FileIO with scheme prefix 'file' - Fs, - /// S3 FileIO with scheme prefix 's3', 's3a' - S3, - /// GCS FileIO with scheme prefix 'gs', 'gcs' - Gcs, - /// Object Storage Service FileIO with scheme prefix 'oss' - Oss, - /// Azdls FileIO with scheme prefix 'abfss', 'abfs', 'wasbs', 'wasb' - Azdls, -} - -impl FileIOType { - /// Get string representation of file io type - pub const fn as_str(&self) -> &str { - match self { - FileIOType::Memory => "memory", - FileIOType::S3 => "s3", - FileIOType::Fs => "file", - FileIOType::Gcs => "gcs", - FileIOType::Oss => "oss", - FileIOType::Azdls => "azdls", - } - } -} diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 727a6af06e..b30534161b 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -27,21 +27,20 @@ //! use futures::TryStreamExt; //! use iceberg::io::FileIOType::Memory; //! use iceberg::io::{FileIO, FileIOBuilder}; -//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder}; +//! use iceberg::memory::MemoryCatalogBuilder; //! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! //! #[tokio::main] //! async fn main() -> Result<()> { //! // Connect to a catalog. +//! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; //! let catalog = MemoryCatalogBuilder::default() //! .load( //! "memory", -//! HashMap::from([ -//! ( -//! MEMORY_CATALOG_IO_TYPE.to_string(), -//! Memory.as_str().to_string(), -//! ), // specify the file io type -//! ]), +//! HashMap::from([( +//! MEMORY_CATALOG_WAREHOUSE.to_string(), +//! "file:///path/to/warehouse".to_string(), +//! )]), //! ) //! .await?; //! // Load table from catalog. diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index cf583d9630..57eaef4810 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -60,16 +60,14 @@ //! #[tokio::main] //! async fn main() -> Result<()> { //! // Connect to a catalog. -//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder}; +//! use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; //! let catalog = MemoryCatalogBuilder::default() //! .load( //! "memory", -//! HashMap::from([ -//! ( -//! MEMORY_CATALOG_IO_TYPE.to_string(), -//! Memory.as_str().to_string(), -//! ), // specify the file io type -//! ]), +//! HashMap::from([( +//! MEMORY_CATALOG_WAREHOUSE.to_string(), +//! "file:///path/to/warehouse".to_string(), +//! )]), //! ) //! .await?; //! // Add customized code to create a table first. @@ -115,7 +113,7 @@ //! use arrow_array::RecordBatch; //! use iceberg::io::FileIOBuilder; //! use iceberg::io::FileIOType::Memory; -//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder}; +//! use iceberg::memory::MemoryCatalogBuilder; //! use iceberg::spec::DataFile; //! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; //! use iceberg::writer::file_writer::ParquetWriterBuilder; @@ -175,15 +173,14 @@ //! #[tokio::main] //! async fn main() -> Result<()> { //! // Connect to a catalog. +//! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; //! let catalog = MemoryCatalogBuilder::default() //! .load( //! "memory", -//! HashMap::from([ -//! ( -//! MEMORY_CATALOG_IO_TYPE.to_string(), -//! Memory.as_str().to_string(), -//! ), // specify the file io type -//! ]), +//! HashMap::from([( +//! MEMORY_CATALOG_WAREHOUSE.to_string(), +//! "file:///path/to/warehouse".to_string(), +//! )]), //! ) //! .await?; //! diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 0772bca493..0ee0504faa 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -271,8 +271,7 @@ mod tests { use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::StreamExt; - use iceberg::io::FileIOType::Memory; - use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{ DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema, Struct, Type, @@ -379,16 +378,10 @@ mod tests { MemoryCatalogBuilder::default() .load( "memory", - HashMap::from([ - ( - MEMORY_CATALOG_IO_TYPE.to_string(), - Memory.as_str().to_string(), - ), - ( - MEMORY_CATALOG_WAREHOUSE.to_string(), - "memory://root".to_string(), - ), - ]), + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://root".to_string(), + )]), ) .await .unwrap(), diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index e36df02553..82e02215ff 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -328,8 +328,7 @@ mod tests { use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::{StreamExt, stream}; - use iceberg::io::FileIOType::Fs; - use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{ DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json, }; @@ -430,10 +429,7 @@ mod tests { MemoryCatalogBuilder::default() .load( "memory", - HashMap::from([ - (MEMORY_CATALOG_IO_TYPE.to_string(), Fs.as_str().to_string()), - (MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path()), - ]), + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path())]), ) .await .unwrap() From 057bbe982c2f29b70005162c41da44f48a73c93c Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 25 Aug 2025 15:57:04 -0700 Subject: [PATCH 3/5] remove unnecessary comments --- crates/iceberg/src/catalog/memory/catalog.rs | 2 -- crates/iceberg/src/lib.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 405d636b3b..3a22a77c08 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -32,8 +32,6 @@ use crate::{ TableCommit, TableCreation, TableIdent, }; -// /// Memory catalog io type -// pub const MEMORY_CATALOG_IO_TYPE: &str = "io_type"; /// Memory catalog warehouse location pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse"; diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index b30534161b..aae8efed74 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -25,7 +25,6 @@ //! use std::collections::HashMap; //! //! use futures::TryStreamExt; -//! use iceberg::io::FileIOType::Memory; //! use iceberg::io::{FileIO, FileIOBuilder}; //! use iceberg::memory::MemoryCatalogBuilder; //! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; From 3b59048d776470aa18af4fd6466946f394f94079 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Mon, 25 Aug 2025 16:41:49 -0700 Subject: [PATCH 4/5] remove deps --- crates/iceberg/src/writer/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 57eaef4810..02c4929e7e 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -45,7 +45,6 @@ //! //! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; //! use async_trait::async_trait; -//! use iceberg::io::FileIOType::Memory; //! use iceberg::io::{FileIO, FileIOBuilder}; //! use iceberg::spec::DataFile; //! use iceberg::transaction::Transaction; @@ -112,7 +111,6 @@ //! //! use arrow_array::RecordBatch; //! use iceberg::io::FileIOBuilder; -//! use iceberg::io::FileIOType::Memory; //! use iceberg::memory::MemoryCatalogBuilder; //! use iceberg::spec::DataFile; //! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; From f444e0a8f634948ec5c71695000410b6fcd4c831 Mon Sep 17 00:00:00 2001 From: Leon Lin Date: Wed, 27 Aug 2025 19:48:34 -0700 Subject: [PATCH 5/5] Update new catalog --- crates/iceberg/src/catalog/memory/catalog.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 3a22a77c08..6ce3111419 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -62,10 +62,6 @@ impl CatalogBuilder for MemoryCatalogBuilder { ) -> impl Future> + Send { self.0.name = Some(name.into()); - // if props.contains_key(MEMORY_CATALOG_IO_TYPE) { - // self.0.io_type = props.get(MEMORY_CATALOG_IO_TYPE).cloned() - // } - if props.contains_key(MEMORY_CATALOG_WAREHOUSE) { self.0.warehouse = props .get(MEMORY_CATALOG_WAREHOUSE) @@ -91,7 +87,7 @@ impl CatalogBuilder for MemoryCatalogBuilder { "Catalog warehouse is required", )) } else { - Ok(MemoryCatalog::new(self.0)) + MemoryCatalog::new(self.0) } }; @@ -116,16 +112,14 @@ pub struct MemoryCatalog { impl MemoryCatalog { /// Creates a memory catalog. - fn new(config: MemoryCatalogConfig) -> Self { - Self { + fn new(config: MemoryCatalogConfig) -> Result { + Ok(Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io: FileIO::from_path(&config.warehouse) - .unwrap() + file_io: FileIO::from_path(&config.warehouse)? .with_props(config.props) - .build() - .unwrap(), + .build()?, warehouse_location: config.warehouse, - } + }) } /// Loads a table from the locked namespace state.