Skip to content

Commit c0023f0

Browse files
authored
[core] Introduce VariantType (apache#4757)
1 parent 9977760 commit c0023f0

File tree

67 files changed

+732
-9
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+732
-9
lines changed

paimon-arrow/src/main/java/org/apache/paimon/arrow/ArrowFieldTypeConversion.java

+6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.paimon.types.TinyIntType;
4141
import org.apache.paimon.types.VarBinaryType;
4242
import org.apache.paimon.types.VarCharType;
43+
import org.apache.paimon.types.VariantType;
4344

4445
import org.apache.arrow.vector.types.TimeUnit;
4546
import org.apache.arrow.vector.types.Types;
@@ -150,6 +151,11 @@ public FieldType visit(LocalZonedTimestampType localZonedTimestampType) {
150151
return new FieldType(localZonedTimestampType.isNullable(), arrowType, null);
151152
}
152153

154+
@Override
155+
public FieldType visit(VariantType variantType) {
156+
throw new UnsupportedOperationException();
157+
}
158+
153159
private TimeUnit getTimeUnit(int precision) {
154160
if (precision == 0) {
155161
return TimeUnit.SECOND;

paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/Arrow2PaimonVectorConverter.java

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.apache.paimon.types.TinyIntType;
6464
import org.apache.paimon.types.VarBinaryType;
6565
import org.apache.paimon.types.VarCharType;
66+
import org.apache.paimon.types.VariantType;
6667

6768
import org.apache.arrow.vector.BigIntVector;
6869
import org.apache.arrow.vector.BitVector;
@@ -423,6 +424,11 @@ public Timestamp getTimestamp(int i, int precision) {
423424
};
424425
}
425426

427+
@Override
428+
public Arrow2PaimonVectorConverter visit(VariantType variantType) {
429+
throw new UnsupportedOperationException();
430+
}
431+
426432
@Override
427433
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
428434
final Arrow2PaimonVectorConverter arrowVectorConvertor =

paimon-arrow/src/main/java/org/apache/paimon/arrow/writer/ArrowFieldWriterFactoryVisitor.java

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.paimon.types.TinyIntType;
4040
import org.apache.paimon.types.VarBinaryType;
4141
import org.apache.paimon.types.VarCharType;
42+
import org.apache.paimon.types.VariantType;
4243

4344
import org.apache.arrow.vector.FieldVector;
4445
import org.apache.arrow.vector.complex.ListVector;
@@ -138,6 +139,11 @@ public ArrowFieldWriterFactory visit(LocalZonedTimestampType localZonedTimestamp
138139
fieldVector, localZonedTimestampType.getPrecision(), null);
139140
}
140141

142+
@Override
143+
public ArrowFieldWriterFactory visit(VariantType variantType) {
144+
throw new UnsupportedOperationException("Doesn't support VariantType.");
145+
}
146+
141147
@Override
142148
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
143149
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);

paimon-codegen/src/main/scala/org/apache/paimon/codegen/GenerateUtils.scala

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.codegen
2020

2121
import org.apache.paimon.data._
22+
import org.apache.paimon.data.variant.Variant
2223
import org.apache.paimon.memory.MemorySegment
2324
import org.apache.paimon.types._
2425
import org.apache.paimon.types.DataTypeChecks.{getFieldCount, getFieldTypes, getPrecision, getScale}
@@ -380,6 +381,7 @@ object GenerateUtils {
380381
case ARRAY => className[InternalArray]
381382
case MULTISET | MAP => className[InternalMap]
382383
case ROW => className[InternalRow]
384+
case VARIANT => className[Variant]
383385
case _ =>
384386
throw new IllegalArgumentException("Illegal type: " + t)
385387
}
@@ -418,6 +420,8 @@ object GenerateUtils {
418420
s"$rowTerm.getMap($indexTerm)"
419421
case ROW =>
420422
s"$rowTerm.getRow($indexTerm, ${getFieldCount(t)})"
423+
case VARIANT =>
424+
s"$rowTerm.getVariant($indexTerm)"
421425
case _ =>
422426
throw new IllegalArgumentException("Illegal type: " + t)
423427
}

