Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel]Remove two file IO in CRC loading by reusing log listing #4112

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
endVersionOpt);

// Must be final to be used in lambda
final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false);
final AtomicBoolean hasReturnedlogOrCheckPoint = new AtomicBoolean(false);

return listLogDir(engine, tablePath, startVersion)
.breakableFilter(
Expand All @@ -233,6 +233,9 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
// Checkpoint files of 0 size are invalid but may be ignored silently when read,
// hence we ignore them so that we never pick up such checkpoints.
// Here, we do nothing (we will consume this file).
} else if (fileTypes.contains(DeltaLogFileType.CHECKSUM)
&& FileNames.isChecksumFile(getName(fs.getPath()))) {
// Here, we do nothing (we will consume this file).
} else {
logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath());
return BreakableFilterResult.EXCLUDE; // Here, we exclude and filter out this file.
Expand All @@ -251,7 +254,7 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
final long endVersion = endVersionOpt.get();

if (fileVersion > endVersion) {
if (mustBeRecreatable && !hasReturnedAnElement.get()) {
if (mustBeRecreatable && !hasReturnedlogOrCheckPoint.get()) {
final long earliestVersion =
DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
throw DeltaErrors.versionBeforeFirstAvailableCommit(
Expand All @@ -266,7 +269,11 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
}
}

hasReturnedAnElement.set(true);
// Only commit (log) and checkpoint files can be used to construct table state.
if (FileNames.isCommitFile(getName(fs.getPath()))
|| FileNames.isCheckpointFile(getName(fs.getPath()))) {
hasReturnedlogOrCheckPoint.set(true);
}

return BreakableFilterResult.INCLUDE;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

import static io.delta.kernel.internal.util.FileNames.*;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
import static java.lang.Math.min;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import java.io.IOException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,90 +32,32 @@
public class ChecksumReader {
private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class);

/**
 * Finds the {@link CRCInfo} to use for {@code targetedVersion}.
 *
 * <p>The checksum file at exactly {@code targetedVersion} is tried first. Only when that read
 * yields nothing, and there is still a range of older versions to look at, is the log directory
 * listed starting from the lower bound; the checksum file with the highest version that does not
 * exceed {@code targetedVersion} is then read instead.
 *
 * @param engine the engine used to list the log directory and read checksum files
 * @param logPath the path to the Delta log
 * @param targetedVersion the version whose checksum file is preferred
 * @param lowerBound the inclusive lowest version to consider; values above {@code
 *     targetedVersion} are clamped down to {@code targetedVersion}
 * @return the parsed {@link CRCInfo}, or {@link Optional#empty()} when no usable checksum file is
 *     found in the range or when listing the log directory fails
 */
public static Optional<CRCInfo> getCRCInfo(
    Engine engine, Path logPath, long targetedVersion, long lowerBound) {
  // Clamp so that the search range [searchFloor, targetedVersion] can never be inverted.
  final long searchFloor = min(lowerBound, targetedVersion);
  logger.info("Loading CRC file for version {} with lower bound {}", targetedVersion, searchFloor);

  // Cheapest path first: the checksum written for exactly the requested version.
  final Path exactCrcPath = checksumFile(logPath, targetedVersion);
  final Optional<CRCInfo> exactMatch = readChecksumFile(engine, exactCrcPath);

  // Version 0 has no predecessors, and a collapsed range leaves nothing older to list.
  final boolean nothingOlderToTry = targetedVersion == 0 || targetedVersion == searchFloor;
  if (exactMatch.isPresent() || nothingOlderToTry) {
    return exactMatch;
  }

  logger.info(
      "CRC file for version {} not found, listing CRC files from version {}",
      targetedVersion,
      searchFloor);

  final Path listingStart = checksumFile(logPath, searchFloor);
  try (CloseableIterator<FileStatus> listing =
      engine.getFileSystemClient().listFrom(listingStart.toString())) {
    final List<FileStatus> candidates =
        listing
            .filter(status -> isChecksumFile(status.getPath()))
            .takeWhile(status -> checksumVersion(new Path(status.getPath())) <= targetedVersion)
            .toInMemoryList();

    if (candidates.isEmpty()) {
      logger.warn("No checksum files found in the range {} to {}", searchFloor, targetedVersion);
      return Optional.empty();
    }

    // The last collected entry is the newest checksum at or below the target version.
    final FileStatus newest = candidates.get(candidates.size() - 1);
    return readChecksumFile(engine, new Path(newest.getPath()));
  } catch (IOException e) {
    logger.warn("Failed to list checksum files from {}", listingStart, e);
    return Optional.empty();
  }
}

private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath) {
public static Optional<CRCInfo> getCRCInfo(Engine engine, FileStatus checkSumFile) {
try (CloseableIterator<ColumnarBatch> iter =
engine
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(FileStatus.of(filePath.toString())),
CRCInfo.FULL_SCHEMA,
Optional.empty())) {
singletonCloseableIterator(checkSumFile), CRCInfo.FULL_SCHEMA, Optional.empty())) {
// We do this instead of iterating through the rows or using `getSingularRow` so we
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
if (!iter.hasNext()) {
logger.warn("Checksum file is empty: {}", filePath);
logger.warn("Checksum file is empty: {}", checkSumFile.getPath());
return Optional.empty();
}

ColumnarBatch batch = iter.next();
if (batch.getSize() != 1) {
String msg = "Expected exactly one row in the checksum file {}, found {} rows";
logger.warn(msg, filePath, batch.getSize());
logger.warn(msg, checkSumFile.getPath(), batch.getSize());
return Optional.empty();
}

long crcVersion = FileNames.checksumVersion(filePath);
long crcVersion = FileNames.checksumVersion(new Path(checkSumFile.getPath()));

return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, filePath.toString());
return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, checkSumFile.getPath());
} catch (Exception e) {
// This can happen when the version does not have a checksum file
logger.warn("Failed to read checksum file {}", filePath, e);
logger.warn("Failed to read checksum file {}", checkSumFile.getPath(), e);
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
import io.delta.kernel.internal.util.DomainMetadataUtils;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.types.StringType;
import io.delta.kernel.types.StructType;
Expand Down Expand Up @@ -217,26 +218,33 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(
snapshotHint = Optional.empty();
}

long crcSearchLowerBound =
long crcReadLowerBound =
max(
asList(
// Prefer reading hint over CRC, so start listing from hint's version + 1.
snapshotHint.map(SnapshotHint::getVersion).orElse(0L) + 1,
logSegment.getCheckpointVersionOpt().orElse(0L),
// Only find the CRC within 100 versions.
snapshotVersion - 100,
0L));
snapshotVersion - 100));
Optional<CRCInfo> crcInfoOpt =
ChecksumReader.getCRCInfo(
engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound);
logSegment
.getLatestChecksum()
.flatMap(
checksum -> {
if (FileNames.getFileVersion(new Path(checksum.getPath())) < crcReadLowerBound) {
return Optional.empty();
}
return ChecksumReader.getCRCInfo(engine, checksum);
});

