Skip to content

Commit 5f34456

Browse files
committed
Revert to generate avro schema to record of value + metadata and pass in variant value + metadata schema
1 parent ee34713 commit 5f34456

File tree

4 files changed

+41
-20
lines changed

4 files changed

+41
-20
lines changed

parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.avro.Conversion;
2525
import org.apache.avro.LogicalType;
2626
import org.apache.avro.Schema;
27+
import org.apache.avro.SchemaBuilder;
2728
import org.apache.avro.generic.GenericArray;
2829
import org.apache.avro.generic.GenericData;
2930
import org.apache.avro.generic.IndexedRecord;
@@ -58,6 +59,18 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
5859
private final GenericData model;
5960
private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();
6061

62+
private static final Schema VARIANT_SCHEMA = SchemaBuilder.record("VariantRecord")
63+
.fields()
64+
.name("metadata")
65+
.type()
66+
.bytesType()
67+
.noDefault()
68+
.name("value")
69+
.type()
70+
.bytesType()
71+
.noDefault()
72+
.endRecord();
73+
6174
public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) {
6275
this(parquetSchema, avroSchema, SpecificData.get());
6376
}
@@ -170,7 +183,7 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode
170183
return new MapConverter(parent, type.asGroupType(), schema, model);
171184
case RECORD:
172185
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
173-
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
186+
return new AvroVariantConverter(parent, type.asGroupType(), VARIANT_SCHEMA, model);
174187
} else {
175188
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
176189
}

parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,18 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
9393
private final GenericData model;
9494
private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();
9595

96+
private static final Schema VARIANT_SCHEMA = SchemaBuilder.record("VariantRecord")
97+
.fields()
98+
.name("metadata")
99+
.type()
100+
.bytesType()
101+
.noDefault()
102+
.name("value")
103+
.type()
104+
.bytesType()
105+
.noDefault()
106+
.endRecord();
107+
96108
AvroRecordConverter(
97109
MessageType parquetSchema, Schema avroSchema, GenericData baseModel, ReflectClassValidator validator) {
98110
this(null, parquetSchema, avroSchema, baseModel, validator);
@@ -396,7 +408,7 @@ private static Converter newConverter(
396408
return newStringConverter(schema, model, parent, validator);
397409
case RECORD:
398410
if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
399-
return new AvroVariantConverter(parent, type.asGroupType(), schema, model);
411+
return new AvroVariantConverter(parent, type.asGroupType(), VARIANT_SCHEMA, model);
400412
} else {
401413
return new AvroRecordConverter(parent, type.asGroupType(), schema, model, validator);
402414
}

parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,8 +470,11 @@ public Optional<Schema> visit(
470470
@Override
471471
public Optional<Schema> visit(
472472
LogicalTypeAnnotation.VariantLogicalTypeAnnotation variantLogicalType) {
473-
return of(
474-
convertFields(parquetGroupType.getName(), parquetGroupType.getFields(), names));
473+
String name = parquetGroupType.getName();
474+
List<Schema.Field> fields = new ArrayList<>();
475+
fields.add(new Schema.Field("metadata", Schema.create(Schema.Type.BYTES)));
476+
fields.add(new Schema.Field("value", Schema.create(Schema.Type.BYTES)));
477+
return of(Schema.createRecord(name, null, namespace(name, names), false, fields));
475478
}
476479
})
477480
.orElseThrow(

parquet-avro/src/main/java/org/apache/parquet/avro/AvroVariantConverter.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.nio.ByteBuffer;
2222
import java.util.function.Consumer;
2323
import org.apache.avro.Schema;
24-
import org.apache.avro.SchemaBuilder;
2524
import org.apache.avro.generic.GenericData;
2625
import org.apache.parquet.Preconditions;
2726
import org.apache.parquet.io.api.Converter;
@@ -35,27 +34,21 @@
3534
* Converter for Variant values.
3635
*/
3736
class AvroVariantConverter extends GroupConverter implements VariantConverters.ParentConverter<VariantBuilder> {
38-
private static final Schema VARIANT_SCHEMA = SchemaBuilder.record("VariantRecord")
39-
.fields()
40-
.name("metadata")
41-
.type()
42-
.bytesType()
43-
.noDefault()
44-
.name("value")
45-
.type()
46-
.bytesType()
47-
.noDefault()
48-
.endRecord();
49-
5037
private final ParentValueContainer parent;
38+
private final Schema avroSchema;
5139
private final GenericData model;
40+
private final int metadataPos;
41+
private final int valuePos;
5242
private final GroupConverter wrappedConverter;
5343

5444
private VariantBuilder builder = null;
5545
private ImmutableMetadata metadata = null;
5646

5747
AvroVariantConverter(ParentValueContainer parent, GroupType variantGroup, Schema avroSchema, GenericData model) {
5848
this.parent = parent;
49+
this.avroSchema = avroSchema;
50+
this.metadataPos = avroSchema.getField("metadata").pos();
51+
this.valuePos = avroSchema.getField("value").pos();
5952
this.model = model;
6053
this.wrappedConverter = VariantConverters.newVariantConverter(variantGroup, this::setMetadata, this);
6154
}
@@ -84,9 +77,9 @@ public void end() {
8477

8578
builder.appendNullIfEmpty();
8679

87-
Object record = model.newRecord(null, VARIANT_SCHEMA);
88-
model.setField(record, "metadata", 0, metadata.getEncodedBuffer());
89-
model.setField(record, "value", 1, builder.encodedValue());
80+
Object record = model.newRecord(null, avroSchema);
81+
model.setField(record, "metadata", metadataPos, metadata.getEncodedBuffer());
82+
model.setField(record, "value", valuePos, builder.encodedValue());
9083
parent.add(record);
9184

9285
this.builder = null;

0 commit comments

Comments
 (0)