Skip to content

Commit e96c873

Browse files
committed
fix(peek): non-blocking peek
1 parent 604c8e3 commit e96c873

File tree

4 files changed

+35
-9
lines changed

4 files changed

+35
-9
lines changed

internal/pool/conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func (cn *Conn) SetNetConn(netConn net.Conn) {
5858
cn.bw.Reset(netConn)
5959
}
6060

61+
func (cn *Conn) GetNetConn() net.Conn {
62+
return cn.netConn
63+
}
64+
6165
func (cn *Conn) Write(b []byte) (int, error) {
6266
return cn.netConn.Write(b)
6367
}

internal/pool/pool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
384384
if cn.rd.Buffered() > 0 {
385385
// Check if this might be push notification data
386386
if p.cfg.Protocol == 3 {
387+
// we know that there is something in the buffer, so peek at the next reply type without
388+
// the potential to block
387389
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
388390
// For push notifications, we allow some buffered data
389391
// The client will process these notifications before using the connection
@@ -546,6 +548,8 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
546548
// However, push notification processing is now handled by the client
547549
// before WithReader to ensure proper context is available to handlers
548550
if err == errUnexpectedRead && p.cfg.Protocol == 3 {
551+
// we know that there is something in the buffer, so peek at the next reply type without
552+
// the potential to block
549553
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
550554
// For RESP3 connections with push notifications, we allow some buffered data
551555
// The client will process these notifications before using the connection

push/processor.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package push
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/redis/go-redis/v9/internal"
78
"github.com/redis/go-redis/v9/internal/proto"
@@ -51,8 +52,19 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
5152
if rd == nil {
5253
return nil
5354
}
55+
conn := handlerCtx.Conn
56+
if conn == nil {
57+
return nil
58+
}
59+
netConn := handlerCtx.Conn.GetNetConn()
60+
if netConn == nil {
61+
return nil
62+
}
5463

5564
for {
65+
// Set a short read deadline to check for available data
66+
// otherwise we may block on Peek if there is no data available
67+
netConn.SetReadDeadline(time.Now().Add(1))
5668
// Check if there's data available to read
5769
replyType, err := rd.PeekReplyType()
5870
if err != nil {
@@ -104,6 +116,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
104116
}
105117
}
106118

119+
netConn.SetReadDeadline(time.Time{})
107120
return nil
108121
}
109122

@@ -133,12 +146,23 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
133146
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
134147
// are only available in RESP3 and this processor is used for RESP2 connections.
135148
// This avoids unnecessary buffer scanning overhead.
136-
func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ NotificationHandlerContext, rd *proto.Reader) error {
149+
func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error {
137150
// read and discard all push notifications
138151
if rd == nil {
139152
return nil
140153
}
154+
conn := handlerCtx.Conn
155+
if conn == nil {
156+
return nil
157+
}
158+
netConn := handlerCtx.Conn.GetNetConn()
159+
if netConn == nil {
160+
return nil
161+
}
141162
for {
163+
// Set a short read deadline to check for available data
164+
netConn.SetReadDeadline(time.Now().Add(1))
165+
// Check if there's data available to read
142166
replyType, err := rd.PeekReplyType()
143167
if err != nil {
144168
// No more data available or error reading
@@ -166,6 +190,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
166190
return nil
167191
}
168192
}
193+
netConn.SetReadDeadline(time.Time{})
169194
return nil
170195
}
171196

@@ -174,7 +199,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
174199
func willHandleNotificationInClient(notificationType string) bool {
175200
switch notificationType {
176201
// Pub/Sub notifications - handled by pub/sub system
177-
case "message", // Regular pub/sub message
202+
case "message", // Regular pub/sub message
178203
"pmessage", // Pattern pub/sub message
179204
"subscribe", // Subscription confirmation
180205
"unsubscribe", // Unsubscription confirmation

redis.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -733,13 +733,6 @@ func (c *baseClient) txPipelineProcessCmds(
733733
}
734734

735735
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
736-
// To be sure there are no buffered push notifications, we process them before reading the reply
737-
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
738-
// Log the error but don't fail the command execution
739-
// Push notification processing errors shouldn't break normal Redis operations
740-
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
741-
}
742-
743736
statusCmd := cmds[0].(*StatusCmd)
744737
// Trim multi and exec.
745738
trimmedCmds := cmds[1 : len(cmds)-1]

0 commit comments

Comments
 (0)