Skip to content

Commit cb8a4e5

Browse files
committed
feat: process push notifications before returning connections from pool
Implement push notification processing in baseClient._getConn() to ensure that all cluster topology changes are handled immediately before connections are used for commands. This is critical for hitless upgrades and real-time cluster state awareness. Key Enhancements: 1. Enhanced Connection Retrieval (_getConn): - Process push notifications for both existing and new connections - Added processPushNotifications() call before returning connections - Ensures immediate handling of cluster topology changes - Proper error handling with connection removal on processing failures 2. Push Notification Processing Method: - Added processPushNotifications() method to baseClient - Only processes notifications for RESP3 connections with processors - Uses WithReader() to safely access connection reader - Integrates with existing push notification infrastructure 3. Connection Flow Enhancement: - Existing connections: Health check → Push notification processing → Return - New connections: Initialization → Push notification processing → Return - Failed processing results in connection removal and error return - Seamless integration with existing connection management 4. RESP3 Protocol Integration: - Protocol version check (only process for RESP3) - Push processor availability check - Graceful handling when processors are not available - Consistent behavior with existing push notification system 5. Error Handling and Recovery: - Remove connections if push notification processing fails - Return errors to trigger connection retry mechanisms - Maintain connection pool health and reliability - Prevent returning connections with unprocessed notifications Implementation Details: - processPushNotifications() checks protocol and processor availability - Uses cn.WithReader() to safely access the connection reader - Calls pushProcessor.ProcessPendingNotifications() for actual processing - Applied to both pooled connections and newly initialized connections - Consistent error handling across all connection retrieval paths Flow Enhancement: 1. Connection requested via _getConn() 2. Connection retrieved from pool (existing or new) 3. Connection initialization (if new) 4. Push notification processing (NEW) 5. Connection returned to caller 6. Commands executed with up-to-date cluster state Benefits: - Immediate cluster topology awareness before command execution - Enhanced hitless upgrade reliability with real-time notifications - Reduced command failures during cluster topology changes - Consistent push notification handling across all connection types - Better integration with Redis cluster operations This ensures that Redis cluster topology changes (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are always processed before connections are used, providing the foundation for reliable hitless upgrades and seamless cluster operations.
1 parent f4ff2d6 commit cb8a4e5

File tree

1 file changed

+30
-0
lines changed

1 file changed

+30
-0
lines changed

redis.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,13 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
273273
}
274274

275275
if cn.Inited {
276+
// Process all pending push notifications before returning the connection
277+
// This ensures that cluster topology changes are handled immediately
278+
if err := c.processPushNotifications(ctx, cn); err != nil {
279+
// If push notification processing fails, remove the connection
280+
c.connPool.Remove(ctx, cn, err)
281+
return nil, err
282+
}
276283
return cn, nil
277284
}
278285

@@ -284,9 +291,32 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
284291
return nil, err
285292
}
286293

294+
// Process any pending push notifications on the newly initialized connection
295+
// This ensures that any notifications received during connection setup are handled
296+
if err := c.processPushNotifications(ctx, cn); err != nil {
297+
// If push notification processing fails, remove the connection
298+
c.connPool.Remove(ctx, cn, err)
299+
return nil, err
300+
}
301+
287302
return cn, nil
288303
}
289304

305+
// processPushNotifications processes all pending push notifications on a connection
306+
// This ensures that cluster topology changes are handled immediately before the connection is used
307+
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
308+
// Only process push notifications for RESP3 connections with a processor
309+
if c.opt.Protocol != 3 || c.pushProcessor == nil {
310+
return nil
311+
}
312+
313+
// Use WithReader to access the reader and process push notifications
314+
// This is critical for hitless upgrades to work properly
315+
return cn.WithReader(ctx, 0, func(rd *proto.Reader) error {
316+
return c.pushProcessor.ProcessPendingNotifications(ctx, rd)
317+
})
318+
}
319+
290320
func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener {
291321
return auth.NewReAuthCredentialsListener(
292322
c.reAuthConnection(poolCn),

0 commit comments

Comments
 (0)