Skip to content

Commit b23f43c

Browse files
committed
fix(peek): non-blocking peek
1 parent 604c8e3 commit b23f43c

File tree

4 files changed

+41
-10
lines changed

4 files changed

+41
-10
lines changed

internal/pool/conn.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func (cn *Conn) SetNetConn(netConn net.Conn) {
5858
cn.bw.Reset(netConn)
5959
}
6060

61+
func (cn *Conn) GetNetConn() net.Conn {
62+
return cn.netConn
63+
}
64+
6165
func (cn *Conn) Write(b []byte) (int, error) {
6266
return cn.netConn.Write(b)
6367
}

internal/pool/pool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,8 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
384384
if cn.rd.Buffered() > 0 {
385385
// Check if this might be push notification data
386386
if p.cfg.Protocol == 3 {
387+
// we know that there is something in the buffer, so peek at the next reply type without
388+
// the potential to block
387389
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
388390
// For push notifications, we allow some buffered data
389391
// The client will process these notifications before using the connection
@@ -546,6 +548,8 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
546548
// However, push notification processing is now handled by the client
547549
// before WithReader to ensure proper context is available to handlers
548550
if err == errUnexpectedRead && p.cfg.Protocol == 3 {
551+
// we know that there is something in the buffer, so peek at the next reply type without
552+
// the potential to block
549553
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
550554
// For RESP3 connections with push notifications, we allow some buffered data
551555
// The client will process these notifications before using the connection

push/processor.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package push
22

33
import (
44
"context"
5+
"time"
56

67
"github.com/redis/go-redis/v9/internal"
78
"github.com/redis/go-redis/v9/internal/proto"
@@ -51,8 +52,23 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
5152
if rd == nil {
5253
return nil
5354
}
55+
conn := handlerCtx.Conn
56+
if conn == nil {
57+
return nil
58+
}
59+
netConn := handlerCtx.Conn.GetNetConn()
60+
if netConn == nil {
61+
return nil
62+
}
5463

5564
for {
65+
// Set a short read deadline to check for available data
66+
// otherwise we may block on Peek if there is no data available
67+
err := netConn.SetReadDeadline(time.Now().Add(1))
68+
if err != nil {
69+
return err
70+
}
71+
5672
// Check if there's data available to read
5773
replyType, err := rd.PeekReplyType()
5874
if err != nil {
@@ -104,7 +120,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
104120
}
105121
}
106122

107-
return nil
123+
return netConn.SetReadDeadline(time.Time{})
108124
}
109125

110126
// VoidProcessor discards all push notifications without processing them
@@ -133,12 +149,26 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
133149
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
134150
// are only available in RESP3 and this processor is used for RESP2 connections.
135151
// This avoids unnecessary buffer scanning overhead.
136-
func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ NotificationHandlerContext, rd *proto.Reader) error {
152+
func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error {
137153
// read and discard all push notifications
138154
if rd == nil {
139155
return nil
140156
}
157+
conn := handlerCtx.Conn
158+
if conn == nil {
159+
return nil
160+
}
161+
netConn := handlerCtx.Conn.GetNetConn()
162+
if netConn == nil {
163+
return nil
164+
}
141165
for {
166+
// Set a short read deadline to check for available data
167+
err := netConn.SetReadDeadline(time.Now().Add(1))
168+
if err != nil {
169+
return err
170+
}
171+
// Check if there's data available to read
142172
replyType, err := rd.PeekReplyType()
143173
if err != nil {
144174
// No more data available or error reading
@@ -166,7 +196,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific
166196
return nil
167197
}
168198
}
169-
return nil
199+
return netConn.SetReadDeadline(time.Time{})
170200
}
171201

172202
// willHandleNotificationInClient checks if a notification type should be ignored by the push notification

redis.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -733,13 +733,6 @@ func (c *baseClient) txPipelineProcessCmds(
733733
}
734734

735735
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
736-
// To be sure there are no buffered push notifications, we process them before reading the reply
737-
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
738-
// Log the error but don't fail the command execution
739-
// Push notification processing errors shouldn't break normal Redis operations
740-
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
741-
}
742-
743736
statusCmd := cmds[0].(*StatusCmd)
744737
// Trim multi and exec.
745738
trimmedCmds := cmds[1 : len(cmds)-1]

0 commit comments

Comments
 (0)