From 387dcf6626ef829a7647d4f2b83ad5f66c5daaa0 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Mon, 5 Feb 2024 12:00:30 +0100
Subject: [PATCH 1/2] Factor logic to collect files to REORG out of OPTIMIZE

---
 .../commands/DeltaReorgTableCommand.scala     | 73 ++++++++++++++-----
 .../delta/commands/OptimizeTableCommand.scala | 43 ++++-------
 .../ReorgTableForUpgradeUniformHelper.scala   | 11 +--
 .../spark/sql/delta/hooks/AutoCompact.scala   |  2 +-
 4 files changed, 71 insertions(+), 58 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
index 59016b3ff77..c21efc5ddf0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala
@@ -16,13 +16,10 @@
 
 package org.apache.spark.sql.delta.commands
 
-import org.apache.spark.sql.delta.catalog.DeltaTableV2
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+import org.apache.spark.sql.delta.actions.AddFile
 
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.ResolvedTable
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand}
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 
 object DeltaReorgTableMode extends Enumeration {
   val PURGE, UNIFORM_ICEBERG = Value
@@ -47,7 +44,7 @@ case class DeltaReorgTable(
 }
 
 /**
- * The PURGE command.
+ * The REORG TABLE command.
  */
 case class DeltaReorgTableCommand(
     target: LogicalPlan,
@@ -60,30 +57,66 @@ case class DeltaReorgTableCommand(
 
   override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil
 
-  override def optimizeByReorg(
-      sparkSession: SparkSession,
-      isPurge: Boolean,
-      icebergCompatVersion: Option[Int]): Seq[Row] = {
+  override def optimizeByReorg(sparkSession: SparkSession): Seq[Row] = {
     val command = OptimizeTableCommand(
       target,
       predicates,
       optimizeContext = DeltaOptimizeContext(
-        isPurge = isPurge,
+        reorg = Some(reorgOperation),
         minFileSize = Some(0L),
-        maxDeletedRowsRatio = Some(0d),
-        icebergCompatVersion = icebergCompatVersion
-      )
+        maxDeletedRowsRatio = Some(0d))
     )(zOrderBy = Nil)
     command.run(sparkSession)
   }
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    reorgTableSpec match {
-      case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
-        optimizeByReorg(sparkSession, isPurge = true, icebergCompatVersion = None)
-      case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
-        val table = getDeltaTable(target, "REORG")
-        upgradeUniformIcebergCompatVersion(table, sparkSession, icebergCompatVersion)
-    }
-  }
-}
+  override def run(sparkSession: SparkSession): Seq[Row] = reorgTableSpec match {
+    case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
+      optimizeByReorg(sparkSession)
+    case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
+      val table = getDeltaTable(target, "REORG")
+      upgradeUniformIcebergCompatVersion(table, sparkSession, icebergCompatVersion)
+  }
+
+  protected def reorgOperation: DeltaReorgOperation = reorgTableSpec match {
+    case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
+      new DeltaPurgeOperation()
+    case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
+      new DeltaUpgradeUniformOperation(icebergCompatVersion)
+  }
+}
+
+/**
+ * Defines a Reorg operation to be applied during optimize.
+ */
+sealed trait DeltaReorgOperation {
+  /**
+   * Collects files that need to be processed by the reorg operation from the list of candidate
+   * files.
+   */
+  def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile]
+}
+
+/**
+ * Reorg operation to purge files with soft deleted rows.
+ */
+class DeltaPurgeOperation extends DeltaReorgOperation {
+  override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] =
+    files.filter { file =>
+      (file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
+        file.numDeletedRecords > 0L
+    }
+}
+
+/**
+ * Reorg operation to upgrade the iceberg compatibility version of a table.
+ */
+class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
+  override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = {
+    def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
+      if (file.tags == null) return true
+      val fileVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
+      fileVersion != icebergCompatVersion.toString
+    }
+    files.filter(shouldRewriteToBeIcebergCompatible)
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 2e84ec52460..4342c7cd29d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -187,30 +187,24 @@ case class OptimizeTableCommand(
 /**
  * Stored all runtime context information that can control the execution of optimize.
  *
- * @param isPurge Whether the rewriting task is only for purging soft-deleted data instead of
- *                for compaction. If [[isPurge]] is true, only files with DVs will be selected
- *                for compaction.
+ * @param reorg The REORG operation that triggered the rewriting task, if any.
  * @param minFileSize Files which are smaller than this threshold will be selected for compaction.
  *                    If not specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE]] will be used.
- *                    This parameter must be set to `0` when [[isPurge]] is true.
+ *                    This parameter must be set to `0` when [[reorg]] is set.
 * @param maxDeletedRowsRatio Files with a ratio of soft-deleted rows to the total rows larger than
 *                            this threshold will be rewritten by the OPTIMIZE command. If not
 *                            specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
- *                            will be used. This parameter must be set to `0` when [[isPurge]] is
- *                            true.
- * @param icebergCompatVersion The iceberg compatibility version used to rewrite data for
- *                             uniform tables.
+ *                            will be used. This parameter must be set to `0` when [[reorg]] is set.
  */
 case class DeltaOptimizeContext(
-    isPurge: Boolean = false,
+    reorg: Option[DeltaReorgOperation] = None,
     minFileSize: Option[Long] = None,
     maxFileSize: Option[Long] = None,
-    maxDeletedRowsRatio: Option[Double] = None,
-    icebergCompatVersion: Option[Int] = None) {
-  if (isPurge || icebergCompatVersion.isDefined) {
+    maxDeletedRowsRatio: Option[Double] = None) {
+  if (reorg.nonEmpty) {
     require(
       minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
-      "minFileSize and maxDeletedRowsRatio must be 0 when running PURGE.")
+      "minFileSize and maxDeletedRowsRatio must be 0 when running REORG TABLE.")
   }
 }
 
@@ -269,7 +263,10 @@ class OptimizeExecutor(
     val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
     val partitionSchema = txn.metadata.partitionSchema
 
-    val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
+    val filesToProcess = optimizeContext.reorg match {
+      case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles)
+      case None => pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
+    }
     val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
 
     val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
@@ -358,18 +355,9 @@ class OptimizeExecutor(
       file.deletedToPhysicalRecordsRatio.getOrElse(0d) > maxDeletedRowsRatio
     }
 
-    def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
-      if (optimizeContext.icebergCompatVersion.isEmpty) return false
-      if (file.tags == null) return true
-      val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
-      !optimizeContext.icebergCompatVersion.exists(_.toString == icebergCompatVersion)
-    }
-
-    // Select files that are small, have too many deleted rows,
-    // or need to be made iceberg compatible
+    // Select files that are small or have too many deleted rows
     files.filter(
-      addFile => addFile.size < minFileSize || shouldCompactBecauseOfDeletedRows(addFile) ||
-        shouldRewriteToBeIcebergCompatible(addFile))
+      addFile => addFile.size < minFileSize || shouldCompactBecauseOfDeletedRows(addFile))
   }
 
   /**
@@ -414,8 +402,7 @@ class OptimizeExecutor(
       bins.filter { bin =>
         bin.size > 1 || // bin has more than one file or
-          (bin.size == 1 && bin(0).deletionVector != null) || // single file in the bin has a DV or
-          (bin.size == 1 && optimizeContext.icebergCompatVersion.isDefined) || // uniform reorg
+          bin.size == 1 && optimizeContext.reorg.nonEmpty || // always rewrite files during reorg
           isMultiDimClustering // multi-clustering
       }.map(b => (partition, b))
     }
@@ -511,7 +498,7 @@ class OptimizeExecutor(
 
   /** Create the appropriate [[Operation]] object for txn commit history */
   private def getOperation(): Operation = {
-    if (optimizeContext.isPurge) {
+    if (optimizeContext.reorg.nonEmpty) {
       DeltaOperations.Reorg(partitionPredicate)
     } else {
       DeltaOperations.Optimize(partitionPredicate, clusteringColumns, auto = isAutoCompact)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala
index c15831b761a..79964431b39 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala
@@ -49,10 +49,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
   /**
    * Helper function to rewrite the table. Implemented by Reorg Table Command.
    */
-  def optimizeByReorg(
-      sparkSession: SparkSession,
-      isPurge: Boolean,
-      icebergCompatVersion: Option[Int]): Seq[Row]
+  def optimizeByReorg(sparkSession: SparkSession): Seq[Row]
 
   /**
    * Helper function to update the table icebergCompat properties.
@@ -172,11 +169,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
         logInfo(s"Reorg Table ${target.tableIdentifier} to iceberg compat version = " +
           s"$targetIcebergCompatVersion need rewrite data files.")
         val metrics = try {
-          optimizeByReorg(
-            sparkSession,
-            isPurge = false,
-            icebergCompatVersion = Some(targetIcebergCompatVersion)
-          )
+          optimizeByReorg(sparkSession)
         } catch {
           case NonFatal(e) =>
             throw DeltaErrors.icebergCompatDataFileRewriteFailedException(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
index 13fb836879a..72404d2ea77 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/AutoCompact.scala
@@ -197,7 +197,7 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging {
     recordDeltaOperation(deltaLog, s"$opType.execute") {
       val txn = deltaLog.startTransaction(catalogTable)
       val optimizeContext = DeltaOptimizeContext(
-        isPurge = false,
+        reorg = None,
         minFileSizeOpt,
         maxFileSizeOpt,
         maxDeletedRowsRatio = maxDeletedRowsRatio

From 0b0fcb908179e92e0dcb91a91431383d2b7d3690 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Wed, 14 Feb 2024 09:21:31 +0100
Subject: [PATCH 2/2] Address comments

---
 .../spark/sql/delta/commands/OptimizeTableCommand.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
index 4342c7cd29d..5e474423716 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala
@@ -265,7 +265,7 @@ class OptimizeExecutor(
 
     val filesToProcess = optimizeContext.reorg match {
       case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles)
-      case None => pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
+      case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
     }
 
     val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq
@@ -341,7 +341,7 @@ class OptimizeExecutor(
    * Helper method to prune the list of selected files based on fileSize and ratio of
    * deleted rows according to the deletion vector in [[AddFile]].
    */
-  private def pruneCandidateFileList(
+  private def filterCandidateFileList(
       minFileSize: Long,
       maxDeletedRowsRatio: Double,
       files: Seq[AddFile]): Seq[AddFile] = {
     // Select all files in case of multi-dimensional clustering
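
Not part of the patch: a minimal sketch of how the refactored pieces fit together, assuming the classes introduced above (DeltaReorgOperation, DeltaPurgeOperation, DeltaOptimizeContext) are on the classpath. The object and method names are invented for illustration; only the PURGE wiring shown here mirrors what DeltaReorgTableCommand actually builds.

import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.commands.{DeltaOptimizeContext, DeltaPurgeOperation}

object ReorgPurgeContextSketch {
  // REORG TABLE ... APPLY (PURGE) reduces to an OPTIMIZE run with a context like this:
  // the reorg operation decides which files to rewrite, and the size and deleted-rows
  // thresholds are pinned to 0, as enforced by the require() check in DeltaOptimizeContext.
  val purgeContext: DeltaOptimizeContext = DeltaOptimizeContext(
    reorg = Some(new DeltaPurgeOperation()),
    minFileSize = Some(0L),
    maxDeletedRowsRatio = Some(0d))

  // OptimizeExecutor then asks the operation for the files to rewrite instead of
  // applying the usual small-file and deleted-rows pruning.
  def selectFilesToRewrite(candidateFiles: Seq[AddFile]): Seq[AddFile] =
    purgeContext.reorg match {
      case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles)
      case None => candidateFiles // plain OPTIMIZE would prune by size and DV ratio here
    }
}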