Skip to content

KAFKA-19312: Avoiding concurrent execution of onComplete and tryComplete #19759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 25, 2025

Conversation

apoorvmittal10
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 commented May 19, 2025

The onComplete method in DelayedOperation is guaranteed to run only
once, through forceComplete, invoked either by tryComplete or when
operation is expired (run method). The invocation of tryComplete is
done by attaining lock so no concurrent execution of tryComplete
happens for same delayed operation. However, there can be concurrent
execution of tryComplete and onComplete as the expiration thread
can trigger a separte run of onComplete while tryComplete is still
executing. This behaviour is not ideal as there are parallel runs where
1 threads method execution is wasteful i.e. if onComplete is already
invoked by another thread then execution of tryComplete is not
required.

Reviewers: Jun Rao [email protected], Andrew Schofield
[email protected], Chia-Ping Tsai [email protected]

@github-actions github-actions bot added the triage PRs from the community label May 19, 2025
@apoorvmittal10 apoorvmittal10 requested review from junrao and chia7712 May 19, 2025 15:35
@github-actions github-actions bot added core Kafka Broker KIP-932 Queues for Kafka small Small PRs labels May 19, 2025
@apoorvmittal10 apoorvmittal10 added ci-approved and removed triage PRs from the community labels May 19, 2025
@chia7712
Copy link
Member

chia7712 commented May 19, 2025

The onComplete method in DelayedOperation is guaranteed to run only
once, either by tryComplete or onExpiration.

onExpiration? or forceComplete?

if
onComplete is already invoked by another thread then execution of
tryComplete is not required.

Excuse me, how to address it? do you plan to check isCompleted before executing tryComplete?

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the PR. The code looks reasonable to me. Could you run TestPurgatoryPerformance to see if there is any perf degradation?

Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. This was a tricky bug to find, so nice catch.

@chia7712
Copy link
Member

@apoorvmittal10 It is too cool to add response in comment, so I don't notice your response :)

For 2) tryComplete invocation is in lock itself and there is also a checck for isCompleted prior invocation:

Should we update safeTryCompleteOrElse to ensure it check for isCompleted prior invocation.

@apoorvmittal10
Copy link
Contributor Author

@apoorvmittal10 It is too cool to add response in comment, so I don't notice your response :)

For 2) tryComplete invocation is in lock itself and there is also a checck for isCompleted prior invocation:

Should we update safeTryCompleteOrElse to ensure it check for isCompleted prior invocation.

@chia7712 Ahhhh, my bad. I clicked edit instead of reply 🫣🤦‍♂️ My bad.

safeTryCompleteOrElse is invoked when the operation is being added to purgatory, if tryComplete returns false then it gets added in purgatory. An operation should never have completed true when invoking safeTryCompleteOrElse.

@apoorvmittal10
Copy link
Contributor Author

The onComplete method in DelayedOperation is guaranteed to run only
once, either by tryComplete or onExpiration.

onExpiration? or forceComplete?

if
onComplete is already invoked by another thread then execution of
tryComplete is not required.

Excuse me, how to address it? do you plan to check isCompleted before executing tryComplete?

For 1) I meant the invocation by run method, I have updated the description. Thanks.

For 2) tryComplete invocation is in lock itself and there is also a checck for isCompleted prior invocation:

@chia7712 just rewriting, so we have corrected flow of comments.

return false;
}
// Attain lock prior completing the request.
lock.lock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this change, the lock must be reentrant, correct? If so, we should consider replacing the Lock type with ReentrantLock to prevent potential deadlocks.

Additionally, DelayedOperation(long delayMs, Optional<Lock> lockOpt) seems to lose its meaning once the old group coordinator is removed. We might want to clean that up.

These points can be addressed in a follow-up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default implemenation of Lock is over ReentrantLock itself in DelayedOperation and elsewhere, abstracted over Lock API in DelayedOperation. Are you suggesting to make it more tighter and use ReentrantLock explicitly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to make it more tighter and use ReentrantLock explicitly?

