Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 191 additions & 76 deletions crates/iceberg/src/catalog/memory/catalog.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/iceberg/src/catalog/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
mod catalog;
mod namespace_state;

pub use catalog::MemoryCatalog;
pub use catalog::*;
31 changes: 31 additions & 0 deletions crates/iceberg/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,34 @@ pub use storage_s3::*;
/// Returns `true` when `value` is one of the accepted truthy spellings
/// (`"true"`, `"t"`, `"1"`, `"on"`), compared case-insensitively.
pub(crate) fn is_truthy(value: &str) -> bool {
    // ASCII case-insensitive comparison avoids the per-call `String`
    // allocation of `to_lowercase()`; all accepted spellings are pure
    // ASCII, so the accepted set of inputs is unchanged.
    ["true", "t", "1", "on"]
        .iter()
        .any(|candidate| value.eq_ignore_ascii_case(candidate))
}

/// All supported File IO types.
///
/// `as_str` yields the canonical scheme string for each variant; some
/// variants are also reachable through alternate scheme prefixes, listed
/// per variant below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileIOType {
    /// In-Memory FileIO with scheme prefix 'memory'
    Memory,
    /// Local file system FileIO with scheme prefix 'file'
    Fs,
    /// S3 FileIO with scheme prefix 's3', 's3a'
    S3,
    /// GCS FileIO with scheme prefix 'gs', 'gcs'
    Gcs,
    /// Object Storage Service FileIO with scheme prefix 'oss'
    Oss,
    /// Azdls FileIO with scheme prefix 'abfss', 'abfs', 'wasbs', 'wasb'
    Azdls,
}

impl FileIOType {
    /// Get the canonical string representation of this file IO type.
    ///
    /// The returned string is `'static`, so callers may keep it without
    /// tying its lifetime to the enum value.
    pub const fn as_str(&self) -> &'static str {
        // Arms are kept in enum declaration order so the mapping is easy
        // to audit against the definition above.
        match self {
            FileIOType::Memory => "memory",
            FileIOType::Fs => "file",
            FileIOType::S3 => "s3",
            FileIOType::Gcs => "gcs",
            FileIOType::Oss => "oss",
            FileIOType::Azdls => "azdls",
        }
    }
}
20 changes: 16 additions & 4 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,28 @@
//! ## Scan A Table
//!
//! ```rust, no_run
//! use std::collections::HashMap;
//!
//! use futures::TryStreamExt;
//! use iceberg::io::FileIOType::Memory;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent};
//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! // Build your file IO.
//! let file_io = FileIOBuilder::new("memory").build()?;
//! // Connect to a catalog.
//! let catalog = MemoryCatalog::new(file_io, None);
//! let catalog = MemoryCatalogBuilder::default()
//! .load(
//! "memory",
//! HashMap::from([
//! (
//! MEMORY_CATALOG_IO_TYPE.to_string(),
//! Memory.as_str().to_string(),
//! ), // specify the file io type
//! ]),
//! )
//! .await?;
//! // Load table from catalog.
//! let table = catalog
//! .load_table(&TableIdent::from_strs(["hello", "world"])?)
Expand Down
39 changes: 30 additions & 9 deletions crates/iceberg/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
//!
//! # Simple example for the data file writer used parquet physical format:
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::sync::Arc;
//!
//! use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
//! use async_trait::async_trait;
//! use iceberg::io::FileIOType::Memory;
//! use iceberg::io::{FileIO, FileIOBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::transaction::Transaction;
Expand All @@ -53,14 +55,23 @@
//! DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! // Build your file IO.
//! let file_io = FileIOBuilder::new("memory").build()?;
//! // Connect to a catalog.
//! let catalog = MemoryCatalog::new(file_io, None);
//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder};
//! let catalog = MemoryCatalogBuilder::default()
//! .load(
//! "memory",
//! HashMap::from([
//! (
//! MEMORY_CATALOG_IO_TYPE.to_string(),
//! Memory.as_str().to_string(),
//! ), // specify the file io type
//! ]),
//! )
//! .await?;
//! // Add customized code to create a table first.
//!
//! // Load table from catalog.
Expand Down Expand Up @@ -98,18 +109,21 @@
//!
//! # Custom writer to record latency
//! ```rust, no_run
//! use std::collections::HashMap;
//! use std::time::Instant;
//!
//! use arrow_array::RecordBatch;
//! use iceberg::io::FileIOBuilder;
//! use iceberg::io::FileIOType::Memory;
//! use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MemoryCatalogBuilder};
//! use iceberg::spec::DataFile;
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
//! use iceberg::writer::file_writer::location_generator::{
//! DefaultFileNameGenerator, DefaultLocationGenerator,
//! };
//! use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
//! use iceberg::{Catalog, MemoryCatalog, Result, TableIdent};
//! use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, Result, TableIdent};
//! use parquet::file::properties::WriterProperties;
//!
//! #[derive(Clone)]
Expand Down Expand Up @@ -160,11 +174,18 @@
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//! // Build your file IO.
//! use iceberg::NamespaceIdent;
//! let file_io = FileIOBuilder::new("memory").build()?;
//! // Connect to a catalog.
//! let catalog = MemoryCatalog::new(file_io, None);
//! let catalog = MemoryCatalogBuilder::default()
//! .load(
//! "memory",
//! HashMap::from([
//! (
//! MEMORY_CATALOG_IO_TYPE.to_string(),
//! Memory.as_str().to_string(),
//! ), // specify the file io type
//! ]),
//! )
//! .await?;
//!
//! // Add customized code to create a table first.
//!
Expand Down
28 changes: 21 additions & 7 deletions crates/integrations/datafusion/src/physical_plan/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,13 @@ mod tests {
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::StreamExt;
use iceberg::io::FileIOBuilder;
use iceberg::io::FileIOType::Memory;
use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{
DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema,
Struct, Type,
};
use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, TableCreation, TableIdent};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};

