Skip to content

[wip] resp3 notification handlers #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov Jun 26, 2025
1ff0ded
feat: enforce single handler per notification type
ndyakov Jun 26, 2025
e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov Jun 26, 2025
d7fbe18
feat: fix connection health check interference with push notifications
ndyakov Jun 26, 2025
1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov Jun 26, 2025
4747610
test: add comprehensive unit tests for 100% coverage
ndyakov Jun 26, 2025
70231ae
refactor: simplify push notification interface
ndyakov Jun 26, 2025
958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov Jun 26, 2025
79f6df2
remove: push-notification-demo
ndyakov Jun 26, 2025
c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov Jun 26, 2025
fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov Jun 26, 2025
be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov Jun 26, 2025
8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov Jun 26, 2025
d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov Jun 26, 2025
a2de263
fix: copy push notification processor to transaction baseClient
ndyakov Jun 26, 2025
ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov Jun 27, 2025
d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov Jun 27, 2025
e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov Jun 27, 2025
03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov Jun 27, 2025
9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov Jun 27, 2025
ada72ce
refactor: move push notification logic to pusnotif package
ndyakov Jun 27, 2025
91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov Jun 27, 2025
e31987f
Fixes tests:
ndyakov Jun 27, 2025
075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov Jun 27, 2025
f7948b5
fix: address pr review
ndyakov Jun 27, 2025
3473c1e
fix: simplify api
ndyakov Jun 27, 2025
d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov Jun 27, 2025
b6e712b
feat: add proactive push notification processing to WithReader
ndyakov Jun 27, 2025
f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov Jun 27, 2025
f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Conn struct {
createdAt time.Time

onClose func() error

// Push notification processor for handling push notifications on this connection
PushNotificationProcessor interface {
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
}
}

func NewConn(netConn net.Conn) *Conn {
Expand Down
53 changes: 48 additions & 5 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
)

var (
Expand Down Expand Up @@ -71,6 +72,11 @@ type Options struct {
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

// Push notification processor for connections
PushNotificationProcessor interface {
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
}
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -228,6 +234,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {

cn := NewConn(netConn)
cn.pooled = pooled

// Set push notification processor if available
if p.cfg.PushNotificationProcessor != nil {
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor
}

return cn, nil
}

Expand Down Expand Up @@ -377,9 +389,24 @@ func (p *ConnPool) popIdle() (*Conn, error) {

func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
// Check if this might be push notification data
if cn.PushNotificationProcessor != nil {
// Try to process pending push notifications before discarding connection
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
if err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err)
}
// Check again if there's still unread data after processing push notifications
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications")
p.Remove(ctx, cn, BadConnError{})
return
}
} else {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
}
}

if !cn.pooled {
Expand Down Expand Up @@ -523,8 +550,24 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
return false
}

if connCheck(cn.netConn) != nil {
return false
// Check connection health, but be aware of push notifications
if err := connCheck(cn.netConn); err != nil {
// If there's unexpected data and we have push notification support,
// it might be push notifications
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil {
// Try to process any pending push notifications
ctx := context.Background()
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr)
return false
}
// Check again after processing push notifications
if connCheck(cn.netConn) != nil {
return false
}
} else {
return false
}
}

cn.SetUsedAt(now)
Expand Down
17 changes: 17 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,21 @@ type Options struct {
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
UnstableResp3 bool

// PushNotifications enables general push notification processing.
// When enabled, the client will process RESP3 push notifications and
// route them to registered handlers.
//
// For RESP3 connections (Protocol: 3), push notifications are automatically enabled.
// To disable push notifications for RESP3, use Protocol: 2 instead.
// For RESP2 connections, push notifications are not available.
//
// default: automatically enabled for RESP3, disabled for RESP2
PushNotifications bool

// PushNotificationProcessor is the processor for handling push notifications.
// If nil, a default processor will be created when PushNotifications is enabled.
PushNotificationProcessor PushNotificationProcessorInterface
}

func (opt *Options) init() {
Expand Down Expand Up @@ -592,5 +607,7 @@ func newConnPool(
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
// Pass push notification processor for connection initialization
PushNotificationProcessor: opt.PushNotificationProcessor,
})
}
39 changes: 38 additions & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,21 @@ type PubSub struct {
chOnce sync.Once
msgCh *channel
allCh *channel

// Push notification processor for handling generic push notifications
pushProcessor PushNotificationProcessorInterface
}

func (c *PubSub) init() {
c.exit = make(chan struct{})
}

// SetPushNotificationProcessor sets the push notification processor for handling
// generic push notifications received on this PubSub connection.
func (c *PubSub) SetPushNotificationProcessor(processor PushNotificationProcessorInterface) {
c.pushProcessor = processor
}

func (c *PubSub) String() string {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -367,6 +376,18 @@ func (p *Pong) String() string {
return "Pong"
}

// PushNotificationMessage represents a generic push notification received on a PubSub connection.
type PushNotificationMessage struct {
// Command is the push notification command (e.g., "MOVING", "CUSTOM_EVENT").
Command string
// Args are the arguments following the command.
Args []interface{}
}

func (m *PushNotificationMessage) String() string {
return fmt.Sprintf("push: %s", m.Command)
}

func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
switch reply := reply.(type) {
case string:
Expand Down Expand Up @@ -413,6 +434,19 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
Payload: reply[1].(string),
}, nil
default:
// Try to handle as generic push notification
ctx := c.getContext()
registry := c.pushProcessor.GetRegistry()
if registry != nil {
handled := registry.HandleNotification(ctx, reply)
if handled {
// Return a special message type to indicate it was handled
return &PushNotificationMessage{
Command: kind,
Args: reply[1:],
}, nil
}
}
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
}
default:
Expand Down Expand Up @@ -658,6 +692,9 @@ func (c *channel) initMsgChan() {
// Ignore.
case *Pong:
// Ignore.
case *PushNotificationMessage:
// Ignore push notifications in message-only channel
// They are already handled by the push notification processor
case *Message:
timer.Reset(c.chanSendTimeout)
select {
Expand Down Expand Up @@ -712,7 +749,7 @@ func (c *channel) initAllChan() {
switch msg := msg.(type) {
case *Pong:
// Ignore.
case *Subscription, *Message:
case *Subscription, *Message, *PushNotificationMessage:
timer.Reset(c.chanSendTimeout)
select {
case c.allCh <- msg:
Expand Down
Loading
Loading