yes, but this change needs to cleanup the code. It can be addressed later if we want to merge this bug fix first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I ll create a folow up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a little bored, so I created Jira KAFKA-19322 (https://issues.apache.org/jira/browse/KAFKA-19322). Feel free to take it over, or I can assign it to a mentee 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are awesome, feel free to assign to a mentee and I can review or else I will pick it up.

@junrao
Copy link
Contributor

junrao commented May 20, 2025

This was a tricky bug to find, so nice catch.

@apoorvmittal10 : My understanding is that this PR is not fixing a bug. Without the PR, we still guarantee that onComplete() will be called exactly once. So, this PR just saves some unnecessary work after the operation completes. Is that correct?

@apoorvmittal10
Copy link
Contributor Author

This was a tricky bug to find, so nice catch.

@apoorvmittal10 : My understanding is that this PR is not fixing a bug. Without the PR, we still guarantee that onComplete() will be called exactly once. So, this PR just saves some unnecessary work after the operation completes. Is that correct?

It does both, without the PR, though we guarantee that onComplete will be called exactly once but there can be a thread which is inside onComplete (when timeout occurs) and a separate thread invoking tryComplete. For DelayedShareFetch, prior this PR, a thread can be waiting in onComplete for lock which has already marked completed true in DelayedOperation but parallel thread in tryComplete is running, which acquires some partitions. Now the thread in tryComplete cannot invoke onComplete as completed is marked true in DelayedOperation, hence acquired partitions will be released. Then other thread which was waiting in onComplete for the lock, starts executing and releases the same acquired topic partitions again. This PR fixes this issue as well.

@apoorvmittal10
Copy link
Contributor Author

@apoorvmittal10 : Thanks for the PR. The code looks reasonable to me. Could you run TestPurgatoryPerformance to see if there is any perf degradation?

I ran some tests and performance is same.

After the chages:

--num 10000 --rate 100 --timeout 1000 --pct50 0.5 --pct75 0.75

# latency samples: pct75 = 0, pct50 = 0, min = 0, max = 7
# interval samples: rate = 100.068948, min = 0, max = 129
# enqueue rate (10000 requests):
# <elapsed time ms>	<target rate>	<actual rate>	<process cpu time ms>	<G1 Old Generation count> <G1 Young Generation count>	<G1 Old Generation time ms> <G1 Young Generation time ms>
101196	99.809364	99.806376	3240	0 2	0 8
--num 10000 --rate 1000 --timeout 1000 --pct50 0.5 --pct75 0.75

# latency samples: pct75 = 0, pct50 = 0, min = 0, max = 9
# interval samples: rate = 999.371395, min = 0, max = 14
# enqueue rate (10000 requests):
# <elapsed time ms>	<target rate>	<actual rate>	<process cpu time ms>	<G1 Old Generation count> <G1 Young Generation count>	<G1 Old Generation time ms> <G1 Young Generation time ms>
11104	989.902990	989.805008	1349	0 2	0 7

Before changes:

--num 10000 --rate 100 --timeout 1000 --pct50 0.5 --pct75 0.75

# latency samples: pct75 = 0, pct50 = 0, min = 0, max = 9
# interval samples: rate = 100.020304, min = 0, max = 130
# enqueue rate (10000 requests):
# <elapsed time ms>	<target rate>	<actual rate>	<process cpu time ms>	<G1 Old Generation count> <G1 Young Generation count>	<G1 Old Generation time ms> <G1 Young Generation time ms>
102366	98.657274	98.652408	3444	0 2	0 8

--num 10000 --rate 1000 --timeout 1000 --pct50 0.5 --pct75 0.75

# latency samples: pct75 = 0, pct50 = 0, min = 0, max = 8
# interval samples: rate = 997.134236, min = 0, max = 14
# enqueue rate (10000 requests):
# <elapsed time ms>	<target rate>	<actual rate>	<process cpu time ms>	<G1 Old Generation count> <G1 Young Generation count>	<G1 Old Generation time ms> <G1 Young Generation time ms>
11218	978.665101	978.665101	1624	0 2	0 7

@junrao
Copy link
Contributor

junrao commented May 21, 2025

Now the thread in tryComplete cannot invoke onComplete as completed is marked true in DelayedOperation, hence acquired partitions will be released.

Hmm, is this true? tryComplete will only release the acquired locks if forceComplete() returns true, right?

@apoorvmittal10
Copy link
Contributor Author

Now the thread in tryComplete cannot invoke onComplete as completed is marked true in DelayedOperation, hence acquired partitions will be released.

Hmm, is this true? tryComplete will only release the acquired locks if forceComplete() returns true, right?

Yes it's true, it can happen in current implementation. This line when invoked from tryComplete can release the locks, which is correct but conflicts with concurrent execution on request timeout. To make sure that our acquired share partitions release is correct, we added a check here. In our performance runs we saw the issue. However it will not cause issue per se for share fetches, as SharePartition will not acquire record offsets which are already acquired but it will have some wasted fetches.

@junrao
Copy link
Contributor

junrao commented May 21, 2025

@apoorvmittal10 : Thanks for the explanation. It seems that we found a case that an acquired sharePartition lock was released more than once. I am still not sure how it happened. forceComplete() is supposed to only return true once and the lock is only released when forceComplete() returns true. How is the acquired lock released a second time?

@apoorvmittal10
Copy link
Contributor Author

@apoorvmittal10 : Thanks for the explanation. It seems that we found a case that an acquired sharePartition lock was released more than once. I am still not sure how it happened. forceComplete() is supposed to only return true once and the lock is only released when forceComplete() returns true. How is the acquired lock released a second time?

Prior to this PR there is no lock in forceComplete but a way to make sure that onComplete is not executed twice. However, consider that thread 1 has started executing safeTryComplete which attains lock and checks that if the operation has marked completed, if not then tryComplete is executed. Now thread 1 is executing tryComplete, then thread 2 on timeout executes forceComplete which marks completed as true and waits here for the operation lock. Now thread 1, in tryComplete, acquires some share partitions but as completed is already marked true then the share partitions are released and so the lock for delayed operation. Thread 2 now starts executing onComplete but as partitionsAcquired is at instance level hence onComplete tries to execute further with those share partitions which are already released. Once completed then onCompete releases those share partitions again.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the explanation. It's clear to me why this is a bug now. A couple of comments.

* <p>
* On operation timeout, onComplete is invoked, last try occurs to acquire partitions and read
* from log, if acquired. The fetch will only happen from local log and not remote storage, on
* operation expiration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better consistency, should we remove released partitions from partitionsAcquired in releasePartitionLocks()? It seems that if we had done that, we wouldn't get into the situation of releasing the lock more than once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it but it complicates the code execution in DelayedShareFetch. Clearing partitionsAcquired alone is not sufficient, as there could be pending remote fetch as well which might need additional time to finish. Hence I didn't go with that approach.

return true;
} else {
// Do not proceed if the operation is already completed.
if (completed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An implication of this change is that when forceComplete() is called inside tryComplete, it always returns true. Currently, we have multiple places that check the return value of forceComplete() inside tryComplete. Should we adjust those?

Copy link
Contributor Author

@apoorvmittal10 apoorvmittal10 May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can revert it to AtomicBoolean#compareAndSet but I didn't understand the problem. There would only be first execution of forceComplete which should be successful and return true. Subsequent executions will return false.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao that is a good point. After avoiding concurrent execution, forceComplete MUST return true if it is executed by tryComplete. Hence, checking the returned value of forceComplete gets meaningless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so are we saying that first execution of forceComplete from tryComplete will return true? And as the operation is marked completed hence no further execution of tryComplete can trigger, which makes all executions of forceComplete within tryComplete always returns true?

I have removed the check in DelayedShareFetch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so are we saying that first execution of forceComplete from tryComplete will return true? And as the operation is marked completed hence no further execution of tryComplete can trigger, which makes all executions of forceComplete within tryComplete always returns true?

yes, we are on the same page. I hope I'm not misunderstanding @junrao comment :)

@apoorvmittal10 apoorvmittal10 requested a review from junrao May 22, 2025 06:46
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. A couple of more comments.

onComplete();
return true;
} else {
// Do not proceed if the operation is already completed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the expiration logic checks the return value of forceComplete(). We could do the following in the expiration logic and change forceComplete() to return void and avoid using the lock there (the caller will get the lock instead).

        lock.lock();
        try {
            if (!isCompleted()) {
              forceComplete();
              onExpiration();
             }
        } finally {
            lock.unlock();
        }

Copy link
Contributor Author

@apoorvmittal10 apoorvmittal10 May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and change forceComplete() to return void

Though I agree that now if forceComplete is invoked by tryComplete then it will return true but tryComplete still need to return boolean hence all places where we return the value from tryComplete based on forceComplete has to be changed and also requires explicit return true from all the places. I am of the opinion to not do this. Or if you think we should then shall we take it as a separate PR for just refactor?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the issue is that there are quite a few places where tryComplete() calls forceComplete() and checks the return value of forceComplete(). You fixed the issue in DelayedShareFetch, but not in other places. As we evolve the code, it would be useful to leave the code base in a better place for future developers, even though it takes a bit of more work. This can be done in a followup jira. Filed https://issues.apache.org/jira/browse/KAFKA-19325 to track it.

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. Just a minor comment.

onComplete();
return true;
} else {
// Do not proceed if the operation is already completed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the issue is that there are quite a few places where tryComplete() calls forceComplete() and checks the return value of forceComplete(). You fixed the issue in DelayedShareFetch, but not in other places. As we evolve the code, it would be useful to leave the code base in a better place for future developers, even though it takes a bit of more work. This can be done in a followup jira. Filed https://issues.apache.org/jira/browse/KAFKA-19325 to track it.

* or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
* forceComplete().
* Once an operation is completed, isCompleted() will return true. onComplete() is called from forceComplete(),
* which is triggered once by either expiration if the operation is not completed after delayMs, or tryComplete()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it's possible for forceComplete() to be called once from tryComplete() and another from expiration. So, we need to remove once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@apoorvmittal10 apoorvmittal10 requested a review from junrao May 23, 2025 21:57
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 : Thanks for the updated PR. LGTM

@chia7712 chia7712 merged commit adb7677 into apache:trunk May 25, 2025
24 checks passed
chia7712 pushed a commit that referenced this pull request May 26, 2025
…xternal lock (#19798)

Remove the DelayedOperation constructor that accepts an external lock.

According to this [PR](#19759).

Reviewers: Ken Huang <[email protected]>, PoAn Yang
 <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai
 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka small Small PRs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants