Skip to content

Commit 9d8829a

Browse files
committed
Merge remote-tracking branch 'upstream/main'
2 parents d1ec4c9 + 11182f0 commit 9d8829a

File tree

2 files changed

+60
-8
lines changed

2 files changed

+60
-8
lines changed

iceberg-rust/src/catalog/commit.rs

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
//! All changes are made atomically - either all updates succeed or none are applied.
1010
//! Requirements are checked first to ensure concurrent modifications don't corrupt state.
1111
12-
use std::collections::HashMap;
13-
1412
use iceberg_rust_spec::{
1513
spec::{
1614
partition::PartitionSpec,
@@ -24,6 +22,8 @@ use iceberg_rust_spec::{
2422
view_metadata::Materialization,
2523
};
2624
use serde_derive::{Deserialize, Serialize};
25+
use std::collections::HashMap;
26+
use std::time::{SystemTime, UNIX_EPOCH};
2727
use uuid::Uuid;
2828

2929
use crate::error::Error;
@@ -417,6 +417,9 @@ pub fn apply_table_updates(
417417
metadata: &mut TableMetadata,
418418
updates: Vec<TableUpdate>,
419419
) -> Result<(), Error> {
420+
let mut added_schema_id = None;
421+
let mut added_spec_id = None;
422+
let mut added_sort_order_id = None;
420423
for update in updates {
421424
match update {
422425
TableUpdate::UpgradeFormatVersion { format_version } => {
@@ -431,25 +434,63 @@ pub fn apply_table_updates(
431434
schema,
432435
last_column_id,
433436
} => {
434-
metadata.schemas.insert(*schema.schema_id(), schema);
437+
let schema_id = *schema.schema_id();
438+
metadata.schemas.insert(schema_id, schema);
439+
added_schema_id = Some(schema_id);
435440
if let Some(last_column_id) = last_column_id {
436441
metadata.last_column_id = last_column_id;
437442
}
438443
}
439444
TableUpdate::SetCurrentSchema { schema_id } => {
440-
metadata.current_schema_id = schema_id;
445+
if schema_id == -1 {
446+
if let Some(added_schema_id) = added_schema_id {
447+
metadata.current_schema_id = added_schema_id;
448+
} else {
449+
return Err(Error::InvalidFormat(
450+
"Cannot set current schema to -1 without adding a schema first"
451+
.to_string(),
452+
));
453+
}
454+
} else {
455+
metadata.current_schema_id = schema_id;
456+
}
441457
}
442458
TableUpdate::AddSpec { spec } => {
443-
metadata.partition_specs.insert(*spec.spec_id(), spec);
459+
let spec_id = *spec.spec_id();
460+
metadata.partition_specs.insert(spec_id, spec);
461+
added_spec_id = Some(spec_id);
444462
}
445463
TableUpdate::SetDefaultSpec { spec_id } => {
446-
metadata.default_spec_id = spec_id;
464+
if spec_id == -1 {
465+
if let Some(added_spec_id) = added_spec_id {
466+
metadata.default_spec_id = added_spec_id;
467+
} else {
468+
return Err(Error::InvalidFormat(
469+
"Cannot set default spec to -1 without adding a spec first".to_string(),
470+
));
471+
}
472+
} else {
473+
metadata.default_spec_id = spec_id;
474+
}
447475
}
448476
TableUpdate::AddSortOrder { sort_order } => {
449-
metadata.sort_orders.insert(sort_order.order_id, sort_order);
477+
let sort_order_id = sort_order.order_id;
478+
metadata.sort_orders.insert(sort_order_id, sort_order);
479+
added_sort_order_id = Some(sort_order_id);
450480
}
451481
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
452-
metadata.default_sort_order_id = sort_order_id;
482+
if sort_order_id == -1 {
483+
if let Some(added_sort_order_id) = added_sort_order_id {
484+
metadata.default_sort_order_id = added_sort_order_id;
485+
} else {
486+
return Err(Error::InvalidFormat(
487+
"Cannot set default sort order to -1 without adding a sort order first"
488+
.to_string(),
489+
));
490+
}
491+
} else {
492+
metadata.default_sort_order_id = sort_order_id;
493+
}
453494
}
454495
TableUpdate::AddSnapshot { snapshot } => {
455496
metadata.snapshot_log.push(SnapshotLog {
@@ -489,6 +530,12 @@ pub fn apply_table_updates(
489530
}
490531
};
491532
}
533+
534+
// Lastly make sure `last-updated-ms` field is up-to-date
535+
metadata.last_updated_ms = SystemTime::now()
536+
.duration_since(UNIX_EPOCH)
537+
.unwrap()
538+
.as_millis() as i64;
492539
Ok(())
493540
}
494541

iceberg-rust/tests/overwrite_test.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ async fn test_table_transaction_overwrite() {
8383
.await
8484
.expect("Failed to create table");
8585

86+
let mut previous_last_updated_ms = table.metadata().last_updated_ms;
87+
8688
// 4. Create initial Arrow RecordBatch and write to parquet
8789
let initial_batch = create_initial_record_batch();
8890
let initial_stream = stream::iter(vec![Ok(initial_batch.clone())]);
@@ -104,6 +106,8 @@ async fn test_table_transaction_overwrite() {
104106
!table.metadata().snapshots.is_empty(),
105107
"Table should have at least one snapshot after append"
106108
);
109+
assert!(table.metadata().last_updated_ms > previous_last_updated_ms);
110+
previous_last_updated_ms = table.metadata().last_updated_ms;
107111

108112
// 6. Create overwrite RecordBatch with additional rows
109113
let overwrite_batch = create_overwrite_record_batch();
@@ -134,6 +138,7 @@ async fn test_table_transaction_overwrite() {
134138
final_snapshots.len() >= 2,
135139
"Table should have at least 2 snapshots after overwrite"
136140
);
141+
assert!(table.metadata().last_updated_ms > previous_last_updated_ms);
137142

138143
// Get the current snapshot (should be the overwrite snapshot)
139144
let current_snapshot = table

0 commit comments

Comments
 (0)