Skip to content

Commit f451462

Browse files
authored
Merge pull request JanKaul#263 from pierre-marijon/add_transforms
Add some bucket transformation
2 parents 40c2156 + aac828c commit f451462

File tree

5 files changed

+193
-3
lines changed

5 files changed

+193
-3
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ itertools = "0.14.0"
3838
lazy_static = "1.5.0"
3939
lru = "0.16.0"
4040
object_store = { version = "0.12", features = ["aws", "gcp"] }
41+
murmur3 = { version = "0.5.2" }
4142
parquet = { version = "56", features = ["async", "object_store"] }
4243
pin-project-lite = "0.2"
4344
regex = "1.11.1"

iceberg-rust-spec/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ derive-getters = { workspace = true }
1616
derive_builder = { workspace = true }
1717
getrandom = { workspace = true }
1818
itertools = { workspace = true }
19-
murmur3 = "0.5.2"
19+
murmur3 = { workspace = true }
2020
ordered-float = { version = "5.0.0", features = ["serde"] }
2121
rust_decimal = "1.36.0"
2222
serde = { workspace = true }

iceberg-rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ iceberg-rust-spec = { path = "../iceberg-rust-spec", version = "0.8.0" }
2323
itertools = { workspace = true }
2424
lazy_static = { workspace = true }
2525
lru = { workspace = true }
26+
murmur3 = { workspace = true }
2627
object_store = { workspace = true }
2728
parquet = { workspace = true }
2829
pin-project-lite = { workspace = true }
@@ -44,4 +45,3 @@ uuid = { workspace = true }
4445
rstest = "0.23.0"
4546
chrono = { workspace = true }
4647
iceberg-sql-catalog = { path = "../catalogs/iceberg-sql-catalog" }
47-

iceberg-rust/src/arrow/transform.rs

Lines changed: 189 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
use std::sync::Arc;
1515

1616
use arrow::{
17-
array::{as_primitive_array, Array, ArrayRef, PrimitiveArray},
17+
array::{as_primitive_array, downcast_array, Array, ArrayRef, PrimitiveArray, StringArray},
18+
buffer::ScalarBuffer,
1819
compute::{binary, cast, date_part, unary, DatePart},
1920
datatypes::{
2021
DataType, Date32Type, Int16Type, Int32Type, Int64Type, TimeUnit, TimestampMicrosecondType,
@@ -43,6 +44,14 @@ static MICROS_IN_DAY: i64 = 86_400_000_000;
4344
/// * Month - Extracts month from date32 or timestamp
4445
/// * Year - Extracts year from date32 or timestamp
4546
/// * Hour - Extracts hour from timestamp
47+
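/// * Truncate (Int16) - Truncates the value to the given width
48+
/// * Truncate (Int32) - Truncates the value to the given width
49+
/// * Truncate (Int64) - Truncates the value to the given width
50+
/// * Bucket (Int32) - Hashes the value and uses the hash to assign it to a bucket
51+
/// * Bucket (Int64) - Hashes the value and uses the hash to assign it to a bucket
52+
/// * Bucket (Date32) - Hashes the value and uses the hash to assign it to a bucket
53+
/// * Bucket (Time32, milliseconds) - Hashes the value and uses the hash to assign it to a bucket
54+
/// * Bucket (Utf8) - Hashes the UTF-8 bytes and uses the hash to assign the value to a bucket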
4655
pub fn transform_arrow(array: ArrayRef, transform: &Transform) -> Result<ArrayRef, ArrowError> {
4756
match (array.data_type(), transform) {
4857
(_, Transform::Identity) => Ok(array),
@@ -114,6 +123,66 @@ pub fn transform_arrow(array: ArrayRef, transform: &Transform) -> Result<ArrayRe
114123
i - i.rem_euclid(*m as i64)
115124
}),
116125
)),
126+
(DataType::Int32, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
127+
unary(as_primitive_array::<Int32Type>(&array), |i| {
128+
let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
129+
(murmur3::murmur3_32(&mut buffer, 0).expect("murmur3 hash failled for some reason")
130+
as i32)
131+
.rem_euclid(*m as i32)
132+
}),
133+
)),
134+
(DataType::Int64, Transform::Bucket(m)) => Ok(Arc::<PrimitiveArray<Int32Type>>::new(
135+
unary(as_primitive_array::<Int64Type>(&array), |i| {
136+
let mut buffer = std::io::Cursor::new((i).to_le_bytes());
137+
(murmur3::murmur3_32(&mut buffer, 0).expect("murmur3 hash failled for some reason")
138+
as i32)
139+
.rem_euclid(*m as i32)
140+
}),
141+
)),
142+
(DataType::Date32, Transform::Bucket(m)) => {
143+
let temp = cast(&array, &DataType::Int32)?;
144+
145+
Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
146+
as_primitive_array::<Int32Type>(&temp),
147+
|i| {
148+
let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
149+
(murmur3::murmur3_32(&mut buffer, 0)
150+
.expect("murmur3 hash failled for some reason") as i32)
151+
.rem_euclid(*m as i32)
152+
},
153+
)))
154+
}
155+
(DataType::Time32(TimeUnit::Millisecond), Transform::Bucket(m)) => {
156+
let temp = cast(&array, &DataType::Int32)?;
157+
158+
Ok(Arc::<PrimitiveArray<Int32Type>>::new(unary(
159+
as_primitive_array::<Int32Type>(&temp),
160+
|i: i32| {
161+
let mut buffer = std::io::Cursor::new((i as i64).to_le_bytes());
162+
(murmur3::murmur3_32(&mut buffer, 0)
163+
.expect("murmur3 hash failled for some reason") as i32)
164+
.rem_euclid(*m as i32)
165+
},
166+
)))
167+
}
168+
(DataType::Utf8, Transform::Bucket(m)) => {
169+
let nulls = array.nulls();
170+
let local_array: StringArray = downcast_array::<StringArray>(&array);
171+
172+
Ok(Arc::new(PrimitiveArray::<Int32Type>::new(
173+
ScalarBuffer::from_iter(local_array.iter().map(|a| {
174+
if let Some(value) = a {
175+
murmur3::murmur3_32(&mut value.as_bytes(), 0)
176+
.expect("murmur3 hash failled for some reason")
177+
as i32
178+
} else {
179+
0
180+
}
181+
.rem_euclid(*m as i32)
182+
})),
183+
nulls.cloned(),
184+
)))
185+
}
117186
_ => Err(ArrowError::ComputeError(
118187
"Failed to perform transform for datatype".to_string(),
119188
)),
@@ -322,6 +391,125 @@ mod tests {
322391
assert_eq!(&expected, &result);
323392
}
324393

394+
#[test]
395+
fn test_bucket_hash_value() {
396+
// Pins the raw `murmur3::murmur3_32` output (seed 0) for the byte encodings the bucket
// transform hashes, independently of `transform_arrow`.
// Expected values are the examples listed at
// https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
397+
398+
// int 34, widened to i64 and hashed as 8 little-endian bytes -> 2017239379
399+
let mut buffer = std::io::Cursor::new((34i32 as i64).to_le_bytes());
400+
assert_eq!(murmur3::murmur3_32(&mut buffer, 0).unwrap(), 2017239379);
401+
402+
// long 34, hashed as 8 little-endian bytes -> 2017239379 (same bytes as the widened int)
403+
let mut buffer = std::io::Cursor::new((34i64).to_le_bytes());
404+
assert_eq!(murmur3::murmur3_32(&mut buffer, 0).unwrap(), 2017239379);
405+
406+
// daysFromUnixEpoch(2017-11-16) -> 17_486 -> -653330422 (u32 hash reinterpreted as i32)
407+
let mut buffer = std::io::Cursor::new((17_486i32 as i64).to_le_bytes());
408+
assert_eq!(
409+
murmur3::murmur3_32(&mut buffer, 0).unwrap() as i32,
410+
-653330422
411+
);
412+
413+
// 22:31:08 as microseconds from midnight -> 81_068_000_000 -> -662762989
414+
let mut buffer = std::io::Cursor::new((81_068_000_000i64).to_le_bytes());
415+
assert_eq!(
416+
murmur3::murmur3_32(&mut buffer, 0).unwrap() as i32,
417+
-662762989
418+
);
419+
420+
// utf8Bytes(iceberg) -> 1210000089
421+
assert_eq!(
422+
murmur3::murmur3_32(&mut "iceberg".as_bytes(), 0).unwrap() as i32,
423+
1210000089
424+
);
425+
}
426+
427+
#[test]
428+
fn test_int32_bucket_transform() {
// Checks `Transform::Bucket(1000)` on an Int32 array: each expected value is the
// murmur3 hash (as i32) reduced with `rem_euclid(1000)`, and a null input stays null.
429+
let array = Arc::new(arrow::array::Int32Array::from(vec![
430+
Some(34), // Spec value
431+
Some(17_486), // number of days between the Unix epoch and 2017-11-16
432+
Some(84668000), // arbitrary value; NOTE(review): not 22:31:08, which is 81_068_000 ms / 81_068_000_000 us
433+
Some(-2000),
434+
Some(0),
435+
None,
436+
])) as ArrayRef;
437+
let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
438+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
439+
Some(2017239379i32.rem_euclid(1000)),
// The hash of 17_486 is -653330422. `rem_euclid(1000)` on that negative value yields 578,
// whereas the `%` operator would yield -422, which is why a plain `% 1000` does not match.
// NOTE(review): the Iceberg spec defines the bucket as `(hash & Integer.MAX_VALUE) % N`,
// which would give 226 here — confirm whether 578 is the intended result.
440+
Some(578),
441+
Some(988822981i32.rem_euclid(1000)),
442+
Some(964620854i32.rem_euclid(1000)),
443+
Some(1669671676i32.rem_euclid(1000)),
444+
None,
445+
])) as ArrayRef;
446+
assert_eq!(&expected, &result);
447+
}
448+
449+
#[test]
450+
fn test_int64_bucket_transform() {
// Checks `Transform::Bucket(1000)` on an Int64 array: the result is an Int32 array of
// bucket numbers, and a null input stays null.
451+
let array = Arc::new(arrow::array::Int64Array::from(vec![
452+
Some(34), // Spec value
453+
Some(17_486), // number of days between the Unix epoch and 2017-11-16
454+
Some(2000),
455+
Some(-2000),
456+
Some(0),
457+
None,
458+
])) as ArrayRef;
459+
let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
460+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
461+
Some(2017239379i32.rem_euclid(1000)),
// The hash of 17_486 is -653_330_422; `rem_euclid(1000)` on that negative value yields 578
// (the `%` operator would yield -422).
// NOTE(review): the Iceberg spec formula `(hash & Integer.MAX_VALUE) % N` would give 226.
462+
Some(578),
// NOTE(review): 117 is the bucket recorded for 2000. The previously quoted figure
// 716_914_497 reduces to 497 mod 1000, so the raw hash is presumably a different
// (negative) value — TODO confirm.
463+
Some(117),
464+
Some(964_620_854i32.rem_euclid(1000)),
465+
Some(1669671676i32.rem_euclid(1000)),
466+
None,
467+
])) as ArrayRef;
468+
assert_eq!(&expected, &result);
469+
}
470+
471+
#[test]
472+
fn test_date32_bucket_transform() {
// Checks `Transform::Bucket(1000)` on a Date32 array; a null input stays null.
473+
let array = Arc::new(arrow::array::Date32Array::from(vec![
474+
Some(17_486), // number of days between the Unix epoch and 2017-11-16
475+
None,
476+
])) as ArrayRef;
477+
let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
478+
479+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
// The hash of 17_486 is -653330422; `rem_euclid(1000)` on that negative value yields 578
// (the `%` operator would yield -422).
// NOTE(review): the Iceberg spec formula `(hash & Integer.MAX_VALUE) % N` would give 226.
480+
Some(578),
481+
None,
482+
])) as ArrayRef;
483+
484+
assert_eq!(&expected, &result);
485+
}
486+
487+
#[test]
488+
fn test_time32_bucket_transform() {
// Checks `Transform::Bucket(1000)` on a Time32 (millisecond) array; a null input stays null.
489+
let array = Arc::new(arrow::array::Time32MillisecondArray::from(vec![
490+
Some(81_068_000), // 22:31:08 as milliseconds from midnight
491+
None,
492+
])) as ArrayRef;
493+
let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
494+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
// NOTE(review): the spec hash -662762989 belongs to the microsecond value 81_068_000_000.
// This input is the millisecond count 81_068_000, which the transform widens to i64 and
// hashes as-is, so the hash differs; (-662762989).rem_euclid(1000) would be 11, not 693.
// Confirm whether milliseconds should be converted to microseconds before hashing.
495+
Some(693),
496+
None,
497+
])) as ArrayRef;
498+
assert_eq!(&expected, &result);
499+
}
500+
501+
#[test]
502+
fn test_utf8_bucket_transform() {
// Checks `Transform::Bucket(1000)` on a Utf8 array: the string's UTF-8 bytes are hashed
// and reduced with `rem_euclid(1000)`, and a null input stays null.
503+
let array =
504+
Arc::new(arrow::array::StringArray::from(vec![Some("iceberg"), None])) as ArrayRef;
505+
let result = transform_arrow(array, &Transform::Bucket(1000)).unwrap();
506+
let expected = Arc::new(arrow::array::Int32Array::from(vec![
// 1_210_000_089 is the spec hash for "iceberg"; it is positive, so the bucket is 89.
507+
Some(1_210_000_089i32.rem_euclid(1000)),
508+
None,
509+
])) as ArrayRef;
510+
assert_eq!(&expected, &result);
511+
}
512+
325513
#[test]
326514
fn test_unsupported_transform() {
327515
let array = Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c"])) as ArrayRef;

0 commit comments

Comments
 (0)