-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[CAE-1088] feat: RESP3 notifications support #3418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 39 commits
b02eed6
1ff0ded
e6e2cea
d7fbe18
1331fb9
4747610
70231ae
958fb1a
79f6df2
c33b157
fdfcf94
be9b6dd
8006fab
d1d4529
a2de263
ad16b21
d3f6197
e6c5590
03bfd9f
9a7a5c8
ada72ce
91805bc
e31987f
075b930
f7948b5
3473c1e
d820ade
b6e712b
f66518c
f4ff2d6
cb8a4e5
c44c8b5
47dd490
1606de8
d530d45
5972b4c
ec4bf57
b4d0ff1
84123b1
d780401
604c8e3
b23f43c
7a0f316
225c0bf
2681d6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -77,6 +77,7 @@ func (cn *Conn) WithReader( | |
return err | ||
} | ||
} | ||
|
||
return fn(cn.rd) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |||||||||
"time" | ||||||||||
|
||||||||||
"github.com/redis/go-redis/v9/internal" | ||||||||||
"github.com/redis/go-redis/v9/internal/proto" | ||||||||||
) | ||||||||||
|
||||||||||
var ( | ||||||||||
|
@@ -71,6 +72,9 @@ type Options struct { | |||||||||
MaxActiveConns int | ||||||||||
ConnMaxIdleTime time.Duration | ||||||||||
ConnMaxLifetime time.Duration | ||||||||||
|
||||||||||
// Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without) | ||||||||||
Protocol int | ||||||||||
} | ||||||||||
|
||||||||||
type lastDialErrorWrap struct { | ||||||||||
|
@@ -228,6 +232,7 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { | |||||||||
|
||||||||||
cn := NewConn(netConn) | ||||||||||
cn.pooled = pooled | ||||||||||
|
||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. drop this line |
||||||||||
return cn, nil | ||||||||||
} | ||||||||||
|
||||||||||
|
@@ -377,6 +382,16 @@ func (p *ConnPool) popIdle() (*Conn, error) { | |||||||||
|
||||||||||
func (p *ConnPool) Put(ctx context.Context, cn *Conn) { | ||||||||||
if cn.rd.Buffered() > 0 { | ||||||||||
ndyakov marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
// Check if this might be push notification data | ||||||||||
if p.cfg.Protocol == 3 { | ||||||||||
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { | ||||||||||
// For push notifications, we allow some buffered data | ||||||||||
// The client will process these notifications before using the connection | ||||||||||
internal.Logger.Printf(ctx, "push: connection has buffered data, likely push notifications - will be processed by client") | ||||||||||
return | ||||||||||
} | ||||||||||
Comment on lines
+393
to
+394
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This early
Suggested change
Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is actually correct, instead of just returning, we should allow the connection to be put back in the pool |
||||||||||
} | ||||||||||
// For non-RESP3 or data that is not a push notification, buffered data is unexpected | ||||||||||
internal.Logger.Printf(ctx, "Conn has unread data") | ||||||||||
p.Remove(ctx, cn, BadConnError{}) | ||||||||||
return | ||||||||||
|
@@ -523,8 +538,22 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool { | |||||||||
return false | ||||||||||
} | ||||||||||
|
||||||||||
if connCheck(cn.netConn) != nil { | ||||||||||
return false | ||||||||||
// Check connection health, but be aware of push notifications | ||||||||||
if err := connCheck(cn.netConn); err != nil { | ||||||||||
// If there's unexpected data, it might be push notifications (RESP3) | ||||||||||
// However, push notification processing is now handled by the client | ||||||||||
// before WithReader to ensure proper context is available to handlers | ||||||||||
if err == errUnexpectedRead && p.cfg.Protocol == 3 { | ||||||||||
if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { | ||||||||||
// For RESP3 connections with push notifications, we allow some buffered data | ||||||||||
// The client will process these notifications before using the connection | ||||||||||
internal.Logger.Printf(context.Background(), "push: connection has buffered data, likely push notifications - will be processed by client") | ||||||||||
return true // Connection is healthy, client will handle notifications | ||||||||||
} | ||||||||||
return false // Unexpected data, not push notifications, connection is unhealthy | ||||||||||
} else { | ||||||||||
return false | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
cn.SetUsedAt(now) | ||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drop this line