Skip to content

Commit d35b81c

Browse files
committed
fix: Connectivity Error Metrics
1 parent 2349908 commit d35b81c

File tree

3 files changed

+83
-7
lines changed

3 files changed

+83
-7
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.spanner;
17+
18+
import io.grpc.ClientStreamTracer;
19+
import io.grpc.Metadata;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
/**
23+
* Captures the event when a request is sent from the gRPC client. Its primary purpose is to measure
24+
* the transition time between asking gRPC to start an RPC and gRPC actually serializing that RPC.
25+
*/
26+
public class SpannerGrpcStreamTracer extends ClientStreamTracer {
27+
28+
private final AtomicBoolean outBoundMessageSent = new AtomicBoolean(false);
29+
30+
public SpannerGrpcStreamTracer() {}
31+
32+
public boolean isOutBoundMessageSent() {
33+
return outBoundMessageSent.get();
34+
}
35+
36+
/** An outbound message has been serialized and sent to the transport. */
37+
@Override
38+
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
39+
outBoundMessageSent.set(true);
40+
}
41+
42+
public static class Factory extends ClientStreamTracer.Factory {
43+
44+
SpannerGrpcStreamTracer spannerGrpcStreamTracer;
45+
46+
public Factory(SpannerGrpcStreamTracer spannerGrpcStreamTracer) {
47+
this.spannerGrpcStreamTracer = spannerGrpcStreamTracer;
48+
}
49+
50+
@Override
51+
public ClientStreamTracer newClientStreamTracer(
52+
ClientStreamTracer.StreamInfo info, Metadata headers) {
53+
return spannerGrpcStreamTracer;
54+
}
55+
}
56+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,9 @@
2828
import com.google.common.cache.Cache;
2929
import com.google.common.cache.CacheBuilder;
3030
import com.google.spanner.admin.database.v1.DatabaseName;
31-
import io.grpc.CallOptions;
32-
import io.grpc.Channel;
33-
import io.grpc.ClientCall;
34-
import io.grpc.ClientInterceptor;
31+
import io.grpc.*;
3532
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
3633
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
37-
import io.grpc.Metadata;
38-
import io.grpc.MethodDescriptor;
3934
import io.grpc.alts.AltsContextUtil;
4035
import io.opencensus.stats.MeasureMap;
4136
import io.opencensus.stats.Stats;
@@ -50,6 +45,7 @@
5045
import java.util.HashMap;
5146
import java.util.Map;
5247
import java.util.concurrent.ExecutionException;
48+
import java.util.concurrent.atomic.AtomicBoolean;
5349
import java.util.logging.Level;
5450
import java.util.logging.Logger;
5551
import java.util.regex.Matcher;
@@ -100,9 +96,13 @@ class HeaderInterceptor implements ClientInterceptor {
10096
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
10197
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
10298
ApiTracer tracer = callOptions.getOption(TRACER_KEY);
99+
SpannerGrpcStreamTracer streamTracer = new SpannerGrpcStreamTracer();
100+
CallOptions newOptions =
101+
callOptions.withStreamTracerFactory(new SpannerGrpcStreamTracer.Factory(streamTracer));
103102
CompositeTracer compositeTracer =
104103
tracer instanceof CompositeTracer ? (CompositeTracer) tracer : null;
105-
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
104+
final AtomicBoolean headersReceived = new AtomicBoolean(false);
105+
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, newOptions)) {
106106
@Override
107107
public void start(Listener<RespT> responseListener, Metadata headers) {
108108
try {
@@ -124,13 +124,29 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
124124
new SimpleForwardingClientCallListener<RespT>(responseListener) {
125125
@Override
126126
public void onHeaders(Metadata metadata) {
127+
headersReceived.set(true);
127128
// Check if the call uses DirectPath by inspecting the ALTS context.
128129
boolean isDirectPathUsed = AltsContextUtil.check(getAttributes());
129130
addDirectPathUsedAttribute(compositeTracer, isDirectPathUsed);
130131
processHeader(
131132
metadata, tagContext, attributes, span, compositeTracer, isDirectPathUsed);
132133
super.onHeaders(metadata);
133134
}
135+
136+
@Override
137+
public void onClose(Status status, Metadata trailers) {
138+
if (streamTracer.isOutBoundMessageSent() && !headersReceived.get()) {
139+
// RPC was sent, but no response headers were received. This can happen in
140+
// case of a timeout, for example.
141+
if (compositeTracer != null) {
142+
compositeTracer.recordGfeHeaderMissingCount(1L);
143+
if (GapicSpannerRpc.isEnableAFEServerTiming()) {
144+
// compositeTracer.recordAfeHeaderMissingCount(1L);
145+
}
146+
}
147+
}
148+
super.onClose(status, trailers);
149+
}
134150
},
135151
headers);
136152
} catch (ExecutionException executionException) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,10 @@ public void testNoNetworkConnection() {
347347
// Attempt count should have a failed metric point for CreateSession.
348348
assertEquals(
349349
1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
350+
351+
// Connectivity count will not increase as client did not attempt to send the request
352+
assertFalse(
353+
checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME));
350354
}
351355

352356
@Test

0 commit comments

Comments
 (0)