[Kernel] Generalize the actions after commit (like checkpoint) by introducing post commit action to kernel #4115

Merged
merged 24 commits into from
Feb 6, 2025
Changes from 3 commits
@@ -25,7 +25,8 @@
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.expressions.Literal;
-import io.delta.kernel.hook.*;
+import io.delta.kernel.hook.PostCommitHook;
+import io.delta.kernel.hook.PostCommitHook.PostCommitHookType;
 import io.delta.kernel.utils.*;
 import static io.delta.kernel.examples.utils.Utils.parseArgs;

@@ -19,8 +19,17 @@
import io.delta.kernel.engine.Engine;
import java.io.IOException;

/**
 * A hook for executing an operation after a transaction commit. Hooks are added in the Transaction
 * and the engine needs to invoke the hook explicitly to execute the operation.
 */
public interface PostCommitHook {
Collaborator

Should we add more docs here declaring what type of work is considered a PostCommitHook, and how an engine should treat them (i.e. are they required, how do they relate to the commit, etc.)?

Alternatively, maybe more thorough docs for each type?

Collaborator Author

Added some doc, PTAL. Thanks

Collaborator

Also describe what sort of operations these are and what latency to expect, so that the connector can choose to run them async.

Collaborator Author

Updated the documentation to list the supported operations and to indicate the expected latency for checkpoint in the section below.


enum PostCommitHookType {
// Write a new checkpoint at the version committed by the txn if required.
Collaborator

This is maybe a little ambiguous as to when it is present (i.e. is it always present and only sometimes checkpoints?)

Maybe something like "Write a new checkpoint at the version committed by the txn. This hook is present when the table is ready for checkpoint according to its configured checkpoint interval"

Collaborator

You can use a /* */ comment for the enum.

CHECKPOINT,
}

/** Invokes the post commit operation whose implementation must be thread safe. */
void threadSafeInvoke(Engine engine) throws IOException;
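
To make the review thread about running hooks asynchronously concrete, here is a minimal engine-side sketch. It is not taken from this PR: `PostCommitHookRunner` and `runHooks` are hypothetical names, and `Engine` and `PostCommitHook` are local stand-ins that only mirror the shape visible in this diff (`threadSafeInvoke`, `getType`, `PostCommitHookType.CHECKPOINT`). It also assumes that a failing hook should be logged rather than surfaced as a commit failure, which this diff does not state.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical engine-side helper; not part of the Delta Kernel API. */
final class PostCommitHookRunner {
  private PostCommitHookRunner() {}

  /** Stand-in for io.delta.kernel.engine.Engine so the sketch compiles on its own. */
  interface Engine {}

  /** Stand-in that mirrors the shape of PostCommitHook as it appears in this diff. */
  interface PostCommitHook {
    enum PostCommitHookType { CHECKPOINT }

    void threadSafeInvoke(Engine engine) throws IOException;

    PostCommitHookType getType();
  }

  /**
   * Submits every hook to the executor, waits for all of them, and returns how many
   * finished without throwing. Failures are logged and swallowed (assumption: the
   * commit has already succeeded, so a failed hook should not fail the caller).
   */
  static int runHooks(Engine engine, List<PostCommitHook> hooks, ExecutorService executor) {
    List<Future<Boolean>> pending = new ArrayList<>();
    for (PostCommitHook hook : hooks) {
      pending.add(executor.submit(() -> {
        try {
          hook.threadSafeInvoke(engine);
          return true;
        } catch (IOException e) {
          System.err.println("Post-commit hook " + hook.getType() + " failed: " + e.getMessage());
          return false;
        }
      }));
    }
    int succeeded = 0;
    for (Future<Boolean> result : pending) {
      try {
        if (result.get()) {
          succeeded++;
        }
      } catch (ExecutionException e) {
        // An unchecked exception escaped the hook; count it as a failure.
        System.err.println("Post-commit hook failed: " + e.getCause());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt and stop waiting
        break;
      }
    }
    return succeeded;
  }

  public static void main(String[] args) {
    Engine engine = new Engine() {};
    PostCommitHook checkpoint = new PostCommitHook() {
      @Override public void threadSafeInvoke(Engine e) { System.out.println("checkpoint written"); }
      @Override public PostCommitHookType getType() { return PostCommitHookType.CHECKPOINT; }
    };
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      int ok = runHooks(engine, Collections.singletonList(checkpoint), executor);
      System.out.println("hooks succeeded: " + ok);
    } finally {
      executor.shutdown();
    }
  }
}
```

A connector that does not want to block on the hooks could submit them the same way and skip the waiting loop. The thread-safety requirement in the Javadoc above is what makes handing the hook to another thread reasonable.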


This file was deleted.

@@ -27,11 +27,11 @@
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.ConcurrentWriteException;
 import io.delta.kernel.expressions.Column;
-import io.delta.kernel.hook.CheckpointHook;
 import io.delta.kernel.hook.PostCommitHook;
 import io.delta.kernel.internal.actions.*;
 import io.delta.kernel.internal.data.TransactionStateRow;
 import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.hook.CheckpointHook;
 import io.delta.kernel.internal.metrics.TransactionMetrics;
 import io.delta.kernel.internal.metrics.TransactionReportImpl;
 import io.delta.kernel.internal.replay.ConflictChecker;
@@ -13,13 +13,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.delta.kernel.hook;
+package io.delta.kernel.internal.hook;

 import io.delta.kernel.Table;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.hook.PostCommitHook;
 import io.delta.kernel.internal.fs.Path;
 import java.io.IOException;

/** Write a new checkpoint at the version committed by the txn if required. */
Collaborator

Suggested change
-/** Write a new checkpoint at the version committed by the txn if required. */
+/** Write a new checkpoint at the version committed by the txn. */

Collaborator

If this hook is created, it is required
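
The comments above say the hook is created only when the table is ready for a checkpoint according to its configured checkpoint interval. The readiness rule itself is not shown in this diff, so the sketch below rests on an assumption: a checkpoint is due at every positive multiple of the interval. `CheckpointReadiness` and `isReadyForCheckpoint` are hypothetical names used for illustration only.

```java
/** Hypothetical helper; not part of the Delta Kernel API. */
final class CheckpointReadiness {
  private CheckpointReadiness() {}

  /**
   * Returns true when a commit at committedVersion would carry a CHECKPOINT hook.
   * Assumption: a checkpoint is due at every positive multiple of the interval,
   * so version 0 never qualifies.
   */
  static boolean isReadyForCheckpoint(long committedVersion, int checkpointInterval) {
    if (checkpointInterval <= 0) {
      throw new IllegalArgumentException("checkpointInterval must be positive");
    }
    return committedVersion > 0 && committedVersion % checkpointInterval == 0;
  }

  public static void main(String[] args) {
    int interval = 10; // illustrative value, not read from any table
    for (long version : new long[] {0L, 9L, 10L, 11L, 20L}) {
      System.out.println("version " + version + " ready: " + isReadyForCheckpoint(version, interval));
    }
  }
}
```

Under this assumption the first commit (version 0) never produces a checkpoint hook, which matches the tests in this PR that expect no CHECKPOINT hook at version 0.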

public class CheckpointHook implements PostCommitHook {

private final Path tablePath;
@@ -46,7 +46,7 @@ import io.delta.kernel.types.StructType
 import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
 import io.delta.kernel.utils.CloseableIterator
 import io.delta.kernel.Operation.CREATE_TABLE
-import io.delta.kernel.hook.PostCommitHookType
+import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -411,13 +411,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
     expVersion: Long,
     expIsReadyForCheckpoint: Boolean): Unit = {
     assert(result.getVersion === expVersion)
-    assert(
-      result.getPostCommitHooks
-        .stream()
-        .anyMatch(
-          hook => hook.getType == PostCommitHookType.CHECKPOINT
-        ) === expIsReadyForCheckpoint
-    )
+    assertCheckpointReadiness(result, expIsReadyForCheckpoint)
   }

   def verifyTableProperties(
@@ -439,4 +433,16 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
     builder.append("]")
     checkAnswer(resultProperties, Seq(builder.toString()).map(TestRow(_)))
   }
+
+  def assertCheckpointReadiness(
+      txnResult: TransactionCommitResult,
+      isReadyForCheckpoint: Boolean): Unit = {
+    assert(
+      txnResult.getPostCommitHooks
+        .stream()
+        .anyMatch(
+          hook => hook.getType == PostCommitHookType.CHECKPOINT
+        ) === isReadyForCheckpoint
+    )
+  }
 }
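
For connectors written in Java, the check that the `assertCheckpointReadiness` test helper performs can be sketched as follows. `CheckpointHookCheck` and `hasCheckpointHook` are hypothetical names, and the two nested types are local stand-ins reduced to what the check needs. In this diff the enum is nested inside `PostCommitHook`, and the interface also declares `threadSafeInvoke`.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper; not part of the Delta Kernel API. */
final class CheckpointHookCheck {
  private CheckpointHookCheck() {}

  /** Stand-in for the hook type enum shown in this diff. */
  enum PostCommitHookType { CHECKPOINT }

  /** Stand-in reduced to the one method this check needs. */
  interface PostCommitHook {
    PostCommitHookType getType();
  }

  /** Returns true if any hook in the list is a CHECKPOINT hook. */
  static boolean hasCheckpointHook(List<PostCommitHook> hooks) {
    return hooks.stream().anyMatch(hook -> hook.getType() == PostCommitHookType.CHECKPOINT);
  }

  public static void main(String[] args) {
    PostCommitHook checkpoint = () -> PostCommitHookType.CHECKPOINT;
    System.out.println(hasCheckpointHook(Collections.emptyList()));
    System.out.println(hasCheckpointHook(Collections.singletonList(checkpoint)));
  }
}
```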
@@ -25,7 +25,7 @@ import io.delta.kernel.engine.Engine
 import io.delta.kernel.exceptions._
 import io.delta.kernel.expressions.Literal
 import io.delta.kernel.expressions.Literal._
-import io.delta.kernel.hook.PostCommitHookType
+import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
 import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement
 import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames
 import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
@@ -132,12 +132,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
       val txnResult = txn.commit(engine, emptyIterable())

       assert(txnResult.getVersion === 0)
-      assert(
-        !txnResult.getPostCommitHooks.stream()
-          .anyMatch(
-            hook => hook.getType == PostCommitHookType.CHECKPOINT
-          )
-      )
+      assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

       verifyCommitInfo(tablePath = tablePath, version = 0)
       verifyWrittenContent(tablePath, testSchema, Seq.empty)
@@ -356,11 +351,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
       val txnResult = txn.commit(engine, emptyIterable())

       assert(txnResult.getVersion === 0)
-      assert(
-        !txnResult.getPostCommitHooks
-          .stream()
-          .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT)
-      )
+      assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

       verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2"))
       verifyWrittenContent(tablePath, schema, Seq.empty)
@@ -378,11 +369,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
       val txnResult = txn.commit(engine, emptyIterable())

       assert(txnResult.getVersion === 0)
-      assert(
-        !txnResult.getPostCommitHooks
-          .stream()
-          .anyMatch(hook => hook.getType == PostCommitHookType.CHECKPOINT)
-      )
+      assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)

       verifyCommitInfo(tablePath, version = 0)
       verifyWrittenContent(tablePath, schema, Seq.empty)