
Commit a52f3b7

[SPARK-50854][SS] Make path fully qualified before passing it to FileStreamSink
1 parent ad8222a commit a52f3b7

File tree: 5 files changed (+51, -17 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala

Lines changed: 4 additions & 0 deletions
@@ -2950,4 +2950,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       messageParameters = Map("option" -> option)
     )
   }
+
+  def notAbsolutePathError(path: Path): SparkException = {
+    SparkException.internalError(s"$path is not absolute path.")
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 8 additions & 5 deletions
@@ -120,6 +120,11 @@ case class DataSource(
   private def newHadoopConfiguration(): Configuration =
     sparkSession.sessionState.newHadoopConfWithOptions(options)
 
+  private def makeQualified(path: Path): Path = {
+    val fs = path.getFileSystem(newHadoopConfiguration())
+    path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -319,9 +324,9 @@ case class DataSource(
         s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
 
       case fileFormat: FileFormat =>
-        val path = caseInsensitiveOptions.getOrElse("path", {
+        val path = makeQualified(new Path(caseInsensitiveOptions.getOrElse("path", {
           throw QueryExecutionErrors.dataPathNotSpecifiedError()
-        })
+        }))).toString
         if (outputMode != OutputMode.Append) {
           throw QueryCompilationErrors.dataSourceOutputModeUnsupportedError(className, outputMode)
         }
@@ -456,9 +461,7 @@ case class DataSource(
       // 3. It's OK that the output path doesn't exist yet;
       val allPaths = paths ++ caseInsensitiveOptions.get("path")
       val outputPath = if (allPaths.length == 1) {
-        val path = new Path(allPaths.head)
-        val fs = path.getFileSystem(newHadoopConfiguration())
-        path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        makeQualified(new Path(allPaths.head))
      } else {
        throw QueryExecutionErrors.multiplePathsSpecifiedError(allPaths)
      }
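
For context on the makeQualified helper added above: Hadoop's Path.makeQualified resolves a relative path against the file system's URI and working directory, which also attaches a scheme (this is why the expected sink name in the async-progress-tracking test below changes from FileSink[/tmp] to FileSink[file:/tmp]). A minimal standalone sketch of that behavior, assuming only a default Hadoop Configuration on the local file system; the object and variable names are illustrative and not part of this commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifyPathSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // A relative path, as a user might pass it to writeStream.start(...)
    val relative = new Path("test.parquet")
    val fs = relative.getFileSystem(conf) // resolves to the local file system here
    val qualified = relative.makeQualified(fs.getUri, fs.getWorkingDirectory)
    println(relative.isAbsolute)  // false
    println(qualified)            // e.g. file:/<working-dir>/test.parquet
    println(qualified.isAbsolute) // true
  }
}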

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 3 additions & 0 deletions
@@ -134,6 +134,9 @@ class FileStreamSink(
 
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
+  if (!basePath.isAbsolute) {
+    throw QueryExecutionErrors.notAbsolutePathError(basePath)
+  }
   private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
     sparkSession.sessionState.conf)
   private val retention = options.get("retention").map(Utils.timeStringAsMs)
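
Because the DataSource change above qualifies the path before constructing the sink, the new check in FileStreamSink acts as a defensive invariant: it throws SparkException.internalError rather than a user-facing error. A hedged sketch of the user-visible flow that the qualification covers, assuming a streaming DataFrame named df obtained from an active SparkSession (names and paths are illustrative):

// Sketch only: with this commit, a relative output path passed to start()
// is made fully qualified in DataSource.createSink, so FileStreamSink's
// basePath is always absolute and scheme-qualified.
val query = df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start("relative/output/dir") // resolved to e.g. file:/<working-dir>/relative/output/dir

query.processAllAvailable()
query.stop()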

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -1101,7 +1101,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
         .start("/tmp")
     }
 
-    e.getMessage should equal("Sink FileSink[/tmp] does not support async progress tracking")
+    e.getMessage should equal("Sink FileSink[file:/tmp] does not support async progress tracking")
   }
 
   test("with log purging") {

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 35 additions & 11 deletions
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.JobContext
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
@@ -36,6 +36,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
@@ -292,35 +293,48 @@ abstract class FileStreamSinkSuite extends StreamTest {
   test("parquet") {
     testFormat(None) // should not throw error as default format parquet when not specified
     testFormat(Some("parquet"))
+    testFormat(None, relativizeOutputPath = true)
+    testFormat(Some("parquet"), relativizeOutputPath = true)
   }
 
   test("orc") {
     testFormat(Some("orc"))
+    testFormat(Some("orc"), relativizeOutputPath = true)
   }
 
   test("text") {
     testFormat(Some("text"))
+    testFormat(Some("text"), relativizeOutputPath = true)
   }
 
   test("json") {
     testFormat(Some("json"))
+    testFormat(Some("json"), relativizeOutputPath = true)
   }
 
-  def testFormat(format: Option[String]): Unit = {
-    val inputData = MemoryStream[Int]
+  def testFormat(format: Option[String], relativizeOutputPath: Boolean = false): Unit = {
+    val inputData = MemoryStream[String] // text format only supports String
     val ds = inputData.toDS()
 
-    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val tempDir = Utils.createTempDir(namePrefix = "stream.output")
+    val outputPath = if (relativizeOutputPath) {
+      Paths.get("").toAbsolutePath.relativize(tempDir.toPath).toString
+    } else {
+      tempDir.getCanonicalPath
+    }
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
-    var query: StreamingQuery = null
+    val writer = ds.toDF("value").writeStream
+      .option("checkpointLocation", checkpointDir)
+    if (format.nonEmpty) {
+      writer.format(format.get)
+    }
 
+    var query: StreamingQuery = null
     try {
-      val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
-      if (format.nonEmpty) {
-        writer.format(format.get)
-      }
-      query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+      query = writer.start(outputPath)
+      inputData.addData("data")
+      query.processAllAvailable()
     } finally {
       if (query != null) {
        query.stop()
@@ -664,6 +678,16 @@ abstract class FileStreamSinkSuite extends StreamTest {
           s" $path."))
     }
   }
+
+  test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") {
+    val fileFormat = new ParquetFileFormat() // any valid FileFormat
+    val partitionColumnNames = Seq.empty[String]
+    val options = Map.empty[String, String]
+    val exception = intercept[SparkException] {
+      new FileStreamSink(spark, "test.parquet", fileFormat, partitionColumnNames, options)
+    }
+    assert(exception.getMessage.contains("is not absolute path."))
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {
