Skip to content

Commit 584e614

Browse files
committed
xds: implement server feature fail_on_data_errors
1 parent f385add commit 584e614

File tree

10 files changed

+542
-34
lines changed

10 files changed

+542
-34
lines changed

9857d21.diff

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
diff --git a/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java b/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java
2+
index 4fa75f6b335..fffa14b52da 100644
3+
--- a/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java
4+
+++ b/xds/src/main/java/io/grpc/xds/client/Bootstrapper.java
5+
@@ -65,18 +65,20 @@ public abstract static class ServerInfo {
6+
7+
public abstract boolean resourceTimerIsTransientError();
8+
9+
+ public abstract boolean failOnDataErrors();
10+
+
11+
@VisibleForTesting
12+
public static ServerInfo create(String target, @Nullable Object implSpecificConfig) {
13+
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
14+
- false, false, false);
15+
+ false, false, false, false);
16+
}
17+
18+
@VisibleForTesting
19+
public static ServerInfo create(
20+
String target, Object implSpecificConfig, boolean ignoreResourceDeletion,
21+
- boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) {
22+
+ boolean isTrustedXdsServer, boolean resourceTimerIsTransientError, boolean failOnDataErrors) {
23+
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
24+
- ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError);
25+
+ ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError, failOnDataErrors);
26+
}
27+
}
28+
29+
diff --git a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
30+
index 22c794e1129..b44e32bb2d9 100644
31+
--- a/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
32+
+++ b/xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
33+
@@ -58,6 +58,7 @@ public abstract class BootstrapperImpl extends Bootstrapper {
34+
private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
35+
private static final String
36+
SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";
37+
+ private static final String SERVER_FEATURE_FAIL_ON_DATA_ERRORS = "fail_on_data_errors";
38+
39+
@VisibleForTesting
40+
static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);
41+
@@ -257,6 +258,7 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
42+
43+
boolean resourceTimerIsTransientError = false;
44+
boolean ignoreResourceDeletion = false;
45+
+ boolean failOnDataErrors = false;
46+
// "For forward compatibility reasons, the client will ignore any entry in the list that it
47+
// does not understand, regardless of type."
48+
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
49+
@@ -267,12 +269,14 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
50+
}
51+
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
52+
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
53+
+ failOnDataErrors = xdsDataErrorHandlingEnabled
54+
+ && serverFeatures.contains(SERVER_FEATURE_FAIL_ON_DATA_ERRORS);
55+
}
56+
servers.add(
57+
ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
58+
serverFeatures != null
59+
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
60+
- resourceTimerIsTransientError));
61+
+ resourceTimerIsTransientError, failOnDataErrors));
62+
}
63+
return servers.build();
64+
}
65+
diff --git a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
66+
index 2bf1286babc..7b481bc8b21 100644
67+
--- a/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
68+
+++ b/xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
69+
@@ -603,16 +603,18 @@ private <T extends ResourceUpdate> void handleResourceUpdate(
70+
}
71+
72+
if (invalidResources.contains(resourceName)) {
73+
- // The resource update is invalid. Capture the error without notifying the watchers.
74+
+ // The resource update is invalid (NACK). Handle as a data error.
75+
subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
76+
- }
77+
-
78+
- if (invalidResources.contains(resourceName)) {
79+
- // The resource is missing. Reuse the cached resource if possible.
80+
- if (subscriber.data == null) {
81+
- // No cached data. Notify the watchers of an invalid update.
82+
- subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
83+
+
84+
+ // Handle data errors (NACKs) based on fail_on_data_errors server feature.
85+
+ // When fail_on_data_errors is present and we have cached data, delete it so that
86+
+ // onError will call onResourceChanged instead of onAmbientError.
87+
+ if (subscriber.data != null && args.serverInfo.failOnDataErrors()) {
88+
+ subscriber.data = null;
89+
}
90+
+ // Call onError, which will decide whether to call onResourceChanged or onAmbientError
91+
+ // based on whether data exists after the above deletion.
92+
+ subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
93+
continue;
94+
}
95+
96+
@@ -866,11 +868,13 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn
97+
return;
98+
}
99+
100+
- // Ignore deletion of State of the World resources when this feature is on,
101+
- // and the resource is reusable.
102+
+ // Handle data errors (resource deletions) based on fail_on_data_errors server feature.
103+
+ // When fail_on_data_errors is not present, we treat deletions as ambient errors and keep
104+
+ // using the cached resource. When it is present, we delete the cached resource and fail.
105+
boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion();
106+
- if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
107+
- if (!resourceDeletionIgnored) {
108+
+ boolean failOnDataErrors = serverInfo.failOnDataErrors();
109+
+ if (type.isFullStateOfTheWorld() && data != null && !failOnDataErrors) {
110+
+ if (ignoreResourceDeletionEnabled && !resourceDeletionIgnored) {
111+
logger.log(XdsLogLevel.FORCE_WARNING,
112+
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
113+
serverInfo.target(), type, resource);
114+
diff --git a/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java b/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java
115+
index 2b7bd53d5ef..982b25f913c 100644
116+
--- a/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java
117+
+++ b/xds/src/test/java/io/grpc/xds/GrpcBootstrapperImplTest.java
118+
@@ -723,6 +723,52 @@ public void serverFeatures_ignoresUnknownValues() throws XdsInitializationExcept
119+
assertThat(serverInfo.isTrustedXdsServer()).isTrue();
120+
}
121+
122+
+ @Test
123+
+ public void serverFeatureFailOnDataErrors() throws XdsInitializationException {
124+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
125+
+ String rawData = "{\n"
126+
+ + " \"xds_servers\": [\n"
127+
+ + " {\n"
128+
+ + " \"server_uri\": \"" + SERVER_URI + "\",\n"
129+
+ + " \"channel_creds\": [\n"
130+
+ + " {\"type\": \"insecure\"}\n"
131+
+ + " ],\n"
132+
+ + " \"server_features\": [\"fail_on_data_errors\"]\n"
133+
+ + " }\n"
134+
+ + " ]\n"
135+
+ + "}";
136+
+
137+
+ bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
138+
+ BootstrapInfo info = bootstrapper.bootstrap();
139+
+ ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
140+
+ assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
141+
+ assertThat(serverInfo.implSpecificConfig()).isInstanceOf(InsecureChannelCredentials.class);
142+
+ assertThat(serverInfo.failOnDataErrors()).isTrue();
143+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
144+
+ }
145+
+
146+
+ @Test
147+
+ public void serverFeatureFailOnDataErrors_requiresEnvVar() throws XdsInitializationException {
148+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
149+
+ String rawData = "{\n"
150+
+ + " \"xds_servers\": [\n"
151+
+ + " {\n"
152+
+ + " \"server_uri\": \"" + SERVER_URI + "\",\n"
153+
+ + " \"channel_creds\": [\n"
154+
+ + " {\"type\": \"insecure\"}\n"
155+
+ + " ],\n"
156+
+ + " \"server_features\": [\"fail_on_data_errors\"]\n"
157+
+ + " }\n"
158+
+ + " ]\n"
159+
+ + "}";
160+
+
161+
+ bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
162+
+ BootstrapInfo info = bootstrapper.bootstrap();
163+
+ ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
164+
+ // Should be false when env var is not enabled
165+
+ assertThat(serverInfo.failOnDataErrors()).isFalse();
166+
+ }
167+
+
168+
@Test
169+
public void notFound() {
170+
bootstrapper.bootstrapPathFromEnvVar = null;
171+
diff --git a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
172+
index 6b9d601b2cf..524c2d7ea24 100644
173+
--- a/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
174+
+++ b/xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
175+
@@ -1448,6 +1448,66 @@ public void ldsResourceDeleted_ignoreResourceDeletion() {
176+
verifyNoMoreInteractions(ldsResourceWatcher);
177+
}
178+
179+
+ /**
180+
+ * When fail_on_data_errors server feature is on, xDS client should delete the cached listener
181+
+ * and fail RPCs when LDS resource is deleted.
182+
+ */
183+
+ @Test
184+
+ public void ldsResourceDeleted_failOnDataErrors() {
185+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
186+
+ xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, false,
187+
+ true, false, true);
188+
+ BootstrapInfo bootstrapInfo =
189+
+ Bootstrapper.BootstrapInfo.builder()
190+
+ .servers(Collections.singletonList(xdsServerInfo))
191+
+ .node(NODE)
192+
+ .authorities(ImmutableMap.of(
193+
+ "",
194+
+ AuthorityInfo.create(
195+
+ "xdstp:///envoy.config.listener.v3.Listener/%s",
196+
+ ImmutableList.of(Bootstrapper.ServerInfo.create(
197+
+ SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
198+
+ .certProviders(ImmutableMap.of())
199+
+ .build();
200+
+ xdsClient = new XdsClientImpl(
201+
+ xdsTransportFactory,
202+
+ bootstrapInfo,
203+
+ fakeClock.getScheduledExecutorService(),
204+
+ backoffPolicyProvider,
205+
+ fakeClock.getStopwatchSupplier(),
206+
+ timeProvider,
207+
+ MessagePrinter.INSTANCE,
208+
+ new TlsContextManagerImpl(bootstrapInfo),
209+
+ xdsClientMetricReporter);
210+
+
211+
+ InOrder inOrder = inOrder(ldsResourceWatcher);
212+
+ DiscoveryRpcCall call = startResourceWatcher(XdsListenerResource.getInstance(), LDS_RESOURCE,
213+
+ ldsResourceWatcher);
214+
+ verifyResourceMetadataRequested(LDS, LDS_RESOURCE);
215+
+
216+
+ // Initial LDS response.
217+
+ call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
218+
+ call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0000", NODE);
219+
+ inOrder.verify(ldsResourceWatcher).onResourceChanged(ldsUpdateCaptor.capture());
220+
+ StatusOr<LdsUpdate> statusOrUpdate = ldsUpdateCaptor.getValue();
221+
+ assertThat(statusOrUpdate.hasValue()).isTrue();
222+
+ verifyGoldenListenerVhosts(statusOrUpdate.getValue());
223+
+ verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerVhosts, VERSION_1, TIME_INCREMENT);
224+
+ verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
225+
+
226+
+ // Empty LDS response deletes the listener and fails RPCs.
227+
+ call.sendResponse(LDS, Collections.<Any>emptyList(), VERSION_2, "0001");
228+
+ call.verifyRequest(LDS, LDS_RESOURCE, VERSION_2, "0001", NODE);
229+
+ inOrder.verify(ldsResourceWatcher).onResourceChanged(ldsUpdateCaptor.capture());
230+
+ StatusOr<LdsUpdate> statusOrUpdate1 = ldsUpdateCaptor.getValue();
231+
+ assertThat(statusOrUpdate1.hasValue()).isFalse();
232+
+ assertThat(statusOrUpdate1.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
233+
+ verifyResourceMetadataDoesNotExist(LDS, LDS_RESOURCE);
234+
+ verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
235+
+
236+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
237+
+ }
238+
+
239+
@Test
240+
@SuppressWarnings("unchecked")
241+
public void multipleLdsWatchers() {
242+
@@ -2972,6 +3032,64 @@ public void cdsResourceDeleted_ignoreResourceDeletion() {
243+
verifyNoMoreInteractions(ldsResourceWatcher);
244+
}
245+
246+
+ /**
247+
+ * When fail_on_data_errors server feature is on, xDS client should delete the cached cluster
248+
+ * and fail RPCs when CDS resource is deleted.
249+
+ */
250+
+ @Test
251+
+ public void cdsResourceDeleted_failOnDataErrors() {
252+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = true;
253+
+ xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, false,
254+
+ true, false, true);
255+
+ BootstrapInfo bootstrapInfo =
256+
+ Bootstrapper.BootstrapInfo.builder()
257+
+ .servers(Collections.singletonList(xdsServerInfo))
258+
+ .node(NODE)
259+
+ .authorities(ImmutableMap.of(
260+
+ "",
261+
+ AuthorityInfo.create(
262+
+ "xdstp:///envoy.config.listener.v3.Listener/%s",
263+
+ ImmutableList.of(Bootstrapper.ServerInfo.create(
264+
+ SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS)))))
265+
+ .certProviders(ImmutableMap.of())
266+
+ .build();
267+
+ xdsClient = new XdsClientImpl(
268+
+ xdsTransportFactory,
269+
+ bootstrapInfo,
270+
+ fakeClock.getScheduledExecutorService(),
271+
+ backoffPolicyProvider,
272+
+ fakeClock.getStopwatchSupplier(),
273+
+ timeProvider,
274+
+ MessagePrinter.INSTANCE,
275+
+ new TlsContextManagerImpl(bootstrapInfo),
276+
+ xdsClientMetricReporter);
277+
+
278+
+ DiscoveryRpcCall call = startResourceWatcher(XdsClusterResource.getInstance(), CDS_RESOURCE,
279+
+ cdsResourceWatcher);
280+
+ verifyResourceMetadataRequested(CDS, CDS_RESOURCE);
281+
+
282+
+ // Initial CDS response.
283+
+ call.sendResponse(CDS, testClusterRoundRobin, VERSION_1, "0000");
284+
+ call.verifyRequest(CDS, CDS_RESOURCE, VERSION_1, "0000", NODE);
285+
+ verify(cdsResourceWatcher).onResourceChanged(cdsUpdateCaptor.capture());
286+
+ StatusOr<CdsUpdate> statusOrUpdate = cdsUpdateCaptor.getValue();
287+
+ assertThat(statusOrUpdate.hasValue()).isTrue();
288+
+ verifyGoldenClusterRoundRobin(statusOrUpdate.getValue());
289+
+ verifyResourceMetadataAcked(CDS, CDS_RESOURCE, testClusterRoundRobin, VERSION_1,
290+
+ TIME_INCREMENT);
291+
+ verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
292+
+
293+
+ // Empty CDS response deletes the cluster and fails RPCs.
294+
+ call.sendResponse(CDS, Collections.<Any>emptyList(), VERSION_2, "0001");
295+
+ call.verifyRequest(CDS, CDS_RESOURCE, VERSION_2, "0001", NODE);
296+
+ verify(cdsResourceWatcher).onResourceChanged(argThat(
297+
+ arg -> !arg.hasValue() && arg.getStatus().getDescription().contains(CDS_RESOURCE)));
298+
+ verifyResourceMetadataDoesNotExist(CDS, CDS_RESOURCE);
299+
+ verifySubscribedResourcesMetadataSizes(0, 1, 0, 0);
300+
+
301+
+ BootstrapperImpl.xdsDataErrorHandlingEnabled = false;
302+
+ }
303+
+
304+
@Test
305+
@SuppressWarnings("unchecked")
306+
public void multipleCdsWatchers() {

xds/src/main/java/io/grpc/xds/client/Bootstrapper.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,22 @@ public abstract static class ServerInfo {
6565

6666
public abstract boolean resourceTimerIsTransientError();
6767

68+
public abstract boolean failOnDataErrors();
69+
6870
@VisibleForTesting
6971
public static ServerInfo create(String target, @Nullable Object implSpecificConfig) {
7072
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
71-
false, false, false);
73+
false, false, false, false);
7274
}
7375

7476
@VisibleForTesting
7577
public static ServerInfo create(
76-
String target, Object implSpecificConfig, boolean ignoreResourceDeletion,
77-
boolean isTrustedXdsServer, boolean resourceTimerIsTransientError) {
78+
String target, Object implSpecificConfig,
79+
boolean ignoreResourceDeletion, boolean isTrustedXdsServer,
80+
boolean resourceTimerIsTransientError, boolean failOnDataErrors) {
7881
return new AutoValue_Bootstrapper_ServerInfo(target, implSpecificConfig,
79-
ignoreResourceDeletion, isTrustedXdsServer, resourceTimerIsTransientError);
82+
ignoreResourceDeletion, isTrustedXdsServer,
83+
resourceTimerIsTransientError, failOnDataErrors);
8084
}
8185
}
8286

xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public abstract class BootstrapperImpl extends Bootstrapper {
5858
private static final String SERVER_FEATURE_TRUSTED_XDS_SERVER = "trusted_xds_server";
5959
private static final String
6060
SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR = "resource_timer_is_transient_error";
61+
private static final String SERVER_FEATURE_FAIL_ON_DATA_ERRORS = "fail_on_data_errors";
6162

6263
@VisibleForTesting
6364
static boolean enableXdsFallback = GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_FALLBACK, true);
@@ -257,6 +258,7 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
257258

258259
boolean resourceTimerIsTransientError = false;
259260
boolean ignoreResourceDeletion = false;
261+
boolean failOnDataErrors = false;
260262
// "For forward compatibility reasons, the client will ignore any entry in the list that it
261263
// does not understand, regardless of type."
262264
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
@@ -267,12 +269,14 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
267269
}
268270
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
269271
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
272+
failOnDataErrors = xdsDataErrorHandlingEnabled
273+
&& serverFeatures.contains(SERVER_FEATURE_FAIL_ON_DATA_ERRORS);
270274
}
271275
servers.add(
272276
ServerInfo.create(serverUri, implSpecificConfig, ignoreResourceDeletion,
273277
serverFeatures != null
274278
&& serverFeatures.contains(SERVER_FEATURE_TRUSTED_XDS_SERVER),
275-
resourceTimerIsTransientError));
279+
resourceTimerIsTransientError, failOnDataErrors));
276280
}
277281
return servers.build();
278282
}

0 commit comments

Comments
 (0)