if (crcInfoOpt.isPresent()) {
CRCInfo crcInfo = crcInfoOpt.get();
if (crcInfo.getVersion() == snapshotVersion) {
// CRC is related to the desired snapshot version. Load protocol and metadata from CRC.
return new Tuple2<>(crcInfo.getProtocol(), crcInfo.getMetadata());
}
checkArgument(
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
crcInfo.getVersion() >= crcReadLowerBound && crcInfo.getVersion() <= snapshotVersion);
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
// but (b) newer than the current hint. Use this CRCInfo to create a new hint
snapshotHint =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class LogSegment {
//////////////////////////////////

/**
 * Creates a {@link LogSegment} that contains no files.
 *
 * <p>The segment is built with version {@code -1}, empty delta and checkpoint lists, no checksum
 * file, and a last-commit timestamp of {@code -1}.
 *
 * @param logPath the path to the Delta log this empty segment belongs to
 * @return a new, empty {@link LogSegment}
 */
public static LogSegment empty(Path logPath) {
  return new LogSegment(
      logPath, -1, Collections.emptyList(), Collections.emptyList(), Optional.empty(), -1);
}

//////////////////////////////////
Expand All @@ -50,6 +51,7 @@ public static LogSegment empty(Path logPath) {
private final List<FileStatus> deltas;
private final List<FileStatus> checkpoints;
private final Optional<Long> checkpointVersionOpt;
private final Optional<FileStatus> latestChecksum;
private final long lastCommitTimestamp;
private final Lazy<List<FileStatus>> allFiles;
private final Lazy<List<FileStatus>> allFilesReversed;
Expand Down Expand Up @@ -84,6 +86,7 @@ public LogSegment(
long version,
List<FileStatus> deltas,
List<FileStatus> checkpoints,
Optional<FileStatus> latestChecksum,
long lastCommitTimestamp) {

///////////////////////
Expand Down Expand Up @@ -161,6 +164,7 @@ public LogSegment(
this.version = version;
this.deltas = deltas;
this.checkpoints = checkpoints;
this.latestChecksum = latestChecksum;
this.lastCommitTimestamp = lastCommitTimestamp;

this.allFiles =
Expand Down Expand Up @@ -245,13 +249,15 @@ public String toString() {
+ " version=%d,\n"
+ " deltas=[%s\n ],\n"
+ " checkpoints=[%s\n ],\n"
+ " latestChecksum=%s,\n"
+ " checkpointVersion=%s,\n"
+ " lastCommitTimestamp=%d\n"
+ "}",
logPath,
version,
formatList(deltas),
formatList(checkpoints),
latestChecksum.map(FileStatus::toString).orElse("None"),
checkpointVersionOpt.map(String::valueOf).orElse("None"),
lastCommitTimestamp);
}
Expand All @@ -263,4 +269,8 @@ private String formatList(List<FileStatus> list) {
return "\n "
+ list.stream().map(FileStatus::toString).collect(Collectors.joining(",\n "));
}

/**
 * Returns the checksum file that was supplied when this segment was constructed.
 *
 * @return the checksum file's {@link FileStatus}, or {@link Optional#empty()} if the segment was
 *     built without one
 */
public Optional<FileStatus> getLatestChecksum() {
return latestChecksum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,11 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
final List<FileStatus> listedFileStatuses =
DeltaLogActionUtils.listDeltaLogFilesAsIter(
engine,
new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)),
new HashSet<>(
Arrays.asList(
DeltaLogFileType.COMMIT,
DeltaLogFileType.CHECKPOINT,
DeltaLogFileType.CHECKSUM)),
tablePath,
listFromStartVersion,
versionToLoadOpt,
Expand Down Expand Up @@ -388,10 +392,16 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
listedFileStatuses,
fileStatus -> FileNames.isCheckpointFile(new Path(fileStatus.getPath()).getName()));
final List<FileStatus> listedCheckpointFileStatuses = listedCheckpointAndDeltaFileStatuses._1;
final List<FileStatus> listedDeltaFileStatuses = listedCheckpointAndDeltaFileStatuses._2;
final Tuple2<List<FileStatus>, List<FileStatus>> listedDeltaFileCheckSumStatuses =
ListUtils.partition(
listedCheckpointAndDeltaFileStatuses._2,
fileStatus -> FileNames.isCommitFile(new Path(fileStatus.getPath()).getName()));
final List<FileStatus> listedDeltaFileStatuses = listedDeltaFileCheckSumStatuses._1;
final List<FileStatus> listedChecksumFileStatues = listedDeltaFileCheckSumStatuses._2;

logDebugFileStatuses("listedCheckpointFileStatuses", listedCheckpointFileStatuses);
logDebugFileStatuses("listedDeltaFileStatuses", listedDeltaFileStatuses);
logDebugFileStatuses("listedCheckSumFileStatuses", listedChecksumFileStatues);

/////////////////////////////////////////////////////////////////////////////////////////////
// Step 6: Determine the latest complete checkpoint version. The intuition here is that we //
Expand Down Expand Up @@ -562,6 +572,15 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT
newVersion,
deltasAfterCheckpoint,
latestCompleteCheckpointFileStatuses,
listedChecksumFileStatues.stream()
.max(
new Comparator<FileStatus>() {
public int compare(FileStatus file1, FileStatus file2) {
return Long.compare(
FileNames.getFileVersion(new Path(file1.getPath())),
FileNames.getFileVersion(new Path(file2.getPath())));
}
}),
lastCommitTimestamp);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ private FileNames() {}

/** Kinds of Delta log files that a log-directory listing can be asked to return. */
public enum DeltaLogFileType {
  /** Commit files. */
  COMMIT,
  /** Checkpoint files. */
  CHECKPOINT,
  /** Checksum (CRC) files. */
  CHECKSUM
}

/** Example: 00000000000000000001.json */
Expand Down
Loading
Loading