Skip to content

Commit

Permalink
fix(metadata): export iceberg schema in manifests table (#871)
Browse files Browse the repository at this point in the history
  • Loading branch information
flaneur2020 authored Feb 8, 2025
1 parent 98c1874 commit 32404bf
Showing 1 changed file with 176 additions and 84 deletions.
260 changes: 176 additions & 84 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::builder::{
BooleanBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
BooleanBuilder, GenericListBuilder, ListBuilder, PrimitiveBuilder, StringBuilder, StructBuilder,
};
use arrow_array::types::{Int32Type, Int64Type, Int8Type};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema};
use arrow_schema::{DataType, Field, Fields};
use futures::{stream, StreamExt};

use crate::arrow::schema_to_arrow_schema;
use crate::scan::ArrowRecordBatchStream;
use crate::spec::{FieldSummary, ListType, NestedField, PrimitiveType, StructType, Type};
use crate::table::Table;
use crate::Result;

Expand All @@ -40,44 +43,111 @@ impl<'a> ManifestsTable<'a> {
Self { table }
}

/// Returns the Arrow fields that make up one partition summary struct.
///
/// Only `contains_null` is declared non-nullable; the remaining three
/// fields are nullable.
fn partition_summary_fields() -> Vec<Field> {
    let contains_null = Field::new("contains_null", DataType::Boolean, false);
    let contains_nan = Field::new("contains_nan", DataType::Boolean, true);
    let lower_bound = Field::new("lower_bound", DataType::Utf8, true);
    let upper_bound = Field::new("upper_bound", DataType::Utf8, true);

    vec![contains_null, contains_nan, lower_bound, upper_bound]
}

