diff --git a/connectors/flink/src/main/scala/io/delta/flink/internal/KernelSnapshotWrapper.java b/connectors/flink/src/main/scala/io/delta/flink/internal/KernelSnapshotWrapper.java index ba1342132bd..eafd8f03632 100644 --- a/connectors/flink/src/main/scala/io/delta/flink/internal/KernelSnapshotWrapper.java +++ b/connectors/flink/src/main/scala/io/delta/flink/internal/KernelSnapshotWrapper.java @@ -19,11 +19,9 @@ package io.delta.standalone.internal; import java.lang.reflect.Constructor; -import java.util.ArrayList; import java.util.List; import java.util.Optional; -import io.delta.kernel.data.ColumnVector; import io.delta.kernel.internal.types.DataTypeJsonSerDe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,11 +138,7 @@ private Metadata convertMetadata() { ); // Convert the partition columns from a ColumnVector to a List - ColumnVector partitionsVec = kernelMetadata.getPartitionColumns().getElements(); - ArrayList partitionColumns = new ArrayList(partitionsVec.getSize()); - for(int i = 0; i < partitionsVec.getSize(); i++) { - partitionColumns.add(partitionsVec.getString(i)); - } + final List partitionColumns = kernelMetadata.getPartitionColumns(); // Convert over the schema StructType List kernelFields = kernelMetadata.getSchema().fields(); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/actions/AbstractMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/actions/AbstractMetadata.java new file mode 100644 index 00000000000..3a724f24102 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/actions/AbstractMetadata.java @@ -0,0 +1,60 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.actions; + +import io.delta.kernel.annotation.Evolving; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +/** + * Interface for Metadata actions in Delta. + * + *

