@@ -18,10 +18,10 @@ package sotw
1818import (
1919 "context"
2020 "errors"
21- "reflect"
2221 "strconv"
2322 "sync/atomic"
2423
24+ "golang.org/x/sync/errgroup"
2525 "google.golang.org/grpc/codes"
2626 "google.golang.org/grpc/status"
2727
@@ -63,15 +63,6 @@ type server struct {
6363 streamCount int64
6464}
6565
66- // Discovery response that is sent over GRPC stream
67- // We need to record what resource names are already sent to a client
68- // So if the client requests a new name we can respond back
69- // regardless current snapshot version (even if it is not changed yet)
70- type lastDiscoveryResponse struct {
71- nonce string
72- resources map [string ]struct {}
73- }
74-
7566// process handles a bi-di stream request
7667func (s * server ) process (str stream.Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
7768 // increment stream count
@@ -81,14 +72,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
8172 // ignores stale nonces. nonce is only modified within send() function.
8273 var streamNonce int64
8374
84- streamState := stream .NewStreamState (false , map [string ]string {})
85- lastDiscoveryResponses := map [string ]lastDiscoveryResponse {}
75+ streamState := stream .NewSTOWStreamState ()
8676
8777 // a collection of stack allocated watches per request type
8878 watches := newWatches ()
8979
9080 defer func () {
91- watches .close ()
9281 if s .callbacks != nil {
9382 s .callbacks .OnStreamClosed (streamID )
9483 }
@@ -109,14 +98,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
10998 streamNonce = streamNonce + 1
11099 out .Nonce = strconv .FormatInt (streamNonce , 10 )
111100
112- lastResponse := lastDiscoveryResponse {
113- nonce : out .Nonce ,
114- resources : make (map [string ]struct {}),
115- }
116- for _ , r := range resp .GetRequest ().ResourceNames {
117- lastResponse .resources [r ] = struct {}{}
118- }
119- lastDiscoveryResponses [resp .GetRequest ().TypeUrl ] = lastResponse
101+ lastResponse := stream .NewLastDiscoveryResponse (out .Nonce , resp .GetRequest ().ResourceNames )
102+ streamState .Set (resp .GetRequest ().TypeUrl , lastResponse )
120103
121104 if s .callbacks != nil {
122105 s .callbacks .OnStreamResponse (resp .GetContext (), streamID , resp .GetRequest (), out )
@@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
133116 // node may only be set on the first discovery request
134117 var node = & core.Node {}
135118
136- // recompute dynamic channels for this stream
137- watches .recompute (s .ctx , reqCh )
138-
139- for {
140- // The list of select cases looks like this:
141- // 0: <- ctx.Done
142- // 1: <- reqCh
143- // 2...: per type watches
144- index , value , ok := reflect .Select (watches .cases )
145- switch index {
146- // ctx.Done() -> if we receive a value here we return as no further computation is needed
147- case 0 :
148- return nil
149- // Case 1 handles any request inbound on the stream and handles all initialization as needed
150- case 1 :
151- // input stream ended or errored out
152- if ! ok {
153- return nil
154- }
119+ var resCh = make (chan cache.Response , 1 )
155120
156- req := value .Interface ().(* discovery.DiscoveryRequest )
157- if req == nil {
158- return status .Errorf (codes .Unavailable , "empty request" )
159- }
121+ ctx , cancel := context .WithCancel (s .ctx )
122+ eg , ctx := errgroup .WithContext (ctx )
160123
161- // node field in discovery request is delta-compressed
162- if req .Node != nil {
163- node = req .Node
164- } else {
165- req .Node = node
166- }
167-
168- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
169- nonce := req .GetResponseNonce ()
124+ eg .Go (func () error {
125+ defer func () {
126+ watches .close () // this should remove all watches from the cache
127+ close (resCh ) // close resCh and let the second eg.Go drain it
128+ }()
170129
171- // type URL is required for ADS but is implicit for xDS
172- if defaultTypeURL == resource .AnyType {
173- if req .TypeUrl == "" {
174- return status .Errorf (codes .InvalidArgument , "type URL is required for ADS" )
130+ for {
131+ select {
132+ case <- ctx .Done ():
133+ return nil
134+ case req , more := <- reqCh :
135+ if ! more {
136+ return nil
175137 }
176- } else if req .TypeUrl == "" {
177- req .TypeUrl = defaultTypeURL
178- }
138+ if req == nil {
139+ return status .Errorf (codes .Unavailable , "empty request" )
140+ }
141+ // node field in discovery request is delta-compressed
142+ if req .Node != nil {
143+ node = req .Node
144+ } else {
145+ req .Node = node
146+ }
147+
148+ // nonces can be reused across streams; we verify nonce only if nonce is not initialized
149+ nonce := req .GetResponseNonce ()
179150
180- if s .callbacks != nil {
181- if err := s .callbacks .OnStreamRequest (streamID , req ); err != nil {
182- return err
151+ // type URL is required for ADS but is implicit for xDS
152+ if defaultTypeURL == resource .AnyType {
153+ if req .TypeUrl == "" {
154+ return status .Errorf (codes .InvalidArgument , "type URL is required for ADS" )
155+ }
156+ } else if req .TypeUrl == "" {
157+ req .TypeUrl = defaultTypeURL
183158 }
184- }
185159
186- if lastResponse , ok := lastDiscoveryResponses [ req . TypeUrl ]; ok {
187- if lastResponse . nonce == "" || lastResponse . nonce == nonce {
188- // Let's record Resource names that a client has received.
189- streamState . SetKnownResourceNames ( req . TypeUrl , lastResponse . resources )
160+ if s . callbacks != nil {
161+ if err := s . callbacks . OnStreamRequest ( streamID , req ); err != nil {
162+ return err
163+ }
190164 }
191- }
192165
193- typeURL := req .GetTypeUrl ()
194- responder := make (chan cache.Response , 1 )
195- if w , ok := watches .responders [typeURL ]; ok {
196- // We've found a pre-existing watch, lets check and update if needed.
197- // If these requirements aren't satisfied, leave an open watch.
198- if w .nonce == "" || w .nonce == nonce {
199- w .close ()
166+ if lastResponse , ok := streamState .Get (req .TypeUrl ); ok {
167+ if lastResponse .Nonce == "" || lastResponse .Nonce == nonce {
168+ // Let's record Resource names that a client has received.
169+ streamState .SetKnownResourceNames (req .TypeUrl , lastResponse .Resources )
170+ }
171+ }
200172
173+ typeURL := req .GetTypeUrl ()
174+ if w := watches .getWatch (typeURL ); w != nil {
175+ // We've found a pre-existing watch, lets check and update if needed.
176+ // If these requirements aren't satisfied, leave an open watch.
177+ if n := w .getNonce (); n == "" || n == nonce {
178+ w .close ()
179+
180+ watches .addWatch (typeURL , & watch {
181+ cancel : s .cache .CreateWatch (req , streamState .StreamState , resCh ),
182+ })
183+ }
184+ } else {
185+ // No pre-existing watch exists, let's create one.
186+ // We need to precompute the watches first then open a watch in the cache.
201187 watches .addWatch (typeURL , & watch {
202- cancel : s .cache .CreateWatch (req , streamState , responder ),
203- response : responder ,
188+ cancel : s .cache .CreateWatch (req , streamState .StreamState , resCh ),
204189 })
205190 }
206- } else {
207- // No pre-existing watch exists, let's create one.
208- // We need to precompute the watches first then open a watch in the cache.
209- watches .addWatch (typeURL , & watch {
210- cancel : s .cache .CreateWatch (req , streamState , responder ),
211- response : responder ,
212- })
213191 }
192+ }
193+ })
214194
215- // Recompute the dynamic select cases for this stream.
216- watches .recompute (s .ctx , reqCh )
217- default :
218- // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
219- if ! ok {
220- // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
221- return status .Errorf (codes .Unavailable , "resource watch %d -> failed" , index )
195+ eg .Go (func () (err error ) {
196+ var nonce string
197+ for res := range resCh {
198+ if res == nil || err != nil {
199+ continue // this loop should not exit until resCh closed
222200 }
223-
224- res := value .Interface ().(cache.Response )
225- nonce , err := send (res )
226- if err != nil {
227- return err
201+ if nonce , err = send (res ); err == nil {
202+ if w := watches .getWatch (res .GetRequest ().TypeUrl ); w != nil {
203+ w .setNonce (nonce )
204+ }
205+ } else {
206+ cancel ()
228207 }
229-
230- watches .responders [res .GetRequest ().TypeUrl ].nonce = nonce
231208 }
232- }
209+ return err
210+ })
211+
212+ return eg .Wait ()
233213}
234214
235215// StreamHandler converts a blocking read call to channels and initiates stream processing
0 commit comments