From edb6283eb5481be8aaec2a487813052920eef399 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 7 May 2025 16:41:02 +0300 Subject: [PATCH 1/4] xinfo-groups: support nil lag in XINFO GROUPS --- command.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/command.go b/command.go index 3253af6cc..b8bbdb9bc 100644 --- a/command.go +++ b/command.go @@ -2189,6 +2189,8 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error { // to the group's consumers, or a NULL(Nil) when that number can't be determined. if err != nil && err != Nil { return err + } else if err == Nil { + group.Lag = -1 } default: return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key) From fc7939534f26d89112d3f30c3ec5c1d87eaf9f69 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 7 May 2025 17:00:05 +0300 Subject: [PATCH 2/4] Add test --- commands_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/commands_test.go b/commands_test.go index 8b2aa37d4..5256a6fbf 100644 --- a/commands_test.go +++ b/commands_test.go @@ -6772,6 +6772,36 @@ var _ = Describe("Commands", func() { })) }) + It("should return -1 for nil lag in XINFO GROUPS", func() { + _, err := client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}}).Result() + Expect(err).NotTo(HaveOccurred()) + + client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}}) + Expect(err).NotTo(HaveOccurred()) + client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}}) + Expect(err).NotTo(HaveOccurred()) + + err = client.XGroupCreate(ctx, "s", "g", "0").Err() + Expect(err).NotTo(HaveOccurred()) + err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err() + Expect(err).NotTo(HaveOccurred()) + + client.XDel(ctx, "s", "0-2") + + res, err := client.XInfoGroups(ctx, "s").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(Equal([]redis.XInfoGroup{ + { + Name: "g", + Consumers: 1, + Pending: 1, + LastDeliveredID: "0-1", + EntriesRead: 1, + Lag: -1, // nil lag from Redis is reported as -1 + }, + })) + }) + It("should XINFO CONSUMERS", func() { res, err := client.XInfoConsumers(ctx, "stream", "group1").Result() Expect(err).NotTo(HaveOccurred()) From 40eb89cbdedb4edd8eeab19ac77dfe77c0aee966 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Tue, 13 May 2025 15:36:43 +0300 Subject: [PATCH 3/4] docs: clarify XInfoGroup.Lag field behavior with Nil values --- command.go | 1 + 1 file changed, 1 insertion(+) diff --git a/command.go b/command.go index b8bbdb9bc..c39b2c194 100644 --- a/command.go +++ b/command.go @@ -2187,6 +2187,7 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error { // lag: the number of entries in the stream that are still waiting to be delivered // to the group's consumers, or a NULL(Nil) when that number can't be determined. + // In that case, we return -1. if err != nil && err != Nil { return err } else if err == Nil { From a1f128fc3dbd72c28659d265712694f7b7b0c350 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Wed, 21 May 2025 10:12:04 +0300 Subject: [PATCH 4/4] docs: clarify XInfoGroup.Lag field behavior --- command.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/command.go b/command.go index c39b2c194..5fa347f43 100644 --- a/command.go +++ b/command.go @@ -2104,7 +2104,9 @@ type XInfoGroup struct { Pending int64 LastDeliveredID string EntriesRead int64 - Lag int64 + // Lag represents the number of pending messages in the stream not yet + // delivered to this consumer group. Returns -1 when the lag cannot be determined. + Lag int64 } var _ Cmder = (*XInfoGroupsCmd)(nil)