Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func newStandaloneClient(opt *ClientOption, connFn connFn, retryer retryHandler)
}
s := &standalone{
toReplicas: opt.SendToReplicas,
nodeSelector: opt.ReadNodeSelector,
replicas: make([]*singleClient, len(opt.Standalone.ReplicaAddress)),
enableRedirect: opt.Standalone.EnableRedirect,
connFn: connFn,
Expand All @@ -39,29 +40,56 @@ func newStandaloneClient(opt *ClientOption, connFn connFn, retryer retryHandler)
}
s.replicas[i] = newSingleClientWithConn(replicaConn, cmds.NewBuilder(cmds.NoSlot), !opt.DisableRetry, opt.DisableCache, retryer, false)
}
if s.opt.EnableReplicaAZInfo && (s.opt.ReadNodeSelector != nil || len(s.replicas) > 1) {
s.nodes = make([]NodeInfo, len(s.replicas)+1)
primary := s.primary.Load()
s.nodes[0] = NodeInfo{
Addr: primary.conn.Addr(),
AZ: primary.conn.AZ(),
}
for i, replica := range s.replicas {
s.nodes[i+1] = NodeInfo{
Addr: replica.conn.Addr(),
AZ: replica.conn.AZ(),
}
}
}
return s, nil
}

type standalone struct {
retryer retryHandler
toReplicas func(Completed) bool
nodeSelector func(uint16, []NodeInfo) int
primary atomic.Pointer[singleClient]
connFn connFn
opt *ClientOption
redirectCall call
replicas []*singleClient
nodes []NodeInfo
enableRedirect bool
}

func (s *standalone) B() Builder {
return s.primary.Load().B()
}

func (s *standalone) pick() int {
func (s *standalone) pick() *singleClient {
if s.nodeSelector != nil {
rIndex := s.nodeSelector(0, s.nodes)
if rIndex < 0 || rIndex >= len(s.nodes) {
rIndex = 0
}
if rIndex == 0 {
return s.primary.Load()
}
return s.replicas[rIndex-1]
}

if len(s.replicas) == 1 {
return 0
return s.replicas[0]
}
return rand.IntN(len(s.replicas))
return s.replicas[rand.IntN(len(s.replicas))]
}

func (s *standalone) redirectToPrimary(addr string) error {
Expand Down Expand Up @@ -106,7 +134,7 @@ func (s *standalone) Do(ctx context.Context, cmd Completed) (resp RedisResult) {

retry:
if s.toReplicas != nil && s.toReplicas(cmd) {
resp = s.replicas[s.pick()].Do(ctx, cmd)
resp = s.pick().Do(ctx, cmd)
} else {
resp = s.primary.Load().Do(ctx, cmd)
}
Expand Down Expand Up @@ -144,7 +172,7 @@ retry:
}
}
if toReplica {
resp = s.replicas[s.pick()].DoMulti(ctx, multi...)
resp = s.pick().DoMulti(ctx, multi...)
} else {
resp = s.primary.Load().DoMulti(ctx, multi...)
}
Expand All @@ -171,7 +199,7 @@ retry:

func (s *standalone) Receive(ctx context.Context, subscribe Completed, fn func(msg PubSubMessage)) error {
if s.toReplicas != nil && s.toReplicas(subscribe) {
return s.replicas[s.pick()].Receive(ctx, subscribe, fn)
return s.pick().Receive(ctx, subscribe, fn)
}
return s.primary.Load().Receive(ctx, subscribe, fn)
}
Expand Down Expand Up @@ -241,7 +269,7 @@ retry:
func (s *standalone) DoStream(ctx context.Context, cmd Completed) RedisResultStream {
var stream RedisResultStream
if s.toReplicas != nil && s.toReplicas(cmd) {
stream = s.replicas[s.pick()].DoStream(ctx, cmd)
stream = s.pick().DoStream(ctx, cmd)
} else {
stream = s.primary.Load().DoStream(ctx, cmd)
}
Expand All @@ -258,7 +286,7 @@ func (s *standalone) DoMultiStream(ctx context.Context, multi ...Completed) Mult
}
}
if toReplica {
stream = s.replicas[s.pick()].DoMultiStream(ctx, multi...)
stream = s.pick().DoMultiStream(ctx, multi...)
} else {
stream = s.primary.Load().DoMultiStream(ctx, multi...)
}
Expand Down
88 changes: 80 additions & 8 deletions standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,10 @@ func TestStandalonePickReplica(t *testing.T) {
}
defer s.Close()

// Test that pick() returns 0 for single replica
index := s.pick()
if index != 0 {
t.Errorf("expected index 0, got %d", index)
// Test that pick() returns the single replica
client := s.pick()
if client != s.replicas[0] {
t.Errorf("expected replica client, got different client")
}
}

Expand Down Expand Up @@ -681,11 +681,11 @@ func TestStandalonePickMultipleReplicas(t *testing.T) {
}
defer s.Close()

// Test that pick() returns a valid index for multiple replicas
// Test that pick() returns a valid replica for multiple replicas
for i := 0; i < 10; i++ {
index := s.pick()
if index < 0 || index >= 2 {
t.Errorf("expected index 0 or 1, got %d", index)
client := s.pick()
if client != s.replicas[0] && client != s.replicas[1] {
t.Errorf("expected one of the replica clients, got different client")
}
}
}
Expand Down Expand Up @@ -995,3 +995,75 @@ func TestStandaloneDoMultiCacheWithRedirectRetryFailure(t *testing.T) {
t.Errorf("expected REDIRECT error, got %v", results[0].Error())
}
}

func TestStandaloneReadNodeSelector(t *testing.T) {
defer ShouldNotLeak(SetupLeakDetection())

t.Run("ReadNodeSelector", func(t *testing.T) {
primaryNodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "primary"), nil)
},
AZFn: func() string {
return "us-east-1a"
},
}
replica1NodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "replica1"), nil)
},
AZFn: func() string {
return "us-east-1a" // Same AZ as client
},
}
replica2NodeConn := &mockConn{
DoFn: func(cmd Completed) RedisResult {
return newResult(strmsg('+', "replica2"), nil)
},
AZFn: func() string {
return "us-east-1b" // Different AZ
},
}

client, err := newStandaloneClient(&ClientOption{
InitAddress: []string{"primary"},
Standalone: StandaloneOption{
ReplicaAddress: []string{"replica1", "replica2"},
},
EnableReplicaAZInfo: true,
SendToReplicas: func(cmd Completed) bool {
return cmd.IsReadOnly()
},
ReadNodeSelector: func(slot uint16, nodes []NodeInfo) int {
for i := 1; i < len(nodes); i++ {
if nodes[i].AZ == "us-east-1a" {
return i
}
}
return -1
},
DisableRetry: true,
}, func(dst string, opt *ClientOption) conn {
if dst == "primary" {
return primaryNodeConn
}
if dst == "replica1" {
return replica1NodeConn
}
return replica2NodeConn
}, newRetryer(defaultRetryDelayFn))

if err != nil {
t.Fatalf("unexpected err %v", err)
}
defer client.Close()

// Verify same-AZ replica (replica1) is selected when executing GET command
for i := 0; i < 10; i++ {
result := client.Do(context.Background(), client.B().Get().Key("key").Build())
if val, _ := result.ToString(); val != "replica1" {
t.Fatalf("expected same-AZ replica1 to be selected, got %s", val)
}
}
})
}
Loading