Skip to content

Commit 85112a1

Browse files
authored
Merge pull request JanKaul#228 from JanKaul/fix-equalitly-deletes
use separate manifest files for delete files
2 parents c742b5f + 0ad6be9 commit 85112a1

File tree

10 files changed

+444
-201
lines changed

10 files changed

+444
-201
lines changed

.github/workflows/rust.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ jobs:
2020
android: true
2121
dotnet: true
2222
haskell: true
23-
large-packages: false
23+
large-packages: true
2424
docker-images: false
2525
swap-storage: false
2626
- uses: actions/checkout@v3

Cargo.lock

Lines changed: 67 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Makefile

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,16 +7,16 @@ test-iceberg-rust:
77
cargo test -p iceberg-rust --lib
88

99
test-datafusion_iceberg:
10-
cargo test -p datafusion_iceberg --tests -j 2
10+
cargo test -p datafusion_iceberg --tests -j 2 && cargo clean -p datafusion_iceberg
1111

1212
test-rest-catalog:
13-
cargo test -p iceberg-rest-catalog --lib
13+
cargo test -p iceberg-rest-catalog --lib && cargo clean -p iceberg-rest-catalog
1414

1515
test-file-catalog:
16-
cargo test -p iceberg-file-catalog --lib
16+
cargo test -p iceberg-file-catalog --lib && cargo clean -p iceberg-file-catalog
1717

1818
test-sql-catalog:
19-
cargo test -p iceberg-sql-catalog --lib
19+
cargo test -p iceberg-sql-catalog --lib && cargo clean -p iceberg-sql-catalog
2020
clippy:
2121
cargo clippy --all-targets --all-features -- -D warnings
2222
fmt:

datafusion_iceberg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ url = { workspace = true }
2929
uuid = { workspace = true }
3030

3131
[dev-dependencies]
32+
duckdb = { version = "1.3.2", features = ["bundled"] }
3233
iceberg-rest-catalog = { path = "../catalogs/iceberg-rest-catalog" }
3334
iceberg-sql-catalog = { path = "../catalogs/iceberg-sql-catalog" }
3435
reqwest = "0.12"

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 36 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,10 @@
1-
use datafusion::{arrow::error::ArrowError, assert_batches_eq, prelude::SessionContext};
1+
use datafusion::{
2+
arrow::{error::ArrowError, record_batch::RecordBatch},
3+
assert_batches_eq,
4+
prelude::SessionContext,
5+
};
26
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
7+
use duckdb::Connection;
38
use futures::stream;
49
use iceberg_rust::catalog::identifier::Identifier;
510
use iceberg_rust::catalog::tabular::Tabular;
@@ -16,11 +21,15 @@ use iceberg_rust::{
1621
table::Table,
1722
};
1823
use iceberg_sql_catalog::SqlCatalog;
24+
use object_store::local::LocalFileSystem;
1925
use std::sync::Arc;
26+
use tempfile::TempDir;
2027

