Skip to content

[wip] resp3 notification handlers #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov Jun 26, 2025
1ff0ded
feat: enforce single handler per notification type
ndyakov Jun 26, 2025
e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov Jun 26, 2025
d7fbe18
feat: fix connection health check interference with push notifications
ndyakov Jun 26, 2025
1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov Jun 26, 2025
4747610
test: add comprehensive unit tests for 100% coverage
ndyakov Jun 26, 2025
70231ae
refactor: simplify push notification interface
ndyakov Jun 26, 2025
958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov Jun 26, 2025
79f6df2
remove: push-notification-demo
ndyakov Jun 26, 2025
c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov Jun 26, 2025
fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov Jun 26, 2025
be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov Jun 26, 2025
8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov Jun 26, 2025
d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov Jun 26, 2025
a2de263
fix: copy push notification processor to transaction baseClient
ndyakov Jun 26, 2025
ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov Jun 27, 2025
d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov Jun 27, 2025
e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov Jun 27, 2025
03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov Jun 27, 2025
9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov Jun 27, 2025
ada72ce
refactor: move push notification logic to pusnotif package
ndyakov Jun 27, 2025
91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov Jun 27, 2025
e31987f
Fixes tests:
ndyakov Jun 27, 2025
075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov Jun 27, 2025
f7948b5
fix: address pr review
ndyakov Jun 27, 2025
3473c1e
fix: simplify api
ndyakov Jun 27, 2025
d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov Jun 27, 2025
b6e712b
feat: add proactive push notification processing to WithReader
ndyakov Jun 27, 2025
f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov Jun 27, 2025
f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ type Conn struct {
createdAt time.Time

onClose func() error

// Push notification processor for handling push notifications on this connection
PushNotificationProcessor interface {
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
}
}

func NewConn(netConn net.Conn) *Conn {
Expand Down
53 changes: 48 additions & 5 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
)

var (
Expand Down Expand Up @@ -71,6 +72,11 @@ type Options struct {
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

// Push notification processor for connections
PushNotificationProcessor interface {
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
}
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -228,6 +234,12 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {

cn := NewConn(netConn)
cn.pooled = pooled

// Set push notification processor if available
if p.cfg.PushNotificationProcessor != nil {
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor
}

return cn, nil
}

Expand Down Expand Up @@ -377,9 +389,24 @@ func (p *ConnPool) popIdle() (*Conn, error) {

func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
// Check if this might be push notification data
if cn.PushNotificationProcessor != nil {
// Try to process pending push notifications before discarding connection
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
if err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err)
}
// Check again if there's still unread data after processing push notifications
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications")
p.Remove(ctx, cn, BadConnError{})
return
}
} else {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
}
}

if !cn.pooled {
Expand Down Expand Up @@ -523,8 +550,24 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
return false
}

if connCheck(cn.netConn) != nil {
return false
// Check connection health, but be aware of push notifications
if err := connCheck(cn.netConn); err != nil {
// If there's unexpected data and we have push notification support,
// it might be push notifications
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil {
// Try to process any pending push notifications
ctx := context.Background()
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr)
return false
}
// Check again after processing push notifications
if connCheck(cn.netConn) != nil {
return false
}
} else {
return false
}
}

cn.SetUsedAt(now)
Expand Down
148 changes: 148 additions & 0 deletions internal/pushnotif/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package pushnotif

import (
"context"
"fmt"

"github.com/redis/go-redis/v9/internal/proto"
)

// Processor handles push notifications with a registry of handlers.
type Processor struct {
registry *Registry
}

// NewProcessor creates a new push notification processor.
func NewProcessor() *Processor {
return &Processor{
registry: NewRegistry(),
}
}

// GetHandler returns the handler for a specific push notification name.
// Returns nil if no handler is registered for the given name.
func (p *Processor) GetHandler(pushNotificationName string) Handler {
return p.registry.GetHandler(pushNotificationName)
}

// RegisterHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered.
func (p *Processor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error {
return p.registry.RegisterHandler(pushNotificationName, handler, protected)
}

// UnregisterHandler removes a handler for a specific push notification name.
// Returns an error if the handler is protected or doesn't exist.
func (p *Processor) UnregisterHandler(pushNotificationName string) error {
return p.registry.UnregisterHandler(pushNotificationName)
}

// GetRegistryForTesting returns the push notification registry for testing.
// This method should only be used by tests.
func (p *Processor) GetRegistryForTesting() *Registry {
return p.registry
}

// ProcessPendingNotifications checks for and processes any pending push notifications.
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
// Check for nil reader
if rd == nil {
return nil
}

// Check if there are any buffered bytes that might contain push notifications
if rd.Buffered() == 0 {
return nil
}

// Process all available push notifications
for {
// Peek at the next reply type to see if it's a push notification
replyType, err := rd.PeekReplyType()
if err != nil {
// No more data available or error reading
break
}

// Push notifications use RespPush type in RESP3
if replyType != proto.RespPush {
break
}

// Try to read the push notification
reply, err := rd.ReadReply()
if err != nil {
return fmt.Errorf("failed to read push notification: %w", err)
}

// Convert to slice of interfaces
notification, ok := reply.([]interface{})
if !ok {
continue
}

// Handle the notification
p.registry.HandleNotification(ctx, notification)
}

return nil
}

