From 1349de325fe05b779cf2fea9665025c6fd6c1a40 Mon Sep 17 00:00:00 2001 From: DengY11 <212294929@qq.com> Date: Fri, 23 May 2025 21:39:03 +0800 Subject: [PATCH 1/2] feat: expose shard information in redis.Ring - Add GetShards() method to retrieve a list of active shard clients. - Add GetShardByKey(key string) method to get the shard client for a specific key. - These methods enable users to manage Pub/Sub operations more effectively by accessing shard-specific clients. --- ring.go | 19 ++++++++++++++ ring_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/ring.go b/ring.go index fe8a6dc47..8f3defa65 100644 --- a/ring.go +++ b/ring.go @@ -847,3 +847,22 @@ func (c *Ring) Close() error { return c.sharding.Close() } + +func (c *Ring) GetShards() []*Client { + shards := c.sharding.List() + clients := make([]*Client, 0, len(shards)) + for _, shard := range shards { + if shard.IsUp() { + clients = append(clients, shard.Client) + } + } + return clients +} + +func (c *Ring) GetShardByKey(key string) (*Client, error) { + shard, err := c.sharding.GetByKey(key) + if err != nil { + return nil, err + } + return shard.Client, nil +} diff --git a/ring_test.go b/ring_test.go index cfd545c17..bddbc31ba 100644 --- a/ring_test.go +++ b/ring_test.go @@ -766,3 +766,74 @@ var _ = Describe("Ring Tx timeout", func() { testTimeout() }) }) + +var _ = Describe("Ring GetShards and GetShardByKey", func() { + var ring *redis.Ring + + BeforeEach(func() { + ring = redis.NewRing(&redis.RingOptions{ + Addrs: map[string]string{ + "shard1": ":6379", + "shard2": ":6380", + }, + }) + }) + + AfterEach(func() { + Expect(ring.Close()).NotTo(HaveOccurred()) + }) + + It("GetShards returns active shard clients", func() { + shards := ring.GetShards() + if len(shards) == 0 { + // Expected if Redis servers are not running + Skip("No active shards found (Redis servers not running)") + } else { + Expect(len(shards)).To(BeNumerically(">", 0)) + for _, client := range shards { + Expect(client).NotTo(BeNil()) + } + } + }) + + It("GetShardByKey returns correct shard for keys", func() { + testKeys := []string{"key1", "key2", "user:123", "channel:test"} + + for _, key := range testKeys { + client, err := ring.GetShardByKey(key) + Expect(err).NotTo(HaveOccurred()) + Expect(client).NotTo(BeNil()) + } + }) + + It("GetShardByKey is consistent for same key", func() { + key := "test:consistency" + + var firstClient *redis.Client + for i := 0; i < 5; i++ { + client, err := ring.GetShardByKey(key) + Expect(err).NotTo(HaveOccurred()) + Expect(client).NotTo(BeNil()) + + if i == 0 { + firstClient = client + } else { + Expect(client.String()).To(Equal(firstClient.String())) + } + } + }) + + It("GetShardByKey distributes keys across shards", func() { + testKeys := []string{"key1", "key2", "key3", "key4", "key5"} + shardMap := make(map[string]int) + + for _, key := range testKeys { + client, err := ring.GetShardByKey(key) + Expect(err).NotTo(HaveOccurred()) + shardMap[client.String()]++ + } + + Expect(len(shardMap)).To(BeNumerically(">=", 1)) + Expect(len(shardMap)).To(BeNumerically("<=", 2)) // At most 2 shards (our setup) + }) +}) From 36558c9b23105685ba6e77b99db0a5be7953092f Mon Sep 17 00:00:00 2001 From: DengY11 <212294929@qq.com> Date: Tue, 27 May 2025 21:53:10 +0800 Subject: [PATCH 2/2] rename GetShardClients and GetShardClientForKey --- ring.go | 8 ++++++-- ring_test.go | 26 +++++++++++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/ring.go b/ring.go index 8f3defa65..8a004b8c0 100644 --- a/ring.go +++ b/ring.go @@ -848,7 +848,9 @@ func (c *Ring) Close() error { return c.sharding.Close() } -func (c *Ring) GetShards() []*Client { +// GetShardClients returns a list of all shard clients in the ring. +// This can be used to create dedicated connections (e.g., PubSub) for each shard. +func (c *Ring) GetShardClients() []*Client { shards := c.sharding.List() clients := make([]*Client, 0, len(shards)) for _, shard := range shards { @@ -859,7 +861,9 @@ func (c *Ring) GetShards() []*Client { return clients } -func (c *Ring) GetShardByKey(key string) (*Client, error) { +// GetShardClientForKey returns the shard client that would handle the given key. +// This can be used to determine which shard a particular key/channel would be routed to. +func (c *Ring) GetShardClientForKey(key string) (*Client, error) { shard, err := c.sharding.GetByKey(key) if err != nil { return nil, err diff --git a/ring_test.go b/ring_test.go index 3bfd67e4e..aaac74dc9 100644 --- a/ring_test.go +++ b/ring_test.go @@ -783,7 +783,7 @@ var _ = Describe("Ring Tx timeout", func() { }) }) -var _ = Describe("Ring GetShards and GetShardByKey", func() { +var _ = Describe("Ring GetShardClients and GetShardClientForKey", func() { var ring *redis.Ring BeforeEach(func() { @@ -799,8 +799,12 @@ var _ = Describe("Ring GetShards and GetShardByKey", func() { Expect(ring.Close()).NotTo(HaveOccurred()) }) - It("GetShards returns active shard clients", func() { - shards := ring.GetShards() + It("GetShardClients returns active shard clients", func() { + shards := ring.GetShardClients() + // Note: This test will pass even if Redis servers are not running, + // because GetShardClients only returns clients that are marked as "up", + // and newly created shards start as "up" until the first health check fails. + if len(shards) == 0 { // Expected if Redis servers are not running Skip("No active shards found (Redis servers not running)") @@ -812,22 +816,24 @@ var _ = Describe("Ring GetShards and GetShardByKey", func() { } }) - It("GetShardByKey returns correct shard for keys", func() { + It("GetShardClientForKey returns correct shard for keys", func() { testKeys := []string{"key1", "key2", "user:123", "channel:test"} for _, key := range testKeys { - client, err := ring.GetShardByKey(key) + client, err := ring.GetShardClientForKey(key) Expect(err).NotTo(HaveOccurred()) Expect(client).NotTo(BeNil()) } }) - It("GetShardByKey is consistent for same key", func() { + It("GetShardClientForKey is consistent for same key", func() { key := "test:consistency" + // Call GetShardClientForKey multiple times with the same key + // Should always return the same shard var firstClient *redis.Client for i := 0; i < 5; i++ { - client, err := ring.GetShardByKey(key) + client, err := ring.GetShardClientForKey(key) Expect(err).NotTo(HaveOccurred()) Expect(client).NotTo(BeNil()) @@ -839,17 +845,19 @@ var _ = Describe("Ring GetShards and GetShardByKey", func() { } }) - It("GetShardByKey distributes keys across shards", func() { + It("GetShardClientForKey distributes keys across shards", func() { testKeys := []string{"key1", "key2", "key3", "key4", "key5"} shardMap := make(map[string]int) for _, key := range testKeys { - client, err := ring.GetShardByKey(key) + client, err := ring.GetShardClientForKey(key) Expect(err).NotTo(HaveOccurred()) shardMap[client.String()]++ } + // Should have at least 1 shard (could be all keys go to same shard due to hashing) Expect(len(shardMap)).To(BeNumerically(">=", 1)) + // But with multiple keys, we expect some distribution Expect(len(shardMap)).To(BeNumerically("<=", 2)) // At most 2 shards (our setup) }) })