Skip to content

Commit 447bf3e

Browse files
Adds Duration(TimeUnit) support to arrow-avro reader and writer (#8433)
# Which issue does this PR close?

- Part of #4886

# Rationale for this change

This change adds support for Arrow `DataType::Duration` to the arrow-avro crate. Previous versions of this code returned a `NotYetImplemented` error encouraging users to manually cast to `Interval(MonthDayNano)`. With this change, `Duration` is represented as an Avro `long` with a `logicalType` value corresponding to the `TimeUnit` of the Arrow duration (`arrow.duration-nanos`, `arrow.duration-micros`, `arrow.duration-millis`, `arrow.duration-seconds`). This retains Arrow's clock-time semantics for `Duration`, rather than adopting Avro's [calendar-time](https://avro.apache.org/docs/1.11.1/specification/#duration) approach. Because the two specs differ in this way, any attempt to map the value (e.g. seconds into days) would be lossy, as days and months can be of varying length.

# What changes are included in this PR?

Expands the arrow-avro crate to handle Arrow `Duration` types with the various `TimeUnit` variants (Second, Millisecond, Microsecond, Nanosecond). Includes:

- Additions in the encoder/decoder to support `Duration` types.
- Updates to schema handling for duration-specific metadata.
- Comprehensive unit tests for reading and writing `Duration` types.

# Are these changes tested?

Yes. All existing tests pass, and tests have been added to validate the encoding and decoding of `Duration`. A round-trip test has been added to `writer/mod.rs`.

# Are there any user-facing changes?

- Crate is not yet public
1 parent fedef66 commit 447bf3e

File tree

10 files changed

+548
-16
lines changed

10 files changed

+548
-16
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ canonical_extension_types = ["arrow-schema/canonical_extension_types"]
4343
md5 = ["dep:md5"]
4444
sha256 = ["dep:sha2"]
4545
small_decimals = []
46+
avro_custom_types = []
4647

4748
[dependencies]
4849
arrow-schema = { workspace = true }

arrow-avro/src/codec.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,11 @@ impl AvroDataType {
341341
| Codec::TimeMicros
342342
| Codec::TimestampMillis(_)
343343
| Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
344+
#[cfg(feature = "avro_custom_types")]
345+
Codec::DurationNanos
346+
| Codec::DurationMicros
347+
| Codec::DurationMillis
348+
| Codec::DurationSeconds => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
344349
Codec::Float32 => {
345350
let f = parse_json_f64(default_json, "float")?;
346351
if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
@@ -685,6 +690,18 @@ pub enum Codec {
685690
Interval,
686691
/// Represents Avro union type, maps to Arrow's Union data type
687692
Union(Arc<[AvroDataType]>, UnionFields, UnionMode),
693+
/// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Nanosecond)
694+
#[cfg(feature = "avro_custom_types")]
695+
DurationNanos,
696+
/// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Microsecond)
697+
#[cfg(feature = "avro_custom_types")]
698+
DurationMicros,
699+
/// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Millisecond)
700+
#[cfg(feature = "avro_custom_types")]
701+
DurationMillis,
702+
/// Represents Avro custom logical type to map to Arrow Duration(TimeUnit::Second)
703+
#[cfg(feature = "avro_custom_types")]
704+
DurationSeconds,
688705
}
689706

690707
impl Codec {
@@ -759,6 +776,14 @@ impl Codec {
759776
)
760777
}
761778
Self::Union(_, fields, mode) => DataType::Union(fields.clone(), *mode),
779+
#[cfg(feature = "avro_custom_types")]
780+
Self::DurationNanos => DataType::Duration(TimeUnit::Nanosecond),
781+
#[cfg(feature = "avro_custom_types")]
782+
Self::DurationMicros => DataType::Duration(TimeUnit::Microsecond),
783+
#[cfg(feature = "avro_custom_types")]
784+
Self::DurationMillis => DataType::Duration(TimeUnit::Millisecond),
785+
#[cfg(feature = "avro_custom_types")]
786+
Self::DurationSeconds => DataType::Duration(TimeUnit::Second),
762787
}
763788
}
764789

@@ -944,6 +969,11 @@ impl From<&Codec> for UnionFieldKind {
944969
Codec::Map(_) => Self::Map,
945970
Codec::Uuid => Self::Uuid,
946971
Codec::Union(..) => Self::Union,
972+
#[cfg(feature = "avro_custom_types")]
973+
Codec::DurationNanos
974+
| Codec::DurationMicros
975+
| Codec::DurationMillis
976+
| Codec::DurationSeconds => Self::Duration,
947977
}
948978
}
949979
}
@@ -1317,6 +1347,16 @@ impl<'a> Maker<'a> {
13171347
*c = Codec::TimestampMicros(false)
13181348
}
13191349
(Some("uuid"), c @ Codec::Utf8) => *c = Codec::Uuid,
1350+
#[cfg(feature = "avro_custom_types")]
1351+
(Some("arrow.duration-nanos"), c @ Codec::Int64) => *c = Codec::DurationNanos,
1352+
#[cfg(feature = "avro_custom_types")]
1353+
(Some("arrow.duration-micros"), c @ Codec::Int64) => *c = Codec::DurationMicros,
1354+
#[cfg(feature = "avro_custom_types")]
1355+
(Some("arrow.duration-millis"), c @ Codec::Int64) => *c = Codec::DurationMillis,
1356+
#[cfg(feature = "avro_custom_types")]
1357+
(Some("arrow.duration-seconds"), c @ Codec::Int64) => {
1358+
*c = Codec::DurationSeconds
1359+
}
13201360
(Some(logical), _) => {
13211361
// Insert unrecognized logical type into metadata map
13221362
field.metadata.insert("logicalType".into(), logical.into());

arrow-avro/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,13 @@
208208
//! - [`compression`]: codecs used for OCF blocks (i.e., Deflate, Snappy, Zstandard).
209209
//! - [`codec`]: internal Avro↔Arrow type conversion and row decode/encode plans.
210210
//!
211+
//! ### Features
212+
//!
213+
//! - `md5`: enables dependency `md5` for md5 fingerprint hashing
214+
//! - `sha256`: enables dependency `sha2` for sha256 fingerprint hashing
215+
//! - `small_decimals`: enables support for small decimal types
216+
//! - `avro_custom_types`: enables custom logic that interprets an Avro `long` annotated with a `logicalType` of `arrow.duration-nanos`, `arrow.duration-micros`, `arrow.duration-millis`, or `arrow.duration-seconds` as the more descriptive Arrow `Duration(TimeUnit)` type.
217+
//!
211218
//! [Apache Arrow]: https://arrow.apache.org/
212219
//! [Apache Avro]: https://avro.apache.org/
213220

arrow-avro/src/reader/mod.rs

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1303,13 +1303,18 @@ mod test {
13031303
ListBuilder, MapBuilder, StringBuilder, StructBuilder,
13041304
};
13051305
use arrow_array::cast::AsArray;
1306+
use arrow_array::types::{
1307+
DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType,
1308+
DurationSecondType, Int64Type,
1309+
};
13061310
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
13071311
use arrow_array::*;
13081312
use arrow_buffer::{
13091313
i256, Buffer, IntervalMonthDayNano, NullBuffer, OffsetBuffer, ScalarBuffer,
13101314
};
13111315
use arrow_schema::{
1312-
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, UnionFields, UnionMode,
1316+
ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, Schema, TimeUnit, UnionFields,
1317+
UnionMode,
13131318
};
13141319
use bytes::{Buf, BufMut, Bytes};
13151320
use futures::executor::block_on;
@@ -4747,6 +4752,108 @@ mod test {
47474752
}
47484753
}
47494754

