diff --git a/src/main/java/com/gotocompany/depot/SinkFactory.java b/src/main/java/com/gotocompany/depot/SinkFactory.java new file mode 100644 index 00000000..ba9dbb7f --- /dev/null +++ b/src/main/java/com/gotocompany/depot/SinkFactory.java @@ -0,0 +1,6 @@ +package com.gotocompany.depot; + +public interface SinkFactory { + void init(); + Sink create(); +} diff --git a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java index 3352e411..0f05eab3 100644 --- a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java +++ b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java @@ -1,13 +1,16 @@ package com.gotocompany.depot.config; +import com.aliyun.odps.Column; import com.aliyun.odps.tunnel.io.CompressOption; import com.gotocompany.depot.common.TupleString; import com.gotocompany.depot.config.converter.ConfToListConverter; import com.gotocompany.depot.config.converter.LocalDateTimeConverter; +import com.gotocompany.depot.config.converter.MaxComputeDefaultColumnConverter; import com.gotocompany.depot.config.converter.MaxComputeOdpsGlobalSettingsConverter; import com.gotocompany.depot.config.converter.ZoneIdConverter; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; import org.aeonbits.owner.Config; +import org.apache.commons.lang3.StringUtils; import java.time.LocalDateTime; import java.time.ZoneId; @@ -149,4 +152,10 @@ public interface MaxComputeSinkConfig extends Config { @DefaultValue("1") int getMaxFutureYearEventTimeDifference(); + @DefaultValue("") + @Key("SINK_MAXCOMPUTE_DEFAULT_COLUMNS") + @ConverterClass(MaxComputeDefaultColumnConverter.class) + @Separator(",") + List getDefaultColumns(); + } diff --git a/src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java b/src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java new file mode 100644 index 00000000..6d3411f0 --- /dev/null +++ b/src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java @@ -0,0 +1,40 @@ +package com.gotocompany.depot.config.converter; + +import com.aliyun.odps.Column; +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.type.TypeInfoFactory; +import com.google.common.collect.ImmutableSet; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; +import java.util.Set; + +import static com.aliyun.odps.OdpsType.*; + +public class MaxComputeDefaultColumnConverter implements Converter { + + private static final Set PRIMITIVE_TYPES = ImmutableSet.of( + BIGINT, DOUBLE, BOOLEAN, DATETIME, STRING, DECIMAL, + ARRAY, TINYINT, SMALLINT, INT, FLOAT, DATE, TIMESTAMP, + BINARY, TIMESTAMP_NTZ); + + private static final String NAME_TYPE_SEPARATOR = ":"; + private static final int COLUMN_NAME_INDEX = 0; + private static final int COLUMN_TYPE_INDEX = 1; + + @Override + public Column convert(Method method, String s) { + String[] parts = s.split(NAME_TYPE_SEPARATOR); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid column format: " + s); + } + String name = parts[COLUMN_NAME_INDEX]; + OdpsType type = OdpsType.valueOf(parts[COLUMN_TYPE_INDEX].toUpperCase()); + if (!PRIMITIVE_TYPES.contains(type)) { + throw new IllegalArgumentException("Unsupported column type: " + type); + } + return Column.newBuilder(name, TypeInfoFactory.getPrimitiveTypeInfo(type)) + .build(); + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java new file mode 100644 index 00000000..4cb15bdc --- /dev/null +++ b/src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java @@ -0,0 +1,35 @@ +package com.gotocompany.depot.maxcompute; + +import com.gotocompany.depot.Sink; +import com.gotocompany.depot.SinkFactory; +import com.gotocompany.depot.config.SinkConfig; +import com.gotocompany.depot.metrics.StatsDReporter; +import com.gotocompany.stencil.client.StencilClient; +import org.aeonbits.owner.ConfigFactory; + +import java.util.Map; + +public class JsonMaxComputeSinkFactory implements SinkFactory { + + private final StatsDReporter statsDReporter; + private final StencilClient stencilClient; + private final SinkConfig sinkConfig; + + public JsonMaxComputeSinkFactory(StatsDReporter statsDReporter, + StencilClient stencilClient, + Map env) { + this.statsDReporter = statsDReporter; + this.stencilClient = stencilClient; + this.sinkConfig = ConfigFactory.create(SinkConfig.class, env); + } + + @Override + public void init() { + + } + + @Override + public Sink create() { + return null; + } +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/ProtobufMaxComputeSinkFactory.java similarity index 78% rename from src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java rename to src/main/java/com/gotocompany/depot/maxcompute/ProtobufMaxComputeSinkFactory.java index be2900b6..6092902c 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/ProtobufMaxComputeSinkFactory.java @@ -2,6 +2,7 @@ import com.google.protobuf.Descriptors; import com.gotocompany.depot.Sink; +import com.gotocompany.depot.SinkFactory; import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.client.MaxComputeClient; @@ -9,7 +10,7 @@ import com.gotocompany.depot.maxcompute.converter.record.ProtoMessageRecordConverter; import com.gotocompany.depot.maxcompute.record.RecordDecorator; import com.gotocompany.depot.maxcompute.record.RecordDecoratorFactory; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCacheFactory; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory; @@ -24,7 +25,7 @@ import java.util.Map; -public class MaxComputeSinkFactory { +public class ProtobufMaxComputeSinkFactory implements SinkFactory { private final MaxComputeSinkConfig maxComputeSinkConfig; private final SinkConfig sinkConfig; @@ -35,13 +36,13 @@ public class MaxComputeSinkFactory { private final MaxComputeClient maxComputeClient; private final MetadataUtil metadataUtil; - private MaxComputeSchemaCache maxComputeSchemaCache; + private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private PartitioningStrategy partitioningStrategy; private MessageParser messageParser; - public MaxComputeSinkFactory(StatsDReporter statsDReporter, - StencilClient stencilClient, - Map env) { + public ProtobufMaxComputeSinkFactory(StatsDReporter statsDReporter, + StencilClient stencilClient, + Map env) { this.statsDReporter = statsDReporter; this.maxComputeSinkConfig = ConfigFactory.create(MaxComputeSinkConfig.class, env); this.sinkConfig = ConfigFactory.create(SinkConfig.class, env); @@ -52,22 +53,24 @@ public MaxComputeSinkFactory(StatsDReporter statsDReporter, this.metadataUtil = new MetadataUtil(maxComputeSinkConfig); } + @Override public void init() { Descriptors.Descriptor descriptor = stencilClient.get(getProtoSchemaClassName(sinkConfig)); this.partitioningStrategy = PartitioningStrategyFactory.createPartitioningStrategy(protobufConverterOrchestrator, maxComputeSinkConfig, descriptor); - this.maxComputeSchemaCache = MaxComputeSchemaCacheFactory.createMaxComputeSchemaCache(protobufConverterOrchestrator, + this.protobufMaxComputeSchemaCache = MaxComputeSchemaCacheFactory.createProtobufMaxComputeSchemaCache(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, sinkConfig, maxComputeClient, metadataUtil); - this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, maxComputeSchemaCache); - maxComputeSchemaCache.setMessageParser(messageParser); - maxComputeSchemaCache.updateSchema(); + this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, protobufMaxComputeSchemaCache); + protobufMaxComputeSchemaCache.setMessageParser(messageParser); + protobufMaxComputeSchemaCache.updateSchema(); } + @Override public Sink create() { - RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( - new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, maxComputeSchemaCache, messageParser, + RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( + new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, protobufMaxComputeSchemaCache, messageParser, partitioningStrategy, maxComputeSinkConfig, sinkConfig, statsDReporter, maxComputeMetrics, metadataUtil) ); - ProtoMessageRecordConverter protoMessageRecordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter protoMessageRecordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); return new MaxComputeSink(maxComputeClient.createInsertManager(), protoMessageRecordConverter, statsDReporter, maxComputeMetrics); } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java b/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java index 5b182bce..8172478e 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java @@ -10,7 +10,7 @@ import com.gotocompany.depot.maxcompute.model.RecordWrapper; import com.gotocompany.depot.maxcompute.model.RecordWrappers; import com.gotocompany.depot.maxcompute.record.RecordDecorator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.message.Message; import lombok.RequiredArgsConstructor; @@ -25,7 +25,7 @@ public class ProtoMessageRecordConverter implements MessageRecordConverter { private final RecordDecorator recordDecorator; - private final MaxComputeSchemaCache maxComputeSchemaCache; + private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; /** * Converts a list of messages to RecordWrappers. @@ -36,7 +36,7 @@ public class ProtoMessageRecordConverter implements MessageRecordConverter { */ @Override public RecordWrappers convert(List messages) { - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); RecordWrappers recordWrappers = new RecordWrappers(); IntStream.range(0, messages.size()) .forEach(index -> { diff --git a/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java b/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java index 4322eb11..d98e8f5f 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java @@ -9,7 +9,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import com.gotocompany.depot.message.Message; @@ -25,7 +25,7 @@ */ public class ProtoMetadataColumnRecordDecorator extends RecordDecorator { - private final MaxComputeSchemaCache maxComputeSchemaCache; + private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private final Map metadataTypePairs; private final String maxcomputeMetadataNamespace; private final List metadataColumnsTypes; @@ -33,10 +33,10 @@ public class ProtoMetadataColumnRecordDecorator extends RecordDecorator { public ProtoMetadataColumnRecordDecorator(RecordDecorator recordDecorator, MaxComputeSinkConfig maxComputeSinkConfig, - MaxComputeSchemaCache maxComputeSchemaCache, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache, MetadataUtil metadataUtil) { super(recordDecorator); - this.maxComputeSchemaCache = maxComputeSchemaCache; + this.protobufMaxComputeSchemaCache = protobufMaxComputeSchemaCache; this.metadataUtil = metadataUtil; this.metadataTypePairs = maxComputeSinkConfig.getMetadataColumnsTypes() .stream() @@ -66,7 +66,7 @@ public RecordWrapper process(RecordWrapper recordWrapper, Message message) throw private void appendNamespacedMetadata(Record record, Message message) { Map metadata = message.getMetadata(metadataColumnsTypes); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); StructTypeInfo typeInfo = (StructTypeInfo) maxComputeSchema.getTableSchema() .getColumn(maxcomputeMetadataNamespace) .getTypeInfo(); @@ -80,7 +80,7 @@ private void appendNamespacedMetadata(Record record, Message message) { private void appendMetadata(Record record, Message message) { Map metadata = message.getMetadata(metadataColumnsTypes); - for (Map.Entry entry : maxComputeSchemaCache.getMaxComputeSchema() + for (Map.Entry entry : protobufMaxComputeSchemaCache.getMaxComputeSchema() .getMetadataColumns() .entrySet()) { Object value = metadata.get(entry.getKey()); diff --git a/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java index f785ba25..a31a9c85 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java @@ -3,7 +3,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import com.gotocompany.depot.message.MessageParser; @@ -20,7 +20,7 @@ public class RecordDecoratorFactory { * @param recordDecoratorConfig record decorator configuration * @return record decorator */ - public static RecordDecorator createRecordDecorator(RecordDecoratorConfig recordDecoratorConfig) { + public static RecordDecorator createProtobufRecordDecorator(RecordDecoratorConfig recordDecoratorConfig) { RecordDecorator dataColumnRecordDecorator = new ProtoDataColumnRecordDecorator(null, recordDecoratorConfig.protobufConverterOrchestrator, recordDecoratorConfig.messageParser, @@ -32,13 +32,13 @@ public static RecordDecorator createRecordDecorator(RecordDecoratorConfig record return dataColumnRecordDecorator; } return new ProtoMetadataColumnRecordDecorator(dataColumnRecordDecorator, recordDecoratorConfig.maxComputeSinkConfig, - recordDecoratorConfig.maxComputeSchemaCache, recordDecoratorConfig.metadataUtil); + recordDecoratorConfig.protobufMaxComputeSchemaCache, recordDecoratorConfig.metadataUtil); } @RequiredArgsConstructor public static class RecordDecoratorConfig { private final ProtobufConverterOrchestrator protobufConverterOrchestrator; - private final MaxComputeSchemaCache maxComputeSchemaCache; + private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private final MessageParser messageParser; private final PartitioningStrategy partitioningStrategy; private final MaxComputeSinkConfig maxComputeSinkConfig; diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java new file mode 100644 index 00000000..232cd0bf --- /dev/null +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java @@ -0,0 +1,62 @@ +package com.gotocompany.depot.maxcompute.schema; + +import com.aliyun.odps.Column; +import com.aliyun.odps.TableSchema; +import com.gotocompany.depot.config.MaxComputeSinkConfig; +import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; +import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; +import com.gotocompany.depot.maxcompute.util.MetadataUtil; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@RequiredArgsConstructor +public class JsonMaxComputeSchemaBuilder { + + private final MaxComputeSinkConfig maxComputeSinkConfig; + private final PartitioningStrategy partitioningStrategy; + private final MetadataUtil metadataUtil; + + public MaxComputeSchema build() { + List metadataColumns = buildMetadataColumns(); + TableSchema.Builder tableSchemaBuilder = com.aliyun.odps.TableSchema.builder() + .withColumns(metadataColumns) + .withColumns(buildDataColumns()); + if (maxComputeSinkConfig.isTablePartitioningEnabled()) { + tableSchemaBuilder.withPartitionColumn(buildPartitionColumn()); + } + return new MaxComputeSchema(tableSchemaBuilder.build(), metadataColumns.stream().collect(Collectors.toMap(Column::getName, Column::getTypeInfo))); + } + + private Column buildPartitionColumn() { + return partitioningStrategy.getPartitionColumn(); + } + + private List buildDataColumns() { + return maxComputeSinkConfig.getDefaultColumns() + .stream() + .filter(col -> { + if (!maxComputeSinkConfig.isTablePartitioningEnabled() || !col.getName().equals(maxComputeSinkConfig.getTablePartitionKey())) { + return true; + } + return !partitioningStrategy.shouldReplaceOriginalColumn(); + }).collect(Collectors.toList()); + } + + private List buildMetadataColumns() { + if (!maxComputeSinkConfig.shouldAddMetadata()) { + return new ArrayList<>(); + } + if (StringUtils.isNotBlank(maxComputeSinkConfig.getMaxcomputeMetadataNamespace())) { + throw new IllegalArgumentException("Maxcompute metadata namespace is not supported for JSON schema"); + } + return maxComputeSinkConfig.getMetadataColumnsTypes() + .stream() + .map(tuple -> Column.newBuilder(tuple.getFirst(), metadataUtil.getMetadataTypeInfo(tuple.getSecond())).build()) + .collect(Collectors.toList()); + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java new file mode 100644 index 00000000..aeabce22 --- /dev/null +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java @@ -0,0 +1,23 @@ +package com.gotocompany.depot.maxcompute.schema; + +import com.aliyun.odps.TableSchema; +import com.gotocompany.depot.config.MaxComputeSinkConfig; +import com.gotocompany.depot.maxcompute.client.MaxComputeClient; +import com.gotocompany.depot.stencil.DepotStencilUpdateListener; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class JsonMaxComputeSchemaCache extends DepotStencilUpdateListener { + + private final MaxComputeSinkConfig maxComputeSinkConfig; + private final JsonMaxComputeSchemaBuilder jsonMaxComputeSchemaBuilder; + private final MaxComputeClient maxComputeClient; + + @Override + public void updateSchema() { + TableSchema remoteSchema = maxComputeClient.getLatestTableSchema(); + + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java index 871c0066..c437f487 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java @@ -9,7 +9,7 @@ public class MaxComputeSchemaCacheFactory { - public static MaxComputeSchemaCache createMaxComputeSchemaCache( + public static ProtobufMaxComputeSchemaCache createProtobufMaxComputeSchemaCache( ProtobufConverterOrchestrator protobufConverterOrchestrator, MaxComputeSinkConfig maxComputeSinkConfig, PartitioningStrategy partitioningStrategy, @@ -17,8 +17,8 @@ public static MaxComputeSchemaCache createMaxComputeSchemaCache( MaxComputeClient maxComputeClient, MetadataUtil metadataUtil ) { - return new MaxComputeSchemaCache( - new MaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil), + return new ProtobufMaxComputeSchemaCache( + new ProtobufMaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil), sinkConfig, protobufConverterOrchestrator, maxComputeClient ); diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilder.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilder.java similarity index 98% rename from src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilder.java rename to src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilder.java index b3121204..39cb1400 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilder.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilder.java @@ -18,7 +18,7 @@ import java.util.stream.Collectors; @RequiredArgsConstructor -public class MaxComputeSchemaBuilder { +public class ProtobufMaxComputeSchemaBuilder { private final ProtobufConverterOrchestrator protobufConverterOrchestrator; private final MaxComputeSinkConfig maxComputeSinkConfig; diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCache.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java similarity index 83% rename from src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCache.java rename to src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java index a98ebc65..7b0c2ab3 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCache.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java @@ -20,19 +20,19 @@ * It also caches the MaxCompute schema. */ @Slf4j -public class MaxComputeSchemaCache extends DepotStencilUpdateListener { +public class ProtobufMaxComputeSchemaCache extends DepotStencilUpdateListener { - private final MaxComputeSchemaBuilder maxComputeSchemaBuilder; + private final ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder; private final SinkConfig sinkConfig; private final ProtobufConverterOrchestrator protobufConverterOrchestrator; private final MaxComputeClient maxComputeClient; private MaxComputeSchema maxComputeSchema; - public MaxComputeSchemaCache(MaxComputeSchemaBuilder maxComputeSchemaBuilder, - SinkConfig sinkConfig, - ProtobufConverterOrchestrator protobufConverterOrchestrator, - MaxComputeClient maxComputeClient) { - this.maxComputeSchemaBuilder = maxComputeSchemaBuilder; + public ProtobufMaxComputeSchemaCache(ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder, + SinkConfig sinkConfig, + ProtobufConverterOrchestrator protobufConverterOrchestrator, + MaxComputeClient maxComputeClient) { + this.protobufMaxComputeSchemaBuilder = protobufMaxComputeSchemaBuilder; this.sinkConfig = sinkConfig; this.protobufConverterOrchestrator = protobufConverterOrchestrator; this.maxComputeClient = maxComputeClient; @@ -75,7 +75,7 @@ public synchronized void updateSchema() { } private void updateMaxComputeTableSchema(Descriptors.Descriptor descriptor) { - MaxComputeSchema localSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema localSchema = protobufMaxComputeSchemaBuilder.build(descriptor); protobufConverterOrchestrator.clearCache(); try { maxComputeClient.createOrUpdateTable(localSchema.getTableSchema()); diff --git a/src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java b/src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java new file mode 100644 index 00000000..e1e8cd03 --- /dev/null +++ b/src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java @@ -0,0 +1,20 @@ +package com.gotocompany.depot.config.converter; + +import com.aliyun.odps.Column; +import org.junit.Test; + +import java.util.List; + +public class MaxComputeDefaultColumnConverterTest { + + private static final MaxComputeDefaultColumnConverter converter = new MaxComputeDefaultColumnConverter(); + + @Test + public void shouldParseComplexTableStatement() { + String input = "col1:STRING,col2:ARRAY,col3:STRUCT>>"; + + List columns = converter.convert(null, input); + + System.out.println(columns); + } +} diff --git a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java index 76b8d926..8292f301 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java @@ -15,14 +15,14 @@ import com.gotocompany.depot.exception.UnknownFieldsException; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; import com.gotocompany.depot.maxcompute.model.RecordWrappers; import com.gotocompany.depot.maxcompute.record.ProtoDataColumnRecordDecorator; import com.gotocompany.depot.maxcompute.record.ProtoMetadataColumnRecordDecorator; import com.gotocompany.depot.maxcompute.record.RecordDecorator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory; import com.gotocompany.depot.maxcompute.util.MetadataUtil; @@ -57,9 +57,9 @@ public class ProtoMessageRecordConverterTest { private MaxComputeSinkConfig maxComputeSinkConfig; private ProtobufConverterOrchestrator protobufConverterOrchestrator; private ProtoMessageParser protoMessageParser; - private MaxComputeSchemaBuilder maxComputeSchemaBuilder; + private ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder; private SinkConfig sinkConfig; - private MaxComputeSchemaCache maxComputeSchemaCache; + private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private ProtoMessageRecordConverter protoMessageRecordConverter; @Before @@ -97,10 +97,10 @@ public void setup() throws IOException { descriptor ); MetadataUtil metadataUtil = new MetadataUtil(maxComputeSinkConfig); - maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil); - maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); - when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); + protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil); + protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); Instrumentation instrumentation = Mockito.mock(Instrumentation.class); Mockito.doNothing().when(instrumentation) .captureDurationSince(Mockito.any(), Mockito.any()); @@ -108,8 +108,8 @@ public void setup() throws IOException { RecordDecorator protoDataColumnRecordDecorator = new ProtoDataColumnRecordDecorator(null, protobufConverterOrchestrator, protoMessageParser, sinkConfig, partitioningStrategy, Mockito.mock(StatsDReporter.class), maxComputeMetrics); - RecordDecorator metadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(protoDataColumnRecordDecorator, maxComputeSinkConfig, maxComputeSchemaCache, metadataUtil); - protoMessageRecordConverter = new ProtoMessageRecordConverter(metadataColumnRecordDecorator, maxComputeSchemaCache); + RecordDecorator metadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(protoDataColumnRecordDecorator, maxComputeSinkConfig, protobufMaxComputeSchemaCache, metadataUtil); + protoMessageRecordConverter = new ProtoMessageRecordConverter(metadataColumnRecordDecorator, protobufMaxComputeSchemaCache); } @Test @@ -168,7 +168,7 @@ public void shouldReturnRecordWrapperWithDeserializationErrorWhenIOExceptionIsTh RecordDecorator recordDecorator = Mockito.mock(RecordDecorator.class); Mockito.doThrow(new IOException()).when(recordDecorator) .decorate(Mockito.any(), Mockito.any()); - ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); Message message = new Message( null, getMockedMessage().toByteArray(), @@ -194,7 +194,7 @@ public void shouldReturnRecordWrapperWithUnknownFieldsErrorWhenUnknownFieldExcep com.google.protobuf.Message mockedMessage = getMockedMessage(); Mockito.doThrow(new UnknownFieldsException(mockedMessage)).when(recordDecorator) .decorate(Mockito.any(), Mockito.any()); - ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); Message message = new Message( null, getMockedMessage().toByteArray(), @@ -220,7 +220,7 @@ public void shouldReturnRecordWrapperWithInvalidMessageErrorWhenInvalidMessageEx String invalidMessage = "Invalid message"; Mockito.doThrow(new InvalidMessageException(invalidMessage)).when(recordDecorator) .decorate(Mockito.any(), Mockito.any()); - ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); Message message = new Message( null, getMockedMessage().toByteArray(), diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java index 82726358..0f6f2ee7 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java @@ -15,10 +15,10 @@ import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.partition.DefaultPartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.TimestampPartitioningStrategy; @@ -47,7 +47,7 @@ public class ProtoDataColumnRecordDecoratorTest { private static final Descriptors.Descriptor DESCRIPTOR = TestMaxComputeRecord.MaxComputeRecord.getDescriptor(); - private MaxComputeSchemaBuilder maxComputeSchemaBuilder; + private ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder; private ProtoDataColumnRecordDecorator protoDataColumnRecordDecorator; @Before @@ -69,7 +69,7 @@ public void setup() throws IOException { @Test public void decorateShouldProcessDataColumnToRecord() throws IOException { - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -110,7 +110,7 @@ public void decorateShouldProcessDataColumnToRecordAndOmitPartitionColumnIfParti PartitioningStrategy partitioningStrategy = new DefaultPartitioningStrategy(TypeInfoFactory.STRING, maxComputeSinkConfig); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, null, partitioningStrategy, getMockedMessage()); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -153,7 +153,7 @@ public void decorateShouldProcessDataColumnToRecordAndShouldNotOmitOriginalColum when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); PartitioningStrategy partitioningStrategy = new TimestampPartitioningStrategy(maxComputeSinkConfig); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, null, partitioningStrategy, getMockedMessage()); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -209,7 +209,7 @@ public void decorateShouldSetDefaultPartitioningSpecWhenProtoFieldNotExists() th )) .build(); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, null, partitioningStrategy, maxComputeRecord); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); Message message = new Message(null, maxComputeRecord.toByteArray()); @@ -233,7 +233,7 @@ public void decorateShouldSetDefaultPartitioningSpecWhenProtoFieldNotExists() th @Test public void decorateShouldPutDefaultPartitionSpec() throws IOException { - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -273,7 +273,7 @@ public void decorateShouldCallInjectedDecorator() throws IOException { SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, recordDecorator, null, getMockedMessage()); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -303,15 +303,15 @@ private void instantiateProtoDataColumnRecordDecorator(SinkConfig sinkConfig, Ma PartitioningStrategy partitioningStrategy, com.google.protobuf.Message mockedMessage) throws IOException { ProtobufConverterOrchestrator protobufConverterOrchestrator = new ProtobufConverterOrchestrator(maxComputeSinkConfig); - maxComputeSchemaBuilder = new MaxComputeSchemaBuilder( + protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder( protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig) ); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); - MaxComputeSchemaCache maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); - when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); ParsedMessage parsedMessage = Mockito.mock(ParsedMessage.class); when(parsedMessage.getRaw()).thenReturn(mockedMessage); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java index 1ed9989d..7096cd4d 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java @@ -12,10 +12,10 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import com.gotocompany.depot.message.Message; import com.gotocompany.depot.message.proto.ProtoParsedMessage; @@ -37,7 +37,7 @@ public class ProtoMetadataColumnRecordDecoratorTest { private final Descriptors.Descriptor descriptor = TestMaxComputeRecord.MaxComputeRecord.getDescriptor(); private MaxComputeSinkConfig maxComputeSinkConfig; - private MaxComputeSchemaCache maxComputeSchemaCache; + private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private ProtoMetadataColumnRecordDecorator protoMetadataColumnRecordDecorator; @Before @@ -65,7 +65,7 @@ public void shouldPopulateRecordWithNamespacedMetadata() throws IOException { new Tuple<>("__kafka_topic", "topic"), new Tuple<>("__kafka_offset", 100L) ); - Record record = new ArrayRecord(maxComputeSchemaCache.getMaxComputeSchema().getColumns()); + Record record = new ArrayRecord(protobufMaxComputeSchemaCache.getMaxComputeSchema().getColumns()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); LocalDateTime expectedLocalDateTime = Instant.ofEpochMilli(10002010L) .atZone(ZoneId.of("UTC")) @@ -101,7 +101,7 @@ public void shouldPopulateRecordWithNonNamespacedMetadata() throws IOException { new Tuple<>("__kafka_topic", "topic"), new Tuple<>("__kafka_offset", 100L) ); - Record record = new ArrayRecord(maxComputeSchemaCache.getMaxComputeSchema().getColumns()); + Record record = new ArrayRecord(protobufMaxComputeSchemaCache.getMaxComputeSchema().getColumns()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); LocalDateTime expectedLocalDateTime = Instant.ofEpochMilli(10002010L) .atZone(ZoneId.of("UTC")) @@ -145,10 +145,10 @@ private void initializeDecorator(MaxComputeSinkConfig sinkConfig) { this.maxComputeSinkConfig = sinkConfig; ProtobufConverterOrchestrator protobufConverterOrchestrator = new ProtobufConverterOrchestrator(sinkConfig); MetadataUtil metadataUtil = new MetadataUtil(maxComputeSinkConfig); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(protobufConverterOrchestrator, sinkConfig, null, metadataUtil); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); - maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); - when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); - protoMetadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(null, sinkConfig, maxComputeSchemaCache, metadataUtil); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(protobufConverterOrchestrator, sinkConfig, null, metadataUtil); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); + protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); + protoMetadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(null, sinkConfig, protobufMaxComputeSchemaCache, metadataUtil); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java index 9755e872..9ba7f88b 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java @@ -27,7 +27,7 @@ public void shouldCreateDataRecordDecorator() { when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); when(sinkConfig.getSinkConnectorSchemaProtoMessageClass()).thenReturn("com.gotocompany.depot.message.Message"); - RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( + RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( new RecordDecoratorFactory.RecordDecoratorConfig( null, null, null, null, maxComputeSinkConfig, sinkConfig, Mockito.mock(StatsDReporter.class), @@ -52,7 +52,7 @@ public void shouldCreateDataRecordDecoratorWithNamespaceDecorator() { when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); when(sinkConfig.getSinkConnectorSchemaProtoMessageClass()).thenReturn("com.gotocompany.depot.message.Message"); - RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( + RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( new RecordDecoratorFactory.RecordDecoratorConfig( null, null, null, null, maxComputeSinkConfig, sinkConfig, Mockito.mock(StatsDReporter.class), diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilderTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilderTest.java similarity index 92% rename from src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilderTest.java rename to src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilderTest.java index b2621e8a..d364fb1b 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilderTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilderTest.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.when; -public class MaxComputeSchemaBuilderTest { +public class ProtobufMaxComputeSchemaBuilderTest { private final Descriptors.Descriptor descriptor = TextMaxComputeTable.Table.getDescriptor(); @@ -49,12 +49,12 @@ public void shouldBuildPartitionedTableSchemaWithRootLevelMetadata() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); int expectedNonPartitionColumnCount = 7; int expectedPartitionColumnCount = 1; - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); assertThat(maxComputeSchema.getTableSchema().getColumns().size()).isEqualTo(expectedNonPartitionColumnCount); assertThat(maxComputeSchema.getTableSchema().getPartitionColumns().size()).isEqualTo(expectedPartitionColumnCount); @@ -109,11 +109,11 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder( + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder( new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); assertThat(maxComputeSchema.getTableSchema().getColumns().size()).isEqualTo(expectedNonPartitionColumnCount); assertThat(maxComputeSchema.getTableSchema().getPartitionColumns().size()).isEqualTo(expectedPartitionColumnCount); @@ -159,10 +159,10 @@ public void shouldBuildTableSchemaWithoutPartitionAndMeta() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); assertThat(maxComputeSchema.getTableSchema().getColumns().size()).isEqualTo(expectedNonPartitionColumnCount); assertThat(maxComputeSchema.getTableSchema().getPartitionColumns().size()).isEqualTo(expectedPartitionColumnCount); @@ -206,10 +206,10 @@ public void shouldThrowIllegalArgumentExceptionWhenPartitionKeyIsNotFound() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); - maxComputeSchemaBuilder.build(descriptor); + protobufMaxComputeSchemaBuilder.build(descriptor); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java similarity index 75% rename from src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheTest.java rename to src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java index e1126b48..3d6376b2 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java @@ -24,17 +24,17 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class MaxComputeSchemaCacheTest { +public class ProtobufMaxComputeSchemaCacheTest { @Test public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); when(protoMessageParser.getDescriptorMap()).thenReturn(newDescriptor); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -46,13 +46,13 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( - maxComputeSchemaBuilder, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); Mockito.doNothing() .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); @@ -61,7 +61,7 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { .when(maxComputeClient) .getLatestTableSchema(); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); verify(maxComputeClient, Mockito.times(1)) .createOrUpdateTable(Mockito.any()); @@ -72,11 +72,11 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchFieldException, IllegalAccessException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); when(protoMessageParser.getDescriptorMap()).thenReturn(newDescriptor); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -86,17 +86,17 @@ public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchF MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( - maxComputeSchemaBuilder, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - Field field = MaxComputeSchemaCache.class.getDeclaredField("maxComputeSchema"); + Field field = ProtobufMaxComputeSchemaCache.class.getDeclaredField("maxComputeSchema"); field.setAccessible(true); - field.set(maxComputeSchemaCache, mockedMaxComputeSchema); + field.set(protobufMaxComputeSchemaCache, mockedMaxComputeSchema); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); verify(maxComputeClient, Mockito.times(0)) .createOrUpdateTable(Mockito.any()); @@ -107,10 +107,10 @@ public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchF public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -120,13 +120,13 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( - maxComputeSchemaBuilder, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); Mockito.doNothing() .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); @@ -134,8 +134,8 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { .when(maxComputeClient) .getLatestTableSchema(); - maxComputeSchemaCache.getMaxComputeSchema(); - maxComputeSchemaCache.onSchemaUpdate(newDescriptor); + protobufMaxComputeSchemaCache.getMaxComputeSchema(); + protobufMaxComputeSchemaCache.onSchemaUpdate(newDescriptor); verify(maxComputeClient, Mockito.times(2)) .createOrUpdateTable(Mockito.any()); @@ -145,10 +145,10 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -158,13 +158,13 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( - maxComputeSchemaBuilder, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); Mockito.doNothing() .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); @@ -172,8 +172,8 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep .when(maxComputeClient) .getLatestTableSchema(); - maxComputeSchemaCache.getMaxComputeSchema(); - maxComputeSchemaCache.onSchemaUpdate(newDescriptor); + protobufMaxComputeSchemaCache.getMaxComputeSchema(); + protobufMaxComputeSchemaCache.onSchemaUpdate(newDescriptor); verify(maxComputeClient, Mockito.times(2)) .createOrUpdateTable(Mockito.any()); @@ -183,10 +183,10 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep public void shouldThrowMaxComputeTableOperationExceptionWhenUpsertIsFailing() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -196,19 +196,19 @@ public void shouldThrowMaxComputeTableOperationExceptionWhenUpsertIsFailing() th MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( - maxComputeSchemaBuilder, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); doThrow(new OdpsException("Invalid schema")) .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); - maxComputeSchemaCache.getMaxComputeSchema(); - maxComputeSchemaCache.onSchemaUpdate(newDescriptor); + protobufMaxComputeSchemaCache.getMaxComputeSchema(); + protobufMaxComputeSchemaCache.onSchemaUpdate(newDescriptor); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java index 434f6532..36448595 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java @@ -8,7 +8,7 @@ import com.aliyun.odps.type.TypeInfoFactory; import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import org.junit.Test; import org.mockito.Mockito; @@ -66,10 +66,10 @@ public void shouldReturnValidPartitionSpec() { .build()) .withPartitionColumn(partitionColumn) .build(); - MaxComputeSchemaCache maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); MaxComputeSchema maxComputeSchema = Mockito.mock(MaxComputeSchema.class); when(maxComputeSchema.getTableSchema()).thenReturn(tableSchema); - when(maxComputeSchemaCache.getMaxComputeSchema()) + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()) .thenReturn(maxComputeSchema); String expectedStartOfDayEpoch = "2024-10-28"; Record record = new ArrayRecord(tableSchema); @@ -94,7 +94,7 @@ public void shouldEmptyPartitionSpecIfObjectIsNotRecord() { public void shouldReturnDefaultPartitionSpec() { String expectedPartitionSpecStringRepresentation = "tablePartitionColumnName='__NULL__'"; TimestampPartitioningStrategy timestampPartitioningStrategy = new TimestampPartitioningStrategy(getMaxComputeSinkConfig()); - MaxComputeSchemaCache maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); MaxComputeSchema maxComputeSchema = Mockito.mock(MaxComputeSchema.class); Column partitionColumn = Column.newBuilder("tablePartitionColumnName", TypeInfoFactory.STRING) .build(); @@ -105,7 +105,7 @@ public void shouldReturnDefaultPartitionSpec() { .withPartitionColumn(partitionColumn) .build(); when(maxComputeSchema.getTableSchema()).thenReturn(tableSchema); - when(maxComputeSchemaCache.getMaxComputeSchema()) + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()) .thenReturn(maxComputeSchema); Record record = new ArrayRecord(tableSchema); record.set("str", "strVal");