Skip to content

Commit eb959c7

Browse files
authored
Merge branch 'JanKaul:main' into main
2 parents 66e3c71 + 8993570 commit eb959c7

File tree

8 files changed

+112
-24
lines changed

8 files changed

+112
-24
lines changed

Cargo.lock

Lines changed: 45 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ derive_builder = "0.20"
3535
futures = "0.3.31"
3636
getrandom = { version = "0.3.1", features = ["std"] }
3737
itertools = "0.14.0"
38+
lazy_static = "1.5.0"
3839
lru = "0.16.0"
3940
object_store = { version = "0.12", features = ["aws", "gcp"] }
4041
parquet = { version = "55", features = ["async", "object_store"] }
4142
pin-project-lite = "0.2"
43+
regex = "1.11.1"
4244
serde = "^1.0"
4345
serde_derive = "^1.0"
4446
serde_json = "^1.0"

catalogs/iceberg-file-catalog/src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -885,10 +885,7 @@ pub mod tests {
885885
.await
886886
.unwrap();
887887

888-
assert_eq!(
889-
std::str::from_utf8(&version_hint).unwrap(),
890-
"s3://warehouse/tpch/lineitem/metadata/v1.metadata.json"
891-
);
888+
assert_eq!(std::str::from_utf8(&version_hint).unwrap(), "1");
892889
}
893890

894891
#[tokio::test]

catalogs/iceberg-sql-catalog/src/lib.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -781,9 +781,9 @@ pub mod tests {
781781
use testcontainers_modules::{localstack::LocalStack, postgres::Postgres};
782782
use tokio::time::sleep;
783783

784-
use std::{sync::Arc, time::Duration};
785-
786784
use crate::SqlCatalog;
785+
use iceberg_rust::object_store::store::version_hint_content;
786+
use std::{sync::Arc, time::Duration};
787787

788788
#[tokio::test]
789789
async fn test_create_update_drop_table() {
@@ -1004,8 +1004,10 @@ pub mod tests {
10041004
.await
10051005
.unwrap();
10061006

1007-
assert!(std::str::from_utf8(&version_hint)
1008-
.unwrap()
1009-
.ends_with(".metadata.json"));
1007+
let cache = iceberg_catalog.cache.read().unwrap();
1008+
let keys = cache.values().collect::<Vec<_>>();
1009+
let version = version_hint_content(&keys[0].clone().0);
1010+
1011+
assert_eq!(std::str::from_utf8(&version_hint).unwrap(), version);
10101012
}
10111013
}

datafusion_iceberg/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ iceberg-rust = { path = "../iceberg-rust", version = "0.8.0" }
2121
itertools = { workspace = true }
2222
object_store = { workspace = true }
2323
pin-project-lite = "0.2.16"
24-
regex = "1.11.1"
24+
regex = { workspace = true }
2525
serde_json = { workspace = true }
2626
thiserror = { workspace = true }
2727
tokio = { version = "1.43", features = ["rt-multi-thread"] }

datafusion_iceberg/tests/equality_delete.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -204,26 +204,14 @@ pub async fn test_equality_delete() {
204204
];
205205
assert_batches_eq!(expected, &batches);
206206

207-
let latest_version = table
208-
.object_store()
209-
.get(&object_store::path::Path::from(format!(
210-
"{table_dir}/metadata/version-hint.text"
211-
)))
212-
.await
213-
.unwrap()
214-
.bytes()
215-
.await
216-
.unwrap();
217-
let latest_version_str = std::str::from_utf8(&latest_version).unwrap();
218-
219207
let conn = Connection::open_in_memory().unwrap();
220208
conn.execute("install iceberg", []).unwrap();
221209
conn.execute("load iceberg", []).unwrap();
222210

223211
let duckdb_batches: Vec<RecordBatch> = conn
224212
.prepare("select * from iceberg_scan(?) order by id")
225213
.unwrap()
226-
.query_arrow([latest_version_str])
214+
.query_arrow([table_dir])
227215
.unwrap()
228216
.collect();
229217
assert_batches_eq!(expected, &duckdb_batches);

iceberg-rust/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@ futures = { workspace = true }
2121
getrandom = { workspace = true }
2222
iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.8.0" }
2323
itertools = { workspace = true }
24+
lazy_static = { workspace = true }
2425
lru = { workspace = true }
2526
object_store = { workspace = true }
2627
parquet = { workspace = true }
2728
pin-project-lite = { workspace = true }
29+
regex = { workspace = true }
2830
serde = { workspace = true }
2931
serde_derive = { workspace = true }
3032
serde_json = { workspace = true }
@@ -39,6 +41,7 @@ uuid = { workspace = true }
3941

4042

4143
[dev-dependencies]
44+
rstest = "0.23.0"
4245
chrono = { workspace = true }
4346
iceberg-sql-catalog = { path = "../catalogs/iceberg-sql-catalog" }
4447

iceberg-rust/src/object_store/store.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use object_store::{Attributes, ObjectStore, PutOptions, TagSet};
99

1010
use crate::error::Error;
1111
use flate2::read::GzDecoder;
12+
use lazy_static::lazy_static;
13+
use regex::Regex;
1214
use std::io::Read;
1315

1416
/// Simplify interaction with iceberg files
@@ -59,7 +61,7 @@ impl<T: ObjectStore> IcebergStore for T {
5961
"Path for version-hint for {location}"
6062
)))?
6163
.into(),
62-
location.to_string().into(),
64+
version_hint_content(location).into(),
6365
PutOptions {
6466
mode: object_store::PutMode::Overwrite,
6567
tags: TagSet::default(),
@@ -83,6 +85,39 @@ fn version_hint_path(original: &str) -> Option<String> {
8385
)
8486
}
8587

