Skip to content

fix(txpipeline): keyless commands should take the slot of the keyed #3411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
61 changes: 60 additions & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,55 @@ import (
"github.com/redis/go-redis/v9/internal/util"
)

// keylessCommands contains Redis commands that have empty key specifications (9th slot empty)
// Only includes core Redis commands, excludes FT.*, ts.*, timeseries.*, search.* and subcommands
var keylessCommands = map[string]struct{}{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some other implementations ascertain this information dynamically with a call to https://redis.io/docs/latest/commands/command/ at initialization time. I don't necessarily think go-redis should do that, since it would require a command to execute successfully just for the library to initialize its internal state correctly, but it could be an interesting angle to consider, which would allow you to avoid needing to hardcode this list and maintain this source of truth manually.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LINKIWI I agree. We are considering this since there is other data in the Commands output that can and should be used. We will probably start with a static list and update it on initialization, but it will be part of a separate feature.

"acl": {},
"asking": {},
"auth": {},
"bgrewriteaof": {},
"bgsave": {},
"client": {},
"cluster": {},
"config": {},
"debug": {},
"discard": {},
"echo": {},
"exec": {},
"failover": {},
"function": {},
"hello": {},
"latency": {},
"lolwut": {},
"module": {},
"monitor": {},
"multi": {},
"pfselftest": {},
"ping": {},
"psubscribe": {},
"psync": {},
"publish": {},
"pubsub": {},
"punsubscribe": {},
"quit": {},
"readonly": {},
"readwrite": {},
"replconf": {},
"replicaof": {},
"role": {},
"save": {},
"script": {},
"select": {},
"shutdown": {},
"slaveof": {},
"slowlog": {},
"subscribe": {},
"swapdb": {},
"sync": {},
"unsubscribe": {},
"unwatch": {},
}

type Cmder interface {
// command name.
// e.g. "set k v ex 10" -> "set", "cluster info" -> "cluster".
Expand Down Expand Up @@ -75,12 +124,22 @@ func writeCmd(wr *proto.Writer, cmd Cmder) error {
return wr.WriteArgs(cmd.Args())
}

// cmdFirstKeyPos returns the position of the first key in the command's arguments.
// If the command does not have a key, it returns 0.
// TODO: Use the data in CommandInfo to determine the first key position.
func cmdFirstKeyPos(cmd Cmder) int {
if pos := cmd.firstKeyPos(); pos != 0 {
return int(pos)
}

switch cmd.Name() {
name := cmd.Name()

// first check if the command is keyless
if _, ok := keylessCommands[name]; ok {
return 0
}

switch name {
case "eval", "evalsha", "eval_ro", "evalsha_ro":
if cmd.stringArg(2) != "0" {
return 3
Expand Down
11 changes: 9 additions & 2 deletions internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() {
It("select slot from args for GETKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)

slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})

It("select slot from args for COUNTKEYSINSLOT command", func() {
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)

slot := client.cmdSlot(cmd)
slot := client.cmdSlot(cmd, -1)
Expect(slot).To(Equal(100))
})

It("follows preferred random slot", func() {
cmd := NewStatusCmd(ctx, "ping")

slot := client.cmdSlot(cmd, 101)
Expect(slot).To(Equal(101))
})
})
})
70 changes: 62 additions & 8 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, -1)
var node *clusterNode
var moved bool
var ask bool
Expand Down Expand Up @@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
return err
}

preferredRandomSlot := -1
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := c.slotReadOnlyNode(state, slot)
if err != nil {
return err
Expand All @@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
}

for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
node, err := state.slotMasterNode(slot)
if err != nil {
return err
Expand Down Expand Up @@ -1519,8 +1526,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
return err
}

cmdsMap := c.mapCmdsBySlot(cmds)
cmdsMap := map[int][]Cmder{}
Copy link
Preview

Copilot AI Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider refactoring the slot determination logic for keyed commands (from lines 1528-1556) into a separate helper function to enhance readability and reduce complexity.