/// Returns the schema of the manifests table.
pub fn schema(&self) -> Schema {
Schema::new(vec![
Field::new("content", DataType::Int8, false),
Field::new("path", DataType::Utf8, false),
Field::new("length", DataType::Int64, false),
Field::new("partition_spec_id", DataType::Int32, false),
Field::new("added_snapshot_id", DataType::Int64, false),
Field::new("added_data_files_count", DataType::Int32, false),
Field::new("existing_data_files_count", DataType::Int32, false),
Field::new("deleted_data_files_count", DataType::Int32, false),
Field::new("added_delete_files_count", DataType::Int32, false),
Field::new("existing_delete_files_count", DataType::Int32, false),
Field::new("deleted_delete_files_count", DataType::Int32, false),
Field::new(
/// Returns the iceberg schema of the manifests table.
pub fn schema(&self) -> crate::spec::Schema {
let fields = vec![
NestedField::new(14, "content", Type::Primitive(PrimitiveType::Int), true),
NestedField::new(1, "path", Type::Primitive(PrimitiveType::String), true),
NestedField::new(2, "length", Type::Primitive(PrimitiveType::Long), true),
NestedField::new(
3,
"partition_spec_id",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
4,
"added_snapshot_id",
Type::Primitive(PrimitiveType::Long),
true,
),
NestedField::new(
5,
"added_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
6,
"existing_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
7,
"deleted_data_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
15,
"added_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
16,
"existing_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
17,
"deleted_delete_files_count",
Type::Primitive(PrimitiveType::Int),
true,
),
NestedField::new(
8,
"partition_summaries",
DataType::List(Arc::new(Field::new_struct(
"item",
Self::partition_summary_fields(),
false,
))),
false,
Type::List(ListType {
element_field: Arc::new(NestedField::new(
9,
"item",
Type::Struct(StructType::new(vec![
Arc::new(NestedField::new(
10,
"contains_null",
Type::Primitive(PrimitiveType::Boolean),
true,
)),
Arc::new(NestedField::new(
11,
"contains_nan",
Type::Primitive(PrimitiveType::Boolean),
false,
)),
Arc::new(NestedField::new(
12,
"lower_bound",
Type::Primitive(PrimitiveType::String),
false,
)),
Arc::new(NestedField::new(
13,
"upper_bound",
Type::Primitive(PrimitiveType::String),
false,
)),
])),
true,
)),
}),
true,
),
])
];

crate::spec::Schema::builder()
.with_fields(fields.into_iter().map(|f| f.into()))
.build()
.unwrap()
}

/// Scans the manifests table.
pub async fn scan(&self) -> Result<ArrowRecordBatchStream> {
let mut content = PrimitiveBuilder::<Int8Type>::new();
let schema = schema_to_arrow_schema(&self.schema())?;

let mut content = PrimitiveBuilder::<Int32Type>::new();
let mut path = StringBuilder::new();
let mut length = PrimitiveBuilder::<Int64Type>::new();
let mut partition_spec_id = PrimitiveBuilder::<Int32Type>::new();
Expand All @@ -88,22 +158,14 @@ impl<'a> ManifestsTable<'a> {
let mut added_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut existing_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut deleted_delete_files_count = PrimitiveBuilder::<Int32Type>::new();
let mut partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(Self::partition_summary_fields()),
0,
))
.with_field(Arc::new(Field::new_struct(
"item",
Self::partition_summary_fields(),
false,
)));
let mut partition_summaries = self.partition_summary_builder()?;

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i8);
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
length.append_value(manifest.manifest_length);
partition_spec_id.append_value(manifest.partition_spec_id);
Expand All @@ -119,32 +181,11 @@ impl<'a> ManifestsTable<'a> {
.append_value(manifest.existing_files_count.unwrap_or(0) as i32);
deleted_delete_files_count
.append_value(manifest.deleted_files_count.unwrap_or(0) as i32);

let partition_summaries_builder = partition_summaries.values();
for summary in &manifest.partitions {
partition_summaries_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_value(summary.contains_null);
partition_summaries_builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_option(summary.contains_nan);
partition_summaries_builder
.field_builder::<StringBuilder>(2)
.unwrap()
.append_option(summary.lower_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder
.field_builder::<StringBuilder>(3)
.unwrap()
.append_option(summary.upper_bound.as_ref().map(|v| v.to_string()));
partition_summaries_builder.append(true);
}
partition_summaries.append(true);
self.append_partition_summaries(&mut partition_summaries, &manifest.partitions);
}
}

let batch = RecordBatch::try_new(Arc::new(self.schema()), vec![
let batch = RecordBatch::try_new(Arc::new(schema), vec![
Arc::new(content.finish()),
Arc::new(path.finish()),
Arc::new(length.finish()),
Expand All @@ -158,9 +199,60 @@ impl<'a> ManifestsTable<'a> {
Arc::new(deleted_delete_files_count.finish()),
Arc::new(partition_summaries.finish()),
])?;

Ok(stream::iter(vec![Ok(batch)]).boxed())
}

fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
let schema = schema_to_arrow_schema(&self.schema())?;
let partition_summary_fields =
match schema.field_with_name("partition_summaries")?.data_type() {
DataType::List(list_type) => match list_type.data_type() {
DataType::Struct(fields) => fields.to_vec(),
_ => unreachable!(),
},
_ => unreachable!(),
};

let partition_summaries = ListBuilder::new(StructBuilder::from_fields(
Fields::from(partition_summary_fields.clone()),
0,
))
.with_field(Arc::new(
Field::new_struct("item", partition_summary_fields, false).with_metadata(
HashMap::from([("PARQUET:field_id".to_string(), "9".to_string())]),
),
));

Ok(partition_summaries)
}

/// Appends one list entry to `builder` holding a struct per element of `partitions`.
///
/// For every summary the four child builders are filled by position:
/// `0` = `contains_null`, `1` = `contains_nan`, `2` = `lower_bound`,
/// `3` = `upper_bound` (bounds are rendered with `to_string`). After all
/// summaries are written the list entry itself is closed as non-null, so an
/// empty `partitions` slice produces an empty (not null) list.
///
/// # Panics
///
/// Panics if a child builder at one of the positions above does not have the
/// expected builder type.
fn append_partition_summaries(
    &self,
    builder: &mut GenericListBuilder<i32, StructBuilder>,
    partitions: &[FieldSummary],
) {
    let entries = builder.values();

    for partition in partitions {
        let lower = partition.lower_bound.as_ref().map(|bound| bound.to_string());
        let upper = partition.upper_bound.as_ref().map(|bound| bound.to_string());

        entries
            .field_builder::<BooleanBuilder>(0)
            .unwrap()
            .append_value(partition.contains_null);
        entries
            .field_builder::<BooleanBuilder>(1)
            .unwrap()
            .append_option(partition.contains_nan);
        entries
            .field_builder::<StringBuilder>(2)
            .unwrap()
            .append_option(lower);
        entries
            .field_builder::<StringBuilder>(3)
            .unwrap()
            .append_option(upper);

        // Mark this struct slot as valid.
        entries.append(true);
    }

    // Close the list entry for this manifest.
    builder.append(true);
}
}

#[cfg(test)]
Expand All @@ -175,25 +267,25 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let batch_stream = fixture.table.inspect().manifests().scan().await.unwrap();
let record_batch = fixture.table.inspect().manifests().scan().await.unwrap();

check_record_batches(
batch_stream,
record_batch,
expect![[r#"
Field { name: "content", data_type: Int8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
Field { name: "content", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14"} },
Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} },
Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} },
Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5"} },
Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6"} },
Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7"} },
Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15"} },
Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16"} },
Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17"} },
Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10"} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11"} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12"} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9"} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8"} }"#]],
expect![[r#"
content: PrimitiveArray<Int8>
content: PrimitiveArray<Int32>
[
0,
],
Expand Down

0 comments on commit 32404bf

Please sign in to comment.