# [SPARK-51185][CORE][3.5] Revert simplifications to PartitionedFileUtil API to reduce memory requirements

### What changes were proposed in this pull request?

This PR reverts an earlier change (#41632) that converted `FileStatusWithMetadata.getPath` from a `def` to a `lazy val` in order to simplify the `PartitionedFileUtil` helpers.

This is the 3.5 PR. The main PR for 4.0 is #49915.
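
To make the shape of the revert concrete, here is a minimal sketch of the helper signature before and after (stub types only, not Spark's actual classes; `filePath` and the other parameter names mirror the diff below):

```scala
import org.apache.hadoop.fs.Path

// Stub types standing in for Spark's internals, just to show the API shape.
object ApiShapeSketch {
  case class InternalRow()
  case class PartitionedFile(pathString: String)
  case class FileStatusWithMetadata(path: Path) {
    def getPath: Path = path
  }

  // Shape introduced by #41632 (reverted here): the helper derives the path
  // itself, which pushed FileStatusWithMetadata.getPath to become a lazy val.
  def getPartitionedFileBefore(
      file: FileStatusWithMetadata,
      partitionValues: InternalRow): PartitionedFile =
    PartitionedFile(file.getPath.toString)

  // Shape restored by this PR: the caller computes the Path once and passes
  // it in explicitly, so getPath can stay a cheap, non-caching def.
  def getPartitionedFileAfter(
      file: FileStatusWithMetadata,
      filePath: Path,
      partitionValues: InternalRow): PartitionedFile =
    PartitionedFile(filePath.toString)
}
```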

### Why are the changes needed?

Converting `getPath` from a `def` to a `lazy val` increases memory requirements because each computed path is now kept in memory for as long as its `FileStatusWithMetadata` instance exists. Since paths are expensive to store, this can lead to higher memory utilization and increase the risk of OOMs.
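
As an illustration, here is a minimal sketch with hypothetical wrapper classes (not Spark's actual code) contrasting the two variants, under the assumption that the underlying `Path` is costly to keep live, as in large file listings:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// Hypothetical wrapper: the lazy val is materialized on first access and then
// retained for the lifetime of the wrapper -- every live instance pins its
// Path in memory.
case class CachingStatus(fileStatus: FileStatus) {
  lazy val getPath: Path = fileStatus.getPath
}

// Hypothetical wrapper: the def is recomputed on each call; nothing extra is
// retained between accesses, so the Path can be garbage-collected.
case class NonCachingStatus(fileStatus: FileStatus) {
  def getPath: Path = fileStatus.getPath
}

object PathRetentionDemo extends App {
  val status = new FileStatus()
  status.setPath(new Path("hdfs://nn/warehouse/db/tbl/part-00000"))
  // Both variants return the same Path value; they differ only in whether
  // the wrapper keeps a reference to it afterwards.
  assert(CachingStatus(status).getPath == NonCachingStatus(status).getPath)
}
```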

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

This is a small revert back to code that existed before, so the existing tests are sufficient.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49995 from LukasRupprecht/def_get-path_3.5.

Authored-by: Lukas Rupprecht <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
LukasRupprecht authored and cloud-fan committed Feb 21, 2025
commit 23637fe (1 parent: 4d15f64)
Showing 4 changed files with 16 additions and 10 deletions.
```diff
@@ -624,7 +624,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
+        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values))
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(f.toPath.getName)
@@ -689,12 +689,15 @@
 
     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.flatMap { file =>
-        if (shouldProcess(file.getPath)) {
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        if (shouldProcess(filePath)) {
           val isSplitable = relation.fileFormat.isSplitable(
-            relation.sparkSession, relation.options, file.getPath)
+            relation.sparkSession, relation.options, filePath)
           PartitionedFileUtil.splitFiles(
             sparkSession = relation.sparkSession,
             file = file,
+            filePath = filePath,
             isSplitable = isSplitable,
             maxSplitBytes = maxSplitBytes,
             partitionValues = partition.values
```

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
@@ -28,6 +28,7 @@ object PartitionedFileUtil {
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatusWithMetadata,
+      filePath: Path,
       isSplitable: Boolean,
       maxSplitBytes: Long,
       partitionValues: InternalRow): Seq[PartitionedFile] = {
@@ -36,19 +37,20 @@ object PartitionedFileUtil {
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
         val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
-        PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), offset, size, hosts,
+        PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts,
           file.getModificationTime, file.getLen, file.metadata)
       }
     } else {
-      Seq(getPartitionedFile(file, partitionValues))
+      Seq(getPartitionedFile(file, filePath, partitionValues))
     }
   }
 
   def getPartitionedFile(
       file: FileStatusWithMetadata,
+      filePath: Path,
       partitionValues: InternalRow): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
-    PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), 0, file.getLen, hosts,
+    PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts,
       file.getModificationTime, file.getLen, file.metadata)
   }
```

```diff
@@ -30,8 +30,7 @@ import org.apache.spark.sql.types.StructType
  */
 case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
   // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
-  // NOTE: getPath() is very expensive, so we only want to call it once (if accessed at all).
-  lazy val getPath: Path = fileStatus.getPath
+  def getPath: Path = fileStatus.getPath
   def getLen: Long = fileStatus.getLen
   def getModificationTime: Long = fileStatus.getModificationTime
   def isDirectory: Boolean = fileStatus.isDirectory
```

```diff
@@ -151,10 +151,12 @@ trait FileScan extends Scan
         partition.values
       }
       partition.files.flatMap { file =>
+        val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
           sparkSession = sparkSession,
           file = file,
-          isSplitable = isSplitable(file.getPath),
+          filePath = filePath,
+          isSplitable = isSplitable(filePath),
           maxSplitBytes = maxSplitBytes,
           partitionValues = partitionValues
         )
```
