2525import android .os .IBinder ;
2626import android .os .Parcel ;
2727import android .os .Process ;
28+
29+ import com .google .common .base .Preconditions ;
2830import com .google .common .base .Ticker ;
2931import com .google .common .util .concurrent .FutureCallback ;
3032import com .google .common .util .concurrent .Futures ;
5759import io .grpc .internal .StatsTraceContext ;
5860import java .util .concurrent .Executor ;
5961import java .util .concurrent .ScheduledFuture ;
62+ import java .util .concurrent .atomic .AtomicBoolean ;
63+ import java .util .concurrent .atomic .AtomicInteger ;
6064
6165import javax .annotation .Nullable ;
6266import javax .annotation .concurrent .ThreadSafe ;
@@ -73,12 +77,13 @@ public final class BinderClientTransport extends BinderTransport
7377 private final Bindable serviceBinding ;
7478
7579 /** Number of ongoing calls which keep this transport "in-use". */
76- @ GuardedBy ("this" )
77- private int numInUseStreams ;
80+ private final AtomicInteger numInUseStreams ;
7881
7982 /** Last in-use state that was reported to the listener */
80- @ GuardedBy ("this" )
81- private boolean listenerInUse ;
83+ private final AtomicBoolean listenerInUse ;
84+
85+ /** Synchronizes transport listener callbacks */
86+ private final Object listenerNotifyLock ;
8287
8388 private final long readyTimeoutMillis ;
8489 private final PingTracker pingTracker ;
@@ -119,8 +124,10 @@ public BinderClientTransport(
119124 Boolean preAuthServerOverride = options .getEagAttributes ().get (PRE_AUTH_SERVER_OVERRIDE );
120125 this .preAuthorizeServer =
121126 preAuthServerOverride != null ? preAuthServerOverride : factory .preAuthorizeServers ;
122- numInUseStreams = 0 ;
123- listenerInUse = false ;
127+ this .numInUseStreams = new AtomicInteger ();
128+ this .listenerInUse = new AtomicBoolean ();
129+ this .listenerNotifyLock = new Object ();
130+
124131 pingTracker = new PingTracker (Ticker .systemTicker (), (id ) -> sendPing (id ));
125132
126133 serviceBinding =
@@ -265,7 +272,7 @@ public synchronized ClientStream newStream(
265272 return newFailingClientStream (failure , attributes , headers , tracers );
266273 }
267274
268- updateInUseStreamsIfNeed (inbound .countsForInUse (), 1 );
275+ updateInUseStreamsCountIfNeeded (inbound .countsForInUse (), 1 );
269276 Outbound .ClientOutbound outbound =
270277 new Outbound .ClientOutbound (this , callId , method , headers , statsTraceContext );
271278 if (method .getType ().clientSendsOneMessage ()) {
@@ -277,7 +284,7 @@ public synchronized ClientStream newStream(
277284
278285 @ Override
279286 protected void unregisterInbound (Inbound <?> inbound ) {
280- updateInUseStreamsIfNeed (inbound .countsForInUse (), -1 );
287+ updateInUseStreamsCountIfNeeded (inbound .countsForInUse (), -1 );
281288 super .unregisterInbound (inbound );
282289 }
283290
@@ -307,9 +314,8 @@ void notifyShutdown(Status status) {
307314 @ Override
308315 @ GuardedBy ("this" )
309316 void notifyTerminated () {
310- if (numInUseStreams > 0 ) {
311- numInUseStreams = 0 ;
312- listenerInUse = false ;
317+ if (numInUseStreams .getAndSet (0 ) > 0 ) {
318+ listenerInUse .set (false );
313319 clientTransportListener .transportInUse (false );
314320 }
315321 if (readyTimeoutFuture != null ) {
@@ -395,25 +401,63 @@ private synchronized void handleAuthResult(Throwable t) {
395401 Status .INTERNAL .withDescription ("Could not evaluate SecurityPolicy" ).withCause (t ), true );
396402 }
397403
398- /** Updates in-use-stream count and notifies listener only on transitions between 0 and >0 */
399- private synchronized void updateInUseStreamsIfNeed (boolean countsForInUse , int delta ) {
400- if (!countsForInUse ) {
404+ /**
405+ * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without
406+ * acquiring the transport lock.
407+ */
408+ private void updateInUseStreamsCountIfNeeded (boolean countsForInUse , int delta ) {
409+ Preconditions .checkArgument (delta == -1 || delta == 1 , "stream count delta must be -1 or +1" );
410+ if (!countsForInUse ) {
401411 return ;
402412 }
413+ int prev , next ;
403414
404- numInUseStreams += delta ;
405- if (numInUseStreams < 0 ) {
406- // Defensive: prevent negative due to unexpected double-decrement
407- numInUseStreams = 0 ;
415+ if (delta > 0 ) {
416+ next = numInUseStreams .incrementAndGet ();
417+ prev = next - 1 ;
418+ } else {
419+ prev = numInUseStreams .get ();
420+ int updated ;
421+
422+ while (true ) {
423+ int current = prev ;
424+ int newValue = current > 0 ? current - 1 : 0 ;
425+ if (numInUseStreams .compareAndSet (current , newValue )) {
426+ updated = newValue ;
427+ break ;
428+ }
429+ prev = numInUseStreams .get ();
430+ }
431+ next = updated ;
408432 }
409433
410- boolean nowInUseStream = numInUseStreams > 0 ;
411- if (nowInUseStream != listenerInUse ) {
412- listenerInUse = nowInUseStream ;
413- clientTransportListener .transportInUse (nowInUseStream );
434+ boolean prevInUse = prev > 0 ;
435+ boolean nextInUse = next > 0 ;
436+
437+ if (prevInUse != nextInUse ) {
438+ if (listenerInUse .compareAndSet (prevInUse , nextInUse )) {
439+ scheduleTransportInUseNotification (nextInUse );
440+ }
414441 }
415442 }
416443
444+ private void scheduleTransportInUseNotification (final boolean inUse ) {
445+ getScheduledExecutorService ()
446+ .execute (
447+ new Runnable () {
448+ @ Override
449+ public void run () {
450+ // Provide external synchronization as required by Listener contract,
451+ // without taking the transport lock to avoid potential deadlocks.
452+ synchronized (listenerNotifyLock ) {
453+ if (listenerInUse .get () == inUse ) {
454+ clientTransportListener .transportInUse (inUse );
455+ }
456+ }
457+ }
458+ });
459+ }
460+
417461 @ GuardedBy ("this" )
418462 @ Override
419463 protected void handlePingResponse (Parcel parcel ) {
0 commit comments