Skip to content

Commit

Permalink
[Kernel] Refactor SnapshotManager to take logPath and dataPath as par…
Browse files Browse the repository at this point in the history
…t of the constructor (#2613)
  • Loading branch information
allisonport-db authored Feb 8, 2024
1 parent 5545f28 commit 49f2625
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 51 deletions.
7 changes: 7 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ static Table forPath(TableClient tableClient, String path)
*/
Snapshot getLatestSnapshot(TableClient tableClient)
throws TableNotFoundException;

/**
* The fully qualified path of this {@link Table} instance.
*
* @return the table path
*/
String getPath();
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,26 @@ public static Table forPath(TableClient tableClient, String path)
} catch (IOException io) {
throw new RuntimeException(io);
}
final Path dataPath = new Path(resolvedPath);
final Path logPath = new Path(dataPath, "_delta_log");

return new TableImpl(logPath, dataPath);
return new TableImpl(resolvedPath);
}

private final Path logPath;
private final Path dataPath;
private final SnapshotManager snapshotManager;
private final String tablePath;

public TableImpl(Path logPath, Path dataPath) {
this.logPath = logPath;
this.dataPath = dataPath;
this.snapshotManager = new SnapshotManager();
public TableImpl(String tablePath) {
this.tablePath = tablePath;
final Path dataPath = new Path(tablePath);
final Path logPath = new Path(dataPath, "_delta_log");
this.snapshotManager = new SnapshotManager(logPath, dataPath);
}

@Override
public Snapshot getLatestSnapshot(TableClient tableClient) throws TableNotFoundException {
return snapshotManager.buildLatestSnapshot(tableClient, logPath, dataPath);
return snapshotManager.buildLatestSnapshot(tableClient);
}

@Override
public String getPath() {
return tablePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ public class SnapshotManager {
* is `null`.
*/
private AtomicReference<SnapshotHint> latestSnapshotHint;
private final Path logPath;
private final Path dataPath;

public SnapshotManager() {
public SnapshotManager(Path logPath, Path dataPath) {
this.latestSnapshotHint = new AtomicReference<>();
this.logPath = logPath;
this.dataPath = dataPath;
}

private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
Expand Down Expand Up @@ -98,14 +102,12 @@ public static void verifyDeltaVersions(
* Construct the latest snapshot for given table.
*
* @param tableClient Instance of {@link TableClient} to use.
* @param logPath Where the Delta log files are located.
* @param dataPath Where the Delta data files are located.
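* @return the latest {@link Snapshot} of the table
* @throws TableNotFoundException if no log segment can be constructed for the table, e.g.
* because the Delta log directory is missing or empty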
*/
public Snapshot buildLatestSnapshot(TableClient tableClient, Path logPath, Path dataPath)
public Snapshot buildLatestSnapshot(TableClient tableClient)
throws TableNotFoundException {
return getSnapshotAtInit(tableClient, logPath, dataPath);
return getSnapshotAtInit(tableClient);
}

////////////////////
Expand All @@ -130,7 +132,6 @@ private void registerHint(SnapshotHint newHint) {
* Get an iterator of files in the _delta_log directory starting with the startVersion.
*/
private CloseableIterator<FileStatus> listFrom(
Path logPath,
TableClient tableClient,
long startVersion)
throws IOException {
Expand All @@ -153,17 +154,16 @@ private boolean isDeltaCommitOrCheckpointFile(String fileName) {
}

/**
* Returns an iterator containing a list of files found from the provided path
* Returns an iterator containing a list of files found in the _delta_log directory starting
* with the startVersion. Returns an empty {@link Optional} if no files are found or the
* directory is missing.
*/
private Optional<CloseableIterator<FileStatus>> listFromOrNone(
Path logPath,
TableClient tableClient,
long startVersion) {
// LIST the directory, starting from the provided lower bound (treat missing dir as empty).
// NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files."
try {
CloseableIterator<FileStatus> results = listFrom(
logPath,
tableClient,
startVersion);
if (results.hasNext()) {
Expand All @@ -190,14 +190,12 @@ private Optional<CloseableIterator<FileStatus>> listFromOrNone(
* None if the listing returned no files at all.
*/
protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
Path logPath,
TableClient tableClient,
long startVersion,
Optional<Long> versionToLoad) {
logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad);

return listFromOrNone(
logPath,
tableClient,
startVersion).map(fileStatusesIter -> {
final List<FileStatus> output = new ArrayList<>();
Expand Down Expand Up @@ -237,27 +235,23 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
* Load the Snapshot for this Delta table at initialization. This method uses the
* `lastCheckpoint` file as a hint on where to start listing the transaction log directory.
*/
private SnapshotImpl getSnapshotAtInit(TableClient tableClient, Path logPath, Path dataPath)
private SnapshotImpl getSnapshotAtInit(TableClient tableClient)
throws TableNotFoundException {
Checkpointer checkpointer = new Checkpointer(logPath);
Optional<CheckpointMetaData> lastCheckpointOpt =
checkpointer.readLastCheckpointFile(tableClient);
Optional<LogSegment> logSegmentOpt =
getLogSegmentFrom(logPath, tableClient, lastCheckpointOpt);
getLogSegmentFrom(tableClient, lastCheckpointOpt);

return logSegmentOpt
.map(logSegment -> createSnapshot(
logSegment,
logPath,
dataPath,
tableClient))
.orElseThrow(() -> new TableNotFoundException(dataPath.toString()));
}

private SnapshotImpl createSnapshot(
LogSegment initSegment,
Path logPath,
Path dataPath,
TableClient tableClient) {
final String startingFromStr = initSegment
.checkpointVersionOpt
Expand Down Expand Up @@ -292,11 +286,9 @@ private SnapshotImpl createSnapshot(
* @param startingCheckpoint A checkpoint that we can start our listing from
*/
private Optional<LogSegment> getLogSegmentFrom(
Path logPath,
TableClient tableClient,
Optional<CheckpointMetaData> startingCheckpoint) {
return getLogSegmentForVersion(
logPath,
tableClient,
startingCheckpoint.map(x -> x.version),
Optional.empty());
Expand All @@ -316,10 +308,9 @@ private Optional<LogSegment> getLogSegmentFrom(
* latest
* version of the table.
* @return Some LogSegment to build a Snapshot if files do exist after the given
* startCheckpoint. None, if the directory was missing or empty.
* startCheckpoint. None, if the delta log directory was missing or empty.
*/
public Optional<LogSegment> getLogSegmentForVersion(
Path logPath,
TableClient tableClient,
Optional<Long> startCheckpoint,
Optional<Long> versionToLoad) {
Expand All @@ -329,12 +320,10 @@ public Optional<LogSegment> getLogSegmentForVersion(
// startCheckpoint > versionToLoad
final Optional<List<FileStatus>> newFiles =
listDeltaAndCheckpointFiles(
logPath,
tableClient,
startCheckpoint.orElse(0L),
versionToLoad);
return getLogSegmentForVersion(
logPath,
tableClient,
startCheckpoint,
versionToLoad,
Expand All @@ -346,7 +335,6 @@ public Optional<LogSegment> getLogSegmentForVersion(
* and will then try to construct a new LogSegment using that.
*/
protected Optional<LogSegment> getLogSegmentForVersion(
Path logPath,
TableClient tableClient,
Optional<Long> startCheckpointOpt,
Optional<Long> versionToLoadOpt,
Expand Down Expand Up @@ -387,7 +375,6 @@ protected Optional<LogSegment> getLogSegmentForVersion(
// The directory may be deleted and recreated and we may have stale state in our
// DeltaLog singleton, so try listing from the first version
return getLogSegmentForVersion(
logPath,
tableClient,
Optional.empty(),
versionToLoadOpt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ class SnapshotManagerSuite extends AnyFunSuite {
val checkpoints = singularCheckpointFileStatuses(checkpointVersions)
val multiCheckpoints = multiCheckpointFileStatuses(multiCheckpointVersions, numParts)

val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion(
logPath,
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFromFileList(deltas ++ checkpoints ++ multiCheckpoints)),
startCheckpoint,
versionToLoad
Expand Down Expand Up @@ -262,8 +261,7 @@ class SnapshotManagerSuite extends AnyFunSuite {
versionToLoad: Optional[java.lang.Long] = Optional.empty(),
expectedErrorMessageContains: String = "")(implicit classTag: ClassTag[T]): Unit = {
val e = intercept[T] {
new SnapshotManager().getLogSegmentForVersion(
logPath,
snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFromFileList(files)),
startCheckpoint,
versionToLoad
Expand Down Expand Up @@ -381,8 +379,7 @@ class SnapshotManagerSuite extends AnyFunSuite {

test("getLogSegmentForVersion: empty delta log") {
// listDeltaAndCheckpointFiles = Optional.empty()
val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion(
logPath,
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFromFileList(Seq.empty)),
Optional.empty(),
Optional.empty()
Expand Down Expand Up @@ -428,8 +425,7 @@ class SnapshotManagerSuite extends AnyFunSuite {
listFromFileList(files)(filePath)
}
for (checkpointV <- Seq(10, 20)) {
val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion(
logPath,
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFrom(checkpointV)),
Optional.of(checkpointV),
Optional.empty()
Expand Down Expand Up @@ -521,8 +517,7 @@ class SnapshotManagerSuite extends AnyFunSuite {
startCheckpoint = Optional.of(10),
expectedErrorMessageContains = "Could not find any delta files for version 10"
)
val logSegment = new SnapshotManager().getLogSegmentForVersion(
logPath,
val logSegment = snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFromFileList(fileList)),
Optional.empty(),
Optional.empty()
Expand Down Expand Up @@ -668,8 +663,7 @@ class SnapshotManagerSuite extends AnyFunSuite {
.take(4)
val checkpoints = singularCheckpointFileStatuses(validVersions)
val deltas = deltaFileStatuses(deltaVersions)
val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion(
logPath,
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFromFileList(deltas ++ corruptedCheckpoint ++ checkpoints)),
Optional.empty(),
Optional.empty()
Expand All @@ -690,8 +684,7 @@ class SnapshotManagerSuite extends AnyFunSuite {

test("getLogSegmentForVersion: corrupt _last_checkpoint with empty delta log") {
// listDeltaAndCheckpointFiles = Optional.empty()
val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion(
logPath,
val logSegmentOpt = snapshotManager.getLogSegmentForVersion(
createMockTableClient(listFromFileList(Seq.empty)),
Optional.of(1),
Optional.empty()
Expand All @@ -702,7 +695,9 @@ class SnapshotManagerSuite extends AnyFunSuite {

object SnapshotManagerSuite {

private val logPath = new Path("/fake/path/to/table/_delta_log")
private val dataPath = new Path("/fake/path/to/table/")
private val logPath = new Path(dataPath, "_delta_log")
private val snapshotManager = new SnapshotManager(logPath, dataPath)

private def deltaFileStatuses(deltaVersions: Seq[Long]): Seq[FileStatus] = {
assert(deltaVersions.size == deltaVersions.toSet.size)
Expand Down

0 comments on commit 49f2625

Please sign in to comment.