Skip to content

Commit dcaf261

Browse files
authored
Feat: support extension types (#83)
* feat: support extension types * add compare * fix * fix * fix * fix
1 parent b63b04c commit dcaf261

File tree

21 files changed

+1810
-46
lines changed

21 files changed

+1810
-46
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ rust-version = "1.80"
2727

2828
[dependencies]
2929
byteorder = "1.5.0"
30+
ethnum = "1.5.1"
3031
fast-float2 = "0.2.3"
3132
itoa = "1.0"
33+
jiff = "0.2.10"
3234
nom = "8.0.0"
3335
num-traits = "0.2.19"
3436
ordered-float = { version = "5.0", default-features = false }

src/constants.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,24 @@ pub(crate) const RR: char = '\x0D'; // \r Carriage Return
2626
pub(crate) const TT: char = '\x09'; // \t Horizontal Tab
2727

2828
// JSONB value compare level
29-
pub(crate) const NULL_LEVEL: u8 = 7;
30-
pub(crate) const ARRAY_LEVEL: u8 = 6;
31-
pub(crate) const OBJECT_LEVEL: u8 = 5;
32-
pub(crate) const STRING_LEVEL: u8 = 4;
33-
pub(crate) const NUMBER_LEVEL: u8 = 3;
34-
pub(crate) const TRUE_LEVEL: u8 = 2;
35-
pub(crate) const FALSE_LEVEL: u8 = 1;
29+
pub(crate) const NULL_LEVEL: u8 = 8;
30+
pub(crate) const ARRAY_LEVEL: u8 = 7;
31+
pub(crate) const OBJECT_LEVEL: u8 = 6;
32+
pub(crate) const STRING_LEVEL: u8 = 5;
33+
pub(crate) const NUMBER_LEVEL: u8 = 4;
34+
pub(crate) const TRUE_LEVEL: u8 = 3;
35+
pub(crate) const FALSE_LEVEL: u8 = 2;
36+
pub(crate) const EXTENSION_LEVEL: u8 = 1;
3637

3738
pub(crate) const TYPE_STRING: &str = "string";
3839
pub(crate) const TYPE_NULL: &str = "null";
3940
pub(crate) const TYPE_BOOLEAN: &str = "boolean";
4041
pub(crate) const TYPE_NUMBER: &str = "number";
4142
pub(crate) const TYPE_ARRAY: &str = "array";
4243
pub(crate) const TYPE_OBJECT: &str = "object";
44+
pub(crate) const TYPE_DECIMAL: &str = "decimal";
45+
pub(crate) const TYPE_BINARY: &str = "binary";
46+
pub(crate) const TYPE_DATE: &str = "date";
47+
pub(crate) const TYPE_TIMESTAMP: &str = "timestamp";
48+
pub(crate) const TYPE_TIMESTAMP_TZ: &str = "timestamp_tz";
49+
pub(crate) const TYPE_INTERVAL: &str = "interval";

src/core/databend/builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,11 @@ fn append_jsonb_item(buf: &mut Vec<u8>, jentry_index: &mut usize, item: JsonbIte
199199
replace_jentry(buf, jentry, jentry_index);
200200
buf.extend_from_slice(data);
201201
}
202+
JsonbItem::Extension(data) => {
203+
let jentry = JEntry::make_extension_jentry(data.len());
204+
replace_jentry(buf, jentry, jentry_index);
205+
buf.extend_from_slice(data);
206+
}
202207
JsonbItem::Raw(raw_jsonb) => {
203208
append_raw_jsonb_data(buf, jentry_index, raw_jsonb)?;
204209
}

src/core/databend/constants.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub(super) const NUMBER_TAG: u32 = 0x20000000;
2727
pub(super) const FALSE_TAG: u32 = 0x30000000;
2828
pub(super) const TRUE_TAG: u32 = 0x40000000;
2929
pub(super) const CONTAINER_TAG: u32 = 0x50000000;
30+
pub(super) const EXTENSION_TAG: u32 = 0x60000000;
3031

3132
// JSONB number constants
3233
pub(super) const NUMBER_ZERO: u8 = 0x00;
@@ -36,6 +37,14 @@ pub(super) const NUMBER_NEG_INF: u8 = 0x30;
3637
pub(super) const NUMBER_INT: u8 = 0x40;
3738
pub(super) const NUMBER_UINT: u8 = 0x50;
3839
pub(super) const NUMBER_FLOAT: u8 = 0x60;
40+
pub(super) const NUMBER_DECIMAL: u8 = 0x70;
41+
42+
// JSONB extension constants
43+
pub(super) const EXTENSION_BINARY: u8 = 0x00;
44+
pub(super) const EXTENSION_DATE: u8 = 0x10;
45+
pub(super) const EXTENSION_TIMESTAMP: u8 = 0x20;
46+
pub(super) const EXTENSION_TIMESTAMP_TZ: u8 = 0x30;
47+
pub(super) const EXTENSION_INTERVAL: u8 = 0x40;
3948

4049
// @todo support offset mode
4150
#[allow(dead_code)]

src/core/databend/de.rs

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use super::constants::*;
2828
use super::jentry::JEntry;
2929
use crate::error::Error;
3030
use crate::error::Result;
31+
use crate::extension::ExtensionValue;
3132
use crate::number::Number;
3233
use crate::value::Object;
3334
use crate::value::Value;
@@ -146,6 +147,14 @@ impl<'de> Deserializer<'de> {
146147
Ok(Cow::Borrowed(s))
147148
}
148149

150+
fn read_payload_extension(&mut self, length: usize) -> Result<ExtensionValue> {
151+
let start = self.index;
152+
let end = self.index + length;
153+
let val = ExtensionValue::decode(&self.raw.data[start..end])?;
154+
self.index = end;
155+
Ok(val)
156+
}
157+
149158
fn read_null(&mut self) -> Result<()> {
150159
let jentry_res = self.read_scalar_jentry();
151160
if jentry_res == Err(Error::UnexpectedType) {
@@ -154,7 +163,7 @@ impl<'de> Deserializer<'de> {
154163
let jentry = jentry_res?;
155164
match jentry.type_code {
156165
NULL_TAG => Ok(()),
157-
FALSE_TAG | TRUE_TAG | NUMBER_TAG | STRING_TAG | CONTAINER_TAG => {
166+
FALSE_TAG | TRUE_TAG | NUMBER_TAG | STRING_TAG | CONTAINER_TAG | EXTENSION_TAG => {
158167
Err(Error::UnexpectedType)
159168
}
160169
_ => Err(Error::InvalidJsonb),
@@ -170,7 +179,9 @@ impl<'de> Deserializer<'de> {
170179
match jentry.type_code {
171180
FALSE_TAG => Ok(false),
172181
TRUE_TAG => Ok(true),
173-
NULL_TAG | NUMBER_TAG | STRING_TAG | CONTAINER_TAG => Err(Error::UnexpectedType),
182+
NULL_TAG | NUMBER_TAG | STRING_TAG | CONTAINER_TAG | EXTENSION_TAG => {
183+
Err(Error::UnexpectedType)
184+
}
174185
_ => Err(Error::InvalidJsonb),
175186
}
176187
}
@@ -187,7 +198,7 @@ impl<'de> Deserializer<'de> {
187198
let num = self.read_payload_number(length)?;
188199
Ok(num)
189200
}
190-
NULL_TAG | FALSE_TAG | TRUE_TAG | STRING_TAG | CONTAINER_TAG => {
201+
NULL_TAG | FALSE_TAG | TRUE_TAG | STRING_TAG | CONTAINER_TAG | EXTENSION_TAG => {
191202
Err(Error::UnexpectedType)
192203
}
193204
_ => Err(Error::InvalidJsonb),
@@ -202,7 +213,9 @@ impl<'de> Deserializer<'de> {
202213
match num {
203214
Number::Int64(n) => T::from_i64(n).ok_or(Error::UnexpectedType),
204215
Number::UInt64(n) => T::from_u64(n).ok_or(Error::UnexpectedType),
205-
Number::Float64(_) => Err(Error::UnexpectedType),
216+
Number::Float64(_) | Number::Decimal128(_) | Number::Decimal256(_) => {
217+
Err(Error::UnexpectedType)
218+
}
206219
}
207220
}
208221

@@ -215,6 +228,14 @@ impl<'de> Deserializer<'de> {
215228
Number::Int64(n) => T::from_i64(n).ok_or(Error::UnexpectedType),
216229
Number::UInt64(n) => T::from_u64(n).ok_or(Error::UnexpectedType),
217230
Number::Float64(n) => T::from_f64(n).ok_or(Error::UnexpectedType),
231+
Number::Decimal128(v) => {
232+
let n = v.to_float64();
233+
T::from_f64(n).ok_or(Error::UnexpectedType)
234+
}
235+
Number::Decimal256(v) => {
236+
let n = v.to_float64();
237+
T::from_f64(n).ok_or(Error::UnexpectedType)
238+
}
218239
}
219240
}
220241

@@ -230,6 +251,12 @@ impl<'de> Deserializer<'de> {
230251
let s = self.read_payload_str(length)?;
231252
Ok(s)
232253
}
254+
EXTENSION_TAG => {
255+
let length = jentry.length as usize;
256+
let val = self.read_payload_extension(length)?;
257+
let s = format!("{}", val);
258+
Ok(Cow::Owned(s))
259+
}
233260
NULL_TAG | FALSE_TAG | TRUE_TAG | NUMBER_TAG | CONTAINER_TAG => {
234261
Err(Error::UnexpectedType)
235262
}
@@ -290,8 +317,22 @@ impl<'de> Deserializer<'de> {
290317
}
291318
}
292319
Number::Float64(i) => visitor.visit_f64(i),
320+
Number::Decimal128(i) => {
321+
let v = i.to_float64();
322+
visitor.visit_f64(v)
323+
}
324+
Number::Decimal256(i) => {
325+
let v = i.to_float64();
326+
visitor.visit_f64(v)
327+
}
293328
}
294329
}
330+
EXTENSION_TAG => {
331+
let length = jentry.length as usize;
332+
let val = self.read_payload_extension(length)?;
333+
let s = format!("{}", val);
334+
visitor.visit_string(s)
335+
}
295336
CONTAINER_TAG => Err(Error::UnexpectedType),
296337
_ => Err(Error::InvalidJsonb),
297338
}
@@ -462,14 +503,14 @@ impl<'de> de::Deserializer<'de> for &mut Deserializer<'de> {
462503
where
463504
V: Visitor<'de>,
464505
{
465-
self.deserialize_seq(visitor)
506+
visitor.visit_string(self.read_string()?)
466507
}
467508

468509
fn deserialize_byte_buf<V>(self, visitor: V) -> Result<V::Value>
469510
where
470511
V: Visitor<'de>,
471512
{
472-
self.deserialize_seq(visitor)
513+
visitor.visit_string(self.read_string()?)
473514
}
474515

475516
fn deserialize_option<V>(self, visitor: V) -> Result<V::Value>
@@ -922,6 +963,19 @@ impl<'a> Decoder<'a> {
922963
self.buf = &self.buf[offset..];
923964
Ok(Value::Number(n))
924965
}
966+
EXTENSION_TAG => {
967+
let offset = jentry.length as usize;
968+
let v = &self.buf.get(..offset).ok_or(Error::InvalidJsonbExtension)?;
969+
let val = ExtensionValue::decode(v)?;
970+
self.buf = &self.buf[offset..];
971+
match val {
972+
ExtensionValue::Binary(v) => Ok(Value::Binary(v)),
973+
ExtensionValue::Date(v) => Ok(Value::Date(v)),
974+
ExtensionValue::Timestamp(v) => Ok(Value::Timestamp(v)),
975+
ExtensionValue::TimestampTz(v) => Ok(Value::TimestampTz(v)),
976+
ExtensionValue::Interval(v) => Ok(Value::Interval(v)),
977+
}
978+
}
925979
CONTAINER_TAG => self.decode_jsonb(),
926980
_ => Err(Error::InvalidJsonbJEntry),
927981
}

src/core/databend/jentry.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ impl JEntry {
6969
}
7070
}
7171

