Skip to content

Feat sink maxcompute json support #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/com/gotocompany/depot/SinkFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.gotocompany.depot;

/**
 * Common contract for factories that produce {@link Sink} instances.
 *
 * <p>The interface exposes a two-step lifecycle: {@link #init()} followed by {@link #create()}.
 * NOTE(review): the interface itself does not enforce that {@code init()} is called before
 * {@code create()}; confirm the expected call order against the implementations.
 */
public interface SinkFactory {
/**
 * Prepares the factory. What is prepared is implementation specific.
 */
void init();
/**
 * Produces a sink.
 *
 * @return the sink created by this factory
 */
Sink create();
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package com.gotocompany.depot.config;

import com.aliyun.odps.Column;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.gotocompany.depot.common.TupleString;
import com.gotocompany.depot.config.converter.ConfToListConverter;
import com.gotocompany.depot.config.converter.LocalDateTimeConverter;
import com.gotocompany.depot.config.converter.MaxComputeDefaultColumnConverter;
import com.gotocompany.depot.config.converter.MaxComputeOdpsGlobalSettingsConverter;
import com.gotocompany.depot.config.converter.ZoneIdConverter;
import com.gotocompany.depot.maxcompute.enumeration.MaxComputeTimestampDataType;
import org.aeonbits.owner.Config;
import org.apache.commons.lang3.StringUtils;

import java.time.LocalDateTime;
import java.time.ZoneId;
Expand Down Expand Up @@ -149,4 +152,10 @@ public interface MaxComputeSinkConfig extends Config {
@DefaultValue("1")
int getMaxFutureYearEventTimeDifference();

/**
 * Columns configured through {@code SINK_MAXCOMPUTE_DEFAULT_COLUMNS}.
 *
 * <p>The raw value is split on {@code ","} and every token is handed to
 * {@link MaxComputeDefaultColumnConverter}, which expects the form {@code name:type}.
 * The default value is the empty string.
 * NOTE(review): presumably an empty value resolves to an empty list rather than invoking
 * the converter with an empty token - confirm against the OWNER collection conversion rules.
 *
 * @return the configured columns
 */
@DefaultValue("")
@Key("SINK_MAXCOMPUTE_DEFAULT_COLUMNS")
@ConverterClass(MaxComputeDefaultColumnConverter.class)
@Separator(",")
List<Column> getDefaultColumns();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.gotocompany.depot.config.converter;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.type.TypeInfoFactory;
import com.google.common.collect.ImmutableSet;
import org.aeonbits.owner.Converter;

import java.lang.reflect.Method;
import java.util.Locale;
import java.util.Set;

import static com.aliyun.odps.OdpsType.*;

/**
 * OWNER converter that turns one {@code name:type} token into an ODPS {@link Column}.
 *
 * <p>The type part is matched case-insensitively against {@link OdpsType} and must be one of
 * the types in {@link #PRIMITIVE_TYPES}. Surrounding whitespace around the name and the type
 * is ignored, so {@code "id : bigint"} and {@code "id:BIGINT"} are equivalent.
 */
public class MaxComputeDefaultColumnConverter implements Converter<Column> {

    /**
     * Types that can be described by a type name alone.
     *
     * <p>{@code ARRAY} is intentionally absent: an array column needs an element type, which the
     * {@code name:type} syntax cannot express, and it is not a valid argument for
     * {@link TypeInfoFactory#getPrimitiveTypeInfo(OdpsType)}.
     */
    private static final Set<OdpsType> PRIMITIVE_TYPES = ImmutableSet.of(
            BIGINT, DOUBLE, BOOLEAN, DATETIME, STRING, DECIMAL,
            TINYINT, SMALLINT, INT, FLOAT, DATE, TIMESTAMP,
            BINARY, TIMESTAMP_NTZ);

    private static final String NAME_TYPE_SEPARATOR = ":";
    private static final int COLUMN_NAME_INDEX = 0;
    private static final int COLUMN_TYPE_INDEX = 1;
    private static final int EXPECTED_PART_COUNT = 2;

    /**
     * Parses a single column definition.
     *
     * @param method the configuration method being resolved (unused)
     * @param s      the raw token, expected as {@code name:type}
     * @return a column with the given name and the primitive type info of the given type
     * @throws IllegalArgumentException if the token is null, is not exactly {@code name:type},
     *                                  has a blank name or type, or names an unknown or
     *                                  unsupported type
     */
    @Override
    public Column convert(Method method, String s) {
        if (s == null) {
            throw new IllegalArgumentException("Invalid column format: " + s);
        }
        // A negative limit keeps trailing empty parts, so "a:int:" is rejected instead of
        // being silently treated as "a:int".
        String[] parts = s.split(NAME_TYPE_SEPARATOR, -1);
        if (parts.length != EXPECTED_PART_COUNT) {
            throw new IllegalArgumentException("Invalid column format: " + s);
        }
        String name = parts[COLUMN_NAME_INDEX].trim();
        String typeName = parts[COLUMN_TYPE_INDEX].trim();
        if (name.isEmpty() || typeName.isEmpty()) {
            throw new IllegalArgumentException("Invalid column format: " + s);
        }
        OdpsType type = parseType(typeName);
        if (!PRIMITIVE_TYPES.contains(type)) {
            throw new IllegalArgumentException("Unsupported column type: " + type);
        }
        return Column.newBuilder(name, TypeInfoFactory.getPrimitiveTypeInfo(type))
                .build();
    }

    /**
     * Resolves a type name to its {@link OdpsType} constant, ignoring case.
     */
    private static OdpsType parseType(String typeName) {
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale
            // (e.g. Turkish would map 'i' to a dotted capital I and break the enum lookup).
            return OdpsType.valueOf(typeName.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported column type: " + typeName, e);
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.gotocompany.depot.maxcompute;

import com.gotocompany.depot.Sink;
import com.gotocompany.depot.SinkFactory;
import com.gotocompany.depot.config.SinkConfig;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.stencil.client.StencilClient;
import org.aeonbits.owner.ConfigFactory;

import java.util.Map;

/**
 * {@link SinkFactory} for the JSON flavour of the MaxCompute sink.
 *
 * <p>This class is currently a stub: the constructor only captures its collaborators and
 * resolves {@link SinkConfig} from the environment, {@link #init()} does nothing and
 * {@link #create()} is not implemented.
 */
public class JsonMaxComputeSinkFactory implements SinkFactory {

    private final StatsDReporter statsDReporter;
    private final StencilClient stencilClient;
    private final SinkConfig sinkConfig;

    /**
     * @param statsDReporter metrics reporter, stored for later use
     * @param stencilClient  stencil client, stored for later use
     * @param env            configuration key/value pairs used to build {@link SinkConfig}
     */
    public JsonMaxComputeSinkFactory(StatsDReporter statsDReporter,
                                     StencilClient stencilClient,
                                     Map<String, String> env) {
        this.statsDReporter = statsDReporter;
        this.stencilClient = stencilClient;
        this.sinkConfig = ConfigFactory.create(SinkConfig.class, env);
    }

    /**
     * No initialisation is performed yet.
     */
    @Override
    public void init() {
        // Intentionally empty until the JSON sink wiring is implemented.
    }

    /**
     * Not implemented yet.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always; failing here is preferable to handing a
     *                                       {@code null} sink to the caller, which would only
     *                                       surface later as a NullPointerException
     */
    @Override
    public Sink create() {
        throw new UnsupportedOperationException("JSON MaxCompute sink is not implemented yet");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import com.google.protobuf.Descriptors;
import com.gotocompany.depot.Sink;
import com.gotocompany.depot.SinkFactory;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.config.SinkConfig;
import com.gotocompany.depot.maxcompute.client.MaxComputeClient;
import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator;
import com.gotocompany.depot.maxcompute.converter.record.ProtoMessageRecordConverter;
import com.gotocompany.depot.maxcompute.record.RecordDecorator;
import com.gotocompany.depot.maxcompute.record.RecordDecoratorFactory;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCacheFactory;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategyFactory;
Expand All @@ -24,7 +25,7 @@

import java.util.Map;

public class MaxComputeSinkFactory {
public class ProtobufMaxComputeSinkFactory implements SinkFactory {

private final MaxComputeSinkConfig maxComputeSinkConfig;
private final SinkConfig sinkConfig;
Expand All @@ -35,13 +36,13 @@ public class MaxComputeSinkFactory {
private final MaxComputeClient maxComputeClient;
private final MetadataUtil metadataUtil;

private MaxComputeSchemaCache maxComputeSchemaCache;
private ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache;
private PartitioningStrategy partitioningStrategy;
private MessageParser messageParser;

public MaxComputeSinkFactory(StatsDReporter statsDReporter,
StencilClient stencilClient,
Map<String, String> env) {
public ProtobufMaxComputeSinkFactory(StatsDReporter statsDReporter,
StencilClient stencilClient,
Map<String, String> env) {
this.statsDReporter = statsDReporter;
this.maxComputeSinkConfig = ConfigFactory.create(MaxComputeSinkConfig.class, env);
this.sinkConfig = ConfigFactory.create(SinkConfig.class, env);
Expand All @@ -52,22 +53,24 @@ public MaxComputeSinkFactory(StatsDReporter statsDReporter,
this.metadataUtil = new MetadataUtil(maxComputeSinkConfig);
}

@Override
public void init() {
Descriptors.Descriptor descriptor = stencilClient.get(getProtoSchemaClassName(sinkConfig));
this.partitioningStrategy = PartitioningStrategyFactory.createPartitioningStrategy(protobufConverterOrchestrator, maxComputeSinkConfig, descriptor);
this.maxComputeSchemaCache = MaxComputeSchemaCacheFactory.createMaxComputeSchemaCache(protobufConverterOrchestrator,
this.protobufMaxComputeSchemaCache = MaxComputeSchemaCacheFactory.createProtobufMaxComputeSchemaCache(protobufConverterOrchestrator,
maxComputeSinkConfig, partitioningStrategy, sinkConfig, maxComputeClient, metadataUtil);
this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, maxComputeSchemaCache);
maxComputeSchemaCache.setMessageParser(messageParser);
maxComputeSchemaCache.updateSchema();
this.messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter, protobufMaxComputeSchemaCache);
protobufMaxComputeSchemaCache.setMessageParser(messageParser);
protobufMaxComputeSchemaCache.updateSchema();
}

@Override
public Sink create() {
RecordDecorator recordDecorator = RecordDecoratorFactory.createRecordDecorator(
new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, maxComputeSchemaCache, messageParser,
RecordDecorator recordDecorator = RecordDecoratorFactory.createProtobufRecordDecorator(
new RecordDecoratorFactory.RecordDecoratorConfig(protobufConverterOrchestrator, protobufMaxComputeSchemaCache, messageParser,
partitioningStrategy, maxComputeSinkConfig, sinkConfig, statsDReporter, maxComputeMetrics, metadataUtil)
);
ProtoMessageRecordConverter protoMessageRecordConverter = new ProtoMessageRecordConverter(recordDecorator, maxComputeSchemaCache);
ProtoMessageRecordConverter protoMessageRecordConverter = new ProtoMessageRecordConverter(recordDecorator, protobufMaxComputeSchemaCache);
return new MaxComputeSink(maxComputeClient.createInsertManager(), protoMessageRecordConverter, statsDReporter, maxComputeMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import com.gotocompany.depot.maxcompute.model.RecordWrapper;
import com.gotocompany.depot.maxcompute.model.RecordWrappers;
import com.gotocompany.depot.maxcompute.record.RecordDecorator;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache;
import com.gotocompany.depot.message.Message;
import lombok.RequiredArgsConstructor;

Expand All @@ -25,7 +25,7 @@
public class ProtoMessageRecordConverter implements MessageRecordConverter {

private final RecordDecorator recordDecorator;
private final MaxComputeSchemaCache maxComputeSchemaCache;
private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache;

/**
* Converts a list of messages to RecordWrappers.
Expand All @@ -36,7 +36,7 @@ public class ProtoMessageRecordConverter implements MessageRecordConverter {
*/
@Override
public RecordWrappers convert(List<Message> messages) {
MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema();
MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema();
RecordWrappers recordWrappers = new RecordWrappers();
IntStream.range(0, messages.size())
.forEach(index -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.model.MaxComputeSchema;
import com.gotocompany.depot.maxcompute.model.RecordWrapper;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.util.MetadataUtil;
import com.gotocompany.depot.message.Message;

Expand All @@ -25,18 +25,18 @@
*/
public class ProtoMetadataColumnRecordDecorator extends RecordDecorator {

private final MaxComputeSchemaCache maxComputeSchemaCache;
private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache;
private final Map<String, String> metadataTypePairs;
private final String maxcomputeMetadataNamespace;
private final List<TupleString> metadataColumnsTypes;
private final MetadataUtil metadataUtil;

public ProtoMetadataColumnRecordDecorator(RecordDecorator recordDecorator,
MaxComputeSinkConfig maxComputeSinkConfig,
MaxComputeSchemaCache maxComputeSchemaCache,
ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache,
MetadataUtil metadataUtil) {
super(recordDecorator);
this.maxComputeSchemaCache = maxComputeSchemaCache;
this.protobufMaxComputeSchemaCache = protobufMaxComputeSchemaCache;
this.metadataUtil = metadataUtil;
this.metadataTypePairs = maxComputeSinkConfig.getMetadataColumnsTypes()
.stream()
Expand Down Expand Up @@ -66,7 +66,7 @@ public RecordWrapper process(RecordWrapper recordWrapper, Message message) throw

private void appendNamespacedMetadata(Record record, Message message) {
Map<String, Object> metadata = message.getMetadata(metadataColumnsTypes);
MaxComputeSchema maxComputeSchema = maxComputeSchemaCache.getMaxComputeSchema();
MaxComputeSchema maxComputeSchema = protobufMaxComputeSchemaCache.getMaxComputeSchema();
StructTypeInfo typeInfo = (StructTypeInfo) maxComputeSchema.getTableSchema()
.getColumn(maxcomputeMetadataNamespace)
.getTypeInfo();
Expand All @@ -80,7 +80,7 @@ private void appendNamespacedMetadata(Record record, Message message) {

private void appendMetadata(Record record, Message message) {
Map<String, Object> metadata = message.getMetadata(metadataColumnsTypes);
for (Map.Entry<String, TypeInfo> entry : maxComputeSchemaCache.getMaxComputeSchema()
for (Map.Entry<String, TypeInfo> entry : protobufMaxComputeSchemaCache.getMaxComputeSchema()
.getMetadataColumns()
.entrySet()) {
Object value = metadata.get(entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.config.SinkConfig;
import com.gotocompany.depot.maxcompute.converter.ProtobufConverterOrchestrator;
import com.gotocompany.depot.maxcompute.schema.MaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.ProtobufMaxComputeSchemaCache;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;
import com.gotocompany.depot.maxcompute.util.MetadataUtil;
import com.gotocompany.depot.message.MessageParser;
Expand All @@ -20,7 +20,7 @@ public class RecordDecoratorFactory {
* @param recordDecoratorConfig record decorator configuration
* @return record decorator
*/
public static RecordDecorator createRecordDecorator(RecordDecoratorConfig recordDecoratorConfig) {
public static RecordDecorator createProtobufRecordDecorator(RecordDecoratorConfig recordDecoratorConfig) {
RecordDecorator dataColumnRecordDecorator = new ProtoDataColumnRecordDecorator(null,
recordDecoratorConfig.protobufConverterOrchestrator,
recordDecoratorConfig.messageParser,
Expand All @@ -32,13 +32,13 @@ public static RecordDecorator createRecordDecorator(RecordDecoratorConfig record
return dataColumnRecordDecorator;
}
return new ProtoMetadataColumnRecordDecorator(dataColumnRecordDecorator, recordDecoratorConfig.maxComputeSinkConfig,
recordDecoratorConfig.maxComputeSchemaCache, recordDecoratorConfig.metadataUtil);
recordDecoratorConfig.protobufMaxComputeSchemaCache, recordDecoratorConfig.metadataUtil);
}

@RequiredArgsConstructor
public static class RecordDecoratorConfig {
private final ProtobufConverterOrchestrator protobufConverterOrchestrator;
private final MaxComputeSchemaCache maxComputeSchemaCache;
private final ProtobufMaxComputeSchemaCache protobufMaxComputeSchemaCache;
private final MessageParser messageParser;
private final PartitioningStrategy partitioningStrategy;
private final MaxComputeSinkConfig maxComputeSinkConfig;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.gotocompany.depot.maxcompute.schema;

import com.aliyun.odps.Column;
import com.aliyun.odps.TableSchema;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.model.MaxComputeSchema;
import com.gotocompany.depot.maxcompute.schema.partition.PartitioningStrategy;
import com.gotocompany.depot.maxcompute.util.MetadataUtil;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Builds the {@link MaxComputeSchema} for the JSON sink from configuration alone:
 * metadata columns first, then the configured default columns, and optionally a
 * partition column supplied by the {@link PartitioningStrategy}.
 */
@RequiredArgsConstructor
public class JsonMaxComputeSchemaBuilder {

    private final MaxComputeSinkConfig maxComputeSinkConfig;
    private final PartitioningStrategy partitioningStrategy;
    private final MetadataUtil metadataUtil;

    /**
     * Assembles the table schema together with a name-to-type lookup of the metadata columns.
     *
     * @return the schema described by the current configuration
     * @throws IllegalArgumentException if metadata is enabled and a metadata namespace is set
     */
    public MaxComputeSchema build() {
        List<Column> metadata = buildMetadataColumns();
        TableSchema.Builder schemaBuilder = TableSchema.builder()
                .withColumns(metadata)
                .withColumns(buildDataColumns());
        if (maxComputeSinkConfig.isTablePartitioningEnabled()) {
            schemaBuilder.withPartitionColumn(buildPartitionColumn());
        }
        TableSchema tableSchema = schemaBuilder.build();
        return new MaxComputeSchema(
                tableSchema,
                metadata.stream().collect(Collectors.toMap(Column::getName, Column::getTypeInfo)));
    }

    /**
     * Delegates to the partitioning strategy for the partition column.
     */
    private Column buildPartitionColumn() {
        return partitioningStrategy.getPartitionColumn();
    }

    /**
     * Returns the configured default columns, minus the partition key column when the
     * partitioning strategy replaces the original column.
     */
    private List<Column> buildDataColumns() {
        List<Column> retained = new ArrayList<>();
        for (Column candidate : maxComputeSinkConfig.getDefaultColumns()) {
            if (isRetainedAsDataColumn(candidate)) {
                retained.add(candidate);
            }
        }
        return retained;
    }

    /**
     * A column is dropped only when partitioning is enabled, the column is the partition key,
     * and the strategy states that it replaces the original column.
     */
    private boolean isRetainedAsDataColumn(Column candidate) {
        boolean isPartitionKey = maxComputeSinkConfig.isTablePartitioningEnabled()
                && candidate.getName().equals(maxComputeSinkConfig.getTablePartitionKey());
        if (isPartitionKey) {
            return !partitioningStrategy.shouldReplaceOriginalColumn();
        }
        return true;
    }

    /**
     * Returns one column per configured metadata name/type pair, or an empty list when
     * metadata is disabled. Namespaced metadata is rejected for the JSON schema.
     */
    private List<Column> buildMetadataColumns() {
        List<Column> columns = new ArrayList<>();
        if (!maxComputeSinkConfig.shouldAddMetadata()) {
            return columns;
        }
        if (StringUtils.isNotBlank(maxComputeSinkConfig.getMaxcomputeMetadataNamespace())) {
            throw new IllegalArgumentException("Maxcompute metadata namespace is not supported for JSON schema");
        }
        maxComputeSinkConfig.getMetadataColumnsTypes().forEach(pair ->
                columns.add(Column.newBuilder(pair.getFirst(), metadataUtil.getMetadataTypeInfo(pair.getSecond())).build()));
        return columns;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.gotocompany.depot.maxcompute.schema;

import com.aliyun.odps.TableSchema;
import com.gotocompany.depot.config.MaxComputeSinkConfig;
import com.gotocompany.depot.maxcompute.client.MaxComputeClient;
import com.gotocompany.depot.stencil.DepotStencilUpdateListener;
import lombok.RequiredArgsConstructor;


/**
 * Schema cache for the JSON MaxCompute sink.
 *
 * <p>NOTE(review): this class looks unfinished. {@code maxComputeSinkConfig} and
 * {@code jsonMaxComputeSchemaBuilder} are injected but never read, and
 * {@link #updateSchema()} discards the schema it fetches.
 */
@RequiredArgsConstructor
public class JsonMaxComputeSchemaCache extends DepotStencilUpdateListener {

private final MaxComputeSinkConfig maxComputeSinkConfig;
private final JsonMaxComputeSchemaBuilder jsonMaxComputeSchemaBuilder;
private final MaxComputeClient maxComputeClient;

/**
 * Fetches the latest table schema from the MaxCompute client.
 *
 * <p>The fetched schema is currently not stored, compared or applied.
 */
@Override
public void updateSchema() {
// Result is unused for now; TODO confirm what should happen with the remote schema.
TableSchema remoteSchema = maxComputeClient.getLatestTableSchema();

}

}
Loading