Skip to content

Commit 088c5e4

Browse files
Fix replace transactions using Hive approach
1 parent dc5d4ba commit 088c5e4

File tree

2 files changed

+42
-13
lines changed

2 files changed

+42
-13
lines changed

core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.Objects;
24+
import java.util.Optional;
2425
import java.util.Set;
2526
import java.util.function.Consumer;
2627
import java.util.function.Supplier;
@@ -77,7 +78,13 @@ enum UpdateType {
7778
private EncryptingFileIO encryptingFileIO;
7879
private String tableKeyId;
7980
private int encryptionDekLength;
80-
private List<EncryptedKey> encryptedKeysFromMetadata;
81+
82+
// keys loaded from the latest metadata
83+
private Optional<List<EncryptedKey>> encryptedKeysFromMetadata = Optional.empty();
84+
85+
// keys added to EM (e.g. as a result of a FileAppend) but not committed into the latest metadata
86+
// yet
87+
private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty();
8188

8289
RESTTableOperations(
8390
RESTClient client,
@@ -290,9 +297,12 @@ public EncryptionManager encryption() {
290297
TableProperties.ENCRYPTION_DEK_LENGTH,
291298
String.valueOf(encryptionDekLength));
292299

300+
List<EncryptedKey> keys = Lists.newLinkedList();
301+
encryptedKeysFromMetadata.ifPresent(keys::addAll);
302+
encryptedKeysPending.ifPresent(keys::addAll);
303+
293304
encryptionManager =
294-
EncryptionUtil.createEncryptionManager(
295-
encryptedKeysFromMetadata, encryptionProperties, kmsClient);
305+
EncryptionUtil.createEncryptionManager(keys, encryptionProperties, kmsClient);
296306
} else {
297307
return PlaintextEncryptionManager.instance();
298308
}
@@ -342,7 +352,25 @@ private void encryptionPropsFromMetadata(TableMetadata metadata) {
342352
return;
343353
}
344354

345-
encryptedKeysFromMetadata = metadata.encryptionKeys();
355+
encryptedKeysFromMetadata = Optional.ofNullable(metadata.encryptionKeys());
356+
357+
if (encryptionManager != null) {
358+
encryptedKeysPending = Optional.of(Lists.newLinkedList());
359+
360+
Set<String> keyIdsFromMetadata =
361+
encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream()
362+
.map(EncryptedKey::keyId)
363+
.collect(Collectors.toSet());
364+
365+
for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) {
366+
if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
367+
encryptedKeysPending.get().add(keyFromEM);
368+
}
369+
}
370+
371+
} else {
372+
encryptedKeysPending = Optional.empty();
373+
}
346374

347375
// Refresh encryption-related table properties on new/refreshed metadata
348376
Map<String, String> tableProperties = metadata.properties();
@@ -368,7 +396,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) {
368396
if (current == null
369397
|| !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) {
370398
this.current = checkUUID(current, response.tableMetadata());
371-
encryptionPropsFromMetadata(this.current);
399+
encryptionPropsFromMetadata(current);
372400
}
373401

374402
return current;

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public void testConcurrentAppendTransactions() {
164164
assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2);
165165
}
166166

167+
// See CatalogTests#testConcurrentReplaceTransactions
167168
@TestTemplate
168169
public void testConcurrentReplaceTransactions() {
169170
validationCatalog.initialize(catalogName, catalogConfig);
@@ -174,18 +175,18 @@ public void testConcurrentReplaceTransactions() {
174175

175176
// Write data for a replace transaction that will be committed later
176177
Transaction secondReplace =
177-
validationCatalog
178-
.buildTable(tableIdent, schema)
179-
.withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
180-
.replaceTransaction();
178+
validationCatalog
179+
.buildTable(tableIdent, schema)
180+
.withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
181+
.replaceTransaction();
181182
secondReplace.newFastAppend().appendFile(file).commit();
182183

183184
// Commit another replace transaction first
184185
Transaction firstReplace =
185-
validationCatalog
186-
.buildTable(tableIdent, schema)
187-
.withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
188-
.replaceTransaction();
186+
validationCatalog
187+
.buildTable(tableIdent, schema)
188+
.withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
189+
.replaceTransaction();
189190
firstReplace.newFastAppend().appendFile(file).commit();
190191
firstReplace.commitTransaction();
191192

0 commit comments

Comments
 (0)