@@ -85,10 +85,10 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
85
85
lastDiscoveryResponses := map [string ]lastDiscoveryResponse {}
86
86
87
87
// a collection of stack allocated watches per request type
88
- watches := newWatches (reqCh )
88
+ watches := newWatches ()
89
89
90
90
defer func () {
91
- watches .Cancel ()
91
+ watches .close ()
92
92
if s .callbacks != nil {
93
93
s .callbacks .OnStreamClosed (streamID )
94
94
}
@@ -124,14 +124,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
124
124
return out .Nonce , str .Send (out )
125
125
}
126
126
127
- open := func (w * watch , req * discovery.DiscoveryRequest , responder chan cache.Response ) {
128
- w .cancel = s .cache .CreateWatch (req , streamState , responder )
129
- watches .cases [w .index ] = reflect.SelectCase {
130
- Dir : reflect .SelectRecv ,
131
- Chan : reflect .ValueOf (responder ),
132
- }
133
- }
134
-
135
127
if s .callbacks != nil {
136
128
if err := s .callbacks .OnStreamOpen (str .Context (), streamID , defaultTypeURL ); err != nil {
137
129
return err
@@ -142,7 +134,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
142
134
var node = & core.Node {}
143
135
144
136
// recompute dynamic channels for this stream
145
- watches .RecomputeWatches (s .ctx , reqCh )
137
+ watches .recompute (s .ctx , reqCh )
146
138
147
139
for {
148
140
// The list of select cases looks like this:
@@ -204,27 +196,36 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq
204
196
// We've found a pre-existing watch, lets check and update if needed.
205
197
// If these requirements aren't satisfied, leave an open watch.
206
198
if w .nonce == "" || w .nonce == nonce {
207
- w .Cancel ()
199
+ w .close ()
208
200
209
- open (w , req , responder )
201
+ watches .addWatch (typeURL , & watch {
202
+ cancel : s .cache .CreateWatch (req , streamState , responder ),
203
+ response : responder ,
204
+ })
210
205
}
211
206
} else {
212
207
// No pre-existing watch exists, let's create one.
213
208
// We need to precompute the watches first then open a watch in the cache.
214
209
watches .responders [typeURL ] = & watch {}
215
210
w = watches .responders [typeURL ]
216
- watches .RecomputeWatches (s .ctx , reqCh )
211
+ watches .recompute (s .ctx , reqCh )
217
212
218
- open (w , req , responder )
213
+ watches .addWatch (typeURL , & watch {
214
+ cancel : s .cache .CreateWatch (req , streamState , responder ),
215
+ response : responder ,
216
+ })
219
217
}
218
+
219
+ // Recompute the dynamic select cases for this stream.
220
+ watches .recompute (s .ctx , reqCh )
220
221
default :
221
222
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
222
223
if ! ok {
223
- return status .Errorf (codes .Unavailable , "resource watch failed" )
224
+ return status .Errorf (codes .Unavailable , "resource watch %d -> failed" , index )
224
225
}
225
226
226
227
res := value .Interface ().(cache.Response )
227
- nonce , err := send (value . Interface ().(cache. Response ) )
228
+ nonce , err := send (res )
228
229
if err != nil {
229
230
return err
230
231
}
0 commit comments