mvcc: refactor watch store by consolidating watcherGroup.choose/chooseAll #21108
base: main
Conversation
Signed-off-by: Zhijun <[email protected]>
Hi @zhijun42. Thanks for your PR. I'm waiting for an etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test. Once the patch is verified, the new status will be reflected by the ok-to-test label. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Signed-off-by: Zhijun <[email protected]>
Codecov Report: ❌ Patch coverage is
Additional details and impacted files (and 19 files with indirect coverage changes):

@@            Coverage Diff             @@
##             main   #21108      +/-   ##
==========================================
- Coverage   68.39%   68.35%   -0.04%
==========================================
  Files         429      429
  Lines       35244    35240       -4
==========================================
- Hits        24105    24090      -15
- Misses       9746     9756      +10
- Partials     1393     1394       +1

Continue to review the full report in Codecov by Sentry.
fuweid
left a comment
Overall it looks good to me.
Please squash your commits into one. Thanks.
I don't know what the next step after this refactor is. If there is a plan or details about the follow-up, it would be better to share them.
if s.unsynced.size() == 0 {
	return &ret, minRev
}
for w := range s.unsynced.watchers {
Behavior changes slightly when maxWatchersPerSync < s.unsynced.size(): because s.unsynced is a map, iteration order is random, and the new loop can traverse many entries while still filling the batch. This means more compacted watchers may be handled (and compact responses sent) earlier than before, but in a nondeterministic order and only when w.ch is not too busy.
You mean that the new implementation will return up to maxWatchers watchers no matter how many compaction attempts it made, while the previous implementation counted compactions toward maxWatchers. Have I understood that correctly?
This means that the resync work amount is now bounded not by maxWatchers but by maxWatchers + numCompacted, so we could be slowing down resync.
Good catch; this is a good suggestion, since the presented PR was meant as a refactor. This could be simply fixed by changing ret.size() >= maxWatchers to watchersConsidered >= maxWatchers.
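A minimal sketch of that suggestion, assuming the loop shape visible in the diff hunks above (watchersConsidered is a hypothetical local name; ret and minRev come from the enclosing function, and the compaction handling is abbreviated):

watchersConsidered := 0
for w := range s.unsynced.watchers {
	if watchersConsidered >= maxWatchers {
		break
	}
	watchersConsidered++
	if w.minRev < compactRev {
		// Compacted watcher: send a compaction response and drop it, as before,
		// but still count it toward the per-sync budget.
		continue
	}
	if minRev > w.minRev {
		minRev = w.minRev
	}
	ret.add(w)
}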
good catch.
You mean that the new implementation will return up to maxWatchers watchers no matter how many compaction attempts it made, while the previous implementation counted compactions toward maxWatchers. Have I understood that correctly?
Yes. Map iteration order is nondeterministic, so it may go through all the watchers for that.
ret.add(w)
if ret.size() >= maxWatchers {
	break
}
This isn't consistent with the current behaviour. You may return an incorrect minRev if you break the loop due to reaching maxWatchers.
I think returning minRev for the selected watchers is consistent with the previous behavior.
The minRev part is correct. However, it may scan more watchers than before. We can go like this:

for w := range s.unsynced.watchers {
	if ret.size() >= maxWatchers {
		break
	}
	ret.add(w)
	...
}
Hmm, not sure. I have another suggestion: there is no sense in trying to compile code in our heads; maybe we should start by ensuring the functions are properly tested before changing anything :P
I re-read the function and confirmed that the minRev is correct.
ensuring the functions are properly tested before changing anything
Agreed on this.
It'd be better to add a unit test separately to cover the different scenarios and ensure all cases pass on main. Afterwards, rebase this PR and make sure it doesn't break any case.
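To make that concrete, here is a rough sketch of the kind of in-package test that could cover these scenarios on main before refactoring. It assumes the current choose(maxWatchers, curRev, compactRev) signature and the watcher fields quoted elsewhere in this thread; the test name, values, and expectations are illustrative, not taken from the actual patch:

package mvcc

import "testing"

func TestWatcherGroupChooseScenarios(t *testing.T) {
	// One far-behind watcher and one nearly caught-up watcher.
	slow := &watcher{id: 1, minRev: 5, ch: make(chan WatchResponse, 1)}
	fast := &watcher{id: 2, minRev: 90, ch: make(chan WatchResponse, 1)}

	wg := newWatcherGroup()
	wg.add(slow)
	wg.add(fast)

	// With a generous maxWatchers, both watchers should be selected and the
	// returned minRev should be the smallest minRev among them.
	ret, minRev := wg.choose(10, 100 /* curRev */, 3 /* compactRev */)
	if ret.size() != 2 || minRev != 5 {
		t.Fatalf("want 2 watchers and minRev 5, got %d watchers and minRev %d", ret.size(), minRev)
	}
}

Additional cases worth covering: watchers below compactRev (compaction response sent, watcher removed), more watchers than maxWatchers, and a blocked w.ch.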
ahrtr
left a comment
Going back to the motivation, I don't think a single loop is more readable than two. I also strongly disagree with the idea of using a single loop for efficiency; it doesn't seem worth sacrificing readability here.
Don't think this refactor is beneficial.
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by: zhijun42
The full list of commands accepted by this bot can be found here.
Needs approval from an approver in each of these files. Approvers can indicate their approval by writing /approve in a comment.
Thanks for the feedback! Let me share the full context to help decide the best path forward.

This PR is a stepping stone toward a larger memory optimization, addressing the problem that slow watchers cause memory spikes. When slow watchers fall far behind (e.g., due to network issues or slow consumers), they have very small minRev values. These far-behind watchers drag down the global minimum revision used for database queries, and the current implementation always fetches events in the full range [minRev, curRev].

We could use a priority queue to drain these slow watchers first, but I think that's overkill. So I'm proposing a bounded revision range solution: instead of fetching the full range, cap it at [globalMin, globalMin + selectForSyncRevRange]. I've run some watch benchmarks and seen consistently much lower memory usage.

Here's what the final implementation looks like. It requires two passes (first to find the global minimum revision among eligible watchers, then to filter by the bounded range):

// selectForSync selects up to maxWatchers from the group that need to sync from DB.
// It skips watchers with pending events (non-nil eventBatch) as they are handled by retryLoop.
// It also handles compacted watchers by sending compaction responses and removing them.
//
// To prevent slow watchers from causing large DB queries, this uses revision-range batching:
//  1. First filtering: find all eligible watchers and get the global minimum revision among them;
//  2. Second filtering: only select watchers within [globalMin, globalMin + selectForSyncRevRange].
//
// This ensures the oldest watchers are fully drained before moving on to newer ones.
//
// Returns the selected watchers and the revision range [minRev, maxRev] needed for the DB fetch.
// If no watchers need sync, returns (nil, MaxInt64, 0).
func (wg *watcherGroup) selectForSync(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64, int64) {
	// First pass: collect eligible watchers and find the global minimum revision.
	globalMinRev := int64(math.MaxInt64)
	var eligibleWatchers []*watcher
	for w := range wg.watchers {
		if w.minRev > curRev {
			if !w.restore {
				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
			}
			w.restore = false
		}
		if w.minRev < compactRev {
			select {
			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
				w.compacted = true
				wg.delete(w)
			default:
			}
			continue
		}
		eligibleWatchers = append(eligibleWatchers, w)
		if globalMinRev > w.minRev {
			globalMinRev = w.minRev
		}
	}
	// Limit the revision range to prevent fetching too many events at once.
	maxAllowedRev := globalMinRev + selectForSyncRevRange /* constant set to 100 */
	if maxAllowedRev > curRev {
		maxAllowedRev = curRev
	}
	// Second pass: filter by revision range.
	ret := newWatcherGroup()
	for _, w := range eligibleWatchers {
		if w.minRev > maxAllowedRev && w.minRev <= curRev {
			continue
		}
		ret.add(w)
		if ret.size() >= maxWatchers {
			break
		}
	}
	return &ret, globalMinRev, maxAllowedRev
}
I got lost; I incorrectly assumed that this is part of #21065 and not #16839. Developing a fix is totally different from developing a feature or improving performance; they require a different approach in process and implementation. Please don't mix them. Fixes are usually expected to be backported, so we cannot build them on a pile of refactors, because we don't want to backport refactors. A fix should be a minimal, targeted change that mitigates the issue and can be backported. I would not start from a refactor.
While I agree that this change could help, I don't think "some" benchmarks are enough to give us confidence to implement it. It's not hard to swap one property for another, like getting lower memory by doing smaller chunks. The trick is ensuring there are no regressions in all the other properties, or, if there are, showing that it's a worthy tradeoff.

In the case of your proposal the tradeoff is fairness. The current algorithm randomly selects a batch of watchers and synchronizes them fully; the downside of that is unbounded memory. Limiting resynchronization to 100 events for the slowest watchers will definitely lower memory, but it is no longer fair. If users periodically open a watch on an old revision, the new algorithm would only synchronize that new watcher and totally avoid synchronizing newer watchers. It's a tricky problem that requires more consideration in design, testing and benchmarking.

Overall, really good job on investigating the issue and coding. However, I think we need to take a step back, as this change is too big to be just a PR. We need a proper design. Like in #16839 (comment), we started from a proposal, profiles and benchmarking. We need to agree on benchmarks and tests to compare solutions, gather ideas, use the benchmarks to pick the best, and only then start implementation. Happy to collaborate with you on that.
@serathius Great feedback! And good point about watcher fairness. I did underestimate the scope and impact of such an optimization attempt. I will take a step back and think about it more thoroughly.
Let's talk and bounce ideas; feel free to contact me on the K8s Slack. We could also start from a Google doc for easier collaboration than in an issue.
Split from PR #21104

Changes:
- Consolidated the functions watcherGroup.choose and watcherGroup.chooseAll into a single function renamed selectForSync (because choose is a terrible name), and moved it from the watcherGroup struct to the outer watchableStore struct, because the behavior of selecting watchers for synchronization doesn't really belong in watcherGroup.
- Use single-pass iteration: process watchers and add them to the result set in one loop (more efficient than the original two-pass approach). A rough sketch of the consolidated shape is shown below.
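Based only on the diff fragments quoted in this review (not the full patch), the consolidated function presumably looks roughly like the sketch below; the exact signature, receiver, and compaction handling are assumptions, and the restore/panic check present in the store is omitted:

// Rough sketch of the consolidated selection logic, assembled from the diff
// fragments quoted in this review; names and details may differ from the actual patch.
func (s *watchableStore) selectForSync(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
	ret := newWatcherGroup()
	minRev := int64(math.MaxInt64)
	if s.unsynced.size() == 0 {
		return &ret, minRev
	}
	for w := range s.unsynced.watchers {
		if w.minRev < compactRev {
			// The watcher is behind the compacted revision: try to notify it and drop it.
			select {
			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
				w.compacted = true
				s.unsynced.delete(w)
			default:
				// Channel is busy; leave the watcher for the next sync pass.
			}
			continue
		}
		if minRev > w.minRev {
			minRev = w.minRev
		}
		ret.add(w)
		if ret.size() >= maxWatchers {
			break
		}
	}
	return &ret, minRev
}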