
[SPARK-52409][SDP] Only use PipelineRunEventBuffer in tests #51352

Open · wants to merge 1 commit into base: master
@@ -226,7 +226,7 @@ private[connect] object PipelinesHandler extends Logging {
val dataflowGraphId = cmd.getDataflowGraphId
val graphElementRegistry = DataflowGraphRegistry.getDataflowGraphOrThrow(dataflowGraphId)
// We will use this variable to store the run failure event if it occurs. This will be set
// by the event callback that is executed when an event is added to the PipelineRunEventBuffer.
// by the event callback.
@volatile var runFailureEvent = Option.empty[PipelineEvent]
// Define a callback which will stream logs back to the SparkConnect client when an internal
// pipeline event is emitted during pipeline execution. We choose to pass a callback rather the
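The comment in the hunk above describes the pattern this PR moves to: instead of reading a failure back out of a buffer, the event callback itself records the run-failure event while still forwarding every event. A minimal, self-contained sketch of that pattern is shown below; `Event`, `isFailure`, and the `println` forwarding are illustrative stand-ins, not the real `PipelineEvent` API or the Spark Connect streaming code.

```scala
// Illustrative sketch only: `Event` stands in for the real PipelineEvent type.
final case class Event(message: String, isFailure: Boolean)

object RunFailureCaptureSketch {
  // Set by the callback when a failure event is observed, mirroring the
  // @volatile var used in PipelinesHandler above.
  @volatile private var runFailureEvent: Option[Event] = Option.empty[Event]

  // The callback forwards every event (placeholder: println) and remembers
  // the first failure it sees so the caller can surface it later.
  val eventCallback: Event => Unit = { event =>
    if (event.isFailure && runFailureEvent.isEmpty) {
      runFailureEvent = Some(event)
    }
    println(s"pipeline event: ${event.message}")
  }

  def main(args: Array[String]): Unit = {
    Seq(
      Event("flow 'a' is RUNNING", isFailure = false),
      Event("run FAILED", isFailure = true)
    ).foreach(eventCallback)
    assert(runFailureEvent.exists(_.isFailure))
  }
}
```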
@@ -50,7 +50,7 @@ class PipelineExecution(context: PipelineUpdateContext) {
// Execute the graph.
graphExecution = Option(
new TriggeredGraphExecution(initializedGraph, context, onCompletion = terminationReason => {
context.eventBuffer.addEvent(
context.eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = None,
@@ -75,7 +75,7 @@
context.pipelineExecution.awaitCompletion()
} catch {
case e: Throwable =>
context.eventBuffer.addEvent(
context.eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = None,
@@ -18,7 +18,7 @@
package org.apache.spark.sql.pipelines.graph

import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineRunEventBuffer}
import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent}

trait PipelineUpdateContext {

@@ -50,8 +50,8 @@ trait PipelineUpdateContext {
UnionFlowFilter(flowFilterForTables, resetCheckpointFlows)
}

/** Buffer containing internal events that are emitted during a run of a pipeline. */
def eventBuffer: PipelineRunEventBuffer
/** Callback to invoke for internal events that are emitted during a run of a pipeline. */
def eventCallback: PipelineEvent => Unit

/** Emits internal flow progress events into the event buffer. */
def flowProgressEventLogger: FlowProgressEventLogger
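The change above narrows the trait's contract from a concrete `PipelineRunEventBuffer` to a plain `PipelineEvent => Unit` function. A hedged sketch of what that contract enables, using simplified stand-in types rather than the real Spark classes: a production-style context can forward events anywhere, while a test context simply collects them in memory.

```scala
// Simplified stand-ins for illustration; not the actual Spark pipeline classes.
final case class Event(message: String)

trait UpdateContextSketch {
  /** Callback to invoke for internal events emitted during a run of a pipeline. */
  def eventCallback: Event => Unit
}

// A production-style context might forward events to a log or a client stream.
class LoggingContextSketch extends UpdateContextSketch {
  override val eventCallback: Event => Unit = e => println(s"[pipeline] ${e.message}")
}

// A test context can collect events and assert on them afterwards.
class CollectingContextSketch extends UpdateContextSketch {
  val events = scala.collection.mutable.ArrayBuffer.empty[Event]
  override val eventCallback: Event => Unit = e => events += e
}

object UpdateContextSketchDemo {
  def main(args: Array[String]): Unit = {
    val ctx = new CollectingContextSketch
    ctx.eventCallback(Event("flow 'a' QUEUED"))
    assert(ctx.events.map(_.message) == Seq("flow 'a' QUEUED"))
  }
}
```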
@@ -18,11 +18,7 @@
package org.apache.spark.sql.pipelines.graph

import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.pipelines.logging.{
FlowProgressEventLogger,
PipelineEvent,
PipelineRunEventBuffer
}
import org.apache.spark.sql.pipelines.logging.{FlowProgressEventLogger, PipelineEvent}

/**
* An implementation of the PipelineUpdateContext trait used in production.
@@ -31,17 +27,15 @@ import org.apache.spark.sql.pipelines.logging.{
*/
class PipelineUpdateContextImpl(
override val unresolvedGraph: DataflowGraph,
eventCallback: PipelineEvent => Unit
override val eventCallback: PipelineEvent => Unit
) extends PipelineUpdateContext {

override val spark: SparkSession = SparkSession.getActiveSession.getOrElse(
throw new IllegalStateException("SparkSession is not available")
)

override val eventBuffer = new PipelineRunEventBuffer(eventCallback)

override val flowProgressEventLogger: FlowProgressEventLogger =
new FlowProgressEventLogger(eventBuffer = eventBuffer)
new FlowProgressEventLogger(eventCallback = eventCallback)

override val refreshTables: TableFilter = AllTables
override val fullRefreshTables: TableFilter = NoTables
@@ -36,9 +36,9 @@ import org.apache.spark.sql.pipelines.graph.{FlowExecution, ResolutionCompletedF
* - All flow progress events other than errors/warnings will be logged at INFO level (including
* flow progress events with metrics) and error/warning messages will be logged at their level.
*
* @param eventBuffer Event log to log the flow progress events.
* @param eventCallback Callback to invoke on the flow progress events.
*/
class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
class FlowProgressEventLogger(eventCallback: PipelineEvent => Unit) extends Logging {

/**
* This map stores flow identifier to a boolean representing whether flow is running.
@@ -57,7 +57,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
* INFO level, since flows are only queued once.
*/
def recordQueued(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -76,7 +76,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
*/
def recordPlanningForBatchFlow(batchFlow: ResolvedFlow): Unit = synchronized {
if (batchFlow.df.isStreaming) return
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(batchFlow.displayName),
@@ -97,7 +97,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
* logged at METRICS. All other cases will be logged at INFO.
*/
def recordStart(flowExecution: FlowExecution): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flowExecution.displayName),
@@ -114,7 +114,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {

/** Records flow progress events with flow status as RUNNING. */
def recordRunning(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -142,7 +142,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
): Unit = synchronized {
val eventLogMessage = messageOpt.getOrElse(s"Flow '${flow.displayName}' has FAILED.")

eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -165,7 +165,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
* record skipped should be used when the flow is skipped because of upstream flow failures.
*/
def recordSkippedOnUpStreamFailure(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -188,7 +188,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
* upstream failures use [[recordSkippedOnUpStreamFailure]] function.
*/
def recordSkipped(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -208,7 +208,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {

/** Records flow progress events with flow status as EXCLUDED at INFO level. */
def recordExcluded(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -232,7 +232,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
message: Option[String] = None,
cause: Option[Throwable] = None
): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -252,7 +252,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {

/** Records flow progress events with flow status as IDLE. */
def recordIdle(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
@@ -277,7 +277,7 @@ class FlowProgressEventLogger(eventBuffer: PipelineRunEventBuffer) extends Logging {
* event.
*/
def recordCompletion(flow: ResolvedFlow): Unit = synchronized {
eventBuffer.addEvent(
eventCallback(
ConstructPipelineEvent(
origin = PipelineEventOrigin(
flowName = Option(flow.displayName),
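Every `recordX` method above now funnels through the injected callback instead of a buffer: the logger decides what event to emit, and the caller decides where it goes. A rough, self-contained sketch of that dependency-injection pattern follows; `FlowEvent` and the string statuses are stand-ins, not the real `ResolvedFlow`/`PipelineEvent` classes or `ConstructPipelineEvent` machinery.

```scala
// Stand-in types for illustration; the real logger builds PipelineEvents via
// ConstructPipelineEvent from ResolvedFlow / FlowExecution instances.
final case class FlowEvent(flow: String, status: String)

class FlowEventLoggerSketch(eventCallback: FlowEvent => Unit) {
  def recordQueued(flow: String): Unit = synchronized {
    eventCallback(FlowEvent(flow, "QUEUED"))
  }
  def recordRunning(flow: String): Unit = synchronized {
    eventCallback(FlowEvent(flow, "RUNNING"))
  }
  def recordCompletion(flow: String): Unit = synchronized {
    eventCallback(FlowEvent(flow, "COMPLETED"))
  }
}

object FlowEventLoggerSketchDemo {
  def main(args: Array[String]): Unit = {
    // In a test, the callback can simply collect events for assertions.
    val collected = scala.collection.mutable.ArrayBuffer.empty[FlowEvent]
    val logger = new FlowEventLoggerSketch(e => collected += e)
    logger.recordQueued("flow_a")
    logger.recordRunning("flow_a")
    logger.recordCompletion("flow_a")
    assert(collected.map(_.status) == Seq("QUEUED", "RUNNING", "COMPLETED"))
  }
}
```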
@@ -34,7 +34,6 @@ import org.apache.spark.sql.pipelines.logging.{
FlowProgress,
FlowProgressEventLogger,
PipelineEvent,
PipelineRunEventBuffer,
RunProgress
}

@@ -60,12 +59,12 @@ trait TestPipelineUpdateContextMixin {
refreshTables: TableFilter = AllTables,
resetCheckpointFlows: FlowFilter = AllFlows
) extends PipelineUpdateContext {
val eventBuffer = new PipelineRunEventBuffer(eventCallback = _ => ())
val eventBuffer = new PipelineRunEventBuffer()

override val eventCallback: PipelineEvent => Unit = eventBuffer.addEvent

override def flowProgressEventLogger: FlowProgressEventLogger = {
new FlowProgressEventLogger(
eventBuffer = eventBuffer
)
new FlowProgressEventLogger(eventCallback = eventCallback)
}
}
}
@@ -15,33 +15,28 @@
* limitations under the License.
*/

package org.apache.spark.sql.pipelines.logging
package org.apache.spark.sql.pipelines.utils

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.pipelines.logging.PipelineEvent

/**
* An in-memory buffer which contains the internal events that are emitted during a run of a
* pipeline.
*
* @param eventCallback A callback function to be called when an event is added to the buffer.
*/
class PipelineRunEventBuffer(eventCallback: PipelineEvent => Unit) extends Logging {
class PipelineRunEventBuffer extends Logging {

/**
* A buffer to hold the events emitted during a pipeline run.
* This buffer is thread-safe and can be accessed concurrently.
*
* TODO(SPARK-52409): Deprecate this class to be used in test only and use a more
* robust event logging system in production.
*/
private val events = ArrayBuffer[PipelineEvent]()

def addEvent(event: PipelineEvent): Unit = synchronized {
val eventToAdd = event
events.append(eventToAdd)
eventCallback(event)
}

def clear(): Unit = synchronized {
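With the buffer moved to the test-only `utils` package, a test can still get buffered events simply by passing the buffer's `addEvent` method as the context's callback, which is exactly what `TestPipelineUpdateContextMixin` does above. A self-contained sketch of that wiring follows; `Event` and the `snapshot` accessor are assumed simplifications, while `addEvent` and `clear` mirror the methods visible in the diff.

```scala
import scala.collection.mutable.ArrayBuffer

// `Event` replaces PipelineEvent and `snapshot` is an assumed accessor name;
// only addEvent and clear are shown on the real class in the diff above.
final case class Event(message: String)

class RunEventBufferSketch {
  private val events = ArrayBuffer[Event]()
  def addEvent(event: Event): Unit = synchronized { events.append(event) }
  def clear(): Unit = synchronized { events.clear() }
  def snapshot: Seq[Event] = synchronized { events.toSeq }
}

object RunEventBufferSketchDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new RunEventBufferSketch
    // In a test context, the buffer's addEvent method serves as the event callback.
    val eventCallback: Event => Unit = buffer.addEvent
    eventCallback(Event("flow 'a' QUEUED"))
    assert(buffer.snapshot.map(_.message) == Seq("flow 'a' QUEUED"))
    buffer.clear()
    assert(buffer.snapshot.isEmpty)
  }
}
```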