Skip to content

Commit 493ec66

Browse files
committed
fix(tests): debug logger
1 parent d780401 commit 493ec66

File tree

2 files changed

+32
-29
lines changed

2 files changed

+32
-29
lines changed

internal/pool/pool.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,9 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
392392
}
393393
}
394394
// For non-RESP3 or data that is not a push notification, buffered data is unexpected
395-
internal.Logger.Printf(ctx, "Conn has unread data")
395+
internal.Logger.Printf(ctx, "Conn has unread data: %d bytes, closing it", cn.rd.Buffered())
396+
repl, err := cn.rd.ReadReply()
397+
internal.Logger.Printf(ctx, "Data: %v, ERR: %v", repl, err)
396398
p.Remove(ctx, cn, BadConnError{})
397399
return
398400
}

push/processor.go

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
5252
return nil
5353
}
5454

55-
for {
55+
for rd.Buffered() > 0 {
5656
// Check if there's data available to read
5757
replyType, err := rd.PeekReplyType()
5858
if err != nil {
@@ -130,36 +130,37 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
130130
// This avoids unnecessary buffer scanning overhead.
131131
func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ NotificationHandlerContext, rd *proto.Reader) error {
132132
// read and discard all push notifications
133-
if rd != nil {
134-
for {
135-
replyType, err := rd.PeekReplyType()
136-
if err != nil {
137-
// No more data available or error reading
138-
break
139-
}
133+
if rd == nil {
134+
return nil
135+
}
136+
for rd.Buffered() > 0 {
137+
replyType, err := rd.PeekReplyType()
138+
if err != nil {
139+
// No more data available or error reading
140+
break
141+
}
140142

141-
// Only process push notifications (arrays starting with >)
142-
if replyType != proto.RespPush {
143-
break
144-
}
145-
// see if we should skip this notification
146-
notificationName, err := rd.PeekPushNotificationName()
147-
if err != nil {
143+
// Only process push notifications (arrays starting with >)
144+
if replyType != proto.RespPush {
145+
break
146+
}
147+
// see if we should skip this notification
148+
notificationName, err := rd.PeekPushNotificationName()
149+
if err != nil {
150+
break
151+
}
152+
if shouldSkipNotification(notificationName) {
153+
// discard the notification
154+
if err := rd.DiscardNext(); err != nil {
148155
break
149156
}
150-
if shouldSkipNotification(notificationName) {
151-
// discard the notification
152-
if err := rd.DiscardNext(); err != nil {
153-
break
154-
}
155-
continue
156-
}
157+
continue
158+
}
157159

158-
// Read the push notification
159-
_, err = rd.ReadReply()
160-
if err != nil {
161-
return nil
162-
}
160+
// Read the push notification
161+
_, err = rd.ReadReply()
162+
if err != nil {
163+
return nil
163164
}
164165
}
165166
return nil
@@ -170,7 +171,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
170171
func shouldSkipNotification(notificationType string) bool {
171172
switch notificationType {
172173
// Pub/Sub notifications - handled by pub/sub system
173-
case "message", // Regular pub/sub message
174+
case "message", // Regular pub/sub message
174175
"pmessage", // Pattern pub/sub message
175176
"subscribe", // Subscription confirmation
176177
"unsubscribe", // Unsubscription confirmation

0 commit comments

Comments
 (0)