See Metadata for + * more details. + */ +@Evolving +public interface AbstractMetadata { + + /** A unique table identifier. */ + String getId(); + + /** User-specified table identifier. */ + Optional getName(); + + /** User-specified table description. */ + Optional getDescription(); + + /** The table provider format. */ + String getProvider(); + + /** The format options */ + Map getFormatOptions(); + + /** The table schema in string representation. */ + String getSchemaString(); + + /** List of partition columns. */ + List getPartitionColumns(); + + /** The table properties defined on the table. */ + Map getConfiguration(); + + /** Timestamp for the creation of this metadata. */ + Optional getCreatedTime(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java index 96e93cd02e6..ffe56e3d7b3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java @@ -77,7 +77,7 @@ public ScanImpl( this.dataPath = dataPath; this.partitionColToStructFieldMap = () -> { - Set partitionColNames = metadata.getPartitionColNames(); + Set partitionColNames = metadata.getPartitionColumnsLowercaseSet(); return metadata.getSchema().fields().stream() .filter(field -> partitionColNames.contains(field.getName().toLowerCase(Locale.ROOT))) .collect(toMap(field -> field.getName().toLowerCase(Locale.ROOT), identity())); @@ -156,7 +156,7 @@ public Row getScanState(Engine engine) { // Compute the physical data read schema, basically the list of columns to read // from a Parquet data file. It should exclude partition columns and include // row_index metadata columns (in case DVs are present) - List partitionColumns = VectorUtils.toJavaList(metadata.getPartitionColumns()); + List partitionColumns = metadata.getPartitionColumns(); StructType physicalDataReadSchema = PartitionUtils.physicalSchemaWithoutPartitionColumns( readSchema, /* logical read schema */ @@ -185,7 +185,7 @@ private Optional> splitFilters(Optional return filter.map( predicate -> PartitionUtils.splitMetadataAndDataPredicates( - predicate, metadata.getPartitionColNames())); + predicate, metadata.getPartitionColumnsLowercaseSet())); } private Optional getDataFilters() { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 01e3fe45167..89241833b13 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -31,7 +31,6 @@ import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.snapshot.LogSegment; -import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.types.StructType; import java.util.List; @@ -134,7 +133,7 @@ public Protocol getProtocol() { } public List getPartitionColumnNames(Engine engine) { - return VectorUtils.toJavaList(getMetadata().getPartitionColumns()); + return getMetadata().getPartitionColumns(); } public SnapshotReport getSnapshotReport() { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 3f9d7706f81..8b3e4a41bfe 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -109,7 +109,7 @@ public Row getTransactionState(Engine engine) { @Override public List getPartitionColumns(Engine engine) { - return VectorUtils.toJavaList(metadata.getPartitionColumns()); + return metadata.getPartitionColumns(); } @Override @@ -402,7 +402,7 @@ private boolean isReadyForCheckpoint(long newVersion) { private Map getOperationParameters() { if (isNewTable) { - List partitionCols = VectorUtils.toJavaList(metadata.getPartitionColumns()); + List partitionCols = metadata.getPartitionColumns(); String partitionBy = partitionCols.stream() .map(col -> "\"" + col + "\"") diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java index dac74b4f8a4..c9b55f67887 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/Metadata.java @@ -19,25 +19,37 @@ import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import io.delta.kernel.actions.AbstractMetadata; import io.delta.kernel.data.*; import io.delta.kernel.internal.data.GenericRow; import io.delta.kernel.internal.lang.Lazy; import io.delta.kernel.internal.types.DataTypeJsonSerDe; +import io.delta.kernel.internal.util.InternalUtils; import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.*; import java.util.*; import java.util.stream.Collectors; -public class Metadata { +public class Metadata implements AbstractMetadata { + ////////////////////////////////// + // Static variables and methods // + ////////////////////////////////// + + /** + * Creates a {@link Metadata} object from the given {@link ColumnVector}. + * + *

This method should always be used after a {@code metadataVector.isNullAt} check to ensure + * that the column vector is not null at the given row ID. This method never returns null. + * + * @throws IllegalArgumentException if the column vector is null at the given row ID + */ public static Metadata fromColumnVector(ColumnVector vector, int rowId) { - if (vector.isNullAt(rowId)) { - return null; - } + InternalUtils.requireNonNull(vector, rowId, "metaData"); final String schemaJson = requireNonNull(vector.getChild(4), rowId, "schemaString").getString(rowId); - StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaJson); + final StructType schema = DataTypeJsonSerDe.deserializeStructType(schemaJson); return new Metadata( requireNonNull(vector.getChild(0), rowId, "id").getString(rowId), @@ -71,18 +83,23 @@ public static Metadata fromColumnVector(ColumnVector vector, int rowId) { new MapType(StringType.STRING, StringType.STRING, false), false /* nullable */); + ////////////////////////////////// + // Member variables and methods // + ////////////////////////////////// + private final String id; private final Optional name; private final Optional description; private final Format format; private final String schemaString; private final StructType schema; - private final ArrayValue partitionColumns; + private final ArrayValue partitionColumnsArrayValue; private final Optional createdTime; private final MapValue configurationMapValue; private final Lazy> configuration; + private final Lazy> partitionColumns; // Partition column names in lower case. - private final Lazy> partitionColNames; + private final Lazy> partitionColumnsLowercaseSet; // Logical data schema excluding partition columns private final Lazy dataSchema; @@ -93,7 +110,7 @@ public Metadata( Format format, String schemaString, StructType schema, - ArrayValue partitionColumns, + ArrayValue partitionColumnsArrayValue, Optional createdTime, MapValue configurationMapValue) { this.id = requireNonNull(id, "id is null"); @@ -102,11 +119,18 @@ public Metadata( this.format = requireNonNull(format, "format is null"); this.schemaString = requireNonNull(schemaString, "schemaString is null"); this.schema = schema; - this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null"); + this.partitionColumnsArrayValue = + requireNonNull(partitionColumnsArrayValue, "partitionColumns is null"); this.createdTime = createdTime; this.configurationMapValue = requireNonNull(configurationMapValue, "configuration is null"); this.configuration = new Lazy<>(() -> VectorUtils.toJavaMap(configurationMapValue)); - this.partitionColNames = new Lazy<>(this::loadPartitionColNames); + this.partitionColumns = new Lazy<>(this::loadAndValidatePartitionColumnNames); + this.partitionColumnsLowercaseSet = + new Lazy<>( + () -> + partitionColumns.get().stream() + .map(partColName -> partColName.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet())); this.dataSchema = new Lazy<>( () -> @@ -114,124 +138,90 @@ public Metadata( schema.fields().stream() .filter( field -> - !partitionColNames + !partitionColumnsLowercaseSet .get() .contains(field.getName().toLowerCase(Locale.ROOT))) .collect(Collectors.toList()))); } - public Metadata withNewConfiguration(Map configuration) { - Map newConfiguration = new HashMap<>(getConfiguration()); - newConfiguration.putAll(configuration); - return new Metadata( - this.id, - this.name, - this.description, - this.format, - this.schemaString, - this.schema, - this.partitionColumns, - this.createdTime, - VectorUtils.stringStringMapValue(newConfiguration)); + //////////////////////////////// + // AbstractMetadata overrides // + //////////////////////////////// + + @Override + public String getId() { + return id; } - public Metadata withNewSchema(StructType schema) { - return new Metadata( - this.id, - this.name, - this.description, - this.format, - schema.toJson(), - schema, - this.partitionColumns, - this.createdTime, - this.configurationMapValue); + @Override + public Optional getName() { + return name; } @Override - public String toString() { - List partitionColumnsStr = VectorUtils.toJavaList(partitionColumns); - StringBuilder sb = new StringBuilder(); - sb.append("List("); - for (String partitionColumn : partitionColumnsStr) { - sb.append(partitionColumn).append(", "); - } - if (sb.substring(sb.length() - 2).equals(", ")) { - sb.setLength(sb.length() - 2); // Remove the last comma and space - } - sb.append(")"); - return "Metadata{" - + "id='" - + id - + '\'' - + ", name=" - + name - + ", description=" - + description - + ", format=" - + format - + ", schemaString='" - + schemaString - + '\'' - + ", partitionColumns=" - + sb - + ", createdTime=" - + createdTime - + ", configuration=" - + configuration.get() - + '}'; + public Optional getDescription() { + return description; + } + + @Override + public String getProvider() { + return format.getProvider(); + } + + @Override + public Map getFormatOptions() { + return format.getOptions(); } + @Override public String getSchemaString() { return schemaString; } - public StructType getSchema() { - return schema; + @Override + public List getPartitionColumns() { + return partitionColumns.get(); } - public ArrayValue getPartitionColumns() { - return partitionColumns; + @Override + public Map getConfiguration() { + return Collections.unmodifiableMap(configuration.get()); } - /** Set of lowercase partition column names */ - public Set getPartitionColNames() { - return partitionColNames.get(); + @Override + public Optional getCreatedTime() { + return createdTime; } - /** The logical data schema which excludes partition columns */ - public StructType getDataSchema() { - return dataSchema.get(); + /////////////////// + // Other methods // + /////////////////// + + public StructType getSchema() { + return schema; } - public String getId() { - return id; + public ArrayValue getPartitionColumnsArrayValue() { + return partitionColumnsArrayValue; } - public Optional getName() { - return name; + public Set getPartitionColumnsLowercaseSet() { + return partitionColumnsLowercaseSet.get(); } - public Optional getDescription() { - return description; + /** The logical data schema which excludes partition columns */ + public StructType getDataSchema() { + return dataSchema.get(); } public Format getFormat() { return format; } - public Optional getCreatedTime() { - return createdTime; - } - public MapValue getConfigurationMapValue() { return configurationMapValue; } - public Map getConfiguration() { - return Collections.unmodifiableMap(configuration.get()); - } - /** * Filter out the key-value pair matches exactly with the old properties. * @@ -254,32 +244,85 @@ public Map filterOutUnchangedProperties(Map newP * @return {@link Row} object with the schema {@link Metadata#FULL_SCHEMA} */ public Row toRow() { - Map metadataMap = new HashMap<>(); + final Map metadataMap = new HashMap<>(); metadataMap.put(0, id); metadataMap.put(1, name.orElse(null)); metadataMap.put(2, description.orElse(null)); metadataMap.put(3, format.toRow()); metadataMap.put(4, schemaString); - metadataMap.put(5, partitionColumns); + metadataMap.put(5, partitionColumnsArrayValue); metadataMap.put(6, createdTime.orElse(null)); metadataMap.put(7, configurationMapValue); return new GenericRow(Metadata.FULL_SCHEMA, metadataMap); } - /** Helper method to load the partition column names. */ - private Set loadPartitionColNames() { - ColumnVector partitionColNameVector = partitionColumns.getElements(); - Set partitionColumnNames = new HashSet<>(); - for (int i = 0; i < partitionColumns.getSize(); i++) { + public Metadata withNewConfiguration(Map configuration) { + final Map newConfiguration = new HashMap<>(getConfiguration()); + newConfiguration.putAll(configuration); + return new Metadata( + this.id, + this.name, + this.description, + this.format, + this.schemaString, + this.schema, + this.partitionColumnsArrayValue, + this.createdTime, + VectorUtils.stringStringMapValue(newConfiguration)); + } + + public Metadata withNewSchema(StructType schema) { + return new Metadata( + this.id, + this.name, + this.description, + this.format, + schema.toJson(), + schema, + this.partitionColumnsArrayValue, + this.createdTime, + this.configurationMapValue); + } + + @Override + public String toString() { + final String partColsStr = "List(" + String.join(", ", getPartitionColumns()) + ")"; + return "Metadata{" + + "id='" + + id + + '\'' + + ", name=" + + name + + ", description=" + + description + + ", format=" + + format + + ", schemaString='" + + schemaString + + '\'' + + ", partitionColumns=" + + partColsStr + + ", createdTime=" + + createdTime + + ", configuration=" + + configuration.get() + + '}'; + } + + /** Validates that partition column names are non-null and non-empty. */ + private List loadAndValidatePartitionColumnNames() { + final ColumnVector partitionColNameVector = partitionColumnsArrayValue.getElements(); + final List partitionColumnNames = new ArrayList<>(); + for (int i = 0; i < partitionColumnsArrayValue.getSize(); i++) { checkArgument( !partitionColNameVector.isNullAt(i), "Expected a non-null partition column name"); - String partitionColName = partitionColNameVector.getString(i); + final String partitionColName = partitionColNameVector.getString(i); checkArgument( partitionColName != null && !partitionColName.isEmpty(), "Expected non-null and non-empty partition column name"); - partitionColumnNames.add(partitionColName.toLowerCase(Locale.ROOT)); + partitionColumnNames.add(partitionColName); } - return Collections.unmodifiableSet(partitionColumnNames); + return Collections.unmodifiableList(partitionColumnNames); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java index 20f0f975ab8..dfcb86e35f0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java @@ -61,7 +61,8 @@ public static ScanStateRow of( valueMap.put(COL_NAME_TO_ORDINAL.get("physicalSchemaString"), readSchemaPhysicalJson); valueMap.put( COL_NAME_TO_ORDINAL.get("physicalDataReadSchemaString"), readPhysicalDataSchemaJson); - valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns()); + valueMap.put( + COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumnsArrayValue()); valueMap.put(COL_NAME_TO_ORDINAL.get("minReaderVersion"), protocol.getMinReaderVersion()); valueMap.put(COL_NAME_TO_ORDINAL.get("minWriterVersion"), protocol.getMinWriterVersion()); valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java index 11c50a2784f..4fab7f3403a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/TransactionStateRow.java @@ -46,7 +46,8 @@ public class TransactionStateRow extends GenericRow { public static TransactionStateRow of(Metadata metadata, String tablePath) { HashMap valueMap = new HashMap<>(); valueMap.put(COL_NAME_TO_ORDINAL.get("logicalSchemaString"), metadata.getSchemaString()); - valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns()); + valueMap.put( + COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumnsArrayValue()); valueMap.put(COL_NAME_TO_ORDINAL.get("configuration"), metadata.getConfigurationMapValue()); valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath); return new TransactionStateRow(valueMap);