Skip to content

Commit adb7677

Browse files
KAFKA-19312 Avoiding concurrent execution of onComplete and tryComplete (#19759)
The `onComplete` method in DelayedOperation is guaranteed to run only once, through `forceComplete`, invoked either by `tryComplete` or when operation is expired (`run` method). The invocation of `tryComplete` is done by attaining `lock` so no concurrent execution of `tryComplete` happens for same delayed operation. However, there can be concurrent execution of `tryComplete` and `onComplete` as the `expiration` thread can trigger a separte run of `onComplete` while `tryComplete` is still executing. This behaviour is not ideal as there are parallel runs where 1 threads method execution is wasteful i.e. if `onComplete` is already invoked by another thread then execution of `tryComplete` is not required. I ran some tests and performance is same. ### After the chages: ``` --num 10000 --rate 100 --timeout 1000 --pct50 0.5 --pct75 0.75 # latency samples: pct75 = 0, pct50 = 0, min = 0, max = 7 # interval samples: rate = 100.068948, min = 0, max = 129 # enqueue rate (10000 requests): # <elapsed time ms> <target rate> <actual rate> <process cpu time ms> <G1 Old Generation count> <G1 Young Generation count> <G1 Old Generation time ms> <G1 Young Generation time ms> 101196 99.809364 99.806376 3240 0 2 0 8 ``` ``` --num 10000 --rate 1000 --timeout 1000 --pct50 0.5 --pct75 0.75 # latency samples: pct75 = 0, pct50 = 0, min = 0, max = 9 # interval samples: rate = 999.371395, min = 0, max = 14 # enqueue rate (10000 requests): # <elapsed time ms> <target rate> <actual rate> <process cpu time ms> <G1 Old Generation count> <G1 Young Generation count> <G1 Old Generation time ms> <G1 Young Generation time ms> 11104 989.902990 989.805008 1349 0 2 0 7 ``` ### Before changes: ``` --num 10000 --rate 100 --timeout 1000 --pct50 0.5 --pct75 0.75 # latency samples: pct75 = 0, pct50 = 0, min = 0, max = 9 # interval samples: rate = 100.020304, min = 0, max = 130 # enqueue rate (10000 requests): # <elapsed time ms> <target rate> <actual rate> <process cpu time ms> <G1 Old Generation count> <G1 Young Generation count> <G1 Old Generation time ms> <G1 Young Generation time ms> 102366 98.657274 98.652408 3444 0 2 0 8 --num 10000 --rate 1000 --timeout 1000 --pct50 0.5 --pct75 0.75 # latency samples: pct75 = 0, pct50 = 0, min = 0, max = 8 # interval samples: rate = 997.134236, min = 0, max = 14 # enqueue rate (10000 requests): # <elapsed time ms> <target rate> <actual rate> <process cpu time ms> <G1 Old Generation count> <G1 Young Generation count> <G1 Old Generation time ms> <G1 Young Generation time ms> 11218 978.665101 978.665101 1624 0 2 0 7 Reviewers: Jun Rao <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 1407b12 commit adb7677

File tree

2 files changed

+43
-48
lines changed

2 files changed

+43
-48
lines changed

core/src/main/java/kafka/server/share/DelayedShareFetch.java

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -202,32 +202,29 @@ public void onExpiration() {
202202
* Complete the share fetch operation by fetching records for all partitions in the share fetch request irrespective
203203
* of whether they have any acquired records. This is called when the fetch operation is forced to complete either
204204
* because records can be acquired for some partitions or due to MaxWaitMs timeout.
205+
* <p>
206+
* On operation timeout, onComplete is invoked, last try occurs to acquire partitions and read
207+
* from log, if acquired. The fetch will only happen from local log and not remote storage, on
208+
* operation expiration.
205209
*/
206210
@Override
207211
public void onComplete() {
208-
// We are utilizing lock so that onComplete doesn't do a dirty read for instance variables -
209-
// partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
210-
lock.lock();
211212
log.trace("Completing the delayed share fetch request for group {}, member {}, "
212213
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
213214
partitionsAcquired.keySet());
214215

215-
try {
216-
if (remoteStorageFetchException.isPresent()) {
217-
completeErroneousRemoteShareFetchRequest();
218-
} else if (pendingRemoteFetchesOpt.isPresent()) {
219-
if (maybeRegisterCallbackPendingRemoteFetch()) {
220-
log.trace("Registered remote storage fetch callback for group {}, member {}, "
221-
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
222-
partitionsAcquired.keySet());
223-
return;
224-
}
225-
completeRemoteStorageShareFetchRequest();
226-
} else {
227-
completeLocalLogShareFetchRequest();
216+
if (remoteStorageFetchException.isPresent()) {
217+
completeErroneousRemoteShareFetchRequest();
218+
} else if (pendingRemoteFetchesOpt.isPresent()) {
219+
if (maybeRegisterCallbackPendingRemoteFetch()) {
220+
log.trace("Registered remote storage fetch callback for group {}, member {}, "
221+
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
222+
partitionsAcquired.keySet());
223+
return;
228224
}
229-
} finally {
230-
lock.unlock();
225+
completeRemoteStorageShareFetchRequest();
226+
} else {
227+
completeLocalLogShareFetchRequest();
231228
}
232229
}
233230

@@ -358,15 +355,15 @@ public boolean tryComplete() {
358355
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
359356
partitionsAcquired = topicPartitionData;
360357
localPartitionsAlreadyFetched = replicaManagerReadResponse;
361-
return forceCompleteRequest();
358+
return forceComplete();
362359
} else {
363360
log.debug("minBytes is not satisfied for the share fetch request for group {}, member {}, " +
364361
"topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
365362
sharePartitions.keySet());
366363
releasePartitionLocks(topicPartitionData.keySet());
367364
}
368365
} else {
369-
log.trace("Can't acquire records for any partition in the share fetch request for group {}, member {}, " +
366+
log.trace("Can't acquire any partitions in the share fetch request for group {}, member {}, " +
370367
"topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
371368
sharePartitions.keySet());
372369
}
@@ -381,10 +378,8 @@ public boolean tryComplete() {
381378
releasePartitionLocks(topicPartitionData.keySet());
382379
partitionsAcquired.clear();
383380
localPartitionsAlreadyFetched.clear();
384-
return forceCompleteRequest();
385-
} else {
386-
return forceCompleteRequest();
387381
}
382+
return forceComplete();
388383
}
389384
}
390385

@@ -785,7 +780,7 @@ private boolean maybeCompletePendingRemoteFetch() {
785780
}
786781

787782
if (canComplete || pendingRemoteFetchesOpt.get().isDone()) { // Case d
788-
return forceCompleteRequest();
783+
return forceComplete();
789784
} else
790785
return false;
791786
}
@@ -944,16 +939,6 @@ private void cancelRemoteFetchTask(RemoteFetch remoteFetch) {
944939
}
945940
}
946941