// VoidProcessor discards all push notifications without processing them.
type VoidProcessor struct{}

// NewVoidProcessor creates a new void push notification processor.
func NewVoidProcessor() *VoidProcessor {
return &VoidProcessor{}
}

// GetHandler returns nil for void processor since it doesn't maintain handlers.
func (v *VoidProcessor) GetHandler(pushNotificationName string) Handler {
return nil
}

// RegisterHandler returns an error for void processor since it doesn't maintain handlers.
// This helps developers identify when they're trying to register handlers on disabled push notifications.
func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error {
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}

// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers.
// This method should only be used by tests.
func (v *VoidProcessor) GetRegistryForTesting() *Registry {
return nil
}

// ProcessPendingNotifications reads and discards any pending push notifications.
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
// Check for nil reader
if rd == nil {
return nil
}

// Read and discard any pending push notifications to clean the buffer
for {
// Peek at the next reply type to see if it's a push notification
replyType, err := rd.PeekReplyType()
if err != nil {
// No more data available or error reading
break
}

// Push notifications use RespPush type in RESP3
if replyType != proto.RespPush {
break
}

// Read and discard the push notification
_, err = rd.ReadReply()
if err != nil {
return fmt.Errorf("failed to read push notification for discarding: %w", err)
}

// Notification discarded - continue to next one
}

return nil
}
106 changes: 106 additions & 0 deletions internal/pushnotif/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package pushnotif

import (
"context"
"fmt"
"sync"
)

// Registry manages push notification handlers.
type Registry struct {
mu sync.RWMutex
handlers map[string]Handler
protected map[string]bool
}

// NewRegistry creates a new push notification registry.
func NewRegistry() *Registry {
return &Registry{
handlers: make(map[string]Handler),
protected: make(map[string]bool),
}
}

// RegisterHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered.
func (r *Registry) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error {
r.mu.Lock()
defer r.mu.Unlock()

if _, exists := r.handlers[pushNotificationName]; exists {
return fmt.Errorf("handler already registered for push notification: %s", pushNotificationName)
}

r.handlers[pushNotificationName] = handler
r.protected[pushNotificationName] = protected
return nil
}

// UnregisterHandler removes a handler for a specific push notification name.
// Returns an error if the handler is protected or doesn't exist.
func (r *Registry) UnregisterHandler(pushNotificationName string) error {
r.mu.Lock()
defer r.mu.Unlock()

_, exists := r.handlers[pushNotificationName]
if !exists {
return fmt.Errorf("no handler registered for push notification: %s", pushNotificationName)
}

if r.protected[pushNotificationName] {
return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName)
}

delete(r.handlers, pushNotificationName)
delete(r.protected, pushNotificationName)
return nil
}

// GetHandler returns the handler for a specific push notification name.
// Returns nil if no handler is registered for the given name.
func (r *Registry) GetHandler(pushNotificationName string) Handler {
r.mu.RLock()
defer r.mu.RUnlock()

handler, exists := r.handlers[pushNotificationName]
if !exists {
return nil
}
return handler
}

// GetRegisteredPushNotificationNames returns a list of all registered push notification names.
func (r *Registry) GetRegisteredPushNotificationNames() []string {
r.mu.RLock()
defer r.mu.RUnlock()

names := make([]string, 0, len(r.handlers))
for name := range r.handlers {
names = append(names, name)
}
return names
}

// HandleNotification attempts to handle a push notification using registered handlers.
// Returns true if a handler was found and successfully processed the notification.
func (r *Registry) HandleNotification(ctx context.Context, notification []interface{}) bool {
if len(notification) == 0 {
return false
}

// Extract the notification type (first element)
notificationType, ok := notification[0].(string)
if !ok {
return false
}

// Get the handler for this notification type
handler := r.GetHandler(notificationType)
if handler == nil {
return false
}

// Handle the notification
return handler.HandlePushNotification(ctx, notification)
}
30 changes: 30 additions & 0 deletions internal/pushnotif/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package pushnotif

import (
"context"

"github.com/redis/go-redis/v9/internal/proto"
)

// Handler defines the interface for push notification handlers.
type Handler interface {
// HandlePushNotification processes a push notification.
// Returns true if the notification was handled, false otherwise.
HandlePushNotification(ctx context.Context, notification []interface{}) bool
}

// ProcessorInterface defines the interface for push notification processors.
type ProcessorInterface interface {
GetHandler(pushNotificationName string) Handler
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error
}

// RegistryInterface defines the interface for push notification registries.
type RegistryInterface interface {
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error
UnregisterHandler(pushNotificationName string) error
GetHandler(pushNotificationName string) Handler
GetRegisteredPushNotificationNames() []string
HandleNotification(ctx context.Context, notification []interface{}) bool
}
Loading
Loading