Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC of Simple crc write #4116

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,11 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException {
// for every 10 versions.
for (int i = 0; i < 12; i++) {
TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath);
if (commitResult.isReadyForCheckpoint()) {
for(PostCommitAction action: commitResult.getPostCommitActions())
if (action.getType().equals("checkpoint")) {
// Checkpoint the table
Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
didCheckpoint = true;
action.threadSafeInvoke();
didCheckpoint = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel;

import java.io.IOException;

/**
 * A unit of follow-up work that is handed back to the caller after a transaction commit and can
 * be executed later by calling {@link #threadSafeInvoke()}.
 *
 * <p>NOTE(review): the method name suggests implementations are expected to be safe to invoke
 * from any thread, but nothing in this interface enforces that -- confirm with implementers.
 */
public interface PostCommitAction {

  /**
   * Returns a string tag identifying the kind of action (for example {@code "checkpoint"}).
   * Callers can compare this value to decide which actions they want to run.
   *
   * @return the type tag of this action
   */
  String getType();

  /**
   * Executes the action.
   *
   * @throws IOException if the action fails with an I/O error
   */
  void threadSafeInvoke() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.utils.CloseableIterable;
import java.util.List;

/**
* Contains the result of a successful transaction commit. Returned by {@link
Expand All @@ -28,11 +29,11 @@
@Evolving
public class TransactionCommitResult {
private final long version;
private final boolean isReadyForCheckpoint;
private final List<PostCommitAction> postCommitActions;

public TransactionCommitResult(long version, boolean isReadyForCheckpoint) {
public TransactionCommitResult(long version, List<PostCommitAction> postCommitActions) {
this.version = version;
this.isReadyForCheckpoint = isReadyForCheckpoint;
this.postCommitActions = postCommitActions;
}

/**
Expand All @@ -44,14 +45,7 @@ public long getVersion() {
return version;
}

/**
* Is the table ready for checkpoint (i.e. there are enough commits since the last checkpoint)? If
* yes the connector can choose to checkpoint as the version the transaction is committed as using
* {@link Table#checkpoint(Engine, long)}
*
* @return Is the table ready for checkpointing?
*/
public boolean isReadyForCheckpoint() {
return isReadyForCheckpoint;
public List<PostCommitAction> getPostCommitActions() {
return postCommitActions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
Expand Down Expand Up @@ -153,6 +154,10 @@ public Map<String, DomainMetadata> getDomainMetadataMap() {
return logReplay.getDomainMetadataMap();
}

/**
 * Returns the checksum (CRC) information reported by this object's log replay.
 *
 * @return the result of {@code logReplay.getCurrentCrcInfo()}, passed through unchanged
 */
public Optional<CRCInfo> getCurrentCrcInfo() {
  final Optional<CRCInfo> crcInfo = logReplay.getCurrentCrcInfo();
  return crcInfo;
}

/**
 * Returns the {@link Metadata} held by this object.
 *
 * @return the stored metadata
 */
public Metadata getMetadata() {
  return this.metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.delta.kernel.exceptions.ConcurrentWriteException;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.checksum.ChecksumWriter;
import io.delta.kernel.internal.data.TransactionStateRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.TransactionMetrics;
Expand All @@ -36,6 +38,7 @@
import io.delta.kernel.internal.replay.ConflictChecker.TransactionRebaseState;
import io.delta.kernel.internal.rowtracking.RowTracking;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.metrics.TransactionMetricsResult;
import io.delta.kernel.metrics.TransactionReport;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterable;
Expand Down Expand Up @@ -354,7 +357,14 @@ private TransactionCommitResult doCommit(
"Write file actions to JSON log file `%s`",
FileNames.deltaFile(logPath, commitAsVersion));

return new TransactionCommitResult(commitAsVersion, isReadyForCheckpoint(commitAsVersion));
List<PostCommitAction> postCommitActions = new ArrayList<>();
if (isReadyForCheckpoint(commitAsVersion)) {
postCommitActions.add(checkpoint(engine, dataPath.toString(), commitAsVersion));
}
buildPostCommitCrcInfo(commitAsVersion, transactionMetrics.captureTransactionMetricsResult())
.ifPresent(crcInfo -> postCommitActions.add(checksumSimple(engine, crcInfo, logPath)));

return new TransactionCommitResult(commitAsVersion, postCommitActions);
} catch (FileAlreadyExistsException e) {
throw e;
} catch (IOException ioe) {
Expand All @@ -368,6 +378,63 @@ public boolean isBlindAppend() {
return true;
}

/**
 * Creates a deferred action of type {@code "checkpoint"}. Nothing is executed here; when the
 * returned action is invoked it resolves the table at {@code tablePath} and checkpoints it at
 * {@code version}.
 *
 * @param engine engine used both to resolve the table and to perform the checkpoint
 * @param tablePath path of the table to checkpoint
 * @param version table version to checkpoint at
 * @return the deferred checkpoint action
 */
private static PostCommitAction checkpoint(Engine engine, String tablePath, long version) {
  final class CheckpointAction implements PostCommitAction {
    @Override
    public String getType() {
      return "checkpoint";
    }

    @Override
    public void threadSafeInvoke() throws IOException {
      Table.forPath(engine, tablePath).checkpoint(engine, version);
    }
  }

  return new CheckpointAction();
}

/**
 * Creates a deferred action of type {@code "checksum_simple"}. Nothing is executed here; when
 * the returned action is invoked it constructs a {@link ChecksumWriter} for {@code logPath} and
 * asks it to write {@code crcInfo}.
 *
 * @param engine engine passed to the checksum writer
 * @param crcInfo checksum contents to write
 * @param logPath log path the {@link ChecksumWriter} is created with
 * @return the deferred checksum-write action
 */
private static PostCommitAction checksumSimple(Engine engine, CRCInfo crcInfo, Path logPath) {
  final class ChecksumAction implements PostCommitAction {
    @Override
    public String getType() {
      return "checksum_simple";
    }

    @Override
    public void threadSafeInvoke() throws IOException {
      final ChecksumWriter writer = new ChecksumWriter(logPath);
      writer.writeCheckSum(engine, crcInfo);
    }
  }

  return new ChecksumAction();
}

/**
 * Builds the checksum (CRC) contents for the version this transaction was committed as, when
 * they can be derived without re-reading the table.
 *
 * <ul>
 *   <li>For version 0 the totals are taken directly from this transaction's add-file metrics.
 *   <li>Otherwise the totals are the read snapshot's CRC totals plus this transaction's add-file
 *       metrics, but only if the read snapshot has CRC info for exactly {@code commitAtVersion -
 *       1}.
 *   <li>In every other case (no CRC info on the read snapshot, or its version is not the
 *       immediate predecessor, e.g. after a retry or when the CRC was read for an older version)
 *       nothing is produced.
 * </ul>
 *
 * <p>NOTE(review): only added files are accounted for; removed files are not subtracted. This
 * presumably relies on the transaction being append-only -- confirm before reusing elsewhere.
 *
 * @param commitAtVersion version the transaction was committed as
 * @param metricsResult metrics of this transaction (number and total size of added files)
 * @return the CRC info for {@code commitAtVersion}, or empty if it cannot be derived
 */
private Optional<CRCInfo> buildPostCommitCrcInfo(
    long commitAtVersion, TransactionMetricsResult metricsResult) {
  if (commitAtVersion == 0) {
    // First version of the table: there is no previous CRC to build on.
    return Optional.of(
        new CRCInfo(
            commitAtVersion,
            metadata,
            protocol,
            metricsResult.getTotalAddFilesSizeInBytes(),
            metricsResult.getNumAddFiles(),
            Optional.of(txnId.toString())));
  }

  // Look the previous CRC up exactly once so the version check and the totals below are
  // guaranteed to come from the same value.
  return readSnapshot
      .getCurrentCrcInfo()
      // Retry or CRC is read for old version: the previous CRC must be for commitAtVersion - 1.
      .filter(lastCrcInfo -> commitAtVersion == lastCrcInfo.getVersion() + 1)
      .map(
          lastCrcInfo ->
              new CRCInfo(
                  commitAtVersion,
                  metadata,
                  protocol,
                  lastCrcInfo.getTableSizeBytes() + metricsResult.getTotalAddFilesSizeInBytes(),
                  lastCrcInfo.getNumFiles() + metricsResult.getNumAddFiles(),
                  Optional.of(txnId.toString())));
}

/**
* Generates a timestamp which is greater than the commit timestamp of the readSnapshot. This can
* result in an additional file read, which will only happen if ICT is enabled.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.checksum.ChecksumUtils.CRC_FILE_SCHEMA;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.types.StructType;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,31 +30,46 @@ public class CRCInfo {

public static Optional<CRCInfo> fromColumnarBatch(
long version, ColumnarBatch batch, int rowId, String crcFilePath) {
Protocol protocol = Protocol.fromColumnVector(batch.getColumnVector(PROTOCOL_ORDINAL), rowId);
Metadata metadata = Metadata.fromColumnVector(batch.getColumnVector(METADATA_ORDINAL), rowId);
Protocol protocol =
Protocol.fromColumnVector(
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("protocol")), rowId);
Metadata metadata =
Metadata.fromColumnVector(
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("metadata")), rowId);
long tableSizeBytes =
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("tableSizeBytes")).getLong(rowId);
long numFiles = batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("numFiles")).getLong(rowId);
Optional<String> txnId =
Optional.ofNullable(
batch.getColumnVector(CRC_FILE_SCHEMA.indexOf("txnId")).getString(rowId));
// protocol and metadata are nullable per fromColumnVector's implementation.
if (protocol == null || metadata == null) {
logger.warn("Invalid checksum file missing protocol and/or metadata: {}", crcFilePath);
return Optional.empty();
}
return Optional.of(new CRCInfo(version, metadata, protocol));
return Optional.of(new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txnId));
}

// We can add additional fields later
public static final StructType FULL_SCHEMA =
new StructType().add("protocol", Protocol.FULL_SCHEMA).add("metadata", Metadata.FULL_SCHEMA);

private static final int PROTOCOL_ORDINAL = 0;
private static final int METADATA_ORDINAL = 1;

private final long version;
private final Metadata metadata;
private final Protocol protocol;
private final long tableSizeBytes;
private final long numFiles;
private final Optional<String> txnId;

protected CRCInfo(long version, Metadata metadata, Protocol protocol) {
public CRCInfo(
long version,
Metadata metadata,
Protocol protocol,
long tableSizeBytes,
long numFiles,
Optional<String> txnId) {
this.version = version;
this.metadata = requireNonNull(metadata);
this.protocol = requireNonNull(protocol);
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
this.txnId = txnId;
}

/** The version of the Delta table that this CRCInfo represents. */
Expand All @@ -71,4 +86,16 @@ public Metadata getMetadata() {
/**
 * Returns the {@link Protocol} stored in this CRC info.
 *
 * @return the protocol; never {@code null} because the constructor rejects a null protocol
 */
public Protocol getProtocol() {
  return this.protocol;
}

/**
 * Returns the file count stored in this CRC info (the {@code numFiles} value of a checksum
 * file).
 *
 * @return the stored number of files
 */
public long getNumFiles() {
  return this.numFiles;
}

/**
 * Returns the table size stored in this CRC info (the {@code tableSizeBytes} value of a
 * checksum file).
 *
 * @return the stored table size in bytes
 */
public long getTableSizeBytes() {
  return this.tableSizeBytes;
}

/**
 * Returns the transaction id stored in this CRC info, if one was provided.
 *
 * @return the stored transaction id as passed to the constructor
 */
public Optional<String> getTxnId() {
  return this.txnId;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.replay;
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.util.FileNames.*;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
Expand Down Expand Up @@ -96,7 +96,7 @@ private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath)
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
CRCInfo.FULL_SCHEMA,
ChecksumUtils.CRC_FILE_SCHEMA,
Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;

/** Helper class for shared components in checksum file read and write. */
public final class ChecksumUtils {

  /**
   * Schema of a checksum (CRC) file. Code that reads a checksum batch locates columns by field
   * name through {@code CRC_FILE_SCHEMA.indexOf(...)}, so the field names here are part of the
   * contract; only {@code txnId} is explicitly declared nullable.
   *
   * <p>Declared {@code final} so that this shared schema cannot be reassigned by other code.
   */
  public static final StructType CRC_FILE_SCHEMA =
      new StructType()
          .add("tableSizeBytes", LongType.LONG)
          .add("numFiles", LongType.LONG)
          .add("numMetadata", LongType.LONG)
          .add("numProtocol", LongType.LONG)
          .add("metadata", Metadata.FULL_SCHEMA)
          .add("protocol", Protocol.FULL_SCHEMA)
          .add("txnId", StringType.STRING, /*nullable*/ true);

  /** Not instantiable: this class only holds static members. */
  private ChecksumUtils() {}
}
Loading
Loading