-
-
Notifications
You must be signed in to change notification settings - Fork 564
Fix #4382: Fix thread leak in WSTP by replacing LinkedTransferQueue with SynchronousQueue and ConcurrentHashMap #4388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
16f2616
c5f36ff
869c954
cafe75a
cbdad86
40a00ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ import scala.concurrent.{BlockContext, CanAwait} | |
| import scala.concurrent.duration.{Duration, FiniteDuration} | ||
|
|
||
| import java.lang.Long.MIN_VALUE | ||
| import java.util.concurrent.{ArrayBlockingQueue, ThreadLocalRandom} | ||
| import java.util.concurrent.ThreadLocalRandom | ||
| import java.util.concurrent.atomic.AtomicBoolean | ||
|
|
||
| import WorkerThread.{Metrics, TransferState} | ||
|
|
@@ -110,7 +110,6 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
| */ | ||
| private[this] var _active: Runnable = _ | ||
|
|
||
| private val stateTransfer: ArrayBlockingQueue[TransferState] = new ArrayBlockingQueue(1) | ||
| private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration | ||
|
|
||
| private[effect] var currentIOFiber: IOFiber[?] = _ | ||
|
|
@@ -732,20 +731,27 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
| // by another thread in the future. | ||
| val len = runtimeBlockingExpiration.length | ||
| val unit = runtimeBlockingExpiration.unit | ||
| if (pool.cachedThreads.tryTransfer(this, len, unit)) { | ||
| // Someone accepted the transfer of this thread and will transfer the state soon. | ||
| val newState = stateTransfer.take() | ||
|
|
||
| // Try to poll for a new state from the transfer queue | ||
| val newState = pool.transferStateQueue.poll(len, unit) | ||
|
|
||
| if (newState ne null) { | ||
| // Got a state to take over | ||
| init(newState) | ||
|
|
||
| } else { | ||
| // The timeout elapsed and no one woke up this thread. It's time to exit. | ||
| // No state to take over after timeout, exit | ||
| pool.blockedWorkerThreadCounter.decrementAndGet() | ||
| // Remove from blocker threads map if present | ||
| pool.blockerThreads.remove(this) | ||
| return | ||
| } | ||
| } catch { | ||
| case _: InterruptedException => | ||
| // This thread was interrupted while cached. This should only happen | ||
| // during the shutdown of the pool. Nothing else to be done, just | ||
| // exit. | ||
| pool.blockerThreads.remove(this) | ||
| return | ||
| } | ||
| } | ||
|
|
@@ -928,15 +934,18 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
| // Set the name of this thread to a blocker prefixed name. | ||
| setName(s"$prefix-$nameIndex") | ||
|
|
||
| val cached = pool.cachedThreads.poll() | ||
| if (cached ne null) { | ||
| // There is a cached worker thread that can be reused. | ||
| val idx = index | ||
| pool.replaceWorker(idx, cached) | ||
| // Transfer the data structures to the cached thread and wake it up. | ||
| transferState.index = idx | ||
| transferState.tick = tick + 1 | ||
| val _ = cached.stateTransfer.offer(transferState) | ||
| val idx = index | ||
|
|
||
| // Prepare the transfer state | ||
| transferState.index = idx | ||
| transferState.tick = tick + 1 | ||
|
|
||
| val _ = pool.blockerThreads.put(this, java.lang.Boolean.TRUE) | ||
|
|
||
| if (pool.transferStateQueue.offer(transferState)) { | ||
| // If successful, a waiting thread will pick it up | ||
| // Register this thread in the blockerThreads map | ||
|
||
|
|
||
| } else { | ||
| // Spawn a new `WorkerThread`, a literal clone of this one. It is safe to | ||
| // transfer ownership of the local queue and the parked signal to the new | ||
|
|
@@ -961,7 +970,7 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
| system, | ||
| _poller, | ||
| metrics, | ||
| transferState, | ||
| new WorkerThread.TransferState, | ||
|
||
| pool) | ||
| // Make sure the clone gets our old name: | ||
| val clonePrefix = pool.threadPrefix | ||
|
|
@@ -1002,6 +1011,8 @@ private[effect] final class WorkerThread[P <: AnyRef]( | |
| setName(s"$prefix-${_index}") | ||
|
|
||
| blocking = false | ||
|
|
||
| pool.replaceWorker(newIdx, this) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly, I think one of the goals here is to avoid any allocations, in case the runtime was shutting down in a fatal condition (e.g. out-of-memory). Unfortunately, creating the iterator is an allocation. But, I don't know how to iterate the elements of a
ConcurrentHashMapwithout an iterator 🤔There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After searching I was also not able to find any allocation free method , it seems we might need to accept this small allocation as a trade-off, currently . I would still search for it and am open to suggestions for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd have to retrace the code, but I think this is an area of secondary concern in terms of allocations. The critical path is ensuring that exceptions can propagate out to the
IO#unsafecalling point without allocation. So long as that is achieved, everything else is gravy. Logically, I don't think WSTPshutdownmatters as much since, in any fatal error case, the process is torpedoed anyway and about to die.