Skip to content

Commit 8d2aa3c

Browse files
committed
feat: add general push notification system
- Add PushNotificationRegistry for managing notification handlers - Add PushNotificationProcessor for processing RESP3 push notifications - Add client methods for registering push notification handlers - Add PubSub integration for handling generic push notifications - Add comprehensive test suite with 100% coverage - Add push notification demo example This system allows handling any arbitrary RESP3 push notification with registered handlers, not just specific notification types.
1 parent 0decfdc commit 8d2aa3c

File tree

6 files changed

+1633
-2
lines changed

6 files changed

+1633
-2
lines changed
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
8+
"github.com/redis/go-redis/v9"
9+
)
10+
11+
func main() {
12+
fmt.Println("Redis Go Client - General Push Notification System Demo")
13+
fmt.Println("======================================================")
14+
15+
// Example 1: Basic push notification setup
16+
basicPushNotificationExample()
17+
18+
// Example 2: Custom push notification handlers
19+
customHandlersExample()
20+
21+
// Example 3: Global push notification handlers
22+
globalHandlersExample()
23+
24+
// Example 4: Custom push notifications
25+
customPushNotificationExample()
26+
27+
// Example 5: Multiple notification types
28+
multipleNotificationTypesExample()
29+
30+
// Example 6: Processor API demonstration
31+
demonstrateProcessorAPI()
32+
}
33+
34+
func basicPushNotificationExample() {
35+
fmt.Println("\n=== Basic Push Notification Example ===")
36+
37+
// Create a Redis client with push notifications enabled
38+
client := redis.NewClient(&redis.Options{
39+
Addr: "localhost:6379",
40+
Protocol: 3, // RESP3 required for push notifications
41+
PushNotifications: true, // Enable general push notification processing
42+
})
43+
defer client.Close()
44+
45+
// Register a handler for custom notifications
46+
client.RegisterPushNotificationHandlerFunc("CUSTOM_EVENT", func(ctx context.Context, notification []interface{}) bool {
47+
fmt.Printf("Received CUSTOM_EVENT: %v\n", notification)
48+
return true
49+
})
50+
51+
fmt.Println("✅ Push notifications enabled and handler registered")
52+
fmt.Println(" The client will now process any CUSTOM_EVENT push notifications")
53+
}
54+
55+
func customHandlersExample() {
56+
fmt.Println("\n=== Custom Push Notification Handlers Example ===")
57+
58+
client := redis.NewClient(&redis.Options{
59+
Addr: "localhost:6379",
60+
Protocol: 3,
61+
PushNotifications: true,
62+
})
63+
defer client.Close()
64+
65+
// Register handlers for different notification types
66+
client.RegisterPushNotificationHandlerFunc("USER_LOGIN", func(ctx context.Context, notification []interface{}) bool {
67+
if len(notification) >= 3 {
68+
username := notification[1]
69+
timestamp := notification[2]
70+
fmt.Printf("🔐 User login: %v at %v\n", username, timestamp)
71+
}
72+
return true
73+
})
74+
75+
client.RegisterPushNotificationHandlerFunc("CACHE_INVALIDATION", func(ctx context.Context, notification []interface{}) bool {
76+
if len(notification) >= 2 {
77+
cacheKey := notification[1]
78+
fmt.Printf("🗑️ Cache invalidated: %v\n", cacheKey)
79+
}
80+
return true
81+
})
82+
83+
client.RegisterPushNotificationHandlerFunc("SYSTEM_ALERT", func(ctx context.Context, notification []interface{}) bool {
84+
if len(notification) >= 3 {
85+
alertLevel := notification[1]
86+
message := notification[2]
87+
fmt.Printf("🚨 System alert [%v]: %v\n", alertLevel, message)
88+
}
89+
return true
90+
})
91+
92+
fmt.Println("✅ Multiple custom handlers registered:")
93+
fmt.Println(" - USER_LOGIN: Handles user authentication events")
94+
fmt.Println(" - CACHE_INVALIDATION: Handles cache invalidation events")
95+
fmt.Println(" - SYSTEM_ALERT: Handles system alert notifications")
96+
}
97+
98+
func globalHandlersExample() {
99+
fmt.Println("\n=== Global Push Notification Handler Example ===")
100+
101+
client := redis.NewClient(&redis.Options{
102+
Addr: "localhost:6379",
103+
Protocol: 3,
104+
PushNotifications: true,
105+
})
106+
defer client.Close()
107+
108+
// Register a global handler that receives ALL push notifications
109+
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
110+
if len(notification) > 0 {
111+
command := notification[0]
112+
fmt.Printf("📡 Global handler received: %v (args: %d)\n", command, len(notification)-1)
113+
}
114+
return true
115+
})
116+
117+
// Register specific handlers as well
118+
client.RegisterPushNotificationHandlerFunc("SPECIFIC_EVENT", func(ctx context.Context, notification []interface{}) bool {
119+
fmt.Printf("🎯 Specific handler for SPECIFIC_EVENT: %v\n", notification)
120+
return true
121+
})
122+
123+
fmt.Println("✅ Global and specific handlers registered:")
124+
fmt.Println(" - Global handler will receive ALL push notifications")
125+
fmt.Println(" - Specific handler will receive only SPECIFIC_EVENT notifications")
126+
fmt.Println(" - Both handlers will be called for SPECIFIC_EVENT notifications")
127+
}
128+
129+
func customPushNotificationExample() {
130+
fmt.Println("\n=== Custom Push Notifications Example ===")
131+
132+
// Create a client with custom push notifications
133+
client := redis.NewClient(&redis.Options{
134+
Addr: "localhost:6379",
135+
Protocol: 3, // RESP3 required
136+
PushNotifications: true, // Enable general push notifications
137+
})
138+
defer client.Close()
139+
140+
// Register custom handlers for application events
141+
client.RegisterPushNotificationHandlerFunc("APPLICATION_EVENT", func(ctx context.Context, notification []interface{}) bool {
142+
fmt.Printf("📱 Application event: %v\n", notification)
143+
return true
144+
})
145+
146+
// Register a global handler to monitor all notifications
147+
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
148+
if len(notification) > 0 {
149+
command := notification[0]
150+
switch command {
151+
case "MOVING", "MIGRATING", "MIGRATED":
152+
fmt.Printf("🔄 Cluster notification: %v\n", command)
153+
default:
154+
fmt.Printf("📨 Other notification: %v\n", command)
155+
}
156+
}
157+
return true
158+
})
159+
160+
fmt.Println("✅ Custom push notifications enabled:")
161+
fmt.Println(" - MOVING, MIGRATING, MIGRATED notifications → Cluster handlers")
162+
fmt.Println(" - APPLICATION_EVENT notifications → Custom handler")
163+
fmt.Println(" - All notifications → Global monitoring handler")
164+
}
165+
166+
func multipleNotificationTypesExample() {
167+
fmt.Println("\n=== Multiple Notification Types Example ===")
168+
169+
client := redis.NewClient(&redis.Options{
170+
Addr: "localhost:6379",
171+
Protocol: 3,
172+
PushNotifications: true,
173+
})
174+
defer client.Close()
175+
176+
// Register handlers for Redis built-in notification types
177+
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationPubSubMessage, func(ctx context.Context, notification []interface{}) bool {
178+
fmt.Printf("💬 Pub/Sub message: %v\n", notification)
179+
return true
180+
})
181+
182+
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationKeyspace, func(ctx context.Context, notification []interface{}) bool {
183+
fmt.Printf("🔑 Keyspace notification: %v\n", notification)
184+
return true
185+
})
186+
187+
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationKeyevent, func(ctx context.Context, notification []interface{}) bool {
188+
fmt.Printf("⚡ Key event notification: %v\n", notification)
189+
return true
190+
})
191+
192+
// Register handlers for cluster notifications
193+
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationMoving, func(ctx context.Context, notification []interface{}) bool {
194+
fmt.Printf("🚚 Cluster MOVING notification: %v\n", notification)
195+
return true
196+
})
197+
198+
// Register handlers for custom application notifications
199+
client.RegisterPushNotificationHandlerFunc("METRICS_UPDATE", func(ctx context.Context, notification []interface{}) bool {
200+
fmt.Printf("📊 Metrics update: %v\n", notification)
201+
return true
202+
})
203+
204+
client.RegisterPushNotificationHandlerFunc("CONFIG_CHANGE", func(ctx context.Context, notification []interface{}) bool {
205+
fmt.Printf("⚙️ Configuration change: %v\n", notification)
206+
return true
207+
})
208+
209+
fmt.Println("✅ Multiple notification type handlers registered:")
210+
fmt.Println(" Redis built-in notifications:")
211+
fmt.Printf(" - %s: Pub/Sub messages\n", redis.PushNotificationPubSubMessage)
212+
fmt.Printf(" - %s: Keyspace notifications\n", redis.PushNotificationKeyspace)
213+
fmt.Printf(" - %s: Key event notifications\n", redis.PushNotificationKeyevent)
214+
fmt.Println(" Cluster notifications:")
215+
fmt.Printf(" - %s: Cluster slot migration\n", redis.PushNotificationMoving)
216+
fmt.Println(" Custom application notifications:")
217+
fmt.Println(" - METRICS_UPDATE: Application metrics")
218+
fmt.Println(" - CONFIG_CHANGE: Configuration updates")
219+
}
220+
221+
func demonstrateProcessorAPI() {
222+
fmt.Println("\n=== Push Notification Processor API Example ===")
223+
224+
client := redis.NewClient(&redis.Options{
225+
Addr: "localhost:6379",
226+
Protocol: 3,
227+
PushNotifications: true,
228+
})
229+
defer client.Close()
230+
231+
// Get the push notification processor
232+
processor := client.GetPushNotificationProcessor()
233+
if processor == nil {
234+
log.Println("Push notification processor not available")
235+
return
236+
}
237+
238+
fmt.Printf("✅ Push notification processor status: enabled=%v\n", processor.IsEnabled())
239+
240+
// Get the registry to inspect registered handlers
241+
registry := processor.GetRegistry()
242+
commands := registry.GetRegisteredCommands()
243+
fmt.Printf("📋 Registered commands: %v\n", commands)
244+
245+
// Register a handler using the processor directly
246+
processor.RegisterHandlerFunc("DIRECT_REGISTRATION", func(ctx context.Context, notification []interface{}) bool {
247+
fmt.Printf("🎯 Direct registration handler: %v\n", notification)
248+
return true
249+
})
250+
251+
// Check if handlers are registered
252+
if registry.HasHandlers() {
253+
fmt.Println("✅ Push notification handlers are registered and ready")
254+
}
255+
256+
// Demonstrate notification info parsing
257+
sampleNotification := []interface{}{"SAMPLE_EVENT", "arg1", "arg2", 123}
258+
info := redis.ParsePushNotificationInfo(sampleNotification)
259+
if info != nil {
260+
fmt.Printf("📄 Notification info - Command: %s, Args: %d\n", info.Command, len(info.Args))
261+
}
262+
}

