diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java index 67f2add01f3..b32cc5af3e3 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java @@ -219,7 +219,7 @@ public static CloseableIterator listDeltaLogFilesAsIter( endVersionOpt); // Must be final to be used in lambda - final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false); + final AtomicBoolean hasReturnedlogOrCheckPoint = new AtomicBoolean(false); return listLogDir(engine, tablePath, startVersion) .breakableFilter( @@ -233,6 +233,9 @@ public static CloseableIterator listDeltaLogFilesAsIter( // Checkpoint files of 0 size are invalid but may be ignored silently when read, // hence we ignore them so that we never pick up such checkpoints. // Here, we do nothing (we will consume this file). + } else if (fileTypes.contains(DeltaLogFileType.CHECKSUM) + && FileNames.isChecksumFile(getName(fs.getPath()))) { + // Here, we do nothing (we will consume this file). } else { logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath()); return BreakableFilterResult.EXCLUDE; // Here, we exclude and filter out this file. @@ -251,7 +254,7 @@ public static CloseableIterator listDeltaLogFilesAsIter( final long endVersion = endVersionOpt.get(); if (fileVersion > endVersion) { - if (mustBeRecreatable && !hasReturnedAnElement.get()) { + if (mustBeRecreatable && !hasReturnedlogOrCheckPoint.get()) { final long earliestVersion = DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath); throw DeltaErrors.versionBeforeFirstAvailableCommit( @@ -266,7 +269,11 @@ public static CloseableIterator listDeltaLogFilesAsIter( } } - hasReturnedAnElement.set(true); + // Only log and checkpoint could use to construct table state. + if (FileNames.isCommitFile(getName(fs.getPath())) + || FileNames.isCheckpointFile(getName(fs.getPath()))) { + hasReturnedlogOrCheckPoint.set(true); + } return BreakableFilterResult.INCLUDE; }); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java index 314c456afe8..ac0b1fa2764 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ChecksumReader.java @@ -17,7 +17,6 @@ import static io.delta.kernel.internal.util.FileNames.*; import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; -import static java.lang.Math.min; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.engine.Engine; @@ -25,7 +24,6 @@ import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; -import java.io.IOException; import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,90 +32,32 @@ public class ChecksumReader { private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class); - /** - * Load the CRCInfo from the checksum file at the given version. If the checksum file is not found - * at the given version, it will try to find the latest checksum file that is created at or after - * the lower bound version. - * - * @param engine the engine to use for reading the checksum file - * @param logPath the path to the Delta log - * @param targetedVersion the target version to read the checksum file from - * @param lowerBound the inclusive lower bound version to search for the checksum file - * @return Optional {@link CRCInfo} containing the protocol and metadata, and the version of the - * checksum file. If the checksum file is not found, it will return an empty - */ - public static Optional getCRCInfo( - Engine engine, Path logPath, long targetedVersion, long lowerBound) { - // lower bound should always smaller than the targetedVersion. - lowerBound = min(lowerBound, targetedVersion); - logger.info("Loading CRC file for version {} with lower bound {}", targetedVersion, lowerBound); - // First try to load the CRC at given version. If not found or failed to read then try to - // find the latest CRC file that is created at or after the lower bound version. - Path crcFilePath = checksumFile(logPath, targetedVersion); - Optional crcInfoOpt = readChecksumFile(engine, crcFilePath); - if (crcInfoOpt.isPresent() - || - // we don't expect any more checksum files as it is the first version - targetedVersion == 0 - || targetedVersion == lowerBound) { - return crcInfoOpt; - } - logger.info( - "CRC file for version {} not found, listing CRC files from version {}", - targetedVersion, - lowerBound); - - Path lowerBoundFilePath = checksumFile(logPath, lowerBound); - try (CloseableIterator crcFiles = - engine.getFileSystemClient().listFrom(lowerBoundFilePath.toString())) { - List crcFilesList = - crcFiles - .filter(file -> isChecksumFile(file.getPath())) - .takeWhile(file -> checksumVersion(new Path(file.getPath())) <= targetedVersion) - .toInMemoryList(); - - // pick the last file which is the latest version that has the CRC file - if (crcFilesList.isEmpty()) { - logger.warn("No checksum files found in the range {} to {}", lowerBound, targetedVersion); - return Optional.empty(); - } - - FileStatus latestCRCFile = crcFilesList.get(crcFilesList.size() - 1); - return readChecksumFile(engine, new Path(latestCRCFile.getPath())); - } catch (IOException e) { - logger.warn("Failed to list checksum files from {}", lowerBoundFilePath, e); - return Optional.empty(); - } - } - - private static Optional readChecksumFile(Engine engine, Path filePath) { + public static Optional getCRCInfo(Engine engine, FileStatus checkSumFile) { try (CloseableIterator iter = engine .getJsonHandler() .readJsonFiles( - singletonCloseableIterator(FileStatus.of(filePath.toString())), - CRCInfo.FULL_SCHEMA, - Optional.empty())) { + singletonCloseableIterator(checkSumFile), CRCInfo.FULL_SCHEMA, Optional.empty())) { // We do this instead of iterating through the rows or using `getSingularRow` so we // can use the existing fromColumnVector methods in Protocol, Metadata, Format etc if (!iter.hasNext()) { - logger.warn("Checksum file is empty: {}", filePath); + logger.warn("Checksum file is empty: {}", checkSumFile.getPath()); return Optional.empty(); } ColumnarBatch batch = iter.next(); if (batch.getSize() != 1) { String msg = "Expected exactly one row in the checksum file {}, found {} rows"; - logger.warn(msg, filePath, batch.getSize()); + logger.warn(msg, checkSumFile.getPath(), batch.getSize()); return Optional.empty(); } - long crcVersion = FileNames.checksumVersion(filePath); + long crcVersion = FileNames.checksumVersion(new Path(checkSumFile.getPath())); - return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, filePath.toString()); + return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, checkSumFile.getPath()); } catch (Exception e) { // This can happen when the version does not have a checksum file - logger.warn("Failed to read checksum file {}", filePath, e); + logger.warn("Failed to read checksum file {}", checkSumFile.getPath(), e); return Optional.empty(); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index e3d0213884e..f5a78e5ba2d 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -35,6 +35,7 @@ import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; import io.delta.kernel.internal.util.DomainMetadataUtils; +import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.StringType; import io.delta.kernel.types.StructType; @@ -217,18 +218,25 @@ protected Tuple2 loadTableProtocolAndMetadata( snapshotHint = Optional.empty(); } - long crcSearchLowerBound = + long crcReadLowerBound = max( asList( // Prefer reading hint over CRC, so start listing from hint's version + 1. snapshotHint.map(SnapshotHint::getVersion).orElse(0L) + 1, logSegment.getCheckpointVersionOpt().orElse(0L), // Only find the CRC within 100 versions. - snapshotVersion - 100, - 0L)); + snapshotVersion - 100)); Optional crcInfoOpt = - ChecksumReader.getCRCInfo( - engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound); + logSegment + .getLatestChecksum() + .flatMap( + checksum -> { + if (FileNames.getFileVersion(new Path(checksum.getPath())) < crcReadLowerBound) { + return Optional.empty(); + } + return ChecksumReader.getCRCInfo(engine, checksum); + }); + if (crcInfoOpt.isPresent()) { CRCInfo crcInfo = crcInfoOpt.get(); if (crcInfo.getVersion() == snapshotVersion) { @@ -236,7 +244,7 @@ protected Tuple2 loadTableProtocolAndMetadata( return new Tuple2<>(crcInfo.getProtocol(), crcInfo.getMetadata()); } checkArgument( - crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion); + crcInfo.getVersion() >= crcReadLowerBound && crcInfo.getVersion() <= snapshotVersion); // We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion) // but (b) newer than the current hint. Use this CRCInfo to create a new hint snapshotHint = diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java index 5689ffcfa5f..a8a377d7e73 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/LogSegment.java @@ -38,7 +38,8 @@ public class LogSegment { ////////////////////////////////// public static LogSegment empty(Path logPath) { - return new LogSegment(logPath, -1, Collections.emptyList(), Collections.emptyList(), -1); + return new LogSegment( + logPath, -1, Collections.emptyList(), Collections.emptyList(), Optional.empty(), -1); } ////////////////////////////////// @@ -50,6 +51,7 @@ public static LogSegment empty(Path logPath) { private final List deltas; private final List checkpoints; private final Optional checkpointVersionOpt; + private final Optional latestChecksum; private final long lastCommitTimestamp; private final Lazy> allFiles; private final Lazy> allFilesReversed; @@ -84,6 +86,7 @@ public LogSegment( long version, List deltas, List checkpoints, + Optional latestChecksum, long lastCommitTimestamp) { /////////////////////// @@ -161,6 +164,7 @@ public LogSegment( this.version = version; this.deltas = deltas; this.checkpoints = checkpoints; + this.latestChecksum = latestChecksum; this.lastCommitTimestamp = lastCommitTimestamp; this.allFiles = @@ -245,6 +249,7 @@ public String toString() { + " version=%d,\n" + " deltas=[%s\n ],\n" + " checkpoints=[%s\n ],\n" + + " latestChecksum=%s,\n" + " checkpointVersion=%s,\n" + " lastCommitTimestamp=%d\n" + "}", @@ -252,6 +257,7 @@ public String toString() { version, formatList(deltas), formatList(checkpoints), + latestChecksum.map(FileStatus::toString).orElse("None"), checkpointVersionOpt.map(String::valueOf).orElse("None"), lastCommitTimestamp); } @@ -263,4 +269,8 @@ private String formatList(List list) { return "\n " + list.stream().map(FileStatus::toString).collect(Collectors.joining(",\n ")); } + + public Optional getLatestChecksum() { + return latestChecksum; + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 40d8bb411e3..611ba7f0fd5 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -346,7 +346,11 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional versionT final List listedFileStatuses = DeltaLogActionUtils.listDeltaLogFilesAsIter( engine, - new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)), + new HashSet<>( + Arrays.asList( + DeltaLogFileType.COMMIT, + DeltaLogFileType.CHECKPOINT, + DeltaLogFileType.CHECKSUM)), tablePath, listFromStartVersion, versionToLoadOpt, @@ -388,10 +392,16 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional versionT listedFileStatuses, fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName())); final List listedCheckpointFileStatuses = listedCheckpointAndDeltaFileStatuses._1; - final List listedDeltaFileStatuses = listedCheckpointAndDeltaFileStatuses._2; + final Tuple2, List> listedDeltaFileCheckSumStatuses = + ListUtils.partition( + listedCheckpointAndDeltaFileStatuses._2, + fileStatus -> FileNames.isCommitFile(new Path(fileStatus.getPath()).getName())); + final List listedDeltaFileStatuses = listedDeltaFileCheckSumStatuses._1; + final List listedChecksumFileStatues = listedDeltaFileCheckSumStatuses._2; logDebugFileStatuses("listedCheckpointFileStatuses", listedCheckpointFileStatuses); logDebugFileStatuses("listedDeltaFileStatuses", listedDeltaFileStatuses); + logDebugFileStatuses("listedCheckSumFileStatuses", listedChecksumFileStatues); ///////////////////////////////////////////////////////////////////////////////////////////// // Step 6: Determine the latest complete checkpoint version. The intuition here is that we // @@ -562,6 +572,15 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional versionT newVersion, deltasAfterCheckpoint, latestCompleteCheckpointFileStatuses, + listedChecksumFileStatues.stream() + .max( + new Comparator() { + public int compare(FileStatus file1, FileStatus file2) { + return Long.compare( + FileNames.getFileVersion(new Path(file1.getPath())), + FileNames.getFileVersion(new Path(file2.getPath()))); + } + }), lastCommitTimestamp); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java index a447b7eac07..a035aed3682 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/FileNames.java @@ -31,7 +31,8 @@ private FileNames() {} public enum DeltaLogFileType { COMMIT, - CHECKPOINT + CHECKPOINT, + CHECKSUM } /** Example: 00000000000000000001.json */ diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala index 739ee67f2a0..eeb8e327fec 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/LogSegmentSuite.scala @@ -16,16 +16,15 @@ package io.delta.kernel.internal.snapshot -import java.util.Collections - +import java.util.{Collections, Optional} import scala.collection.JavaConverters._ - import io.delta.kernel.test.MockFileSystemClientUtils import io.delta.kernel.utils.FileStatus import org.scalatest.funsuite.AnyFunSuite class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { private val checkpointFs10List = singularCheckpointFileStatuses(Seq(10)).toList.asJava + private val checksumAtVersion10 = checksumFileStatus(10) private val deltaFs11List = deltaFileStatuses(Seq(11)).toList.asJava private val deltaFs12List = deltaFileStatuses(Seq(12)).toList.asJava private val deltasFs11To12List = deltaFileStatuses(Seq(11, 12)).toList.asJava @@ -39,74 +38,76 @@ class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { } test("constructor -- valid case (non-empty)") { - new LogSegment(logPath, 12, deltasFs11To12List, checkpointFs10List, 1) + new LogSegment(logPath, 12, deltasFs11To12List, checkpointFs10List, Optional.empty(), 1) } test("constructor -- null arguments => throw") { // logPath is null intercept[NullPointerException] { - new LogSegment(null, 1, Collections.emptyList(), Collections.emptyList(), -1) + new LogSegment( + null, 1, Collections.emptyList(), Collections.emptyList(), Optional.empty(), -1) } // deltas is null intercept[NullPointerException] { - new LogSegment(logPath, 1, null, Collections.emptyList(), -1) + new LogSegment(logPath, 1, null, Collections.emptyList(), Optional.empty(), -1) } // checkpoints is null intercept[NullPointerException] { - new LogSegment(logPath, 1, Collections.emptyList(), null, -1) + new LogSegment(logPath, 1, Collections.emptyList(), null, Optional.empty(), -1) } } test("constructor -- non-empty deltas or checkpoints with version -1 => throw") { val exMsg1 = intercept[IllegalArgumentException] { - new LogSegment(logPath, -1, deltasFs11To12List, Collections.emptyList(), 1) + new LogSegment(logPath, -1, deltasFs11To12List, Collections.emptyList(), Optional.empty(), 1) }.getMessage assert(exMsg1 === "Version -1 should have no files") val exMsg2 = intercept[IllegalArgumentException] { - new LogSegment(logPath, -1, Collections.emptyList(), checkpointFs10List, 1) + new LogSegment(logPath, -1, Collections.emptyList(), checkpointFs10List, Optional.empty(), 1) }.getMessage assert(exMsg2 === "Version -1 should have no files") } test("constructor -- all deltas must be actual delta files") { val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 12, badJsonsList, checkpointFs10List, 1) + new LogSegment(logPath, 12, badJsonsList, checkpointFs10List, Optional.empty(), 1) }.getMessage assert(exMsg === "deltas must all be actual delta (commit) files") } test("constructor -- all checkpoints must be actual checkpoint files") { val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 12, deltasFs11To12List, badCheckpointsList, 1) + new LogSegment(logPath, 12, deltasFs11To12List, badCheckpointsList, Optional.empty(), 1) }.getMessage assert(exMsg === "checkpoints must all be actual checkpoint files") } test("constructor -- if version >= 0 then both deltas and checkpoints cannot be empty") { val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 12, Collections.emptyList(), Collections.emptyList(), 1) + new LogSegment( + logPath, 12, Collections.emptyList(), Collections.emptyList(), Optional.empty(), 1) }.getMessage assert(exMsg === "No files to read") } test("constructor -- if deltas non-empty then first delta must equal checkpointVersion + 1") { val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 12, deltaFs12List, checkpointFs10List, 1) + new LogSegment(logPath, 12, deltaFs12List, checkpointFs10List, Optional.empty(), 1) }.getMessage assert(exMsg === "First delta file version must equal checkpointVersion + 1") } test("constructor -- if deltas non-empty then last delta must equal version") { val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 12, deltaFs11List, checkpointFs10List, 1) + new LogSegment(logPath, 12, deltaFs11List, checkpointFs10List, Optional.empty(), 1) }.getMessage assert(exMsg === "Last delta file version must equal the version of this LogSegment") } test("constructor -- if no deltas then checkpointVersion must equal version") { val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 11, Collections.emptyList(), checkpointFs10List, 1) + new LogSegment(logPath, 11, Collections.emptyList(), checkpointFs10List, Optional.empty(), 1) }.getMessage assert(exMsg === "If there are no deltas, then checkpointVersion must equal the version of this LogSegment") @@ -115,7 +116,7 @@ class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { test("constructor -- deltas not contiguous") { val deltas = deltaFileStatuses(Seq(11, 13)).toList.asJava val exMsg = intercept[IllegalArgumentException] { - new LogSegment(logPath, 13, deltas, checkpointFs10List, 1) + new LogSegment(logPath, 13, deltas, checkpointFs10List, Optional.empty(), 1) }.getMessage assert(exMsg === "Delta versions must be contiguous: [11, 13]") } @@ -123,23 +124,39 @@ class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { test("isComplete") { { // case 1: checkpoint and deltas => complete - val logSegment = new LogSegment(logPath, 12, deltasFs11To12List, checkpointFs10List, 1) + val logSegment = + new LogSegment(logPath, 12, deltasFs11To12List, checkpointFs10List, Optional.empty(), 1) assert(logSegment.isComplete) } { // case 2: checkpoint only => complete - val logSegment = new LogSegment(logPath, 10, Collections.emptyList(), checkpointFs10List, 1) + val logSegment = new LogSegment( + logPath, + 10, + Collections.emptyList(), + checkpointFs10List, + Optional.empty(), + 1 + ) assert(logSegment.isComplete) } { // case 3: deltas from 0 to N with no checkpoint => complete val deltaFiles = deltaFileStatuses((0L to 17L)).toList.asJava - val logSegment = new LogSegment(logPath, 17, deltaFiles, Collections.emptyList(), 1) + val logSegment = + new LogSegment(logPath, 17, deltaFiles, Collections.emptyList(), Optional.empty(), 1) assert(logSegment.isComplete) } { // case 4: just deltas from 11 to 12 with no checkpoint => incomplete - val logSegment = new LogSegment(logPath, 12, deltasFs11To12List, Collections.emptyList(), 1) + val logSegment = new LogSegment( + logPath, + 12, + deltasFs11To12List, + Collections.emptyList(), + Optional.empty(), + 1 + ) assert(!logSegment.isComplete) } { @@ -149,7 +166,8 @@ class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { } test("toString") { - val logSegment = new LogSegment(logPath, 12, deltasFs11To12List, checkpointFs10List, 1) + val logSegment = new LogSegment( + logPath, 12, deltasFs11To12List, checkpointFs10List, Optional.of(checksumAtVersion10), 1) // scalastyle:off line.size.limit val expectedToString = """LogSegment { @@ -162,10 +180,12 @@ class LogSegmentSuite extends AnyFunSuite with MockFileSystemClientUtils { | checkpoints=[ | FileStatus{path='/fake/path/to/table/_delta_log/00000000000000000010.checkpoint.parquet', size=10, modificationTime=100} | ], + | latestChecksum=FileStatus{path='/fake/path/to/table/_delta_log/00000000000000000010.crc', size=10, modificationTime=10}, | checkpointVersion=10, | lastCommitTimestamp=1 |}""".stripMargin // scalastyle:on line.size.limit assert(logSegment.toString === expectedToString) } + } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala index a40e55093ae..e0abd71b8e1 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala @@ -44,6 +44,11 @@ trait MockFileSystemClientUtils extends MockEngineUtils { deltaVersions.map(v => FileStatus.of(FileNames.deltaFile(logPath, v), v, v*10)) } + /** Checksum file status for given a version */ + def checksumFileStatus(deltaVersion: Long): FileStatus = { + FileStatus.of(FileNames.checksumFile(logPath, deltaVersion).toString, 10, 10) + } + /** Checkpoint file statuses where the timestamp = 10*version */ def singularCheckpointFileStatuses(checkpointVersions: Seq[Long]): Seq[FileStatus] = { assert(checkpointVersions.size == checkpointVersions.toSet.size) diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala index df280a7001a..bee9ea53e51 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/LogReplayEngineMetricsSuite.scala @@ -388,7 +388,7 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { test(s"checksum found at the read version: ${if (version == -1) "latest" else version}") { withTempDirAndMetricsEngine { (path, engine) => // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet - buildTableWithCrc(path) + buildTableWithCrc(path) val table = Table.forPath(engine, path) loadPandMCheckMetrics( @@ -412,23 +412,23 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { buildTableWithCrc(path) Seq(10L, 11L, 5L, 6L).foreach { version => assert( - Files.deleteIfExists( - new File( - FileNames.checksumFile(new Path(f"$path/_delta_log"), version).toString - ).toPath - ) + Files.deleteIfExists( + new File( + FileNames.checksumFile(new Path(f"$path/_delta_log"), version).toString + ).toPath ) + ) } loadPandMCheckMetrics( Table.forPath(engine, path) .getLatestSnapshot(engine).getSchema(), engine, - // 10.checkpoint found, so use it and combined with 11.crc - expJsonVersionsRead = Seq(11), - expParquetVersionsRead = Seq(10), - expParquetReadSetSizes = Seq(1), - expChecksumReadSet = Seq(11) + // 10.checkpoint found, so use it and combined with 11.crc + expJsonVersionsRead = Seq(11), + expParquetVersionsRead = Seq(10), + expParquetReadSetSizes = Seq(1), + expChecksumReadSet = Nil ) loadPandMCheckMetrics( @@ -442,9 +442,8 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { expJsonVersionsRead = Seq(6, 5), expParquetVersionsRead = Nil, expParquetReadSetSizes = Nil, - // First attempted to read checksum for version 6, then we do a listing of - // last 100 crc files and read the latest one which is version 4 (as version 5 is deleted) - expChecksumReadSet = Seq(6, 4)) + // Latest checkpoint + expChecksumReadSet = Seq(4)) // now try to load version 3 and it should get P&M from checksum files only @@ -465,11 +464,11 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet buildTableWithCrc(path) (3 to 6).foreach { version => - assert( - Files.deleteIfExists( - new File(FileNames.checksumFile( - new Path(f"$path/_delta_log"), version).toString).toPath - ) + assert( + Files.deleteIfExists( + new File(FileNames.checksumFile( + new Path(f"$path/_delta_log"), version).toString).toPath + ) ) } @@ -487,7 +486,7 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { // First attempted to read checksum for version 4 which doesn't exists, // then we do a listing of last 100 crc files and read the latest // one which is version 2 (as version 3-6 are deleted) - expChecksumReadSet = Seq(4, 2)) + expChecksumReadSet = Seq(2)) // read version 4 which sets the snapshot P&M hint to 4 // now try to load version 6 and we expect no checksums are read @@ -501,7 +500,7 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { // First we attempt to read at version 6, then we do a listing of last 100 crc files // bound by the snapshot hint which is at version 4 and we don't try to read checksums // beyond version 4 - expChecksumReadSet = Seq(6) + expChecksumReadSet = Nil ) } } @@ -536,18 +535,18 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { val logPath = f"$path/_delta_log" assert( Files.exists( - new File( - FileNames - .checkpointFileSingular(new Path(logPath), checkpointVersion) - .toString - ).toPath - ) + new File( + FileNames + .checkpointFileSingular(new Path(logPath), checkpointVersion) + .toString + ).toPath ) - assert( - Files.deleteIfExists( - new File(FileNames.checksumFile(new Path(logPath), checkpointVersion).toString).toPath - ) + ) + assert( + Files.deleteIfExists( + new File(FileNames.checksumFile(new Path(logPath), checkpointVersion).toString).toPath ) + ) val table = Table.forPath(engine, path) @@ -559,14 +558,14 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { expJsonVersionsRead = Nil, expParquetVersionsRead = Seq(10), expParquetReadSetSizes = Seq(1), - expChecksumReadSet = Seq(10) + expChecksumReadSet = Nil ) } } test( "checksum missing read version and the previous version, " + - "checkpoint exists the read version the previous version => use checkpoint" + "checkpoint exists the read version the previous version => use checkpoint" ) { withTempDirAndMetricsEngine { (path, engine) => // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet @@ -597,21 +596,21 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { val table = Table.forPath(engine, path) // 11.crc, 10.crc missing, 10.checkpoint.parquet exists. - // Attempt to read 11.crc fails and read 10.checkpoint.parquet and 11.json succeeds. + // Read 10.checkpoint.parquet and 11.json succeeds. loadPandMCheckMetrics( table.getSnapshotAsOfVersion(engine, 11).getSchema(), engine, expJsonVersionsRead = Seq(11), expParquetVersionsRead = Seq(10), expParquetReadSetSizes = Seq(1), - expChecksumReadSet = Seq(11) + expChecksumReadSet = Nil ) } } test( "checksum missing read version, " + - "both checksum and checkpoint exist the read version the previous version => use checksum" + "both checksum and checkpoint exist the read version the previous version => use checksum" ) { withTempDirAndMetricsEngine { (path, engine) => // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet @@ -645,7 +644,7 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { expJsonVersionsRead = Seq(11), expParquetVersionsRead = Nil, expParquetReadSetSizes = Nil, - expChecksumReadSet = Seq(11, 10) + expChecksumReadSet = Seq(10) ) } } @@ -679,14 +678,45 @@ class LogReplayEngineMetricsSuite extends AnyFunSuite with TestUtils { } } - // Produce a test table with 0 to 11 .json, 0 to 11.crc, 10.checkpoint.parquet - def buildTableWithCrc(path: String): Unit = { + test("crc found at older than read version - 100 => full log replay") { + withTempDirAndMetricsEngine { (path, engine) => + buildTableWithCrc(path, 100) + (1 to 101).foreach { version => + assert( + Files.deleteIfExists( + new File(FileNames.checksumFile(new Path(f"$path/_delta_log"), version).toString).toPath + ) + ) + if (version % 10 == 0) { + assert( + Files.deleteIfExists( + new File( + FileNames.checkpointFileSingular(new Path(f"$path/_delta_log"), version).toString + ).toPath + ) + ) + } + } + val table = Table.forPath(engine, path) + + loadPandMCheckMetrics( + table.getSnapshotAsOfVersion(engine, 101).getSchema(), + engine, + expJsonVersionsRead = (0L to 101L).reverse, + expParquetVersionsRead = Nil, + expParquetReadSetSizes = Nil, + expChecksumReadSet = Nil + ) + } + } + + def buildTableWithCrc(path: String, numOfWrite: Int = 10): Unit = { withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true") { spark.sql( s"CREATE TABLE delta.`$path` USING DELTA AS " + - s"SELECT 0L as id" + s"SELECT 0L as id" ) - for (_ <- 0 to 10) { appendCommit(path) } + for (_ <- 0 to numOfWrite) { appendCommit(path) } } } }