
Commit d1ec4c9

Merge remote-tracking branch 'upstream/main'
2 parents: eb959c7 + f2108e9

21 files changed: 114 additions & 146 deletions


datafusion_iceberg/src/error.rs

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ pub enum Error {
     ParseInt(#[from] std::num::ParseIntError),
     /// parse int error
     #[error(transparent)]
-    ConfigBuilder(#[from] crate::table::DataFusionTableConfigBuilderError),
+    DeriveBuilder(#[from] derive_builder::UninitializedFieldError),
 }

 impl From<Error> for DataFusionError {
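
Note on the change above: with `#[builder(build_fn(error = "Error"))]`, derive_builder makes the generated `build()` return the crate's own `Error`, and it produces that error by converting a `derive_builder::UninitializedFieldError` via `Into`. The new `#[from]` variant supplies exactly that conversion through thiserror. A minimal standalone sketch of the pattern (the `Config` struct is hypothetical, not from this repo):

use derive_builder::Builder;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error(transparent)]
    DeriveBuilder(#[from] derive_builder::UninitializedFieldError),
}

// Hypothetical config struct, for illustration only.
#[derive(Builder)]
#[builder(build_fn(error = "Error"))]
struct Config {
    name: String,
}

fn build_config() -> Result<Config, Error> {
    // The generated `build()` now returns `Result<Config, Error>`; leaving
    // `name` unset would surface as `Error::DeriveBuilder(...)`.
    ConfigBuilder::default().name("orders".to_owned()).build()
}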

datafusion_iceberg/src/table.rs

Lines changed: 49 additions & 58 deletions
@@ -107,6 +107,7 @@ impl From<MaterializedView> for DataFusionTable {
 }

 #[derive(Clone, Debug, Builder)]
+#[builder(build_fn(error = "Error"))]
 pub struct DataFusionTableConfig {
     /// With this option, an additional "__data_file_path" column is added to the output of the
     /// TableProvider that contains the path of the data-file the row originates from.
@@ -407,30 +408,22 @@ async fn table_scan(

     let file_schema: SchemaRef = Arc::new((schema.fields()).try_into().unwrap());

-    let projection = projection.cloned().or_else(|| {
-        Some(
-            arrow_schema
-                .fields()
-                .iter()
-                .enumerate()
-                .map(|(i, _)| i)
-                .collect(),
-        )
-    });
+    // If no projection was specified default to projecting all the fields
+    let projection = projection
+        .cloned()
+        .unwrap_or((0..arrow_schema.fields().len()).collect_vec());

-    let projection_expr: Option<Vec<_>> = projection.as_ref().map(|projection| {
-        projection
-            .iter()
-            .enumerate()
-            .map(|(i, id)| {
-                let name = arrow_schema.fields[*id].name();
-                (
-                    Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
-                    name.to_owned(),
-                )
-            })
-            .collect()
-    });
+    let projection_expr: Vec<_> = projection
+        .iter()
+        .enumerate()
+        .map(|(i, id)| {
+            let name = arrow_schema.fields[*id].name();
+            (
+                Arc::new(Column::new(name, i)) as Arc<dyn PhysicalExpr>,
+                name.to_owned(),
+            )
+        })
+        .collect();

     if enable_data_file_path_column {
         table_partition_cols.push(Field::new(DATA_FILE_PATH_COLUMN, DataType::Utf8, false));
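
The rewrite above drops the `Option` wrapper: when the caller passes no projection, the identity projection is materialized eagerly, so all later code can assume a concrete `Vec<usize>`. A standalone sketch of the defaulting idiom (the function name is hypothetical), using itertools' `collect_vec` as the diff does:

use itertools::Itertools;

// Hypothetical helper mirroring the defaulting step in table_scan.
fn resolve_projection(projection: Option<&Vec<usize>>, field_count: usize) -> Vec<usize> {
    // If no projection was specified, default to projecting all the fields.
    projection
        .cloned()
        .unwrap_or((0..field_count).collect_vec())
}

// resolve_projection(None, 3) yields vec![0, 1, 2];
// resolve_projection(Some(&vec![2, 0]), 3) yields vec![2, 0].
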
@@ -615,6 +608,31 @@ async fn table_scan(

     let mut data_file_iter = data_files.into_iter().peekable();

+    // Gather the complete equality projection up-front, since in general the requested
+    // projection may differ from the equality delete columns. Moreover, in principle
+    // each equality delete file may have different deletion columns.
+    // And since we need to reconcile them all with data files using joins and unions,
+    // we need to make sure their schemas are fully compatible in all intermediate nodes.
+    let mut equality_projection = projection.clone();
+    delete_files
+        .iter()
+        .flat_map(|delete_manifest| delete_manifest.1.data_file().equality_ids())
+        .flatten()
+        .unique()
+        .for_each(|eq_id| {
+            // Look up the zero-based index of the column based on its equality id
+            if let Some((id, _)) = schema
+                .fields()
+                .iter()
+                .enumerate()
+                .find(|(_, f)| f.id == *eq_id)
+            {
+                if !equality_projection.contains(&id) {
+                    equality_projection.push(id);
+                }
+            }
+        });
+
     let mut plan = stream::iter(delete_files.iter())
         .map(Ok::<_, DataFusionError>)
         .try_fold(None, |acc, delete_manifest| {
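
One subtlety in the block added above: `equality_ids` are Iceberg field ids, while a projection is a list of zero-based column positions, so every id must be translated through a schema lookup before it can be appended. A simplified sketch of the same step (`SchemaField` is a hypothetical stand-in for the Iceberg schema field type):

use itertools::Itertools;

// Hypothetical minimal schema field; only the id matters here.
struct SchemaField {
    id: i32,
}

// Append the positional index of every equality-delete column that the
// requested projection does not already cover.
fn gather_equality_projection(
    mut projection: Vec<usize>,
    fields: &[SchemaField],
    equality_ids: impl Iterator<Item = i32>,
) -> Vec<usize> {
    for eq_id in equality_ids.unique() {
        // Translate the field id into a zero-based column index.
        if let Some((idx, _)) = fields.iter().enumerate().find(|(_, f)| f.id == eq_id) {
            if !projection.contains(&idx) {
                projection.push(idx);
            }
        }
    }
    projection
}
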
@@ -626,6 +644,8 @@ async fn table_scan(
                 let file_schema: Arc<ArrowSchema> = file_schema.clone();
                 let file_source = file_source.clone();
                 let mut data_files = Vec::new();
+                let equality_projection = equality_projection.clone();
+
                 while let Some(data_manifest) = data_file_iter.next_if(|x| {
                     x.1.sequence_number().unwrap()
                         < delete_manifest.1.sequence_number().unwrap()
@@ -657,26 +677,6 @@ async fn table_scan(
                 );
                 let delete_file_schema: SchemaRef =
                     Arc::new((delete_schema.fields()).try_into().unwrap());
-                let equality_projection: Option<Vec<usize>> =
-                    match (&projection, delete_manifest.1.data_file().equality_ids()) {
-                        (Some(projection), Some(equality_ids)) => {
-                            let collect: Vec<usize> = schema
-                                .iter()
-                                .enumerate()
-                                .filter_map(|(id, x)| {
-                                    if equality_ids.contains(&x.id)
-                                        && !projection.contains(&id)
-                                    {
-                                        Some(id)
-                                    } else {
-                                        None
-                                    }
-                                })
-                                .collect();
-                            Some([projection.as_slice(), &collect].concat())
-                        }
-                        _ => None,
-                    };

                 let last_updated_ms = table.metadata().last_updated_ms;
                 let manifest_path = if enable_manifest_file_path_column {
@@ -724,7 +724,7 @@ async fn table_scan(
                 )
                 .with_file_group(FileGroup::new(data_files))
                 .with_statistics(statistics)
-                .with_projection(equality_projection)
+                .with_projection(Some(equality_projection))
                 .with_limit(limit)
                 .with_table_partition_cols(table_partition_cols)
                 .build();
@@ -804,7 +804,7 @@ async fn table_scan(
                 )
                 .with_file_group(FileGroup::new(additional_data_files))
                 .with_statistics(statistics)
-                .with_projection(projection.as_ref().cloned())
+                .with_projection(Some(equality_projection))
                 .with_limit(limit)
                 .with_table_partition_cols(table_partition_cols)
                 .build();
@@ -816,14 +816,8 @@ async fn table_scan(
                     plan = Arc::new(UnionExec::new(vec![plan, data_files_scan]));
                 }

-                if let Some(projection_expr) = projection_expr {
-                    Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(
-                        projection_expr,
-                        plan,
-                    )?) as Arc<dyn ExecutionPlan>)
-                } else {
-                    Ok(plan)
-                }
+                Ok::<_, DataFusionError>(Arc::new(ProjectionExec::try_new(projection_expr, plan)?)
+                    as Arc<dyn ExecutionPlan>)
             }
         })
         .try_collect::<Vec<_>>()
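
With the projection always present, each per-delete-file plan is now unconditionally wrapped in a `ProjectionExec`; besides applying the requested projection, this also trims off the equality-delete columns that were appended for the joins. A minimal sketch of constructing such a wrapper (assuming the DataFusion version this repo builds against; the `EmptyExec` input is just a placeholder):

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{ExecutionPlan, PhysicalExpr};

fn wrap_in_projection() -> Result<Arc<dyn ExecutionPlan>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("amount", DataType::Int64, false),
    ]));
    // Placeholder input; in table_scan this is the union of scans and joins.
    let input: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
    // Each projected column is (physical expression, output name); the index
    // in `Column::new` refers to the input plan's schema.
    let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
        vec![(Arc::new(Column::new("amount", 1)), "amount".to_owned())];
    Ok(Arc::new(ProjectionExec::try_new(exprs, input)?))
}
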
@@ -859,7 +853,7 @@ async fn table_scan(
        FileScanConfigBuilder::new(object_store_url, file_schema, file_source)
            .with_file_groups(file_groups)
            .with_statistics(statistics)
-            .with_projection(projection.clone())
+            .with_projection(Some(projection.clone()))
            .with_limit(limit)
            .with_table_partition_cols(table_partition_cols)
            .build();
@@ -873,10 +867,7 @@ async fn table_scan(

     match plans.len() {
         0 => {
-            let projected_schema = projection
-                .map(|p| arrow_schema.project(&p))
-                .transpose()?
-                .unwrap_or(arrow_schema.as_ref().clone());
+            let projected_schema = arrow_schema.project(&projection)?;
             Ok(Arc::new(EmptyExec::new(Arc::new(projected_schema))))
         }
         1 => Ok(plans.remove(0)),
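
The empty-plans branch simplifies accordingly: with `projection` always concrete, the schema can be projected directly. For reference, a small sketch of arrow's `Schema::project`, which selects fields by position and errors on out-of-range indices:

use arrow_schema::{ArrowError, DataType, Field, Schema};

fn project_demo() -> Result<(), ArrowError> {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("amount", DataType::Int64, false),
        Field::new("date", DataType::Utf8, false),
    ]);
    // Keep columns 1 and 0, in that order.
    let projected = schema.project(&[1, 0])?;
    assert_eq!(projected.field(0).name(), "amount");
    assert_eq!(projected.field(1).name(), "id");
    Ok(())
}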

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 30 additions & 0 deletions
@@ -215,4 +215,34 @@ pub async fn test_equality_delete() {
         .unwrap()
         .collect();
     assert_batches_eq!(expected, &duckdb_batches);
+
+    // Test that projecting a column that is not included in equality deletes works
+    ctx.sql(
+        "INSERT INTO warehouse.test.orders (id, customer_id, product_id, date, amount) VALUES
+            (7, 3, 2, '2020-01-01', 2),
+            (8, 2, 1, '2020-02-02', 3),
+            (9, 1, 3, '2020-01-01', 1);",
+    )
+    .await
+    .expect("Failed to create query plan for insert")
+    .collect()
+    .await
+    .expect("Failed to insert values into table");
+
+    let batches = ctx
+        .sql("select sum(amount) from warehouse.test.orders")
+        .await
+        .expect("Failed to create plan for select")
+        .collect()
+        .await
+        .expect("Failed to execute select query");
+
+    let expected = [
+        "+-----------------------------------+",
+        "| sum(warehouse.test.orders.amount) |",
+        "+-----------------------------------+",
+        "| 13                                |",
+        "+-----------------------------------+",
+    ];
+    assert_batches_eq!(expected, &batches);
 }

iceberg-rust-spec/src/error.rs

Lines changed: 2 additions & 23 deletions
@@ -55,30 +55,9 @@ pub enum Error {
     /// parse int error
     #[error(transparent)]
     ParseInt(#[from] std::num::ParseIntError),
-    /// table metadata builder
+    /// derive builder
     #[error(transparent)]
-    TableMetadataBuilder(#[from] crate::spec::table_metadata::TableMetadataBuilderError),
-    /// view metadata builder
-    #[error(transparent)]
-    ViewMetadataBuilder(#[from] crate::spec::view_metadata::GeneralViewMetadataBuilderError),
-    /// version builder
-    #[error(transparent)]
-    VersionBuilder(#[from] crate::spec::view_metadata::VersionBuilderError),
-    /// manifest builder
-    #[error(transparent)]
-    ManifestEntryBuilder(#[from] crate::spec::manifest::ManifestEntryBuilderError),
-    /// datafile builder
-    #[error(transparent)]
-    DatafileBuilder(#[from] crate::spec::manifest::DataFileBuilderError),
-    /// snapshot builder
-    #[error(transparent)]
-    SnapshotBuilder(#[from] crate::spec::snapshot::SnapshotBuilderError),
-    /// structype builder
-    #[error(transparent)]
-    StructTypeBuilder(#[from] crate::spec::types::StructTypeBuilderError),
-    /// partition spec builder
-    #[error(transparent)]
-    PartitionSpec(#[from] crate::spec::partition::PartitionSpecBuilderError),
+    DeriveBuilder(#[from] derive_builder::UninitializedFieldError),
 }

 impl From<apache_avro::Error> for Error {
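
Worth noting for the consolidation above: the eight builder-specific error variants can collapse into one because `derive_builder::UninitializedFieldError` already records which field was left unset, so no diagnostic information is lost. A tiny sketch (the `describe` helper is hypothetical):

use derive_builder::UninitializedFieldError;

// The shared error type names the missing field itself, so one enum
// variant can cover every builder in the crate.
fn describe(err: UninitializedFieldError) -> String {
    format!("builder field `{}` was not initialized", err.field_name())
}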

iceberg-rust-spec/src/spec/manifest.rs

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ use super::{
 /// Entry in manifest with the iceberg spec version 2.
 #[derive(Debug, Serialize, PartialEq, Clone, Getters, Builder)]
 #[serde(into = "ManifestEntryEnum")]
-#[builder(setter(prefix = "with"))]
+#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
 pub struct ManifestEntry {
     /// Table format version
     format_version: FormatVersion,
@@ -516,7 +516,7 @@ impl From<HashMap<i32, Value>> for AvroMap<ByteBuf> {
 }

 #[derive(Debug, PartialEq, Clone, Getters, Builder)]
-#[builder(setter(prefix = "with"))]
+#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
 /// DataFile found in Manifest.
 pub struct DataFile {
     ///Type of content in data file.

iceberg-rust-spec/src/spec/materialized_view_metadata.rs

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ pub type MaterializedViewMetadata = GeneralViewMetadata<FullIdentifier>;
 pub type MaterializedViewMetadataBuilder = GeneralViewMetadataBuilder<FullIdentifier>;

 impl MaterializedViewMetadata {
-    pub fn as_ref(&self) -> TabularMetadataRef {
+    pub fn as_ref(&self) -> TabularMetadataRef<'_> {
         TabularMetadataRef::MaterializedView(self)
     }
 }

iceberg-rust-spec/src/spec/partition.rs

Lines changed: 1 addition & 1 deletion
@@ -169,7 +169,7 @@ impl PartitionField {

 #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder, Getters)]
 #[serde(rename_all = "kebab-case")]
-#[builder(setter(prefix = "with"))]
+#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
 /// Partition spec that defines how to produce a tuple of partition values from a record.
 pub struct PartitionSpec {
     /// Identifier for PartitionSpec

iceberg-rust-spec/src/spec/snapshot.rs

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ use _serde::SnapshotEnum;

 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Builder, Getters)]
 #[serde(from = "SnapshotEnum", into = "SnapshotEnum")]
-#[builder(setter(prefix = "with"))]
+#[builder(build_fn(error = "Error"), setter(prefix = "with"))]
 /// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
 pub struct Snapshot {
     /// A unique long ID

iceberg-rust-spec/src/spec/table_metadata.rs

Lines changed: 6 additions & 3 deletions
@@ -204,7 +204,7 @@ impl TableMetadata {
     pub fn current_partition_fields(
         &self,
         branch: Option<&str>,
-    ) -> Result<Vec<BoundPartitionField>, Error> {
+    ) -> Result<Vec<BoundPartitionField<'_>>, Error> {
         let schema = self.current_schema(branch)?;
         let partition_spec = self.default_partition_spec()?;
         partition_fields(partition_spec, schema)
@@ -218,7 +218,10 @@ impl TableMetadata {
     /// # Returns
     /// * `Result<Vec<BoundPartitionField>, Error>` - Vector of partition fields bound to their source schema fields,
     ///   or an error if the schema or partition spec cannot be found
-    pub fn partition_fields(&self, snapshot_id: i64) -> Result<Vec<BoundPartitionField>, Error> {
+    pub fn partition_fields(
+        &self,
+        snapshot_id: i64,
+    ) -> Result<Vec<BoundPartitionField<'_>>, Error> {
         let schema = self.schema(snapshot_id)?;
         self.default_partition_spec()?
             .fields()
@@ -325,7 +328,7 @@ impl TableMetadata {
             .map(|x| *x.sequence_number())
     }

-    pub fn as_ref(&self) -> TabularMetadataRef {
+    pub fn as_ref(&self) -> TabularMetadataRef<'_> {
         TabularMetadataRef::Table(self)
     }
 }

iceberg-rust-spec/src/spec/types.rs

Lines changed: 1 addition & 0 deletions
@@ -196,6 +196,7 @@ impl fmt::Display for PrimitiveType {
 /// DataType for a specific struct
 #[derive(Debug, Serialize, PartialEq, Eq, Clone, Builder)]
 #[serde(rename = "struct", tag = "type")]
+#[builder(build_fn(error = "Error"))]
 pub struct StructType {
     /// Struct fields
     #[builder(setter(each(name = "with_struct_field")))]