-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[wip] resp3 notification handlers #3418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ndyakov
wants to merge
30
commits into
master
Choose a base branch
from
ndyakov/CAE-1088-resp3-notification-handlers
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,476
−11
Open
Changes from 24 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov 1ff0ded
feat: enforce single handler per notification type
ndyakov e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov d7fbe18
feat: fix connection health check interference with push notifications
ndyakov 1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov 4747610
test: add comprehensive unit tests for 100% coverage
ndyakov 70231ae
refactor: simplify push notification interface
ndyakov 958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov 79f6df2
remove: push-notification-demo
ndyakov c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov 8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov a2de263
fix: copy push notification processor to transaction baseClient
ndyakov ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov 03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov 9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov ada72ce
refactor: move push notification logic to pusnotif package
ndyakov 91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov e31987f
Fixes tests:
ndyakov 075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov f7948b5
fix: address pr review
ndyakov 3473c1e
fix: simplify api
ndyakov d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov b6e712b
feat: add proactive push notification processing to WithReader
ndyakov f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package pushnotif | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/redis/go-redis/v9/internal/proto" | ||
) | ||
|
||
// Processor handles push notifications with a registry of handlers. | ||
type Processor struct { | ||
registry *Registry | ||
} | ||
|
||
// NewProcessor creates a new push notification processor. | ||
func NewProcessor() *Processor { | ||
return &Processor{ | ||
registry: NewRegistry(), | ||
} | ||
} | ||
|
||
// GetHandler returns the handler for a specific push notification name. | ||
// Returns nil if no handler is registered for the given name. | ||
func (p *Processor) GetHandler(pushNotificationName string) Handler { | ||
return p.registry.GetHandler(pushNotificationName) | ||
} | ||
|
||
// RegisterHandler registers a handler for a specific push notification name. | ||
// Returns an error if a handler is already registered for this push notification name. | ||
// If protected is true, the handler cannot be unregistered. | ||
func (p *Processor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { | ||
return p.registry.RegisterHandler(pushNotificationName, handler, protected) | ||
} | ||
|
||
// UnregisterHandler removes a handler for a specific push notification name. | ||
// Returns an error if the handler is protected or doesn't exist. | ||
func (p *Processor) UnregisterHandler(pushNotificationName string) error { | ||
return p.registry.UnregisterHandler(pushNotificationName) | ||
} | ||
|
||
// GetRegistryForTesting returns the push notification registry for testing. | ||
// This method should only be used by tests. | ||
func (p *Processor) GetRegistryForTesting() *Registry { | ||
return p.registry | ||
} | ||
|
||
// ProcessPendingNotifications checks for and processes any pending push notifications. | ||
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { | ||
// Check for nil reader | ||
if rd == nil { | ||
return nil | ||
} | ||
|
||
// Check if there are any buffered bytes that might contain push notifications | ||
if rd.Buffered() == 0 { | ||
return nil | ||
} | ||
|
||
// Process all available push notifications | ||
for { | ||
// Peek at the next reply type to see if it's a push notification | ||
replyType, err := rd.PeekReplyType() | ||
if err != nil { | ||
// No more data available or error reading | ||
break | ||
} | ||
|
||
// Push notifications use RespPush type in RESP3 | ||
if replyType != proto.RespPush { | ||
break | ||
} | ||
|
||
// Try to read the push notification | ||
reply, err := rd.ReadReply() | ||
if err != nil { | ||
return fmt.Errorf("failed to read push notification: %w", err) | ||
} | ||
|
||
// Convert to slice of interfaces | ||
notification, ok := reply.([]interface{}) | ||
if !ok { | ||
continue | ||
} | ||
|
||
// Handle the notification | ||
p.registry.HandleNotification(ctx, notification) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// VoidProcessor discards all push notifications without processing them. | ||
type VoidProcessor struct{} | ||
|
||
// NewVoidProcessor creates a new void push notification processor. | ||
func NewVoidProcessor() *VoidProcessor { | ||
return &VoidProcessor{} | ||
} | ||
|
||
// GetHandler returns nil for void processor since it doesn't maintain handlers. | ||
func (v *VoidProcessor) GetHandler(pushNotificationName string) Handler { | ||
return nil | ||
} | ||
|
||
// RegisterHandler returns an error for void processor since it doesn't maintain handlers. | ||
// This helps developers identify when they're trying to register handlers on disabled push notifications. | ||
func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { | ||
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) | ||
} | ||
|
||
// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. | ||
// This method should only be used by tests. | ||
func (v *VoidProcessor) GetRegistryForTesting() *Registry { | ||
return nil | ||
} | ||
|
||
// ProcessPendingNotifications reads and discards any pending push notifications. | ||
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { | ||
// Check for nil reader | ||
if rd == nil { | ||
return nil | ||
} | ||
|
||
// Read and discard any pending push notifications to clean the buffer | ||
for { | ||
// Peek at the next reply type to see if it's a push notification | ||
replyType, err := rd.PeekReplyType() | ||
if err != nil { | ||
// No more data available or error reading | ||
break | ||
} | ||
|
||
// Push notifications use RespPush type in RESP3 | ||
if replyType != proto.RespPush { | ||
break | ||
} | ||
|
||
// Read and discard the push notification | ||
_, err = rd.ReadReply() | ||
if err != nil { | ||
return fmt.Errorf("failed to read push notification for discarding: %w", err) | ||
} | ||
|
||
// Notification discarded - continue to next one | ||
} | ||
|
||
return nil | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package pushnotif | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
) | ||
|
||
// Registry manages push notification handlers. | ||
type Registry struct { | ||
mu sync.RWMutex | ||
handlers map[string]Handler | ||
protected map[string]bool | ||
} | ||
|
||
// NewRegistry creates a new push notification registry. | ||
func NewRegistry() *Registry { | ||
return &Registry{ | ||
handlers: make(map[string]Handler), | ||
protected: make(map[string]bool), | ||
} | ||
} | ||
|
||
// RegisterHandler registers a handler for a specific push notification name. | ||
// Returns an error if a handler is already registered for this push notification name. | ||
// If protected is true, the handler cannot be unregistered. | ||
func (r *Registry) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { | ||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
if _, exists := r.handlers[pushNotificationName]; exists { | ||
return fmt.Errorf("handler already registered for push notification: %s", pushNotificationName) | ||
} | ||
|
||
r.handlers[pushNotificationName] = handler | ||
r.protected[pushNotificationName] = protected | ||
return nil | ||
} | ||
|
||
// UnregisterHandler removes a handler for a specific push notification name. | ||
// Returns an error if the handler is protected or doesn't exist. | ||
func (r *Registry) UnregisterHandler(pushNotificationName string) error { | ||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
_, exists := r.handlers[pushNotificationName] | ||
if !exists { | ||
return fmt.Errorf("no handler registered for push notification: %s", pushNotificationName) | ||
} | ||
|
||
if r.protected[pushNotificationName] { | ||
return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) | ||
} | ||
|
||
delete(r.handlers, pushNotificationName) | ||
delete(r.protected, pushNotificationName) | ||
return nil | ||
} | ||
|
||
// GetHandler returns the handler for a specific push notification name. | ||
// Returns nil if no handler is registered for the given name. | ||
func (r *Registry) GetHandler(pushNotificationName string) Handler { | ||
r.mu.RLock() | ||
defer r.mu.RUnlock() | ||
|
||
handler, exists := r.handlers[pushNotificationName] | ||
if !exists { | ||
return nil | ||
} | ||
return handler | ||
} | ||
|
||
// GetRegisteredPushNotificationNames returns a list of all registered push notification names. | ||
func (r *Registry) GetRegisteredPushNotificationNames() []string { | ||
r.mu.RLock() | ||
defer r.mu.RUnlock() | ||
|
||
names := make([]string, 0, len(r.handlers)) | ||
for name := range r.handlers { | ||
names = append(names, name) | ||
} | ||
return names | ||
} | ||
|
||
// HandleNotification attempts to handle a push notification using registered handlers. | ||
// Returns true if a handler was found and successfully processed the notification. | ||
func (r *Registry) HandleNotification(ctx context.Context, notification []interface{}) bool { | ||
if len(notification) == 0 { | ||
return false | ||
} | ||
|
||
// Extract the notification type (first element) | ||
notificationType, ok := notification[0].(string) | ||
if !ok { | ||
return false | ||
} | ||
|
||
// Get the handler for this notification type | ||
handler := r.GetHandler(notificationType) | ||
if handler == nil { | ||
return false | ||
} | ||
|
||
// Handle the notification | ||
return handler.HandlePushNotification(ctx, notification) | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package pushnotif | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/redis/go-redis/v9/internal/proto" | ||
) | ||
|
||
// Handler defines the interface for push notification handlers. | ||
type Handler interface { | ||
// HandlePushNotification processes a push notification. | ||
// Returns true if the notification was handled, false otherwise. | ||
HandlePushNotification(ctx context.Context, notification []interface{}) bool | ||
} | ||
|
||
// ProcessorInterface defines the interface for push notification processors. | ||
type ProcessorInterface interface { | ||
GetHandler(pushNotificationName string) Handler | ||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error | ||
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error | ||
} | ||
|
||
// RegistryInterface defines the interface for push notification registries. | ||
type RegistryInterface interface { | ||
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error | ||
UnregisterHandler(pushNotificationName string) error | ||
GetHandler(pushNotificationName string) Handler | ||
GetRegisteredPushNotificationNames() []string | ||
HandleNotification(ctx context.Context, notification []interface{}) bool | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.