Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.page.SerializedPage;

import javax.annotation.concurrent.GuardedBy;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -67,10 +69,9 @@ public class HttpNativeExecutionTaskResultFetcher
private final Object taskHasResult;
private final AtomicReference<Throwable> lastException = new AtomicReference<>();

@GuardedBy("this")
private ScheduledFuture<?> scheduledFuture;

private volatile boolean completed;

private long token;

public HttpNativeExecutionTaskResultFetcher(
Expand All @@ -86,15 +87,15 @@ public HttpNativeExecutionTaskResultFetcher(
this.taskHasResult = requireNonNull(taskHasResult, "taskHasResult is null");
}

public void start()
public synchronized void start()
{
scheduledFuture = scheduler.scheduleAtFixedRate(this::doGetResults,
0,
(long) FETCH_INTERVAL.getValue(),
FETCH_INTERVAL.getUnit());
}

public void stop(boolean success)
public synchronized void stop(boolean success)
{
if (scheduledFuture != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. should we do a check instead? we should enforce this class to be started before calling stop()

scheduledFuture.cancel(false);
Expand Down Expand Up @@ -129,7 +130,7 @@ public boolean hasPage()
return !pageBuffer.isEmpty();
}

private void throwIfFailed()
private synchronized void throwIfFailed()
{
if (scheduledFuture != null && scheduledFuture.isCancelled() && lastException.get() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Throwable failure = lastException.get();
Expand All @@ -140,11 +141,6 @@ private void throwIfFailed()

private void doGetResults()
{
if (completed && scheduledFuture != null) {
scheduledFuture.cancel(false);
return;
}

if (bufferMemoryBytes.longValue() >= MAX_BUFFER_SIZE.toBytes()) {
return;
}
Expand All @@ -159,7 +155,7 @@ private void doGetResults()
}
}

private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
private synchronized void onSuccess(PageBufferClient.PagesResponse pagesResponse)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xin-zhang2 thanks for this change.

I have a bit of concern about the synchronized(taskHasResult) {...} block in this method. Since taskHasResult is a shared object used beyond this class, we need to ensure this won't cause deadlocks.

For example, if another thread holds the lock on taskHasResult and then attempts to call HttpNativeExecutionTaskResultFetcher.close(), this could lead to a deadlock situation. Any thoughts? PLMK if I have misunderstood anything. @xin-zhang2 @tanjialiang

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though concerning, this shall be alright. Outside of this file the sites are only waiting on this taskHasResult object. Only this file has the responsibility on notifying. This means all synchronized(taskHasResult) outside of this file will and should have taskHasResult.wait() in the block, which releases the lock. As long as they don't call this onSuccess or onFailure methods between synchronized(taskHasResult) and taskHasResult.wait() we should be good. And we don't have such cases in the codebase.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. Yes, on one hand you are correct, the taskHasResult lock alone isn't a problem, as we are using it correctly. But on the other hand, the issue I'm concerned about arises with the introduction of a second lock in the current context.

And in fact, after checking the code in detail, I believe that a deadlock may occur between methods PrestoSparkNativeTaskExecutorFactory.computeNext() and HttpNativeExecutionTaskResultFetcher.doGetResults(). The two methods try to acquire and hold the two different synchronized locks in conflicting orders. By adding a delay (Thread.sleep(10000)) between their acquisition of the two locks, we should be able to reproduce the deadlock. The jstack information is as follows:

Found one Java-level deadlock:
=============================
"Executor task launch worker-0":
  waiting to lock monitor 0x000076af0c0c83f0 (object 0x00000004563ae070, a com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher),
  which is held by "presto-spark-scheduled-executor-3"

"presto-spark-scheduled-executor-3":
  waiting to lock monitor 0x000076af5c162fd0 (object 0x00000004563adfd0, a java.lang.Object),
  which is held by "Executor task launch worker-0"

Java stack information for the threads listed above:
===================================================
"Executor task launch worker-0":
	at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.throwIfFailed(HttpNativeExecutionTaskResultFetcher.java:135)
	- waiting to lock <0x00000004563ae070> (a com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher)
	at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.hasPage(HttpNativeExecutionTaskResultFetcher.java:129)
	at com.facebook.presto.spark.execution.task.NativeExecutionTask.hasResult(NativeExecutionTask.java:152)
	at com.facebook.presto.spark.execution.task.PrestoSparkNativeTaskExecutorFactory$PrestoSparkNativeTaskOutputIterator.computeNext(PrestoSparkNativeTaskExecutorFactory.java:623)
	- locked <0x00000004563adfd0> (a java.lang.Object)
	at com.facebook.presto.spark.execution.task.PrestoSparkNativeTaskExecutorFactory$PrestoSparkNativeTaskOutputIterator.hasNext(PrestoSparkNativeTaskExecutorFactory.java:578)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$collectAsync$1$$anonfun$apply$7.apply(AsyncRDDActions.scala:60)
	at org.apache.spark.rdd.AsyncRDDActions$$anonfun$collectAsync$1$$anonfun$apply$7.apply(AsyncRDDActions.scala:60)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
	at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:1976)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:842)
"presto-spark-scheduled-executor-3":
	at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.onSuccess(HttpNativeExecutionTaskResultFetcher.java:197)
	- waiting to lock <0x00000004563adfd0> (a java.lang.Object)
	- locked <0x00000004563ae070> (a com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher)
	at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher.doGetResults(HttpNativeExecutionTaskResultFetcher.java:151)
	at com.facebook.presto.spark.execution.nativeprocess.HttpNativeExecutionTaskResultFetcher$$Lambda$3881/0x000076b2413c7628.run(Unknown Source)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
	at java.util.concurrent.FutureTask.runAndReset$$$capture([email protected]/FutureTask.java:305)
	at java.util.concurrent.FutureTask.runAndReset([email protected]/FutureTask.java)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run([email protected]/ScheduledThreadPoolExecutor.java:305)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:842)

Found 1 deadlock.

{
List<SerializedPage> pages = pagesResponse.getPages();
long bytes = 0;
Expand All @@ -185,7 +181,6 @@ private void onSuccess(PageBufferClient.PagesResponse pagesResponse)
}
token = nextToken;
if (pagesResponse.isClientComplete()) {
completed = true;
workerClient.abortResultsAsync(taskId);
if (scheduledFuture != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shall not be null at this point, right? could we force a non-null check here instead?

scheduledFuture.cancel(false);
Expand Down
Loading