Skip to content

[CAE-1088] feat: RESP3 notifications support #3418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 45 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b02eed6
feat: add general push notification system
ndyakov Jun 26, 2025
1ff0ded
feat: enforce single handler per notification type
ndyakov Jun 26, 2025
e6e2cea
feat: remove global handlers and enable push notifications by default
ndyakov Jun 26, 2025
d7fbe18
feat: fix connection health check interference with push notifications
ndyakov Jun 26, 2025
1331fb9
fix: remove unused fields and ensure push notifications work in clone…
ndyakov Jun 26, 2025
4747610
test: add comprehensive unit tests for 100% coverage
ndyakov Jun 26, 2025
70231ae
refactor: simplify push notification interface
ndyakov Jun 26, 2025
958fb1a
fix: resolve data race in PushNotificationProcessor
ndyakov Jun 26, 2025
79f6df2
remove: push-notification-demo
ndyakov Jun 26, 2025
c33b157
feat: add protected handler support and rename command to pushNotific…
ndyakov Jun 26, 2025
fdfcf94
feat: add VoidPushNotificationProcessor for disabled push notifications
ndyakov Jun 26, 2025
be9b6dd
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled m…
ndyakov Jun 26, 2025
8006fab
fix: ensure push notification processor is never nil in newConn
ndyakov Jun 26, 2025
d1d4529
fix: initialize push notification processor in SentinelClient
ndyakov Jun 26, 2025
a2de263
fix: copy push notification processor to transaction baseClient
ndyakov Jun 26, 2025
ad16b21
fix: initialize push notification processor in NewFailoverClient
ndyakov Jun 27, 2025
d3f6197
feat: add GetHandler method and improve push notification API encapsu…
ndyakov Jun 27, 2025
e6c5590
feat: enable real push notification processors for SentinelClient and…
ndyakov Jun 27, 2025
03bfd9f
feat: remove GetRegistry from PushNotificationProcessorInterface for …
ndyakov Jun 27, 2025
9a7a5c8
fix: add nil reader check in ProcessPendingNotifications to prevent p…
ndyakov Jun 27, 2025
ada72ce
refactor: move push notification logic to pusnotif package
ndyakov Jun 27, 2025
91805bc
refactor: remove handlerWrapper and use separate maps in registry
ndyakov Jun 27, 2025
e31987f
Fixes tests:
ndyakov Jun 27, 2025
075b930
fix: update coverage test to expect errors for disabled push notifica…
ndyakov Jun 27, 2025
f7948b5
fix: address pr review
ndyakov Jun 27, 2025
3473c1e
fix: simplify api
ndyakov Jun 27, 2025
d820ade
test: add comprehensive test coverage for pushnotif package
ndyakov Jun 27, 2025
b6e712b
feat: add proactive push notification processing to WithReader
ndyakov Jun 27, 2025
f66518c
feat: add pub/sub message filtering to push notification processor
ndyakov Jun 27, 2025
f4ff2d6
feat: expand notification filtering to include streams, keyspace, and…
ndyakov Jun 27, 2025
cb8a4e5
feat: process push notifications before returning connections from pool
ndyakov Jul 2, 2025
c44c8b5
fix: increase peek notification name bytes
ndyakov Jul 3, 2025
47dd490
feat: enhance push notification handlers with context information
ndyakov Jul 4, 2025
1606de8
feat: implement strongly typed HandlerContext interface
ndyakov Jul 4, 2025
d530d45
feat: implement strongly typed HandlerContext with concrete types in …
ndyakov Jul 4, 2025
5972b4c
refactor: move all push notification logic to root package and remove…
ndyakov Jul 4, 2025
ec4bf57
cleanup: remove redundant internal push notification packages
ndyakov Jul 4, 2025
b4d0ff1
refactor: organize push notification code into separate files
ndyakov Jul 4, 2025
84123b1
refactor(push): completly change the package structure
ndyakov Jul 4, 2025
d780401
refactor(push): simplify handler context
ndyakov Jul 5, 2025
604c8e3
fix(tests): debug logger
ndyakov Jul 5, 2025
b23f43c
fix(peek): non-blocking peek
ndyakov Jul 5, 2025
7a0f316
fix(tests): remove bench_decode tests
ndyakov Jul 5, 2025
225c0bf
fix(tests): add global ctx in tests
ndyakov Jul 5, 2025
2681d6d
Merge branch 'master' into ndyakov/CAE-1088-resp3-notification-handlers
ndyakov Jul 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (cn *Conn) WithReader(
return err
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop this line

return fn(cn.rd)
}

Expand Down
33 changes: 31 additions & 2 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
)

var (
Expand Down Expand Up @@ -71,6 +72,9 @@ type Options struct {
MaxActiveConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration

// Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without)
Protocol int
}

type lastDialErrorWrap struct {
Expand Down Expand Up @@ -228,6 +232,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {

cn := NewConn(netConn)
cn.pooled = pooled

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop this line

return cn, nil
}

Expand Down Expand Up @@ -377,6 +382,16 @@ func (p *ConnPool) popIdle() (*Conn, error) {

func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 {
// Check if this might be push notification data
if p.cfg.Protocol == 3 {
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
// For push notifications, we allow some buffered data
// The client will process these notifications before using the connection
internal.Logger.Printf(ctx, "push: connection has buffered data, likely push notifications - will be processed by client")
return
}
Comment on lines +393 to +394
Copy link
Preview

Copilot AI Jul 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This early return in Put prevents the connection from being placed back into the pool, leading to connection leaks. You should still call p.connsMu/p.Put or similar logic after detecting push data, or explicitly return the connection to the pool.

Suggested change
return
}
}
// Allow the connection to proceed to the pool management logic

Copilot uses AI. Check for mistakes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually correct, instead of just returning, we should allow the connection to be put back in the pool

}
// For non-RESP3 or data that is not a push notification, buffered data is unexpected
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
Expand Down Expand Up @@ -523,8 +538,22 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
return false
}

