Skip to content

Commit 3f03a1c

Browse files
Ngone51cloud-fan
authored andcommitted
[SPARK-53394][CORE] UninterruptibleLock.isInterruptible should avoid duplicated interrupt
### What changes were proposed in this pull request? This PR fixes `UninterruptibleLock.isInterruptible` to avoid duplicated interruption when the thread is already interrupted. ### Why are the changes needed? The "uninterruptible" semantic of `UninterruptibleThread`is broken (i.e., `UninterruptibleThread` is interruptible even if it's under `runUninterruptibly`) after the fix #50594. The probelm is that the state of `shouldInterruptThread` becomes unsafe when there are multiple interrupts concurrently. For example, thread A could interrupt UninterruptibleThread ut first before UninterruptibleThread enters `runUninterruptibly`. Right after that, another thread B starts to invoke ut.interrupt() and pass through `uninterruptibleLock.isInterruptible` (becasue at this point, `shouldInterruptThread = uninterruptible = false`). Before thread B invokes `super.interrupt()`, UninterruptibleThread ut enters `runUninterruptibly` and pass through `uninterruptibleLock.getAndSetUninterruptible` and set `uninterruptible = true`. Then, thread ut continues the check `uninterruptibleLock.isInterruptPending`. However, `uninterruptibleLock.isInterruptPending` return false at this point (due to `shouldInterruptThread = Thread.interrupted = true`) even though thread B is actully interrupting. *As a result, the state of `shouldInterruptThread` becomes inconsistent between thread B and thread ut.* Then, as `uninterruptibleLock.isInterruptPending` returns false, ut to continute to execute `f`. At the same time, thread B invokes `super.interrupt()`, and `f` could be interrupted ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually tested. The issue can be easily reproduced if we run `UninterruptibleThreadSuite.stress test` for 100 times in a row: ``` [info]   true did not equal false (UninterruptibleThreadSuite.scala:208) [info]   org.scalatest.exceptions.TestFailedException: [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info]   at org.apache.spark.util.UninterruptibleThreadSuite.$anonfun$new$7(UninterruptibleThreadSuite.scala:208) ... ``` And the issue is gone after the fix. ### Was this patch authored or co-authored using generative AI tooling? No Closes #52139 from Ngone51/fix-uninterruptiable. Authored-by: Yi Wu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 78871d7) Signed-off-by: Wenchen Fan <[email protected]>
1 parent 91bfbba commit 3f03a1c

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,11 @@ private[spark] class UninterruptibleThread(
107107
// super.interrupt() is called. In this case to prevent runUninterruptibly() from being
108108
// interrupted, we use awaitInterruptThread flag. We need to set it only if
109109
// runUninterruptibly() is not yet set uninterruptible to true (!shouldInterruptThread) and
110-
// there is no other threads that called interrupt (awaitInterruptThread is already true)
111-
if (!shouldInterruptThread && !awaitInterruptThread) {
110+
// there is no other threads that called interrupt (awaitInterruptThread is already true or
111+
// isInterrupted is true. (SPARK-53394) Otherwise, the state of shouldInterruptThread would
112+
// become inconsistent between isInterruptible() and isInterruptPending(), leading to
113+
// UninterruptibleThread be interruptible under runUninterruptibly.)
114+
if (!shouldInterruptThread && !awaitInterruptThread && !isInterrupted) {
112115
awaitInterruptThread = true
113116
true
114117
} else {

0 commit comments

Comments
 (0)