947-
private boolean forceCompleteRequest() {
948-
boolean completedByMe = forceComplete();
949-
// If the delayed operation is completed by me, the acquired locks are already released in onComplete().
950-
// Otherwise, we need to release the acquired locks.
951-
if (!completedByMe) {
952-
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
953-
}
954-
return completedByMe;
955-
}
956-
957942
private void completeRemoteShareFetchRequestOutsidePurgatory() {
958943
if (outsidePurgatoryCallbackLock.compareAndSet(false, true)) {
959944
completeRemoteStorageShareFetchRequest();

server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperation.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.kafka.server.util.timer.TimerTask;
2020

2121
import java.util.Optional;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2322
import java.util.concurrent.locks.Lock;
2423
import java.util.concurrent.locks.ReentrantLock;
2524

@@ -29,10 +28,9 @@
2928
* a delayed fetch operation could be waiting for a given number of bytes to accumulate.
3029
* <br/>
3130
* The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
32-
* Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
33-
* forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
34-
* or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
35-
* forceComplete().
31+
* Once an operation is completed, isCompleted() will return true. onComplete() is called from forceComplete(),
32+
* which is triggered by either expiration, if the operation is not completed after delayMs; or tryComplete(),
33+
* if the operation can be completed now.
3634
* <br/>
3735
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
3836
* <br/>
@@ -41,7 +39,7 @@
4139
*/
4240
public abstract class DelayedOperation extends TimerTask {
4341

44-
private final AtomicBoolean completed = new AtomicBoolean(false);
42+
private volatile boolean completed = false;
4543

4644
protected final Lock lock;
4745

@@ -68,24 +66,36 @@ public DelayedOperation(long delayMs, Lock lock) {
6866
* Return true iff the operation is completed by the caller: note that
6967
* concurrent threads can try to complete the same operation, but only
7068
* the first thread will succeed in completing the operation and return
71-
* true, others will still return false
69+
* true, others will still return false.
7270
*/
7371
public boolean forceComplete() {
74-
if (completed.compareAndSet(false, true)) {
75-
// cancel the timeout timer
76-
cancel();
77-
onComplete();
78-
return true;
79-
} else {
72+
// Do not proceed if the operation is already completed.
73+
if (completed) {
8074
return false;
8175
}
76+
// Attain lock prior completing the request.
77+
lock.lock();
78+
try {
79+
// Re-check, if the operation is already completed by some other thread.
80+
if (!completed) {
81+
completed = true;
82+
// cancel the timeout timer
83+
cancel();
84+
onComplete();
85+
return true;
86+
} else {
87+
return false;
88+
}
89+
} finally {
90+
lock.unlock();
91+
}
8292
}
8393

8494
/**
8595
* Check if the delayed operation is already completed
8696
*/
8797
public boolean isCompleted() {
88-
return completed.get();
98+
return completed;
8999
}
90100

91101
/**

0 commit comments

Comments
 (0)