if connCheck(cn.netConn) != nil {
return false
// Check connection health, but be aware of push notifications
if err := connCheck(cn.netConn); err != nil {
// If there's unexpected data, it might be push notifications (RESP3)
// However, push notification processing is now handled by the client
// before WithReader to ensure proper context is available to handlers
if err == errUnexpectedRead && p.cfg.Protocol == 3 {
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
// For RESP3 connections with push notifications, we allow some buffered data
// The client will process these notifications before using the connection
internal.Logger.Printf(context.Background(), "push: connection has buffered data, likely push notifications - will be processed by client")
return true // Connection is healthy, client will handle notifications
}
return false // Unexpected data, not push notifications, connection is unhealthy
} else {
return false
}
}

cn.SetUsedAt(now)
Expand Down
21 changes: 21 additions & 0 deletions internal/proto/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,27 @@ func (r *Reader) PeekReplyType() (byte, error) {
return b[0], nil
}

func (r *Reader) PeekPushNotificationName() (string, error) {
// peek 36 bytes, should be enough to read the push notification name
buf, err := r.rd.Peek(36)
if err != nil {
return "", err
}
if buf[0] != RespPush {
return "", fmt.Errorf("redis: can't parse push notification: %q", buf)
}
// remove push notification type and length
nextLine := buf[2:]
for i := 1; i < len(buf); i++ {
if buf[i] == '\r' && buf[i+1] == '\n' {
nextLine = buf[i+2:]
break
}
}
// return notification name or error
return r.readStringReply(nextLine)
}

// ReadLine Return a valid reply, it will check the protocol or redis error,
// and discard the attribute type.
func (r *Reader) ReadLine() ([]byte, error) {
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/push"
)

// Limiter is the interface of a rate limiter or a circuit breaker.
Expand Down Expand Up @@ -216,6 +217,13 @@ type Options struct {
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
UnstableResp3 bool

// Push notifications are always enabled for RESP3 connections (Protocol: 3)
// and are not available for RESP2 connections. No configuration option is needed.

// PushNotificationProcessor is the processor for handling push notifications.
// If nil, a default processor will be created for RESP3 connections.
PushNotificationProcessor push.NotificationProcessor
}

func (opt *Options) init() {
Expand Down Expand Up @@ -592,5 +600,7 @@ func newConnPool(
MaxActiveConns: opt.MaxActiveConns,
ConnMaxIdleTime: opt.ConnMaxIdleTime,
ConnMaxLifetime: opt.ConnMaxLifetime,
// Pass protocol version for push notification optimization
Protocol: opt.Protocol,
})
}
26 changes: 23 additions & 3 deletions osscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,7 @@ func (c *ClusterClient) processTxPipelineNode(
}

func (c *ClusterClient) processTxPipelineNodeConn(
ctx context.Context, _ *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
Expand All @@ -1641,7 +1641,7 @@ func (c *ClusterClient) processTxPipelineNodeConn(
trimmedCmds := cmds[1 : len(cmds)-1]

if err := c.txPipelineReadQueued(
ctx, rd, statusCmd, trimmedCmds, failedCmds,
ctx, node, cn, rd, statusCmd, trimmedCmds, failedCmds,
); err != nil {
setCmdsErr(cmds, err)

Expand All @@ -1653,30 +1653,50 @@ func (c *ClusterClient) processTxPipelineNodeConn(
return err
}

return pipelineReadCmds(rd, trimmedCmds)
return node.Client.pipelineReadCmds(ctx, cn, rd, trimmedCmds)
})
}

func (c *ClusterClient) txPipelineReadQueued(
ctx context.Context,
node *clusterNode,
cn *pool.Conn,
rd *proto.Reader,
statusCmd *StatusCmd,
cmds []Cmder,
failedCmds *cmdsMap,
) error {
// Parse queued replies.
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
if err := statusCmd.readReply(rd); err != nil {
return err
}

for _, cmd := range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
err := statusCmd.readReply(rd)
if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
continue
}
return err
}

// To be sure there are no buffered push notifications, we process them before reading the reply
if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
// Parse number of replies.
line, err := rd.ReadLine()
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/push"
)

// PubSub implements Pub/Sub commands as described in
Expand Down Expand Up @@ -38,6 +39,9 @@
chOnce sync.Once
msgCh *channel
allCh *channel

// Push notification processor for handling generic push notifications
pushProcessor push.NotificationProcessor
}

func (c *PubSub) init() {
Expand Down Expand Up @@ -436,6 +440,12 @@
}

err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
return c.cmd.readReply(rd)
})

Expand Down Expand Up @@ -532,6 +542,23 @@
return c.allCh.allCh
}

func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error {
if c.pushProcessor == nil {
return nil
}

// Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn)
return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
}

func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext {
// PubSub doesn't have a client or connection pool, so we pass nil for those
// PubSub connections are blocking
return push.HandlerContext{}

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (8.2.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (8.0.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (8.0.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (7.4.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (7.2.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (8.2.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (7.2.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / test-redis-ce (7.4.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / benchmark (8.0.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / benchmark (8.2.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / benchmark (8.0.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / benchmark (8.2.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / benchmark (7.4.x, 1.23.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / benchmark (7.4.x, 1.24.x)

undefined: push.HandlerContext

Check failure on line 558 in pubsub.go

View workflow job for this annotation

GitHub Actions / build (1.24.x, 7.4.2-54)

undefined: push.HandlerContext
return push.NewNotificationHandlerContext(nil, nil, c, cn, true)
}

type ChannelOption func(c *channel)

// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
Expand Down
Loading
Loading