paimon-codegen/src/test/java/org/apache/paimon/codegen/EqualiserCodeGeneratorTest.java

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.paimon.data.serializer.InternalMapSerializer;
3131
import org.apache.paimon.data.serializer.InternalRowSerializer;
3232
import org.apache.paimon.data.serializer.Serializer;
33+
import org.apache.paimon.data.variant.GenericVariant;
3334
import org.apache.paimon.types.DataType;
3435
import org.apache.paimon.types.DataTypeRoot;
3536
import org.apache.paimon.types.DataTypes;
@@ -179,6 +180,13 @@ public class EqualiserCodeGeneratorTest {
179180
GenericRow.of(31, BinaryString.fromString("32")),
180181
GenericRow.of(31, BinaryString.fromString("33"))),
181182
new InternalRowSerializer(DataTypes.INT(), DataTypes.VARCHAR(2))));
183+
TEST_DATA.put(
184+
DataTypeRoot.VARIANT,
185+
new GeneratedData(
186+
DataTypes.VARIANT(),
187+
Pair.of(
188+
GenericVariant.fromJson("{\"age\":27,\"city\":\"Beijing\"}"),
189+
GenericVariant.fromJson("{\"age\":27,\"city\":\"Hangzhou\"}"))));
182190
}
183191

184192
@ParameterizedTest

paimon-common/src/main/java/org/apache/paimon/PartitionSettedRow.java

+8
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.paimon.data.InternalRow;
2727
import org.apache.paimon.data.PartitionInfo;
2828
import org.apache.paimon.data.Timestamp;
29+
import org.apache.paimon.data.variant.Variant;
2930
import org.apache.paimon.types.RowKind;
3031

3132
/** An implementation of {@link InternalRow} which provides a row the fixed partition value. */
@@ -153,6 +154,13 @@ public byte[] getBinary(int pos) {
153154
: row.getBinary(partitionInfo.getRealIndex(pos));
154155
}
155156

