10
10
// and is provided here subject to the following:
11
11
// Copyright Project Contour Authors
12
12
// SPDX-License-Identifier: Apache-2.0
13
-
14
13
package cache
15
14
16
15
import (
@@ -20,12 +19,14 @@ import (
20
19
"math"
21
20
"strconv"
22
21
"sync"
22
+ "time"
23
23
24
24
corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25
25
discoveryv3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
26
26
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
27
27
envoylog "github.com/envoyproxy/go-control-plane/pkg/log"
28
28
serverv3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
29
+ "k8s.io/apimachinery/pkg/util/wait"
29
30
30
31
"github.com/apoxy-dev/apoxy-cli/pkg/gateway/xds/types"
31
32
"github.com/apoxy-dev/apoxy-cli/pkg/log"
@@ -52,13 +53,21 @@ type snapshotMap map[string]*cachev3.Snapshot
52
53
53
54
type nodeInfoMap map [int64 ]* corev3.Node
54
55
56
+ type nodeBackoff struct {
57
+ backoff wait.Backoff
58
+ lastAttempt time.Time
59
+ }
60
+
61
+ type backoffMap map [string ]* nodeBackoff
62
+
55
63
type snapshotCache struct {
56
64
cachev3.SnapshotCache
57
65
snapshotVersion int64
58
66
lastSnapshot snapshotMap
59
67
60
68
mu sync.Mutex
61
69
streamIDNodeInfo nodeInfoMap
70
+ nodeBackoffs backoffMap
62
71
}
63
72
64
73
// GenerateNewSnapshot takes a table of resources (the output from the IR->xDS
@@ -128,6 +137,7 @@ func NewSnapshotCache(ads bool, logger *slog.Logger) SnapshotCacheWithCallbacks
128
137
SnapshotCache : cachev3 .NewSnapshotCache (ads , & Hash , l ),
129
138
lastSnapshot : make (snapshotMap ),
130
139
streamIDNodeInfo : make (nodeInfoMap ),
140
+ nodeBackoffs : make (backoffMap ),
131
141
}
132
142
}
133
143
@@ -215,9 +225,36 @@ func (s *snapshotCache) OnStreamRequest(streamID int64, req *discoveryv3.Discove
215
225
216
226
if status := req .ErrorDetail ; status != nil {
217
227
// if Envoy rejected the last update log the details here.
218
- // TODO(youngnick): Handle NACK properly
219
228
errorCode = status .Code
220
229
errorMessage = status .Message
230
+ log .Warnf ("NACK received: code %d, message %s" , errorCode , errorMessage )
231
+
232
+ if s .nodeBackoffs [nodeID ] == nil {
233
+ s .nodeBackoffs [nodeID ] = & nodeBackoff {
234
+ backoff : wait.Backoff {
235
+ Duration : time .Second ,
236
+ Factor : 2 ,
237
+ Steps : 5 ,
238
+ Cap : 30 * time .Second ,
239
+ },
240
+ }
241
+ }
242
+
243
+ // Check if enough time has passed since last backoff and reset the backoff if so.
244
+ if time .Since (s .nodeBackoffs [nodeID ].lastAttempt ) > s .nodeBackoffs [nodeID ].backoff .Cap {
245
+ s .nodeBackoffs [nodeID ].backoff = wait.Backoff {
246
+ Duration : time .Second ,
247
+ Factor : 2 ,
248
+ Steps : 5 ,
249
+ Cap : 30 * time .Second ,
250
+ }
251
+ }
252
+
253
+ // Backoff for a bit before retrying.
254
+ s .nodeBackoffs [nodeID ].lastAttempt = time .Now ()
255
+ delay := s .nodeBackoffs [nodeID ].backoff .Step ()
256
+ log .Warnf ("Backing off for retry after NACK for node %s" , nodeID )
257
+ time .Sleep (delay )
221
258
}
222
259
223
260
log .Debugf ("handling v3 xDS resource request, version_info %s, response_nonce %s, nodeID %s, node_version %s, resource_names %v, type_url %s, errorCode %d, errorMessage %s" ,
@@ -312,9 +349,36 @@ func (s *snapshotCache) OnStreamDeltaRequest(streamID int64, req *discoveryv3.De
312
349
req .ResponseNonce , nodeID , nodeVersion )
313
350
if status := req .ErrorDetail ; status != nil {
314
351
// if Envoy rejected the last update log the details here.
315
- // TODO(youngnick): Handle NACK properly
316
352
errorCode = status .Code
317
353
errorMessage = status .Message
354
+ log .Warnf ("NACK received: code %d, message %s" , errorCode , errorMessage )
355
+
356
+ if s .nodeBackoffs [nodeID ] == nil {
357
+ s .nodeBackoffs [nodeID ] = & nodeBackoff {
358
+ backoff : wait.Backoff {
359
+ Duration : time .Second ,
360
+ Factor : 2 ,
361
+ Steps : 5 ,
362
+ Cap : 30 * time .Second ,
363
+ },
364
+ }
365
+ }
366
+
367
+ // Check if enough time has passed since last backoff and reset the backoff if so.
368
+ if time .Since (s .nodeBackoffs [nodeID ].lastAttempt ) > s .nodeBackoffs [nodeID ].backoff .Cap {
369
+ s .nodeBackoffs [nodeID ].backoff = wait.Backoff {
370
+ Duration : time .Second ,
371
+ Factor : 2 ,
372
+ Steps : 5 ,
373
+ Cap : 30 * time .Second ,
374
+ }
375
+ }
376
+
377
+ // Backoff for a bit before retrying.
378
+ s .nodeBackoffs [nodeID ].lastAttempt = time .Now ()
379
+ delay := s .nodeBackoffs [nodeID ].backoff .Step ()
380
+ log .Warnf ("Backing off for retry after NACK for node %s" , nodeID )
381
+ time .Sleep (delay )
318
382
}
319
383
log .Debugf ("handling v3 xDS resource request, response_nonce %s, nodeID %s, node_version %s, resource_names_subscribe %v, resource_names_unsubscribe %v, type_url %s, errorCode %d, errorMessage %s" ,
320
384
req .ResponseNonce ,
0 commit comments