Skip to content

Commit 64e958a

Browse files
Fix and test concurrent replace transactions
1 parent 5b7119b commit 64e958a

File tree

2 files changed

+69
-9
lines changed

2 files changed

+69
-9
lines changed

core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.Objects;
24+
import java.util.Optional;
2425
import java.util.Set;
2526
import java.util.function.Consumer;
2627
import java.util.function.Supplier;
@@ -77,7 +78,13 @@ enum UpdateType {
7778
private EncryptingFileIO encryptingFileIO;
7879
private String tableKeyId;
7980
private int encryptionDekLength;
80-
private List<EncryptedKey> encryptedKeysFromMetadata;
81+
82+
// keys loaded from the latest metadata
83+
private Optional<List<EncryptedKey>> encryptedKeysFromMetadata = Optional.empty();
84+
85+
// keys added to EM (e.g. as a result of a FileAppend) but not committed into the latest metadata
86+
// yet
87+
private Optional<List<EncryptedKey>> encryptedKeysPending = Optional.empty();
8188

8289
RESTTableOperations(
8390
RESTClient client,
@@ -290,9 +297,12 @@ public EncryptionManager encryption() {
290297
TableProperties.ENCRYPTION_DEK_LENGTH,
291298
String.valueOf(encryptionDekLength));
292299

300+
List<EncryptedKey> keys = Lists.newLinkedList();
301+
encryptedKeysFromMetadata.ifPresent(keys::addAll);
302+
encryptedKeysPending.ifPresent(keys::addAll);
303+
293304
encryptionManager =
294-
EncryptionUtil.createEncryptionManager(
295-
encryptedKeysFromMetadata, encryptionProperties, kmsClient);
305+
EncryptionUtil.createEncryptionManager(keys, encryptionProperties, kmsClient);
296306
} else {
297307
return PlaintextEncryptionManager.instance();
298308
}
@@ -342,7 +352,25 @@ private void encryptionPropsFromMetadata(TableMetadata metadata) {
342352
return;
343353
}
344354

345-
encryptedKeysFromMetadata = metadata.encryptionKeys();
355+
encryptedKeysFromMetadata = Optional.ofNullable(metadata.encryptionKeys());
356+
357+
if (encryptionManager != null) {
358+
encryptedKeysPending = Optional.of(Lists.newLinkedList());
359+
360+
Set<String> keyIdsFromMetadata =
361+
encryptedKeysFromMetadata.orElseGet(Lists::newLinkedList).stream()
362+
.map(EncryptedKey::keyId)
363+
.collect(Collectors.toSet());
364+
365+
for (EncryptedKey keyFromEM : EncryptionUtil.encryptionKeys(encryptionManager).values()) {
366+
if (!keyIdsFromMetadata.contains(keyFromEM.keyId())) {
367+
encryptedKeysPending.get().add(keyFromEM);
368+
}
369+
}
370+
371+
} else {
372+
encryptedKeysPending = Optional.empty();
373+
}
346374

347375
// Refresh encryption-related table properties on new/refreshed metadata
348376
Map<String, String> tableProperties = metadata.properties();
@@ -368,7 +396,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) {
368396
if (current == null
369397
|| !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) {
370398
this.current = checkUUID(current, response.tableMetadata());
371-
encryptionPropsFromMetadata(this.current);
399+
encryptionPropsFromMetadata(current);
372400
}
373401

374402
return current;

spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void testRefresh() {
127127
}
128128

129129
@TestTemplate
130-
public void testTransaction() {
130+
public void testAppendTransaction() {
131131
validationCatalog.initialize(catalogName, catalogConfig);
132132
Table table = validationCatalog.loadTable(tableIdent);
133133

@@ -140,11 +140,11 @@ public void testTransaction() {
140140
append.commit();
141141
transaction.commitTransaction();
142142

143-
assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1);
143+
assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 1);
144144
}
145145

146146
@TestTemplate
147-
public void testTransactionRetry() {
147+
public void testConcurrentAppendTransactions() {
148148
validationCatalog.initialize(catalogName, catalogConfig);
149149
Table table = validationCatalog.loadTable(tableIdent);
150150

@@ -161,7 +161,39 @@ public void testTransactionRetry() {
161161
append.commit();
162162
transaction.commitTransaction();
163163

164-
assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 2);
164+
assertThat(currentDataFiles(table)).hasSize(dataFiles.size() + 2);
165+
}
166+
167+
// See CatalogTests#testConcurrentReplaceTransactions
168+
@TestTemplate
169+
public void testConcurrentReplaceTransactions() {
170+
validationCatalog.initialize(catalogName, catalogConfig);
171+
172+
Table table = validationCatalog.loadTable(tableIdent);
173+
DataFile file = currentDataFiles(table).get(0);
174+
Schema schema = table.schema();
175+
176+
// Write data for a replace transaction that will be committed later
177+
Transaction secondReplace =
178+
validationCatalog
179+
.buildTable(tableIdent, schema)
180+
.withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
181+
.replaceTransaction();
182+
secondReplace.newFastAppend().appendFile(file).commit();
183+
184+
// Commit another replace transaction first
185+
Transaction firstReplace =
186+
validationCatalog
187+
.buildTable(tableIdent, schema)
188+
.withProperty("encryption.key-id", UnitestKMS.MASTER_KEY_NAME1)
189+
.replaceTransaction();
190+
firstReplace.newFastAppend().appendFile(file).commit();
191+
firstReplace.commitTransaction();
192+
193+
secondReplace.commitTransaction();
194+
195+
Table afterSecondReplace = validationCatalog.loadTable(tableIdent);
196+
assertThat(currentDataFiles(afterSecondReplace)).hasSize(1);
165197
}
166198

167199
@TestTemplate

0 commit comments

Comments
 (0)