From c416775f000f0ecac2806ab49df6d7266b60b64e Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Tue, 14 Jan 2025 12:59:44 +0700 Subject: [PATCH 1/4] refactor: Rename MaxComputeSchemaCache.java to ProtobufMaxComputeSchemaCache --- .../maxcompute/MaxComputeSinkFactory.java | 16 ++++---- .../record/ProtoMessageRecordConverter.java | 6 +-- .../ProtoMetadataColumnRecordDecorator.java | 12 +++--- .../record/RecordDecoratorFactory.java | 6 +-- .../schema/MaxComputeSchemaCacheFactory.java | 4 +- ...ava => ProtobufMaxComputeSchemaCache.java} | 10 ++--- .../ProtoMessageRecordConverterTest.java | 18 ++++----- .../ProtoDataColumnRecordDecoratorTest.java | 6 +-- ...rotoMetadataColumnRecordDecoratorTest.java | 14 +++---- ...=> ProtobufMaxComputeSchemaCacheTest.java} | 40 +++++++++---------- .../TimestampPartitioningStrategyTest.java | 10 ++--- 11 files changed, 71 insertions(+), 71 deletions(-) rename src/main/java/com/gotocompany/depot/maxcompute/schema/{MaxComputeSchemaCache.java => ProtobufMaxComputeSchemaCache.java} (90%) rename src/test/java/com/gotocompany/depot/maxcompute/schema/{MaxComputeSchemaCacheTest.java => ProtobufMaxComputeSchemaCacheTest.java} (85%) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java index be2900b6..6e5a1fa2 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java @@ -9,7 +9,7 @@ import com.gotocompany.depot.maxcompute.converter.record.ProtoMessageRecordConverter; import com.gotocompany.depot.maxcompute.record.RecordDecorator; import com.gotocompany.depot.maxcompute.record.RecordDecoratorFactory; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCacheFactory; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory; @@ -35,7 +35,7 @@ public class MaxComputeSinkFactory { private final MaxComputeClient maxComputeClient; private final MetadataUtil metadataUtil; - private MaxComputeSchemaCache maxComputeSchemaCache; + private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private PartitioningStrategy partitioningStrategy; private MessageParser messageParser; @@ -55,19 +55,19 @@ public MaxComputeSinkFactory(StatsDReporter statsDReporter, public void init() { Descriptors.Descriptor descriptor = stencilClient.get(getProtoSchemaClassName(sinkConfig)); this.partitioningStrategy = PartitioningStrategyFactory.createPartitioningStrategy(protobufConverterOrchestrator, maxComputeSinkConfig, descriptor); - this.maxComputeSchemaCache = MaxComputeSchemaCacheFactory.createMaxComputeSchemaCache(protobufConverterOrchestrator, + this.protobufMaxComputeSchemaCache = MaxComputeSchemaCacheFactory.createMaxComputeSchemaCache(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, sinkConfig, maxComputeClient, metadataUtil); - this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, maxComputeSchemaCache); - maxComputeSchemaCache.setMessageParser(messageParser); - maxComputeSchemaCache.updateSchema(); + this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, protobufMaxComputeSchemaCache); + protobufMaxComputeSchemaCache.setMessageParser(messageParser); + protobufMaxComputeSchemaCache.updateSchema(); } public Sink create() { RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( - new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, maxComputeSchemaCache, messageParser, + new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, protobufMaxComputeSchemaCache, messageParser, partitioningStrategy, maxComputeSinkConfig, sinkConfig, statsDReporter, maxComputeMetrics, metadataUtil) ); - ProtoMessageRecordConverter protoMessageRecordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter protoMessageRecordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); return new MaxComputeSink(maxComputeClient.createInsertManager(), protoMessageRecordConverter, statsDReporter, maxComputeMetrics); } diff --git a/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java b/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java index 5b182bce..8172478e 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverter.java @@ -10,7 +10,7 @@ import com.gotocompany.depot.maxcompute.model.RecordWrapper; import com.gotocompany.depot.maxcompute.model.RecordWrappers; import com.gotocompany.depot.maxcompute.record.RecordDecorator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.message.Message; import lombok.RequiredArgsConstructor; @@ -25,7 +25,7 @@ public class ProtoMessageRecordConverter implements MessageRecordConverter { private final RecordDecorator recordDecorator; - private final MaxComputeSchemaCache maxComputeSchemaCache; + private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; /** * Converts a list of messages to RecordWrappers. @@ -36,7 +36,7 @@ public class ProtoMessageRecordConverter implements MessageRecordConverter { */ @Override public RecordWrappers convert(List messages) { - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); RecordWrappers recordWrappers = new RecordWrappers(); IntStream.range(0, messages.size()) .forEach(index -> { diff --git a/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java b/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java index 4322eb11..d98e8f5f 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecorator.java @@ -9,7 +9,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import com.gotocompany.depot.message.Message; @@ -25,7 +25,7 @@ */ public class ProtoMetadataColumnRecordDecorator extends RecordDecorator { - private final MaxComputeSchemaCache maxComputeSchemaCache; + private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private final Map metadataTypePairs; private final String maxcomputeMetadataNamespace; private final List metadataColumnsTypes; @@ -33,10 +33,10 @@ public class ProtoMetadataColumnRecordDecorator extends RecordDecorator { public ProtoMetadataColumnRecordDecorator(RecordDecorator recordDecorator, MaxComputeSinkConfig maxComputeSinkConfig, - MaxComputeSchemaCache maxComputeSchemaCache, + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache, MetadataUtil metadataUtil) { super(recordDecorator); - this.maxComputeSchemaCache = maxComputeSchemaCache; + this.protobufMaxComputeSchemaCache = protobufMaxComputeSchemaCache; this.metadataUtil = metadataUtil; this.metadataTypePairs = maxComputeSinkConfig.getMetadataColumnsTypes() .stream() @@ -66,7 +66,7 @@ public RecordWrapper process(RecordWrapper recordWrapper, Message message) throw private void appendNamespacedMetadata(Record record, Message message) { Map metadata = message.getMetadata(metadataColumnsTypes); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); StructTypeInfo typeInfo = (StructTypeInfo) maxComputeSchema.getTableSchema() .getColumn(maxcomputeMetadataNamespace) .getTypeInfo(); @@ -80,7 +80,7 @@ private void appendNamespacedMetadata(Record record, Message message) { private void appendMetadata(Record record, Message message) { Map metadata = message.getMetadata(metadataColumnsTypes); - for (Map.Entry entry : maxComputeSchemaCache.getMaxComputeSchema() + for (Map.Entry entry : protobufMaxComputeSchemaCache.getMaxComputeSchema() .getMetadataColumns() .entrySet()) { Object value = metadata.get(entry.getKey()); diff --git a/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java index f785ba25..5a8dcbc2 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java @@ -3,7 +3,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import com.gotocompany.depot.message.MessageParser; @@ -32,13 +32,13 @@ public static RecordDecorator createRecordDecorator(RecordDecoratorConfig record return dataColumnRecordDecorator; } return new ProtoMetadataColumnRecordDecorator(dataColumnRecordDecorator, recordDecoratorConfig.maxComputeSinkConfig, - recordDecoratorConfig.maxComputeSchemaCache, recordDecoratorConfig.metadataUtil); + recordDecoratorConfig.protobufMaxComputeSchemaCache, recordDecoratorConfig.metadataUtil); } @RequiredArgsConstructor public static class RecordDecoratorConfig { private final ProtobufConverterOrchestrator protobufConverterOrchestrator; - private final MaxComputeSchemaCache maxComputeSchemaCache; + private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private final MessageParser messageParser; private final PartitioningStrategy partitioningStrategy; private final MaxComputeSinkConfig maxComputeSinkConfig; diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java index 871c0066..043a2675 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java @@ -9,7 +9,7 @@ public class MaxComputeSchemaCacheFactory { - public static MaxComputeSchemaCache createMaxComputeSchemaCache( + public static ProtobufMaxComputeSchemaCache createMaxComputeSchemaCache( ProtobufConverterOrchestrator protobufConverterOrchestrator, MaxComputeSinkConfig maxComputeSinkConfig, PartitioningStrategy partitioningStrategy, @@ -17,7 +17,7 @@ public static MaxComputeSchemaCache createMaxComputeSchemaCache( MaxComputeClient maxComputeClient, MetadataUtil metadataUtil ) { - return new MaxComputeSchemaCache( + return new ProtobufMaxComputeSchemaCache( new MaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil), sinkConfig, protobufConverterOrchestrator, maxComputeClient diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCache.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java similarity index 90% rename from src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCache.java rename to src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java index a98ebc65..53e9560b 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCache.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java @@ -20,7 +20,7 @@ * It also caches the MaxCompute schema. */ @Slf4j -public class MaxComputeSchemaCache extends DepotStencilUpdateListener { +public class ProtobufMaxComputeSchemaCache extends DepotStencilUpdateListener { private final MaxComputeSchemaBuilder maxComputeSchemaBuilder; private final SinkConfig sinkConfig; @@ -28,10 +28,10 @@ public class MaxComputeSchemaCache extends DepotStencilUpdateListener { private final MaxComputeClient maxComputeClient; private MaxComputeSchema maxComputeSchema; - public MaxComputeSchemaCache(MaxComputeSchemaBuilder maxComputeSchemaBuilder, - SinkConfig sinkConfig, - ProtobufConverterOrchestrator protobufConverterOrchestrator, - MaxComputeClient maxComputeClient) { + public ProtobufMaxComputeSchemaCache(MaxComputeSchemaBuilder maxComputeSchemaBuilder, + SinkConfig sinkConfig, + ProtobufConverterOrchestrator protobufConverterOrchestrator, + MaxComputeClient maxComputeClient) { this.maxComputeSchemaBuilder = maxComputeSchemaBuilder; this.sinkConfig = sinkConfig; this.protobufConverterOrchestrator = protobufConverterOrchestrator; diff --git a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java index 76b8d926..818f225d 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java @@ -22,7 +22,7 @@ import com.gotocompany.depot.maxcompute.record.ProtoDataColumnRecordDecorator; import com.gotocompany.depot.maxcompute.record.ProtoMetadataColumnRecordDecorator; import com.gotocompany.depot.maxcompute.record.RecordDecorator; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory; import com.gotocompany.depot.maxcompute.util.MetadataUtil; @@ -59,7 +59,7 @@ public class ProtoMessageRecordConverterTest { private ProtoMessageParser protoMessageParser; private MaxComputeSchemaBuilder maxComputeSchemaBuilder; private SinkConfig sinkConfig; - private MaxComputeSchemaCache maxComputeSchemaCache; + private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private ProtoMessageRecordConverter protoMessageRecordConverter; @Before @@ -98,9 +98,9 @@ public void setup() throws IOException { ); MetadataUtil metadataUtil = new MetadataUtil(maxComputeSinkConfig); maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil); - maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); + protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); - when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); Instrumentation instrumentation = Mockito.mock(Instrumentation.class); Mockito.doNothing().when(instrumentation) .captureDurationSince(Mockito.any(), Mockito.any()); @@ -108,8 +108,8 @@ public void setup() throws IOException { RecordDecorator protoDataColumnRecordDecorator = new ProtoDataColumnRecordDecorator(null, protobufConverterOrchestrator, protoMessageParser, sinkConfig, partitioningStrategy, Mockito.mock(StatsDReporter.class), maxComputeMetrics); - RecordDecorator metadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(protoDataColumnRecordDecorator, maxComputeSinkConfig, maxComputeSchemaCache, metadataUtil); - protoMessageRecordConverter = new ProtoMessageRecordConverter(metadataColumnRecordDecorator, maxComputeSchemaCache); + RecordDecorator metadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(protoDataColumnRecordDecorator, maxComputeSinkConfig, protobufMaxComputeSchemaCache, metadataUtil); + protoMessageRecordConverter = new ProtoMessageRecordConverter(metadataColumnRecordDecorator, protobufMaxComputeSchemaCache); } @Test @@ -168,7 +168,7 @@ public void shouldReturnRecordWrapperWithDeserializationErrorWhenIOExceptionIsTh RecordDecorator recordDecorator = Mockito.mock(RecordDecorator.class); Mockito.doThrow(new IOException()).when(recordDecorator) .decorate(Mockito.any(), Mockito.any()); - ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); Message message = new Message( null, getMockedMessage().toByteArray(), @@ -194,7 +194,7 @@ public void shouldReturnRecordWrapperWithUnknownFieldsErrorWhenUnknownFieldExcep com.google.protobuf.Message mockedMessage = getMockedMessage(); Mockito.doThrow(new UnknownFieldsException(mockedMessage)).when(recordDecorator) .decorate(Mockito.any(), Mockito.any()); - ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); Message message = new Message( null, getMockedMessage().toByteArray(), @@ -220,7 +220,7 @@ public void shouldReturnRecordWrapperWithInvalidMessageErrorWhenInvalidMessageEx String invalidMessage = "Invalid message"; Mockito.doThrow(new InvalidMessageException(invalidMessage)).when(recordDecorator) .decorate(Mockito.any(), Mockito.any()); - ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache); + ProtoMessageRecordConverter recordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache); Message message = new Message( null, getMockedMessage().toByteArray(), diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java index 82726358..e1a77b4b 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java @@ -18,7 +18,7 @@ import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.schema.partition.DefaultPartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; import com.gotocompany.depot.maxcompute.schema.partition.TimestampPartitioningStrategy; @@ -310,8 +310,8 @@ private void instantiateProtoDataColumnRecordDecorator(SinkConfig sinkConfig, Ma new MetadataUtil(maxComputeSinkConfig) ); MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); - MaxComputeSchemaCache maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); - when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); ParsedMessage parsedMessage = Mockito.mock(ParsedMessage.class); when(parsedMessage.getRaw()).thenReturn(mockedMessage); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java index 1ed9989d..839127a1 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java @@ -15,7 +15,7 @@ import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import com.gotocompany.depot.maxcompute.util.MetadataUtil; import com.gotocompany.depot.message.Message; import com.gotocompany.depot.message.proto.ProtoParsedMessage; @@ -37,7 +37,7 @@ public class ProtoMetadataColumnRecordDecoratorTest { private final Descriptors.Descriptor descriptor = TestMaxComputeRecord.MaxComputeRecord.getDescriptor(); private MaxComputeSinkConfig maxComputeSinkConfig; - private MaxComputeSchemaCache maxComputeSchemaCache; + private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private ProtoMetadataColumnRecordDecorator protoMetadataColumnRecordDecorator; @Before @@ -65,7 +65,7 @@ public void shouldPopulateRecordWithNamespacedMetadata() throws IOException { new Tuple<>("__kafka_topic", "topic"), new Tuple<>("__kafka_offset", 100L) ); - Record record = new ArrayRecord(maxComputeSchemaCache.getMaxComputeSchema().getColumns()); + Record record = new ArrayRecord(protobufMaxComputeSchemaCache.getMaxComputeSchema().getColumns()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); LocalDateTime expectedLocalDateTime = Instant.ofEpochMilli(10002010L) .atZone(ZoneId.of("UTC")) @@ -101,7 +101,7 @@ public void shouldPopulateRecordWithNonNamespacedMetadata() throws IOException { new Tuple<>("__kafka_topic", "topic"), new Tuple<>("__kafka_offset", 100L) ); - Record record = new ArrayRecord(maxComputeSchemaCache.getMaxComputeSchema().getColumns()); + Record record = new ArrayRecord(protobufMaxComputeSchemaCache.getMaxComputeSchema().getColumns()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); LocalDateTime expectedLocalDateTime = Instant.ofEpochMilli(10002010L) .atZone(ZoneId.of("UTC")) @@ -147,8 +147,8 @@ private void initializeDecorator(MaxComputeSinkConfig sinkConfig) { MetadataUtil metadataUtil = new MetadataUtil(maxComputeSinkConfig); MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(protobufConverterOrchestrator, sinkConfig, null, metadataUtil); MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); - maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); - when(maxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); - protoMetadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(null, sinkConfig, maxComputeSchemaCache, metadataUtil); + protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); + protoMetadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(null, sinkConfig, protobufMaxComputeSchemaCache, metadataUtil); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java similarity index 85% rename from src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheTest.java rename to src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java index e1126b48..3f35a86a 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java @@ -24,7 +24,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class MaxComputeSchemaCacheTest { +public class ProtobufMaxComputeSchemaCacheTest { @Test public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { @@ -46,13 +46,13 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( maxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); Mockito.doNothing() .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); @@ -61,7 +61,7 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { .when(maxComputeClient) .getLatestTableSchema(); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); verify(maxComputeClient, Mockito.times(1)) .createOrUpdateTable(Mockito.any()); @@ -86,17 +86,17 @@ public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchF MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( maxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - Field field = MaxComputeSchemaCache.class.getDeclaredField("maxComputeSchema"); + Field field = ProtobufMaxComputeSchemaCache.class.getDeclaredField("maxComputeSchema"); field.setAccessible(true); - field.set(maxComputeSchemaCache, mockedMaxComputeSchema); + field.set(protobufMaxComputeSchemaCache, mockedMaxComputeSchema); - MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema(); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema(); verify(maxComputeClient, Mockito.times(0)) .createOrUpdateTable(Mockito.any()); @@ -120,13 +120,13 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( maxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); Mockito.doNothing() .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); @@ -134,8 +134,8 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { .when(maxComputeClient) .getLatestTableSchema(); - maxComputeSchemaCache.getMaxComputeSchema(); - maxComputeSchemaCache.onSchemaUpdate(newDescriptor); + protobufMaxComputeSchemaCache.getMaxComputeSchema(); + protobufMaxComputeSchemaCache.onSchemaUpdate(newDescriptor); verify(maxComputeClient, Mockito.times(2)) .createOrUpdateTable(Mockito.any()); @@ -158,13 +158,13 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( maxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); Mockito.doNothing() .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); @@ -172,8 +172,8 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep .when(maxComputeClient) .getLatestTableSchema(); - maxComputeSchemaCache.getMaxComputeSchema(); - maxComputeSchemaCache.onSchemaUpdate(newDescriptor); + protobufMaxComputeSchemaCache.getMaxComputeSchema(); + protobufMaxComputeSchemaCache.onSchemaUpdate(newDescriptor); verify(maxComputeClient, Mockito.times(2)) .createOrUpdateTable(Mockito.any()); @@ -196,19 +196,19 @@ public void shouldThrowMaxComputeTableOperationExceptionWhenUpsertIsFailing() th MaxComputeClient maxComputeClient = Mockito.spy(MaxComputeClient.class); MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); - MaxComputeSchemaCache maxComputeSchemaCache = new MaxComputeSchemaCache( + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( maxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient ); - maxComputeSchemaCache.setMessageParser(protoMessageParser); + protobufMaxComputeSchemaCache.setMessageParser(protoMessageParser); doThrow(new OdpsException("Invalid schema")) .when(maxComputeClient) .createOrUpdateTable(Mockito.any()); - maxComputeSchemaCache.getMaxComputeSchema(); - maxComputeSchemaCache.onSchemaUpdate(newDescriptor); + protobufMaxComputeSchemaCache.getMaxComputeSchema(); + protobufMaxComputeSchemaCache.onSchemaUpdate(newDescriptor); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java index 434f6532..36448595 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/partition/TimestampPartitioningStrategyTest.java @@ -8,7 +8,7 @@ import com.aliyun.odps.type.TypeInfoFactory; import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; import org.junit.Test; import org.mockito.Mockito; @@ -66,10 +66,10 @@ public void shouldReturnValidPartitionSpec() { .build()) .withPartitionColumn(partitionColumn) .build(); - MaxComputeSchemaCache maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); MaxComputeSchema maxComputeSchema = Mockito.mock(MaxComputeSchema.class); when(maxComputeSchema.getTableSchema()).thenReturn(tableSchema); - when(maxComputeSchemaCache.getMaxComputeSchema()) + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()) .thenReturn(maxComputeSchema); String expectedStartOfDayEpoch = "2024-10-28"; Record record = new ArrayRecord(tableSchema); @@ -94,7 +94,7 @@ public void shouldEmptyPartitionSpecIfObjectIsNotRecord() { public void shouldReturnDefaultPartitionSpec() { String expectedPartitionSpecStringRepresentation = "tablePartitionColumnName='__NULL__'"; TimestampPartitioningStrategy timestampPartitioningStrategy = new TimestampPartitioningStrategy(getMaxComputeSinkConfig()); - MaxComputeSchemaCache maxComputeSchemaCache = Mockito.mock(MaxComputeSchemaCache.class); + ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); MaxComputeSchema maxComputeSchema = Mockito.mock(MaxComputeSchema.class); Column partitionColumn = Column.newBuilder("tablePartitionColumnName", TypeInfoFactory.STRING) .build(); @@ -105,7 +105,7 @@ public void shouldReturnDefaultPartitionSpec() { .withPartitionColumn(partitionColumn) .build(); when(maxComputeSchema.getTableSchema()).thenReturn(tableSchema); - when(maxComputeSchemaCache.getMaxComputeSchema()) + when(protobufMaxComputeSchemaCache.getMaxComputeSchema()) .thenReturn(maxComputeSchema); Record record = new ArrayRecord(tableSchema); record.set("str", "strVal"); From 26780a88bc5c69eb8d405b91aa94d70173dc40a4 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Tue, 14 Jan 2025 13:01:06 +0700 Subject: [PATCH 2/4] refactor: Rename method name to createProtobufRecordDecorator --- .../depot/maxcompute/record/RecordDecoratorFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java index 5a8dcbc2..a31a9c85 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactory.java @@ -20,7 +20,7 @@ public class RecordDecoratorFactory { * @param recordDecoratorConfig record decorator configuration * @return record decorator */ - public static RecordDecorator createRecordDecorator(RecordDecoratorConfig recordDecoratorConfig) { + public static RecordDecorator createProtobufRecordDecorator(RecordDecoratorConfig recordDecoratorConfig) { RecordDecorator dataColumnRecordDecorator = new ProtoDataColumnRecordDecorator(null, recordDecoratorConfig.protobufConverterOrchestrator, recordDecoratorConfig.messageParser, From c4505894c95fd8d2b03a2e48c79c699912e72551 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Tue, 14 Jan 2025 13:01:45 +0700 Subject: [PATCH 3/4] refactor: Rename method name to createProtobufRecordDecorator --- .../gotocompany/depot/maxcompute/MaxComputeSinkFactory.java | 2 +- .../depot/maxcompute/record/RecordDecoratorFactoryTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java index 6e5a1fa2..86e79825 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java @@ -63,7 +63,7 @@ public void init() { } public Sink create() { - RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( + RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, protobufMaxComputeSchemaCache, messageParser, partitioningStrategy, maxComputeSinkConfig, sinkConfig, statsDReporter, maxComputeMetrics, metadataUtil) ); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java index 9755e872..9ba7f88b 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/RecordDecoratorFactoryTest.java @@ -27,7 +27,7 @@ public void shouldCreateDataRecordDecorator() { when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); when(sinkConfig.getSinkConnectorSchemaProtoMessageClass()).thenReturn("com.gotocompany.depot.message.Message"); - RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( + RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( new RecordDecoratorFactory.RecordDecoratorConfig( null, null, null, null, maxComputeSinkConfig, sinkConfig, Mockito.mock(StatsDReporter.class), @@ -52,7 +52,7 @@ public void shouldCreateDataRecordDecoratorWithNamespaceDecorator() { when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); when(sinkConfig.getSinkConnectorSchemaProtoMessageClass()).thenReturn("com.gotocompany.depot.message.Message"); - RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator( + RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( new RecordDecoratorFactory.RecordDecoratorConfig( null, null, null, null, maxComputeSinkConfig, sinkConfig, Mockito.mock(StatsDReporter.class), From 73668f88177b76ec5b23db4fcc63b6b5e5799519 Mon Sep 17 00:00:00 2001 From: Eka Winata Date: Mon, 20 Jan 2025 13:57:28 +0700 Subject: [PATCH 4/4] temp commit --- .../com/gotocompany/depot/SinkFactory.java | 6 ++ .../depot/config/MaxComputeSinkConfig.java | 9 +++ .../MaxComputeDefaultColumnConverter.java | 40 ++++++++++++ .../maxcompute/JsonMaxComputeSinkFactory.java | 35 +++++++++++ ...ava => ProtobufMaxComputeSinkFactory.java} | 13 ++-- .../schema/JsonMaxComputeSchemaBuilder.java | 62 +++++++++++++++++++ .../schema/JsonMaxComputeSchemaCache.java | 23 +++++++ .../schema/MaxComputeSchemaCacheFactory.java | 4 +- ...a => ProtobufMaxComputeSchemaBuilder.java} | 2 +- .../schema/ProtobufMaxComputeSchemaCache.java | 8 +-- .../MaxComputeDefaultColumnConverterTest.java | 20 ++++++ .../ProtoMessageRecordConverterTest.java | 8 +-- .../ProtoDataColumnRecordDecoratorTest.java | 20 +++--- ...rotoMetadataColumnRecordDecoratorTest.java | 6 +- ... ProtobufMaxComputeSchemaBuilderTest.java} | 18 +++--- .../ProtobufMaxComputeSchemaCacheTest.java | 30 ++++----- 16 files changed, 251 insertions(+), 53 deletions(-) create mode 100644 src/main/java/com/gotocompany/depot/SinkFactory.java create mode 100644 src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java create mode 100644 src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java rename src/main/java/com/gotocompany/depot/maxcompute/{MaxComputeSinkFactory.java => ProtobufMaxComputeSinkFactory.java} (91%) create mode 100644 src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java create mode 100644 src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java rename src/main/java/com/gotocompany/depot/maxcompute/schema/{MaxComputeSchemaBuilder.java => ProtobufMaxComputeSchemaBuilder.java} (98%) create mode 100644 src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java rename src/test/java/com/gotocompany/depot/maxcompute/schema/{MaxComputeSchemaBuilderTest.java => ProtobufMaxComputeSchemaBuilderTest.java} (92%) diff --git a/src/main/java/com/gotocompany/depot/SinkFactory.java b/src/main/java/com/gotocompany/depot/SinkFactory.java new file mode 100644 index 00000000..ba9dbb7f --- /dev/null +++ b/src/main/java/com/gotocompany/depot/SinkFactory.java @@ -0,0 +1,6 @@ +package com.gotocompany.depot; + +public interface SinkFactory { + void init(); + Sink create(); +} diff --git a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java index 3352e411..0f05eab3 100644 --- a/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java +++ b/src/main/java/com/gotocompany/depot/config/MaxComputeSinkConfig.java @@ -1,13 +1,16 @@ package com.gotocompany.depot.config; +import com.aliyun.odps.Column; import com.aliyun.odps.tunnel.io.CompressOption; import com.gotocompany.depot.common.TupleString; import com.gotocompany.depot.config.converter.ConfToListConverter; import com.gotocompany.depot.config.converter.LocalDateTimeConverter; +import com.gotocompany.depot.config.converter.MaxComputeDefaultColumnConverter; import com.gotocompany.depot.config.converter.MaxComputeOdpsGlobalSettingsConverter; import com.gotocompany.depot.config.converter.ZoneIdConverter; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; import org.aeonbits.owner.Config; +import org.apache.commons.lang3.StringUtils; import java.time.LocalDateTime; import java.time.ZoneId; @@ -149,4 +152,10 @@ public interface MaxComputeSinkConfig extends Config { @DefaultValue("1") int getMaxFutureYearEventTimeDifference(); + @DefaultValue("") + @Key("SINK_MAXCOMPUTE_DEFAULT_COLUMNS") + @ConverterClass(MaxComputeDefaultColumnConverter.class) + @Separator(",") + List getDefaultColumns(); + } diff --git a/src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java b/src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java new file mode 100644 index 00000000..6d3411f0 --- /dev/null +++ b/src/main/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverter.java @@ -0,0 +1,40 @@ +package com.gotocompany.depot.config.converter; + +import com.aliyun.odps.Column; +import com.aliyun.odps.OdpsType; +import com.aliyun.odps.type.TypeInfoFactory; +import com.google.common.collect.ImmutableSet; +import org.aeonbits.owner.Converter; + +import java.lang.reflect.Method; +import java.util.Set; + +import static com.aliyun.odps.OdpsType.*; + +public class MaxComputeDefaultColumnConverter implements Converter { + + private static final Set PRIMITIVE_TYPES = ImmutableSet.of( + BIGINT, DOUBLE, BOOLEAN, DATETIME, STRING, DECIMAL, + ARRAY, TINYINT, SMALLINT, INT, FLOAT, DATE, TIMESTAMP, + BINARY, TIMESTAMP_NTZ); + + private static final String NAME_TYPE_SEPARATOR = ":"; + private static final int COLUMN_NAME_INDEX = 0; + private static final int COLUMN_TYPE_INDEX = 1; + + @Override + public Column convert(Method method, String s) { + String[] parts = s.split(NAME_TYPE_SEPARATOR); + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid column format: " + s); + } + String name = parts[COLUMN_NAME_INDEX]; + OdpsType type = OdpsType.valueOf(parts[COLUMN_TYPE_INDEX].toUpperCase()); + if (!PRIMITIVE_TYPES.contains(type)) { + throw new IllegalArgumentException("Unsupported column type: " + type); + } + return Column.newBuilder(name, TypeInfoFactory.getPrimitiveTypeInfo(type)) + .build(); + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java new file mode 100644 index 00000000..4cb15bdc --- /dev/null +++ b/src/main/java/com/gotocompany/depot/maxcompute/JsonMaxComputeSinkFactory.java @@ -0,0 +1,35 @@ +package com.gotocompany.depot.maxcompute; + +import com.gotocompany.depot.Sink; +import com.gotocompany.depot.SinkFactory; +import com.gotocompany.depot.config.SinkConfig; +import com.gotocompany.depot.metrics.StatsDReporter; +import com.gotocompany.stencil.client.StencilClient; +import org.aeonbits.owner.ConfigFactory; + +import java.util.Map; + +public class JsonMaxComputeSinkFactory implements SinkFactory { + + private final StatsDReporter statsDReporter; + private final StencilClient stencilClient; + private final SinkConfig sinkConfig; + + public JsonMaxComputeSinkFactory(StatsDReporter statsDReporter, + StencilClient stencilClient, + Map env) { + this.statsDReporter = statsDReporter; + this.stencilClient = stencilClient; + this.sinkConfig = ConfigFactory.create(SinkConfig.class, env); + } + + @Override + public void init() { + + } + + @Override + public Sink create() { + return null; + } +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/ProtobufMaxComputeSinkFactory.java similarity index 91% rename from src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java rename to src/main/java/com/gotocompany/depot/maxcompute/ProtobufMaxComputeSinkFactory.java index 86e79825..6092902c 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/MaxComputeSinkFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/ProtobufMaxComputeSinkFactory.java @@ -2,6 +2,7 @@ import com.google.protobuf.Descriptors; import com.gotocompany.depot.Sink; +import com.gotocompany.depot.SinkFactory; import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.client.MaxComputeClient; @@ -24,7 +25,7 @@ import java.util.Map; -public class MaxComputeSinkFactory { +public class ProtobufMaxComputeSinkFactory implements SinkFactory { private final MaxComputeSinkConfig maxComputeSinkConfig; private final SinkConfig sinkConfig; @@ -39,9 +40,9 @@ public class MaxComputeSinkFactory { private PartitioningStrategy partitioningStrategy; private MessageParser messageParser; - public MaxComputeSinkFactory(StatsDReporter statsDReporter, - StencilClient stencilClient, - Map env) { + public ProtobufMaxComputeSinkFactory(StatsDReporter statsDReporter, + StencilClient stencilClient, + Map env) { this.statsDReporter = statsDReporter; this.maxComputeSinkConfig = ConfigFactory.create(MaxComputeSinkConfig.class, env); this.sinkConfig = ConfigFactory.create(SinkConfig.class, env); @@ -52,16 +53,18 @@ public MaxComputeSinkFactory(StatsDReporter statsDReporter, this.metadataUtil = new MetadataUtil(maxComputeSinkConfig); } + @Override public void init() { Descriptors.Descriptor descriptor = stencilClient.get(getProtoSchemaClassName(sinkConfig)); this.partitioningStrategy = PartitioningStrategyFactory.createPartitioningStrategy(protobufConverterOrchestrator, maxComputeSinkConfig, descriptor); - this.protobufMaxComputeSchemaCache = MaxComputeSchemaCacheFactory.createMaxComputeSchemaCache(protobufConverterOrchestrator, + this.protobufMaxComputeSchemaCache = MaxComputeSchemaCacheFactory.createProtobufMaxComputeSchemaCache(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, sinkConfig, maxComputeClient, metadataUtil); this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, protobufMaxComputeSchemaCache); protobufMaxComputeSchemaCache.setMessageParser(messageParser); protobufMaxComputeSchemaCache.updateSchema(); } + @Override public Sink create() { RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator( new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, protobufMaxComputeSchemaCache, messageParser, diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java new file mode 100644 index 00000000..232cd0bf --- /dev/null +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaBuilder.java @@ -0,0 +1,62 @@ +package com.gotocompany.depot.maxcompute.schema; + +import com.aliyun.odps.Column; +import com.aliyun.odps.TableSchema; +import com.gotocompany.depot.config.MaxComputeSinkConfig; +import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; +import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy; +import com.gotocompany.depot.maxcompute.util.MetadataUtil; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +@RequiredArgsConstructor +public class JsonMaxComputeSchemaBuilder { + + private final MaxComputeSinkConfig maxComputeSinkConfig; + private final PartitioningStrategy partitioningStrategy; + private final MetadataUtil metadataUtil; + + public MaxComputeSchema build() { + List metadataColumns = buildMetadataColumns(); + TableSchema.Builder tableSchemaBuilder = com.aliyun.odps.TableSchema.builder() + .withColumns(metadataColumns) + .withColumns(buildDataColumns()); + if (maxComputeSinkConfig.isTablePartitioningEnabled()) { + tableSchemaBuilder.withPartitionColumn(buildPartitionColumn()); + } + return new MaxComputeSchema(tableSchemaBuilder.build(), metadataColumns.stream().collect(Collectors.toMap(Column::getName, Column::getTypeInfo))); + } + + private Column buildPartitionColumn() { + return partitioningStrategy.getPartitionColumn(); + } + + private List buildDataColumns() { + return maxComputeSinkConfig.getDefaultColumns() + .stream() + .filter(col -> { + if (!maxComputeSinkConfig.isTablePartitioningEnabled() || !col.getName().equals(maxComputeSinkConfig.getTablePartitionKey())) { + return true; + } + return !partitioningStrategy.shouldReplaceOriginalColumn(); + }).collect(Collectors.toList()); + } + + private List buildMetadataColumns() { + if (!maxComputeSinkConfig.shouldAddMetadata()) { + return new ArrayList<>(); + } + if (StringUtils.isNotBlank(maxComputeSinkConfig.getMaxcomputeMetadataNamespace())) { + throw new IllegalArgumentException("Maxcompute metadata namespace is not supported for JSON schema"); + } + return maxComputeSinkConfig.getMetadataColumnsTypes() + .stream() + .map(tuple -> Column.newBuilder(tuple.getFirst(), metadataUtil.getMetadataTypeInfo(tuple.getSecond())).build()) + .collect(Collectors.toList()); + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java new file mode 100644 index 00000000..aeabce22 --- /dev/null +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/JsonMaxComputeSchemaCache.java @@ -0,0 +1,23 @@ +package com.gotocompany.depot.maxcompute.schema; + +import com.aliyun.odps.TableSchema; +import com.gotocompany.depot.config.MaxComputeSinkConfig; +import com.gotocompany.depot.maxcompute.client.MaxComputeClient; +import com.gotocompany.depot.stencil.DepotStencilUpdateListener; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class JsonMaxComputeSchemaCache extends DepotStencilUpdateListener { + + private final MaxComputeSinkConfig maxComputeSinkConfig; + private final JsonMaxComputeSchemaBuilder jsonMaxComputeSchemaBuilder; + private final MaxComputeClient maxComputeClient; + + @Override + public void updateSchema() { + TableSchema remoteSchema = maxComputeClient.getLatestTableSchema(); + + } + +} diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java index 043a2675..c437f487 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaCacheFactory.java @@ -9,7 +9,7 @@ public class MaxComputeSchemaCacheFactory { - public static ProtobufMaxComputeSchemaCache createMaxComputeSchemaCache( + public static ProtobufMaxComputeSchemaCache createProtobufMaxComputeSchemaCache( ProtobufConverterOrchestrator protobufConverterOrchestrator, MaxComputeSinkConfig maxComputeSinkConfig, PartitioningStrategy partitioningStrategy, @@ -18,7 +18,7 @@ public static ProtobufMaxComputeSchemaCache createMaxComputeSchemaCache( MetadataUtil metadataUtil ) { return new ProtobufMaxComputeSchemaCache( - new MaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil), + new ProtobufMaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil), sinkConfig, protobufConverterOrchestrator, maxComputeClient ); diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilder.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilder.java similarity index 98% rename from src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilder.java rename to src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilder.java index b3121204..39cb1400 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilder.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilder.java @@ -18,7 +18,7 @@ import java.util.stream.Collectors; @RequiredArgsConstructor -public class MaxComputeSchemaBuilder { +public class ProtobufMaxComputeSchemaBuilder { private final ProtobufConverterOrchestrator protobufConverterOrchestrator; private final MaxComputeSinkConfig maxComputeSinkConfig; diff --git a/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java index 53e9560b..7b0c2ab3 100644 --- a/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java +++ b/src/main/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCache.java @@ -22,17 +22,17 @@ @Slf4j public class ProtobufMaxComputeSchemaCache extends DepotStencilUpdateListener { - private final MaxComputeSchemaBuilder maxComputeSchemaBuilder; + private final ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder; private final SinkConfig sinkConfig; private final ProtobufConverterOrchestrator protobufConverterOrchestrator; private final MaxComputeClient maxComputeClient; private MaxComputeSchema maxComputeSchema; - public ProtobufMaxComputeSchemaCache(MaxComputeSchemaBuilder maxComputeSchemaBuilder, + public ProtobufMaxComputeSchemaCache(ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder, SinkConfig sinkConfig, ProtobufConverterOrchestrator protobufConverterOrchestrator, MaxComputeClient maxComputeClient) { - this.maxComputeSchemaBuilder = maxComputeSchemaBuilder; + this.protobufMaxComputeSchemaBuilder = protobufMaxComputeSchemaBuilder; this.sinkConfig = sinkConfig; this.protobufConverterOrchestrator = protobufConverterOrchestrator; this.maxComputeClient = maxComputeClient; @@ -75,7 +75,7 @@ public synchronized void updateSchema() { } private void updateMaxComputeTableSchema(Descriptors.Descriptor descriptor) { - MaxComputeSchema localSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema localSchema = protobufMaxComputeSchemaBuilder.build(descriptor); protobufConverterOrchestrator.clearCache(); try { maxComputeClient.createOrUpdateTable(localSchema.getTableSchema()); diff --git a/src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java b/src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java new file mode 100644 index 00000000..e1e8cd03 --- /dev/null +++ b/src/test/java/com/gotocompany/depot/config/converter/MaxComputeDefaultColumnConverterTest.java @@ -0,0 +1,20 @@ +package com.gotocompany.depot.config.converter; + +import com.aliyun.odps.Column; +import org.junit.Test; + +import java.util.List; + +public class MaxComputeDefaultColumnConverterTest { + + private static final MaxComputeDefaultColumnConverter converter = new MaxComputeDefaultColumnConverter(); + + @Test + public void shouldParseComplexTableStatement() { + String input = "col1:STRING,col2:ARRAY,col3:STRUCT>>"; + + List columns = converter.convert(null, input); + + System.out.println(columns); + } +} diff --git a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java index 818f225d..8292f301 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/converter/record/ProtoMessageRecordConverterTest.java @@ -15,7 +15,7 @@ import com.gotocompany.depot.exception.UnknownFieldsException; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; import com.gotocompany.depot.maxcompute.model.RecordWrappers; @@ -57,7 +57,7 @@ public class ProtoMessageRecordConverterTest { private MaxComputeSinkConfig maxComputeSinkConfig; private ProtobufConverterOrchestrator protobufConverterOrchestrator; private ProtoMessageParser protoMessageParser; - private MaxComputeSchemaBuilder maxComputeSchemaBuilder; + private ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder; private SinkConfig sinkConfig; private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache; private ProtoMessageRecordConverter protoMessageRecordConverter; @@ -97,9 +97,9 @@ public void setup() throws IOException { descriptor ); MetadataUtil metadataUtil = new MetadataUtil(maxComputeSinkConfig); - maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil); + protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, metadataUtil); protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); Instrumentation instrumentation = Mockito.mock(Instrumentation.class); Mockito.doNothing().when(instrumentation) diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java index e1a77b4b..0f6f2ee7 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoDataColumnRecordDecoratorTest.java @@ -15,7 +15,7 @@ import com.gotocompany.depot.config.SinkConfig; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; @@ -47,7 +47,7 @@ public class ProtoDataColumnRecordDecoratorTest { private static final Descriptors.Descriptor DESCRIPTOR = TestMaxComputeRecord.MaxComputeRecord.getDescriptor(); - private MaxComputeSchemaBuilder maxComputeSchemaBuilder; + private ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder; private ProtoDataColumnRecordDecorator protoDataColumnRecordDecorator; @Before @@ -69,7 +69,7 @@ public void setup() throws IOException { @Test public void decorateShouldProcessDataColumnToRecord() throws IOException { - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -110,7 +110,7 @@ public void decorateShouldProcessDataColumnToRecordAndOmitPartitionColumnIfParti PartitioningStrategy partitioningStrategy = new DefaultPartitioningStrategy(TypeInfoFactory.STRING, maxComputeSinkConfig); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, null, partitioningStrategy, getMockedMessage()); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -153,7 +153,7 @@ public void decorateShouldProcessDataColumnToRecordAndShouldNotOmitOriginalColum when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); PartitioningStrategy partitioningStrategy = new TimestampPartitioningStrategy(maxComputeSinkConfig); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, null, partitioningStrategy, getMockedMessage()); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -209,7 +209,7 @@ public void decorateShouldSetDefaultPartitioningSpecWhenProtoFieldNotExists() th )) .build(); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, null, partitioningStrategy, maxComputeRecord); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); Message message = new Message(null, maxComputeRecord.toByteArray()); @@ -233,7 +233,7 @@ public void decorateShouldSetDefaultPartitioningSpecWhenProtoFieldNotExists() th @Test public void decorateShouldPutDefaultPartitionSpec() throws IOException { - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -273,7 +273,7 @@ public void decorateShouldCallInjectedDecorator() throws IOException { SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()).thenReturn(SinkConnectorSchemaMessageMode.LOG_MESSAGE); instantiateProtoDataColumnRecordDecorator(sinkConfig, maxComputeSinkConfig, recordDecorator, null, getMockedMessage()); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); Record record = new ArrayRecord(maxComputeSchema.getTableSchema()); RecordWrapper recordWrapper = new RecordWrapper(record, 0, null, null); TestMaxComputeRecord.MaxComputeRecord maxComputeRecord = getMockedMessage(); @@ -303,13 +303,13 @@ private void instantiateProtoDataColumnRecordDecorator(SinkConfig sinkConfig, Ma PartitioningStrategy partitioningStrategy, com.google.protobuf.Message mockedMessage) throws IOException { ProtobufConverterOrchestrator protobufConverterOrchestrator = new ProtobufConverterOrchestrator(maxComputeSinkConfig); - maxComputeSchemaBuilder = new MaxComputeSchemaBuilder( + protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder( protobufConverterOrchestrator, maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig) ); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(DESCRIPTOR); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(DESCRIPTOR); ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java index 839127a1..7096cd4d 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/record/ProtoMetadataColumnRecordDecoratorTest.java @@ -12,7 +12,7 @@ import com.gotocompany.depot.config.MaxComputeSinkConfig; import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator; import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType; -import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaBuilder; +import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaBuilder; import com.gotocompany.depot.maxcompute.model.MaxComputeSchema; import com.gotocompany.depot.maxcompute.model.RecordWrapper; import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache; @@ -145,8 +145,8 @@ private void initializeDecorator(MaxComputeSinkConfig sinkConfig) { this.maxComputeSinkConfig = sinkConfig; ProtobufConverterOrchestrator protobufConverterOrchestrator = new ProtobufConverterOrchestrator(sinkConfig); MetadataUtil metadataUtil = new MetadataUtil(maxComputeSinkConfig); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(protobufConverterOrchestrator, sinkConfig, null, metadataUtil); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(protobufConverterOrchestrator, sinkConfig, null, metadataUtil); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); protobufMaxComputeSchemaCache = Mockito.mock(ProtobufMaxComputeSchemaCache.class); when(protobufMaxComputeSchemaCache.getMaxComputeSchema()).thenReturn(maxComputeSchema); protoMetadataColumnRecordDecorator = new ProtoMetadataColumnRecordDecorator(null, sinkConfig, protobufMaxComputeSchemaCache, metadataUtil); diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilderTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilderTest.java similarity index 92% rename from src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilderTest.java rename to src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilderTest.java index b2621e8a..d364fb1b 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/MaxComputeSchemaBuilderTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaBuilderTest.java @@ -22,7 +22,7 @@ import static org.mockito.Mockito.when; -public class MaxComputeSchemaBuilderTest { +public class ProtobufMaxComputeSchemaBuilderTest { private final Descriptors.Descriptor descriptor = TextMaxComputeTable.Table.getDescriptor(); @@ -49,12 +49,12 @@ public void shouldBuildPartitionedTableSchemaWithRootLevelMetadata() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); int expectedNonPartitionColumnCount = 7; int expectedPartitionColumnCount = 1; - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); assertThat(maxComputeSchema.getTableSchema().getColumns().size()).isEqualTo(expectedNonPartitionColumnCount); assertThat(maxComputeSchema.getTableSchema().getPartitionColumns().size()).isEqualTo(expectedPartitionColumnCount); @@ -109,11 +109,11 @@ public void shouldBuildPartitionedTableSchemaWithNestedMetadata() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder( + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder( new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); assertThat(maxComputeSchema.getTableSchema().getColumns().size()).isEqualTo(expectedNonPartitionColumnCount); assertThat(maxComputeSchema.getTableSchema().getPartitionColumns().size()).isEqualTo(expectedPartitionColumnCount); @@ -159,10 +159,10 @@ public void shouldBuildTableSchemaWithoutPartitionAndMeta() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); - MaxComputeSchema maxComputeSchema = maxComputeSchemaBuilder.build(descriptor); + MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaBuilder.build(descriptor); assertThat(maxComputeSchema.getTableSchema().getColumns().size()).isEqualTo(expectedNonPartitionColumnCount); assertThat(maxComputeSchema.getTableSchema().getPartitionColumns().size()).isEqualTo(expectedPartitionColumnCount); @@ -206,10 +206,10 @@ public void shouldThrowIllegalArgumentExceptionWhenPartitionKeyIsNotFound() { maxComputeSinkConfig, descriptor ); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = new MaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = new ProtobufMaxComputeSchemaBuilder(new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeSinkConfig, partitioningStrategy, new MetadataUtil(maxComputeSinkConfig)); - maxComputeSchemaBuilder.build(descriptor); + protobufMaxComputeSchemaBuilder.build(descriptor); } } diff --git a/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java index 3f35a86a..3d6376b2 100644 --- a/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java +++ b/src/test/java/com/gotocompany/depot/maxcompute/schema/ProtobufMaxComputeSchemaCacheTest.java @@ -30,11 +30,11 @@ public class ProtobufMaxComputeSchemaCacheTest { public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); when(protoMessageParser.getDescriptorMap()).thenReturn(newDescriptor); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -47,7 +47,7 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( - maxComputeSchemaBuilder, + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient @@ -72,11 +72,11 @@ public void shouldBuildAndReturnMaxComputeSchema() throws OdpsException { public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchFieldException, IllegalAccessException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); when(protoMessageParser.getDescriptorMap()).thenReturn(newDescriptor); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -87,7 +87,7 @@ public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchF MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( - maxComputeSchemaBuilder, + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient @@ -107,10 +107,10 @@ public void shouldReturnMaxComputeSchemaIfExists() throws OdpsException, NoSuchF public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -121,7 +121,7 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( - maxComputeSchemaBuilder, + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient @@ -145,10 +145,10 @@ public void shouldUpdateSchemaBasedOnNewDescriptor() throws OdpsException { public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -159,7 +159,7 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( - maxComputeSchemaBuilder, + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient @@ -183,10 +183,10 @@ public void shouldUpdateSchemaUsingLogKeyBasedOnNewDescriptor() throws OdpsExcep public void shouldThrowMaxComputeTableOperationExceptionWhenUpsertIsFailing() throws OdpsException { Map newDescriptor = new HashMap<>(); newDescriptor.put("class", Mockito.mock(Descriptors.Descriptor.class)); - MaxComputeSchemaBuilder maxComputeSchemaBuilder = Mockito.mock(MaxComputeSchemaBuilder.class); + ProtobufMaxComputeSchemaBuilder protobufMaxComputeSchemaBuilder = Mockito.mock(ProtobufMaxComputeSchemaBuilder.class); ProtoMessageParser protoMessageParser = Mockito.mock(ProtoMessageParser.class); MaxComputeSchema mockedMaxComputeSchema = new MaxComputeSchema(null, null); - when(maxComputeSchemaBuilder.build(Mockito.any())) + when(protobufMaxComputeSchemaBuilder.build(Mockito.any())) .thenReturn(mockedMaxComputeSchema); SinkConfig sinkConfig = Mockito.mock(SinkConfig.class); when(sinkConfig.getSinkConnectorSchemaMessageMode()) @@ -197,7 +197,7 @@ public void shouldThrowMaxComputeTableOperationExceptionWhenUpsertIsFailing() th MaxComputeSinkConfig maxComputeSinkConfig = Mockito.mock(MaxComputeSinkConfig.class); when(maxComputeSinkConfig.getZoneId()).thenReturn(ZoneId.of("UTC")); ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache = new ProtobufMaxComputeSchemaCache( - maxComputeSchemaBuilder, + protobufMaxComputeSchemaBuilder, sinkConfig, new ProtobufConverterOrchestrator(maxComputeSinkConfig), maxComputeClient