Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel]Generalize the actions after commit(like checkpoint) by introducing post commit action to kernel #4115

Merged
merged 24 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;

import org.apache.commons.cli.Options;

import io.delta.kernel.*;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType;
import io.delta.kernel.utils.*;
import static io.delta.kernel.examples.utils.Utils.parseArgs;

Expand Down Expand Up @@ -409,11 +412,17 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException {
// for every 10 versions.
for (int i = 0; i < 12; i++) {
TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath);
if (commitResult.isReadyForCheckpoint()) {
for(PostCommitHook hook: commitResult.getPostCommitHooks())
// Checkpoint the table
Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
didCheckpoint = true;
}
didCheckpoint = didCheckpoint || CompletableFuture.supplyAsync(() -> {
// run the code async
try{
hook.threadSafeInvoke(engine);
} catch (IOException e) {
return false;
}
return hook.getType().equals(PostCommitHookType.CHECKPOINT);
}).join(); // wait async finish.
}

if (!didCheckpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.utils.CloseableIterable;
import java.util.List;

/**
* Contains the result of a successful transaction commit. Returned by {@link
Expand All @@ -28,11 +30,11 @@
@Evolving
public class TransactionCommitResult {
private final long version;
private final boolean isReadyForCheckpoint;
private final List<PostCommitHook> postCommitHooks;

public TransactionCommitResult(long version, boolean isReadyForCheckpoint) {
public TransactionCommitResult(long version, List<PostCommitHook> postCommitHooks) {
this.version = version;
this.isReadyForCheckpoint = isReadyForCheckpoint;
this.postCommitHooks = postCommitHooks;
}

/**
Expand All @@ -45,13 +47,14 @@ public long getVersion() {
}

/**
* Is the table ready for checkpoint (i.e. there are enough commits since the last checkpoint)? If
* yes the connector can choose to checkpoint as the version the transaction is committed as using
* {@link Table#checkpoint(Engine, long)}
* Operations for the connector to trigger after the commit.
*
* @return Is the table ready for checkpointing?
* <p>Usage: Call {@link PostCommitHook#threadSafeInvoke(Engine)} either sync or async in a
* separate thread.
*
* @return list of post-commit operations
*/
public boolean isReadyForCheckpoint() {
return isReadyForCheckpoint;
public List<PostCommitHook> getPostCommitHooks() {
return postCommitHooks;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.hook;

import io.delta.kernel.engine.Engine;
import java.io.IOException;

/**
* A hook for executing an operation after a transaction commit. Hooks are added in the
* Transaction, and the engine needs to invoke each hook explicitly to execute its operation.
* Supported operations are listed in {@link PostCommitHookType}.
*/
public interface PostCommitHook {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add more docs here declaring what type of work is considered a PostCommitHook? And how an engine should treat them? i.e. are they required, how do they relate to the commit, etc

Alternatively, maybe more thorough docs for each type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some doc, PTAL. Thanks

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also document what sort of operations these are and their expected latency, so that the connector can choose to run them async.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the documentation to list the supported operations and to give a latency indication for checkpointing in the section below.


/** The kinds of post-commit operation that a {@link PostCommitHook} can represent. */
enum PostCommitHookType {
/**
* Writes a new checkpoint at the version committed by the transaction. This hook is present
* when the table is ready for checkpoint according to its configured checkpoint interval. To
* perform this operation, reading the previous checkpoint + logs is required to construct a
* new checkpoint, with latency scaling based on log size (typically seconds to minutes).
*/
CHECKPOINT
}

/**
* Invokes the post commit operation whose implementation must be thread safe.
*
* @param engine the {@link Engine} instance handed to the operation
* @throws IOException declared by the hook contract; callers must handle or propagate it
*/
void threadSafeInvoke(Engine engine) throws IOException;

/** Returns the {@link PostCommitHookType} that identifies which operation this hook represents. */
PostCommitHookType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentWriteException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.hook.CheckpointHook;
import io.delta.kernel.internal.metrics.TransactionMetrics;
import io.delta.kernel.internal.metrics.TransactionReportImpl;
import io.delta.kernel.internal.replay.ConflictChecker;
Expand Down Expand Up @@ -354,7 +356,12 @@ private TransactionCommitResult doCommit(
"Write file actions to JSON log file `%s`",
FileNames.deltaFile(logPath, commitAsVersion));

return new TransactionCommitResult(commitAsVersion, isReadyForCheckpoint(commitAsVersion));
List<PostCommitHook> postCommitHooks = new ArrayList<>();
if (isReadyForCheckpoint(commitAsVersion)) {
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
}

return new TransactionCommitResult(commitAsVersion, postCommitHooks);
} catch (FileAlreadyExistsException e) {
throw e;
} catch (IOException ioe) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.hook;

import io.delta.kernel.Table;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.hook.PostCommitHook;
import io.delta.kernel.internal.fs.Path;
import java.io.IOException;

/**
 * {@link PostCommitHook} that writes a new checkpoint for the table at the version committed by
 * the transaction.
 */
public class CheckpointHook implements PostCommitHook {

  /** Location of the table to checkpoint. */
  private final Path tablePath;

  /** Table version at which the checkpoint is written. */
  private final long checkpointVersion;

  /**
   * @param tablePath location of the table to checkpoint
   * @param checkpointVersion table version at which the checkpoint is written
   */
  public CheckpointHook(Path tablePath, long checkpointVersion) {
    this.checkpointVersion = checkpointVersion;
    this.tablePath = tablePath;
  }

  /** Identifies this hook as a {@link PostCommitHookType#CHECKPOINT} operation. */
  @Override
  public PostCommitHookType getType() {
    return PostCommitHookType.CHECKPOINT;
  }

  /**
   * Resolves the table from its path and checkpoints it at the stored version.
   *
   * @param engine engine used both to resolve the table and to write the checkpoint
   * @throws IOException propagated from the checkpoint call
   */
  @Override
  public void threadSafeInvoke(Engine engine) throws IOException {
    final String tableLocation = tablePath.toString();
    final Table table = Table.forPath(engine, tableLocation);
    table.checkpoint(engine, checkpointVersion);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ import io.delta.kernel.internal.util.FileNames
import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl}
import io.delta.kernel.utils.FileStatus
import io.delta.kernel.{Meta, Operation, Table, Transaction, TransactionBuilder, TransactionCommitResult}
import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row}
import io.delta.kernel.{
Meta,
Operation,
Table,
Transaction,
TransactionBuilder,
TransactionCommitResult
}
import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row}
import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
import io.delta.kernel.expressions.Literal
import io.delta.kernel.expressions.Literal.ofInt
Expand All @@ -39,6 +46,7 @@ import io.delta.kernel.types.StructType
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
import io.delta.kernel.utils.CloseableIterator
import io.delta.kernel.Operation.CREATE_TABLE
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -139,10 +147,14 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
tablePath: String,
result: TransactionCommitResult,
expSize: Long): Unit = {
if (result.isReadyForCheckpoint) {
Table.forPath(engine, tablePath).checkpoint(engine, result.getVersion)
verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize)
}
result.getPostCommitHooks.forEach(
hook => {
if (hook.getType == PostCommitHookType.CHECKPOINT) {
hook.threadSafeInvoke(engine)
verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize)
}
}
)
}

