Skip to content

Commit cbdad86

Browse files
committed
Fixed several inconsistencies and reimplemented the SynchronousQueue and ConcurrentHashMap idea
1 parent cafe75a commit cbdad86

File tree

2 files changed

+38
-34
lines changed

2 files changed

+38
-34
lines changed

core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
131131
*/
132132
private[this] val state: AtomicInteger = new AtomicInteger(threadCount << UnparkShift)
133133

134-
private[unsafe] val stateTransferQueue: SynchronousQueue[WorkerThread.TransferState] =
134+
private[unsafe] val transferStateQueue: SynchronousQueue[WorkerThread.TransferState] =
135135
new SynchronousQueue[WorkerThread.TransferState](false)
136136

137137
private[unsafe] val blockerThreads: ConcurrentHashMap[WorkerThread[P], java.lang.Boolean] =
@@ -752,11 +752,12 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
752752
system.close()
753753
}
754754

755-
var ts: WorkerThread.TransferState = new WorkerThread.TransferState()
756-
while ({
757-
ts = stateTransferQueue.poll()
758-
ts ne null
759-
}) {}
755+
val it = blockerThreads.keySet().iterator()
756+
while (it.hasNext()) {
757+
val t = it.next()
758+
t.interrupt()
759+
// don't bother joining, cached threads are not doing anything interesting
760+
}
760761

761762
// Drain the external queue.
762763
externalQueue.clear()

core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import scala.concurrent.{BlockContext, CanAwait}
2626
import scala.concurrent.duration.{Duration, FiniteDuration}
2727

2828
import java.lang.Long.MIN_VALUE
29-
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
29+
import java.util.concurrent.ThreadLocalRandom
3030
import java.util.concurrent.atomic.AtomicBoolean
3131

3232
import WorkerThread.{Metrics, TransferState}
@@ -713,7 +713,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
713713
if (blocking) {
714714
// The worker thread was blocked before. It is no longer part of the
715715
// core pool and needs to be cached.
716-
val stateToTransfer = transferState
716+
717717
// First of all, remove the references to data structures of the core
718718
// pool because they have already been transferred to another thread
719719
// which took the place of this one.
@@ -727,32 +727,31 @@ private[effect] final class WorkerThread[P <: AnyRef](
727727
transferState = null
728728

729729
try {
730-
// Try to transfer this thread via the stateTransferQueue to be picked up
730+
// Try to transfer this thread via the cached threads data structure, to be picked up
731731
// by another thread in the future.
732-
val st = stateToTransfer
733732
val len = runtimeBlockingExpiration.length
734733
val unit = runtimeBlockingExpiration.unit
735-
val timeoutNanos = unit.toNanos(len)
736734

737-
if (pool.stateTransferQueue.offer(st)) {
738-
// Someone accepted the transfer of this thread and will transfer the state soon.
739-
val newState = pool.stateTransferQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS)
740-
if (newState ne null) {
741-
init(newState)
742-
} else {
743-
// The timeout elapsed and no one woke up this thread. It's time to exit.
744-
pool.blockedWorkerThreadCounter.decrementAndGet()
745-
return
746-
}
735+
// Try to poll for a new state from the transfer queue
736+
val newState = pool.transferStateQueue.poll(len, unit)
737+
738+
if (newState ne null) {
739+
// Got a state to take over
740+
init(newState)
741+
747742
} else {
748-
// Nobody polling, spawn new replacement was done in prepareForBlocking
743+
// No state to take over after timeout, exit
749744
pool.blockedWorkerThreadCounter.decrementAndGet()
745+
// Remove from blocker threads map if present
746+
pool.blockerThreads.remove(this)
750747
return
751748
}
752749
} catch {
753750
case _: InterruptedException =>
754751
// This thread was interrupted while cached. This should only happen
755-
// during the shutdown of the pool. Nothing else to be done, just exit.
752+
// during the shutdown of the pool. Nothing else to be done, just
753+
// exit.
754+
pool.blockerThreads.remove(this)
756755
return
757756
}
758757
}
@@ -935,16 +934,18 @@ private[effect] final class WorkerThread[P <: AnyRef](
935934
// Set the name of this thread to a blocker prefixed name.
936935
setName(s"$prefix-$nameIndex")
937936

938-
val ts = transferState
939-
val available = pool.stateTransferQueue.poll(0, TimeUnit.MILLISECONDS)
940-
if (available ne null) {
941-
// There is a cached worker thread that can be reused.
942-
val idx = index
943-
pool.replaceWorker(idx, this)
944-
// Transfer the data structures to the cached thread and wake it up.
945-
ts.index = idx
946-
ts.tick = tick + 1
947-
val _ = pool.stateTransferQueue.offer(ts)
937+
val idx = index
938+
939+
// Prepare the transfer state
940+
transferState.index = idx
941+
transferState.tick = tick + 1
942+
943+
val _ = pool.blockerThreads.put(this, java.lang.Boolean.TRUE)
944+
945+
if (pool.transferStateQueue.offer(transferState)) {
946+
// If successful, a waiting thread will pick it up
947+
// Register this thread in the blockerThreads map
948+
948949
} else {
949950
// Spawn a new `WorkerThread`, a literal clone of this one. It is safe to
950951
// transfer ownership of the local queue and the parked signal to the new
@@ -969,7 +970,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
969970
system,
970971
_poller,
971972
metrics,
972-
transferState,
973+
new WorkerThread.TransferState,
973974
pool)
974975
// Make sure the clone gets our old name:
975976
val clonePrefix = pool.threadPrefix
@@ -1010,6 +1011,8 @@ private[effect] final class WorkerThread[P <: AnyRef](
10101011
setName(s"$prefix-${_index}")
10111012

10121013
blocking = false
1014+
1015+
pool.replaceWorker(newIdx, this)
10131016
}
10141017

10151018
/**

0 commit comments

Comments
 (0)