core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java
@@ -46,6 +46,7 @@ public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
public static final String TABLE_TYPE_PROP = "table_type";
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
public static final String METADATA_LOCATION_PROP = "metadata_location";
public static final String METADATA_HASH_PROP = "metadata_hash";
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";

private static final String METADATA_FOLDER_NAME = "metadata";
78 changes: 78 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/HashWriter.java
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.io.IOException;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * {@link java.io.Writer} implementation that uses a hashing function to produce a hash value based
 * on the streamed bytes. The output of the writer is not preserved.
 */
public class HashWriter extends Writer {

  private final MessageDigest digest;
  private final CharsetEncoder encoder;
  private boolean isClosed = false;

  public HashWriter(String hashAlgorithm, Charset charset) throws NoSuchAlgorithmException {
    this.digest = MessageDigest.getInstance(hashAlgorithm);
    this.encoder = charset.newEncoder();
  }

  @Override
  public void write(char[] cbuf, int off, int len) throws IOException {
    ensureNotClosed();
    CharBuffer chars = CharBuffer.wrap(cbuf, off, len);
    ByteBuffer byteBuffer = encoder.encode(chars);
    digest.update(byteBuffer);
  }

  @Override
  public void flush() throws IOException {}

  @Override
  public void close() throws IOException {
    isClosed = true;
  }

  /**
   * Calculates the final hash value. The underlying digest will be reset, thus subsequent
   * getHash() calls are not permitted.
   *
   * @return bytes of final hash value
   */
  public byte[] getHash() {
    ensureNotClosed();
    isClosed = true;
    return digest.digest();
Contributor:
nit: an exception if not closed?

Contributor Author:
You're right, it's needed: calling digest() resets the MessageDigest's internal state, so further calls would produce a wrong value. I've added closing logic now.

  }

  private void ensureNotClosed() {
    if (isClosed) {
      throw new IllegalStateException("HashWriter is closed.");
    }
  }
}
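A note on the behavior the review exchange above relies on: MessageDigest.digest() completes the hash computation and resets the instance, so a second call no longer reflects the bytes that were streamed in. A minimal standalone sketch of that pitfall (not part of this change; the input string is arbitrary):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;

public class DigestResetDemo {
  public static void main(String[] args) throws Exception {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    byte[] input = "streamed table metadata".getBytes(StandardCharsets.UTF_8);

    digest.update(input);
    byte[] first = digest.digest(); // completes the hash and resets the digest's state

    // The second call hashes zero accumulated bytes, not the original input,
    // so it silently produces a different (and wrong) value.
    byte[] second = digest.digest();

    System.out.println(Arrays.equals(first, second)); // prints: false
  }
}

This is why getHash() flips isClosed and rejects further calls rather than silently returning the hash of an empty input.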
75 changes: 75 additions & 0 deletions core/src/test/java/org/apache/iceberg/util/TestHashWriter.java
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.fasterxml.jackson.core.JsonGenerator;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;

public class TestHashWriter {

  @Test
  public void testIncrementalHashCalculation() throws Exception {
    HashWriter hashWriter = spy(new HashWriter("SHA-256", StandardCharsets.UTF_8));

    // Create a large enough TableMetadata which will be serialized into JSON in multiple chunks by
    // the JSON generator
    Map<String, String> icebergTblProperties = Maps.newHashMap();
    for (int i = 0; i < 300; ++i) {
      icebergTblProperties.put("Property Key " + i, "Property Value " + i);
    }
    Schema schema = new Schema(Types.NestedField.required(1, "col1", Types.StringType.get()));
    TableMetadata tableMetadata =
        TableMetadata.newTableMetadata(
            schema, PartitionSpec.unpartitioned(), null, icebergTblProperties);

    JsonGenerator generator = JsonUtil.factory().createGenerator(hashWriter);
    TableMetadataParser.toJson(tableMetadata, generator);

    // Expecting to see 3 write() invocations (and therefore incremental hash calculations)
    verify(hashWriter, times(3)).write(any(char[].class), anyInt(), anyInt());

    // +1 after flushing
    generator.flush();
    verify(hashWriter, times(4)).write(any(char[].class), anyInt(), anyInt());

    // Expected hash is calculated on the whole object, i.e. without streaming
    byte[] expectedHash =
        MessageDigest.getInstance("SHA-256")
            .digest(TableMetadataParser.toJson(tableMetadata).getBytes(StandardCharsets.UTF_8));
    assertThat(hashWriter.getHash()).isEqualTo(expectedHash);
    assertThatThrownBy(() -> hashWriter.getHash()).hasMessageContaining("HashWriter is closed.");
  }
}
hive-metastore/src/main/java/org/apache/iceberg/hive/HMSTablePropertyHelper.java
@@ -20,7 +20,13 @@

import static org.apache.iceberg.TableProperties.GC_ENABLED;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -36,10 +36,12 @@
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.HashWriter;
import org.apache.iceberg.util.JsonUtil;
import org.apache.iceberg.view.ViewMetadata;
import org.slf4j.Logger;
@@ -101,6 +109,7 @@ public static void updateHmsTableForIcebergTable(
metadata.schema(),
maxHiveTablePropertySize);
setStorageHandler(parameters, hiveEngineEnabled);
setMetadataHash(metadata, parameters);

// Set the basic statistics
if (summary.get(SnapshotSummary.TOTAL_DATA_FILES_PROP) != null) {
@@ -256,6 +265,41 @@ static void setSchema(
}
}

@VisibleForTesting
static void setMetadataHash(TableMetadata metadata, Map<String, String> parameters) {
if (parameters.containsKey(TableProperties.ENCRYPTION_TABLE_KEY)) {
byte[] currentHashBytes = hashOf(metadata);
parameters.put(
BaseMetastoreTableOperations.METADATA_HASH_PROP,
Base64.getEncoder().encodeToString(currentHashBytes));
}
}

@VisibleForTesting
static void verifyMetadataHash(TableMetadata metadata, String metadataHashFromHMS) {
byte[] currentHashBytes = hashOf(metadata);
byte[] expectedHashBytes = Base64.getDecoder().decode(metadataHashFromHMS);

if (!Arrays.equals(expectedHashBytes, currentHashBytes)) {
throw new RuntimeException(
String.format(
"The current metadata file %s might have been modified. Hash of metadata loaded from storage differs "
+ "from HMS-stored metadata hash.",
metadata.metadataFileLocation()));
}
}

private static byte[] hashOf(TableMetadata tableMetadata) {
try (HashWriter hashWriter = new HashWriter("SHA-256", StandardCharsets.UTF_8)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(hashWriter);
TableMetadataParser.toJson(tableMetadata, generator);
generator.flush();
return hashWriter.getHash();
} catch (NoSuchAlgorithmException | IOException e) {
throw new RuntimeException("Unable to produce hash of table metadata", e);
}
}

private static void setField(
Map<String, String> parameters, String key, String value, long maxHiveTablePropertySize) {
if (value.length() <= maxHiveTablePropertySize) {
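Taken together, setMetadataHash and verifyMetadataHash form a store-then-verify round trip through the HMS table parameters. The sketch below illustrates that flow with plain JDK APIs; it is only an approximation of the helpers above, since it hashes a ready-made JSON string instead of streaming the metadata through HashWriter, and the sample JSON and class name are made up:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class MetadataHashRoundTrip {
  public static void main(String[] args) throws Exception {
    // Stand-in for the serialized table metadata JSON.
    String metadataJson = "{\"format-version\":2,\"location\":\"s3://bucket/tbl\"}";

    // Commit side: hash the metadata and publish it as a Base64 string in the HMS parameters.
    Map<String, String> hmsParameters = new HashMap<>();
    byte[] hash =
        MessageDigest.getInstance("SHA-256").digest(metadataJson.getBytes(StandardCharsets.UTF_8));
    hmsParameters.put("metadata_hash", Base64.getEncoder().encodeToString(hash));

    // Refresh side: re-hash what was loaded from storage and compare against the stored value.
    String loadedJson = metadataJson; // swap in a tampered string to see the check fail
    byte[] expected = Base64.getDecoder().decode(hmsParameters.get("metadata_hash"));
    byte[] actual =
        MessageDigest.getInstance("SHA-256").digest(loadedJson.getBytes(StandardCharsets.UTF_8));

    if (!Arrays.equals(expected, actual)) {
      throw new IllegalStateException("Metadata loaded from storage does not match the HMS-stored hash");
    }
    System.out.println("Metadata hash verified");
  }
}

In the actual helpers both sides hash the canonical JSON produced by TableMetadataParser.toJson, so the bytes digested at commit time and at refresh time come from the same serialization rather than from the raw file contents.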
hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -25,6 +25,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -174,6 +175,7 @@ protected void doRefresh() {
String metadataLocation = null;
String tableKeyIdFromHMS = null;
String dekLengthFromHMS = null;
String metadataHashFromHMS = null;
try {
Table table = metaClients.run(client -> client.getTable(database, tableName));

@@ -189,6 +191,7 @@ the table key parameter (along with existing snapshots) in the file, making the
produce unencrypted files. Table key ID is taken directly from HMS catalog */
tableKeyIdFromHMS = table.getParameters().get(TableProperties.ENCRYPTION_TABLE_KEY);
dekLengthFromHMS = table.getParameters().get(TableProperties.ENCRYPTION_DEK_LENGTH);
metadataHashFromHMS = table.getParameters().get(METADATA_HASH_PROP);
} catch (NoSuchObjectException e) {
if (currentMetadataLocation() != null) {
throw new NoSuchTableException("No such table: %s.%s", database, tableName);
@@ -207,7 +210,7 @@ the table key parameter (along with existing snapshots) in the file, making the
refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);

if (tableKeyIdFromHMS != null) {
checkEncryptionProperties(tableKeyIdFromHMS, dekLengthFromHMS);
checkIntegrityForEncryption(tableKeyIdFromHMS, dekLengthFromHMS, metadataHashFromHMS);

tableKeyId = tableKeyIdFromHMS;
encryptionDekLength =
@@ -245,6 +248,7 @@ the table key parameter (along with existing snapshots) in the file, making the
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
boolean newTable = base == null;
final TableMetadata tableMetadata;
encryptionPropsFromMetadata(metadata.properties());

String newMetadataLocation;
@@ -257,19 +261,21 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
builder.addEncryptionKey(entry.getValue());
}

newMetadataLocation = writeNewMetadataIfRequired(newTable, builder.build());
tableMetadata = builder.build();
} else {
newMetadataLocation = writeNewMetadataIfRequired(newTable, metadata);
tableMetadata = metadata;
}

boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
newMetadataLocation = writeNewMetadataIfRequired(newTable, tableMetadata);

boolean hiveEngineEnabled = hiveEngineEnabled(tableMetadata, conf);
boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);

BaseMetastoreOperations.CommitStatus commitStatus =
BaseMetastoreOperations.CommitStatus.FAILURE;
boolean updateHiveTable = false;

HiveLock lock = lockObject(base != null ? base : metadata);
HiveLock lock = lockObject(base != null ? base : tableMetadata);
try {
lock.lock();

@@ -293,14 +299,14 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
} else {
tbl =
newHmsTable(
metadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
tableMetadata.property(HiveCatalog.HMS_TABLE_OWNER, HiveHadoopUtil.currentUser()));
LOG.debug("Committing new table: {}", fullName);
}

tbl.setSd(
HiveOperationsBase.storageDescriptor(
metadata.schema(),
metadata.location(),
tableMetadata.schema(),
tableMetadata.location(),
hiveEngineEnabled)); // set to pickup any schema changes

String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
@@ -316,7 +322,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
if (base != null) {
removedProps =
base.properties().keySet().stream()
.filter(key -> !metadata.properties().containsKey(key))
.filter(key -> !tableMetadata.properties().containsKey(key))
.collect(Collectors.toSet());
}

Expand All @@ -334,7 +340,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
HMSTablePropertyHelper.updateHmsTableForIcebergTable(
newMetadataLocation,
tbl,
metadata,
tableMetadata,
removedProps,
hiveEngineEnabled,
maxHiveTablePropertySize,
@@ -391,7 +397,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
// issue for example, and triggers this exception. So we need double-check to make sure
// this is really a concurrent modification. Hitting this exception means no pending
// requests, if any, can succeed later, so it's safe to check status in strict mode
commitStatus = checkCommitStatusStrict(newMetadataLocation, metadata);
commitStatus = checkCommitStatusStrict(newMetadataLocation, tableMetadata);
if (commitStatus == BaseMetastoreOperations.CommitStatus.FAILURE) {
throw new CommitFailedException(
e, "The table %s.%s has been modified concurrently", database, tableName);
@@ -402,7 +408,7 @@
database,
tableName,
e);
commitStatus = checkCommitStatus(newMetadataLocation, metadata);
commitStatus = checkCommitStatus(newMetadataLocation, tableMetadata);
}

switch (commitStatus) {
@@ -574,8 +580,20 @@ private void encryptionPropsFromMetadata(Map<String, String> tableProperties) {
}
}

private void checkEncryptionProperties(String encryptionKeyIdFromHMS, String dekLengthFromHMS) {
Map<String, String> propertiesFromMetadata = current().properties();
private void checkIntegrityForEncryption(
String encryptionKeyIdFromHMS, String dekLengthFromHMS, String metadataHashFromHMS) {
TableMetadata metadata = current();
if (StringUtils.isNotEmpty(metadataHashFromHMS)) {
HMSTablePropertyHelper.verifyMetadataHash(metadata, metadataHashFromHMS);
return;
}

LOG.warn(
"Full metadata integrity check skipped because no metadata hash was recorded in HMS for table {}."
+ " Falling back to encryption property based check.",
tableName);

Map<String, String> propertiesFromMetadata = metadata.properties();

String encryptionKeyIdFromMetadata =
propertiesFromMetadata.get(TableProperties.ENCRYPTION_TABLE_KEY);