Skip to content

Commit 05f42e2

Browse files
authored
fix(txpipeline): keyless commands should take the slot of the keyed (#3411)
* fix(txpipeline): keyless commands should take the slot of the keyed commands * fix(txpipeline): extract only keyed cmds from all cmds * chore(test): Add tests for keyless cmds and txpipeline * fix(cmdSlot): Add preferred random slot * fix(cmdSlot): Add shortlist of keyless cmds * chore(test): Fix ring test * fix(keylessCommands): Add list of keyless commands Add list of keyless Commands based on the Commands output for redis 8 * chore(txPipeline): refactor slottedCommands impl * fix(osscluster): typo
1 parent 884f997 commit 05f42e2

File tree

5 files changed

+155
-50
lines changed

5 files changed

+155
-50
lines changed

command.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,55 @@ import (
1717
"github.com/redis/go-redis/v9/internal/util"
1818
)
1919

20+
// keylessCommands contains Redis commands that have empty key specifications (9th slot empty)
21+
// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands
22+
var keylessCommands = map[string]struct{}{
23+
"acl": {},
24+
"asking": {},
25+
"auth": {},
26+
"bgrewriteaof": {},
27+
"bgsave": {},
28+
"client": {},
29+
"cluster": {},
30+
"config": {},
31+
"debug": {},
32+
"discard": {},
33+
"echo": {},
34+
"exec": {},
35+
"failover": {},
36+
"function": {},
37+
"hello": {},
38+
"latency": {},
39+
"lolwut": {},
40+
"module": {},
41+
"monitor": {},
42+
"multi": {},
43+
"pfselftest": {},
44+
"ping": {},
45+
"psubscribe": {},
46+
"psync": {},
47+
"publish": {},
48+
"pubsub": {},
49+
"punsubscribe": {},
50+
"quit": {},
51+
"readonly": {},
52+
"readwrite": {},
53+
"replconf": {},
54+
"replicaof": {},
55+
"role": {},
56+
"save": {},
57+
"script": {},
58+
"select": {},
59+
"shutdown": {},
60+
"slaveof": {},
61+
"slowlog": {},
62+
"subscribe": {},
63+
"swapdb": {},
64+
"sync": {},
65+
"unsubscribe": {},
66+
"unwatch": {},
67+
}
68+
2069
type Cmder interface {
2170
// command name.
2271
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
@@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
75124
return wr.WriteArgs(cmd.Args())
76125
}
77126

127+
// cmdFirstKeyPos returns the position of the first key in the command's arguments.
128+
// If the command does not have a key, it returns 0.
129+
// TODO: Use the data in CommandInfo to determine the first key position.
78130
func cmdFirstKeyPos(cmd Cmder) int {
79131
if pos := cmd.firstKeyPos(); pos != 0 {
80132
return int(pos)
81133
}
82134

83-
switch cmd.Name() {
135+
name := cmd.Name()
136+
137+
// first check if the command is keyless
138+
if _, ok := keylessCommands[name]; ok {
139+
return 0
140+
}
141+
142+
switch name {
84143
case "eval", "evalsha", "eval_ro", "evalsha_ro":
85144
if cmd.stringArg(2) != "0" {
86145
return 3

internal_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() {
364364
It("select slot from args for GETKEYSINSLOT command", func() {
365365
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)
366366

367-
slot := client.cmdSlot(cmd)
367+
slot := client.cmdSlot(cmd, -1)
368368
Expect(slot).To(Equal(100))
369369
})
370370

371371
It("select slot from args for COUNTKEYSINSLOT command", func() {
372372
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)
373373

374-
slot := client.cmdSlot(cmd)
374+
slot := client.cmdSlot(cmd, -1)
375375
Expect(slot).To(Equal(100))
376376
})
377+
378+
It("follows preferred random slot", func() {
379+
cmd := NewStatusCmd(ctx, "ping")
380+
381+
slot := client.cmdSlot(cmd, 101)
382+
Expect(slot).To(Equal(101))
383+
})
377384
})
378385
})

osscluster.go

