@@ -462,8 +462,7 @@ var _ = Describe("ClusterClient", func() {
462
462
Describe ("pipelining" , func () {
463
463
var pipe * redis.Pipeline
464
464
465
- assertPipeline := func () {
466
- keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
465
+ assertPipeline := func (keys []string ) {
467
466
468
467
It ("follows redirects" , func () {
469
468
if ! failover {
@@ -482,13 +481,12 @@ var _ = Describe("ClusterClient", func() {
482
481
Expect (err ).NotTo (HaveOccurred ())
483
482
Expect (cmds ).To (HaveLen (14 ))
484
483
485
- _ = client .ForEachShard (ctx , func (ctx context.Context , node * redis.Client ) error {
486
- defer GinkgoRecover ()
487
- Eventually (func () int64 {
488
- return node .DBSize (ctx ).Val ()
489
- }, 30 * time .Second ).ShouldNot (BeZero ())
490
- return nil
491
- })
484
+ // Check that all keys are set.
485
+ for _ , key := range keys {
486
+ Eventually (func () string {
487
+ return client .Get (ctx , key ).Val ()
488
+ }, 30 * time .Second ).Should (Equal (key + "_value" ))
489
+ }
492
490
493
491
if ! failover {
494
492
for _ , key := range keys {
@@ -517,14 +515,14 @@ var _ = Describe("ClusterClient", func() {
517
515
})
518
516
519
517
It ("works with missing keys" , func () {
520
- pipe .Set (ctx , "A" , "A_value" , 0 )
521
- pipe .Set (ctx , "C" , "C_value" , 0 )
518
+ pipe .Set (ctx , "A{s} " , "A_value" , 0 )
519
+ pipe .Set (ctx , "C{s} " , "C_value" , 0 )
522
520
_ , err := pipe .Exec (ctx )
523
521
Expect (err ).NotTo (HaveOccurred ())
524
522
525
- a := pipe .Get (ctx , "A" )
526
- b := pipe .Get (ctx , "B" )
527
- c := pipe .Get (ctx , "C" )
523
+ a := pipe .Get (ctx , "A{s} " )
524
+ b := pipe .Get (ctx , "B{s} " )
525
+ c := pipe .Get (ctx , "C{s} " )
528
526
cmds , err := pipe .Exec (ctx )
529
527
Expect (err ).To (Equal (redis .Nil ))
530
528
Expect (cmds ).To (HaveLen (3 ))
@@ -547,7 +545,8 @@ var _ = Describe("ClusterClient", func() {
547
545
548
546
AfterEach (func () {})
549
547
550
- assertPipeline ()
548
+ keys := []string {"A" , "B" , "C" , "D" , "E" , "F" , "G" }
549
+ assertPipeline (keys )
551
550
552
551
It ("doesn't fail node with context.Canceled error" , func () {
553
552
ctx , cancel := context .WithCancel (context .Background ())
@@ -590,7 +589,10 @@ var _ = Describe("ClusterClient", func() {
590
589
591
590
AfterEach (func () {})
592
591
593
- assertPipeline ()
592
+ // TxPipeline doesn't support cross slot commands.
593
+ // Use hashtag to force all keys to the same slot.
594
+ keys := []string {"A{s}" , "B{s}" , "C{s}" , "D{s}" , "E{s}" , "F{s}" , "G{s}" }
595
+ assertPipeline (keys )
594
596
})
595
597
})
596
598
0 commit comments