@@ -18,10 +18,10 @@ package sotw
18
18
import (
19
19
"context"
20
20
"errors"
21
- "reflect"
22
21
"strconv"
23
22
"sync/atomic"
24
23
24
+ "golang.org/x/sync/errgroup"
25
25
"google.golang.org/grpc/codes"
26
26
"google.golang.org/grpc/status"
27
27
@@ -63,15 +63,6 @@ type server struct {
63
63
streamCount int64
64
64
}
65
65
66
- // Discovery response that is sent over GRPC stream
67
- // We need to record what resource names are already sent to a client
68
- // So if the client requests a new name we can respond back
69
- // regardless current snapshot version (even if it is not changed yet)
70
- type lastDiscoveryResponse struct {
71
- nonce string
72
- resources map [string ]struct {}
73
- }
74
-
75
66
// process handles a bi-di stream request
76
67
func (s * server ) process (str stream.Stream , reqCh <- chan * discovery.DiscoveryRequest , defaultTypeURL string ) error {
77
68
// increment stream count
@@ -81,14 +72,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
81
72
// ignores stale nonces. nonce is only modified within send() function.
82
73
var streamNonce int64
83
74
84
- streamState := stream .NewStreamState (false , map [string ]string {})
85
- lastDiscoveryResponses := map [string ]lastDiscoveryResponse {}
75
+ streamState := stream .NewSTOWStreamState ()
86
76
87
77
// a collection of stack allocated watches per request type
88
78
watches := newWatches ()
89
79
90
80
defer func () {
91
- watches .close ()
92
81
if s .callbacks != nil {
93
82
s .callbacks .OnStreamClosed (streamID )
94
83
}
@@ -109,14 +98,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
109
98
streamNonce = streamNonce + 1
110
99
out .Nonce = strconv .FormatInt (streamNonce , 10 )
111
100
112
- lastResponse := lastDiscoveryResponse {
113
- nonce : out .Nonce ,
114
- resources : make (map [string ]struct {}),
115
- }
116
- for _ , r := range resp .GetRequest ().ResourceNames {
117
- lastResponse .resources [r ] = struct {}{}
118
- }
119
- lastDiscoveryResponses [resp .GetRequest ().TypeUrl ] = lastResponse
101
+ lastResponse := stream .NewLastDiscoveryResponse (out .Nonce , resp .GetRequest ().ResourceNames )
102
+ streamState .Set (resp .GetRequest ().TypeUrl , lastResponse )
120
103
121
104
if s .callbacks != nil {
122
105
s .callbacks .OnStreamResponse (resp .GetContext (), streamID , resp .GetRequest (), out )
@@ -133,103 +116,100 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
133
116
// node may only be set on the first discovery request
134
117
var node = & core.Node {}
135
118
136
- // recompute dynamic channels for this stream
137
- watches .recompute (s .ctx , reqCh )
138
-
139
- for {
140
- // The list of select cases looks like this:
141
- // 0: <- ctx.Done
142
- // 1: <- reqCh
143
- // 2...: per type watches
144
- index , value , ok := reflect .Select (watches .cases )
145
- switch index {
146
- // ctx.Done() -> if we receive a value here we return as no further computation is needed
147
- case 0 :
148
- return nil
149
- // Case 1 handles any request inbound on the stream and handles all initialization as needed
150
- case 1 :
151
- // input stream ended or errored out
152
- if ! ok {
153
- return nil
154
- }
119
+ var resCh = make (chan cache.Response , 1 )
155
120
156
- req := value .Interface ().(* discovery.DiscoveryRequest )
157
- if req == nil {
158
- return status .Errorf (codes .Unavailable , "empty request" )
159
- }
121
+ ctx , cancel := context .WithCancel (s .ctx )
122
+ eg , ctx := errgroup .WithContext (ctx )
160
123
161
- // node field in discovery request is delta-compressed
162
- if req .Node != nil {
163
- node = req .Node
164
- } else {
165
- req .Node = node
166
- }
167
-
168
- // nonces can be reused across streams; we verify nonce only if nonce is not initialized
169
- nonce := req .GetResponseNonce ()
124
+ eg .Go (func () error {
125
+ defer func () {
126
+ watches .close () // this should remove all watches from the cache
127
+ close (resCh ) // close resCh and let the second eg.Go drain it
128
+ }()
170
129
171
- // type URL is required for ADS but is implicit for xDS
172
- if defaultTypeURL == resource .AnyType {
173
- if req .TypeUrl == "" {
174
- return status .Errorf (codes .InvalidArgument , "type URL is required for ADS" )
130
+ for {
131
+ select {
132
+ case <- ctx .Done ():
133
+ return nil
134
+ case req , more := <- reqCh :
135
+ if ! more {
136
+ return nil
175
137
}
176
- } else if req .TypeUrl == "" {
177
- req .TypeUrl = defaultTypeURL
178
- }
138
+ if req == nil {
139
+ return status .Errorf (codes .Unavailable , "empty request" )
140
+ }
141
+ // node field in discovery request is delta-compressed
142
+ if req .Node != nil {
143
+ node = req .Node
144
+ } else {
145
+ req .Node = node
146
+ }
147
+
148
+ // nonces can be reused across streams; we verify nonce only if nonce is not initialized
149
+ nonce := req .GetResponseNonce ()
179
150
180
- if s .callbacks != nil {
181
- if err := s .callbacks .OnStreamRequest (streamID , req ); err != nil {
182
- return err
151
+ // type URL is required for ADS but is implicit for xDS
152
+ if defaultTypeURL == resource .AnyType {
153
+ if req .TypeUrl == "" {
154
+ return status .Errorf (codes .InvalidArgument , "type URL is required for ADS" )
155
+ }
156
+ } else if req .TypeUrl == "" {
157
+ req .TypeUrl = defaultTypeURL
183
158
}
184
- }
185
159
186
- if lastResponse , ok := lastDiscoveryResponses [ req . TypeUrl ]; ok {
187
- if lastResponse . nonce == "" || lastResponse . nonce == nonce {
188
- // Let's record Resource names that a client has received.
189
- streamState . SetKnownResourceNames ( req . TypeUrl , lastResponse . resources )
160
+ if s . callbacks != nil {
161
+ if err := s . callbacks . OnStreamRequest ( streamID , req ); err != nil {
162
+ return err
163
+ }
190
164
}
191
- }
192
165
193
- typeURL := req .GetTypeUrl ()
194
- responder := make (chan cache.Response , 1 )
195
- if w , ok := watches .responders [typeURL ]; ok {
196
- // We've found a pre-existing watch, lets check and update if needed.
197
- // If these requirements aren't satisfied, leave an open watch.
198
- if w .nonce == "" || w .nonce == nonce {
199
- w .close ()
166
+ if lastResponse , ok := streamState .Get (req .TypeUrl ); ok {
167
+ if lastResponse .Nonce == "" || lastResponse .Nonce == nonce {
168
+ // Let's record Resource names that a client has received.
169
+ streamState .SetKnownResourceNames (req .TypeUrl , lastResponse .Resources )
170
+ }
171
+ }
200
172
173
+ typeURL := req .GetTypeUrl ()
174
+ if w := watches .getWatch (typeURL ); w != nil {
175
+ // We've found a pre-existing watch, lets check and update if needed.
176
+ // If these requirements aren't satisfied, leave an open watch.
177
+ if n := w .getNonce (); n == "" || n == nonce {
178
+ w .close ()
179
+
180
+ watches .addWatch (typeURL , & watch {
181
+ cancel : s .cache .CreateWatch (req , streamState .StreamState , resCh ),
182
+ })
183
+ }
184
+ } else {
185
+ // No pre-existing watch exists, let's create one.
186
+ // We need to precompute the watches first then open a watch in the cache.
201
187
watches .addWatch (typeURL , & watch {
202
- cancel : s .cache .CreateWatch (req , streamState , responder ),
203
- response : responder ,
188
+ cancel : s .cache .CreateWatch (req , streamState .StreamState , resCh ),
204
189
})
205
190
}
206
- } else {
207
- // No pre-existing watch exists, let's create one.
208
- // We need to precompute the watches first then open a watch in the cache.
209
- watches .addWatch (typeURL , & watch {
210
- cancel : s .cache .CreateWatch (req , streamState , responder ),
211
- response : responder ,
212
- })
213
191
}
192
+ }
193
+ })
214
194
215
- // Recompute the dynamic select cases for this stream.
216
- watches .recompute (s .ctx , reqCh )
217
- default :
218
- // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
219
- if ! ok {
220
- // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
221
- return status .Errorf (codes .Unavailable , "resource watch %d -> failed" , index )
195
+ eg .Go (func () (err error ) {
196
+ var nonce string
197
+ for res := range resCh {
198
+ if res == nil || err != nil {
199
+ continue // this loop should not exit until resCh closed
222
200
}
223
-
224
- res := value .Interface ().(cache.Response )
225
- nonce , err := send (res )
226
- if err != nil {
227
- return err
201
+ if nonce , err = send (res ); err == nil {
202
+ if w := watches .getWatch (res .GetRequest ().TypeUrl ); w != nil {
203
+ w .setNonce (nonce )
204
+ }
205
+ } else {
206
+ cancel ()
228
207
}
229
-
230
- watches .responders [res .GetRequest ().TypeUrl ].nonce = nonce
231
208
}
232
- }
209
+ return err
210
+ })
211
+
212
+ return eg .Wait ()
233
213
}
234
214
235
215
// StreamHandler converts a blocking read call to channels and initiates stream processing
0 commit comments