Skip to content

Commit 02e98a8

Browse files
committed
core: Fix shutdown failing accepted RPCs
This fixes a race where RPCs could fail with "UNAVAILABLE: Channel shutdown invoked" even though they were created before channel.shutdown(). This basically adopts the internalStart() logic from DelayedStream, although the stream is a bit different because it has APIs that can be called before start() and doesn't need to handle cancel() without start(). The ManagedChannelImpltest had the number of due tasks increase because start() running earlier creates a DelayedStream. Previously the stream wasn't created until runDueTasks() so the mockPicker had already been installed and it could use a real stream from the beginning. But that's specific to the test; in practice it'd be a delayed stream before and after this change. See #12536
1 parent 6f3d3f6 commit 02e98a8

File tree

3 files changed

+26
-11
lines changed

3 files changed

+26
-11
lines changed

core/src/main/java/io/grpc/internal/DelayedClientCall.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
6464
* order, but also used if an error occurs before {@code realCall} is set.
6565
*/
6666
private Listener<RespT> listener;
67+
// No need to synchronize; start() synchronization provides a happens-before
68+
private Metadata startHeaders;
6769
// Must hold {@code this} lock when setting.
6870
private ClientCall<ReqT, RespT> realCall;
6971
@GuardedBy("this")
@@ -161,13 +163,23 @@ public void run() {
161163
*/
162164
// When this method returns, passThrough is guaranteed to be true
163165
public final Runnable setCall(ClientCall<ReqT, RespT> call) {
166+
Listener<RespT> savedDelayedListener;
164167
synchronized (this) {
165168
// If realCall != null, then either setCall() or cancel() has been called.
166169
if (realCall != null) {
167170
return null;
168171
}
169172
setRealCall(checkNotNull(call, "call"));
173+
// start() not yet called
174+
if (delayedListener == null) {
175+
assert pendingRunnables.isEmpty();
176+
pendingRunnables = null;
177+
passThrough = true;
178+
return null;
179+
}
180+
savedDelayedListener = this.delayedListener;
170181
}
182+
internalStart(savedDelayedListener);
171183
return new ContextRunnable(context) {
172184
@Override
173185
public void runInContext() {
@@ -176,8 +188,15 @@ public void runInContext() {
176188
};
177189
}
178190

191+
private void internalStart(Listener<RespT> listener) {
192+
Metadata savedStartHeaders = this.startHeaders;
193+
this.startHeaders = null;
194+
context.run(() -> realCall.start(listener, savedStartHeaders));
195+
}
196+
179197
@Override
180198
public final void start(Listener<RespT> listener, final Metadata headers) {
199+
checkNotNull(headers, "headers");
181200
checkState(this.listener == null, "already started");
182201
Status savedError;
183202
boolean savedPassThrough;
@@ -188,6 +207,7 @@ public final void start(Listener<RespT> listener, final Metadata headers) {
188207
savedPassThrough = passThrough;
189208
if (!savedPassThrough) {
190209
listener = delayedListener = new DelayedListener<>(listener);
210+
startHeaders = headers;
191211
}
192212
}
193213
if (savedError != null) {
@@ -196,15 +216,7 @@ public final void start(Listener<RespT> listener, final Metadata headers) {
196216
}
197217
if (savedPassThrough) {
198218
realCall.start(listener, headers);
199-
} else {
200-
final Listener<RespT> finalListener = listener;
201-
delayOrExecute(new Runnable() {
202-
@Override
203-
public void run() {
204-
realCall.start(finalListener, headers);
205-
}
206-
});
207-
}
219+
} // else realCall.start() will be called by setCall
208220
}
209221

210222
// When this method returns, passThrough is guaranteed to be true
@@ -253,6 +265,7 @@ public void run() {
253265
if (listenerToClose != null) {
254266
callExecutor.execute(new CloseListenerRunnable(listenerToClose, status));
255267
}
268+
internalStart(listenerToClose); // listener instance doesn't matter
256269
drainPendingCalls();
257270
}
258271
callCancelled();

core/src/test/java/io/grpc/internal/DelayedClientCallTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,12 @@ public void startThenSetCall() {
151151
delayedClientCall.request(1);
152152
Runnable r = delayedClientCall.setCall(mockRealCall);
153153
assertThat(r).isNotNull();
154-
r.run();
155154
@SuppressWarnings("unchecked")
156155
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(Listener.class);
156+
// start() must be called before setCall() returns (not in runnable), to ensure the in-use
157+
// counts keeping the channel alive after shutdown() don't momentarily decrease to zero.
157158
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
159+
r.run();
158160
Listener<Integer> realCallListener = listenerCaptor.getValue();
159161
verify(mockRealCall).request(1);
160162
realCallListener.onMessage(1);

core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1343,7 +1343,7 @@ public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata header
13431343
PickResult.withSubchannel(subchannel));
13441344

13451345
updateBalancingStateSafely(helper, READY, mockPicker);
1346-
assertEquals(2, executor.runDueTasks());
1346+
assertEquals(3, executor.runDueTasks());
13471347

13481348
verify(mockPicker).pickSubchannel(any(PickSubchannelArgs.class));
13491349
verify(mockTransport).newStream(

0 commit comments

Comments
 (0)