changes lazy val to def
LukasRupprecht committed Feb 20, 2025
1 parent 4d15f64 commit 09b5075
Showing 4 changed files with 16 additions and 10 deletions.
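In short: FileStatusWithMetadata.getPath goes back to being a plain def, and the memoization moves to the call sites, which now cache the Path in a local val and pass it explicitly through a new filePath parameter on PartitionedFileUtil.splitFiles and getPartitionedFile. Short sketches after the relevant hunks below illustrate the pattern.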
@@ -624,7 +624,7 @@ case class FileSourceScanExec(
     logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
     val filesGroupedToBuckets =
       selectedPartitions.flatMap { p =>
-        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, p.values))
+        p.files.map(f => PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values))
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(f.toPath.getName)
@@ -689,12 +689,15 @@ case class FileSourceScanExec(

     val splitFiles = selectedPartitions.flatMap { partition =>
       partition.files.flatMap { file =>
-        if (shouldProcess(file.getPath)) {
+        // getPath() is very expensive so we only want to call it once in this block:
+        val filePath = file.getPath
+        if (shouldProcess(filePath)) {
           val isSplitable = relation.fileFormat.isSplitable(
-            relation.sparkSession, relation.options, file.getPath)
+            relation.sparkSession, relation.options, filePath)
           PartitionedFileUtil.splitFiles(
             sparkSession = relation.sparkSession,
             file = file,
+            filePath = filePath,
             isSplitable = isSplitable,
             maxSplitBytes = maxSplitBytes,
             partitionValues = partition.values
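The hunk above shows the recurring pattern of this commit: hoist the expensive file.getPath call into a local val and reuse it for every consumer in the block. A minimal standalone sketch of that pattern, in which FileLike and its costs are illustrative stand-ins rather than Spark's actual classes:

    import org.apache.hadoop.fs.Path

    // Stand-in for FileStatusWithMetadata after this commit: getPath is a def,
    // so every call re-derives the Path.
    final case class FileLike(dir: String, name: String) {
      def getPath: Path = new Path(dir, name) // allocates and parses each time
    }

    def process(files: Seq[FileLike], shouldProcess: Path => Boolean): Seq[Path] =
      files.flatMap { file =>
        val filePath = file.getPath // hoisted once per file, as in the hunk above
        if (shouldProcess(filePath)) Some(filePath) else None
      }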
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution

-import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus}
+import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.SparkSession
@@ -28,6 +28,7 @@ object PartitionedFileUtil {
   def splitFiles(
       sparkSession: SparkSession,
       file: FileStatusWithMetadata,
+      filePath: Path,
       isSplitable: Boolean,
       maxSplitBytes: Long,
       partitionValues: InternalRow): Seq[PartitionedFile] = {
@@ -36,19 +37,20 @@ object PartitionedFileUtil {
         val remaining = file.getLen - offset
         val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
         val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
-        PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), offset, size, hosts,
+        PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts,
           file.getModificationTime, file.getLen, file.metadata)
       }
     } else {
-      Seq(getPartitionedFile(file, partitionValues))
+      Seq(getPartitionedFile(file, filePath, partitionValues))
     }
   }

   def getPartitionedFile(
       file: FileStatusWithMetadata,
+      filePath: Path,
       partitionValues: InternalRow): PartitionedFile = {
     val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
-    PartitionedFile(partitionValues, SparkPath.fromPath(file.getPath), 0, file.getLen, hosts,
+    PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts,
       file.getModificationTime, file.getLen, file.metadata)
   }
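For context, the size computation in the hunk above caps each split at maxSplitBytes and hands the remainder to the final split. A worked sketch of that arithmetic follows; the by-stepped offsets are an assumption about the elided loop surrounding these lines:

    val fileLen       = 250L * 1024 * 1024  // a 250 MiB file
    val maxSplitBytes = 128L * 1024 * 1024  // 128 MiB target split size

    val splits = (0L until fileLen by maxSplitBytes).map { offset =>
      val remaining = fileLen - offset
      val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
      (offset, size)
    }
    // splits: (0, 134217728), (134217728, 127926272) -- one full 128 MiB split
    // plus the ~122 MiB remainder.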

@@ -30,8 +30,7 @@ import org.apache.spark.sql.types.StructType
  */
 case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
   // Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
-  // NOTE: getPath() is very expensive, so we only want to call it once (if accessed at all).
-  lazy val getPath: Path = fileStatus.getPath
+  def getPath: Path = fileStatus.getPath
   def getLen: Long = fileStatus.getLen
   def getModificationTime: Long = fileStatus.getModificationTime
   def isDirectory: Boolean = fileStatus.isDirectory
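This hunk gives the commit its title. In Scala, a lazy val memoizes its result per instance: each object carries the cached value plus an initialization flag, and first access is synchronized for thread safety. A def keeps the class stateless and leaves caching to callers, which is exactly what the hoisted filePath vals in the other hunks do. A minimal sketch of the difference, using illustrative classes rather than Spark's:

    class Memoizing(n: Int) {
      lazy val value: Int = { println("computed"); n * 2 } // computed once, stored in the instance
    }
    class Stateless(n: Int) {
      def value: Int = { println("computed"); n * 2 }      // recomputed on every call, no extra state
    }

    val m = new Memoizing(21)
    m.value + m.value // prints "computed" once; the second access reads the cached field
    val s = new Stateless(21)
    s.value + s.value // prints "computed" twice; callers cache the result themselves if needed

With one FileStatusWithMetadata per file in a large listing, dropping the lazy val presumably trades that per-instance state for a single local val at each call site.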
@@ -151,10 +151,12 @@ trait FileScan extends Scan
         partition.values
       }
       partition.files.flatMap { file =>
+        val filePath = file.getPath
         PartitionedFileUtil.splitFiles(
           sparkSession = sparkSession,
           file = file,
-          isSplitable = isSplitable(file.getPath),
+          filePath = filePath,
+          isSplitable = isSplitable(filePath),
           maxSplitBytes = maxSplitBytes,
           partitionValues = partitionValues
         )