use super::*;
use crate::physical_plan::DATA_FILES_COL_NAME;
Expand Down Expand Up @@ -374,11 +375,24 @@ mod tests {
#[tokio::test]
async fn test_iceberg_commit_exec() -> Result<(), Box<dyn std::error::Error>> {
// Create a memory catalog with in-memory file IO
let file_io = FileIOBuilder::new("memory").build()?;
let catalog = Arc::new(MemoryCatalog::new(
file_io,
Some("memory://root".to_string()),
));
let catalog = Arc::new(
MemoryCatalogBuilder::default()
.load(
"memory",
HashMap::from([
(
MEMORY_CATALOG_IO_TYPE.to_string(),
Memory.as_str().to_string(),
),
(
MEMORY_CATALOG_WAREHOUSE.to_string(),
"memory://root".to_string(),
),
]),
)
.await
.unwrap(),
);

// Create a namespace
let namespace = NamespaceIdent::new("test_namespace".to_string());
Expand Down
21 changes: 15 additions & 6 deletions crates/integrations/datafusion/src/physical_plan/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,12 @@ mod tests {
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::{StreamExt, stream};
use iceberg::io::FileIOBuilder;
use iceberg::io::FileIOType::Fs;
use iceberg::memory::{MEMORY_CATALOG_IO_TYPE, MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{
DataFileFormat, NestedField, PrimitiveType, Schema, Type, deserialize_data_file_from_json,
};
use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation};
use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use tempfile::TempDir;

Expand Down Expand Up @@ -425,9 +426,17 @@ mod tests {
}

/// Helper function to create a memory catalog
fn get_iceberg_catalog() -> MemoryCatalog {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
MemoryCatalog::new(file_io, Some(temp_path()))
async fn get_iceberg_catalog() -> MemoryCatalog {
    // Catalog properties: filesystem-backed IO, warehouse in a temp dir.
    let props = HashMap::from([
        (MEMORY_CATALOG_IO_TYPE.to_string(), Fs.as_str().to_string()),
        (MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path()),
    ]);
    MemoryCatalogBuilder::default()
        .load("memory", props)
        .await
        .unwrap()
}

/// Helper function to create a test table schema
Expand Down Expand Up @@ -458,7 +467,7 @@ mod tests {
#[tokio::test]
async fn test_iceberg_write_exec() -> Result<()> {
// 1. Set up test environment
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("test_namespace".to_string());

// Create namespace
Expand Down
27 changes: 16 additions & 11 deletions crates/integrations/datafusion/tests/integration_datafusion_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::execution::context::SessionContext;
use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use expect_test::expect;
use iceberg::io::FileIOBuilder;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
use iceberg::test_utils::check_record_batches;
use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, Result, TableCreation};
use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
use iceberg_datafusion::IcebergCatalogProvider;
use tempfile::TempDir;

Expand All @@ -38,9 +38,14 @@ fn temp_path() -> String {
temp_dir.path().to_str().unwrap().to_string()
}

fn get_iceberg_catalog() -> MemoryCatalog {
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
MemoryCatalog::new(file_io, Some(temp_path()))
async fn get_iceberg_catalog() -> MemoryCatalog {
    // Only the warehouse location is configured; the builder supplies
    // defaults for everything else.
    let props =
        HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), temp_path())]);
    MemoryCatalogBuilder::default()
        .load("memory", props)
        .await
        .unwrap()
}

fn get_struct_type() -> StructType {
Expand Down Expand Up @@ -86,7 +91,7 @@ fn get_table_creation(

#[tokio::test]
async fn test_provider_plan_stream_schema() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("test_provider_get_table_schema".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

Expand Down Expand Up @@ -139,7 +144,7 @@ async fn test_provider_plan_stream_schema() -> Result<()> {

#[tokio::test]
async fn test_provider_list_table_names() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("test_provider_list_table_names".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

Expand Down Expand Up @@ -171,7 +176,7 @@ async fn test_provider_list_table_names() -> Result<()> {

#[tokio::test]
async fn test_provider_list_schema_names() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("test_provider_list_schema_names".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

Expand All @@ -196,7 +201,7 @@ async fn test_provider_list_schema_names() -> Result<()> {

#[tokio::test]
async fn test_table_projection() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("ns".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

Expand Down Expand Up @@ -264,7 +269,7 @@ async fn test_table_projection() -> Result<()> {

#[tokio::test]
async fn test_table_predict_pushdown() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("ns".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

Expand Down Expand Up @@ -309,7 +314,7 @@ async fn test_table_predict_pushdown() -> Result<()> {

#[tokio::test]
async fn test_metadata_table() -> Result<()> {
let iceberg_catalog = get_iceberg_catalog();
let iceberg_catalog = get_iceberg_catalog().await;
let namespace = NamespaceIdent::new("ns".to_string());
set_test_namespace(&iceberg_catalog, &namespace).await?;

Expand Down
Loading