Factor logic to collect files to REORG out of OPTIMIZE #2616

Closed
wants to merge 2 commits
@@ -16,13 +16,10 @@

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.sources.DeltaSourceUtils
import org.apache.spark.sql.delta.actions.AddFile

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand}
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

object DeltaReorgTableMode extends Enumeration {
val PURGE, UNIFORM_ICEBERG = Value
@@ -47,7 +44,7 @@ case class DeltaReorgTable(
}

/**
* The PURGE command.
* The REORG TABLE command.
*/
case class DeltaReorgTableCommand(
target: LogicalPlan,
@@ -60,30 +57,66 @@ case class DeltaReorgTableCommand(

override val otherCopyArgs: Seq[AnyRef] = predicates :: Nil

override def optimizeByReorg(
sparkSession: SparkSession,
isPurge: Boolean,
icebergCompatVersion: Option[Int]): Seq[Row] = {
override def optimizeByReorg(sparkSession: SparkSession): Seq[Row] = {
val command = OptimizeTableCommand(
target,
predicates,
optimizeContext = DeltaOptimizeContext(
isPurge = isPurge,
reorg = Some(reorgOperation),
minFileSize = Some(0L),
maxDeletedRowsRatio = Some(0d),
icebergCompatVersion = icebergCompatVersion
)
maxDeletedRowsRatio = Some(0d))
)(zOrderBy = Nil)
command.run(sparkSession)
}

override def run(sparkSession: SparkSession): Seq[Row] = {
reorgTableSpec match {
case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
optimizeByReorg(sparkSession, isPurge = true, icebergCompatVersion = None)
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
val table = getDeltaTable(target, "REORG")
upgradeUniformIcebergCompatVersion(table, sparkSession, icebergCompatVersion)
override def run(sparkSession: SparkSession): Seq[Row] = reorgTableSpec match {
case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
optimizeByReorg(sparkSession)
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
val table = getDeltaTable(target, "REORG")
upgradeUniformIcebergCompatVersion(table, sparkSession, icebergCompatVersion)
}

protected def reorgOperation: DeltaReorgOperation = reorgTableSpec match {
case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
new DeltaPurgeOperation()
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
new DeltaUpgradeUniformOperation(icebergCompatVersion)
}
}
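For reference, the two reorg specs dispatched above correspond to the two user-facing REORG TABLE variants. A minimal sketch of how they might be driven through the SQL API follows; the table name and compat version are made up for illustration:

// Illustration only: PURGE maps to DeltaPurgeOperation, while UPGRADE UNIFORM
// maps to DeltaUpgradeUniformOperation carrying the requested compat version.
spark.sql("REORG TABLE my_table APPLY (PURGE)")
spark.sql("REORG TABLE my_table APPLY (UPGRADE UNIFORM (ICEBERG_COMPAT_VERSION = 2))")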

/**
* Defines a Reorg operation to be applied during optimize.
*/
sealed trait DeltaReorgOperation {
/**
* Collects files that need to be processed by the reorg operation from the list of candidate
* files.
*/
def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile]
}
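Since the trait is sealed, concrete operations live alongside it in this file. As a hedged illustration of the contract (not part of this change), an operation that unconditionally rewrites every candidate file would simply return its input:

// Illustration only: a degenerate reorg operation that selects every file.
class RewriteEverythingOperation extends DeltaReorgOperation {
  override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = files
}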

/**
* Reorg operation to purge files with soft deleted rows.
*/
class DeltaPurgeOperation extends DeltaReorgOperation {
override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] =
files.filter { file =>
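// Select files that either carry a DV but lack physical-record statistics
// (rewritten conservatively) or report at least one soft-deleted row.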
(file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
file.numDeletedRecords > 0L
}
}

/**
* Reorg operation to upgrade the iceberg compatibility version of a table.
*/
class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = {
def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
if (file.tags == null) return true
// Rewrite the file unless its tag already matches the target compat version.
val fileIcebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
fileIcebergCompatVersion != icebergCompatVersion.toString
}
files.filter(shouldRewriteToBeIcebergCompatible)
}
}
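A worked illustration of the tag check above, with made-up tag values: when upgrading to compat version 2, a file already tagged with version "2" is skipped, while an untagged file, or one tagged "0", is selected for rewrite.

// Illustration only, using the class introduced in this diff.
val upgrade = new DeltaUpgradeUniformOperation(icebergCompatVersion = 2)
// file.tags contains ICEBERG_COMPAT_VERSION -> "2"   => file is skipped
// file.tags is null, or the tag is "0" (the default) => file is rewritten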
@@ -187,30 +187,24 @@ case class OptimizeTableCommand(
/**
* Stores all runtime context information that can control the execution of optimize.
*
* @param isPurge Whether the rewriting task is only for purging soft-deleted data instead of
* for compaction. If [[isPurge]] is true, only files with DVs will be selected
* for compaction.
* @param reorg The REORG operation that triggered the rewriting task, if any.
* @param minFileSize Files which are smaller than this threshold will be selected for compaction.
* If not specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE]] will be used.
* This parameter must be set to `0` when [[isPurge]] is true.
* This parameter must be set to `0` when [[reorg]] is set.
* @param maxDeletedRowsRatio Files with a ratio of soft-deleted rows to the total rows larger than
* this threshold will be rewritten by the OPTIMIZE command. If not
* specified, [[DeltaSQLConf.DELTA_OPTIMIZE_MAX_DELETED_ROWS_RATIO]]
* will be used. This parameter must be set to `0` when [[isPurge]] is
* true.
* @param icebergCompatVersion The iceberg compatibility version used to rewrite data for
* uniform tables.
* will be used. This parameter must be set to `0` when [[reorg]] is set.
*/
case class DeltaOptimizeContext(
isPurge: Boolean = false,
reorg: Option[DeltaReorgOperation] = None,
minFileSize: Option[Long] = None,
maxFileSize: Option[Long] = None,
maxDeletedRowsRatio: Option[Double] = None,
icebergCompatVersion: Option[Int] = None) {
if (isPurge || icebergCompatVersion.isDefined) {
maxDeletedRowsRatio: Option[Double] = None) {
if (reorg.nonEmpty) {
require(
minFileSize.contains(0L) && maxDeletedRowsRatio.contains(0d),
"minFileSize and maxDeletedRowsRatio must be 0 when running PURGE.")
"minFileSize and maxDeletedRowsRatio must be 0 when running REORG TABLE.")
}
}
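A minimal sketch of the invariant enforced above (values are illustrative): a context that carries a reorg operation must pin both thresholds to zero, otherwise the require throws.

// Valid REORG-style context: thresholds explicitly pinned to 0.
val reorgContext = DeltaOptimizeContext(
  reorg = Some(new DeltaPurgeOperation()),
  minFileSize = Some(0L),
  maxDeletedRowsRatio = Some(0d))
// Invalid: leaving the thresholds at their defaults fails the require
// with an IllegalArgumentException.
// DeltaOptimizeContext(reorg = Some(new DeltaPurgeOperation()))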

@@ -269,7 +263,10 @@ class OptimizeExecutor(
val candidateFiles = txn.filterFiles(partitionPredicate, keepNumRecords = true)
val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = pruneCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
val filesToProcess = optimizeContext.reorg match {
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles)
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq

val jobs = groupFilesIntoBins(partitionsToCompact, maxFileSize)
@@ -344,7 +341,7 @@ class OptimizeExecutor(
* Helper method to prune the list of selected files based on fileSize and ratio of
* deleted rows according to the deletion vector in [[AddFile]].
*/
private def pruneCandidateFileList(
private def filterCandidateFileList(
minFileSize: Long, maxDeletedRowsRatio: Double, files: Seq[AddFile]): Seq[AddFile] = {

// Select all files in case of multi-dimensional clustering
@@ -358,18 +355,9 @@
file.deletedToPhysicalRecordsRatio.getOrElse(0d) > maxDeletedRowsRatio
}

def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
if (optimizeContext.icebergCompatVersion.isEmpty) return false
if (file.tags == null) return true
val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
!optimizeContext.icebergCompatVersion.exists(_.toString == icebergCompatVersion)
}

// Select files that are small, have too many deleted rows,
// or need to be made iceberg compatible
// Select files that are small or have too many deleted rows
files.filter(
addFile => addFile.size < minFileSize || shouldCompactBecauseOfDeletedRows(addFile) ||
shouldRewriteToBeIcebergCompatible(addFile))
addFile => addFile.size < minFileSize || shouldCompactBecauseOfDeletedRows(addFile))
}

/**
@@ -414,8 +402,7 @@

bins.filter { bin =>
bin.size > 1 || // bin has more than one file or
(bin.size == 1 && bin(0).deletionVector != null) || // single file in the bin has a DV or
(bin.size == 1 && optimizeContext.icebergCompatVersion.isDefined) || // uniform reorg
bin.size == 1 && optimizeContext.reorg.nonEmpty || // always rewrite files during reorg
isMultiDimClustering // multi-clustering
}.map(b => (partition, b))
}
@@ -511,7 +498,7 @@

/** Create the appropriate [[Operation]] object for txn commit history */
private def getOperation(): Operation = {
if (optimizeContext.isPurge) {
if (optimizeContext.reorg.nonEmpty) {
DeltaOperations.Reorg(partitionPredicate)
} else {
DeltaOperations.Optimize(partitionPredicate, clusteringColumns, auto = isAutoCompact)
@@ -49,10 +49,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
/**
* Helper function to rewrite the table. Implemented by Reorg Table Command.
*/
def optimizeByReorg(
sparkSession: SparkSession,
isPurge: Boolean,
icebergCompatVersion: Option[Int]): Seq[Row]
def optimizeByReorg(sparkSession: SparkSession): Seq[Row]

/**
* Helper function to update the table icebergCompat properties.
@@ -172,11 +169,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
logInfo(s"Reorg Table ${target.tableIdentifier} to iceberg compat version = " +
s"$targetIcebergCompatVersion need rewrite data files.")
val metrics = try {
optimizeByReorg(
sparkSession,
isPurge = false,
icebergCompatVersion = Some(targetIcebergCompatVersion)
)
optimizeByReorg(sparkSession)
} catch {
case NonFatal(e) =>
throw DeltaErrors.icebergCompatDataFileRewriteFailedException(
@@ -197,7 +197,7 @@ trait AutoCompactBase extends PostCommitHook with DeltaLogging {
recordDeltaOperation(deltaLog, s"$opType.execute") {
val txn = deltaLog.startTransaction(catalogTable)
val optimizeContext = DeltaOptimizeContext(
isPurge = false,
reorg = None,
minFileSizeOpt,
maxFileSizeOpt,
maxDeletedRowsRatio = maxDeletedRowsRatio