Lines changed: 71 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
998998
}
999999

10001000
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
1001-
slot := c.cmdSlot(cmd)
1001+
slot := c.cmdSlot(cmd, -1)
10021002
var node *clusterNode
10031003
var moved bool
10041004
var ask bool
@@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13441344
return err
13451345
}
13461346

1347+
preferredRandomSlot := -1
13471348
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
13481349
for _, cmd := range cmds {
1349-
slot := c.cmdSlot(cmd)
1350+
slot := c.cmdSlot(cmd, preferredRandomSlot)
1351+
if preferredRandomSlot == -1 {
1352+
preferredRandomSlot = slot
1353+
}
13501354
node, err := c.slotReadOnlyNode(state, slot)
13511355
if err != nil {
13521356
return err
@@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
13571361
}
13581362

13591363
for _, cmd := range cmds {
1360-
slot := c.cmdSlot(cmd)
1364+
slot := c.cmdSlot(cmd, preferredRandomSlot)
1365+
if preferredRandomSlot == -1 {
1366+
preferredRandomSlot = slot
1367+
}
13611368
node, err := state.slotMasterNode(slot)
13621369
if err != nil {
13631370
return err
@@ -1519,58 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
15191526
return err
15201527
}
15211528

1522-
cmdsMap := c.mapCmdsBySlot(cmds)
1523-
// TxPipeline does not support cross slot transaction.
1524-
if len(cmdsMap) > 1 {
1529+
keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
1530+
slot := -1
1531+
switch len(keyedCmdsBySlot) {
1532+
case 0:
1533+
slot = hashtag.RandomSlot()
1534+
case 1:
1535+
for sl := range keyedCmdsBySlot {
1536+
slot = sl
1537+
break
1538+
}
1539+
default:
1540+
// TxPipeline does not support cross slot transaction.
15251541
setCmdsErr(cmds, ErrCrossSlot)
15261542
return ErrCrossSlot
15271543
}
15281544

1529-
for slot, cmds := range cmdsMap {
1530-
node, err := state.slotMasterNode(slot)
1531-
if err != nil {
1532-
setCmdsErr(cmds, err)
1533-
continue
1534-
}
1545+
node, err := state.slotMasterNode(slot)
1546+
if err != nil {
1547+
setCmdsErr(cmds, err)
1548+
return err
1549+
}
15351550

1536-
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1537-
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1538-
if attempt > 0 {
1539-
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1540-
setCmdsErr(cmds, err)
1541-
return err
1542-
}
1551+
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1552+
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1553+
if attempt > 0 {
1554+
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1555+
setCmdsErr(cmds, err)
1556+
return err
15431557
}
1558+
}
15441559

1545-
failedCmds := newCmdsMap()
1546-
var wg sync.WaitGroup
1560+
failedCmds := newCmdsMap()
1561+
var wg sync.WaitGroup
15471562

1548-
for node, cmds := range cmdsMap {
1549-
wg.Add(1)
1550-
go func(node *clusterNode, cmds []Cmder) {
1551-
defer wg.Done()
1552-
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1553-
}(node, cmds)
1554-
}
1563+
for node, cmds := range cmdsMap {
1564+
wg.Add(1)
1565+
go func(node *clusterNode, cmds []Cmder) {
1566+
defer wg.Done()
1567+
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
1568+
}(node, cmds)
1569+
}
15551570

1556-
wg.Wait()
1557-
if len(failedCmds.m) == 0 {
1558-
break
1559-
}
1560-
cmdsMap = failedCmds.m
1571+
wg.Wait()
1572+
if len(failedCmds.m) == 0 {
1573+
break
15611574
}
1575+
cmdsMap = failedCmds.m
15621576
}
15631577

15641578
return cmdsFirstErr(cmds)
15651579
}
15661580

1567-
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
1568-
cmdsMap := make(map[int][]Cmder)
1581+
// slottedKeyedCommands returns a map of slot to commands taking into account
1582+
// only commands that have keys.
1583+
func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
1584+
cmdsSlots := map[int][]Cmder{}
1585+
1586+
preferredRandomSlot := -1
15691587
for _, cmd := range cmds {
1570-
slot := c.cmdSlot(cmd)
1571-
cmdsMap[slot] = append(cmdsMap[slot], cmd)
1588+
if cmdFirstKeyPos(cmd) == 0 {
1589+
continue
1590+
}
1591+
1592+
slot := c.cmdSlot(cmd, preferredRandomSlot)
1593+
if preferredRandomSlot == -1 {
1594+
preferredRandomSlot = slot
1595+
}
1596+
1597+
cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
15721598
}
1573-
return cmdsMap
1599+
1600+
return cmdsSlots
15741601
}
15751602

15761603
func (c *ClusterClient) processTxPipelineNode(
@@ -1885,17 +1912,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
18851912
return info
18861913
}
18871914

1888-
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
1915+
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
18891916
args := cmd.Args()
18901917
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
18911918
return args[2].(int)
18921919
}
18931920

1894-
return cmdSlot(cmd, cmdFirstKeyPos(cmd))
1921+
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
18951922
}
18961923

1897-
func cmdSlot(cmd Cmder, pos int) int {
1924+
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
18981925
if pos == 0 {
1926+
if preferredRandomSlot != -1 {
1927+
return preferredRandomSlot
1928+
}
18991929
return hashtag.RandomSlot()
19001930
}
19011931
firstKey := cmd.stringArg(pos)

osscluster_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() {
603603
Expect(err).To(MatchError(redis.ErrCrossSlot))
604604
})
605605

606+
It("works normally with keyless commands and no CrossSlot error", func() {
607+
pipe.Set(ctx, "A{s}", "A_value", 0)
608+
pipe.Ping(ctx)
609+
pipe.Set(ctx, "B{s}", "B_value", 0)
610+
pipe.Ping(ctx)
611+
_, err := pipe.Exec(ctx)
612+
Expect(err).To(Not(HaveOccurred()))
613+
})
614+
606615
// doesn't fail when no commands are queued
607616
It("returns no error when there are no commands", func() {
608617
_, err := pipe.Exec(ctx)

ring_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,20 +304,20 @@ var _ = Describe("Redis Ring", func() {
304304
ring = redis.NewRing(opt)
305305
})
306306
It("supports Process hook", func() {
307-
err := ring.Ping(ctx).Err()
307+
err := ring.Set(ctx, "key", "test", 0).Err()
308308
Expect(err).NotTo(HaveOccurred())
309309

310310
var stack []string
311311

312312
ring.AddHook(&hook{
313313
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
314314
return func(ctx context.Context, cmd redis.Cmder) error {
315-
Expect(cmd.String()).To(Equal("ping: "))
315+
Expect(cmd.String()).To(Equal("get key: "))
316316
stack = append(stack, "ring.BeforeProcess")
317317

318318
err := hook(ctx, cmd)
319319

320-
Expect(cmd.String()).To(Equal("ping: PONG"))
320+
Expect(cmd.String()).To(Equal("get key: test"))
321321
stack = append(stack, "ring.AfterProcess")
322322

323323
return err
@@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() {
329329
shard.AddHook(&hook{
330330
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
331331
return func(ctx context.Context, cmd redis.Cmder) error {
332-
Expect(cmd.String()).To(Equal("ping: "))
332+
Expect(cmd.String()).To(Equal("get key: "))
333333
stack = append(stack, "shard.BeforeProcess")
334334

335335
err := hook(ctx, cmd)
336336

337-
Expect(cmd.String()).To(Equal("ping: PONG"))
337+
Expect(cmd.String()).To(Equal("get key: test"))
338338
stack = append(stack, "shard.AfterProcess")
339339

340340
return err
@@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() {
344344
return nil
345345
})
346346

347-
err = ring.Ping(ctx).Err()
347+
err = ring.Get(ctx, "key").Err()
348348
Expect(err).NotTo(HaveOccurred())
349349
Expect(stack).To(Equal([]string{
350350
"ring.BeforeProcess",

0 commit comments

Comments
 (0)