
Conversation

@zhijun42
Contributor

Split from PR #21104

Changes:

  • Consolidated watcherGroup.choose and watcherGroup.chooseAll into a single function renamed selectForSync (because choose is a terrible name), and moved it from watcherGroup to the outer watchableStore, since the behavior of selecting watchers for synchronization doesn't really belong in watcherGroup.

  • Use single-pass iteration: process watchers and add to result set in one loop (more efficient than the original two-pass approach)

@k8s-ci-robot

Hi @zhijun42. Thanks for your PR.

I'm waiting for an etcd-io member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@serathius
Member

/ok-to-test

/cc @ahrtr @fuweid

Signed-off-by: Zhijun <[email protected]>
@codecov

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 83.33333% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.35%. Comparing base (091d095) to head (95db1a0).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| server/storage/mvcc/watchable_store.go | 83.33% | 2 Missing and 2 partials ⚠️ |

Additional details and impacted files

| Files with missing lines | Coverage Δ |
|---|---|
| server/storage/mvcc/watcher_group.go | 89.90% <ø> (-0.54%) ⬇️ |
| server/storage/mvcc/watchable_store.go | 91.69% <83.33%> (-1.57%) ⬇️ |

... and 19 files with indirect coverage changes

```diff
@@            Coverage Diff             @@
##             main   #21108      +/-   ##
==========================================
- Coverage   68.39%   68.35%   -0.04%
==========================================
  Files         429      429
  Lines       35244    35240       -4
==========================================
- Hits        24105    24090      -15
- Misses       9746     9756      +10
- Partials     1393     1394       +1
```
Continue to review full report in Codecov by Sentry.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 091d095...95db1a0. Read the comment docs.


Member

@fuweid fuweid left a comment


Overall it looks good to me.

Please squash your commits into one. Thanks.

I don't know what the next step after this refactor is. If there is a plan or details about the follow-up, that would be better.

```go
if s.unsynced.size() == 0 {
	return &ret, minRev
}
for w := range s.unsynced.watchers {
```
Member

@fuweid fuweid Jan 12, 2026


Behavior changes slightly when maxWatchersPerSync < s.unsynced.size(): because s.unsynced is a map, iteration order is random, and the new loop can traverse many entries while still filling the batch. This means more compacted watchers may be handled (and compaction responses sent) earlier than before, in a nondeterministic order, provided w.ch is not too busy.
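For reference, a tiny sketch of why the traversal order is unpredictable: Go deliberately randomizes map iteration order, so any loop over s.unsynced.watchers may visit entries differently on each run (toy map below, not the real watcher type):

```go
package main

import "fmt"

func main() {
	// Go randomizes map iteration order between runs (and between
	// iterations), so a loop over a map like s.unsynced.watchers has no
	// stable visiting order. Sorting the keys is the usual way to get
	// determinism when it matters.
	watchers := map[string]int64{"a": 1, "b": 2, "c": 3}
	seen := 0
	for name, rev := range watchers {
		_, _ = name, rev // visit order varies across runs
		seen++
	}
	fmt.Println(seen) // always 3, regardless of order
}
```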

Member

@serathius serathius Jan 12, 2026


You mean that the new implementation will return maxWatchers watchers no matter how many compaction attempts it made, while the previous implementation counted compactions toward maxWatchers. Have I understood that correctly?

This means that the resync work amount is now bounded not by maxWatchers but by maxWatchers + numCompacted, so we could be slowing down resync.

Good catch; this is a good suggestion, since the presented PR was meant as a pure refactor.

This could be simply fixed by changing `ret.size() >= maxWatchers` to `watchersConsidered >= maxWatchers`.
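A minimal, self-contained sketch of that idea (hypothetical names such as selectBounded; the real code operates on a watcherGroup in server/storage/mvcc and iterates a map, while this sketch uses a slice for deterministic output):

```go
package main

import "fmt"

// selectBounded is a hypothetical, simplified model of the selection loop:
// every watcher inspected counts against the budget, so compacted watchers
// (minRev < compactRev) cannot push total work past maxWatchers.
func selectBounded(minRevs []int64, compactRev int64, maxWatchers int) (selected []int64, considered int) {
	for _, rev := range minRevs {
		if considered >= maxWatchers {
			break
		}
		considered++
		if rev < compactRev {
			// Compacted watcher: a compaction response would be sent here;
			// it consumes budget but is not selected for sync.
			continue
		}
		selected = append(selected, rev)
	}
	return selected, considered
}

func main() {
	// Three compacted watchers (rev < 5) and three syncable ones, budget 4:
	// the loop stops after considering 4 watchers, selecting only 1.
	sel, considered := selectBounded([]int64{1, 2, 3, 7, 8, 9}, 5, 4)
	fmt.Println(len(sel), considered) // 1 4
}
```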

Member


good catch.

Member


You mean that the new implementation will return maxWatchers watchers no matter how many compaction attempts it made, while the previous implementation counted compactions toward maxWatchers. Have I understood that correctly?

Yes. Map iteration order is nondeterministic, so it may go through all the watchers because of that.

Comment on lines +450 to +453
```go
ret.add(w)
if ret.size() >= maxWatchers {
	break
}
```
Member

@ahrtr ahrtr Jan 12, 2026


This isn't consistent with the current behaviour. You may return an incorrect minRev if you break the loop upon reaching maxWatchers.

Member

@serathius serathius Jan 12, 2026


I think returning minRev for the watchers is consistent with the previous behavior.

Member


The minRev part is correct. However, it may scan more watchers than before.
We could go with something like this:

```go
for w := range s.unsynced.watchers {
	if ret.size() >= maxWatchers {
		break
	}
	ret.add(w)
	// ...
}
```

Member


Hmm, not sure. I have another suggestion: there is no sense in trying to compile code in our heads; maybe we should start by ensuring the functions are properly tested before changing anything :P

Member


I re-read the function and confirmed that the minRev is correct.

Member


ensuring the functions are properly tested before changing anything

Agreed on this.

It'd be better to add a unit test separately to cover the different scenarios and ensure all cases pass on main. Afterwards, rebase this PR and ensure it doesn't break any case.
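As a sketch of what such table-driven coverage could look like (hypothetical pickForSync stand-in using the considered-bounded semantics discussed above; real tests would exercise the mvcc package directly):

```go
package main

import "fmt"

// pickForSync is a hypothetical, minimal stand-in for the selection logic
// under test: watchers below compactRev are "compacted", everything
// inspected counts against the maxWatchers budget.
func pickForSync(minRevs []int64, compactRev int64, maxWatchers int) (selected, compacted int) {
	for _, rev := range minRevs {
		if selected+compacted >= maxWatchers {
			break
		}
		if rev < compactRev {
			compacted++
			continue
		}
		selected++
	}
	return selected, compacted
}

func main() {
	// Table-driven cases mirroring the scenarios discussed above.
	cases := []struct {
		name         string
		minRevs      []int64
		compactRev   int64
		maxWatchers  int
		wantSelected int
	}{
		{"all syncable", []int64{5, 6, 7}, 3, 10, 3},
		{"some compacted", []int64{1, 2, 8}, 5, 10, 1},
		{"budget caps work", []int64{1, 2, 8, 9}, 5, 2, 0},
	}
	for _, tc := range cases {
		got, _ := pickForSync(tc.minRevs, tc.compactRev, tc.maxWatchers)
		fmt.Println(tc.name, got == tc.wantSelected)
	}
}
```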

Member

@ahrtr ahrtr left a comment


@serathius
Member

Going back to the motivation, I don't think a single loop is more readable than two. I also strongly disagree with the idea of using a single loop for efficiency; it doesn't seem worth sacrificing readability here.

Consolidated functions watcherGroup.choose and watcherGroup.chooseAll into a single function renamed as selectForSync (because choose is a terrible name). And moved this function from struct watcherGroup to the outer struct watchableStore because the behavior of selecting watchers for synchronization doesn't really belong there.

Use single-pass iteration: process watchers and add to result set in one loop (more efficient than the original two-pass approach)

Member

@serathius serathius left a comment


Don't think this refactor is beneficial.

@k8s-ci-robot

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by: zhijun42
Once this PR has been reviewed and has the lgtm label, please assign ahrtr for approval. For more information see the Code Review Process.

The full list of commands accepted by this bot can be found here.

Details

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@zhijun42
Contributor Author

Thanks for the feedback! Let me share the full context to help decide the best path forward.

This PR is a stepping stone toward a larger memory optimization, handling the problem: Slow Watchers Cause Memory Spikes.

When slow watchers fall far behind (e.g., due to network issues or slow consumers), they have very small minRev values. These far-behind watchers drag down the global minimum revision used for database queries. The current implementation always fetches events in the range [minRev, curRev), so a single slow watcher at revision 3 when curRev is 300 causes a query for 300-ish revisions' worth of events, consuming significant memory even though most watchers don't need that historical data.

We could use a priority queue to drain these slow watchers first, but I think that's kinda overkill.

So I'm proposing a bounded revision range solution. Instead of [globalMinRev, curRev), we fetch [globalMinRev, globalMinRev + 100] (100 is an arbitrary number that we can change) and only sync watchers within that window. This ensures oldest watchers are fully drained before moving on to newer ones.

I've run some watch benchmarks and seen consistently much lower memory usage.

Here's what the final implementation looks like. This requires two passes (first to find globalMinRev, then to filter), so it doesn't map cleanly to the choose / chooseAll pattern.

```go
// selectForSync selects up to maxWatchers from the group that need to sync from DB.
// It skips watchers with pending events (non-nil eventBatch) as they are handled by retryLoop.
// It also handles compacted watchers by sending compaction responses and removing them.
//
// To prevent slow watchers from causing large DB queries, this uses revision-range batching:
// 1. First filtering: find all eligible watchers and get the global minimum revision among them;
// 2. Second filtering: only select watchers within [globalMin, globalMin + selectForSyncRevRange]
// This ensures oldest watchers are fully drained before moving on to newer ones.
//
// Returns the selected watchers and the revision range [minRev, maxRev] needed for DB fetch.
// If no watchers need sync, returns (nil, MaxInt64, 0).
func (wg *watcherGroup) selectForSync(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64, int64) {
	// First pass: collect eligible watchers and find global minimum revision
	globalMinRev := int64(math.MaxInt64)
	var eligibleWatchers []*watcher
	for w := range wg.watchers {
		if w.minRev > curRev {
			if !w.restore {
				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
			}
			w.restore = false
		}
		if w.minRev < compactRev {
			select {
			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
				w.compacted = true
				wg.delete(w)
			default:
			}
			continue
		}
		eligibleWatchers = append(eligibleWatchers, w)
		if globalMinRev > w.minRev {
			globalMinRev = w.minRev
		}
	}

	// Limit revision range to prevent fetching too many events at once
	maxAllowedRev := globalMinRev + selectForSyncRevRange /* constant set to 100 */
	if maxAllowedRev > curRev {
		maxAllowedRev = curRev
	}

	// Second pass: filter by revision range
	ret := newWatcherGroup()
	for _, w := range eligibleWatchers {
		if w.minRev > maxAllowedRev && w.minRev <= curRev {
			continue
		}
		ret.add(w)
		if ret.size() >= maxWatchers {
			break
		}
	}
	return &ret, globalMinRev, maxAllowedRev
}
```
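A toy walkthrough of the window arithmetic with assumed numbers (slow watcher at revision 3, curRev 300, and the proposal's window constant of 100):

```go
package main

import "fmt"

func main() {
	// Assumed numbers: one slow watcher at rev 3 while curRev is 300.
	// Unbounded, the fetch would cover [3, 300), roughly 297 revisions.
	// With a window of 100 the fetch stops at globalMinRev + 100.
	const selectForSyncRevRange = 100 // constant assumed from the proposal
	globalMinRev, curRev := int64(3), int64(300)

	maxAllowedRev := globalMinRev + selectForSyncRevRange
	if maxAllowedRev > curRev {
		maxAllowedRev = curRev
	}
	fmt.Println(maxAllowedRev) // 103: well short of curRev
}
```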

@serathius
Member

I got lost: I incorrectly assumed that this is part of #21065 and not #16839. Developing a fix is totally different from developing a feature or improving performance; they require different approaches in process and implementation. Please don't mix them.

Fixes are usually expected to be backported, and we cannot build them on a pile of refactors because we don't want to backport refactors. A fix should be a minimal, targeted change that mitigates the issue and can be backported. I would not start from a refactor.

So I'm proposing a bounded revision range solution. Instead of [globalMinRev, curRev), we fetch [globalMinRev, globalMinRev + 100] (100 is an arbitrary number that we can change) and only sync watchers within that window. This ensures oldest watchers are fully drained before moving on to newer ones.

I've run some watch benchmarks and seen consistently much lower memory usage.

While I agree that this change could help, I don't think "some" benchmarks are enough to give us confidence to implement it. It's not hard to swap one property for another, like getting lower memory by using smaller chunks. The trick is ensuring there are no regressions in all the other properties, or if there are, showing that it's a worthwhile tradeoff.

In the case of your proposal, the tradeoff is fairness. The current algorithm randomly selects watchers and synchronizes them fully; the downside of that is unbounded memory. Limiting resynchronization to 100 events for the slowest watchers will definitely lower memory, but it is no longer fair. If users periodically open a watch on an old revision, the new algorithm would only synchronize that new watcher and totally avoid synchronizing newer watchers. It's a tricky problem that requires more consideration in design, testing, and benchmarking.

Overall, really good job investigating the issue and coding. However, I think we need to take a step back, as this change is too big to be just a PR; we need a proper design. Like in #16839 (comment), we started from a proposal, profiles, and benchmarking.

We need to agree on benchmarks and tests to compare solutions, gather ideas, use the benchmarks to pick the best one, and only then start implementation. Happy to collaborate with you on that.

@zhijun42
Contributor Author

@serathius Great feedback! And good point about watcher fairness. I did underestimate the scope and impact of such an optimization attempt. Will take a step back and think about it more thoroughly.

@serathius
Member

Will take a step back and think about it more thoroughly.

Let's talk and bounce ideas; feel free to contact me on the K8s Slack. We could also start from a Google doc for easier collaboration than an issue.

