@@ -23,20 +23,31 @@ package signaling
23
23
24
24
import (
25
25
"context"
26
+ "encoding/json"
26
27
"fmt"
27
28
"log"
28
29
"net"
29
30
"strings"
30
31
"sync"
32
+ "sync/atomic"
33
+ "time"
31
34
32
35
"github.com/dlintw/goconf"
36
+ clientv3 "go.etcd.io/etcd/client/v3"
33
37
"google.golang.org/grpc"
34
38
codes "google.golang.org/grpc/codes"
35
39
"google.golang.org/grpc/credentials"
36
40
"google.golang.org/grpc/credentials/insecure"
37
41
status "google.golang.org/grpc/status"
38
42
)
39
43
44
+ const (
45
+ GrpcTargetTypeStatic = "static"
46
+ GrpcTargetTypeEtcd = "etcd"
47
+
48
+ DefaultGrpcTargetType = GrpcTargetTypeStatic
49
+ )
50
+
40
51
func init () {
41
52
RegisterGrpcClientStats ()
42
53
}
@@ -140,20 +151,32 @@ type GrpcClients struct {
140
151
141
152
clientsMap map [string ]* GrpcClient
142
153
clients []* GrpcClient
154
+
155
+ etcdClient * EtcdClient
156
+ targetPrefix string
157
+ targetSelf string
158
+ targetInformation map [string ]* GrpcTargetInformationEtcd
159
+ dialOptions atomic.Value // []grpc.DialOption
160
+
161
+ initializedCtx context.Context
162
+ initializedFunc context.CancelFunc
163
+ wakeupChanForTesting chan bool
143
164
}
144
165
145
- func NewGrpcClients (config * goconf.ConfigFile ) (* GrpcClients , error ) {
146
- result := & GrpcClients {}
166
+ func NewGrpcClients (config * goconf.ConfigFile , etcdClient * EtcdClient ) (* GrpcClients , error ) {
167
+ initializedCtx , initializedFunc := context .WithCancel (context .Background ())
168
+ result := & GrpcClients {
169
+ etcdClient : etcdClient ,
170
+ initializedCtx : initializedCtx ,
171
+ initializedFunc : initializedFunc ,
172
+ }
147
173
if err := result .load (config ); err != nil {
148
174
return nil , err
149
175
}
150
176
return result , nil
151
177
}
152
178
153
179
func (c * GrpcClients ) load (config * goconf.ConfigFile ) error {
154
- c .mu .Lock ()
155
- defer c .mu .Unlock ()
156
-
157
180
var opts []grpc.DialOption
158
181
caFile , _ := config .GetString ("grpc" , "ca" )
159
182
if caFile != "" {
@@ -168,6 +191,25 @@ func (c *GrpcClients) load(config *goconf.ConfigFile) error {
168
191
opts = append (opts , grpc .WithTransportCredentials (insecure .NewCredentials ()))
169
192
}
170
193
194
+ targetType , _ := config .GetString ("grpc" , "targettype" )
195
+ if targetType == "" {
196
+ targetType = DefaultGrpcTargetType
197
+ }
198
+
199
+ switch targetType {
200
+ case GrpcTargetTypeStatic :
201
+ return c .loadTargetsStatic (config , opts ... )
202
+ case GrpcTargetTypeEtcd :
203
+ return c .loadTargetsEtcd (config , opts ... )
204
+ default :
205
+ return fmt .Errorf ("unknown GRPC target type: %s" , targetType )
206
+ }
207
+ }
208
+
209
+ func (c * GrpcClients ) loadTargetsStatic (config * goconf.ConfigFile , opts ... grpc.DialOption ) error {
210
+ c .mu .Lock ()
211
+ defer c .mu .Unlock ()
212
+
171
213
clientsMap := make (map [string ]* GrpcClient )
172
214
var clients []* GrpcClient
173
215
removeTargets := make (map [string ]bool , len (c .clientsMap ))
@@ -216,10 +258,185 @@ func (c *GrpcClients) load(config *goconf.ConfigFile) error {
216
258
217
259
c .clients = clients
218
260
c .clientsMap = clientsMap
261
+ c .initializedFunc ()
219
262
statsGrpcClients .Set (float64 (len (clients )))
220
263
return nil
221
264
}
222
265
266
+ func (c * GrpcClients ) loadTargetsEtcd (config * goconf.ConfigFile , opts ... grpc.DialOption ) error {
267
+ if ! c .etcdClient .IsConfigured () {
268
+ return fmt .Errorf ("No etcd endpoints configured" )
269
+ }
270
+
271
+ targetPrefix , _ := config .GetString ("grpc" , "targetprefix" )
272
+ if targetPrefix == "" {
273
+ return fmt .Errorf ("No GRPC target prefix configured" )
274
+ }
275
+ c .targetPrefix = targetPrefix
276
+ if c .targetInformation == nil {
277
+ c .targetInformation = make (map [string ]* GrpcTargetInformationEtcd )
278
+ }
279
+
280
+ targetSelf , _ := config .GetString ("grpc" , "targetself" )
281
+ c .targetSelf = targetSelf
282
+
283
+ if opts == nil {
284
+ opts = make ([]grpc.DialOption , 0 )
285
+ }
286
+ c .dialOptions .Store (opts )
287
+
288
+ c .etcdClient .AddListener (c )
289
+ return nil
290
+ }
291
+
292
+ func (c * GrpcClients ) EtcdClientCreated (client * EtcdClient ) {
293
+ go func () {
294
+ if err := client .Watch (context .Background (), c .targetPrefix , c , clientv3 .WithPrefix ()); err != nil {
295
+ log .Printf ("Error processing watch for %s: %s" , c .targetPrefix , err )
296
+ }
297
+ }()
298
+
299
+ go func () {
300
+ client .WaitForConnection ()
301
+
302
+ waitDelay := initialWaitDelay
303
+ for {
304
+ response , err := c .getGrpcTargets (client , c .targetPrefix )
305
+ if err != nil {
306
+ if err == context .DeadlineExceeded {
307
+ log .Printf ("Timeout getting initial list of GRPC targets, retry in %s" , waitDelay )
308
+ } else {
309
+ log .Printf ("Could not get initial list of GRPC targets, retry in %s: %s" , waitDelay , err )
310
+ }
311
+
312
+ time .Sleep (waitDelay )
313
+ waitDelay = waitDelay * 2
314
+ if waitDelay > maxWaitDelay {
315
+ waitDelay = maxWaitDelay
316
+ }
317
+ continue
318
+ }
319
+
320
+ for _ , ev := range response .Kvs {
321
+ c .EtcdKeyUpdated (client , string (ev .Key ), ev .Value )
322
+ }
323
+ c .initializedFunc ()
324
+ return
325
+ }
326
+ }()
327
+ }
328
+
329
+ func (c * GrpcClients ) getGrpcTargets (client * EtcdClient , targetPrefix string ) (* clientv3.GetResponse , error ) {
330
+ ctx , cancel := context .WithTimeout (context .Background (), time .Second )
331
+ defer cancel ()
332
+
333
+ return client .Get (ctx , targetPrefix , clientv3 .WithPrefix ())
334
+ }
335
+
336
+ func (c * GrpcClients ) EtcdKeyUpdated (client * EtcdClient , key string , data []byte ) {
337
+ var info GrpcTargetInformationEtcd
338
+ if err := json .Unmarshal (data , & info ); err != nil {
339
+ log .Printf ("Could not decode GRPC target %s=%s: %s" , key , string (data ), err )
340
+ return
341
+ }
342
+ if err := info .CheckValid (); err != nil {
343
+ log .Printf ("Received invalid GRPC target %s=%s: %s" , key , string (data ), err )
344
+ return
345
+ }
346
+
347
+ c .mu .Lock ()
348
+ defer c .mu .Unlock ()
349
+
350
+ prev , found := c .targetInformation [key ]
351
+ if found && prev .Address != info .Address {
352
+ // Address of endpoint has changed, remove old one.
353
+ c .removeEtcdClientLocked (key )
354
+ }
355
+
356
+ if c .targetSelf != "" && info .Address == c .targetSelf {
357
+ log .Printf ("GRPC target %s is this server, ignoring %s" , info .Address , key )
358
+ c .wakeupForTesting ()
359
+ return
360
+ }
361
+
362
+ if _ , found := c .clientsMap [info .Address ]; found {
363
+ log .Printf ("GRPC target %s already exists, ignoring %s" , info .Address , key )
364
+ return
365
+ }
366
+
367
+ opts := c .dialOptions .Load ().([]grpc.DialOption )
368
+ cl , err := NewGrpcClient (info .Address , opts ... )
369
+ if err != nil {
370
+ log .Printf ("Could not create GRPC client for target %s: %s" , info .Address , err )
371
+ return
372
+ }
373
+
374
+ log .Printf ("Adding %s as GRPC target" , info .Address )
375
+
376
+ if c .clientsMap == nil {
377
+ c .clientsMap = make (map [string ]* GrpcClient )
378
+ }
379
+ c .clientsMap [info .Address ] = cl
380
+ c .clients = append (c .clients , cl )
381
+ c .targetInformation [key ] = & info
382
+ statsGrpcClients .Inc ()
383
+ c .wakeupForTesting ()
384
+ }
385
+
386
+ func (c * GrpcClients ) EtcdKeyDeleted (client * EtcdClient , key string ) {
387
+ c .mu .Lock ()
388
+ defer c .mu .Unlock ()
389
+
390
+ c .removeEtcdClientLocked (key )
391
+ }
392
+
393
+ func (c * GrpcClients ) removeEtcdClientLocked (key string ) {
394
+ info , found := c .targetInformation [key ]
395
+ if ! found {
396
+ log .Printf ("No connection found for %s, ignoring" , key )
397
+ c .wakeupForTesting ()
398
+ return
399
+ }
400
+
401
+ delete (c .targetInformation , key )
402
+ client , found := c .clientsMap [info .Address ]
403
+ if ! found {
404
+ return
405
+ }
406
+
407
+ log .Printf ("Removing connection to %s (from %s)" , info .Address , key )
408
+ if err := client .Close (); err != nil {
409
+ log .Printf ("Error closing client to %s: %s" , client .Target (), err )
410
+ }
411
+ delete (c .clientsMap , info .Address )
412
+ c .clients = make ([]* GrpcClient , 0 , len (c .clientsMap ))
413
+ for _ , client := range c .clientsMap {
414
+ c .clients = append (c .clients , client )
415
+ }
416
+ statsGrpcClients .Dec ()
417
+ c .wakeupForTesting ()
418
+ }
419
+
420
+ func (c * GrpcClients ) WaitForInitialized (ctx context.Context ) error {
421
+ select {
422
+ case <- ctx .Done ():
423
+ return ctx .Err ()
424
+ case <- c .initializedCtx .Done ():
425
+ return nil
426
+ }
427
+ }
428
+
429
+ func (c * GrpcClients ) wakeupForTesting () {
430
+ if c .wakeupChanForTesting == nil {
431
+ return
432
+ }
433
+
434
+ select {
435
+ case c .wakeupChanForTesting <- true :
436
+ default :
437
+ }
438
+ }
439
+
223
440
func (c * GrpcClients ) Reload (config * goconf.ConfigFile ) {
224
441
if err := c .load (config ); err != nil {
225
442
log .Printf ("Could not reload RPC clients: %s" , err )
@@ -238,6 +455,10 @@ func (c *GrpcClients) Close() {
238
455
239
456
c .clients = nil
240
457
c .clientsMap = nil
458
+
459
+ if c .etcdClient != nil {
460
+ c .etcdClient .RemoveListener (c )
461
+ }
241
462
}
242
463
243
464
func (c * GrpcClients ) GetClients () []* GrpcClient {
0 commit comments