[SPARK-45815][SQL][STREAMING] Provide an interface for other Streaming sources to add `_metadata` columns

### What changes were proposed in this pull request?

Currently, only the native V1 file-based streaming source can read the `_metadata` column: https://github.com/apache/spark/blob/370870b7a0303e4a2c4b3dea1b479b4fcbc93f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala#L63

This PR adds an interface, `SupportsStreamSourceMetadataColumns`, that allows other streaming sources to add `_metadata` columns. For instance, we would like the Delta Streaming source (https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L49) to extend this interface and provide the `_metadata` column for its underlying storage format, such as Parquet.

### Why are the changes needed?
A generic interface is needed so that streaming sources other than the native file-based one can expose `_metadata` columns.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43692 from Yaohua628/spark-45815.

Authored-by: Yaohua Zhao
Signed-off-by: Wenchen Fan
Yaohua628 authored and cloud-fan committed Nov 9, 2023
1 parent 5ac88b1 commit 1e93c40
Showing 2 changed files with 37 additions and 5 deletions.
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
+import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns

object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -60,11 +61,11 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))

override lazy val metadataOutput: Seq[AttributeReference] = {
-dataSource.providingClass match {
-// If the dataSource provided class is a same or subclass of FileFormat class
-case f if classOf[FileFormat].isAssignableFrom(f) =>
-metadataOutputWithOutConflicts(
-Seq(dataSource.providingInstance().asInstanceOf[FileFormat].createFileMetadataCol()))
+dataSource.providingInstance() match {
+case f: FileFormat => metadataOutputWithOutConflicts(Seq(f.createFileMetadataCol()))
+case s: SupportsStreamSourceMetadataColumns =>
+metadataOutputWithOutConflicts(s.getMetadataOutput(
+dataSource.sparkSession, dataSource.options, dataSource.userSpecifiedSchema))
case _ => Nil
}
}
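
The hunk above switches from a check on the provider's `Class` object to a pattern match on the provider instance, which gives each branch a typed binding and makes room for a second branch for the new trait. The following is a minimal, Spark-free sketch of that dispatch shape. All names in it (`Attr`, `FileFormatLike`, `StreamMetadataLike`, `ParquetLike`, `DeltaLike`, `MetadataDispatch`) are hypothetical stand-ins for illustration, not Spark or Delta classes, and the option handling is simplified.

```scala
// Hypothetical stand-in for an attribute reference; not a Spark class.
final case class Attr(name: String)

// Stand-ins for the two provider kinds that the match distinguishes.
trait FileFormatLike { def createFileMetadataCol(): Attr }
trait StreamMetadataLike {
  def getMetadataOutput(options: Map[String, String]): Seq[Attr]
}

// A provider that behaves like a native file format.
object ParquetLike extends FileFormatLike {
  def createFileMetadataCol(): Attr = Attr("_metadata")
}

// A provider that opts in through the new interface.
object DeltaLike extends StreamMetadataLike {
  def getMetadataOutput(options: Map[String, String]): Seq[Attr] =
    Seq(Attr("_metadata"))
}

object MetadataDispatch {
  // Matching on the instance binds `f` and `s` with their static types,
  // so no cast is needed; any other provider yields no metadata columns.
  def metadataOutput(provider: Any, options: Map[String, String]): Seq[Attr] =
    provider match {
      case f: FileFormatLike     => Seq(f.createFileMetadataCol())
      case s: StreamMetadataLike => s.getMetadataOutput(options)
      case _                     => Nil
    }
}
```

In this sketch, `MetadataDispatch.metadataOutput(DeltaLike, Map.empty)` takes the second branch, while a provider implementing neither trait falls through to `Nil`. Because the file-format case comes first, a provider implementing both traits would take the file-format branch.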
@@ -309,3 +309,34 @@ trait InsertableRelation {
trait CatalystScan {
def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
+
+/**
+ * Implemented by StreamSourceProvider objects that can generate file metadata columns.
+ * This trait extends the basic StreamSourceProvider by allowing the addition of metadata
+ * columns to the schema of the Stream Data Source.
+ */
+trait SupportsStreamSourceMetadataColumns extends StreamSourceProvider {
+
+/**
+ * Returns the metadata columns that should be added to the schema of the Stream Source.
+ * These metadata columns supplement the columns
+ * defined in the sourceSchema() of the StreamSourceProvider.
+ *
+ * The final schema for the Stream Source, therefore, consists of the source schema as
+ * defined by StreamSourceProvider.sourceSchema(), with the metadata columns added at the end.
+ * The caller is responsible for resolving any naming conflicts with the source schema.
+ *
+ * An example of using this streaming source metadata output interface is
+ * when a customized file-based streaming source needs to expose file metadata columns,
+ * leveraging the hidden file metadata columns from its underlying storage format.
+ *
+ * @param spark The SparkSession used for the operation.
+ * @param options A map of options of the Stream Data Source.
+ * @param userSpecifiedSchema An optional user-provided schema of the Stream Data Source.
+ * @return A Seq of AttributeReference representing the metadata output attributes.
+ */
+def getMetadataOutput(
+spark: SparkSession,
+options: Map[String, String],
+userSpecifiedSchema: Option[StructType]): Seq[AttributeReference]
+}
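
The Scaladoc above states that the final schema is the source schema with the metadata columns appended at the end, and that the caller resolves naming conflicts. The sketch below illustrates that contract without depending on Spark. `Field` and `SchemaAssembly` are hypothetical names, and the renaming policy (prefixing `_` until the name is free, compared case-insensitively) is an assumption made for illustration. It is not presented as the behavior of `metadataOutputWithOutConflicts`.

```scala
// Hypothetical stand-in for a schema field; not a Spark class.
final case class Field(name: String, dataType: String)

object SchemaAssembly {
  // Assumed policy for illustration: prefix "_" until the name no longer
  // collides with a source column (names are compared case-insensitively).
  @scala.annotation.tailrec
  def makeUnique(name: String, taken: Set[String]): String =
    if (taken.contains(name.toLowerCase)) makeUnique("_" + name, taken) else name

  // Final schema = source columns first, then metadata columns at the end.
  // Only conflicts with source columns are resolved here; collisions among
  // the metadata columns themselves are out of scope for this sketch.
  def finalSchema(source: Seq[Field], metadata: Seq[Field]): Seq[Field] = {
    val taken = source.map(_.name.toLowerCase).toSet
    source ++ metadata.map(m => m.copy(name = makeUnique(m.name, taken)))
  }
}
```

With this policy, a source that already has a data column named `_metadata` keeps it unchanged, and the appended metadata column is renamed to `__metadata`. A source with no such column gets `_metadata` appended as-is.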