/**
Expand Down Expand Up @@ -399,7 +411,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
expVersion: Long,
expIsReadyForCheckpoint: Boolean): Unit = {
assert(result.getVersion === expVersion)
assert(result.isReadyForCheckpoint === expIsReadyForCheckpoint)
assertCheckpointReadiness(result, expIsReadyForCheckpoint)
}

def verifyTableProperties(
Expand All @@ -421,4 +433,16 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
builder.append("]")
checkAnswer(resultProperties, Seq(builder.toString()).map(TestRow(_)))
}

/**
 * Asserts that the commit result carries a CHECKPOINT post-commit hook exactly when
 * `isReadyForCheckpoint` is true.
 */
def assertCheckpointReadiness(
    txnResult: TransactionCommitResult,
    isReadyForCheckpoint: Boolean): Unit = {
  // True when at least one of the returned hooks is a checkpoint hook.
  val hasCheckpointHook = txnResult.getPostCommitHooks
    .stream()
    .anyMatch(postCommitHook => postCommitHook.getType == PostCommitHookType.CHECKPOINT)
  assert(hasCheckpointHook === isReadyForCheckpoint)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.delta.kernel.engine.Engine
import io.delta.kernel.exceptions._
import io.delta.kernel.expressions.Literal
import io.delta.kernel.expressions.Literal._
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement
import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames
import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
Expand Down Expand Up @@ -131,7 +132,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
val txnResult = txn.commit(engine, emptyIterable())

assert(txnResult.getVersion === 0)
assert(!txnResult.isReadyForCheckpoint)
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

verifyCommitInfo(tablePath = tablePath, version = 0)
verifyWrittenContent(tablePath, testSchema, Seq.empty)
Expand Down Expand Up @@ -350,7 +351,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
val txnResult = txn.commit(engine, emptyIterable())

assert(txnResult.getVersion === 0)
assert(!txnResult.isReadyForCheckpoint)
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2"))
verifyWrittenContent(tablePath, schema, Seq.empty)
Expand All @@ -368,7 +369,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
val txnResult = txn.commit(engine, emptyIterable())

assert(txnResult.getVersion === 0)
assert(!txnResult.isReadyForCheckpoint)
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

verifyCommitInfo(tablePath, version = 0)
verifyWrittenContent(tablePath, schema, Seq.empty)
Expand Down
Loading