From 49f2625423e16a7d0f6c7892145c7b3eed329052 Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Wed, 7 Feb 2024 17:09:32 -0800 Subject: [PATCH] [Kernel] Refactor SnapshotManager to take logPath and dataPath as part of the constructor (#2613) --- .../src/main/java/io/delta/kernel/Table.java | 7 ++++ .../io/delta/kernel/internal/TableImpl.java | 24 ++++++------ .../internal/snapshot/SnapshotManager.java | 37 ++++++------------- .../internal/SnapshotManagerSuite.scala | 25 +++++-------- 4 files changed, 42 insertions(+), 51 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java b/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java index bba720df23b..ae70f6741eb 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/Table.java @@ -49,4 +49,11 @@ static Table forPath(TableClient tableClient, String path) */ Snapshot getLatestSnapshot(TableClient tableClient) throws TableNotFoundException; + + /** + * The fully qualified path of this {@link Table} instance. + * + * @return the table path + */ + String getPath(); } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 4ab1d4acde1..c9f7c157dcf 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -38,24 +38,26 @@ public static Table forPath(TableClient tableClient, String path) } catch (IOException io) { throw new RuntimeException(io); } - final Path dataPath = new Path(resolvedPath); - final Path logPath = new Path(dataPath, "_delta_log"); - - return new TableImpl(logPath, dataPath); + return new TableImpl(resolvedPath); } - private final Path logPath; - private final Path dataPath; private final SnapshotManager snapshotManager; + private final String tablePath; - public TableImpl(Path logPath, Path dataPath) { - this.logPath = logPath; - this.dataPath = dataPath; - this.snapshotManager = new SnapshotManager(); + public TableImpl(String tablePath) { + this.tablePath = tablePath; + final Path dataPath = new Path(tablePath); + final Path logPath = new Path(dataPath, "_delta_log"); + this.snapshotManager = new SnapshotManager(logPath, dataPath); } @Override public Snapshot getLatestSnapshot(TableClient tableClient) throws TableNotFoundException { - return snapshotManager.buildLatestSnapshot(tableClient, logPath, dataPath); + return snapshotManager.buildLatestSnapshot(tableClient); + } + + @Override + public String getPath() { + return tablePath; } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 5ebfaff6c31..dcaba0ce529 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -51,9 +51,13 @@ public class SnapshotManager { * is `null`. */ private AtomicReference latestSnapshotHint; + private final Path logPath; + private final Path dataPath; - public SnapshotManager() { + public SnapshotManager(Path logPath, Path dataPath) { this.latestSnapshotHint = new AtomicReference<>(); + this.logPath = logPath; + this.dataPath = dataPath; } private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); @@ -98,14 +102,12 @@ public static void verifyDeltaVersions( * Construct the latest snapshot for given table. * * @param tableClient Instance of {@link TableClient} to use. - * @param logPath Where the Delta log files are located. - * @param dataPath Where the Delta data files are located. * @return * @throws TableNotFoundException */ - public Snapshot buildLatestSnapshot(TableClient tableClient, Path logPath, Path dataPath) + public Snapshot buildLatestSnapshot(TableClient tableClient) throws TableNotFoundException { - return getSnapshotAtInit(tableClient, logPath, dataPath); + return getSnapshotAtInit(tableClient); } //////////////////// @@ -130,7 +132,6 @@ private void registerHint(SnapshotHint newHint) { * Get an iterator of files in the _delta_log directory starting with the startVersion. */ private CloseableIterator listFrom( - Path logPath, TableClient tableClient, long startVersion) throws IOException { @@ -153,17 +154,16 @@ private boolean isDeltaCommitOrCheckpointFile(String fileName) { } /** - * Returns an iterator containing a list of files found from the provided path + * Returns an iterator containing a list of files found in the _delta_log directory starting + * with the startVersion. Returns None if no files are found or the directory is missing. */ private Optional> listFromOrNone( - Path logPath, TableClient tableClient, long startVersion) { // LIST the directory, starting from the provided lower bound (treat missing dir as empty). // NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files." try { CloseableIterator results = listFrom( - logPath, tableClient, startVersion); if (results.hasNext()) { @@ -190,14 +190,12 @@ private Optional> listFromOrNone( * None if the listing returned no files at all. */ protected final Optional> listDeltaAndCheckpointFiles( - Path logPath, TableClient tableClient, long startVersion, Optional versionToLoad) { logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad); return listFromOrNone( - logPath, tableClient, startVersion).map(fileStatusesIter -> { final List output = new ArrayList<>(); @@ -237,27 +235,23 @@ protected final Optional> listDeltaAndCheckpointFiles( * Load the Snapshot for this Delta table at initialization. This method uses the * `lastCheckpoint` file as a hint on where to start listing the transaction log directory. */ - private SnapshotImpl getSnapshotAtInit(TableClient tableClient, Path logPath, Path dataPath) + private SnapshotImpl getSnapshotAtInit(TableClient tableClient) throws TableNotFoundException { Checkpointer checkpointer = new Checkpointer(logPath); Optional lastCheckpointOpt = checkpointer.readLastCheckpointFile(tableClient); Optional logSegmentOpt = - getLogSegmentFrom(logPath, tableClient, lastCheckpointOpt); + getLogSegmentFrom(tableClient, lastCheckpointOpt); return logSegmentOpt .map(logSegment -> createSnapshot( logSegment, - logPath, - dataPath, tableClient)) .orElseThrow(() -> new TableNotFoundException(dataPath.toString())); } private SnapshotImpl createSnapshot( LogSegment initSegment, - Path logPath, - Path dataPath, TableClient tableClient) { final String startingFromStr = initSegment .checkpointVersionOpt @@ -292,11 +286,9 @@ private SnapshotImpl createSnapshot( * @param startingCheckpoint A checkpoint that we can start our listing from */ private Optional getLogSegmentFrom( - Path logPath, TableClient tableClient, Optional startingCheckpoint) { return getLogSegmentForVersion( - logPath, tableClient, startingCheckpoint.map(x -> x.version), Optional.empty()); @@ -316,10 +308,9 @@ private Optional getLogSegmentFrom( * latest * version of the table. * @return Some LogSegment to build a Snapshot if files do exist after the given - * startCheckpoint. None, if the directory was missing or empty. + * startCheckpoint. None, if the delta log directory was missing or empty. */ public Optional getLogSegmentForVersion( - Path logPath, TableClient tableClient, Optional startCheckpoint, Optional versionToLoad) { @@ -329,12 +320,10 @@ public Optional getLogSegmentForVersion( // startCheckpoint > versionToLoad final Optional> newFiles = listDeltaAndCheckpointFiles( - logPath, tableClient, startCheckpoint.orElse(0L), versionToLoad); return getLogSegmentForVersion( - logPath, tableClient, startCheckpoint, versionToLoad, @@ -346,7 +335,6 @@ public Optional getLogSegmentForVersion( * and will then try to construct a new LogSegment using that. */ protected Optional getLogSegmentForVersion( - Path logPath, TableClient tableClient, Optional startCheckpointOpt, Optional versionToLoadOpt, @@ -387,7 +375,6 @@ protected Optional getLogSegmentForVersion( // The directory may be deleted and recreated and we may have stale state in our // DeltaLog singleton, so try listing from the first version return getLogSegmentForVersion( - logPath, tableClient, Optional.empty(), versionToLoadOpt); diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index c26df75943c..2893c1ef11b 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -164,8 +164,7 @@ class SnapshotManagerSuite extends AnyFunSuite { val checkpoints = singularCheckpointFileStatuses(checkpointVersions) val multiCheckpoints = multiCheckpointFileStatuses(multiCheckpointVersions, numParts) - val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion( - logPath, + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockTableClient(listFromFileList(deltas ++ checkpoints ++ multiCheckpoints)), startCheckpoint, versionToLoad @@ -262,8 +261,7 @@ class SnapshotManagerSuite extends AnyFunSuite { versionToLoad: Optional[java.lang.Long] = Optional.empty(), expectedErrorMessageContains: String = "")(implicit classTag: ClassTag[T]): Unit = { val e = intercept[T] { - new SnapshotManager().getLogSegmentForVersion( - logPath, + snapshotManager.getLogSegmentForVersion( createMockTableClient(listFromFileList(files)), startCheckpoint, versionToLoad @@ -381,8 +379,7 @@ class SnapshotManagerSuite extends AnyFunSuite { test("getLogSegmentForVersion: empty delta log") { // listDeltaAndCheckpointFiles = Optional.empty() - val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion( - logPath, + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockTableClient(listFromFileList(Seq.empty)), Optional.empty(), Optional.empty() @@ -428,8 +425,7 @@ class SnapshotManagerSuite extends AnyFunSuite { listFromFileList(files)(filePath) } for (checkpointV <- Seq(10, 20)) { - val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion( - logPath, + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockTableClient(listFrom(checkpointV)), Optional.of(checkpointV), Optional.empty() @@ -521,8 +517,7 @@ class SnapshotManagerSuite extends AnyFunSuite { startCheckpoint = Optional.of(10), expectedErrorMessageContains = "Could not find any delta files for version 10" ) - val logSegment = new SnapshotManager().getLogSegmentForVersion( - logPath, + val logSegment = snapshotManager.getLogSegmentForVersion( createMockTableClient(listFromFileList(fileList)), Optional.empty(), Optional.empty() @@ -668,8 +663,7 @@ class SnapshotManagerSuite extends AnyFunSuite { .take(4) val checkpoints = singularCheckpointFileStatuses(validVersions) val deltas = deltaFileStatuses(deltaVersions) - val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion( - logPath, + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockTableClient(listFromFileList(deltas ++ corruptedCheckpoint ++ checkpoints)), Optional.empty(), Optional.empty() @@ -690,8 +684,7 @@ class SnapshotManagerSuite extends AnyFunSuite { test("getLogSegmentForVersion: corrupt _last_checkpoint with empty delta log") { // listDeltaAndCheckpointFiles = Optional.empty() - val logSegmentOpt = new SnapshotManager().getLogSegmentForVersion( - logPath, + val logSegmentOpt = snapshotManager.getLogSegmentForVersion( createMockTableClient(listFromFileList(Seq.empty)), Optional.of(1), Optional.empty() @@ -702,7 +695,9 @@ class SnapshotManagerSuite extends AnyFunSuite { object SnapshotManagerSuite { - private val logPath = new Path("/fake/path/to/table/_delta_log") + private val dataPath = new Path("/fake/path/to/table/") + private val logPath = new Path(dataPath, "_delta_log") + private val snapshotManager = new SnapshotManager(logPath, dataPath) private def deltaFileStatuses(deltaVersions: Seq[Long]): Seq[FileStatus] = { assert(deltaVersions.size == deltaVersions.toSet.size)