Skip to content

Commit

Permalink
[SPARK-17780][SQL] Report Throwable to user in StreamExecution
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

When an incompatible source is used with structured streaming, the query may throw a NoClassDefFoundError. Because the streaming thread is dying anyway, it is better to catch Throwable and report the error to the user.

## How was this patch tested?

`testQuietly("fatal errors from a source should be sent to the user")`

Author: Shixiong Zhu <[email protected]>

Closes apache#15352 from zsxwing/SPARK-17780.
  • Loading branch information
zsxwing authored and marmbrus committed Oct 6, 2016
1 parent 79accf4 commit 9a48e60
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,18 @@ class StreamExecution(
})
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
case NonFatal(e) =>
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
Some(committedOffsets.toCompositeOffset(sources)))
logError(s"Query $name terminated with error", e)
// Rethrow fatal errors so that the user can handle them with a
// `Thread.UncaughtExceptionHandler`
if (!NonFatal(e)) {
throw e
}
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@

package org.apache.spark.sql.streaming

import scala.reflect.ClassTag
import scala.util.control.ControlThrowable

import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ManualClock

Expand Down Expand Up @@ -236,6 +238,33 @@ class StreamSuite extends StreamTest {
}
}

// For every throwable kind this test treats as fatal, build a source that raises it and
// check that the streaming query fails with a throwable of that same runtime class.
testQuietly("fatal errors from a source should be sent to the user") {
  val fatalErrors = Seq(
    new VirtualMachineError {},
    new ThreadDeath,
    new LinkageError,
    new ControlThrowable {}
  )
  fatalErrors.foreach { fatal =>
    // A source that throws `fatal` whenever it is asked for an offset or a batch.
    val failingSource = new Source {
      override def getOffset: Option[Offset] = throw fatal

      override def getBatch(start: Option[Offset], end: Offset): DataFrame = throw fatal

      override def schema: StructType = StructType(Array(StructField("value", IntegerType)))

      override def stop(): Unit = {}
    }
    val relation = StreamingExecutionRelation(failingSource)
    val df = Dataset[Int](sqlContext.sparkSession, relation)
    // The expected failure type is derived from the runtime class of the thrown instance.
    testStream(df)(
      ExpectFailure()(ClassTag(fatal.getClass))
    )
  }
}

test("output mode API in Scala") {
val o1 = OutputMode.Append
assert(o1 === InternalOutputModes.Append)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/**
 * Signals that a failure is expected and should not kill the test.
 *
 * @tparam T the expected throwable type; its runtime class is obtained from the implicit
 *           `ClassTag` and stored in `causeClass`.
 */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
// Runtime class of `T`, recovered from the context-bound `ClassTag`.
val causeClass: Class[T] = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
// NOTE(review): this diff view lists two `toString` definitions back to back. The first
// (`getCanonicalName`) appears to be the line removed by the commit and the second
// (`getName`) its replacement. Presumably the switch is needed because `getCanonicalName`
// yields null for anonymous classes such as `new VirtualMachineError {}` -- confirm against
// the `java.lang.Class` documentation.
override def toString(): String = s"ExpectFailure[${causeClass.getCanonicalName}]"
override def toString(): String = s"ExpectFailure[${causeClass.getName}]"
}

/** Assert that a body is true */
Expand Down Expand Up @@ -322,7 +322,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
streamDeathCause = e
testThread.interrupt()
}
})

Expand Down

0 comments on commit 9a48e60

Please sign in to comment.