options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,17 @@ type Options struct {
216216
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
217217
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
218218
UnstableResp3 bool
219+
220+
// PushNotifications enables general push notification processing.
221+
// When enabled, the client will process RESP3 push notifications and
222+
// route them to registered handlers.
223+
//
224+
// default: false
225+
PushNotifications bool
226+
227+
// PushNotificationProcessor is the processor for handling push notifications.
228+
// If nil, a default processor will be created when PushNotifications is enabled.
229+
PushNotificationProcessor *PushNotificationProcessor
219230
}
220231

221232
func (opt *Options) init() {

pubsub.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,21 @@ type PubSub struct {
3838
chOnce sync.Once
3939
msgCh *channel
4040
allCh *channel
41+
42+
// Push notification processor for handling generic push notifications
43+
pushProcessor *PushNotificationProcessor
4144
}
4245

4346
func (c *PubSub) init() {
4447
c.exit = make(chan struct{})
4548
}
4649

50+
// SetPushNotificationProcessor sets the push notification processor for handling
51+
// generic push notifications received on this PubSub connection.
52+
func (c *PubSub) SetPushNotificationProcessor(processor *PushNotificationProcessor) {
53+
c.pushProcessor = processor
54+
}
55+
4756
func (c *PubSub) String() string {
4857
c.mu.Lock()
4958
defer c.mu.Unlock()
@@ -367,6 +376,18 @@ func (p *Pong) String() string {
367376
return "Pong"
368377
}
369378

379+
// PushNotificationMessage represents a generic push notification received on a PubSub connection.
380+
type PushNotificationMessage struct {
381+
// Command is the push notification command (e.g., "MOVING", "CUSTOM_EVENT").
382+
Command string
383+
// Args are the arguments following the command.
384+
Args []interface{}
385+
}
386+
387+
func (m *PushNotificationMessage) String() string {
388+
return fmt.Sprintf("push: %s", m.Command)
389+
}
390+
370391
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
371392
switch reply := reply.(type) {
372393
case string:
@@ -413,6 +434,18 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
413434
Payload: reply[1].(string),
414435
}, nil
415436
default:
437+
// Try to handle as generic push notification
438+
if c.pushProcessor != nil && c.pushProcessor.IsEnabled() {
439+
ctx := c.getContext()
440+
handled := c.pushProcessor.GetRegistry().HandleNotification(ctx, reply)
441+
if handled {
442+
// Return a special message type to indicate it was handled
443+
return &PushNotificationMessage{
444+
Command: kind,
445+
Args: reply[1:],
446+
}, nil
447+
}
448+
}
416449
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
417450
}
418451
default:
@@ -658,6 +691,9 @@ func (c *channel) initMsgChan() {
658691
// Ignore.
659692
case *Pong:
660693
// Ignore.
694+
case *PushNotificationMessage:
695+
// Ignore push notifications in message-only channel
696+
// They are already handled by the push notification processor
661697
case *Message:
662698
timer.Reset(c.chanSendTimeout)
663699
select {
@@ -712,7 +748,7 @@ func (c *channel) initAllChan() {
712748
switch msg := msg.(type) {
713749
case *Pong:
714750
// Ignore.
715-
case *Subscription, *Message:
751+
case *Subscription, *Message, *PushNotificationMessage:
716752
timer.Reset(c.chanSendTimeout)
717753
select {
718754
case c.allCh <- msg:

0 commit comments

Comments
 (0)