Skip to content

fix(txpipeline): keyless commands should take the slot of the keyed #3411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 24, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion command.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,55 @@ import (
"github.com/redis/go-redis/v9/internal/util"
)

// keylessCommands contains Redis commands that have empty key specifications (9th slot empty)
// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands
var keylessCommands = map[string]struct{}{
"acl": {},
"asking": {},
"auth": {},
"bgrewriteaof": {},
"bgsave": {},
"client": {},
"cluster": {},
"config": {},
"debug": {},
"discard": {},
"echo": {},
"exec": {},
"failover": {},
"function": {},
"hello": {},
"latency": {},
"lolwut": {},
"module": {},
"monitor": {},
"multi": {},
"pfselftest": {},
"ping": {},
"psubscribe": {},
"psync": {},
"publish": {},
"pubsub": {},
"punsubscribe": {},
"quit": {},
"readonly": {},
"readwrite": {},
"replconf": {},
"replicaof": {},
"role": {},
"save": {},
"script": {},
"select": {},
"shutdown": {},
"slaveof": {},
"slowlog": {},
"subscribe": {},
"swapdb": {},
"sync": {},
"unsubscribe": {},
"unwatch": {},
}

type Cmder interface {
// command name.
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
@@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
return wr.WriteArgs(cmd.Args())
}

// cmdFirstKeyPos returns the position of the first key in the command's arguments.
// If the command does not have a key, it returns 0.
// TODO: Use the data in CommandInfo to determine the first key position.
func cmdFirstKeyPos(cmd Cmder) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}

switch cmd.Name() {
name := cmd.Name()

// first check if the command is keyless
if _, ok := keylessCommands[name]; ok {
return 0
}

switch name {
case "eval", "evalsha", "eval_ro", "evalsha_ro":
if cmd.stringArg(2) != "0" {
return 3
11 changes: 9 additions & 2 deletions internal_test.go
Original file line number Diff line number Diff line change
@@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() {
It("select slot from args for GETKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)

slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})

It("select slot from args for COUNTKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)

slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})

It("follows preferred random slot", func() {
cmd := NewStatusCmd(ctx, "ping")

slot := client.cmdSlot(cmd, 101)
Expect(slot).To(Equal(101))
})
})
})
112 changes: 71 additions & 41 deletions osscluster.go
Original file line number Diff line number Diff line change
@@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, -1)
var node *clusterNode
var moved bool
var ask bool
@@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
return err
}

preferredRandomSlot := -1
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
@@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
}

for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := state.slotMasterNode(slot)
if err != nil {
return err
@@ -1519,58 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
return err
}

cmdsMap := c.mapCmdsBySlot(cmds)
// TxPipeline does not support cross slot transaction.
if len(cmdsMap) > 1 {
keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
slot := -1
switch len(keyedCmdsBySlot) {
case 0:
slot = hashtag.RandomSlot()
case 1:
for sl := range keyedCmdsBySlot {
slot = sl
break
}
default:
// TxPipeline does not support cross slot transaction.
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
}

for slot, cmds := range cmdsMap {
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
}
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
return err
}

cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
return err
}
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
return err
}
}

failedCmds := newCmdsMap()
var wg sync.WaitGroup
failedCmds := newCmdsMap()
var wg sync.WaitGroup

for node, cmds := range cmdsMap {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
}(node, cmds)
}
for node, cmds := range cmdsMap {
wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
defer wg.Done()
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
}(node, cmds)
}

wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds.m
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds.m
}

return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
// slottedKeyedCommands returns a map of slot to commands taking into account
// only commands that have keys.
func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
cmdsSlots := map[int][]Cmder{}

preferredRandomSlot := -1
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
cmdsMap[slot] = append(cmdsMap[slot], cmd)
if cmdFirstKeyPos(cmd) == 0 {
continue
}

slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}

cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
}
return cmdsMap

return cmdsSlots
}

func (c *ClusterClient) processTxPipelineNode(
@@ -1885,17 +1912,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
args := cmd.Args()
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}

return cmdSlot(cmd, cmdFirstKeyPos(cmd))
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
}

func cmdSlot(cmd Cmder, pos int) int {
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
if pos == 0 {
if preferredRandomSlot != -1 {
return preferredRandomSlot
}
return hashtag.RandomSlot()
}
firstKey := cmd.stringArg(pos)
9 changes: 9 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
@@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() {
Expect(err).To(MatchError(redis.ErrCrossSlot))
})

It("works normally with keyless commands and no CrossSlot error", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Ping(ctx)
pipe.Set(ctx, "B{s}", "B_value", 0)
pipe.Ping(ctx)
_, err := pipe.Exec(ctx)
Expect(err).To(Not(HaveOccurred()))
})

// doesn't fail when no commands are queued
It("returns no error when there are no commands", func() {
_, err := pipe.Exec(ctx)
12 changes: 6 additions & 6 deletions ring_test.go
Original file line number Diff line number Diff line change
@@ -304,20 +304,20 @@ var _ = Describe("Redis Ring", func() {
ring = redis.NewRing(opt)
})
It("supports Process hook", func() {
err := ring.Ping(ctx).Err()
err := ring.Set(ctx, "key", "test", 0).Err()
Expect(err).NotTo(HaveOccurred())

var stack []string

ring.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "ring.BeforeProcess")

err := hook(ctx, cmd)

Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "ring.AfterProcess")

return err
@@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() {
shard.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "shard.BeforeProcess")

err := hook(ctx, cmd)

Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "shard.AfterProcess")

return err
@@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() {
return nil
})

err = ring.Ping(ctx).Err()
err = ring.Get(ctx, "key").Err()
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcess",