Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap #4388
base: series/3.6.x
Conversation
@@ -732,7 +732,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
        // by another thread in the future.
        val len = runtimeBlockingExpiration.length
        val unit = runtimeBlockingExpiration.unit
-       if (pool.cachedThreads.tryTransfer(this, len, unit)) {
+       if (pool.cachedThreads.offerFirst(this, len, unit)) {
I think that this will always succeed immediately:

> Inserts the specified element at the front of this deque, waiting up to the specified wait time if necessary for space to become available.

Because:

> The capacity, if unspecified, is equal to Integer.MAX_VALUE

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingDeque.html
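For illustration, a minimal standalone sketch (not from the PR) of the difference in hand-off semantics being pointed out here: the old tryTransfer only succeeds if a consumer is actually waiting, while offerFirst on a default-constructed (unbounded) deque succeeds unconditionally.

```scala
import java.util.concurrent.{LinkedBlockingDeque, LinkedTransferQueue, TimeUnit}

object HandOffSemantics {
  def main(args: Array[String]): Unit = {
    // tryTransfer(e, timeout, unit) only succeeds if a consumer receives the
    // element within the timeout; with no waiting taker it returns false.
    val transferQueue = new LinkedTransferQueue[String]
    println(transferQueue.tryTransfer("state", 100, TimeUnit.MILLISECONDS)) // false

    // A default-constructed LinkedBlockingDeque has capacity Integer.MAX_VALUE,
    // so offerFirst(e, timeout, unit) never needs to wait for space and
    // returns true immediately, even with no consumer on the other side.
    val deque = new LinkedBlockingDeque[String]
    println(deque.offerFirst("state", 100, TimeUnit.MILLISECONDS)) // true
  }
}
```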
-  private[unsafe] val cachedThreads: LinkedTransferQueue[WorkerThread[P]] =
-    new LinkedTransferQueue
+  private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] =
+    new LinkedBlockingDeque
To replicate the old behavior, I think we need to specify a capacity of 0 in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.
> To replicate the old behavior, I think we need to specify a capacity of 0 in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.

A capacity of 0 is not allowed, because the constructor throws "IllegalArgumentException - if capacity is less than 1", so the minimum we can go is 1. Would that work?
This is not quite correct. Under high load (i.e., lots of blocking tasks), we want the threads to persist so we can reuse them as much as possible. It's under lower load, when there are more cached threads than blocking tasks, that we would like the older threads to time out and exit.
I have a new idea for how to fix this: although officially unspecified, it turns out that the non-fair implementation of SynchronousQueue services waiting pollers in LIFO order, so the most recently cached thread is the first one to be reused.
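As a rough sketch of that idea (hypothetical names, not the PR code), the hand-off could look roughly like this, relying on the unspecified-but-observed LIFO behavior of the non-fair SynchronousQueue:

```scala
import java.util.concurrent.{SynchronousQueue, TimeUnit}

object SynchronousHandOffSketch {
  final class TransferState // stand-in for WorkerThread.TransferState

  // The default (non-fair) SynchronousQueue stacks its waiting pollers, so the
  // most recently parked (i.e. most recently cached) thread gets served first.
  val handOffQueue = new SynchronousQueue[TransferState]()

  // Called by a thread about to block: succeeds only if some cached thread is
  // currently waiting in awaitState below.
  def handOff(state: TransferState): Boolean =
    handOffQueue.offer(state)

  // Called by a cached thread: wait up to the expiration timeout for new state;
  // None means the poll timed out and the thread can exit.
  def awaitState(timeoutMillis: Long): Option[TransferState] =
    Option(handOffQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS))
}
```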
@@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
        if (blocking) {
          // The worker thread was blocked before. It is no longer part of the
          // core pool and needs to be cached.

+         val stateToTransfer = transferState
Had to do this to avoid NPE in pool.stateTransferQueue.offer(st), but I think this is causing the error:

[error] x blocking work does not starve poll
[error] None is not Some (IOPlatformSpecification.scala:702)
-         pool.replaceWorker(idx, cached)
+         pool.replaceWorker(idx, this)
I think this change doesn't make sense. The old code used to take a thread out of the cache and promote it to the idx-th worker thread, to replace this thread, which is about to block. The new code tries to replace this thread with itself?
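To spell out that original intent, a tiny hypothetical model (stand-in types, not the actual pool code): the slot of the thread about to block is handed to a cached worker, or to a fresh clone if the cache is empty.

```scala
// Hypothetical stand-ins; the real WorkStealingThreadPool / WorkerThread types
// carry far more state than this.
final class Worker(val name: String)

final class Pool(size: Int) {
  private val workers = new Array[Worker](size)

  def replaceWorker(idx: Int, w: Worker): Unit = workers(idx) = w

  // The thread in slot `idx` is about to block: promote a cached worker into
  // that slot (or a freshly spawned clone if the cache is empty), so the slot
  // never points at a blocked thread.
  def prepareToBlock(idx: Int, cached: Option[Worker]): Unit =
    replaceWorker(idx, cached.getOrElse(new Worker(s"clone-$idx")))
}
```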
Oh ok, I see it now, will try to fix this.
I am thinking of adding a thread reference field in TransferState to pass on the cached WorkerThread.
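A sketch of how that idea could look (hypothetical, not the PR code): the hand-off payload carries a reference to the cached thread, so the receiver knows which thread should take over the blocked worker's slot.

```scala
// Hypothetical sketch: TransferState gains a field referencing the cached
// thread that is handed over alongside the state.
final class TransferState {
  // ...existing transfer fields elided...
  @volatile var cachedThread: Thread = null
}

object TransferStateHandOff {
  // The blocker fills in the field before offering the state to the queue.
  def prepareHandOff(state: TransferState, cached: Thread): TransferState = {
    state.cachedThread = cached
    state // ...then offer `state` to the pool's hand-off queue...
  }
}
```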
thanks, this is looking good!
-           transferState,
+           new WorkerThread.TransferState,
Why this change?
My thinking for changing it to new WorkerThread.TransferState() for the clone was to ensure the clone would have its own, distinct TransferState object ready for any of its own future blocking scenarios.
I was concerned that if the clone reused the TransferState object from the original thread (which is now a blocker), we might run into ambiguity or issues. For instance, if the original thread (now a blocker) were to later interact with that same TransferState object.
But I am not sure.
> I was concerned that if the clone reused the TransferState object from the original thread (which is now a blocker), we might run into ambiguity or issues. For instance, if the original thread (now a blocker) were to later interact with that same TransferState object.

I see. This is a good thing to be concerned about, but I think we should be safe. Admittedly, maybe this is a micro-optimization to try to avoid an allocation that is dwarfed by creating a new thread anyway 😅

Conceptually, we can imagine that the ownership of the TransferState is being passed from the blocker thread to the clone thread. In fact, this is essentially identical to when we pass the TransferState via the synchronous queue; the only difference in this case is that the receiving thread is brand new (instead of coming out of the cache).

> For instance, if the original thread (now a blocker) were to later interact with that same TransferState object.

Once a thread becomes a blocker, it no longer has state to transfer to anyone. Instead, after completing the blocking operation, it will wait to receive a new TransferState.
Oh ok sure, thanks for sharing this, I will revert this change.
          if (pool.transferStateQueue.offer(transferState)) {
            // If successful, a waiting thread will pick it up
            // Register this thread in the blockerThreads map
I'm confused by this comment. Doesn't the registration happen above?
You are absolutely correct. My apologies, I forgot to shift the comment along with the code.
-     var t: WorkerThread[P] = null
-     while ({
-       t = cachedThreads.poll()
-       t ne null
-     }) {
+     val it = blockerThreads.keySet().iterator()
+     while (it.hasNext()) {
+       val t = it.next()
If I remember correctly, I think one of the goals here is to avoid any allocations, in case the runtime was shutting down in a fatal condition (e.g. out-of-memory). Unfortunately, creating the iterator is an allocation. But I don't know how to iterate the elements of a ConcurrentHashMap without an iterator 🤔
After searching, I was also not able to find an allocation-free method; it seems we might need to accept this small allocation as a trade-off for now. I will keep looking for one and am open to suggestions.
I'd have to retrace the code, but I think this is an area of secondary concern in terms of allocations. The critical path is ensuring that exceptions can propagate out to the IO#unsafe calling point without allocation. So long as that is achieved, everything else is gravy. Logically, I don't think WSTP shutdown matters as much since, in any fatal error case, the process is torpedoed anyway and about to die.
Quick drive-by sanity check: is it really that hard for us to build a blocking concurrent LIFO structure backed by an array? It seems like doing so would be more testable and a bit more clear than what's going on here, as well as offering us a lot more control over things like allocations and performance tradeoffs.
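For the sake of discussion, a rough sketch of what such a structure could look like (an illustration of the suggestion, not a tuned or endorsed implementation): a bounded, lock-based LIFO whose pop can wait with a timeout.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

final class BlockingLifo[A <: AnyRef](capacity: Int) {
  private[this] val lock = new ReentrantLock()
  private[this] val nonEmpty = lock.newCondition()
  private[this] val buffer = new Array[AnyRef](capacity)
  private[this] var size = 0

  /** Push without blocking; returns false when the stack is full. */
  def push(a: A): Boolean = {
    lock.lock()
    try {
      if (size == capacity) false
      else {
        buffer(size) = a
        size += 1
        nonEmpty.signal()
        true
      }
    } finally lock.unlock()
  }

  /** Pop the most recently pushed element, waiting up to the given timeout;
    * returns null if the timeout elapses with the stack still empty. */
  def pop(timeout: Long, unit: TimeUnit): A = {
    var nanos = unit.toNanos(timeout)
    lock.lock()
    try {
      while (size == 0) {
        if (nanos <= 0L) return null.asInstanceOf[A]
        nanos = nonEmpty.awaitNanos(nanos)
      }
      size -= 1
      val a = buffer(size).asInstanceOf[A]
      buffer(size) = null
      a
    } finally lock.unlock()
  }
}
```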
Description:

This PR fixes a thread leak in the Work Stealing Thread Pool (Issue #4382), previously caused by FIFO-based reuse of cached blocker threads (introduced in #4295). This FIFO behavior could prevent older cached threads from timing out and exiting during periods of low load, when the number of cached threads exceeds the number of blocking tasks.

The implemented solution transitions to a LIFO-like reuse strategy for cached blocker threads. This is achieved by using a `SynchronousQueue` for state hand-off between threads (leveraging its typically LIFO behavior for waiting pollers in non-fair mode) and a `ConcurrentHashMap` for tracking these blocker threads.

Changes:

- `SynchronousQueue<TransferState>` for State Hand-off:
  - `WorkerThread`s becoming blockers `offer` their `TransferState` to this pool-level queue.
  - `WorkerThread`s `poll` this queue to acquire new state. In its non-fair implementation, the JDK's `SynchronousQueue` typically services waiting pollers in a LIFO manner, prioritizing more recently cached threads for reuse.
  - If an `offer` isn't immediately taken, the thread preparing to block spawns a new replacement worker to maintain active pool size.
- `ConcurrentHashMap<WorkerThread, Boolean>` for Tracking: threads entering the blocking path register themselves in this pool-level `blockerThreads` map.
- `WorkerThread` Blocker Logic Update: blockers `poll` the `SynchronousQueue` for a new `TransferState` with a timeout (`runtimeBlockingExpiration`).
- `WorkStealingThreadPool.shutdown()` now iterates and interrupts all threads currently tracked in the `blockerThreads` map.
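To make the described flow concrete, here is a condensed sketch using stand-in names (`becomeBlocker`, `awaitNextState`, `spawnReplacementWorker` are hypothetical simplifications, not the actual methods; the real `WorkerThread` / `WorkStealingThreadPool` code carries much more state):

```scala
import java.util.concurrent.{ConcurrentHashMap, SynchronousQueue, TimeUnit}

object BlockerHandOffModel {
  final class TransferState

  private val transferStateQueue = new SynchronousQueue[TransferState]() // non-fair by default
  private val blockerThreads = new ConcurrentHashMap[Thread, java.lang.Boolean]()

  // A worker about to run a blocking operation registers itself and tries to
  // hand its state to a cached thread; if nobody is waiting, it spawns a
  // replacement worker so the pool keeps its target number of active workers.
  def becomeBlocker(state: TransferState): Unit = {
    blockerThreads.put(Thread.currentThread(), java.lang.Boolean.TRUE)
    if (!transferStateQueue.offer(state)) spawnReplacementWorker(state)
  }

  // A cached blocker thread, once its blocking operation finishes, waits for a
  // new TransferState; a timeout means it deregisters and exits.
  def awaitNextState(expiration: Long, unit: TimeUnit): Option[TransferState] = {
    val next = Option(transferStateQueue.poll(expiration, unit))
    if (next.isEmpty) blockerThreads.remove(Thread.currentThread())
    next
  }

  // Shutdown interrupts every tracked blocker thread.
  def shutdown(): Unit = {
    val it = blockerThreads.keySet().iterator()
    while (it.hasNext()) it.next().interrupt()
  }

  private def spawnReplacementWorker(state: TransferState): Unit =
    () // placeholder: the real pool would start a new WorkerThread here
}
```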