157+
@Override
158+
public Variant getVariant(int pos) {
159+
return partitionInfo.inPartitionRow(pos)
160+
? partition.getVariant(partitionInfo.getRealIndex(pos))
161+
: row.getVariant(partitionInfo.getRealIndex(pos));
162+
}
163+
156164
@Override
157165
public InternalArray getArray(int pos) {
158166
return partitionInfo.inPartitionRow(pos)

paimon-common/src/main/java/org/apache/paimon/casting/CastedArray.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.data.InternalMap;
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.data.Timestamp;
27+
import org.apache.paimon.data.variant.Variant;
2728

2829
/**
2930
* An implementation of {@link InternalArray} which provides a casted view of the underlying {@link
@@ -184,6 +185,11 @@ public byte[] getBinary(int pos) {
184185
return castElementGetter.getElementOrNull(array, pos);
185186
}
186187

188+
@Override
189+
public Variant getVariant(int pos) {
190+
return castElementGetter.getElementOrNull(array, pos);
191+
}
192+
187193
@Override
188194
public InternalArray getArray(int pos) {
189195
return castElementGetter.getElementOrNull(array, pos);

paimon-common/src/main/java/org/apache/paimon/casting/CastedRow.java

+6
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.data.InternalMap;
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.data.Timestamp;
27+
import org.apache.paimon.data.variant.Variant;
2728
import org.apache.paimon.types.RowKind;
2829

2930
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -131,6 +132,11 @@ public byte[] getBinary(int pos) {
131132
return castMapping[pos].getFieldOrNull(row);
132133
}
133134

135+
@Override
136+
public Variant getVariant(int pos) {
137+
return castMapping[pos].getFieldOrNull(row);
138+
}
139+
134140
@Override
135141
public InternalArray getArray(int pos) {
136142
return castMapping[pos].getFieldOrNull(row);

paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java

+9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.paimon.data.InternalMap;
2525
import org.apache.paimon.data.InternalRow;
2626
import org.apache.paimon.data.Timestamp;
27+
import org.apache.paimon.data.variant.Variant;
2728
import org.apache.paimon.types.RowKind;
2829

2930
/**
@@ -181,6 +182,14 @@ public InternalRow getRow(int pos, int numFields) {
181182
return defaultValueRow.getRow(pos, numFields);
182183
}
183184

185+
@Override
186+
public Variant getVariant(int pos) {
187+
if (!row.isNullAt(pos)) {
188+
return row.getVariant(pos);
189+
}
190+
return defaultValueRow.getVariant(pos);
191+
}
192+
184193
public static DefaultValueRow from(InternalRow defaultValueRow) {
185194
return new DefaultValueRow(defaultValueRow);
186195
}

paimon-common/src/main/java/org/apache/paimon/data/AbstractBinaryWriter.java

+18
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.data.serializer.InternalArraySerializer;
2222
import org.apache.paimon.data.serializer.InternalMapSerializer;
2323
import org.apache.paimon.data.serializer.InternalRowSerializer;
24+
import org.apache.paimon.data.variant.Variant;
2425
import org.apache.paimon.memory.MemorySegment;
2526
import org.apache.paimon.memory.MemorySegmentUtils;
2627

@@ -177,6 +178,23 @@ public void writeTimestamp(int pos, Timestamp value, int precision) {
177178
}
178179
}
179180

181+
@Override
182+
public void writeVariant(int pos, Variant variant) {
183+
byte[] value = variant.value();
184+
byte[] metadata = variant.metadata();
185+
int totalSize = 4 + value.length + metadata.length;
186+
final int roundedSize = roundNumberOfBytesToNearestWord(totalSize);
187+
ensureCapacity(roundedSize);
188+
zeroOutPaddingBytes(totalSize);
189+
190+
segment.putInt(cursor, value.length);
191+
segment.put(cursor + 4, value, 0, value.length);
192+
segment.put(cursor + 4 + value.length, metadata, 0, metadata.length);
193+
194+
setOffsetAndSize(pos, cursor, totalSize);
195+
cursor += roundedSize;
196+
}
197+
180198
protected void zeroOutPaddingBytes(int numBytes) {
181199
if ((numBytes & 0x07) > 0) {
182200
segment.putLong(cursor + ((numBytes >> 3) << 3), 0L);

paimon-common/src/main/java/org/apache/paimon/data/BinaryArray.java

+10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.data;
2020

2121
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.data.variant.Variant;
2223
import org.apache.paimon.memory.MemorySegment;
2324
import org.apache.paimon.memory.MemorySegmentUtils;
2425
import org.apache.paimon.types.DataType;
@@ -83,6 +84,7 @@ public static int calculateFixLengthPartSize(DataType type) {
8384
case MULTISET:
8485
case MAP:
8586
case ROW:
87+
case VARIANT:
8688
// long and double are 8 bytes;
8789
// otherwise it stores the length and offset of the variable-length part for types
8890
// such as is string, map, etc.
@@ -234,6 +236,14 @@ public byte[] getBinary(int pos) {
234236
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndSize);
235237
}
236238

239+
@Override
240+
public Variant getVariant(int pos) {
241+
assertIndexIsValid(pos);
242+
int fieldOffset = getElementOffset(pos, 8);
243+
final long offsetAndLen = MemorySegmentUtils.getLong(segments, fieldOffset);
244+
return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen);
245+
}
246+
237247
@Override
238248
public InternalArray getArray(int pos) {
239249
assertIndexIsValid(pos);

paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java

+9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.data;
2020

2121
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.data.variant.Variant;
2223
import org.apache.paimon.memory.MemorySegment;
2324
import org.apache.paimon.memory.MemorySegmentUtils;
2425
import org.apache.paimon.types.DataType;
@@ -335,6 +336,14 @@ public byte[] getBinary(int pos) {
335336
return MemorySegmentUtils.readBinary(segments, offset, fieldOffset, offsetAndLen);
336337
}
337338

339+
@Override
340+
public Variant getVariant(int pos) {
341+
assertIndexIsValid(pos);
342+
int fieldOffset = getFieldOffset(pos);
343+
final long offsetAndLen = segments[0].getLong(fieldOffset);
344+
return MemorySegmentUtils.readVariant(segments, offset, offsetAndLen);
345+
}
346+
338347
@Override
339348
public InternalArray getArray(int pos) {
340349
assertIndexIsValid(pos);

paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java

+8
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.paimon.data.serializer.InternalRowSerializer;
2424
import org.apache.paimon.data.serializer.InternalSerializers;
2525
import org.apache.paimon.data.serializer.Serializer;
26+
import org.apache.paimon.data.variant.Variant;
2627
import org.apache.paimon.types.DataType;
2728
import org.apache.paimon.types.DecimalType;
2829
import org.apache.paimon.types.LocalZonedTimestampType;
@@ -67,6 +68,8 @@ public interface BinaryWriter {
6768

6869
void writeTimestamp(int pos, Timestamp value, int precision);
6970

71+
void writeVariant(int pos, Variant variant);
72+
7073
void writeArray(int pos, InternalArray value, InternalArraySerializer serializer);
7174

7275
void writeMap(int pos, InternalMap value, InternalMapSerializer serializer);
@@ -139,6 +142,9 @@ static void write(
139142
case VARBINARY:
140143
writer.writeBinary(pos, (byte[]) o);
141144
break;
145+
case VARIANT:
146+
writer.writeVariant(pos, (Variant) o);
147+
break;
142148
default:
143149
throw new UnsupportedOperationException("Not support type: " + type);
144150
}
@@ -208,6 +214,8 @@ static ValueSetter createValueSetter(DataType elementType, Serializer<?> seriali
208214
return (writer, pos, value) ->
209215
writer.writeRow(
210216
pos, (InternalRow) value, (InternalRowSerializer) rowSerializer);
217+
case VARIANT:
218+
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
211219
default:
212220
String msg =
213221
String.format(

paimon-common/src/main/java/org/apache/paimon/data/DataGetters.java

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.data;
2020

2121
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.data.variant.Variant;
2223

2324
/**
2425
* Getters to get data.
@@ -74,6 +75,9 @@ public interface DataGetters {
7475
/** Returns the binary value at the given position. */
7576
byte[] getBinary(int pos);
7677

78+
/** Returns the variant value at the given position. */
79+
Variant getVariant(int pos);
80+
7781
/** Returns the array value at the given position. */
7882
InternalArray getArray(int pos);
7983

paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.data;
2020

2121
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.data.variant.Variant;
2223
import org.apache.paimon.types.ArrayType;
2324
import org.apache.paimon.utils.ArrayUtils;
2425

@@ -204,6 +205,11 @@ public byte[] getBinary(int pos) {
204205
return (byte[]) getObject(pos);
205206
}
206207

208+
@Override
209+
public Variant getVariant(int pos) {
210+
return (Variant) getObject(pos);
211+
}
212+
207213
@Override
208214
public BinaryString getString(int pos) {
209215
return (BinaryString) getObject(pos);

paimon-common/src/main/java/org/apache/paimon/data/GenericRow.java

+6
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.data;
2020

2121
import org.apache.paimon.annotation.Public;
22+
import org.apache.paimon.data.variant.Variant;
2223
import org.apache.paimon.types.RowKind;
2324
import org.apache.paimon.types.RowType;
2425

@@ -186,6 +187,11 @@ public byte[] getBinary(int pos) {
186187
return (byte[]) this.fields[pos];
187188
}
188189

190+
@Override
191+
public Variant getVariant(int pos) {
192+
return (Variant) this.fields[pos];
193+
}
194+
189195
@Override
190196
public InternalArray getArray(int pos) {
191197
return (InternalArray) this.fields[pos];

paimon-common/src/main/java/org/apache/paimon/data/InternalArray.java

+3
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ static ElementGetter createElementGetter(DataType elementType) {
130130
final int rowFieldCount = getFieldCount(elementType);
131131
elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount);
132132
break;
133+
case VARIANT:
134+
elementGetter = InternalArray::getVariant;
135+
break;
133136
default:
134137
String msg =
135138
String.format(

paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java

+3
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ static FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
221221
final int rowFieldCount = DataTypeChecks.getFieldCount(fieldType);
222222
fieldGetter = row -> row.getRow(fieldPos, rowFieldCount);
223223
break;
224+
case VARIANT:
225+
fieldGetter = row -> row.getVariant(fieldPos);
226+
break;
224227
default:
225228
String msg =
226229
String.format(

0 commit comments

Comments
 (0)