
[Kernel] Implement basic CRC writer #4073

Draft: wants to merge 24 commits into base: master
Changes from 6 commits
io/delta/kernel/internal/checksum/ChecksumWriter.java (new file)
@@ -0,0 +1,108 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum;

import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.Utils.toCloseableIterator;

import io.delta.kernel.data.Row;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.data.GenericRow;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotHint;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Writer for writing checksum files from a snapshot. */
public class ChecksumWriter {

private static final Logger log = LoggerFactory.getLogger(ChecksumWriter.class);
Collaborator: nit: We typically use logger

public static StructType CRC_FILE_SCHEMA =
Collaborator: Shouldn't this CRC_FILE_SCHEMA be stored in https://github.com/delta-io/delta/pull/4077/files#diff-897131313222a4b4add04e6c677dc523ce53481fb5e784d7058e9d550a12159a VersionStats (although I'd like VersionStats to be renamed)

Collaborator: you can have a FULL_SCHEMA (that we use for writing) and a READ_SCHEMA that we use for reading

Collaborator (author): I created a ChecksumUtils.java for holding shared code paths like FULL_SCHEMA. In the end we may also want to make the read schema the same as FULL_SCHEMA. I will do a refactor later.

new StructType()
.add("tableSizeBytes", LongType.LONG)
.add("numFiles", LongType.LONG)
.add("numMetadata", LongType.LONG)
.add("numProtocol", LongType.LONG)
.add("metadata", Metadata.FULL_SCHEMA)
.add("protocol", Protocol.FULL_SCHEMA)
.add("txnId", StringType.STRING, /*nullable*/ true);

private final Engine engine;
private final Path logPath;

public ChecksumWriter(Engine engine, Path logPath) {
this.engine = engine;
this.logPath = logPath;
}

/**
* Writes a checksum file in a best-effort manner from a post-commit snapshot; the write only
* proceeds if all required fields, including NumFiles and TableSizeBytes, are present.
*
* @return true if checksum file is successfully written, false otherwise.
*/
public boolean maybeWriteCheckSum(SnapshotHint postCommitSnapshot, String tnxId) {
Collaborator: add some logs stmts?

Collaborator: nit: txnId not tnxId

Collaborator: what is the transaction id? do all transactions have one?

Collaborator: yeah pretty sure this is optional (I think this is setTxnId?...)
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#version-checksum-file-schema

Collaborator (author): Yeah it is optional, changed the signature.

// Insufficient information to write the checksum file.
if (!postCommitSnapshot.getNumFiles().isPresent()
|| !postCommitSnapshot.getTableSizeBytes().isPresent()) {
return false;
}
Path newChecksumPath = FileNames.checksumFile(logPath, postCommitSnapshot.getVersion());
try {
return wrapEngineExceptionThrowsIO(
() -> {
engine
.getJsonHandler()
.writeJsonFileAtomically(
newChecksumPath.toString(),
toCloseableIterator(
Collaborator: I think you should be able to use Utils.singletonCloseableIterator here instead (cleaner since that's exactly what we want)

Arrays.asList(buildCheckSumRow(postCommitSnapshot, tnxId)).iterator()),
false /* overwrite */);
return true;
},
"Write checksum file `%s`",
newChecksumPath);
} catch (IOException io) {
log.error(String.format("Write checksum fails with error %s", io.getMessage()));
}
return false;
}
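The best-effort gating above (skip the write when either statistic is absent) can be sketched on its own. This is a minimal, self-contained illustration, not Kernel code; `TableStats` is a hypothetical stand-in for `SnapshotHint`:

```java
import java.util.OptionalLong;

// Hypothetical stand-in for SnapshotHint: only the two optional
// statistics that gate the checksum write.
class TableStats {
    final OptionalLong tableSizeBytes;
    final OptionalLong numFiles;

    TableStats(OptionalLong tableSizeBytes, OptionalLong numFiles) {
        this.tableSizeBytes = tableSizeBytes;
        this.numFiles = numFiles;
    }

    // Mirrors the check in maybeWriteCheckSum: both statistics
    // must be present before a checksum row can be built.
    boolean canWriteChecksum() {
        return tableSizeBytes.isPresent() && numFiles.isPresent();
    }
}

public class GatingSketch {
    public static void main(String[] args) {
        TableStats full = new TableStats(OptionalLong.of(100), OptionalLong.of(1));
        TableStats partial = new TableStats(OptionalLong.empty(), OptionalLong.of(1));
        System.out.println(full.canWriteChecksum());    // true
        System.out.println(partial.canWriteChecksum()); // false
    }
}
```

Using `OptionalLong` rather than nullable boxed longs makes the "statistics may be unknown" case explicit at the type level, which is why the writer can return `false` instead of risking a partially populated checksum row.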

private Row buildCheckSumRow(SnapshotHint snapshot, String tnxId) {
checkArgument(snapshot.getTableSizeBytes().isPresent() && snapshot.getNumFiles().isPresent());
Map<Integer, Object> value = new HashMap<>();
value.put(CRC_FILE_SCHEMA.indexOf("tableSizeBytes"), snapshot.getTableSizeBytes().getAsLong());
value.put(CRC_FILE_SCHEMA.indexOf("numFiles"), snapshot.getNumFiles().getAsLong());
value.put(CRC_FILE_SCHEMA.indexOf("numMetadata"), 1L);
value.put(CRC_FILE_SCHEMA.indexOf("numProtocol"), 1L);
value.put(CRC_FILE_SCHEMA.indexOf("metadata"), snapshot.getMetadata().toRow());
value.put(CRC_FILE_SCHEMA.indexOf("protocol"), snapshot.getProtocol().toRow());
value.put(CRC_FILE_SCHEMA.indexOf("txnId"), tnxId);
return new GenericRow(CRC_FILE_SCHEMA, value);
}
}
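For illustration, the written .crc file would contain a single JSON record shaped by CRC_FILE_SCHEMA. This is a sketch only: the values below are hypothetical, and the nested metadata/protocol rows (which follow Metadata.FULL_SCHEMA and Protocol.FULL_SCHEMA) are abbreviated:

```json
{
  "tableSizeBytes": 100,
  "numFiles": 1,
  "numMetadata": 1,
  "numProtocol": 1,
  "metadata": {"id": "...", "schemaString": "..."},
  "protocol": {"minReaderVersion": 1, "minWriterVersion": 2},
  "txnId": "tnx"
}
```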
io/delta/kernel/internal/snapshot/SnapshotHint.java
@@ -18,17 +18,35 @@

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import java.util.OptionalLong;

/** Contains summary information of a {@link io.delta.kernel.Snapshot}. */
public class SnapshotHint {
private final long version;
private final Protocol protocol;
private final Metadata metadata;
private final OptionalLong tableSizeBytes;
private final OptionalLong numFiles;

public SnapshotHint(long version, Protocol protocol, Metadata metadata) {
this.version = version;
this.protocol = protocol;
this.metadata = metadata;
this.tableSizeBytes = OptionalLong.empty();
this.numFiles = OptionalLong.empty();
}

public SnapshotHint(
long version,
Protocol protocol,
Metadata metadata,
OptionalLong tableSizeBytes,
OptionalLong numFiles) {
this.version = version;
this.protocol = protocol;
this.metadata = metadata;
this.tableSizeBytes = tableSizeBytes;
this.numFiles = numFiles;
Collaborator: Is metadata and protocol required non-null here? let's add requireNonNull to all these non-primitive args

Collaborator: otherwise we need null checks up in ChecksumWriter.buildCheckSumRow

}

public long getVersion() {
@@ -42,4 +60,12 @@ public Protocol getProtocol() {
public Metadata getMetadata() {
return metadata;
}

public OptionalLong getTableSizeBytes() {
return tableSizeBytes;
}

public OptionalLong getNumFiles() {
return numFiles;
}
}
io/delta/kernel/internal/util/FileNames.java
@@ -49,6 +49,11 @@ public static String deltaFile(Path path, long version) {
return String.format("%s/%020d.json", path, version);
}

/** Returns the path to the checksum file for the given version. */
public static Path checksumFile(Path path, long version) {
return new Path(path, String.format("%020d.crc", version));
}
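The `%020d` pattern above zero-pads the version to 20 digits, matching the naming of delta JSON files. A standalone sketch of just the formatting (not Kernel code):

```java
public class ChecksumFileNameSketch {
    // Mirrors the String.format pattern in FileNames.checksumFile:
    // versions are zero-padded to 20 digits with a .crc suffix.
    static String checksumFileName(long version) {
        return String.format("%020d.crc", version);
    }

    public static void main(String[] args) {
        System.out.println(checksumFileName(10)); // 00000000000000000010.crc
        System.out.println(checksumFileName(0));  // 00000000000000000000.crc
    }
}
```

Fixed-width names keep lexicographic and numeric ordering identical, so listing the _delta_log directory returns checksum files in version order.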

/** Returns the version for the given delta path. */
public static long deltaVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
io/delta/kernel/internal/checksum/ChecksumReadWriteSuite.scala (new file)
@@ -0,0 +1,143 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.internal.checksum

import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue, Row}
import io.delta.kernel.internal.actions.{Format, Metadata, Protocol}
import io.delta.kernel.internal.data.GenericRow
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.snapshot.SnapshotHint
import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector
import io.delta.kernel.internal.util.VectorUtils
import io.delta.kernel.test.{BaseMockJsonHandler, MockEngineUtils}
import io.delta.kernel.types.{StringType, StructType}
import io.delta.kernel.utils.CloseableIterator
import org.scalatest.funsuite.AnyFunSuite

import java.util
import java.util.{Collections, HashMap, Optional, OptionalLong}

class ChecksumReadWriteSuite extends AnyFunSuite with MockEngineUtils {
Collaborator: add tests for numFiles or getTableSizeBytes aren't present ==> no checksum file is written

Collaborator: we also probably want to test that the entire write does not fail if the checksum write fails, but I think we can probably do this in the PR with the e2e integration

Collaborator: also tests for when txnId is not present (let's make it optional as in the protocol)

Collaborator (author): Done for "numFiles or getTableSizeBytes aren't present ==> no checksum file is written" and "when txnId is not present". For the second one I will make sure to add it in the e2e tests.


private val FAKE_DELTA_LOG_PATH = new Path("/path/to/delta/log")


test("basic checksum write") {
val jsonHandler = new MockCheckSumFileJsonWriter()
val checksumWriter =
new ChecksumWriter(mockEngine(jsonHandler = jsonHandler), FAKE_DELTA_LOG_PATH)
val protocol = createTestProtocol()
val metadata = createTestMetadata()
val snapshotHint = new SnapshotHint(
1,
protocol,
metadata,
OptionalLong.of(100),
OptionalLong.of(1))
checksumWriter.maybeWriteCheckSum(snapshotHint, "tnx")

Collaborator: validate the schema first

Collaborator: validate the checksum file path that it was written to

Collaborator: validate that no additional rows were written to the file (only 1 row was written and it has the schema we expect)

assert(jsonHandler.capturedCrcRow.getLong(
ChecksumWriter.CRC_FILE_SCHEMA.indexOf("tableSizeBytes")) == 100L)
assert(jsonHandler.capturedCrcRow.getLong(
ChecksumWriter.CRC_FILE_SCHEMA.indexOf("numFiles")) == 1L)
assert(jsonHandler.capturedCrcRow.getLong(
ChecksumWriter.CRC_FILE_SCHEMA.indexOf("numMetadata")) == 1L)
assert(jsonHandler.capturedCrcRow.getLong(
ChecksumWriter.CRC_FILE_SCHEMA.indexOf("numProtocol")) == 1L)
assert(jsonHandler.capturedCrcRow.getString(
ChecksumWriter.CRC_FILE_SCHEMA.indexOf("txnId")) == "tnx")
checkMetadata(metadata,
jsonHandler.capturedCrcRow.getStruct(ChecksumWriter.CRC_FILE_SCHEMA.indexOf("metadata")))
checkProtocol(protocol,
jsonHandler.capturedCrcRow.getStruct(ChecksumWriter.CRC_FILE_SCHEMA.indexOf("protocol")))
}

def createTestMetadata(): Metadata = {
new Metadata(
"id",
Optional.of("name"),
Optional.of("description"),
new Format("parquet", Collections.emptyMap()),
"sss",
new StructType(),
new ArrayValue() { // partitionColumns
override def getSize = 1

override def getElements: ColumnVector = singletonStringColumnVector("c3")
},
Collaborator: You can use VectorUtils.stringArrayValue here

Optional.of(123),
new MapValue() { // conf
override def getSize = 1

override def getKeys: ColumnVector = singletonStringColumnVector("delta.appendOnly")

override def getValues: ColumnVector =
singletonStringColumnVector("true")
}
Collaborator: Same thing here with VectorUtils.stringStringMapValue

)
}

def createTestProtocol(): Protocol = {
new Protocol(
/* minReaderVersion= */ 0,
Collaborator: let's use version 1, 2 not 0, 1. we may assert in the future that these values are proper, in the protocol constructor; let's get ahead of that now

/* minWriterVersion= */ 1,
Collections.emptyList(),
Collections.emptyList()
)
}

def checkMetadata(metadata: Metadata, metadataRow: Row): Unit = {
assert(metadataRow.getSchema == Metadata.FULL_SCHEMA)
assert(metadataRow.getString(Metadata.FULL_SCHEMA.indexOf("id")) == metadata.getId)
assert(Optional.ofNullable(
metadataRow.getString(Metadata.FULL_SCHEMA.indexOf("name"))) == metadata.getName)
assert(Optional.ofNullable(metadataRow.getString(Metadata.FULL_SCHEMA.indexOf("description")))
== metadata.getDescription)
assert(
metadataRow.getStruct(
Metadata.FULL_SCHEMA.indexOf("format")
).getString(Format.FULL_SCHEMA.indexOf("provider")) == metadata.getFormat.getProvider)
assert(
metadataRow.getString(
Metadata.FULL_SCHEMA.indexOf("schemaString")) == metadata.getSchemaString
)
assert(metadataRow.getArray(
Metadata.FULL_SCHEMA.indexOf("partitionColumns")) == metadata.getPartitionColumns)
assert(Optional.ofNullable(metadataRow.getLong(
Metadata.FULL_SCHEMA.indexOf("createdTime"))) == metadata.getCreatedTime)
assert(VectorUtils.toJavaMap(metadataRow.getMap(
Metadata.FULL_SCHEMA.indexOf("configuration"))) == metadata.getConfiguration)
}

def checkProtocol(protocol: Protocol, protocolRow: Row): Unit = {
assert(protocolRow.getSchema == Protocol.FULL_SCHEMA)
assert(protocol.getMinReaderVersion ==
protocolRow.getInt(Protocol.FULL_SCHEMA.indexOf("minReaderVersion")))
assert(protocol.getMinWriterVersion ==
protocolRow.getInt(Protocol.FULL_SCHEMA.indexOf("minWriterVersion")))
}
}

class MockCheckSumFileJsonWriter extends BaseMockJsonHandler {
var capturedCrcRow: Row = new GenericRow(new StructType(), new util.HashMap[Integer, AnyRef]);

override def writeJsonFileAtomically(filePath: String,
Collaborator: nit: weird indentation. Please see https://github.com/databricks/scala-style-guide

Collaborator (author): fixed the scala format plugin locally and re-ran

data: CloseableIterator[Row],
overwrite: Boolean): Unit = {
if (data.hasNext) capturedCrcRow = data.next()
}

}