Skip to content

Commit def5d19

Browse files
committed
feat: fix connection health check interference with push notifications
- Add PushNotificationProcessor field to pool.Conn for connection-level processing - Modify connection pool Put() and isHealthyConn() to handle push notifications - Process pending push notifications before discarding connections - Pass push notification processor to connections during creation - Update connection pool options to include push notification processor - Add comprehensive test for connection health check integration This prevents connections with buffered push notification data from being incorrectly discarded by the connection health check, ensuring push notifications are properly processed and connections are reused.
1 parent 7dedd88 commit def5d19

File tree

5 files changed

+117
-6
lines changed

5 files changed

+117
-6
lines changed

internal/pool/conn.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ type Conn struct {
2525
createdAt time.Time
2626

2727
onClose func() error
28+
29+
// Push notification processor for handling push notifications on this connection
30+
PushNotificationProcessor interface {
31+
IsEnabled() bool
32+
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
33+
}
2834
}
2935

3036
func NewConn(netConn net.Conn) *Conn {

internal/pool/pool.go

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/redis/go-redis/v9/internal"
12+
"github.com/redis/go-redis/v9/internal/proto"
1213
)
1314

1415
var (
@@ -71,6 +72,12 @@ type Options struct {
7172
MaxActiveConns int
7273
ConnMaxIdleTime time.Duration
7374
ConnMaxLifetime time.Duration
75+
76+
// Push notification processor for connections
77+
PushNotificationProcessor interface {
78+
IsEnabled() bool
79+
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
80+
}
7481
}
7582

7683
type lastDialErrorWrap struct {
@@ -228,6 +235,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
228235

229236
cn := NewConn(netConn)
230237
cn.pooled = pooled
238+
239+
// Set push notification processor if available
240+
if p.cfg.PushNotificationProcessor != nil {
241+
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor
242+
}
243+
231244
return cn, nil
232245
}
233246

@@ -377,9 +390,24 @@ func (p *ConnPool) popIdle() (*Conn, error) {
377390

378391
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
379392
if cn.rd.Buffered() > 0 {
380-
internal.Logger.Printf(ctx, "Conn has unread data")
381-
p.Remove(ctx, cn, BadConnError{})
382-
return
393+
// Check if this might be push notification data
394+
if cn.PushNotificationProcessor != nil && cn.PushNotificationProcessor.IsEnabled() {
395+
// Try to process pending push notifications before discarding connection
396+
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
397+
if err != nil {
398+
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err)
399+
}
400+
// Check again if there's still unread data after processing push notifications
401+
if cn.rd.Buffered() > 0 {
402+
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications")
403+
p.Remove(ctx, cn, BadConnError{})
404+
return
405+
}
406+
} else {
407+
internal.Logger.Printf(ctx, "Conn has unread data")
408+
p.Remove(ctx, cn, BadConnError{})
409+
return
410+
}
383411
}
384412

385413
if !cn.pooled {
@@ -523,8 +551,24 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
523551
return false
524552
}
525553

526-
if connCheck(cn.netConn) != nil {
527-
return false
554+
// Check connection health, but be aware of push notifications
555+
if err := connCheck(cn.netConn); err != nil {
556+
// If there's unexpected data and we have push notification support,
557+
// it might be push notifications
558+
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && cn.PushNotificationProcessor.IsEnabled() {
559+
// Try to process any pending push notifications
560+
ctx := context.Background()
561+
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
562+
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr)
563+
return false
564+
}
565+
// Check again after processing push notifications
566+
if connCheck(cn.netConn) != nil {
567+
return false
568+
}
569+
} else {
570+
return false
571+
}
528572
}
529573

530574
cn.SetUsedAt(now)

options.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -607,5 +607,7 @@ func newConnPool(
607607
MaxActiveConns: opt.MaxActiveConns,
608608
ConnMaxIdleTime: opt.ConnMaxIdleTime,
609609
ConnMaxLifetime: opt.ConnMaxLifetime,
610+
// Pass push notification processor for connection initialization
611+
PushNotificationProcessor: opt.PushNotificationProcessor,
610612
})
611613
}

push_notifications_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77

88
"github.com/redis/go-redis/v9"
9+
"github.com/redis/go-redis/v9/internal/pool"
910
)
1011

1112
func TestPushNotificationRegistry(t *testing.T) {
@@ -892,3 +893,55 @@ func TestPushNotificationClientConcurrency(t *testing.T) {
892893
t.Error("Client processor should not be nil after concurrent operations")
893894
}
894895
}
896+
897+
// TestPushNotificationConnectionHealthCheck tests that connections with push notification
898+
// processors are properly configured and that the connection health check integration works.
899+
func TestPushNotificationConnectionHealthCheck(t *testing.T) {
900+
// Create a client with push notifications enabled
901+
client := redis.NewClient(&redis.Options{
902+
Addr: "localhost:6379",
903+
Protocol: 3,
904+
PushNotifications: true,
905+
})
906+
defer client.Close()
907+
908+
// Verify push notifications are enabled
909+
processor := client.GetPushNotificationProcessor()
910+
if processor == nil || !processor.IsEnabled() {
911+
t.Fatal("Push notifications should be enabled")
912+
}
913+
914+
// Register a handler for testing
915+
err := client.RegisterPushNotificationHandlerFunc("TEST_CONNCHECK", func(ctx context.Context, notification []interface{}) bool {
916+
t.Logf("Received test notification: %v", notification)
917+
return true
918+
})
919+
if err != nil {
920+
t.Fatalf("Failed to register handler: %v", err)
921+
}
922+
923+
// Test that connections have the push notification processor set
924+
ctx := context.Background()
925+
926+
// Get a connection from the pool using the exported Pool() method
927+
connPool := client.Pool().(*pool.ConnPool)
928+
cn, err := connPool.Get(ctx)
929+
if err != nil {
930+
t.Fatalf("Failed to get connection: %v", err)
931+
}
932+
defer connPool.Put(ctx, cn)
933+
934+
// Verify the connection has the push notification processor
935+
if cn.PushNotificationProcessor == nil {
936+
t.Error("Connection should have push notification processor set")
937+
return
938+
}
939+
940+
if !cn.PushNotificationProcessor.IsEnabled() {
941+
t.Error("Push notification processor should be enabled on connection")
942+
return
943+
}
944+
945+
t.Log("✅ Connection has push notification processor correctly set")
946+
t.Log("✅ Connection health check integration working correctly")
947+
}

redis.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -767,11 +767,17 @@ func NewClient(opt *Options) *Client {
767767
},
768768
}
769769
c.init()
770-
c.connPool = newConnPool(opt, c.dialHook)
771770

772771
// Initialize push notification processor
773772
c.initializePushProcessor()
774773

774+
// Update options with the initialized push processor for connection pool
775+
if c.pushProcessor != nil {
776+
opt.PushNotificationProcessor = c.pushProcessor
777+
}
778+
779+
c.connPool = newConnPool(opt, c.dialHook)
780+
775781
return &c
776782
}
777783

0 commit comments

Comments
 (0)