Skip to content

Commit

Permalink
[Kernel]Cache the crc info for a snapshot if its version's checksum f…
Browse files Browse the repository at this point in the history
…ile is read (#4113)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [x] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
Persist CRCInfo for a logreplay.java if its version's checksum file is
read. This will be used for calculate the new CRC info for a new commit.
No actual functional changes to loading crc.

Follow up PR will be #4116 which
introduces crc_simple post commit action to write CRC file
## How was this patch tested?
Unit test
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No
  • Loading branch information
huan233usc authored Feb 5, 2025
1 parent 3873570 commit 1d8f6ac
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CRCInfo;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
Expand Down Expand Up @@ -153,6 +154,11 @@ public Map<String, DomainMetadata> getDomainMetadataMap() {
return logReplay.getDomainMetadataMap();
}

/** Returns the crc info for the current snapshot if the checksum file is read */
public Optional<CRCInfo> getCurrentCrcInfo() {
return logReplay.getCurrentCrcInfo();
}

public Metadata getMetadata() {
return metadata;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
private final LogSegment logSegment;
private final Tuple2<Protocol, Metadata> protocolAndMetadata;
private final Lazy<Map<String, DomainMetadata>> domainMetadataMap;
private final Optional<CRCInfo> currentCrcInfo;

public LogReplay(
Path logPath,
Expand All @@ -131,13 +132,19 @@ public LogReplay(
LogSegment logSegment,
Optional<SnapshotHint> snapshotHint,
SnapshotMetrics snapshotMetrics) {
assertLogFilesBelongToTable(logPath, logSegment.allLogFilesUnsorted());

assertLogFilesBelongToTable(logPath, logSegment.allLogFilesUnsorted());
Tuple2<Optional<SnapshotHint>, Optional<CRCInfo>> newerSnapshotHintAndCurrentCrcInfo =
maybeGetNewerSnapshotHintAndCurrentCrcInfo(
engine, logSegment, snapshotHint, snapshotVersion);
this.currentCrcInfo = newerSnapshotHintAndCurrentCrcInfo._2;
this.dataPath = dataPath;
this.logSegment = logSegment;
this.protocolAndMetadata =
snapshotMetrics.loadInitialDeltaActionsTimer.time(
() -> loadTableProtocolAndMetadata(engine, logSegment, snapshotHint, snapshotVersion));
() ->
loadTableProtocolAndMetadata(
engine, logSegment, newerSnapshotHintAndCurrentCrcInfo._1, snapshotVersion));
// Lazy loading of domain metadata only when needed
this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine));
}
Expand Down Expand Up @@ -166,6 +173,11 @@ public long getVersion() {
return logSegment.getVersion();
}

/** Returns the crc info for the current snapshot if the checksum file is read */
public Optional<CRCInfo> getCurrentCrcInfo() {
return currentCrcInfo;
}

/**
* Returns an iterator of {@link FilteredColumnarBatch} representing all the active AddFiles in
* the table.
Expand Down Expand Up @@ -216,38 +228,6 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
return new Tuple2<>(snapshotHint.get().getProtocol(), snapshotHint.get().getMetadata());
}

// Snapshot hit is not use-able in this case for determine the lower bound.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
snapshotHint = Optional.empty();
}

long crcSearchLowerBound =
max(
asList(
// Prefer reading hint over CRC, so start listing from hint's version + 1.
snapshotHint.map(SnapshotHint::getVersion).orElse(0L) + 1,
logSegment.getCheckpointVersionOpt().orElse(0L),
// Only find the CRC within 100 versions.
snapshotVersion - 100,
0L));
Optional<CRCInfo> crcInfoOpt =
ChecksumReader.getCRCInfo(
engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound);
if (crcInfoOpt.isPresent()) {
CRCInfo crcInfo = crcInfoOpt.get();
if (crcInfo.getVersion() == snapshotVersion) {
// CRC is related to the desired snapshot version. Load protocol and metadata from CRC.
return new Tuple2<>(crcInfo.getProtocol(), crcInfo.getMetadata());
}
checkArgument(
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
// but (b) newer than the current hint. Use this CRCInfo to create a new hint
snapshotHint =
Optional.of(
new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata()));
}

Protocol protocol = null;
Metadata metadata = null;

Expand Down Expand Up @@ -396,4 +376,52 @@ private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) {
throw new UncheckedIOException("Could not close iterator", ex);
}
}

/**
* Calculates the latest snapshot hint before or at the current snapshot version, returns the
* CRCInfo if checksum file at the current version is read
*/
private Tuple2<Optional<SnapshotHint>, Optional<CRCInfo>>
maybeGetNewerSnapshotHintAndCurrentCrcInfo(
Engine engine,
LogSegment logSegment,
Optional<SnapshotHint> snapshotHint,
long snapshotVersion) {

// Snapshot hint's version is current.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() == snapshotVersion) {
return new Tuple2<>(snapshotHint, Optional.empty());
}

// Ignore the snapshot hint whose version is larger.
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
snapshotHint = Optional.empty();
}