88+
lazy_static! {
    /// Filename patterns of the metadata files for which a version hint can be derived.
    ///
    /// Every pattern is anchored to the whole filename (no directory part) and exposes the
    /// hint through a capture group named `version`. Literal dots are escaped (`\.`) so that
    /// they match only a `.` and not an arbitrary character.
    static ref SUPPORTED_METADATA_FILE_FORMATS: Vec<Regex> = vec![
        // The standard metastore format https://iceberg.apache.org/spec/#metastore-tables
        Regex::new(
            r"^(?<version>[0-9]{5}-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})\.(?:gz\.)?metadata\.json$"
        )
        .expect("metastore metadata filename pattern is a valid regex"),
        // The legacy file-system format https://iceberg.apache.org/spec/#file-system-tables
        Regex::new(r"^v(?<version>[0-9]+)\.metadata\.json$")
            .expect("file-system metadata filename pattern is a valid regex"),
    ];
}
99+
100+
/// Given a full path to a metadata file, extract an appropriate version hint that other readers
101+
/// without access to the catalog can parse.
102+
pub fn version_hint_content(original: &str) -> String {
103+
original
104+
.split("/")
105+
.last()
106+
.and_then(|filename| {
107+
SUPPORTED_METADATA_FILE_FORMATS
108+
.iter()
109+
.filter_map(|regex| {
110+
regex.captures(filename).and_then(|capture| {
111+
capture
112+
.name("version")
113+
.and_then(|m| m.as_str().parse().ok())
114+
})
115+
})
116+
.next()
117+
})
118+
.unwrap_or(original.to_string())
119+
}
120+
86121
fn parse_metadata(location: &str, bytes: &[u8]) -> Result<TabularMetadata, Error> {
87122
if location.ends_with(".gz.metadata.json") {
88123
let mut decoder = GzDecoder::new(bytes);
@@ -99,6 +134,7 @@ fn parse_metadata(location: &str, bytes: &[u8]) -> Result<TabularMetadata, Error
99134
#[cfg(test)]
100135
mod tests {
101136
use super::*;
137+
use rstest::rstest;
102138
use std::io::Write;
103139

104140
#[test]
@@ -142,6 +178,21 @@ mod tests {
142178
assert_eq!(version_hint_path(input), Some(expected.to_string()));
143179
}
144180

181+
#[rstest]
182+
#[case::file_format("/path/to/metadata/v2.metadata.json", "2")]
183+
#[case::metastore_format_no_gzip(
184+
"/path/to/metadata/00004-3f569e94-5601-48f3-9199-8d71df4ea7b0.metadata.json",
185+
"00004-3f569e94-5601-48f3-9199-8d71df4ea7b0"
186+
)]
187+
#[case::metastore_format_with_gzip(
188+
"/path/to/metadata/00004-3f569e94-5601-48f3-9199-8d71df4ea7b0.gz.metadata.json",
189+
"00004-3f569e94-5601-48f3-9199-8d71df4ea7b0"
190+
)]
191+
#[test]
192+
fn test_version_hint_content(#[case] input: &str, #[case] expected: &str) {
193+
assert_eq!(version_hint_content(input), expected);
194+
}
195+
145196
#[test]
146197
fn test_parse_metadata_table_plain_json() {
147198
let location = "/path/to/metadata/v1.metadata.json";

0 commit comments

Comments
 (0)