@@ -18,8 +18,11 @@ import (
18
18
"context"
19
19
"errors"
20
20
"fmt"
21
+ "math/rand"
21
22
"reflect"
23
+ "strconv"
22
24
"sync"
25
+ "sync/atomic"
23
26
"testing"
24
27
"time"
25
28
@@ -682,3 +685,63 @@ func TestCallbackError(t *testing.T) {
682
685
})
683
686
}
684
687
}
688
+
689
+ func TestSOTWLinearCacheIntegrationDeadLock (t * testing.T ) {
690
+ for _ , typ := range testTypes {
691
+ t .Run (typ , func (t * testing.T ) {
692
+ t .Log ("Integrating LinearCache with SOTW server. If this take too long, they might be dead locked" )
693
+
694
+ nonce := int64 (0 )
695
+ ver , targetVer := uint64 (0 ), uint64 (100000 )
696
+ untilVerExceed := func (exceed uint64 , fn func (current uint64 )) {
697
+ for current := atomic .LoadUint64 (& ver ); current < exceed ; current = atomic .LoadUint64 (& ver ) {
698
+ fn (current )
699
+ }
700
+ }
701
+
702
+ config := cache .NewLinearCache (typ )
703
+ s := server .NewServer (context .Background (), config , server.CallbackFuncs {})
704
+
705
+ resources := make ([]string , 10 )
706
+ for i := range resources {
707
+ resources [i ] = strconv .Itoa (i )
708
+ }
709
+
710
+ mStream := makeMockStream (t )
711
+
712
+ go func () {
713
+ untilVerExceed (targetVer , func (current uint64 ) {
714
+ mStream .recv <- & discovery.DiscoveryRequest {
715
+ Node : node ,
716
+ TypeUrl : typ ,
717
+ ResourceNames : resources ,
718
+ VersionInfo : strconv .FormatUint (current , 10 ),
719
+ ResponseNonce : strconv .FormatInt (atomic .LoadInt64 (& nonce ), 10 ),
720
+ }
721
+ })
722
+ close (mStream .recv )
723
+ }()
724
+
725
+ go func () {
726
+ untilVerExceed (targetVer , func (current uint64 ) {
727
+ config .SetResources (map [string ]types.Resource {
728
+ resources [rand .Intn (len (resources ))]: opaque , //nolint
729
+ })
730
+ })
731
+ }()
732
+
733
+ go func () {
734
+ for resp := range mStream .sent {
735
+ v , _ := strconv .ParseUint (resp .VersionInfo , 10 , 64 )
736
+ atomic .StoreUint64 (& ver , v )
737
+ n , _ := strconv .ParseInt (resp .Nonce , 10 , 64 )
738
+ atomic .StoreInt64 (& nonce , n )
739
+ }
740
+ }()
741
+
742
+ err := s .StreamAggregatedResources (mStream )
743
+ assert .Nil (t , err )
744
+ close (mStream .sent )
745
+ })
746
+ }
747
+ }
0 commit comments