2128
#[tokio::test]
2229
pub async fn test_equality_delete() {
23-
let object_store = ObjectStoreBuilder::memory();
30+
let temp_dir = TempDir::new().unwrap();
31+
let table_dir = format!("{}/test/orders", temp_dir.path().to_str().unwrap());
32+
let object_store = ObjectStoreBuilder::Filesystem(Arc::new(LocalFileSystem::new()));
2433

2534
let catalog: Arc<dyn Catalog> = Arc::new(
2635
SqlCatalog::new("sqlite://", "warehouse", object_store.clone())
@@ -79,7 +88,7 @@ pub async fn test_equality_delete() {
7988

8089
let table = Table::builder()
8190
.with_name("orders")
82-
.with_location("/test/orders")
91+
.with_location(&table_dir)
8392
.with_schema(schema)
8493
.with_partition_spec(partition_spec)
8594
.build(&["test".to_owned()], catalog.clone())
@@ -194,4 +203,28 @@ pub async fn test_equality_delete() {
194203
"+----+-------------+------------+------------+--------+",
195204
];
196205
assert_batches_eq!(expected, &batches);
206+
207+
let latest_version = table
208+
.object_store()
209+
.get(&object_store::path::Path::from(format!(
210+
"{table_dir}/metadata/version-hint.text"
211+
)))
212+
.await
213+
.unwrap()
214+
.bytes()
215+
.await
216+
.unwrap();
217+
let latest_version_str = std::str::from_utf8(&latest_version).unwrap();
218+
219+
let conn = Connection::open_in_memory().unwrap();
220+
conn.execute("install iceberg", []).unwrap();
221+
conn.execute("load iceberg", []).unwrap();
222+
223+
let duckdb_batches: Vec<RecordBatch> = conn
224+
.prepare("select * from iceberg_scan(?) order by id")
225+
.unwrap()
226+
.query_arrow([latest_version_str])
227+
.unwrap()
228+
.collect();
229+
assert_batches_eq!(expected, &duckdb_batches);
197230
}

iceberg-rust-spec/src/spec/manifest_list.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ pub struct FieldSummary {
9494
pub upper_bound: Option<Value>,
9595
}
9696

97-
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone)]
97+
#[derive(Debug, Serialize_repr, Deserialize_repr, PartialEq, Eq, Clone, Copy)]
9898
#[repr(u8)]
9999
/// Type of content stored by the data file.
100100
pub enum Content {

iceberg-rust/src/table/manifest.rs

Lines changed: 23 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,7 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
182182
snapshot_id: i64,
183183
schema: &'schema AvroSchema,
184184
table_metadata: &'metadata TableMetadata,
185+
content: manifest_list::Content,
185186
branch: Option<&str>,
186187
) -> Result<Self, Error> {
187188
let mut writer = AvroWriter::new(schema, Vec::new());
@@ -229,14 +230,20 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
229230
serde_json::to_string(&spec_id)?,
230231
)?;
231232

232-
writer.add_user_metadata("content".to_string(), "data")?;
233+
writer.add_user_metadata(
234+
"content".to_string(),
235+
match content {
236+
manifest_list::Content::Data => "data",
237+
manifest_list::Content::Deletes => "deletes",
238+
},
239+
)?;
233240

234241
let manifest = ManifestListEntry {
235242
format_version: table_metadata.format_version,
236243
manifest_path: manifest_location.to_owned(),
237244
manifest_length: 0,
238245
partition_spec_id: table_metadata.default_spec_id,
239-
content: manifest_list::Content::Data,
246+
content,
240247
sequence_number: table_metadata.last_sequence_number + 1,
241248
min_sequence_number: table_metadata.last_sequence_number + 1,
242249
added_snapshot_id: snapshot_id,
@@ -331,7 +338,13 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
331338
serde_json::to_string(&spec_id)?,
332339
)?;
333340

334-
writer.add_user_metadata("content".to_string(), "data")?;
341+
writer.add_user_metadata(
342+
"content".to_string(),
343+
match manifest.content {
344+
manifest_list::Content::Data => "data",
345+
manifest_list::Content::Deletes => "deletes",
346+
},
347+
)?;
335348

336349
writer.extend(
337350
manifest_reader
@@ -455,7 +468,13 @@ impl<'schema, 'metadata> ManifestWriter<'schema, 'metadata> {
455468
serde_json::to_string(&spec_id)?,
456469
)?;
457470

458-
writer.add_user_metadata("content".to_string(), "data")?;
471+
writer.add_user_metadata(
472+
"content".to_string(),
473+
match manifest.content {
474+
manifest_list::Content::Data => "data",
475+
manifest_list::Content::Deletes => "deletes",
476+
},
477+
)?;
459478

460479
writer.extend(manifest_reader.filter_map(|entry| {
461480
let mut entry = entry

0 commit comments

Comments
 (0)