
Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap #4388


Draft · pantShrey wants to merge 6 commits into series/3.6.x from wstp-thread-leak-4382

Conversation


@pantShrey pantShrey commented Apr 19, 2025

Description:
This PR fixes a thread leak in the Work Stealing Thread Pool (issue #4382) caused by the FIFO-based reuse of cached blocker threads introduced in #4295. Under periods of low load, when the number of cached threads exceeds the number of blocking tasks, this FIFO behavior could prevent older cached threads from timing out and exiting.
The implemented solution transitions to a LIFO-like reuse strategy for cached blocker threads. This is achieved by using a SynchronousQueue for state hand-off between threads (leveraging its typically LIFO behavior for waiting pollers in non-fair mode) and a ConcurrentHashMap for tracking these blocker threads.
Changes:

  1. SynchronousQueue<TransferState> for State Hand-off:
    • WorkerThreads becoming blockers offer their TransferState to this pool-level queue.
    • Cached (previously blocker) WorkerThreads poll this queue to acquire new state. In its non-fair implementation, the JDK's SynchronousQueue typically services waiting pollers in a LIFO manner, prioritizing more recently cached threads for reuse.
    • If an offer isn't immediately taken, the thread preparing to block spawns a new replacement worker to maintain active pool size.
  2. ConcurrentHashMap<WorkerThread, Boolean> for Tracking:
    • All threads that enter the blocker/cached state are now tracked in this map.
  3. WorkerThread Blocker Logic Update:
    • After its blocking call, a blocker thread attempts to poll the SynchronousQueue for new TransferState with a timeout (runtimeBlockingExpiration).
    • Successful polling leads to re-initialization as an active worker. If the poll times out, the thread terminates and is removed from tracking.
  4. Shutdown Logic Modification:
    • WorkStealingThreadPool.shutdown() now iterates and interrupts all threads currently tracked in the blockerThreads map.
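
A minimal sketch of how items 1–3 could fit together, using simplified stand-in types rather than the actual WorkerThread/WorkStealingThreadPool code. The names transferStateQueue, blockerThreads, TransferState, and runtimeBlockingExpiration mirror this PR; becomeBlocker, awaitNewState, and spawnReplacement are hypothetical helpers for illustration only:

```scala
import java.util.concurrent.{ConcurrentHashMap, SynchronousQueue}
import scala.concurrent.duration._

// Placeholder for the state handed between a blocker thread and its successor.
final class TransferState

final class BlockerHandOff(runtimeBlockingExpiration: FiniteDuration) {
  // Pool-level queue for handing a TransferState to a waiting cached thread.
  // Default (non-fair) mode: waiting pollers are typically served LIFO.
  private val transferStateQueue = new SynchronousQueue[TransferState]()

  // Every thread currently in the blocker/cached state, so shutdown can interrupt it.
  private val blockerThreads = new ConcurrentHashMap[Thread, java.lang.Boolean]()

  // Called by a worker that is about to run a blocking task.
  def becomeBlocker(state: TransferState, spawnReplacement: () => Unit): Unit = {
    blockerThreads.put(Thread.currentThread(), java.lang.Boolean.TRUE)
    // offer succeeds only if a cached thread is already waiting in awaitNewState;
    // otherwise spawn a fresh worker to keep the active pool size constant.
    if (!transferStateQueue.offer(state)) spawnReplacement()
  }

  // Called by a blocker thread after its blocking call completes. Returns
  // Some(state) if it should re-initialize as an active worker, or None if it
  // timed out and should terminate.
  def awaitNewState(): Option[TransferState] = {
    val next = Option(
      transferStateQueue.poll(runtimeBlockingExpiration.length, runtimeBlockingExpiration.unit))
    // This sketch simply drops the thread from tracking on both paths;
    // registration in the real code may be managed differently.
    blockerThreads.remove(Thread.currentThread())
    next
  }
}
```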

@@ -732,7 +732,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// by another thread in the future.
val len = runtimeBlockingExpiration.length
val unit = runtimeBlockingExpiration.unit
-if (pool.cachedThreads.tryTransfer(this, len, unit)) {
+if (pool.cachedThreads.offerFirst(this, len, unit)) {
Member

I think that this will always succeed immediately:

> Inserts the specified element at the front of this deque, waiting up to the specified wait time if necessary for space to become available.

Because:

> The capacity, if unspecified, is equal to Integer.MAX_VALUE

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingDeque.html
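
For illustration, a standalone snippet (not the WSTP code) of the semantic difference at play here: tryTransfer only succeeds if a consumer actually receives the element within the timeout, while offerFirst on an effectively unbounded deque only waits for space, so it returns true immediately:

```scala
import java.util.concurrent.{LinkedBlockingDeque, LinkedTransferQueue, TimeUnit}

object TransferVsOffer extends App {
  val ltq = new LinkedTransferQueue[String]()
  // No consumer is waiting, so the transfer cannot complete: returns false after ~100 ms.
  println(ltq.tryTransfer("state", 100, TimeUnit.MILLISECONDS)) // false

  val deque = new LinkedBlockingDeque[String]() // capacity defaults to Integer.MAX_VALUE
  // offerFirst only waits for space, and this deque always has space,
  // so it returns true immediately even though nobody will ever poll it.
  println(deque.offerFirst("state", 100, TimeUnit.MILLISECONDS)) // true
}
```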

-private[unsafe] val cachedThreads: LinkedTransferQueue[WorkerThread[P]] =
-  new LinkedTransferQueue
+private[unsafe] val cachedThreads: LinkedBlockingDeque[WorkerThread[P]] =
+  new LinkedBlockingDeque
Member

To replicate the old behavior, I think we need to specify a capacity of 0 in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.

Author
@pantShrey pantShrey Apr 26, 2025

> To replicate the old behavior, I think we need to specify a capacity of 0 in the constructor (essentially, a synchronous deque). I'm not entirely certain if that's supported.

A capacity of 0 is not allowed, because:

> IllegalArgumentException - if capacity is less than 1

The minimum we can go is 1. Would that work?

@armanbilge
Member

> allowing older threads to time out and exit under high load.

This is not quite correct. Under high load (i.e., lots of blocking tasks), we want the threads to persist so we can reuse them as much as possible.

It's under lower load, when there are more cached threads than blocking tasks, that we would like the older threads to time out and exit.

@armanbilge
Member

I have a new idea for how to fix this:

  1. We should have a pool-level SynchronousQueue[TransferState].
  2. When a thread transitions to blocking, it should offer its state to the queue. If this fails, it can start a new worker thread to replace itself.
  3. When a thread transitions to cached, it can poll up to the timeout for a new state.
  4. Meanwhile, we can use a ConcurrentHashMap to keep track of blocker threads.

Although officially unspecified, it turns out that the non-fair implementation of SynchronousQueue in the JDK uses a LIFO stack. Although we probably don't want to rely on this in the long term, I propose that this is good enough to fix the bug for now.
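
A small, hedged demonstration of that observation (standalone snippet; this ordering is not guaranteed by the JDK and may vary across implementations):

```scala
import java.util.concurrent.{SynchronousQueue, TimeUnit}

object LifoPollersDemo extends App {
  val q = new SynchronousQueue[String]() // default (non-fair) mode

  // Start three pollers, staggered so their arrival order is well defined.
  (1 to 3).foreach { i =>
    new Thread(
      () => println(s"poller $i received: ${q.poll(5, TimeUnit.SECONDS)}"),
      s"poller-$i").start()
    Thread.sleep(100)
  }

  // On current JDKs the non-fair mode is stack-based, so these offers are
  // typically matched with the most recently arrived pollers first: 3, then 2, then 1.
  q.offer("a", 1, TimeUnit.SECONDS)
  q.offer("b", 1, TimeUnit.SECONDS)
  q.offer("c", 1, TimeUnit.SECONDS)
}
```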

…] and ConcurrentHashMap to keep track of blocker threads.
@@ -714,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
if (blocking) {
// The worker thread was blocked before. It is no longer part of the
// core pool and needs to be cached.

val stateToTransfer = transferState
Author

Had to do this to avoid an NPE in pool.stateTransferQueue.offer(st), but I think this is causing the error:

[error] x blocking work does not starve poll
[error] None is not Some (IOPlatformSpecification.scala:702)

Comment on lines 935 to 943
-pool.replaceWorker(idx, cached)
+pool.replaceWorker(idx, this)
Member

I think this change doesn't make sense. The old code used to take a thread out of the cache and promote it to the idx-th worker thread, to replace this thread, which is about to block. The new code tries to replace this thread with itself?

Author

Oh ok, I see it now, will try to fix this.

Author

I am thinking of adding a thread reference field in TransferState to pass on the cached WorkerThread.

Member
@armanbilge armanbilge left a comment

thanks, this is looking good!

Comment on lines 964 to 973
-transferState,
+new WorkerThread.TransferState,
Member

Why this change?

Author

My thinking for changing it to new WorkerThread.TransferState() for the clone was to ensure the clone would have its own, distinct TransferState object ready for any of its own future blocking scenarios.
I was concerned that if the clone reused the TransferState object from the original thread (which is now a blocker), we might run into ambiguity or issues. For instance, if the original thread (now a blocker) were to later interact with that same TransferState object.
But I am not sure.

Member

> I was concerned that if the clone reused the TransferState object from the original thread (which is now a blocker), we might run into ambiguity or issues. For instance, if the original thread (now a blocker) were to later interact with that same TransferState object.

I see. This is a good thing to be concerned about, but I think we should be safe. Admittedly, maybe this is a micro-optimization to try to avoid an allocation that is dwarfed by creating a new thread anyway 😅

Conceptually, we can imagine that the ownership of the TransferState is being passed from the blocker thread to the clone thread. In fact, this is essentially identical to when we pass the TransferState via the synchronous queue; the only thing that is different in this case is that the receiving thread is brand new (instead of coming out of the cache).

> For instance, if the original thread (now a blocker) were to later interact with that same TransferState object.

Once a thread becomes a blocker, it no longer has state to transfer to anyone. Instead, after completing the blocking operation, it will wait to receive a new TransferState.

Author

Oh ok sure, thanks for sharing this, I will revert this change.


if (pool.transferStateQueue.offer(transferState)) {
// If successful, a waiting thread will pick it up
// Register this thread in the blockerThreads map
Member

I'm confused by this comment. Doesn't the registration happen above?

Author

You are absolutely correct. My apologies, I forgot to move the comment along with the code.

Comment on lines -752 to +757
-var t: WorkerThread[P] = null
-while ({
-  t = cachedThreads.poll()
-  t ne null
-}) {
+val it = blockerThreads.keySet().iterator()
+while (it.hasNext()) {
+  val t = it.next()
Member

If I remember correctly, I think one of the goals here is to avoid any allocations, in case the runtime was shutting down in a fatal condition (e.g. out-of-memory). Unfortunately, creating the iterator is an allocation. But, I don't know how to iterate the elements of a ConcurrentHashMap without an iterator 🤔

Author

After searching, I was also not able to find any allocation-free method; it seems we might need to accept this small allocation as a trade-off for now. I will keep looking and am open to suggestions.

Member

I'd have to retrace the code, but I think this is an area of secondary concern in terms of allocations. The critical path is ensuring that exceptions can propagate out to the IO#unsafe calling point without allocation. So long as that is achieved, everything else is gravy. Logically, I don't think WSTP shutdown matters as much since, in any fatal error case, the process is torpedoed anyway and about to die.
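
For reference, a compact standalone sketch of the shutdown path under discussion (item 4 of the description); blockerThreads stands in for the ConcurrentHashMap introduced by this PR, and the single Iterator allocation mentioned above is marked in a comment:

```scala
import java.util.concurrent.ConcurrentHashMap

object ShutdownSketch {
  val blockerThreads = new ConcurrentHashMap[Thread, java.lang.Boolean]()

  def interruptBlockers(): Unit = {
    // keySet().iterator() is the one allocation discussed above; an allocation-free
    // traversal of a ConcurrentHashMap does not appear to be available.
    val it = blockerThreads.keySet().iterator()
    while (it.hasNext()) {
      // Wake each blocked/cached thread so it can observe shutdown and exit.
      it.next().interrupt()
    }
  }
}
```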

@pantShrey pantShrey changed the title Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with LinkedBlockingDeque Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap May 23, 2025
@pantShrey pantShrey force-pushed the wstp-thread-leak-4382 branch from ec7abc7 to 40a00ca on May 23, 2025 07:28
@djspiewak
Member

Quick drive-by sanity check: is it really that hard for us to build a blocking concurrent LIFO structure backed by an array? It seems like doing so would be more testable and a bit more clear than what's going on here, as well as offering us a lot more control over things like allocations and performance tradeoffs.
