|
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service;
+
+import static org.apache.xtable.model.storage.TableFormat.DELTA;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
+import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import org.apache.xtable.conversion.ConversionConfig;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.delta.DeltaConversionSourceProvider;
+import org.apache.xtable.hudi.HudiConversionSourceProvider;
+import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
+import org.apache.xtable.service.models.ConvertTableRequest;
+import org.apache.xtable.service.models.ConvertTableResponse;
+import org.apache.xtable.service.models.InternalTable;
+import org.apache.xtable.service.spark.SparkHolder;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+
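+/**
+ * Application-scoped service that executes XTable conversions: it builds a
+ * {@link ConversionConfig} from an incoming {@link ConvertTableRequest}, runs the sync through
+ * {@link ConversionController}, and reports the resulting table metadata.
+ */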
+@ApplicationScoped
+public class ConversionService {
+  @Inject SparkHolder sparkHolder;
+
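+  /**
+   * Converts the requested source table into each requested target format, then reads back the
+   * Iceberg metadata written alongside the source data to populate the response.
+   *
+   * @param request source table name, base path, source format, and target formats
+   * @return a response describing the converted table
+   */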
+  public ConvertTableResponse runSync(ConvertTableRequest request) {
+    ConversionController conversionController =
+        new ConversionController(sparkHolder.jsc().hadoopConfiguration());
+    SourceTable sourceTable =
+        SourceTable.builder()
+            .name(request.getSourceTableName())
+            .basePath(request.getSourceTablePath())
+            .formatName(request.getSourceFormat())
+            .build();
+
+    // Every target shares the source table's name and base path; only the format differs.
+    List<TargetTable> targetTables = new ArrayList<>();
+    for (String targetFormat : request.getTargetFormats()) {
+      TargetTable targetTable =
+          TargetTable.builder()
+              .name(request.getSourceTableName())
+              .basePath(request.getSourceTablePath())
+              .formatName(targetFormat)
+              .build();
+      targetTables.add(targetTable);
+    }
+    ConversionConfig conversionConfig =
+        ConversionConfig.builder()
+            .sourceTable(sourceTable)
+            .targetTables(targetTables)
+            .build();
+    ConversionSourceProvider<?> conversionSourceProvider =
+        getConversionSourceProvider(request.getSourceFormat());
+    conversionController.sync(conversionConfig, conversionSourceProvider);
+
+    // For now the response only reports the Iceberg target, even when additional
+    // target formats were requested.
+    Pair<String, String> responseFields =
+        getIcebergMetadataPathAndSchema(
+            request.getSourceTablePath(), sparkHolder.jsc().hadoopConfiguration());
+
+    InternalTable internalTable =
+        new InternalTable(ICEBERG, responseFields.getLeft(), responseFields.getRight());
+    return new ConvertTableResponse(Collections.singletonList(internalTable));
+  }
+
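+  /**
+   * Resolves and initializes the {@link ConversionSourceProvider} matching the given source
+   * format name (case-insensitive). HUDI, DELTA, and ICEBERG are supported.
+   */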
+  private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) {
+    if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
+      ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
+          new HudiConversionSourceProvider();
+      hudiConversionSourceProvider.init(sparkHolder.jsc().hadoopConfiguration());
+      return hudiConversionSourceProvider;
+    } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
+      ConversionSourceProvider<Long> deltaConversionSourceProvider =
+          new DeltaConversionSourceProvider();
+      deltaConversionSourceProvider.init(sparkHolder.jsc().hadoopConfiguration());
+      return deltaConversionSourceProvider;
+    } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
+      ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
+          new IcebergConversionSourceProvider();
+      icebergConversionSourceProvider.init(sparkHolder.jsc().hadoopConfiguration());
+      return icebergConversionSourceProvider;
+    } else {
+      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+    }
+  }
+
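+  /**
+   * Loads the Iceberg table at the given location with {@link HadoopTables} and returns a pair of
+   * (current metadata file location, current schema as JSON).
+   */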
+  public static Pair<String, String> getIcebergMetadataPathAndSchema(
+      String tableLocation, Configuration conf) {
+    HadoopTables tables = new HadoopTables(conf);
+    Table table = tables.load(tableLocation);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata current = ops.current();
+    return Pair.of(current.metadataFileLocation(), SchemaParser.toJson(current.schema()));
+  }
+}