long crcSearchLowerBound =
max(
asList(
// Prefer reading hint over CRC, so start listing from hint's version + 1,
// if hint is not present, list from version 0.
snapshotHint.map(SnapshotHint::getVersion).orElse(-1L) + 1,
logSegment.getCheckpointVersionOpt().orElse(0L),
// Only find the CRC within 100 versions.
snapshotVersion - 100,
0L));
Optional<CRCInfo> crcInfoOpt =
ChecksumReader.getCRCInfo(
engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound);
if (!crcInfoOpt.isPresent()) {
return new Tuple2<>(snapshotHint, Optional.empty());
}
CRCInfo crcInfo = crcInfoOpt.get();
checkArgument(
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
// but (b) newer than the current hint. Use this CRCInfo to create a new hint, and return this
// crc info if it matches the current version.
return new Tuple2<>(
Optional.of(SnapshotHint.fromCrcInfo(crcInfo)),
crcInfo.getVersion() == snapshotVersion ? crcInfoOpt : Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@

import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.replay.CRCInfo;

/** Contains summary information of a {@link io.delta.kernel.Snapshot}. */
public class SnapshotHint {
private final long version;
private final Protocol protocol;
private final Metadata metadata;

/** Constructs a new SnapshotHint based on a CRCInfo */
public static SnapshotHint fromCrcInfo(CRCInfo crcInfo) {
return new SnapshotHint(crcInfo.getVersion(), crcInfo.getProtocol(), crcInfo.getMetadata());
}

public SnapshotHint(long version, Protocol protocol, Metadata metadata) {
this.version = version;
this.protocol = protocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils {
// Tests for loading P & M through checksums files //
/////////////////////////////////////////////////////////////////////////////////////////////////

Seq(-1L, 3L, 4L).foreach { version => // -1 means latest version
Seq(-1L, 0L, 3L, 4L).foreach { version => // -1 means latest version
test(s"checksum found at the read version: ${if (version == -1) "latest" else version}") {
withTempDirAndMetricsEngine { (path, engine) =>
// Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ import io.delta.kernel.internal.data.ScanStateRow
import io.delta.kernel.defaults.engine.DefaultEngine
import io.delta.kernel.defaults.utils.{TestRow, TestUtils}
import io.delta.kernel.Table
import io.delta.kernel.internal.fs.Path
import io.delta.kernel.internal.util.FileNames
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import java.nio.file.Files

class LogReplaySuite extends AnyFunSuite with TestUtils {

Expand Down Expand Up @@ -298,4 +303,60 @@ class LogReplaySuite extends AnyFunSuite with TestUtils {
assert(snapshotImpl.getLatestTransactionVersion(defaultEngine, "fakeAppId") === Optional.of(3L))
assert(!snapshotImpl.getLatestTransactionVersion(defaultEngine, "nonExistentAppId").isPresent)
}

test("current checksum read => snapshot provides crc info") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
spark.sql(
s"CREATE TABLE delta.`$tablePath` USING DELTA AS " +
s"SELECT 0L as id"
)
spark.sql(
s"INSERT INTO delta.`$tablePath` SELECT 1L as id"
)
val table = Table.forPath(defaultEngine, tablePath)
val snapshot = table.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
assert(snapshot.getCurrentCrcInfo.isPresent)
val crcInfo = snapshot.getCurrentCrcInfo.get()
assert(crcInfo.getVersion == 1)
assert(crcInfo.getProtocol == snapshot.getProtocol)
assert(crcInfo.getMetadata == snapshot.getMetadata)
}
}

test("stale checksum read => snapshot doesn't provides crc info") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
spark.sql(
s"CREATE TABLE delta.`$tablePath` USING DELTA AS " +
s"SELECT 0L as id"
)
spark.sql(
s"INSERT INTO delta.`$tablePath` SELECT 1L as id"
)
deleteChecksumFileForTable(tablePath, versions = Seq(1))

val table = Table.forPath(defaultEngine, tablePath)
val snapshot = table.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
assert(!snapshot.getCurrentCrcInfo.isPresent)
}
}

test("no checksum read => snapshot doesn't provides crc info") {
withTempDir { tempFile =>
val tablePath = tempFile.getAbsolutePath
spark.sql(
s"CREATE TABLE delta.`$tablePath` USING DELTA AS " +
s"SELECT 0L as id"
)
spark.sql(
s"INSERT INTO delta.`$tablePath` SELECT 1L as id"
)
deleteChecksumFileForTable(tablePath, versions = Seq(0, 1))

val table = Table.forPath(defaultEngine, tablePath)
val snapshot = table.getLatestSnapshot(defaultEngine).asInstanceOf[SnapshotImpl]
assert(!snapshot.getCurrentCrcInfo.isPresent)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
import io.delta.kernel.types._
import io.delta.kernel.utils.CloseableIterator
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.shaded.org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{types => sparktypes}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.delta.util.FileNames
import org.scalatest.Assertions

trait TestUtils extends Assertions with SQLHelper {
Expand Down Expand Up @@ -717,4 +719,10 @@ trait TestUtils extends Assertions with SQLHelper {
}
resource.getFile
}

def deleteChecksumFileForTable(tablePath: String, versions: Seq[Int]): Unit =
versions.foreach(
v => Files.deleteIfExists(
new File(FileNames.checksumFile(new Path(s"$tablePath/_delta_log"), v).toString).toPath)
)
}

0 comments on commit 1d8f6ac

Please sign in to comment.