diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
index bda983a6c170..f1223705c11d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -46,6 +46,7 @@ public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperatio
   public static final String TABLE_TYPE_PROP = "table_type";
   public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
   public static final String METADATA_LOCATION_PROP = "metadata_location";
+  public static final String METADATA_HASH_PROP = "metadata_hash";
   public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";

   private static final String METADATA_FOLDER_NAME = "metadata";
diff --git a/core/src/main/java/org/apache/iceberg/util/HashWriter.java b/core/src/main/java/org/apache/iceberg/util/HashWriter.java
new file mode 100644
index 000000000000..2d0668fe8283
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/HashWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+/**
+ * {@link java.io.Writer} implementation that uses a hash function to produce a digest of the
+ * streamed bytes. The written output itself is not preserved; only the digest is kept.
+ */
+public class HashWriter extends Writer {
+
+  private final MessageDigest digest;
+  private final CharsetEncoder encoder;
+  private boolean isClosed = false;
+
+  public HashWriter(String hashAlgorithm, Charset charset) throws NoSuchAlgorithmException {
+    this.digest = MessageDigest.getInstance(hashAlgorithm);
+    this.encoder = charset.newEncoder();
+  }
+
+  @Override
+  public void write(char[] cbuf, int off, int len) throws IOException {
+    ensureNotClosed();
+    CharBuffer chars = CharBuffer.wrap(cbuf, off, len);
+    ByteBuffer byteBuffer = encoder.encode(chars);
+    digest.update(byteBuffer);
+  }
+
+  @Override
+  public void flush() throws IOException {}
+
+  @Override
+  public void close() throws IOException {
+    isClosed = true;
+  }
+
+  /**
+   * Calculates the final hash value. The underlying digest is reset, so subsequent getHash()
+   * calls are not permitted.
+   *
+   * @return bytes of the final hash value
+   */
+  public byte[] getHash() {
+    ensureNotClosed();
+    isClosed = true;
+    return digest.digest();
+  }
+
+  private void ensureNotClosed() {
+    if (isClosed) {
+      throw new IllegalStateException("HashWriter is closed.");
+    }
+  }
+}
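For context, a minimal usage sketch of the new writer (not part of this diff; the class name and JSON payload below are illustrative only):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.iceberg.util.HashWriter;

// Illustrative only: hash an arbitrary string through the Writer API.
// Writer.write(String) delegates to write(char[], int, int), so the chunk
// below is encoded with the configured charset and fed to the digest.
public class HashWriterDemo {
  public static void main(String[] args) throws Exception {
    HashWriter writer = new HashWriter("SHA-256", StandardCharsets.UTF_8);
    writer.write("{\"format-version\":2}"); // placeholder payload
    byte[] hash = writer.getHash(); // finalizes the digest; the writer is closed afterwards
    System.out.println(Base64.getEncoder().encodeToString(hash));
  }
}
```

One subtlety worth double-checking against Jackson's flush boundaries: `CharsetEncoder.encode(CharBuffer)` processes each chunk independently, so a surrogate pair split across two `write()` calls would fail to encode.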
diff --git a/core/src/test/java/org/apache/iceberg/util/TestHashWriter.java b/core/src/test/java/org/apache/iceberg/util/TestHashWriter.java
new file mode 100644
index 000000000000..a1c32e996054
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/util/TestHashWriter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+public class TestHashWriter {
+
+  @Test
+  public void testIncrementalHashCalculation() throws Exception {
+    HashWriter hashWriter = spy(new HashWriter("SHA-256", StandardCharsets.UTF_8));
+
+    // Create a TableMetadata instance large enough that the JSON generator
+    // serializes it in multiple chunks
+    Map<String, String> icebergTblProperties = Maps.newHashMap();
+    for (int i = 0; i < 300; ++i) {
+      icebergTblProperties.put("Property Key " + i, "Property Value " + i);
+    }
+    Schema schema = new Schema(Types.NestedField.required(1, "col1", Types.StringType.get()));
+    TableMetadata tableMetadata =
+        TableMetadata.newTableMetadata(
+            schema, PartitionSpec.unpartitioned(), null, icebergTblProperties);
+
+    JsonGenerator generator = JsonUtil.factory().createGenerator(hashWriter);
+    TableMetadataParser.toJson(tableMetadata, generator);
+
+    // Expecting 3 write() invocations (and therefore incremental hash calculations)
+    verify(hashWriter, times(3)).write(any(char[].class), anyInt(), anyInt());
+
+    // +1 after flushing
+    generator.flush();
+    verify(hashWriter, times(4)).write(any(char[].class), anyInt(), anyInt());
+
+    // The expected hash is calculated over the whole serialized object, i.e. without streaming
+    byte[] expectedHash =
+        MessageDigest.getInstance("SHA-256")
+            .digest(TableMetadataParser.toJson(tableMetadata).getBytes(StandardCharsets.UTF_8));
+    assertThat(hashWriter.getHash()).isEqualTo(expectedHash);
+    assertThatThrownBy(() -> hashWriter.getHash()).hasMessageContaining("HashWriter is closed.");
+  }
+}
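The test above relies on `MessageDigest` being incremental: chunked `update()` calls must produce the same digest as one-shot hashing. A standalone sketch of that JDK property (illustrative, not part of this diff):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;

// Chunked update() calls and one-shot hashing must agree; this property is
// what makes streaming the metadata JSON through HashWriter safe.
public class IncrementalDigestProperty {
  public static void main(String[] args) throws Exception {
    MessageDigest chunked = MessageDigest.getInstance("SHA-256");
    chunked.update("hello ".getBytes(StandardCharsets.UTF_8));
    chunked.update("world".getBytes(StandardCharsets.UTF_8));

    byte[] oneShot =
        MessageDigest.getInstance("SHA-256")
            .digest("hello world".getBytes(StandardCharsets.UTF_8));

    System.out.println(Arrays.equals(chunked.digest(), oneShot)); // true
  }
}
```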
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
index 0a177a7190d2..a50210bfc5db 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
@@ -20,7 +20,13 @@ import static org.apache.iceberg.TableProperties.GC_ENABLED;

+import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Base64;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -36,10 +42,12 @@ import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.SortOrderParser;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.HashWriter;
 import org.apache.iceberg.util.JsonUtil;
 import org.apache.iceberg.view.ViewMetadata;
 import org.slf4j.Logger;
@@ -101,6 +109,7 @@ public static void updateHmsTableForIcebergTable(
         metadata.schema(),
         maxHiveTablePropertySize);
     setStorageHandler(parameters, hiveEngineEnabled);
+    setMetadataHash(metadata, parameters);

     // Set the basic statistics
     if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) {
@@ -256,6 +265,41 @@ static void setSchema(
     }
   }

+  @VisibleForTesting
+  static void setMetadataHash(TableMetadata metadata, Map<String, String> parameters) {
+    if (parameters.containsKey(TableProperties.ENCRYPTION_TABLE_KEY)) {
+      byte[] currentHashBytes = hashOf(metadata);
+      parameters.put(
+          BaseMetastoreTableOperations.METADATA_HASH_PROP,
+          Base64.getEncoder().encodeToString(currentHashBytes));
+    }
+  }
+
+  @VisibleForTesting
+  static void verifyMetadataHash(TableMetadata metadata, String metadataHashFromHMS) {
+    byte[] currentHashBytes = hashOf(metadata);
+    byte[] expectedHashBytes = Base64.getDecoder().decode(metadataHashFromHMS);
+
+    if (!Arrays.equals(expectedHashBytes, currentHashBytes)) {
+      throw new RuntimeException(
+          String.format(
+              "The current metadata file %s might have been modified. Hash of metadata loaded from storage differs "
+                  + "from HMS-stored metadata hash.",
+              metadata.metadataFileLocation()));
+    }
+  }
+
+  private static byte[] hashOf(TableMetadata tableMetadata) {
+    try (HashWriter hashWriter = new HashWriter("SHA-256", StandardCharsets.UTF_8)) {
+      JsonGenerator generator = JsonUtil.factory().createGenerator(hashWriter);
+      TableMetadataParser.toJson(tableMetadata, generator);
+      generator.flush();
+      return hashWriter.getHash();
+    } catch (NoSuchAlgorithmException | IOException e) {
+      throw new RuntimeException("Unable to produce hash of table metadata", e);
+    }
+  }
+
   private static void setField(
       Map<String, String> parameters, String key, String value, long maxHiveTablePropertySize) {
     if (value.length() <= maxHiveTablePropertySize) {
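A hypothetical round trip of the two helpers above; since both are package-private, the sketch sits in `org.apache.iceberg.hive`, and the `"key_id"` value is a placeholder:

```java
package org.apache.iceberg.hive;

import java.util.Map;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

// Sketch only: shows the intended set/verify round trip of the helpers above.
class MetadataHashRoundTrip {

  static void roundTrip(TableMetadata metadata) {
    Map<String, String> parameters = Maps.newHashMap();
    // The hash is recorded only when the table is marked as encrypted
    parameters.put(TableProperties.ENCRYPTION_TABLE_KEY, "key_id"); // placeholder key id

    HMSTablePropertyHelper.setMetadataHash(metadata, parameters);
    String stored = parameters.get(BaseMetastoreTableOperations.METADATA_HASH_PROP);

    // Passes while the metadata is unchanged; a tampered or swapped metadata
    // file hashes differently and makes verifyMetadataHash throw
    HMSTablePropertyHelper.verifyMetadataHash(metadata, stored);
  }
}
```

Because `setMetadataHash` is gated on `ENCRYPTION_TABLE_KEY`, unencrypted tables keep their HMS parameters unchanged.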
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 4d382f8d388e..62340fe40138 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -174,6 +175,7 @@ protected void doRefresh() {
     String metadataLocation = null;
     String tableKeyIdFromHMS = null;
     String dekLengthFromHMS = null;
+    String metadataHashFromHMS = null;

     try {
       Table table = metaClients.run(client -> client.getTable(database, tableName));
@@ -189,6 +191,7 @@ the table key parameter (along with existing snapshots) in the file, making the
      produce unencrypted files. Table key ID is taken directly from HMS catalog */
       tableKeyIdFromHMS = table.getParameters().get(TableProperties.ENCRYPTION_TABLE_KEY);
       dekLengthFromHMS = table.getParameters().get(TableProperties.ENCRYPTION_DEK_LENGTH);
+      metadataHashFromHMS = table.getParameters().get(METADATA_HASH_PROP);
     } catch (NoSuchObjectException e) {
       if (currentMetadataLocation() != null) {
         throw new NoSuchTableException("No such table: %s.%s", database, tableName);
@@ -207,7 +210,7 @@ the table key parameter (along with existing snapshots) in the file, making the
     refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);

     if (tableKeyIdFromHMS != null) {
-      checkEncryptionProperties(tableKeyIdFromHMS, dekLengthFromHMS);
+      checkIntegrityForEncryption(tableKeyIdFromHMS, dekLengthFromHMS, metadataHashFromHMS);

       tableKeyId = tableKeyIdFromHMS;
       encryptionDekLength =
@@ -245,6 +248,7 @@ the table key parameter (along with existing snapshots) in the file, making the
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean newTable = base == null;
+    final TableMetadata tableMetadata;
     encryptionPropsFromMetadata(metadata.properties());

     String newMetadataLocation;
@@ -257,19 +261,21 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
         builder.addEncryptionKey(entry.getValue());
       }

-      newMetadataLocation = writeNewMetadataIfRequired(newTable, builder.build());
+      tableMetadata = builder.build();
     } else {
-      newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
+      tableMetadata = metadata;
     }

-    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
+    newMetadataLocation = writeNewMetadataIfRequired(newTable, tableMetadata);
+
+    boolean hiveEngineEnabled = hiveEngineEnabled(tableMetadata, conf);
     boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);

     BaseMetastoreOperations.CommitStatus commitStatus =
         BaseMetastoreOperations.CommitStatus.FAILURE;
     boolean updateHiveTable = false;

-    HiveLock lock = lockObject(base != null ? base : metadata);
+    HiveLock lock = lockObject(base != null ? base : tableMetadata);
     try {
       lock.lock();
@@ -293,14 +299,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       } else {
         tbl =
             newHmsTable(
-                metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
+                tableMetadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
         LOG.debug("Committing new table: {}", fullName);
       }

       tbl.setSd(
           HiveOperationsBase.storageDescriptor(
-              metadata.schema(),
-              metadata.location(),
+              tableMetadata.schema(),
+              tableMetadata.location(),
               hiveEngineEnabled)); // set to pickup any schema changes

       String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
@@ -316,7 +322,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       if (base != null) {
         removedProps =
             base.properties().keySet().stream()
-                .filter(key -> !metadata.properties().containsKey(key))
+                .filter(key -> !tableMetadata.properties().containsKey(key))
                 .collect(Collectors.toSet());
       }

@@ -334,7 +340,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       HMSTablePropertyHelper.updateHmsTableForIcebergTable(
           newMetadataLocation,
           tbl,
-          metadata,
+          tableMetadata,
           removedProps,
           hiveEngineEnabled,
           maxHiveTablePropertySize,
@@ -391,7 +397,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       // issue for example, and triggers this exception. So we need double-check to make sure
       // this is really a concurrent modification. Hitting this exception means no pending
       // requests, if any, can succeed later, so it's safe to check status in strict mode
-      commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata);
+      commitStatus = checkCommitStatusStrict(newMetadataLocation, tableMetadata);
       if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
         throw new CommitFailedException(
             e, "The table %s.%s has been modified concurrently", database, tableName);
@@ -402,7 +408,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
           database,
           tableName,
           e);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      commitStatus = checkCommitStatus(newMetadataLocation, tableMetadata);
     }

     switch (commitStatus) {
@@ -574,8 +580,20 @@ private void encryptionPropsFromMetadata(Map<String, String> tableProperties) {
     }
   }

-  private void checkEncryptionProperties(String encryptionKeyIdFromHMS, String dekLengthFromHMS) {
-    Map<String, String> propertiesFromMetadata = current().properties();
+  private void checkIntegrityForEncryption(
+      String encryptionKeyIdFromHMS, String dekLengthFromHMS, String metadataHashFromHMS) {
+    TableMetadata metadata = current();
+    if (StringUtils.isNotEmpty(metadataHashFromHMS)) {
+      HMSTablePropertyHelper.verifyMetadataHash(metadata, metadataHashFromHMS);
+      return;
+    }
+
+    LOG.warn(
+        "Full metadata integrity check skipped because no metadata hash was recorded in HMS for table {}."
+            + " Falling back to an encryption-property-based check.",
+        tableName);
+
+    Map<String, String> propertiesFromMetadata = metadata.properties();
     String encryptionKeyIdFromMetadata =
         propertiesFromMetadata.get(TableProperties.ENCRYPTION_TABLE_KEY);
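One reason for the pervasive `metadata` → `tableMetadata` renames in `doCommit()`: the instance serialized by `setMetadataHash()` must be exactly the instance written to storage, otherwise the recorded hash could never match what the next refresh reads back. A small demonstration of the mismatch risk (hypothetical sketch; `buildFrom`/`setProperties` are used here only to produce two differing metadata objects):

```java
package org.apache.iceberg;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

// Two metadata objects that differ only in one property serialize to different
// JSON and therefore hash differently; hashing anything other than the written
// instance would surface as a false tampering alarm on refresh.
public class MetadataHashMismatchDemo {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "col1", Types.StringType.get()));
    TableMetadata written =
        TableMetadata.newTableMetadata(
            schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of());
    TableMetadata other =
        TableMetadata.buildFrom(written).setProperties(ImmutableMap.of("k", "v")).build();

    byte[] hashWritten = sha256(TableMetadataParser.toJson(written));
    byte[] hashOther = sha256(TableMetadataParser.toJson(other));
    System.out.println(Arrays.equals(hashWritten, hashOther)); // false
  }

  private static byte[] sha256(String json) throws Exception {
    return MessageDigest.getInstance("SHA-256").digest(json.getBytes(StandardCharsets.UTF_8));
  }
}
```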
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
index 2bac6082854c..f212d307fe9d 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java
@@ -26,6 +26,7 @@ import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_TIMESTAMP;
 import static org.apache.iceberg.TableProperties.DEFAULT_PARTITION_SPEC;
 import static org.apache.iceberg.TableProperties.DEFAULT_SORT_ORDER;
+import static org.apache.iceberg.TableProperties.ENCRYPTION_TABLE_KEY;
 import static org.apache.iceberg.TableProperties.SNAPSHOT_COUNT;
 import static org.apache.iceberg.expressions.Expressions.bucket;
 import static org.apache.iceberg.types.Types.NestedField.required;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -1239,4 +1241,29 @@ public void testTableLocationWithTrailingSlashInDatabaseLocation() throws TExcep
       HIVE_METASTORE_EXTENSION.metastoreClient().dropDatabase(dbName);
     }
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMetadataHashing(boolean isTableEncrypted) {
+    Map<String, String> hiveTblProperties = Maps.newHashMap();
+    if (isTableEncrypted) {
+      hiveTblProperties.put(ENCRYPTION_TABLE_KEY, "key_id");
+    }
+
+    Schema schema = new Schema(Types.NestedField.required(1, "col1", Types.StringType.get()));
+    TableMetadata tableMetadata =
+        TableMetadata.newTableMetadata(
+            schema, PartitionSpec.unpartitioned(), null, ImmutableMap.of());
+
+    HMSTablePropertyHelper.setMetadataHash(tableMetadata, hiveTblProperties);
+
+    String base64EncodedHash =
+        hiveTblProperties.get(BaseMetastoreTableOperations.METADATA_HASH_PROP);
+    if (isTableEncrypted) {
+      assertThat(base64EncodedHash).isBase64();
+      HMSTablePropertyHelper.verifyMetadataHash(tableMetadata, base64EncodedHash);
+    } else {
+      assertThat(base64EncodedHash).isNull();
+    }
+  }
 }
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
index 8f0552a37877..f27aa6ec9d1e 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java
@@ -29,14 +29,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.encryption.Ciphers;
 import org.apache.iceberg.encryption.UnitestKMS;
@@ -53,6 +59,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
+import org.mockito.internal.util.collections.Iterables;

 public class TestTableEncryption extends CatalogTestBase {
   private static Map<String, String> appendCatalogEncryptionProperties(Map<String, String> props) {
@@ -162,6 +169,29 @@ public void testInsertAndDelete() {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }

+  @TestTemplate
+  public void testMetadataTamperproofing() throws IOException {
+    ChecksumFileSystem fs = (ChecksumFileSystem) FileSystem.newInstance(new Configuration());
+    catalog.initialize(catalogName, catalogConfig);
+
+    Table table = catalog.loadTable(tableIdent);
+    TableMetadata currentMetadata = ((HasTableOperations) table).operations().current();
+    Path metadataFile = new Path(currentMetadata.metadataFileLocation());
+    Path previousMetadataFile = new Path(Iterables.firstOf(currentMetadata.previousFiles()).file());
+
+    // manual FS tampering: replace the current metadata file with a previous one
+    Path crcPath = fs.getChecksumFile(metadataFile);
+    fs.delete(crcPath, false);
+    fs.delete(metadataFile, false);
+    fs.rename(previousMetadataFile, metadataFile);
+
+    assertThatThrownBy(() -> catalog.loadTable(tableIdent))
+        .hasMessageContaining(
+            String.format(
+                "The current metadata file %s might have been modified. Hash of metadata loaded from storage differs from HMS-stored metadata hash.",
+                metadataFile));
+  }
+
   @TestTemplate
   public void testKeyDelete() {
     assertThatThrownBy(