Skip to content

Commit be1884e

Browse files
committed
Fix TIME processing
Instead of assuming we want `Time64Microsecond` when converting from NaiveType, convert based on the interenced type. Update the `get_time*_value` functions to return `NaiveTime` instead of `NaiveDateTime`. Previously they were returning None, because that's what `as_datetime` always returns for Time32 & Time64 values.
1 parent 3590522 commit be1884e

File tree

2 files changed

+42
-13
lines changed

2 files changed

+42
-13
lines changed

arrow-pg/src/datatypes/df.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::iter;
22
use std::sync::Arc;
33

44
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
5-
use datafusion::arrow::datatypes::{DataType, Date32Type};
5+
use datafusion::arrow::datatypes::{DataType, Date32Type, TimeUnit};
66
use datafusion::arrow::record_batch::RecordBatch;
77
use datafusion::common::ParamValues;
88
use datafusion::prelude::*;
@@ -79,9 +79,10 @@ where
7979
let param_len = portal.parameter_len();
8080
let mut deserialized_params = Vec::with_capacity(param_len);
8181
for i in 0..param_len {
82+
let inferenced_type = inferenced_types.get(i).and_then(|v| v.to_owned());
8283
let pg_type = get_pg_type(
8384
portal.statement.parameter_types.get(i),
84-
inferenced_types.get(i).and_then(|v| v.to_owned()),
85+
inferenced_type.clone(),
8586
)?;
8687
match pg_type {
8788
// enumerate all supported parameter types and deserialize the
@@ -158,9 +159,36 @@ where
158159
}
159160
Type::TIME => {
160161
let value = portal.parameter::<NaiveTime>(i, &pg_type)?;
161-
deserialized_params.push(ScalarValue::Time64Microsecond(value.map(|t| {
162-
t.num_seconds_from_midnight() as i64 * 1_000_000 + t.nanosecond() as i64 / 1_000
163-
})));
162+
163+
let ns = value.map(|t| {
164+
t.num_seconds_from_midnight() as i64 * 1_000_000_000 + t.nanosecond() as i64
165+
});
166+
167+
let scalar_value = match inferenced_type {
168+
Some(DataType::Time64(TimeUnit::Nanosecond)) => {
169+
ScalarValue::Time64Nanosecond(ns)
170+
}
171+
Some(DataType::Time64(TimeUnit::Microsecond)) => {
172+
ScalarValue::Time64Microsecond(ns.map(|ns| (ns / 1_000) as _))
173+
}
174+
Some(DataType::Time32(TimeUnit::Millisecond)) => {
175+
ScalarValue::Time32Second(ns.map(|ns| (ns / 1_000_000_000) as _))
176+
}
177+
Some(DataType::Time32(TimeUnit::Second)) => {
178+
ScalarValue::Time32Second(ns.map(|ns| (ns / 1_000_000_000) as _))
179+
}
180+
_ => {
181+
return Err(PgWireError::ApiError(
182+
format!(
183+
"Unable to deserialise time parameter type {:?} to type {:?}",
184+
value, inferenced_type
185+
)
186+
.into(),
187+
))
188+
}
189+
};
190+
191+
deserialized_params.push(scalar_value);
164192
}
165193
Type::UUID => {
166194
let value = portal.parameter::<String>(i, &pg_type)?;

arrow-pg/src/encoder.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use arrow::{array::*, datatypes::*};
88
use bytes::BufMut;
99
use bytes::BytesMut;
10+
use chrono::NaiveTime;
1011
use chrono::{NaiveDate, NaiveDateTime};
1112
#[cfg(feature = "datafusion")]
1213
use datafusion::arrow::{array::*, datatypes::*};
@@ -203,43 +204,43 @@ fn get_date64_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDate> {
203204
.value_as_date(idx)
204205
}
205206

206-
fn get_time32_second_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDateTime> {
207+
fn get_time32_second_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
207208
if arr.is_null(idx) {
208209
return None;
209210
}
210211
arr.as_any()
211212
.downcast_ref::<Time32SecondArray>()
212213
.unwrap()
213-
.value_as_datetime(idx)
214+
.value_as_time(idx)
214215
}
215216

216-
fn get_time32_millisecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDateTime> {
217+
fn get_time32_millisecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
217218
if arr.is_null(idx) {
218219
return None;
219220
}
220221
arr.as_any()
221222
.downcast_ref::<Time32MillisecondArray>()
222223
.unwrap()
223-
.value_as_datetime(idx)
224+
.value_as_time(idx)
224225
}
225226

226-
fn get_time64_microsecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDateTime> {
227+
fn get_time64_microsecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
227228
if arr.is_null(idx) {
228229
return None;
229230
}
230231
arr.as_any()
231232
.downcast_ref::<Time64MicrosecondArray>()
232233
.unwrap()
233-
.value_as_datetime(idx)
234+
.value_as_time(idx)
234235
}
235-
fn get_time64_nanosecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveDateTime> {
236+
fn get_time64_nanosecond_value(arr: &Arc<dyn Array>, idx: usize) -> Option<NaiveTime> {
236237
if arr.is_null(idx) {
237238
return None;
238239
}
239240
arr.as_any()
240241
.downcast_ref::<Time64NanosecondArray>()
241242
.unwrap()
242-
.value_as_datetime(idx)
243+
.value_as_time(idx)
243244
}
244245

245246
fn get_numeric_128_value(

0 commit comments

Comments
 (0)