diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 12d18b9f36..6ce3111419 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -28,29 +28,98 @@ use crate::io::FileIO; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ - Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, - TableCreation, TableIdent, + Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, + TableCommit, TableCreation, TableIdent, }; +/// Memory catalog warehouse location +pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse"; + /// namespace `location` property const LOCATION: &str = "location"; +/// Builder for [`MemoryCatalog`]. +#[derive(Debug)] +pub struct MemoryCatalogBuilder(MemoryCatalogConfig); + +impl Default for MemoryCatalogBuilder { + fn default() -> Self { + Self(MemoryCatalogConfig { + name: None, + warehouse: "".to_string(), + props: HashMap::new(), + }) + } +} + +impl CatalogBuilder for MemoryCatalogBuilder { + type C = MemoryCatalog; + + fn load( + mut self, + name: impl Into, + props: HashMap, + ) -> impl Future> + Send { + self.0.name = Some(name.into()); + + if props.contains_key(MEMORY_CATALOG_WAREHOUSE) { + self.0.warehouse = props + .get(MEMORY_CATALOG_WAREHOUSE) + .cloned() + .unwrap_or_default() + } + + // Collect other remaining properties + self.0.props = props + .into_iter() + .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE) + .collect(); + + let result = { + if self.0.name.is_none() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog name is required", + )) + } else if self.0.warehouse.is_empty() { + Err(Error::new( + ErrorKind::DataInvalid, + "Catalog warehouse is required", + )) + } else { + MemoryCatalog::new(self.0) + } + }; + + std::future::ready(result) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct MemoryCatalogConfig { + name: Option, + warehouse: String, + props: HashMap, +} + /// Memory catalog implementation. #[derive(Debug)] pub struct MemoryCatalog { root_namespace_state: Mutex, file_io: FileIO, - warehouse_location: Option, + warehouse_location: String, } impl MemoryCatalog { /// Creates a memory catalog. - pub fn new(file_io: FileIO, warehouse_location: Option) -> Self { - Self { + fn new(config: MemoryCatalogConfig) -> Result { + Ok(Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io, - warehouse_location, - } + file_io: FileIO::from_path(&config.warehouse)? + .with_props(config.props) + .build()?, + warehouse_location: config.warehouse, + }) } /// Loads a table from the locked namespace state. @@ -188,22 +257,9 @@ impl Catalog for MemoryCatalog { None => { let namespace_properties = root_namespace_state.get_properties(namespace_ident)?; let location_prefix = match namespace_properties.get(LOCATION) { - Some(namespace_location) => Ok(namespace_location.clone()), - None => match self.warehouse_location.clone() { - Some(warehouse_location) => Ok(format!( - "{}/{}", - warehouse_location, - namespace_ident.join("/") - )), - None => Err(Error::new( - ErrorKind::Unexpected, - format!( - "Cannot create table {:?}. No default path is set, please specify a location when creating a table.", - &table_ident - ), - )), - }, - }?; + Some(namespace_location) => namespace_location.clone(), + None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")), + }; let location = format!("{}/{}", location_prefix, table_ident.name()); @@ -340,10 +396,15 @@ mod tests { temp_dir.path().to_str().unwrap().to_string() } - fn new_memory_catalog() -> impl Catalog { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + async fn new_memory_catalog() -> impl Catalog { let warehouse_location = temp_path(); - MemoryCatalog::new(file_io, Some(warehouse_location)) + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]), + ) + .await + .unwrap() } async fn create_namespace(catalog: &C, namespace_ident: &NamespaceIdent) { @@ -453,14 +514,14 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_empty_vector() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]); } #[tokio::test] async fn test_list_namespaces_returns_single_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -471,7 +532,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -484,7 +545,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_only_top_level_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("b".into()); @@ -503,7 +564,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_no_namespaces_under_parent() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -519,7 +580,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_namespace_under_parent() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".into()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_3 = NamespaceIdent::new("c".into()); @@ -546,7 +607,7 @@ mod tests { #[tokio::test] async fn test_list_namespaces_returns_multiple_namespaces_under_parent() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("a".to_string()); let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap(); let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -578,7 +639,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_false() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -592,7 +653,7 @@ mod tests { #[tokio::test] async fn test_namespace_exists_returns_true() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -601,7 +662,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_empty_properties() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); assert_eq!( @@ -620,7 +681,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_with_properties() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties: HashMap = HashMap::new(); @@ -642,7 +703,7 @@ mod tests { #[tokio::test] async fn test_create_namespace_throws_error_if_namespace_already_exists() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -666,7 +727,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let parent_namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &parent_namespace_ident).await; @@ -688,7 +749,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -711,7 +772,7 @@ mod tests { #[tokio::test] async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); @@ -733,7 +794,7 @@ mod tests { #[tokio::test] async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident_a).await; @@ -767,7 +828,7 @@ mod tests { #[tokio::test] async fn test_get_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); let mut properties: HashMap = HashMap::new(); @@ -785,7 +846,7 @@ mod tests { #[tokio::test] async fn test_get_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -798,7 +859,7 @@ mod tests { #[tokio::test] async fn test_get_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -817,7 +878,7 @@ mod tests { #[tokio::test] async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = NamespaceIdent::new("b".into()); @@ -836,7 +897,7 @@ mod tests { #[tokio::test] async fn test_update_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -856,7 +917,7 @@ mod tests { #[tokio::test] async fn test_update_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -877,7 +938,7 @@ mod tests { #[tokio::test] async fn test_update_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -904,7 +965,7 @@ mod tests { #[tokio::test] async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await; let non_existent_namespace_ident = NamespaceIdent::new("def".into()); @@ -923,7 +984,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("abc".into()); create_namespace(&catalog, &namespace_ident).await; @@ -934,7 +995,7 @@ mod tests { #[tokio::test] async fn test_drop_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -953,7 +1014,7 @@ mod tests { #[tokio::test] async fn test_drop_deeply_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -988,7 +1049,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("abc".into()); assert_eq!( @@ -1006,7 +1067,7 @@ mod tests { #[tokio::test] async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; create_namespace(&catalog, &NamespaceIdent::new("a".into())).await; let non_existent_namespace_ident = @@ -1026,7 +1087,7 @@ mod tests { #[tokio::test] async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1046,7 +1107,7 @@ mod tests { #[tokio::test] async fn test_create_table_with_location() { let tmp_dir = TempDir::new().unwrap(); - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1084,9 +1145,17 @@ mod tests { #[tokio::test] async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); let mut namespace_properties = HashMap::new(); @@ -1126,9 +1195,17 @@ mod tests { #[tokio::test] async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); let mut namespace_properties = HashMap::new(); @@ -1179,9 +1256,17 @@ mod tests { #[tokio::test] async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); // note: no location specified in namespace_properties @@ -1220,9 +1305,17 @@ mod tests { #[tokio::test] async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); let warehouse_location = temp_path(); - let catalog = MemoryCatalog::new(file_io, Some(warehouse_location.clone())); + let catalog = MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + warehouse_location.clone(), + )]), + ) + .await + .unwrap(); let namespace_ident = NamespaceIdent::new("a".into()); catalog @@ -1268,37 +1361,20 @@ mod tests { #[tokio::test] async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing() { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - let catalog = MemoryCatalog::new(file_io, None); - - let namespace_ident = NamespaceIdent::new("a".into()); - create_namespace(&catalog, &namespace_ident).await; - - let table_name = "tbl1"; - let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + let catalog = MemoryCatalogBuilder::default() + .load("memory", HashMap::from([])) + .await; + assert!(catalog.is_err()); assert_eq!( - catalog - .create_table( - &namespace_ident, - TableCreation::builder() - .name(table_name.into()) - .schema(simple_table_schema()) - .build(), - ) - .await - .unwrap_err() - .to_string(), - format!( - "Unexpected => Cannot create table {:?}. No default path is set, please specify a location when creating a table.", - &expected_table_ident - ) - ) + catalog.unwrap_err().to_string(), + "DataInvalid => Catalog warehouse is required" + ); } #[tokio::test] async fn test_create_table_throws_error_if_table_with_same_name_already_exists() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; let table_name = "tbl1"; @@ -1330,7 +1406,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_empty_vector() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1339,7 +1415,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_a_single_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1353,7 +1429,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_multiple_tables() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1369,7 +1445,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_tables_from_correct_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_1 = NamespaceIdent::new("n1".into()); let namespace_ident_2 = NamespaceIdent::new("n2".into()); create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await; @@ -1397,7 +1473,7 @@ mod tests { #[tokio::test] async fn test_list_tables_returns_table_under_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1413,7 +1489,7 @@ mod tests { #[tokio::test] async fn test_list_tables_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); @@ -1432,7 +1508,7 @@ mod tests { #[tokio::test] async fn test_drop_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1443,7 +1519,7 @@ mod tests { #[tokio::test] async fn test_drop_table_drops_table_under_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1461,7 +1537,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); let non_existent_table_ident = @@ -1482,7 +1558,7 @@ mod tests { #[tokio::test] async fn test_drop_table_throws_error_if_table_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1503,7 +1579,7 @@ mod tests { #[tokio::test] async fn test_table_exists_returns_true() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1514,7 +1590,7 @@ mod tests { #[tokio::test] async fn test_table_exists_returns_false() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1529,7 +1605,7 @@ mod tests { #[tokio::test] async fn test_table_exists_under_nested_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await; @@ -1550,7 +1626,7 @@ mod tests { #[tokio::test] async fn test_table_exists_throws_error_if_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_namespace_ident = NamespaceIdent::new("n1".into()); let non_existent_table_ident = @@ -1571,7 +1647,7 @@ mod tests { #[tokio::test] async fn test_rename_table_in_same_namespace() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1590,7 +1666,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let src_namespace_ident = NamespaceIdent::new("a".into()); let dst_namespace_ident = NamespaceIdent::new("b".into()); create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await; @@ -1616,7 +1692,7 @@ mod tests { #[tokio::test] async fn test_rename_table_src_table_is_same_as_dst_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into()); @@ -1634,7 +1710,7 @@ mod tests { #[tokio::test] async fn test_rename_table_across_nested_namespaces() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident_a = NamespaceIdent::new("a".into()); let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap(); let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap(); @@ -1661,7 +1737,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = @@ -1686,7 +1762,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let src_namespace_ident = NamespaceIdent::new("n1".into()); let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into()); create_namespace(&catalog, &src_namespace_ident).await; @@ -1710,7 +1786,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_src_table_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1728,7 +1804,7 @@ mod tests { #[tokio::test] async fn test_rename_table_throws_error_if_dst_table_already_exists() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("n1".into()); create_namespace(&catalog, &namespace_ident).await; let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into()); @@ -1751,7 +1827,7 @@ mod tests { #[tokio::test] async fn test_register_table() { // Create a catalog and namespace - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("test_namespace".into()); create_namespace(&catalog, &namespace_ident).await; @@ -1794,7 +1870,7 @@ mod tests { #[tokio::test] async fn test_update_table() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let table = create_table_with_namespace(&catalog).await; @@ -1829,7 +1905,7 @@ mod tests { #[tokio::test] async fn test_update_table_fails_if_table_doesnt_exist() { - let catalog = new_memory_catalog(); + let catalog = new_memory_catalog().await; let namespace_ident = NamespaceIdent::new("a".into()); create_namespace(&catalog, &namespace_ident).await; diff --git a/crates/iceberg/src/catalog/memory/mod.rs b/crates/iceberg/src/catalog/memory/mod.rs index bcad16dda7..b8d9e4a8ee 100644 --- a/crates/iceberg/src/catalog/memory/mod.rs +++ b/crates/iceberg/src/catalog/memory/mod.rs @@ -20,4 +20,4 @@ mod catalog; mod namespace_state; -pub use catalog::MemoryCatalog; +pub use catalog::*; diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 612a5db601..aae8efed74 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -22,16 +22,26 @@ //! ## Scan A Table //! //! ```rust, no_run +//! use std::collections::HashMap; +//! //! use futures::TryStreamExt; //! use iceberg::io::{FileIO, FileIOBuilder}; -//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent}; +//! use iceberg::memory::MemoryCatalogBuilder; +//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! //! #[tokio::main] //! async fn main() -> Result<()> { -//! // Build your file IO. -//! let file_io = FileIOBuilder::new("memory").build()?; //! // Connect to a catalog. -//! let catalog = MemoryCatalog::new(file_io, None); +//! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; +//! let catalog = MemoryCatalogBuilder::default() +//! .load( +//! "memory", +//! HashMap::from([( +//! MEMORY_CATALOG_WAREHOUSE.to_string(), +//! "file:///path/to/warehouse".to_string(), +//! )]), +//! ) +//! .await?; //! // Load table from catalog. //! let table = catalog //! .load_table(&TableIdent::from_strs(["hello", "world"])?) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 2076d7707d..02c4929e7e 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -40,6 +40,7 @@ //! //! # Simple example for the data file writer used parquet physical format: //! ```rust, no_run +//! use std::collections::HashMap; //! use std::sync::Arc; //! //! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; @@ -53,14 +54,21 @@ //! DefaultFileNameGenerator, DefaultLocationGenerator, //! }; //! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; -//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent}; +//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! use parquet::file::properties::WriterProperties; //! #[tokio::main] //! async fn main() -> Result<()> { -//! // Build your file IO. -//! let file_io = FileIOBuilder::new("memory").build()?; //! // Connect to a catalog. -//! let catalog = MemoryCatalog::new(file_io, None); +//! use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; +//! let catalog = MemoryCatalogBuilder::default() +//! .load( +//! "memory", +//! HashMap::from([( +//! MEMORY_CATALOG_WAREHOUSE.to_string(), +//! "file:///path/to/warehouse".to_string(), +//! )]), +//! ) +//! .await?; //! // Add customized code to create a table first. //! //! // Load table from catalog. @@ -98,10 +106,12 @@ //! //! # Custom writer to record latency //! ```rust, no_run +//! use std::collections::HashMap; //! use std::time::Instant; //! //! use arrow_array::RecordBatch; //! use iceberg::io::FileIOBuilder; +//! use iceberg::memory::MemoryCatalogBuilder; //! use iceberg::spec::DataFile; //! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; //! use iceberg::writer::file_writer::ParquetWriterBuilder; @@ -109,7 +119,7 @@ //! DefaultFileNameGenerator, DefaultLocationGenerator, //! }; //! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; -//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent}; +//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent}; //! use parquet::file::properties::WriterProperties; //! //! #[derive(Clone)] @@ -160,11 +170,17 @@ //! //! #[tokio::main] //! async fn main() -> Result<()> { -//! // Build your file IO. -//! use iceberg::NamespaceIdent; -//! let file_io = FileIOBuilder::new("memory").build()?; //! // Connect to a catalog. -//! let catalog = MemoryCatalog::new(file_io, None); +//! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; +//! let catalog = MemoryCatalogBuilder::default() +//! .load( +//! "memory", +//! HashMap::from([( +//! MEMORY_CATALOG_WAREHOUSE.to_string(), +//! "file:///path/to/warehouse".to_string(), +//! )]), +//! ) +//! .await?; //! //! // Add customized code to create a table first. //! diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 1ce8fa8d82..0ee0504faa 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -271,12 +271,12 @@ mod tests { use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::StreamExt; - use iceberg::io::FileIOBuilder; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{ DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema, Struct, Type, }; - use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, TableCreation, TableIdent}; + use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; use super::*; use crate::physical_plan::DATA_FILES_COL_NAME; @@ -374,11 +374,18 @@ mod tests { #[tokio::test] async fn test_iceberg_commit_exec() -> Result<(), Box> { // Create a memory catalog with in-memory file IO - let file_io = FileIOBuilder::new("memory").build()?; - let catalog = Arc::new(MemoryCatalog::new( - file_io, - Some("memory://root".to_string()), - )); + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://root".to_string(), + )]), + ) + .await + .unwrap(), + ); // Create a namespace let namespace = NamespaceIdent::new("test_namespace".to_string()); diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 4b8ef1ab11..82e02215ff 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -328,11 +328,11 @@ mod tests { use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; use futures::{StreamExt, stream}; - use iceberg::io::FileIOBuilder; + use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{ DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json, }; - use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation}; + use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use tempfile::TempDir; @@ -425,9 +425,14 @@ mod tests { } /// Helper function to create a memory catalog - fn get_iceberg_catalog() -> MemoryCatalog { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - MemoryCatalog::new(file_io, Some(temp_path())) + async fn get_iceberg_catalog() -> MemoryCatalog { + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path())]), + ) + .await + .unwrap() } /// Helper function to create a test table schema @@ -458,7 +463,7 @@ mod tests { #[tokio::test] async fn test_iceberg_write_exec() -> Result<()> { // 1. Set up test environment - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_namespace".to_string()); // Create namespace diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs index 1491e4dbff..3ae7175344 100644 --- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs +++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs @@ -26,10 +26,10 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::execution::context::SessionContext; use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY; use expect_test::expect; -use iceberg::io::FileIOBuilder; +use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type}; use iceberg::test_utils::check_record_batches; -use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation}; +use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use tempfile::TempDir; @@ -38,9 +38,14 @@ fn temp_path() -> String { temp_dir.path().to_str().unwrap().to_string() } -fn get_iceberg_catalog() -> MemoryCatalog { - let file_io = FileIOBuilder::new_fs_io().build().unwrap(); - MemoryCatalog::new(file_io, Some(temp_path())) +async fn get_iceberg_catalog() -> MemoryCatalog { + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path())]), + ) + .await + .unwrap() } fn get_struct_type() -> StructType { @@ -86,7 +91,7 @@ fn get_table_creation( #[tokio::test] async fn test_provider_plan_stream_schema() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -139,7 +144,7 @@ async fn test_provider_plan_stream_schema() -> Result<()> { #[tokio::test] async fn test_provider_list_table_names() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -171,7 +176,7 @@ async fn test_provider_list_table_names() -> Result<()> { #[tokio::test] async fn test_provider_list_schema_names() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("test_provider_list_schema_names".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -196,7 +201,7 @@ async fn test_provider_list_schema_names() -> Result<()> { #[tokio::test] async fn test_table_projection() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("ns".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -264,7 +269,7 @@ async fn test_table_projection() -> Result<()> { #[tokio::test] async fn test_table_predict_pushdown() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("ns".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?; @@ -309,7 +314,7 @@ async fn test_table_predict_pushdown() -> Result<()> { #[tokio::test] async fn test_metadata_table() -> Result<()> { - let iceberg_catalog = get_iceberg_catalog(); + let iceberg_catalog = get_iceberg_catalog().await; let namespace = NamespaceIdent::new("ns".to_string()); set_test_namespace(&iceberg_catalog, &namespace).await?;