4755+
#[test]
4756+
fn test_read_duration_logical_types_feature_toggle() -> Result<(), ArrowError> {
4757+
let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
4758+
.join("test/data/duration_logical_types.avro")
4759+
.to_string_lossy()
4760+
.into_owned();
4761+
4762+
let actual_batch = read_file(&file_path, 4, false);
4763+
4764+
let expected_batch = {
4765+
#[cfg(feature = "avro_custom_types")]
4766+
{
4767+
println!("Testing with 'avro_custom_types' feature ENABLED");
4768+
let schema = Arc::new(Schema::new(vec![
4769+
Field::new(
4770+
"duration_time_nanos",
4771+
DataType::Duration(TimeUnit::Nanosecond),
4772+
false,
4773+
),
4774+
Field::new(
4775+
"duration_time_micros",
4776+
DataType::Duration(TimeUnit::Microsecond),
4777+
false,
4778+
),
4779+
Field::new(
4780+
"duration_time_millis",
4781+
DataType::Duration(TimeUnit::Millisecond),
4782+
false,
4783+
),
4784+
Field::new(
4785+
"duration_time_seconds",
4786+
DataType::Duration(TimeUnit::Second),
4787+
false,
4788+
),
4789+
]));
4790+
4791+
let nanos = Arc::new(PrimitiveArray::<DurationNanosecondType>::from(vec![
4792+
10, 20, 30, 40,
4793+
])) as ArrayRef;
4794+
let micros = Arc::new(PrimitiveArray::<DurationMicrosecondType>::from(vec![
4795+
100, 200, 300, 400,
4796+
])) as ArrayRef;
4797+
let millis = Arc::new(PrimitiveArray::<DurationMillisecondType>::from(vec![
4798+
1000, 2000, 3000, 4000,
4799+
])) as ArrayRef;
4800+
let seconds = Arc::new(PrimitiveArray::<DurationSecondType>::from(vec![1, 2, 3, 4]))
4801+
as ArrayRef;
4802+
4803+
RecordBatch::try_new(schema, vec![nanos, micros, millis, seconds])?
4804+
}
4805+
#[cfg(not(feature = "avro_custom_types"))]
4806+
{
4807+
let schema = Arc::new(Schema::new(vec![
4808+
Field::new("duration_time_nanos", DataType::Int64, false).with_metadata(
4809+
[(
4810+
"logicalType".to_string(),
4811+
"arrow.duration-nanos".to_string(),
4812+
)]
4813+
.into(),
4814+
),
4815+
Field::new("duration_time_micros", DataType::Int64, false).with_metadata(
4816+
[(
4817+
"logicalType".to_string(),
4818+
"arrow.duration-micros".to_string(),
4819+
)]
4820+
.into(),
4821+
),
4822+
Field::new("duration_time_millis", DataType::Int64, false).with_metadata(
4823+
[(
4824+
"logicalType".to_string(),
4825+
"arrow.duration-millis".to_string(),
4826+
)]
4827+
.into(),
4828+
),
4829+
Field::new("duration_time_seconds", DataType::Int64, false).with_metadata(
4830+
[(
4831+
"logicalType".to_string(),
4832+
"arrow.duration-seconds".to_string(),
4833+
)]
4834+
.into(),
4835+
),
4836+
]));
4837+
4838+
let nanos =
4839+
Arc::new(PrimitiveArray::<Int64Type>::from(vec![10, 20, 30, 40])) as ArrayRef;
4840+
let micros = Arc::new(PrimitiveArray::<Int64Type>::from(vec![100, 200, 300, 400]))
4841+
as ArrayRef;
4842+
let millis = Arc::new(PrimitiveArray::<Int64Type>::from(vec![
4843+
1000, 2000, 3000, 4000,
4844+
])) as ArrayRef;
4845+
let seconds =
4846+
Arc::new(PrimitiveArray::<Int64Type>::from(vec![1, 2, 3, 4])) as ArrayRef;
4847+
4848+
RecordBatch::try_new(schema, vec![nanos, micros, millis, seconds])?
4849+
}
4850+
};
4851+
4852+
assert_eq!(actual_batch, expected_batch);
4853+
4854+
Ok(())
4855+
}
4856+
47504857
#[test]
47514858
fn test_dict_pages_offset_zero() {
47524859
let file = arrow_test_data("avro/dict-page-offset-zero.avro");

0 commit comments

Comments
 (0)