@@ -18,8 +18,11 @@ import (
18
18
"context"
19
19
"errors"
20
20
"fmt"
21
+ "math/rand"
21
22
"reflect"
23
+ "strconv"
22
24
"sync"
25
+ "sync/atomic"
23
26
"testing"
24
27
"time"
25
28
@@ -667,3 +670,63 @@ func TestCallbackError(t *testing.T) {
667
670
})
668
671
}
669
672
}
673
+
674
+ func TestSOTWLinearCacheIntegrationDeadLock (t * testing.T ) {
675
+ for _ , typ := range testTypes {
676
+ t .Run (typ , func (t * testing.T ) {
677
+ t .Log ("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked" )
678
+
679
+ nonce := int64 (0 )
680
+ ver , targetVer := uint64 (0 ), uint64 (100000 )
681
+ untilVerExceed := func (exceed uint64 , fn func (current uint64 )) {
682
+ for current := atomic .LoadUint64 (& ver ); current < exceed ; current = atomic .LoadUint64 (& ver ) {
683
+ fn (current )
684
+ }
685
+ }
686
+
687
+ config := cache .NewLinearCache (typ )
688
+ s := server .NewServer (context .Background (), config , server.CallbackFuncs {})
689
+
690
+ resources := make ([]string , 10 )
691
+ for i := range resources {
692
+ resources [i ] = strconv .Itoa (i )
693
+ }
694
+
695
+ mStream := makeMockStream (t )
696
+
697
+ go func () {
698
+ untilVerExceed (targetVer , func (current uint64 ) {
699
+ mStream .recv <- & discovery.DiscoveryRequest {
700
+ Node : node ,
701
+ TypeUrl : typ ,
702
+ ResourceNames : resources ,
703
+ VersionInfo : strconv .FormatUint (current , 10 ),
704
+ ResponseNonce : strconv .FormatInt (atomic .LoadInt64 (& nonce ), 10 ),
705
+ }
706
+ })
707
+ close (mStream .recv )
708
+ }()
709
+
710
+ go func () {
711
+ untilVerExceed (targetVer , func (current uint64 ) {
712
+ config .SetResources (map [string ]types.Resource {
713
+ resources [rand .Intn (len (resources ))]: opaque , //nolint
714
+ })
715
+ })
716
+ }()
717
+
718
+ go func () {
719
+ for resp := range mStream .sent {
720
+ v , _ := strconv .ParseUint (resp .VersionInfo , 10 , 64 )
721
+ atomic .StoreUint64 (& ver , v )
722
+ n , _ := strconv .ParseInt (resp .Nonce , 10 , 64 )
723
+ atomic .StoreInt64 (& nonce , n )
724
+ }
725
+ }()
726
+
727
+ err := s .StreamAggregatedResources (mStream )
728
+ assert .Nil (t , err )
729
+ close (mStream .sent )
730
+ })
731
+ }
732
+ }
0 commit comments