3232import  java .nio .channels .Channels ;
3333import  java .nio .channels .WritableByteChannel ;
3434import  java .util .concurrent .CompletableFuture ;
35+ import  java .util .concurrent .CompletionStage ;
3536import  java .util .concurrent .atomic .AtomicBoolean ;
37+ import  java .util .function .Consumer ;
3638
3739public  class  LogWatchCallback  implements  LogWatch , AutoCloseable  {
3840
@@ -41,9 +43,11 @@ public class LogWatchCallback implements LogWatch, AutoCloseable {
4143  private  final  OutputStream  out ;
4244  private  WritableByteChannel  outChannel ;
4345  private  volatile  InputStream  output ;
46+  
4447
4548  private  final  AtomicBoolean  closed  = new  AtomicBoolean (false );
4649  private  final  CompletableFuture <AsyncBody > asyncBody  = new  CompletableFuture <>();
50+   private  final  CompletableFuture <Throwable > onCloseFuture  = new  CompletableFuture <>();
4751  private  final  SerialExecutor  serialExecutor ;
4852
4953  public  LogWatchCallback (OutputStream  out , OperationContext  context ) {
@@ -53,18 +57,26 @@ public LogWatchCallback(OutputStream out, OperationContext context) {
5357    }
5458    this .serialExecutor  = new  SerialExecutor (context .getExecutor ());
5559  }
60+   
61+   @ Override 
62+   public  CompletionStage <Throwable > onClose () {
63+ 	return  onCloseFuture .minimalCompletionStage ();
64+   }
5665
5766  @ Override 
5867  public  void  close () {
59-     cleanUp ();
68+     cleanUp (null );
6069  }
6170
62-   private  void  cleanUp () {
71+   private  void  cleanUp (Throwable   u ) {
6372    if  (!closed .compareAndSet (false , true )) {
6473      return ;
6574    }
75+    
6676    asyncBody .thenAccept (AsyncBody ::cancel );
77+     onCloseFuture .complete (u );
6778    serialExecutor .shutdownNow ();
79+    
6880  }
6981
7082  public  LogWatchCallback  callAndWait (HttpClient  client , URL  url ) {
@@ -111,7 +123,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) {
111123            if  (t  != null ) {
112124              onFailure (t );
113125            } else  {
114-               cleanUp ();
126+               cleanUp (null );
115127            }
116128          }, serialExecutor ));
117129        }
@@ -131,9 +143,9 @@ public void onFailure(Throwable u) {
131143    if  (closed .get ()) {
132144      return ;
133145    }
134- 
146+      
135147    LOGGER .error ("Log Callback Failure." , u );
136-     cleanUp ();
148+     cleanUp (u );
137149  }
138150
139151}
0 commit comments