fix: Make scheduledFuture thread-safe in HttpNativeExecutionTaskResultFetcher #26649
base: master
Reviewer's Guide
This PR makes scheduledFuture access in HttpNativeExecutionTaskResultFetcher thread-safe by synchronizing core methods, removes the unused completed flag, and cleans up redundant completion logic in result fetching.
Class diagram for updated HttpNativeExecutionTaskResultFetcher thread safety
classDiagram
class HttpNativeExecutionTaskResultFetcher {
-Object taskHasResult
-AtomicReference<Throwable> lastException
-ScheduledFuture<?> scheduledFuture
-long token
+synchronized void start()
+synchronized void stop(boolean success)
+boolean hasPage()
-synchronized void throwIfFailed()
-void doGetResults()
-synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
}
class ScheduledFuture {
}
HttpNativeExecutionTaskResultFetcher --> ScheduledFuture
class AtomicReference {
}
HttpNativeExecutionTaskResultFetcher --> AtomicReference
class PageBufferClient {
}
class PagesResponse {
}
HttpNativeExecutionTaskResultFetcher --> PagesResponse
PageBufferClient --> PagesResponse
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java:133` </location>
<code_context>
}
- private void throwIfFailed()
+ private synchronized void throwIfFailed()
{
if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
</code_context>
<issue_to_address>
**suggestion:** Synchronizing 'throwIfFailed' may be unnecessary if only reading atomic references.
As 'lastException' is an AtomicReference and only reads are performed, you may be able to remove synchronization unless specific ordering is required.
```suggestion
private void throwIfFailed()
```
</issue_to_address>
### Comment 2
<location> `presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/HttpNativeExecutionTaskResultFetcher.java:158` </location>
<code_context>
}
- private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
+ private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
{
List<SerializedPage> pages = pagesResponse.getPages();
</code_context>
<issue_to_address>
**suggestion (performance):** Synchronizing 'onSuccess' may block other operations; review for performance impact.
If 'onSuccess' is invoked often or handles large batches, method-level synchronization may cause delays. Evaluate if finer-grained synchronization is possible.
Suggested implementation:
```java
private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
{
List<SerializedPage> pages = pagesResponse.getPages();
long bytes = 0;
// Synchronize only the critical section that modifies shared state
synchronized (this) {
token = nextToken;
}
if (pagesResponse.isClientComplete()) {
workerClient.abortResultsAsync(taskId);
synchronized (this) {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
}
}
```
You should review all shared mutable state accessed in `onSuccess` and ensure only those modifications are synchronized. If other variables (e.g., `token`, `scheduledFuture`) are shared across threads, synchronize their access as shown. If more shared state is present in the full method, wrap only those assignments in `synchronized` blocks. This will minimize blocking and improve performance.
</issue_to_address>
Thanks @xin-zhang2 for the improvement. Left some comments.
Thanks @tanjialiang, but I don’t seem to see the comments. Would you mind checking again?
}

- private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
+ private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
@xin-zhang2 thanks for this change.
I'm a bit concerned about the synchronized(taskHasResult) {...} block in this method. Since taskHasResult is a shared object that is also used outside this class, we need to make sure this can't cause deadlocks.
For example, if another thread holds the lock on taskHasResult and then calls HttpNativeExecutionTaskResultFetcher.close(), a deadlock could result. Any thoughts? Please let me know if I have misunderstood anything. @xin-zhang2 @tanjialiang
It looks concerning, but it should be fine. Outside of this file, the call sites only wait on the taskHasResult object; only this file is responsible for notifying. That means every synchronized(taskHasResult) block outside of this file will, and should, call taskHasResult.wait() inside the block, which releases the lock. As long as they don't call the onSuccess or onFailure methods between synchronized(taskHasResult) and taskHasResult.wait(), we should be good, and we don't have such cases in the codebase.
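To illustrate the point about wait() releasing the lock, here is a minimal sketch. It is not the Presto code: the class name, the hasResult flag, and both method names are hypothetical, and only the taskHasResult monitor mirrors the discussion. The waiter parks inside the synchronized block, which lets the notifier enter the same monitor.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the taskHasResult handshake; not the actual Presto classes.
final class WaitReleasesMonitorSketch
{
    private final Object taskHasResult = new Object();
    private boolean hasResult; // guarded by taskHasResult

    // Consumer side: owns the monitor only until wait() parks the thread.
    boolean awaitResult(long timeoutMillis)
    {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (taskHasResult) {
            while (!hasResult) {
                long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    return false; // timed out without a result
                }
                try {
                    // wait() releases taskHasResult while parked and re-acquires it before returning
                    taskHasResult.wait(remainingMillis);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    // Producer side: can enter the monitor because the waiter released it in wait().
    void publishResult()
    {
        synchronized (taskHasResult) {
            hasResult = true;
            taskHasResult.notifyAll();
        }
    }
}
```

The argument only holds while the waiting side calls nothing that needs another lock between entering the synchronized block and calling wait().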
Thanks for the explanation. Yes, on one hand you are correct, the taskHasResult lock alone isn't a problem, as we are using it correctly. But on the other hand, the issue I'm concerned about arises with the introduction of a second lock in the current context.
And in fact, after checking the code in detail, I believe that a deadlock may occur between methods PrestoSparkNativeTaskExecutorFactory.computeNext() and HttpNativeExecutionTaskResultFetcher.doGetResults(). The two methods try to acquire and hold the two different synchronized locks in conflicting orders. By adding a delay (Thread.sleep(10000)) between their acquisition of the two locks, we should be able to reproduce the deadlock. The jstack information is as follows:
Found one Java-level deadlock:
=============================
"Executor task launch worker-0":
waiting to lock monitor 0x000076af0c0c83f0 (object 0x00000004563ae070, a com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher),
which is held by "presto-spark-scheduled-executor-3"
"presto-spark-scheduled-executor-3":
waiting to lock monitor 0x000076af5c162fd0 (object 0x00000004563adfd0, a java.lang.Object),
which is held by "Executor task launch worker-0"
Java stack information for the threads listed above:
===================================================
"Executor task launch worker-0":
at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.throwIfFailed(HttpNativeExecutionTaskResultFetcher.java:135)
- waiting to lock <0x00000004563ae070> (a com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher)
at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.hasPage(HttpNativeExecutionTaskResultFetcher.java:129)
at com.facebook.presto.spark.execution.task.NativeExecutionTask.hasResult(NativeExecutionTask.java:152)
at com.facebook.presto.spark.execution.task.PrestoSparkNativeTaskExecutorFactory$PrestoSparkNativeTaskOutputIterator.computeNext(PrestoSparkNativeTaskExecutorFactory.java:623)
- locked <0x00000004563adfd0> (a java.lang.Object)
at com.facebook.presto.spark.execution.task.PrestoSparkNativeTaskExecutorFactory$PrestoSparkNativeTaskOutputIterator.hasNext(PrestoSparkNativeTaskExecutorFactory.java:578)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$collectAsync$1$$anonfun$apply$7.apply(AsyncRDDActions.scala:60)
at org.apache.spark.rdd.AsyncRDDActions$$anonfun$collectAsync$1$$anonfun$apply$7.apply(AsyncRDDActions.scala:60)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at java.lang.Thread.run([email protected]/Thread.java:842)
"presto-spark-scheduled-executor-3":
at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.onSuccess(HttpNativeExecutionTaskResultFetcher.java:197)
- waiting to lock <0x00000004563adfd0> (a java.lang.Object)
- locked <0x00000004563ae070> (a com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher)
at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.doGetResults(HttpNativeExecutionTaskResultFetcher.java:151)
at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher$$Lambda$3881/0x000076b2413c7628.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
at java.util.concurrent.FutureTask.runAndReset$$$capture([email protected]/FutureTask.java:305)
at java.util.concurrent.FutureTask.runAndReset([email protected]/FutureTask.java)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:305)
at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
at java.lang.Thread.run([email protected]/Thread.java:842)
Found 1 deadlock.
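The dump shows a classic lock-order inversion: one thread holds taskHasResult and wants the fetcher monitor, while the other holds the fetcher monitor and wants taskHasResult. One possible way to break the cycle, sketched below under assumptions, is to never hold both monitors on the producer path. This is not the fix adopted in the PR; the class, the String pages, and pollPage() are hypothetical simplifications.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of one way to avoid the inversion; not the actual Presto fix.
final class LockOrderSketch
{
    private final Object taskHasResult; // shared with the consumer, as in the discussion
    private final Queue<String> pages = new ArrayDeque<>(); // guarded by "this"

    LockOrderSketch(Object taskHasResult)
    {
        this.taskHasResult = taskHasResult;
    }

    // Producer path (stands in for onSuccess): the two monitors are never held together.
    void onSuccess(String page)
    {
        synchronized (this) {
            pages.add(page);
        }
        // "this" has been released before taskHasResult is acquired, so this path cannot form a cycle.
        synchronized (taskHasResult) {
            taskHasResult.notifyAll();
        }
    }

    // Consumer path (stands in for hasPage): safe to call while holding taskHasResult.
    synchronized boolean hasPage()
    {
        return !pages.isEmpty();
    }

    synchronized String pollPage()
    {
        return pages.poll();
    }
}
```

With this shape the only nested acquisition is taskHasResult followed by the fetcher monitor on the consumer side, so no thread ever waits for taskHasResult while holding the fetcher monitor.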
if (pagesResponse.isClientComplete()) {
    completed = true;
    workerClient.abortResultsAsync(taskId);
    if (scheduledFuture != null) {
This should not be null at this point, right? Could we enforce a non-null check here instead?
- public void stop(boolean success)
+ public synchronized void stop(boolean success)
{
    if (scheduledFuture != null) {
Ditto. Should we do a check instead? We should enforce that this class is started before stop() is called.
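A minimal sketch of what enforcing "started before stop()" could look like, assuming a fail-fast IllegalStateException is acceptable. The class below is a hypothetical simplification, not HttpNativeExecutionTaskResultFetcher: it uses a no-op task, a no-argument stop(), and an isCancelled() helper added purely for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical simplified fetcher; names mirror the discussion but this is not the Presto class.
final class StartBeforeStopSketch
{
    private final ScheduledExecutorService executor;
    private ScheduledFuture<?> scheduledFuture; // guarded by "this"

    StartBeforeStopSketch(ScheduledExecutorService executor)
    {
        this.executor = executor;
    }

    synchronized void start()
    {
        if (scheduledFuture != null) {
            throw new IllegalStateException("already started");
        }
        // A no-op task stands in for the periodic result fetch.
        scheduledFuture = executor.scheduleAtFixedRate(() -> {}, 0, 10, TimeUnit.MILLISECONDS);
    }

    synchronized void stop()
    {
        // Fail fast instead of silently skipping the cancel when start() was never called.
        if (scheduledFuture == null) {
            throw new IllegalStateException("stop() called before start()");
        }
        scheduledFuture.cancel(false);
    }

    synchronized boolean isCancelled()
    {
        return scheduledFuture != null && scheduledFuture.isCancelled();
    }
}
```

Whether to throw or to keep the silent null check depends on whether any caller legitimately stops a fetcher that was never started; that is a judgment call for the maintainers.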
- private void throwIfFailed()
+ private synchronized void throwIfFailed()
{
    if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
Ditto.
Sorry, these comments were stuck in the pending state. I've published them now.
Description
As discussed in #26550 (comment), we will use synchronized methods to fix the potential thread safety issues in HttpNativeExecutionTaskResultFetcher.
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.