72+
pub(super) fn make_extension_jentry(length: usize) -> JEntry {
73+
JEntry {
74+
type_code: EXTENSION_TAG,
75+
length: length as u32,
76+
}
77+
}
78+
7279
pub(super) fn encoded(&self) -> u32 {
7380
self.type_code | self.length
7481
}

src/core/databend/ser.rs

Lines changed: 82 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use super::jentry::JEntry;
2626
use crate::core::ArrayBuilder;
2727
use crate::core::ObjectBuilder;
2828
use crate::error::*;
29+
use crate::extension::ExtensionValue;
2930
use crate::number::Number;
3031
use crate::value::Object;
3132
use crate::value::Value;
@@ -477,32 +478,42 @@ impl Serialize for RawJsonb<'_> {
477478
.map_err(|e| ser::Error::custom(format!("{e}")))?;
478479
index += 4;
479480

481+
let payload_start = index;
482+
let payload_end = index + jentry.length as usize;
480483
match jentry.type_code {
481484
NULL_TAG => serializer.serialize_unit(),
482485
TRUE_TAG => serializer.serialize_bool(true),
483486
FALSE_TAG => serializer.serialize_bool(false),
484487
NUMBER_TAG => {
485-
let payload_start = index;
486-
let payload_end = index + jentry.length as usize;
487-
488488
let num = Number::decode(&self.data[payload_start..payload_end])
489489
.map_err(|e| ser::Error::custom(format!("{e}")))?;
490490

491491
match num {
492492
Number::Int64(i) => serializer.serialize_i64(i),
493493
Number::UInt64(i) => serializer.serialize_u64(i),
494494
Number::Float64(i) => serializer.serialize_f64(i),
495+
Number::Decimal128(i) => {
496+
let v = i.to_float64();
497+
serializer.serialize_f64(v)
498+
}
499+
Number::Decimal256(i) => {
500+
let v = i.to_float64();
501+
serializer.serialize_f64(v)
502+
}
495503
}
496504
}
497505
STRING_TAG => {
498-
let payload_start = index;
499-
let payload_end = index + jentry.length as usize;
500-
501506
let s = unsafe {
502507
std::str::from_utf8_unchecked(&self.data[payload_start..payload_end])
503508
};
504509
serializer.serialize_str(s)
505510
}
511+
EXTENSION_TAG => {
512+
let val = ExtensionValue::decode(&self.data[payload_start..payload_end])
513+
.map_err(|e| ser::Error::custom(format!("{e}")))?;
514+
let s = format!("{}", val);
515+
serializer.serialize_str(&s)
516+
}
506517
CONTAINER_TAG => {
507518
// Scalar header can't have contianer jentry tag
508519
Err(ser::Error::custom("Invalid jsonb".to_string()))
@@ -532,6 +543,14 @@ impl Serialize for RawJsonb<'_> {
532543
Number::Int64(i) => serialize_seq.serialize_element(&i)?,
533544
Number::UInt64(i) => serialize_seq.serialize_element(&i)?,
534545
Number::Float64(i) => serialize_seq.serialize_element(&i)?,
546+
Number::Decimal128(i) => {
547+
let v = i.to_float64();
548+
serialize_seq.serialize_element(&v)?
549+
}
550+
Number::Decimal256(i) => {
551+
let v = i.to_float64();
552+
serialize_seq.serialize_element(&v)?
553+
}
535554
}
536555
}
537556
STRING_TAG => {
@@ -542,6 +561,13 @@ impl Serialize for RawJsonb<'_> {
542561
};
543562
serialize_seq.serialize_element(&s)?;
544563
}
564+
EXTENSION_TAG => {
565+
let val =
566+
ExtensionValue::decode(&self.data[payload_start..payload_end])
567+
.map_err(|e| ser::Error::custom(format!("{e}")))?;
568+
let s = format!("{}", val);
569+
serialize_seq.serialize_element(&s)?;
570+
}
545571
CONTAINER_TAG => {
546572
let inner_raw_jsonb =
547573
RawJsonb::new(&self.data[payload_start..payload_end]);
@@ -602,6 +628,14 @@ impl Serialize for RawJsonb<'_> {
602628
Number::Int64(i) => serialize_map.serialize_entry(&k, &i)?,
603629
Number::UInt64(i) => serialize_map.serialize_entry(&k, &i)?,
604630
Number::Float64(i) => serialize_map.serialize_entry(&k, &i)?,
631+
Number::Decimal128(i) => {
632+
let v = i.to_float64();
633+
serialize_map.serialize_entry(&k, &v)?
634+
}
635+
Number::Decimal256(i) => {
636+
let v = i.to_float64();
637+
serialize_map.serialize_entry(&k, &v)?
638+
}
605639
}
606640
}
607641
STRING_TAG => {
@@ -612,6 +646,13 @@ impl Serialize for RawJsonb<'_> {
612646
};
613647
serialize_map.serialize_entry(&k, &s)?;
614648
}
649+
EXTENSION_TAG => {
650+
let val =
651+
ExtensionValue::decode(&self.data[payload_start..payload_end])
652+
.map_err(|e| ser::Error::custom(format!("{e}")))?;
653+
let s = format!("{}", val);
654+
serialize_map.serialize_entry(&k, &s)?;
655+
}
615656
CONTAINER_TAG => {
616657
let inner_raw_jsonb =
617658
RawJsonb::new(&self.data[payload_start..payload_end]);
@@ -755,6 +796,41 @@ impl<'a> Encoder<'a> {
755796
self.buf.extend_from_slice(s.as_ref().as_bytes());
756797
JEntry::make_string_jentry(len)
757798
}
799+
Value::Binary(v) => {
800+
let old_off = self.buf.len();
801+
let val = ExtensionValue::Binary(v);
802+
let _ = val.compact_encode(&mut self.buf).unwrap();
803+
let len = self.buf.len() - old_off;
804+
JEntry::make_extension_jentry(len)
805+
}
806+
Value::Date(v) => {
807+
let old_off = self.buf.len();
808+
let val = ExtensionValue::Date(v.clone());
809+
let _ = val.compact_encode(&mut self.buf).unwrap();
810+
let len = self.buf.len() - old_off;
811+
JEntry::make_extension_jentry(len)
812+
}
813+
Value::Timestamp(v) => {
814+
let old_off = self.buf.len();
815+
let val = ExtensionValue::Timestamp(v.clone());
816+
let _ = val.compact_encode(&mut self.buf).unwrap();
817+
let len = self.buf.len() - old_off;
818+
JEntry::make_extension_jentry(len)
819+
}
820+
Value::TimestampTz(v) => {
821+
let old_off = self.buf.len();
822+
let val = ExtensionValue::TimestampTz(v.clone());
823+
let _ = val.compact_encode(&mut self.buf).unwrap();
824+
let len = self.buf.len() - old_off;
825+
JEntry::make_extension_jentry(len)
826+
}
827+
Value::Interval(v) => {
828+
let old_off = self.buf.len();
829+
let val = ExtensionValue::Interval(v.clone());
830+
let _ = val.compact_encode(&mut self.buf).unwrap();
831+
let len = self.buf.len() - old_off;
832+
JEntry::make_extension_jentry(len)
833+
}
758834
Value::Array(array) => {
759835
let len = self.encode_array(array);
760836
JEntry::make_container_jentry(len)

0 commit comments

Comments
 (0)