From 7790fd160c4cf011d40d69e0245fe8b56a689911 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 1 Feb 2025 13:36:21 -0800 Subject: [PATCH 01/24] make checkpoint post commit --- .../examples/CreateTableAndInsertData.java | 7 +++--- .../io/delta/kernel/PostCommitAction.java | 24 +++++++++++++++++++ .../delta/kernel/TransactionCommitResult.java | 18 +++++--------- .../kernel/internal/TransactionImpl.java | 18 +++++++++++++- .../defaults/DeltaTableWriteSuiteBase.scala | 24 ++++++++++++++----- 5 files changed, 69 insertions(+), 22 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index 8328900056a..83685116a43 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -409,10 +409,11 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException { // for every 10 versions. for (int i = 0; i < 12; i++) { TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath); - if (commitResult.isReadyForCheckpoint()) { + for(PostCommitAction action: commitResult.getPostCommitActions()) + if (action.getType().equals("checkpoint")) { // Checkpoint the table - Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion()); - didCheckpoint = true; + action.threadSafeInvoke(); + didCheckpoint = true; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java new file mode 100644 index 00000000000..a3026d6af59 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java @@ -0,0 +1,24 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel; + +public interface PostCommitAction { + + void threadSafeInvoke() throws Exception; + + String getType(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 831fa18245c..149a7fa26af 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -18,6 +18,7 @@ import io.delta.kernel.annotation.Evolving; import io.delta.kernel.engine.Engine; import io.delta.kernel.utils.CloseableIterable; +import java.util.List; /** * Contains the result of a successful transaction commit. Returned by {@link @@ -28,11 +29,11 @@ @Evolving public class TransactionCommitResult { private final long version; - private final boolean isReadyForCheckpoint; + private final List postCommitActions; - public TransactionCommitResult(long version, boolean isReadyForCheckpoint) { + public TransactionCommitResult(long version, List postCommitActions) { this.version = version; - this.isReadyForCheckpoint = isReadyForCheckpoint; + this.postCommitActions = postCommitActions; } /** @@ -44,14 +45,7 @@ public long getVersion() { return version; } - /** - * Is the table ready for checkpoint (i.e. there are enough commits since the last checkpoint)? If - * yes the connector can choose to checkpoint as the version the transaction is committed as using - * {@link Table#checkpoint(Engine, long)} - * - * @return Is the table ready for checkpointing? - */ - public boolean isReadyForCheckpoint() { - return isReadyForCheckpoint; + public List getPostCommitActions() { + return postCommitActions; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 44b0b49ccd9..ea35b8c08a4 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -354,7 +354,23 @@ private TransactionCommitResult doCommit( "Write file actions to JSON log file `%s`", FileNames.deltaFile(logPath, commitAsVersion)); - return new TransactionCommitResult(commitAsVersion, isReadyForCheckpoint(commitAsVersion)); + List postCommitActions = new ArrayList<>(); + if (isReadyForCheckpoint(commitAsVersion)) { + postCommitActions.add( + new PostCommitAction() { + @Override + public void threadSafeInvoke() throws IOException { + Table.forPath(engine, dataPath.toString()).checkpoint(engine, commitAsVersion); + } + + @Override + public String getType() { + return "checkpoint"; + } + }); + } + + return new TransactionCommitResult(commitAsVersion, postCommitActions); } catch (FileAlreadyExistsException e) { throw e; } catch (IOException ioe) { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 6abef53db27..5957f0b2744 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -26,8 +26,16 @@ import io.delta.kernel.internal.util.FileNames import io.delta.kernel.internal.util.Utils.singletonCloseableIterator import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl} import io.delta.kernel.utils.FileStatus -import io.delta.kernel.{Meta, Operation, Table, Transaction, TransactionBuilder, TransactionCommitResult} -import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row} +import io.delta.kernel.{ + Meta, + Operation, + PostCommitAction, + Table, + Transaction, + TransactionBuilder, + TransactionCommitResult +} +import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row} import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal.ofInt @@ -139,10 +147,14 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { tablePath: String, result: TransactionCommitResult, expSize: Long): Unit = { - if (result.isReadyForCheckpoint) { - Table.forPath(engine, tablePath).checkpoint(engine, result.getVersion) - verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize) - } + result.getPostCommitActions.forEach( + action => { + if (action.getType == "checkpoint") { + action.threadSafeInvoke() + verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize) + } + } + ) } /** From 0fc403dd2f65c11010764f64d67478a36583aa32 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sat, 1 Feb 2025 14:00:55 -0800 Subject: [PATCH 02/24] post commit action --- .../io/delta/kernel/PostCommitAction.java | 4 +++- .../defaults/DeltaTableWriteSuiteBase.scala | 8 +++++-- .../defaults/DeltaTableWritesSuite.scala | 21 ++++++++++++++++--- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java index a3026d6af59..1047746cf09 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java @@ -16,9 +16,11 @@ package io.delta.kernel; +import java.io.IOException; + public interface PostCommitAction { - void threadSafeInvoke() throws Exception; + void threadSafeInvoke() throws IOException; String getType(); } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 5957f0b2744..214e60ab83f 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -29,7 +29,6 @@ import io.delta.kernel.utils.FileStatus import io.delta.kernel.{ Meta, Operation, - PostCommitAction, Table, Transaction, TransactionBuilder, @@ -411,7 +410,12 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { expVersion: Long, expIsReadyForCheckpoint: Boolean): Unit = { assert(result.getVersion === expVersion) - assert(result.isReadyForCheckpoint === expIsReadyForCheckpoint) + assert( + result.getPostCommitActions + .stream() + .filter(action => action.getType == "checkpoint") + .count() == 1 === expIsReadyForCheckpoint + ) } def verifyTableProperties( diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 83b77ef208b..c8accb6aa11 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -131,7 +131,12 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert(!txnResult.isReadyForCheckpoint) + assert( + txnResult.getPostCommitActions + .stream() + .filter(action => action.getType == "checkpoint") + .count() == 0 + ) verifyCommitInfo(tablePath = tablePath, version = 0) verifyWrittenContent(tablePath, testSchema, Seq.empty) @@ -350,7 +355,12 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert(!txnResult.isReadyForCheckpoint) + assert( + txnResult.getPostCommitActions + .stream() + .filter(action => action.getType == "checkpoint") + .count() == 0 + ) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) verifyWrittenContent(tablePath, schema, Seq.empty) @@ -368,7 +378,12 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert(!txnResult.isReadyForCheckpoint) + assert( + txnResult.getPostCommitActions + .stream() + .filter(action => action.getType == "checkpoint") + .count() == 0 + ) verifyCommitInfo(tablePath, version = 0) verifyWrittenContent(tablePath, schema, Seq.empty) From 9278d0bfccf94cd32ba3e02e1de388d9866ddd3e Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:09:15 -0800 Subject: [PATCH 03/24] use async example --- .../kernel/examples/CreateTableAndInsertData.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index 83685116a43..df1e71ad208 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -410,11 +410,17 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException { for (int i = 0; i < 12; i++) { TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath); for(PostCommitAction action: commitResult.getPostCommitActions()) - if (action.getType().equals("checkpoint")) { // Checkpoint the table - action.threadSafeInvoke(); - didCheckpoint = true; - } + didCheckpoint = didCheckpoint || CompletableFuture.supplyAsync(() -> { + // run the code async + try{ + action.threadSafeInvoke(); + } catch (IOException e) { + return false; + } + return action.getType().equals("checkpoint"); + } + ).join(); // wait async finish. } if (!didCheckpoint) { From 396e9022ae34f9b40d4fb8824d7c71f64f1342a9 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:10:16 -0800 Subject: [PATCH 04/24] fix ident --- .../examples/CreateTableAndInsertData.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index df1e71ad208..eb093855219 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -412,15 +412,14 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException { for(PostCommitAction action: commitResult.getPostCommitActions()) // Checkpoint the table didCheckpoint = didCheckpoint || CompletableFuture.supplyAsync(() -> { - // run the code async - try{ - action.threadSafeInvoke(); - } catch (IOException e) { - return false; - } - return action.getType().equals("checkpoint"); - } - ).join(); // wait async finish. + // run the code async + try{ + action.threadSafeInvoke(); + } catch (IOException e) { + return false; + } + return action.getType().equals("checkpoint"); + }).join(); // wait async finish. } if (!didCheckpoint) { From e18b792087a099f673fd9217e8a189aba32bb1b4 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:24:35 -0800 Subject: [PATCH 05/24] add example for writing async --- .../examples/CreateTableAndInsertData.java | 3 ++- .../io/delta/kernel/PostCommitAction.java | 3 ++- .../io/delta/kernel/PostCommitActionType.java | 20 ++++++++++++++ .../kernel/internal/TransactionImpl.java | 27 ++++++++++--------- .../defaults/DeltaTableWriteSuiteBase.scala | 8 +++--- .../defaults/DeltaTableWritesSuite.scala | 19 +++++++------ 6 files changed, 53 insertions(+), 27 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index eb093855219..c84542fc246 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.*; +import java.util.concurrent.CompletableFuture; import org.apache.commons.cli.Options; @@ -418,7 +419,7 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException { } catch (IOException e) { return false; } - return action.getType().equals("checkpoint"); + return action.getType().equals(PostCommitActionType.CHECKPOINT); }).join(); // wait async finish. } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java index 1047746cf09..a7fc2100c28 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java @@ -20,7 +20,8 @@ public interface PostCommitAction { + // Invokes the post commit action, implementation of action should be thread safe. void threadSafeInvoke() throws IOException; - String getType(); + PostCommitActionType getType(); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java new file mode 100644 index 00000000000..85b749dc639 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java @@ -0,0 +1,20 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel; + +public enum PostCommitActionType { + CHECKPOINT +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index ea35b8c08a4..00615b30488 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -356,18 +356,7 @@ private TransactionCommitResult doCommit( List postCommitActions = new ArrayList<>(); if (isReadyForCheckpoint(commitAsVersion)) { - postCommitActions.add( - new PostCommitAction() { - @Override - public void threadSafeInvoke() throws IOException { - Table.forPath(engine, dataPath.toString()).checkpoint(engine, commitAsVersion); - } - - @Override - public String getType() { - return "checkpoint"; - } - }); + postCommitActions.add(checkpoint(engine, dataPath.toString(), commitAsVersion)); } return new TransactionCommitResult(commitAsVersion, postCommitActions); @@ -384,6 +373,20 @@ public boolean isBlindAppend() { return true; } + private static PostCommitAction checkpoint(Engine engine, String tablePath, long version) { + return new PostCommitAction() { + @Override + public void threadSafeInvoke() throws IOException { + Table.forPath(engine, tablePath).checkpoint(engine, version); + } + + @Override + public PostCommitActionType getType() { + return PostCommitActionType.CHECKPOINT; + } + }; + } + /** * Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can * result in an additional file read and that this will only happen if ICT is enabled. diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 214e60ab83f..26fe5df9f17 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -29,6 +29,7 @@ import io.delta.kernel.utils.FileStatus import io.delta.kernel.{ Meta, Operation, + PostCommitActionType, Table, Transaction, TransactionBuilder, @@ -148,7 +149,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { expSize: Long): Unit = { result.getPostCommitActions.forEach( action => { - if (action.getType == "checkpoint") { + if (action.getType == PostCommitActionType.CHECKPOINT) { action.threadSafeInvoke() verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize) } @@ -413,8 +414,9 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { assert( result.getPostCommitActions .stream() - .filter(action => action.getType == "checkpoint") - .count() == 1 === expIsReadyForCheckpoint + .anyMatch( + action => action.getType == PostCommitActionType.CHECKPOINT + ) === expIsReadyForCheckpoint ) } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index c8accb6aa11..606210a22fa 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -132,10 +132,10 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txnResult.getVersion === 0) assert( - txnResult.getPostCommitActions - .stream() - .filter(action => action.getType == "checkpoint") - .count() == 0 + !txnResult.getPostCommitActions.stream() + .anyMatch( + action => action.getType == PostCommitActionType.CHECKPOINT + ) ) verifyCommitInfo(tablePath = tablePath, version = 0) @@ -358,8 +358,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert( txnResult.getPostCommitActions .stream() - .filter(action => action.getType == "checkpoint") - .count() == 0 + .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) ) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) @@ -379,10 +378,10 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txnResult.getVersion === 0) assert( - txnResult.getPostCommitActions - .stream() - .filter(action => action.getType == "checkpoint") - .count() == 0 + txnResult.getPostCommitActions.stream() + .anyMatch( + action => action.getType == PostCommitActionType.CHECKPOINT + ) ) verifyCommitInfo(tablePath, version = 0) From 8f75f46ffc91a9a201d81b7b935cd9a38af128bf Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:26:18 -0800 Subject: [PATCH 06/24] fix test --- .../delta/kernel/defaults/DeltaTableWritesSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 606210a22fa..5363680aba2 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -356,7 +356,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txnResult.getVersion === 0) assert( - txnResult.getPostCommitActions + !txnResult.getPostCommitActions .stream() .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) ) @@ -377,11 +377,10 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert( - txnResult.getPostCommitActions.stream() - .anyMatch( - action => action.getType == PostCommitActionType.CHECKPOINT - ) + assert( + !txnResult.getPostCommitActions + .stream() + .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) ) verifyCommitInfo(tablePath, version = 0) From debd1ae12962f010b6c463b0f956373500937483 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:27:59 -0800 Subject: [PATCH 07/24] add doc --- .../src/main/java/io/delta/kernel/TransactionCommitResult.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 149a7fa26af..4fd303d96a5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -45,6 +45,9 @@ public long getVersion() { return version; } + /** + * @return list of actions to trigger after commit. + */ public List getPostCommitActions() { return postCommitActions; } From c1cbc4751c28880dca3bad798c1b3b0b96172434 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:29:57 -0800 Subject: [PATCH 08/24] fix ident --- .../io/delta/kernel/defaults/DeltaTableWritesSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 5363680aba2..fd696f0a521 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -355,10 +355,10 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert( + assert( !txnResult.getPostCommitActions - .stream() - .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) + .stream() + .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) ) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) From f262fe0b4855ad199ac9d0309c0c19a8780db3c3 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Sun, 2 Feb 2025 18:36:03 -0800 Subject: [PATCH 09/24] fix java format --- .../main/java/io/delta/kernel/TransactionCommitResult.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 4fd303d96a5..6fd36fdfcf9 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -45,9 +45,7 @@ public long getVersion() { return version; } - /** - * @return list of actions to trigger after commit. - */ + /** @return list of actions to trigger after commit. */ public List getPostCommitActions() { return postCommitActions; } From 4d5ed9c7b01e36717c31068f0ebe096d45fea329 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 3 Feb 2025 17:39:31 -0800 Subject: [PATCH 10/24] seperate commit action to hook package --- .../examples/CreateTableAndInsertData.java | 7 ++-- .../delta/kernel/TransactionCommitResult.java | 11 ++--- .../io/delta/kernel/hook/CheckpointHook.java | 42 +++++++++++++++++++ .../PostCommitHook.java} | 11 ++--- .../PostCommitHookType.java} | 4 +- .../kernel/internal/TransactionImpl.java | 22 +++------- .../defaults/DeltaTableWriteSuiteBase.scala | 14 +++---- .../defaults/DeltaTableWritesSuite.scala | 13 +++--- 8 files changed, 79 insertions(+), 45 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java rename kernel/kernel-api/src/main/java/io/delta/kernel/{PostCommitAction.java => hook/PostCommitHook.java} (70%) rename kernel/kernel-api/src/main/java/io/delta/kernel/{PostCommitActionType.java => hook/PostCommitHookType.java} (90%) diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index c84542fc246..96cf454dc00 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -25,6 +25,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.expressions.Literal; +import io.delta.kernel.hook.*; import io.delta.kernel.utils.*; import static io.delta.kernel.examples.utils.Utils.parseArgs; @@ -410,16 +411,16 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException { // for every 10 versions. for (int i = 0; i < 12; i++) { TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath); - for(PostCommitAction action: commitResult.getPostCommitActions()) + for(PostCommitHook hook: commitResult.getPostCommitHooks()) // Checkpoint the table didCheckpoint = didCheckpoint || CompletableFuture.supplyAsync(() -> { // run the code async try{ - action.threadSafeInvoke(); + hook.threadSafeInvoke(engine); } catch (IOException e) { return false; } - return action.getType().equals(PostCommitActionType.CHECKPOINT); + return hook.getType().equals(PostCommitHookType.CHECKPOINT); }).join(); // wait async finish. } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 6fd36fdfcf9..44d8eeaad19 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -17,6 +17,7 @@ import io.delta.kernel.annotation.Evolving; import io.delta.kernel.engine.Engine; +import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.utils.CloseableIterable; import java.util.List; @@ -29,11 +30,11 @@ @Evolving public class TransactionCommitResult { private final long version; - private final List postCommitActions; + private final List postCommitHooks; - public TransactionCommitResult(long version, List postCommitActions) { + public TransactionCommitResult(long version, List postCommitHooks) { this.version = version; - this.postCommitActions = postCommitActions; + this.postCommitHooks = postCommitHooks; } /** @@ -46,7 +47,7 @@ public long getVersion() { } /** @return list of actions to trigger after commit. */ - public List getPostCommitActions() { - return postCommitActions; + public List getPostCommitHooks() { + return postCommitHooks; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java new file mode 100644 index 00000000000..1ea9a46cb35 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java @@ -0,0 +1,42 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.hook; + +import io.delta.kernel.Table; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.fs.Path; +import java.io.IOException; + +public class CheckpointHook implements PostCommitHook { + + private final Path tablePath; + private final long checkpointVersion; + + public CheckpointHook(Path tablePath, long checkpointVersion) { + this.tablePath = tablePath; + this.checkpointVersion = checkpointVersion; + } + + @Override + public void threadSafeInvoke(Engine engine) throws IOException { + Table.forPath(engine, tablePath.toString()).checkpoint(engine, checkpointVersion); + } + + @Override + public PostCommitHookType getType() { + return PostCommitHookType.CHECKPOINT; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java similarity index 70% rename from kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index a7fc2100c28..571cb503645 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitAction.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -14,14 +14,15 @@ * limitations under the License. */ -package io.delta.kernel; +package io.delta.kernel.hook; +import io.delta.kernel.engine.Engine; import java.io.IOException; -public interface PostCommitAction { +public interface PostCommitHook { - // Invokes the post commit action, implementation of action should be thread safe. - void threadSafeInvoke() throws IOException; + /** Invokes the post commit hook, implementation should be thread safe. */ + void threadSafeInvoke(Engine engine) throws IOException; - PostCommitActionType getType(); + PostCommitHookType getType(); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java similarity index 90% rename from kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java index 85b749dc639..749c5efdcc7 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/PostCommitActionType.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.delta.kernel; +package io.delta.kernel.hook; -public enum PostCommitActionType { +public enum PostCommitHookType { CHECKPOINT } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 00615b30488..60467f711d1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -27,6 +27,8 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.ConcurrentWriteException; import io.delta.kernel.expressions.Column; +import io.delta.kernel.hook.CheckpointHook; +import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.data.TransactionStateRow; import io.delta.kernel.internal.fs.Path; @@ -354,12 +356,12 @@ private TransactionCommitResult doCommit( "Write file actions to JSON log file `%s`", FileNames.deltaFile(logPath, commitAsVersion)); - List postCommitActions = new ArrayList<>(); + List postCommitHooks = new ArrayList<>(); if (isReadyForCheckpoint(commitAsVersion)) { - postCommitActions.add(checkpoint(engine, dataPath.toString(), commitAsVersion)); + postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion)); } - return new TransactionCommitResult(commitAsVersion, postCommitActions); + return new TransactionCommitResult(commitAsVersion, postCommitHooks); } catch (FileAlreadyExistsException e) { throw e; } catch (IOException ioe) { @@ -373,20 +375,6 @@ public boolean isBlindAppend() { return true; } - private static PostCommitAction checkpoint(Engine engine, String tablePath, long version) { - return new PostCommitAction() { - @Override - public void threadSafeInvoke() throws IOException { - Table.forPath(engine, tablePath).checkpoint(engine, version); - } - - @Override - public PostCommitActionType getType() { - return PostCommitActionType.CHECKPOINT; - } - }; - } - /** * Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can * result in an additional file read and that this will only happen if ICT is enabled. diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index 26fe5df9f17..be01cc2f4d3 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -29,7 +29,6 @@ import io.delta.kernel.utils.FileStatus import io.delta.kernel.{ Meta, Operation, - PostCommitActionType, Table, Transaction, TransactionBuilder, @@ -47,6 +46,7 @@ import io.delta.kernel.types.StructType import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} import io.delta.kernel.utils.CloseableIterator import io.delta.kernel.Operation.CREATE_TABLE +import io.delta.kernel.hook.PostCommitHookType import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -147,10 +147,10 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { tablePath: String, result: TransactionCommitResult, expSize: Long): Unit = { - result.getPostCommitActions.forEach( - action => { - if (action.getType == PostCommitActionType.CHECKPOINT) { - action.threadSafeInvoke() + result.getPostCommitHooks.forEach( + hook => { + if (hook.getType == PostCommitHookType.CHECKPOINT) { + hook.threadSafeInvoke(engine) verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize) } } @@ -412,10 +412,10 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { expIsReadyForCheckpoint: Boolean): Unit = { assert(result.getVersion === expVersion) assert( - result.getPostCommitActions + result.getPostCommitHooks .stream() .anyMatch( - action => action.getType == PostCommitActionType.CHECKPOINT + hook => hook.getType == PostCommitHookType.CHECKPOINT ) === expIsReadyForCheckpoint ) } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index fd696f0a521..f85585adfe3 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -25,6 +25,7 @@ import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions._ import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal._ +import io.delta.kernel.hook.PostCommitHookType import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames import io.delta.kernel.internal.{SnapshotImpl, TableConfig} @@ -132,9 +133,9 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txnResult.getVersion === 0) assert( - !txnResult.getPostCommitActions.stream() + !txnResult.getPostCommitHooks.stream() .anyMatch( - action => action.getType == PostCommitActionType.CHECKPOINT + hook => hook.getType == PostCommitHookType.CHECKPOINT ) ) @@ -356,9 +357,9 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txnResult.getVersion === 0) assert( - !txnResult.getPostCommitActions + !txnResult.getPostCommitHooks .stream() - .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) + .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) ) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) @@ -378,9 +379,9 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(txnResult.getVersion === 0) assert( - !txnResult.getPostCommitActions + !txnResult.getPostCommitHooks .stream() - .anyMatch(action => action.getType == PostCommitActionType.CHECKPOINT) + .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) ) verifyCommitInfo(tablePath, version = 0) From 60942c24bf020151e3f204f0a1d647fc8c1efc91 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Mon, 3 Feb 2025 19:06:26 -0800 Subject: [PATCH 11/24] fix doc --- .../src/main/java/io/delta/kernel/TransactionCommitResult.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 44d8eeaad19..6829a013b64 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -46,7 +46,7 @@ public long getVersion() { return version; } - /** @return list of actions to trigger after commit. */ + /** @return list of operations to trigger after commit. */ public List getPostCommitHooks() { return postCommitHooks; } From ddf351d8f17b83804652d591d2e3270aec408e0d Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 4 Feb 2025 10:10:32 -0800 Subject: [PATCH 12/24] fix doc --- .../src/main/java/io/delta/kernel/hook/PostCommitHook.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index 571cb503645..ded632d1b75 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -21,7 +21,7 @@ public interface PostCommitHook { - /** Invokes the post commit hook, implementation should be thread safe. */ + /** Invokes the post commit operation whose implementation must be thread safe. */ void threadSafeInvoke(Engine engine) throws IOException; PostCommitHookType getType(); From 9ab9587c1bae9b851c1a1decca7cf6fa6151236e Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 4 Feb 2025 19:24:34 -0800 Subject: [PATCH 13/24] package --- .../io/delta/kernel/hook/PostCommitHook.java | 9 ++++++++ .../delta/kernel/hook/PostCommitHookType.java | 20 ----------------- .../kernel/internal/TransactionImpl.java | 2 +- .../{ => internal}/hook/CheckpointHook.java | 4 +++- .../defaults/DeltaTableWriteSuiteBase.scala | 22 ++++++++++++------- .../defaults/DeltaTableWritesSuite.scala | 21 ++++-------------- 6 files changed, 31 insertions(+), 47 deletions(-) delete mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java rename kernel/kernel-api/src/main/java/io/delta/kernel/{ => internal}/hook/CheckpointHook.java (88%) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index ded632d1b75..49558e30716 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -19,8 +19,17 @@ import io.delta.kernel.engine.Engine; import java.io.IOException; +/** + * A hook for executing operation after a transaction commit. Hooks are added in the Transaction and + * engine need to invoke the hook explicitly for executing the operation. + */ public interface PostCommitHook { + enum PostCommitHookType { + // Write a new checkpoint at the version committed by the txn if required. + CHECKPOINT, + } + /** Invokes the post commit operation whose implementation must be thread safe. */ void threadSafeInvoke(Engine engine) throws IOException; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java deleted file mode 100644 index 749c5efdcc7..00000000000 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHookType.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (2025) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.kernel.hook; - -public enum PostCommitHookType { - CHECKPOINT -} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java index 60467f711d1..b2bef46757a 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java @@ -27,11 +27,11 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.ConcurrentWriteException; import io.delta.kernel.expressions.Column; -import io.delta.kernel.hook.CheckpointHook; import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.internal.actions.*; import io.delta.kernel.internal.data.TransactionStateRow; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.hook.CheckpointHook; import io.delta.kernel.internal.metrics.TransactionMetrics; import io.delta.kernel.internal.metrics.TransactionReportImpl; import io.delta.kernel.internal.replay.ConflictChecker; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java similarity index 88% rename from kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java rename to kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java index 1ea9a46cb35..4434ab62c84 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/CheckpointHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java @@ -13,13 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.delta.kernel.hook; +package io.delta.kernel.internal.hook; import io.delta.kernel.Table; import io.delta.kernel.engine.Engine; +import io.delta.kernel.hook.PostCommitHook; import io.delta.kernel.internal.fs.Path; import java.io.IOException; +/** Write a new checkpoint at the version committed by the txn if required. */ public class CheckpointHook implements PostCommitHook { private final Path tablePath; diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala index be01cc2f4d3..6f24cca7aa1 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala @@ -46,7 +46,7 @@ import io.delta.kernel.types.StructType import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable} import io.delta.kernel.utils.CloseableIterator import io.delta.kernel.Operation.CREATE_TABLE -import io.delta.kernel.hook.PostCommitHookType +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -411,13 +411,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { expVersion: Long, expIsReadyForCheckpoint: Boolean): Unit = { assert(result.getVersion === expVersion) - assert( - result.getPostCommitHooks - .stream() - .anyMatch( - hook => hook.getType == PostCommitHookType.CHECKPOINT - ) === expIsReadyForCheckpoint - ) + assertCheckpointReadiness(result, expIsReadyForCheckpoint) } def verifyTableProperties( @@ -439,4 +433,16 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils { builder.append("]") checkAnswer(resultProperties, Seq(builder.toString()).map(TestRow(_))) } + + def assertCheckpointReadiness( + txnResult: TransactionCommitResult, + isReadyForCheckpoint: Boolean): Unit = { + assert( + txnResult.getPostCommitHooks + .stream() + .anyMatch( + hook => hook.getType == PostCommitHookType.CHECKPOINT + ) === isReadyForCheckpoint + ) + } } diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index f85585adfe3..487ffc666ca 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -25,7 +25,7 @@ import io.delta.kernel.engine.Engine import io.delta.kernel.exceptions._ import io.delta.kernel.expressions.Literal import io.delta.kernel.expressions.Literal._ -import io.delta.kernel.hook.PostCommitHookType +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames import io.delta.kernel.internal.{SnapshotImpl, TableConfig} @@ -132,12 +132,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert( - !txnResult.getPostCommitHooks.stream() - .anyMatch( - hook => hook.getType == PostCommitHookType.CHECKPOINT - ) - ) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath = tablePath, version = 0) verifyWrittenContent(tablePath, testSchema, Seq.empty) @@ -356,11 +351,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert( - !txnResult.getPostCommitHooks - .stream() - .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) - ) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) verifyWrittenContent(tablePath, schema, Seq.empty) @@ -378,11 +369,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assert( - !txnResult.getPostCommitHooks - .stream() - .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT) - ) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath, version = 0) verifyWrittenContent(tablePath, schema, Seq.empty) From 1d96f5403c8d92f7c7864d8de26cd4edf294d552 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 4 Feb 2025 19:25:57 -0800 Subject: [PATCH 14/24] ident --- .../io/delta/kernel/defaults/DeltaTableWritesSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 487ffc666ca..22f3d07fcd0 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -351,7 +351,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2")) verifyWrittenContent(tablePath, schema, Seq.empty) @@ -369,7 +369,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa val txnResult = txn.commit(engine, emptyIterable()) assert(txnResult.getVersion === 0) - assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) + assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false) verifyCommitInfo(tablePath, version = 0) verifyWrittenContent(tablePath, schema, Seq.empty) From d8cdaffa9888551c7b7d99cc9992b2fc72695e20 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Tue, 4 Feb 2025 19:53:51 -0800 Subject: [PATCH 15/24] fix import --- .../io/delta/kernel/examples/CreateTableAndInsertData.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java index 96cf454dc00..c6cce250d4b 100644 --- a/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java +++ b/kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java @@ -25,7 +25,8 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.expressions.Literal; -import io.delta.kernel.hook.*; +import io.delta.kernel.hook.PostCommitHook; +import io.delta.kernel.hook.PostCommitHook.PostCommitHookType; import io.delta.kernel.utils.*; import static io.delta.kernel.examples.utils.Utils.parseArgs; From 8a21a64934b949746ea6596615ac88514440ea46 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 11:16:28 -0800 Subject: [PATCH 16/24] update the doc --- .../src/main/java/io/delta/kernel/hook/PostCommitHook.java | 5 ++++- .../java/io/delta/kernel/internal/hook/CheckpointHook.java | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index 49558e30716..ff97dd5034c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -26,7 +26,10 @@ public interface PostCommitHook { enum PostCommitHookType { - // Write a new checkpoint at the version committed by the txn if required. + /** + * Write a new checkpoint at the version committed by the txn. This hook is present when the + * table is ready for checkpoint according to its configured checkpoint interval. + */ CHECKPOINT, } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java index 4434ab62c84..bb291422956 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/hook/CheckpointHook.java @@ -21,7 +21,7 @@ import io.delta.kernel.internal.fs.Path; import java.io.IOException; -/** Write a new checkpoint at the version committed by the txn if required. */ +/** Write a new checkpoint at the version committed by the txn. */ public class CheckpointHook implements PostCommitHook { private final Path tablePath; From 006eb107fc8e53710dd62c40c4b4744acc1f4225 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 14:56:16 -0800 Subject: [PATCH 17/24] update message --- .../main/java/io/delta/kernel/TransactionCommitResult.java | 7 ++++++- .../src/main/java/io/delta/kernel/hook/PostCommitHook.java | 7 +++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 6829a013b64..84ca4033fe5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -46,7 +46,12 @@ public long getVersion() { return version; } - /** @return list of operations to trigger after commit. */ + /** + * Contains list of operations for kernel based connector to trigger. Connector could trigger the + * operations by calling {@link PostCommitHook#threadSafeInvoke(Engine)} in a sync or async way. + * + * @return list of operations to trigger after commit. + */ public List getPostCommitHooks() { return postCommitHooks; } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index ff97dd5034c..d4597cddb11 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -21,14 +21,17 @@ /** * A hook for executing operation after a transaction commit. Hooks are added in the Transaction and - * engine need to invoke the hook explicitly for executing the operation. + * engine need to invoke the hook explicitly for executing the operation. Supported operations are + * listed in {@link PostCommitHookType} */ public interface PostCommitHook { enum PostCommitHookType { /** * Write a new checkpoint at the version committed by the txn. This hook is present when the - * table is ready for checkpoint according to its configured checkpoint interval. + * table is ready for checkpoint according to its configured checkpoint interval. To perform + * this operation, previous checkpoint (if present) and logs after checkpoint will be read to + * construct new checkpoint. */ CHECKPOINT, } From fd21ec177319e441df901ae43c30277393c68795 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 15:29:02 -0800 Subject: [PATCH 18/24] update doc --- .../java/io/delta/kernel/TransactionCommitResult.java | 6 +++--- .../main/java/io/delta/kernel/hook/PostCommitHook.java | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 84ca4033fe5..92737f6b114 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -47,10 +47,10 @@ public long getVersion() { } /** - * Contains list of operations for kernel based connector to trigger. Connector could trigger the - * operations by calling {@link PostCommitHook#threadSafeInvoke(Engine)} in a sync or async way. + * Operations for connector to trigger post-commit. Usage: Call + * PostCommitHook#threadSafeInvoke(Engine) either sync or async in a separate thread. * - * @return list of operations to trigger after commit. + * @return list of post-commit operations */ public List getPostCommitHooks() { return postCommitHooks; diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index d4597cddb11..260b61c1dd8 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -28,12 +28,12 @@ public interface PostCommitHook { enum PostCommitHookType { /** - * Write a new checkpoint at the version committed by the txn. This hook is present when the - * table is ready for checkpoint according to its configured checkpoint interval. To perform - * this operation, previous checkpoint (if present) and logs after checkpoint will be read to - * construct new checkpoint. + * Writes a new checkpoint at the version committed by the transaction. This hook is present + * when the table is ready for checkpoint according to its configured checkpoint interval. To + * perform this operation, reading previous checkpoint + logs is required to construct a new + * checkpoint, with latency scaling based on log size (typically seconds to minutes). */ - CHECKPOINT, + CHECKPOINT } /** Invokes the post commit operation whose implementation must be thread safe. */ From c9428e27c6d11612e5953dffc084825191ae2288 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 15:31:57 -0800 Subject: [PATCH 19/24] update doc --- .../main/java/io/delta/kernel/TransactionCommitResult.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 92737f6b114..2b31d608912 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -47,8 +47,8 @@ public long getVersion() { } /** - * Operations for connector to trigger post-commit. Usage: Call - * PostCommitHook#threadSafeInvoke(Engine) either sync or async in a separate thread. + * Operations for connector to trigger post-commit. + * Usage: Call {@link PostCommitHook#threadSafeInvoke(Engine)} either sync or async in a separate thread. * * @return list of post-commit operations */ From c926f8590b66ae8e8c41de43f07a5d242417b67f Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 15:41:05 -0800 Subject: [PATCH 20/24] fix doc --- .../main/java/io/delta/kernel/TransactionCommitResult.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 2b31d608912..96460f2cbef 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -48,7 +48,9 @@ public long getVersion() { /** * Operations for connector to trigger post-commit. - * Usage: Call {@link PostCommitHook#threadSafeInvoke(Engine)} either sync or async in a separate thread. + * + *

Usage: Call {@link PostCommitHook#threadSafeInvoke(Engine)} either sync or async in a + * separate thread. * * @return list of post-commit operations */ From 6cf22c090539bfbb0a4bf9a5da7bde658f0a4962 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 16:02:30 -0800 Subject: [PATCH 21/24] full stop --- .../src/main/java/io/delta/kernel/hook/PostCommitHook.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java index 260b61c1dd8..173168ff1a0 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/hook/PostCommitHook.java @@ -22,7 +22,7 @@ /** * A hook for executing operation after a transaction commit. Hooks are added in the Transaction and * engine need to invoke the hook explicitly for executing the operation. Supported operations are - * listed in {@link PostCommitHookType} + * listed in {@link PostCommitHookType}. */ public interface PostCommitHook { From b7877817c2bd4b5b3438514185a776e702207201 Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 16:11:38 -0800 Subject: [PATCH 22/24] fix doc --- .../main/java/io/delta/kernel/TransactionCommitResult.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 96460f2cbef..91798875a41 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -49,8 +49,9 @@ public long getVersion() { /** * Operations for connector to trigger post-commit. * - *

Usage: Call {@link PostCommitHook#threadSafeInvoke(Engine)} either sync or async in a - * separate thread. + *

Usage: 1. Async: Call {@link PostCommitHook#threadSafeInvoke(Engine)} in separate thread. 2. + * Sync: Direct call {@link PostCommitHook#threadSafeInvoke(Engine)} and block until operation + * ends. * * @return list of post-commit operations */ From b09a550bd55e8ff7e291d7a0fbaa2b458f6a778a Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 16:15:19 -0800 Subject: [PATCH 23/24] fix doc --- .../main/java/io/delta/kernel/TransactionCommitResult.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index 91798875a41..f1c62f203d9 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -49,9 +49,10 @@ public long getVersion() { /** * Operations for connector to trigger post-commit. * - *

Usage: 1. Async: Call {@link PostCommitHook#threadSafeInvoke(Engine)} in separate thread. 2. - * Sync: Direct call {@link PostCommitHook#threadSafeInvoke(Engine)} and block until operation - * ends. + *

Usage: + *

  • Async: Call {@link PostCommitHook#threadSafeInvoke(Engine)} in separate thread. + *
  • Sync: Direct call {@link PostCommitHook#threadSafeInvoke(Engine)} and block until operation + * ends. * * @return list of post-commit operations */ From d8ce4f53826b7e4f3ab256620e31cda980c07dcb Mon Sep 17 00:00:00 2001 From: Xin Huang Date: Wed, 5 Feb 2025 16:17:40 -0800 Subject: [PATCH 24/24] add itemize --- .../java/io/delta/kernel/TransactionCommitResult.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java index f1c62f203d9..f95f5b3a537 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java @@ -50,9 +50,12 @@ public long getVersion() { * Operations for connector to trigger post-commit. * *

    Usage: - *

  • Async: Call {@link PostCommitHook#threadSafeInvoke(Engine)} in separate thread. - *
  • Sync: Direct call {@link PostCommitHook#threadSafeInvoke(Engine)} and block until operation - * ends. + * + *
      + *
    • Async: Call {@link PostCommitHook#threadSafeInvoke(Engine)} in separate thread. + *
    • Sync: Direct call {@link PostCommitHook#threadSafeInvoke(Engine)} and block until + * operation ends. + *
    * * @return list of post-commit operations */