-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19312: Avoiding concurrent execution of onComplete and tryComplete #19759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Excuse me, how to address it? do you plan to check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the PR. The code looks reasonable to me. Could you run TestPurgatoryPerformance to see if there is any perf degradation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. This was a tricky bug to find, so nice catch.
@apoorvmittal10 It is too cool to add response in comment, so I don't notice your response :)
Should we update |
@chia7712 Ahhhh, my bad. I clicked edit instead of reply 🫣🤦♂️ My bad. safeTryCompleteOrElse is invoked when the operation is being added to purgatory, if tryComplete returns false then it gets added in purgatory. An operation should never have completed true when invoking safeTryCompleteOrElse. |
For 1) I meant the invocation by run method, I have updated the description. Thanks. For 2) tryComplete invocation is in kafka/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java Line 136 in d3f8979
@chia7712 just rewriting, so we have corrected flow of comments. |
return false; | ||
} | ||
// Attain lock prior completing the request. | ||
lock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After this change, the lock must be reentrant, correct? If so, we should consider replacing the Lock
type with ReentrantLock
to prevent potential deadlocks.
Additionally, DelayedOperation(long delayMs, Optional<Lock> lockOpt)
seems to lose its meaning once the old group coordinator is removed. We might want to clean that up.
These points can be addressed in a follow-up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default implemenation of Lock is over ReentrantLock itself in DelayedOperation and elsewhere, abstracted over Lock API in DelayedOperation. Are you suggesting to make it more tighter and use ReentrantLock explicitly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting to make it more tighter and use ReentrantLock explicitly?
yes, but this change needs to cleanup the code. It can be addressed later if we want to merge this bug fix first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I ll create a folow up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was a little bored, so I created Jira KAFKA-19322 (https://issues.apache.org/jira/browse/KAFKA-19322). Feel free to take it over, or I can assign it to a mentee 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are awesome, feel free to assign to a mentee and I can review or else I will pick it up.
@apoorvmittal10 : My understanding is that this PR is not fixing a bug. Without the PR, we still guarantee that onComplete() will be called exactly once. So, this PR just saves some unnecessary work after the operation completes. Is that correct? |
It does both, without the PR, though we guarantee that onComplete will be called exactly once but there can be a thread which is inside onComplete (when timeout occurs) and a separate thread invoking tryComplete. For DelayedShareFetch, prior this PR, a thread can be waiting in onComplete for lock which has already marked |
I ran some tests and performance is same. After the chages:
Before changes:
|
Hmm, is this true? tryComplete will only release the acquired locks if forceComplete() returns true, right? |
Yes it's true, it can happen in current implementation. This line when invoked from tryComplete can release the locks, which is correct but conflicts with concurrent execution on request timeout. To make sure that our acquired share partitions release is correct, we added a check here. In our performance runs we saw the issue. However it will not cause issue per se for share fetches, as SharePartition will not acquire record offsets which are already acquired but it will have some wasted fetches. |
@apoorvmittal10 : Thanks for the explanation. It seems that we found a case that an acquired sharePartition lock was released more than once. I am still not sure how it happened. forceComplete() is supposed to only return true once and the lock is only released when forceComplete() returns true. How is the acquired lock released a second time? |
Prior to this PR there is no lock in forceComplete but a way to make sure that onComplete is not executed twice. However, consider that thread 1 has started executing safeTryComplete which attains lock and checks that if the operation has marked completed, if not then tryComplete is executed. Now thread 1 is executing tryComplete, then thread 2 on timeout executes forceComplete which marks completed as true and waits here for the operation lock. Now thread 1, in tryComplete, acquires some share partitions but as completed is already marked true then the share partitions are released and so the lock for delayed operation. Thread 2 now starts executing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the explanation. It's clear to me why this is a bug now. A couple of comments.
* <p> | ||
* On operation timeout, onComplete is invoked, last try occurs to acquire partitions and read | ||
* from log, if acquired. The fetch will only happen from local log and not remote storage, on | ||
* operation expiration. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better consistency, should we remove released partitions from partitionsAcquired
in releasePartitionLocks()
? It seems that if we had done that, we wouldn't get into the situation of releasing the lock more than once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about it but it complicates the code execution in DelayedShareFetch. Clearing partitionsAcquired alone is not sufficient, as there could be pending remote fetch as well which might need additional time to finish. Hence I didn't go with that approach.
return true; | ||
} else { | ||
// Do not proceed if the operation is already completed. | ||
if (completed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An implication of this change is that when forceComplete() is called inside tryComplete
, it always returns true. Currently, we have multiple places that check the return value of forceComplete() inside tryComplete
. Should we adjust those?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can revert it to AtomicBoolean#compareAndSet but I didn't understand the problem. There would only be first execution of forceComplete which should be successful and return true. Subsequent executions will return false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@junrao that is a good point. After avoiding concurrent execution, forceComplete
MUST return true if it is executed by tryComplete
. Hence, checking the returned value of forceComplete
gets meaningless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so are we saying that first execution of forceComplete from tryComplete will return true? And as the operation is marked completed hence no further execution of tryComplete can trigger, which makes all executions of forceComplete within tryComplete always returns true?
I have removed the check in DelayedShareFetch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so are we saying that first execution of forceComplete from tryComplete will return true? And as the operation is marked completed hence no further execution of tryComplete can trigger, which makes all executions of forceComplete within tryComplete always returns true?
yes, we are on the same page. I hope I'm not misunderstanding @junrao comment :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. A couple of more comments.
onComplete(); | ||
return true; | ||
} else { | ||
// Do not proceed if the operation is already completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only the expiration logic checks the return value of forceComplete(). We could do the following in the expiration logic and change forceComplete()
to return void and avoid using the lock there (the caller will get the lock instead).
lock.lock();
try {
if (!isCompleted()) {
forceComplete();
onExpiration();
}
} finally {
lock.unlock();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and change forceComplete() to return void
Though I agree that now if forceComplete is invoked by tryComplete then it will return true but tryComplete still need to return boolean hence all places where we return the value from tryComplete based on forceComplete has to be changed and also requires explicit return true
from all the places. I am of the opinion to not do this. Or if you think we should then shall we take it as a separate PR for just refactor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, the issue is that there are quite a few places where tryComplete() calls forceComplete() and checks the return value of forceComplete(). You fixed the issue in DelayedShareFetch, but not in other places. As we evolve the code, it would be useful to leave the code base in a better place for future developers, even though it takes a bit of more work. This can be done in a followup jira. Filed https://issues.apache.org/jira/browse/KAFKA-19325 to track it.
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. Just a minor comment.
onComplete(); | ||
return true; | ||
} else { | ||
// Do not proceed if the operation is already completed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, the issue is that there are quite a few places where tryComplete() calls forceComplete() and checks the return value of forceComplete(). You fixed the issue in DelayedShareFetch, but not in other places. As we evolve the code, it would be useful to leave the code base in a better place for future developers, even though it takes a bit of more work. This can be done in a followup jira. Filed https://issues.apache.org/jira/browse/KAFKA-19325 to track it.
* or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls | ||
* forceComplete(). | ||
* Once an operation is completed, isCompleted() will return true. onComplete() is called from forceComplete(), | ||
* which is triggered once by either expiration if the operation is not completed after delayMs, or tryComplete() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it's possible for forceComplete() to be called once from tryComplete() and another from expiration. So, we need to remove once
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@apoorvmittal10 : Thanks for the updated PR. LGTM
…xternal lock (#19798) Remove the DelayedOperation constructor that accepts an external lock. According to this [PR](#19759). Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
The
onComplete
method in DelayedOperation is guaranteed to run onlyonce, through
forceComplete
, invoked either bytryComplete
or whenoperation is expired (
run
method). The invocation oftryComplete
isdone by attaining
lock
so no concurrent execution oftryComplete
happens for same delayed operation. However, there can be concurrent
execution of
tryComplete
andonComplete
as theexpiration
threadcan trigger a separte run of
onComplete
whiletryComplete
is stillexecuting. This behaviour is not ideal as there are parallel runs where
1 threads method execution is wasteful i.e. if
onComplete
is alreadyinvoked by another thread then execution of
tryComplete
is notrequired.
Reviewers: Jun Rao [email protected], Andrew Schofield
[email protected], Chia-Ping Tsai [email protected]