[SPARK-50854][SS] Make path fully qualified before passing it to FileStreamSink #49654

Status: Closed (wants to merge 2 commits). Changes from all commits are shown below.
docs/streaming/ss-migration-guide.md (1 addition, 0 deletions)

@@ -27,6 +27,7 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](../sql-migration-gui
 
 - Since Spark 4.0, Spark falls back to single batch execution if any source in the query does not support `Trigger.AvailableNow`. This is to avoid any possible correctness, duplication, and dataloss issue due to incompatibility between source and wrapper implementation. (See [SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more details.)
 - Since Spark 4.0, new configuration `spark.sql.streaming.ratioExtraSpaceAllowedInCheckpoint` (default: `0.3`) controls the amount of additional space allowed in the checkpoint directory to store stale version files for batch deletion inside maintenance task. This is to amortize the cost of listing in cloud store. Setting this to `0` defaults to the old behavior. (See [SPARK-48931](https://issues.apache.org/jira/browse/SPARK-48931) for more details.)
+- Since Spark 4.0, when a relative path is used to output data in `DataStreamWriter`, the resolution to an absolute path is done on the Spark driver and is not deferred to the Spark executor. This makes Structured Streaming behavior consistent with the DataFrame API (`DataFrameWriter`). (See [SPARK-50854](https://issues.apache.org/jira/browse/SPARK-50854) for more details.)
 
 ## Upgrading from Structured Streaming 3.3 to 3.4
 
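Below is a minimal sketch of the behavior this migration note describes; the query itself is hypothetical and not part of the change. In Spark 4.0, a relative output path passed to `DataStreamWriter.start()` is qualified on the driver (against the default filesystem and the driver's working directory) before the `FileStreamSink` is created.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("spark-50854-sketch").getOrCreate()

// A rate source feeding a text sink; the output path below is relative.
val query = spark.readStream
  .format("rate")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("text")
  .option("checkpointLocation", "chk")  // relative checkpoint location, for the sketch only
  .start("out")                         // qualified on the driver in 4.0,
                                        // e.g. to file:/<driver-working-dir>/out

query.awaitTermination(5000)            // let a few micro-batches run
query.stop()
```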
QueryExecutionErrors.scala

@@ -2950,4 +2950,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       messageParameters = Map("option" -> option)
     )
   }
+
+  def notAbsolutePathError(path: Path): SparkException = {
+    SparkException.internalError(s"$path is not absolute path.")
+  }
 }
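For context on the new helper, `SparkException.internalError` wraps the message in Spark's internal-error reporting. The fragment below is a hypothetical, standalone call rather than Spark's own code path; it only shows the message text the new suite test matches on.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException

// Mirror of what notAbsolutePathError produces for a non-absolute path.
val e = SparkException.internalError(s"${new Path("test.parquet")} is not absolute path.")
// The rendered message should contain "test.parquet is not absolute path.",
// which is what the FileStreamSinkSuite test added below asserts on.
println(e.getMessage)
```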
DataSource.scala

@@ -120,6 +120,11 @@ case class DataSource(
   private def newHadoopConfiguration(): Configuration =
     sparkSession.sessionState.newHadoopConfWithOptions(options)
 
+  private def makeQualified(path: Path): Path = {
+    val fs = path.getFileSystem(newHadoopConfiguration())
+    path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+  }
+
   lazy val sourceInfo: SourceInfo = sourceSchema()
   private val caseInsensitiveOptions = CaseInsensitiveMap(options)
   private val equality = sparkSession.sessionState.conf.resolver
@@ -319,9 +324,9 @@
         s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
 
       case fileFormat: FileFormat =>
-        val path = caseInsensitiveOptions.getOrElse("path", {
+        val path = makeQualified(new Path(caseInsensitiveOptions.getOrElse("path", {
           throw QueryExecutionErrors.dataPathNotSpecifiedError()
-        })
+        }))).toString
        if (outputMode != OutputMode.Append) {
          throw QueryCompilationErrors.dataSourceOutputModeUnsupportedError(className, outputMode)
        }
@@ -456,9 +461,7 @@
     // 3. It's OK that the output path doesn't exist yet;
     val allPaths = paths ++ caseInsensitiveOptions.get("path")
     val outputPath = if (allPaths.length == 1) {
-      val path = new Path(allPaths.head)
-      val fs = path.getFileSystem(newHadoopConfiguration())
-      path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      makeQualified(new Path(allPaths.head))
     } else {
       throw QueryExecutionErrors.multiplePathsSpecifiedError(allPaths)
     }
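As a standalone illustration of what the new `makeQualified` helper does (a sketch that uses a default Hadoop `Configuration` instead of the session's Hadoop conf), Hadoop's `Path.makeQualified` anchors a relative path at the filesystem's working directory and attaches the filesystem's scheme:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val path = new Path("stream.output")              // relative, no scheme
val fs = path.getFileSystem(new Configuration())  // local FS unless fs.defaultFS says otherwise
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)

println(qualified)             // e.g. file:/home/user/work/stream.output
println(qualified.isAbsolute)  // true, so the new FileStreamSink check passes
```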
FileStreamSink.scala

@@ -134,6 +134,9 @@ class FileStreamSink(
 
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
   private val basePath = new Path(path)
+  if (!basePath.isAbsolute) {
+    throw QueryExecutionErrors.notAbsolutePathError(basePath)
+  }
   private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
     sparkSession.sessionState.conf)
   private val retention = options.get("retention").map(Utils.timeStringAsMs)
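A quick sketch of what the new guard accepts and rejects, based on Hadoop's `Path.isAbsolute` semantics (the concrete paths are only examples):

```scala
import org.apache.hadoop.fs.Path

// A bare file name has no root and no scheme, so FileStreamSink now rejects it
// with QueryExecutionErrors.notAbsolutePathError.
assert(!new Path("test.parquet").isAbsolute)

// Rooted or scheme-qualified paths, as produced by DataSource.makeQualified, pass.
assert(new Path("/tmp/output").isAbsolute)
assert(new Path("file:/tmp/output").isAbsolute)
assert(new Path("hdfs://namenode:8020/user/output").isAbsolute)
```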
AsyncProgressTrackingMicroBatchExecutionSuite.scala

@@ -1101,7 +1101,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
         .start("/tmp")
     }
 
-    e.getMessage should equal("Sink FileSink[/tmp] does not support async progress tracking")
+    e.getMessage should equal("Sink FileSink[file:/tmp] does not support async progress tracking")
   }
 
   test("with log purging") {
FileStreamSinkSuite.scala

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
-import java.nio.file.Files
+import java.nio.file.{Files, Paths}
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.hadoop.mapreduce.JobContext
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.paths.SparkPath
@@ -36,6 +36,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame}
 import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
@@ -292,35 +293,48 @@ abstract class FileStreamSinkSuite extends StreamTest {
   test("parquet") {
     testFormat(None) // should not throw error as default format parquet when not specified
     testFormat(Some("parquet"))
+    testFormat(None, relativizeOutputPath = true)
+    testFormat(Some("parquet"), relativizeOutputPath = true)
   }
 
   test("orc") {
     testFormat(Some("orc"))
+    testFormat(Some("orc"), relativizeOutputPath = true)
   }
 
   test("text") {
     testFormat(Some("text"))
+    testFormat(Some("text"), relativizeOutputPath = true)
   }
 
   test("json") {
     testFormat(Some("json"))
+    testFormat(Some("json"), relativizeOutputPath = true)
   }
 
-  def testFormat(format: Option[String]): Unit = {
-    val inputData = MemoryStream[Int]
+  def testFormat(format: Option[String], relativizeOutputPath: Boolean = false): Unit = {
+    val inputData = MemoryStream[String] // text format only supports String
     val ds = inputData.toDS()
 
-    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val tempDir = Utils.createTempDir(namePrefix = "stream.output")
+    val outputPath = if (relativizeOutputPath) {
+      Paths.get("").toAbsolutePath.relativize(tempDir.toPath).toString
+    } else {
+      tempDir.getCanonicalPath
+    }
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
-    var query: StreamingQuery = null
+    val writer = ds.toDF("value").writeStream
+      .option("checkpointLocation", checkpointDir)
+    if (format.nonEmpty) {
+      writer.format(format.get)
+    }
 
+    var query: StreamingQuery = null
     try {
-      val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
-      if (format.nonEmpty) {
-        writer.format(format.get)
-      }
-      query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
+      query = writer.start(outputPath)
+      inputData.addData("data")
+      query.processAllAvailable()
     } finally {
       if (query != null) {
         query.stop()
@@ -664,6 +678,16 @@ abstract class FileStreamSinkSuite extends StreamTest {
         s" $path."))
     }
   }
+
+  test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") {
+    val fileFormat = new ParquetFileFormat() // any valid FileFormat
+    val partitionColumnNames = Seq.empty[String]
+    val options = Map.empty[String, String]
+    val exception = intercept[SparkException] {
+      new FileStreamSink(spark, "test.parquet", fileFormat, partitionColumnNames, options)
+    }
+    assert(exception.getMessage.contains("is not absolute path."))
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {
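To spell out the `relativizeOutputPath` branch added to `testFormat` above: `java.nio.file.Path.relativize` expresses the temp directory relative to the JVM's working directory, and that relative string is what gets passed to `writer.start()`. The directory names below are hypothetical; in the suite the temp dir comes from `Utils.createTempDir` and the working directory from `Paths.get("").toAbsolutePath`.

```scala
import java.nio.file.Paths

// Hypothetical working directory and temp directory.
val cwd = Paths.get("/home/user/spark")
val tempDir = Paths.get("/home/user/spark/target/tmp/stream.output-123")

val relativeOutputPath = cwd.relativize(tempDir).toString
println(relativeOutputPath)  // target/tmp/stream.output-123
// Passing this relative path to writer.start() now gets qualified on the driver
// to an absolute file: URI before FileStreamSink sees it.
```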