Copilot uses AI. Check for mistakes.

slot := -1
// get only the keyed commands
keyedCmds := c.keyedCmds(cmds)
if len(keyedCmds) == 0 {
// no keyed commands try random slot
slot = hashtag.RandomSlot()
} else {
// keyed commands, get slot from them
// if more than one slot, return cross slot error
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
if len(cmdsBySlot) > 1 {
// cross slot error, we have more than one slot for keyed commands
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
}
// get the slot, should be only one
for sl := range cmdsBySlot {
slot = sl
break
}
}
// slot was not determined, try random one
if slot == -1 {
slot = hashtag.RandomSlot()
}
cmdsMap[slot] = cmds

// TxPipeline does not support cross slot transaction.
// double check the commands are in the same slot
if len(cmdsMap) > 1 {
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
Expand Down Expand Up @@ -1566,13 +1601,29 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
cmdsMap := make(map[int][]Cmder)
preferredRandomSlot := -1
for _, cmd := range cmds {
slot := c.cmdSlot(cmd)
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
return cmdsMap
}

// keyedCmds returns all the keyed commands from the cmds slice
// it determines keyed commands by checking if the command has a first key position
func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder {
keyedCmds := make([]Cmder, 0, len(cmds))
for _, cmd := range cmds {
if cmdFirstKeyPos(cmd) != 0 {
keyedCmds = append(keyedCmds, cmd)
}
}
return keyedCmds
}

func (c *ClusterClient) processTxPipelineNode(
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) {
Expand Down Expand Up @@ -1885,17 +1936,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
args := cmd.Args()
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
return args[2].(int)
}

return cmdSlot(cmd, cmdFirstKeyPos(cmd))
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
}

func cmdSlot(cmd Cmder, pos int) int {
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
if pos == 0 {
if preferredRandomSlot != -1 {
return preferredRandomSlot
}
return hashtag.RandomSlot()
}
firstKey := cmd.stringArg(pos)
Expand Down
9 changes: 9 additions & 0 deletions osscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,15 @@ var _ = Describe("ClusterClient", func() {
Expect(err).To(MatchError(redis.ErrCrossSlot))
})

It("works normally with keyless commands and no CrossSlot error", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Ping(ctx)
pipe.Set(ctx, "B{s}", "B_value", 0)
pipe.Ping(ctx)
_, err := pipe.Exec(ctx)
Expect(err).To(Not(HaveOccurred()))
})

// doesn't fail when no commands are queued
It("returns no error when there are no commands", func() {
_, err := pipe.Exec(ctx)
Expand Down
12 changes: 6 additions & 6 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,20 +304,20 @@ var _ = Describe("Redis Ring", func() {
ring = redis.NewRing(opt)
})
It("supports Process hook", func() {
err := ring.Ping(ctx).Err()
err := ring.Set(ctx, "key", "test", 0).Err()
Expect(err).NotTo(HaveOccurred())

var stack []string

ring.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "ring.BeforeProcess")

err := hook(ctx, cmd)

Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "ring.AfterProcess")

return err
Expand All @@ -329,12 +329,12 @@ var _ = Describe("Redis Ring", func() {
shard.AddHook(&hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
Expect(cmd.String()).To(Equal("ping: "))
Expect(cmd.String()).To(Equal("get key: "))
stack = append(stack, "shard.BeforeProcess")

err := hook(ctx, cmd)

Expect(cmd.String()).To(Equal("ping: PONG"))
Expect(cmd.String()).To(Equal("get key: test"))
stack = append(stack, "shard.AfterProcess")

return err
Expand All @@ -344,7 +344,7 @@ var _ = Describe("Redis Ring", func() {
return nil
})

err = ring.Ping(ctx).Err()
err = ring.Get(ctx, "key").Err()
Expect(err).NotTo(HaveOccurred())
Expect(stack).To(Equal([]string{
"ring.BeforeProcess",
Expand Down
Loading