Skip to content

Commit 33e8666

Browse files
authored
[Kernel]Remove two file IO in CRC loading by reusing log listing (#4112)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> Incremental CRC loading for P&M was added in #4077. For snapshot N, we try to read N.crc, if N.crc missing, we list CRC files and find the one up to N-100. These two file listing operation could be actually avoided, because we have already listed files under _delta_log folder. This PR collects latest CRC in the first file listing and remove two unnecessary IO. ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> LogReplay test ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 4cf685b commit 33e8666

19 files changed

+416
-172
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaLogActionUtils.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,10 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
219219
startVersion,
220220
endVersionOpt);
221221

222+
// This variable is used to help determine if we should throw an error if the table history is
223+
// not reconstructable. Only commit and checkpoint files are applicable.
222224
// Must be final to be used in lambda
223-
final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false);
225+
final AtomicBoolean hasReturnedCommitOrCheckpoint = new AtomicBoolean(false);
224226

225227
return listLogDir(engine, tablePath, startVersion)
226228
.breakableFilter(
@@ -237,6 +239,9 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
237239
// Checkpoint files of 0 size are invalid but may be ignored silently when read,
238240
// hence we ignore them so that we never pick up such checkpoints.
239241
// Here, we do nothing (we will consume this file).
242+
} else if (fileTypes.contains(DeltaLogFileType.CHECKSUM)
243+
&& FileNames.isChecksumFile(fileName)) {
244+
// Here, we do nothing (we will consume this file).
240245
} else {
241246
logger.debug("Ignoring file {} as it is not of the desired type", fs.getPath());
242247
return BreakableFilterResult.EXCLUDE; // Here, we exclude and filter out this file.
@@ -277,7 +282,7 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
277282
final long endVersion = endVersionOpt.get();
278283

279284
if (fileVersion > endVersion) {
280-
if (mustBeRecreatable && !hasReturnedAnElement.get()) {
285+
if (mustBeRecreatable && !hasReturnedCommitOrCheckpoint.get()) {
281286
final long earliestVersion =
282287
DeltaHistoryManager.getEarliestRecreatableCommit(engine, logPath);
283288
throw DeltaErrors.versionBeforeFirstAvailableCommit(
@@ -292,7 +297,11 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter(
292297
}
293298
}
294299

295-
hasReturnedAnElement.set(true);
300+
if (FileNames.isCommitFile(fileName)
301+
|| FileNames.isCheckpointFile(fileName)
302+
|| FileNames.isLogCompactionFile(fileName)) {
303+
hasReturnedCommitOrCheckpoint.set(true);
304+
}
296305

297306
return BreakableFilterResult.INCLUDE;
298307
});

kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/ChecksumReader.java

Lines changed: 11 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
import static io.delta.kernel.internal.util.FileNames.*;
1919
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
20-
import static java.lang.Math.min;
2120

2221
import io.delta.kernel.data.ColumnarBatch;
2322
import io.delta.kernel.engine.Engine;
2423
import io.delta.kernel.internal.fs.Path;
2524
import io.delta.kernel.internal.util.FileNames;
2625
import io.delta.kernel.utils.CloseableIterator;
2726
import io.delta.kernel.utils.FileStatus;
28-
import java.io.IOException;
2927
import java.util.*;
3028
import org.slf4j.Logger;
3129
import org.slf4j.LoggerFactory;
@@ -35,89 +33,41 @@ public class ChecksumReader {
3533
private static final Logger logger = LoggerFactory.getLogger(ChecksumReader.class);
3634

3735
/**
38-
* Load the CRCInfo from the checksum file at the given version. If the checksum file is not found
39-
* at the given version, it will try to find the latest checksum file that is created at or after
40-
* the lower bound version.
36+
* Load the CRCInfo from the provided checksum file.
4137
*
4238
* @param engine the engine to use for reading the checksum file
43-
* @param logPath the path to the Delta log
44-
* @param targetedVersion the target version to read the checksum file from
45-
* @param lowerBound the inclusive lower bound version to search for the checksum file
46-
* @return Optional {@link CRCInfo} containing the protocol and metadata, and the version of the
47-
* checksum file. If the checksum file is not found, it will return an empty
39+
* @param checkSumFile the file status of the checksum file to read
40+
* @return Optional {@link CRCInfo} containing the information included in the checksum file, such
41+
* as protocol, metadata.
4842
*/
49-
public static Optional<CRCInfo> getCRCInfo(
50-
Engine engine, Path logPath, long targetedVersion, long lowerBound) {
51-
// lower bound should always smaller than the targetedVersion.
52-
lowerBound = min(lowerBound, targetedVersion);
53-
logger.info("Loading CRC file for version {} with lower bound {}", targetedVersion, lowerBound);
54-
// First try to load the CRC at given version. If not found or failed to read then try to
55-
// find the latest CRC file that is created at or after the lower bound version.
56-
Path crcFilePath = checksumFile(logPath, targetedVersion);
57-
Optional<CRCInfo> crcInfoOpt = readChecksumFile(engine, crcFilePath);
58-
if (crcInfoOpt.isPresent()
59-
||
60-
// we don't expect any more checksum files as it is the first version
61-
targetedVersion == 0
62-
|| targetedVersion == lowerBound) {
63-
return crcInfoOpt;
64-
}
65-
logger.info(
66-
"CRC file for version {} not found, listing CRC files from version {}",
67-
targetedVersion,
68-
lowerBound);
69-
70-
Path lowerBoundFilePath = checksumFile(logPath, lowerBound);
71-
try (CloseableIterator<FileStatus> crcFiles =
72-
engine.getFileSystemClient().listFrom(lowerBoundFilePath.toString())) {
73-
List<FileStatus> crcFilesList =
74-
crcFiles
75-
.filter(file -> isChecksumFile(file.getPath()))
76-
.takeWhile(file -> checksumVersion(new Path(file.getPath())) <= targetedVersion)
77-
.toInMemoryList();
78-
79-
// pick the last file which is the latest version that has the CRC file
80-
if (crcFilesList.isEmpty()) {
81-
logger.warn("No checksum files found in the range {} to {}", lowerBound, targetedVersion);
82-
return Optional.empty();
83-
}
84-
85-
FileStatus latestCRCFile = crcFilesList.get(crcFilesList.size() - 1);
86-
return readChecksumFile(engine, new Path(latestCRCFile.getPath()));
87-
} catch (IOException e) {
88-
logger.warn("Failed to list checksum files from {}", lowerBoundFilePath, e);
89-
return Optional.empty();
90-
}
91-
}
92-
93-
private static Optional<CRCInfo> readChecksumFile(Engine engine, Path filePath) {
43+
public static Optional<CRCInfo> getCRCInfo(Engine engine, FileStatus checkSumFile) {
9444
try (CloseableIterator<ColumnarBatch> iter =
9545
engine
9646
.getJsonHandler()
9747
.readJsonFiles(
98-
singletonCloseableIterator(FileStatus.of(filePath.toString())),
48+
singletonCloseableIterator(checkSumFile),
9949
CRCInfo.CRC_FILE_SCHEMA,
10050
Optional.empty())) {
10151
// We do this instead of iterating through the rows or using `getSingularRow` so we
10252
// can use the existing fromColumnVector methods in Protocol, Metadata, Format etc
10353
if (!iter.hasNext()) {
104-
logger.warn("Checksum file is empty: {}", filePath);
54+
logger.warn("Checksum file is empty: {}", checkSumFile.getPath());
10555
return Optional.empty();
10656
}
10757

10858
ColumnarBatch batch = iter.next();
10959
if (batch.getSize() != 1) {
11060
String msg = "Expected exactly one row in the checksum file {}, found {} rows";
111-
logger.warn(msg, filePath, batch.getSize());
61+
logger.warn(msg, checkSumFile.getPath(), batch.getSize());
11262
return Optional.empty();
11363
}
11464

115-
long crcVersion = FileNames.checksumVersion(filePath);
65+
long crcVersion = FileNames.checksumVersion(new Path(checkSumFile.getPath()));
11666

117-
return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, filePath.toString());
67+
return CRCInfo.fromColumnarBatch(crcVersion, batch, 0 /* rowId */, checkSumFile.getPath());
11868
} catch (Exception e) {
11969
// This can happen when the version does not have a checksum file
120-
logger.warn("Failed to read checksum file {}", filePath, e);
70+
logger.warn("Failed to read checksum file {}", checkSumFile.getPath(), e);
12171
return Optional.empty();
12272
}
12373
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/compaction/LogCompactionWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public void writeLogCompactionFile(Engine engine) throws IOException {
105105
deltas,
106106
Collections.emptyList(),
107107
Collections.emptyList(),
108+
Optional.empty(),
108109
lastCommitTimestamp);
109110
CreateCheckpointIterator checkpointIterator =
110111
new CreateCheckpointIterator(engine, segment, minFileRetentionTimestampMillis);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO;
2020
import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED;
2121
import static io.delta.kernel.internal.actions.SingleAction.*;
22+
import static io.delta.kernel.internal.util.FileNames.checksumFile;
2223
import static io.delta.kernel.internal.util.FileNames.deltaFile;
2324
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
2425
import static io.delta.kernel.internal.util.Preconditions.checkState;
@@ -175,7 +176,8 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
175176

176177
Optional<CRCInfo> updatedCrcInfo =
177178
ChecksumReader.getCRCInfo(
178-
engine, snapshot.getLogPath(), lastWinningVersion, lastWinningVersion);
179+
engine,
180+
FileStatus.of(checksumFile(snapshot.getLogPath(), lastWinningVersion).toString()));
179181

180182
// if we get here, we have successfully rebased (i.e no logical conflicts)
181183
// against the winning transactions

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java

Lines changed: 95 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818

1919
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
2020
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
21-
import static java.util.Arrays.asList;
22-
import static java.util.Collections.max;
21+
import static java.util.Objects.requireNonNull;
2322

2423
import io.delta.kernel.data.ColumnVector;
2524
import io.delta.kernel.data.ColumnarBatch;
@@ -38,6 +37,7 @@
3837
import io.delta.kernel.internal.snapshot.SnapshotHint;
3938
import io.delta.kernel.internal.tablefeatures.TableFeatures;
4039
import io.delta.kernel.internal.util.DomainMetadataUtils;
40+
import io.delta.kernel.internal.util.FileNames;
4141
import io.delta.kernel.internal.util.Tuple2;
4242
import io.delta.kernel.types.StringType;
4343
import io.delta.kernel.types.StructType;
@@ -135,7 +135,7 @@ public static StructType getAddReadSchema(boolean shouldReadStats) {
135135
private final LogSegment logSegment;
136136
private final Tuple2<Protocol, Metadata> protocolAndMetadata;
137137
private final Lazy<Map<String, DomainMetadata>> domainMetadataMap;
138-
private final Optional<CRCInfo> currentCrcInfo;
138+
private final CrcInfoContext crcInfoContext;
139139

140140
public LogReplay(
141141
Path logPath,
@@ -147,17 +147,23 @@ public LogReplay(
147147
SnapshotMetrics snapshotMetrics) {
148148

149149
assertLogFilesBelongToTable(logPath, logSegment.allLogFilesUnsorted());
150-
Tuple2<Optional<SnapshotHint>, Optional<CRCInfo>> newerSnapshotHintAndCurrentCrcInfo =
151-
maybeGetNewerSnapshotHintAndCurrentCrcInfo(
152-
engine, logSegment, snapshotHint, snapshotVersion);
153-
this.currentCrcInfo = newerSnapshotHintAndCurrentCrcInfo._2;
150+
151+
// Ignore the snapshot hint whose version is larger than the snapshot version.
152+
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
153+
snapshotHint = Optional.empty();
154+
}
155+
156+
this.crcInfoContext = new CrcInfoContext(engine);
154157
this.dataPath = dataPath;
155158
this.logSegment = logSegment;
159+
Optional<SnapshotHint> newerSnapshotHint =
160+
crcInfoContext.maybeGetNewerSnapshotHintAndUpdateCache(
161+
engine, logSegment, snapshotHint, snapshotVersion);
156162
this.protocolAndMetadata =
157163
snapshotMetrics.loadInitialDeltaActionsTimer.time(
158164
() ->
159165
loadTableProtocolAndMetadata(
160-
engine, logSegment, newerSnapshotHintAndCurrentCrcInfo._1, snapshotVersion));
166+
engine, logSegment, newerSnapshotHint, snapshotVersion));
161167
// Lazy loading of domain metadata only when needed
162168
this.domainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine));
163169
}
@@ -186,9 +192,11 @@ public long getVersion() {
186192
return logSegment.getVersion();
187193
}
188194

189-
/** Returns the crc info for the current snapshot if the checksum file is read */
195+
/** Returns the crc info for the current snapshot if it is cached */
190196
public Optional<CRCInfo> getCurrentCrcInfo() {
191-
return currentCrcInfo;
197+
return crcInfoContext
198+
.getLastSeenCrcInfo()
199+
.filter(crcInfo -> crcInfo.getVersion() == getVersion());
192200
}
193201

194202
/**
@@ -367,6 +375,7 @@ private Optional<Long> loadLatestTransactionVersion(Engine engine, String applic
367375
*/
368376
private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) {
369377
// First try to load from CRC info if available
378+
Optional<CRCInfo> currentCrcInfo = getCurrentCrcInfo();
370379
if (currentCrcInfo.isPresent() && currentCrcInfo.get().getDomainMetadata().isPresent()) {
371380
return currentCrcInfo.get().getDomainMetadata().get().stream()
372381
.collect(Collectors.toMap(DomainMetadata::getDomain, Function.identity()));
@@ -415,50 +424,87 @@ private Map<String, DomainMetadata> loadDomainMetadataMapFromLog(Engine engine)
415424
}
416425

417426
/**
418-
* Calculates the latest snapshot hint before or at the current snapshot version, returns the
419-
* CRCInfo if checksum file at the current version is read
427+
* Encapsulates CRC-related functionality and state for the LogReplay. This includes caching CRC
428+
* info and extracting snapshot hints from CRC files.
429+
*
430+
* <p>This class uses {@code maybeGetNewerSnapshotHintAndUpdateCache} to calculate a {@code
431+
* SnapshotHint} and also exposes a {@code getLastSeenCrcInfo} method. Their relationship is:
432+
*
433+
* <ul>
434+
* <li>We want to find the latest {@code SnapshotHint} to use during log replay for Protocol and
435+
* Metadata loading
436+
* <li>If we are not provided a SnapshotHint for this version, or are provided a stale hint, we
437+
* will try to read the latest seen (by file listing) CRC file (if it exists). If so, we
438+
* read it, cache it, and create a newer hint.
439+
* <li>Then, when {@code getLastSeenCrcInfo} is called, we will either use the cached CRCInfo
440+
* that we have already read, parsed, and cached; or, if it was never cached (because the
441+
* hint was sufficiently new) we will read it, parse it, and cache it for the first time
442+
* </ul>
420443
*/
421-
private Tuple2<Optional<SnapshotHint>, Optional<CRCInfo>>
422-
maybeGetNewerSnapshotHintAndCurrentCrcInfo(
423-
Engine engine,
424-
LogSegment logSegment,
425-
Optional<SnapshotHint> snapshotHint,
426-
long snapshotVersion) {
427-
428-
// Snapshot hint's version is current.
429-
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() == snapshotVersion) {
430-
return new Tuple2<>(snapshotHint, Optional.empty());
444+
private class CrcInfoContext {
445+
private final Engine engine;
446+
private Optional<CRCInfo> cachedLastSeenCrcInfo;
447+
448+
CrcInfoContext(Engine engine) {
449+
this.engine = requireNonNull(engine);
450+
this.cachedLastSeenCrcInfo = Optional.empty();
431451
}
432452

433-
// Ignore the snapshot hint whose version is larger.
434-
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() > snapshotVersion) {
435-
snapshotHint = Optional.empty();
453+
/** Returns the CRC info persisted in the logSegment's lastSeenChecksum File */
454+
public Optional<CRCInfo> getLastSeenCrcInfo() {
455+
if (!cachedLastSeenCrcInfo.isPresent()) {
456+
cachedLastSeenCrcInfo =
457+
logSegment
458+
.getLastSeenChecksum()
459+
.flatMap(crcFile -> ChecksumReader.getCRCInfo(engine, crcFile));
460+
}
461+
return cachedLastSeenCrcInfo;
436462
}
437463

438-
long crcSearchLowerBound =
439-
max(
440-
asList(
441-
// Prefer reading hint over CRC, so start listing from hint's version + 1,
442-
// if hint is not present, list from version 0.
443-
snapshotHint.map(SnapshotHint::getVersion).orElse(-1L) + 1,
444-
logSegment.getCheckpointVersionOpt().orElse(0L),
445-
// Only find the CRC within 100 versions.
446-
snapshotVersion - 100,
447-
0L));
448-
Optional<CRCInfo> crcInfoOpt =
449-
ChecksumReader.getCRCInfo(
450-
engine, logSegment.getLogPath(), snapshotVersion, crcSearchLowerBound);
451-
if (!crcInfoOpt.isPresent()) {
452-
return new Tuple2<>(snapshotHint, Optional.empty());
464+
/**
465+
* Attempts to build a newer snapshot hint from CRC that can be used for loading table state
466+
* more efficiently. When CRC is read, updates the internal cache.
467+
*
468+
* @param engine The engine used to read CRC files
469+
* @param logSegment The log segment containing checksum information
470+
* @param snapshotHint Existing snapshot hint, if any
471+
* @param snapshotVersion Target snapshot version
472+
* @return An updated snapshot hint if a newer CRC file was found, otherwise the original hint
473+
*/
474+
public Optional<SnapshotHint> maybeGetNewerSnapshotHintAndUpdateCache(
475+
Engine engine,
476+
LogSegment logSegment,
477+
Optional<SnapshotHint> snapshotHint,
478+
long snapshotVersion) {
479+
480+
// Snapshot hint's version is current so we could use it in loading P&M.
481+
// No need to read crc.
482+
if (snapshotHint.isPresent() && snapshotHint.get().getVersion() == snapshotVersion) {
483+
return snapshotHint;
484+
}
485+
486+
// Prefer reading hint over CRC to save 1 io, only read crc if it is newer than snapshot hint.
487+
long crcReadLowerBound = snapshotHint.map(SnapshotHint::getVersion).orElse(-1L) + 1;
488+
489+
Optional<CRCInfo> crcInfoOpt =
490+
logSegment
491+
.getLastSeenChecksum()
492+
.filter(
493+
checksum ->
494+
FileNames.getFileVersion(new Path(checksum.getPath())) >= crcReadLowerBound)
495+
.flatMap(checksum -> ChecksumReader.getCRCInfo(engine, checksum));
496+
497+
if (!crcInfoOpt.isPresent()) {
498+
return snapshotHint;
499+
}
500+
501+
CRCInfo crcInfo = crcInfoOpt.get();
502+
this.cachedLastSeenCrcInfo = Optional.of(crcInfo);
503+
checkArgument(
504+
crcInfo.getVersion() >= crcReadLowerBound && crcInfo.getVersion() <= snapshotVersion);
505+
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
506+
// but (b) newer than the current hint. Use this CRCInfo to create a new hint, and return.
507+
return Optional.of(SnapshotHint.fromCrcInfo(crcInfo));
453508
}
454-
CRCInfo crcInfo = crcInfoOpt.get();
455-
checkArgument(
456-
crcInfo.getVersion() >= crcSearchLowerBound && crcInfo.getVersion() <= snapshotVersion);
457-
// We found a CRCInfo of a version (a) older than the one we are looking for (snapshotVersion)
458-
// but (b) newer than the current hint. Use this CRCInfo to create a new hint, and return this
459-
// crc info if it matches the current version.
460-
return new Tuple2<>(
461-
Optional.of(SnapshotHint.fromCrcInfo(crcInfo)),
462-
crcInfo.getVersion() == snapshotVersion ? crcInfoOpt : Optional.empty());
463509
}
464510
}

0 commit comments

Comments
 (0)