diff --git a/core/js-native/src/main/scala/cats/effect/IOFiberPlatform.scala b/core/js/src/main/scala/cats/effect/IOFiberPlatform.scala similarity index 100% rename from core/js-native/src/main/scala/cats/effect/IOFiberPlatform.scala rename to core/js/src/main/scala/cats/effect/IOFiberPlatform.scala diff --git a/core/jvm-native/src/main/scala/cats/effect/IOCompanionMultithreadedPlatform.scala b/core/jvm-native/src/main/scala/cats/effect/IOCompanionMultithreadedPlatform.scala new file mode 100644 index 0000000000..d6dc40d701 --- /dev/null +++ b/core/jvm-native/src/main/scala/cats/effect/IOCompanionMultithreadedPlatform.scala @@ -0,0 +1,138 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.std.Console +import cats.effect.tracing.Tracing + +import java.time.{Instant, ZonedDateTime} + +private[effect] abstract class IOCompanionMultithreadedPlatform { this: IO.type => + private[this] val TypeDelay = Sync.Type.Delay + private[this] val TypeBlocking = Sync.Type.Blocking + private[this] val TypeInterruptibleOnce = Sync.Type.InterruptibleOnce + private[this] val TypeInterruptibleMany = Sync.Type.InterruptibleMany + + /** + * Intended for thread blocking operations. `blocking` will shift the execution of the + * blocking operation to a separate threadpool to avoid blocking on the main execution + * context. See the thread-model documentation for more information on why this is necessary. + * Note that the created effect will be uncancelable; if you need cancelation then you should + * use [[interruptible[A](thunk:=>A):*]] or [[interruptibleMany]]. + * + * {{{ + * IO.blocking(scala.io.Source.fromFile("path").mkString) + * }}} + * + * @param thunk + * The side effect which is to be suspended in `IO` and evaluated on a blocking execution + * context + * + * Implements [[cats.effect.kernel.Sync.blocking]]. + */ + def blocking[A](thunk: => A): IO[A] = { + val fn = () => thunk + Blocking(TypeBlocking, fn, Tracing.calculateTracingEvent(fn.getClass)) + } + + // this cannot be marked private[effect] because of static forwarders in Java + @deprecated("use interruptible / interruptibleMany instead", "3.3.0") + def interruptible[A](many: Boolean, thunk: => A): IO[A] = { + val fn = () => thunk + Blocking( + if (many) TypeInterruptibleMany else TypeInterruptibleOnce, + fn, + Tracing.calculateTracingEvent(fn.getClass)) + } + + /** + * Like [[blocking]] but will attempt to abort the blocking operation using thread interrupts + * in the event of cancelation. The interrupt will be attempted only once. + * + * Note the following tradeoffs: + * - this has slightly more overhead than [[blocking]] due to the machinery necessary for + * the interrupt coordination, + * - thread interrupts are very often poorly considered by Java (and Scala!) library + * authors, and it is possible for interrupts to result in resource leaks or invalid + * states. It is important to be certain that this will not be the case before using this + * mechanism. + * + * @param thunk + * The side effect which is to be suspended in `IO` and evaluated on a blocking execution + * context + * + * Implements [[cats.effect.kernel.Sync.interruptible[A](thunk:=>A):*]] + */ + def interruptible[A](thunk: => A): IO[A] = { + val fn = () => thunk + Blocking(TypeInterruptibleOnce, fn, Tracing.calculateTracingEvent(fn.getClass)) + } + + /** + * Like [[blocking]] but will attempt to abort the blocking operation using thread interrupts + * in the event of cancelation. The interrupt will be attempted repeatedly until the blocking + * operation completes or exits. + * + * Note the following tradeoffs: + * - this has slightly more overhead than [[blocking]] due to the machinery necessary for + * the interrupt coordination, + * - thread interrupts are very often poorly considered by Java (and Scala!) library + * authors, and it is possible for interrupts to result in resource leaks or invalid + * states. It is important to be certain that this will not be the case before using this + * mechanism. + * + * @param thunk + * The side effect which is to be suspended in `IO` and evaluated on a blocking execution + * context + * + * Implements [[cats.effect.kernel.Sync!.interruptibleMany]] + */ + def interruptibleMany[A](thunk: => A): IO[A] = { + val fn = () => thunk + Blocking(TypeInterruptibleMany, fn, Tracing.calculateTracingEvent(fn.getClass)) + } + + def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] = + if (hint eq TypeDelay) + apply(thunk) + else { + val fn = () => thunk + Blocking(hint, fn, Tracing.calculateTracingEvent(fn.getClass)) + } + + def realTimeInstant: IO[Instant] = asyncForIO.realTimeInstant + + def realTimeZonedDateTime: IO[ZonedDateTime] = asyncForIO.realTimeZonedDateTime + + /** + * Reads a line as a string from the standard input using the platform's default charset, as + * per `java.nio.charset.Charset.defaultCharset()`. + * + * The effect can raise a `java.io.EOFException` if no input has been consumed before the EOF + * is observed. This should never happen with the standard input, unless it has been replaced + * with a finite `java.io.InputStream` through `java.lang.System#setIn` or similar. + * + * @see + * `cats.effect.std.Console#readLineWithCharset` for reading using a custom + * `java.nio.charset.Charset` + * + * @return + * an IO effect that describes reading the user's input from the standard input as a string + */ + def readLine: IO[String] = + Console[IO].readLine +} diff --git a/core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala b/core/jvm-native/src/main/scala/cats/effect/IOFiberPlatform.scala similarity index 95% rename from core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala rename to core/jvm-native/src/main/scala/cats/effect/IOFiberPlatform.scala index f4e1e80ef9..74f070c1bd 100644 --- a/core/jvm/src/main/scala/cats/effect/IOFiberPlatform.scala +++ b/core/jvm-native/src/main/scala/cats/effect/IOFiberPlatform.scala @@ -18,7 +18,6 @@ package cats.effect import cats.effect.unsafe.UnsafeNonFatal -import java.nio.channels.ClosedByInterruptException import java.util.{concurrent => juc} import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} @@ -65,9 +64,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) { try { Right(cur.thunk()) } catch { - case ex: ClosedByInterruptException => throw ex - - // this won't suppress InterruptedException: + case t if InterruptThrowable.ClosedByInterrupt(t) => throw t case t if UnsafeNonFatal(t) => Left(t) } @@ -82,7 +79,7 @@ private[effect] abstract class IOFiberPlatform[A] extends AtomicBoolean(false) { back } catch { - case _: InterruptedException | _: ClosedByInterruptException => + case t if InterruptThrowable(t) => null } finally { canInterrupt.tryAcquire() diff --git a/core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala index c0b342c07a..8383d475ef 100644 --- a/core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -16,132 +16,14 @@ package cats.effect -import cats.effect.std.Console -import cats.effect.tracing.Tracing - -import java.time.{Instant, ZonedDateTime} import java.util.concurrent.{CompletableFuture, CompletionStage} -private[effect] abstract class IOCompanionPlatform { this: IO.type => - - private[this] val TypeDelay = Sync.Type.Delay - private[this] val TypeBlocking = Sync.Type.Blocking - private[this] val TypeInterruptibleOnce = Sync.Type.InterruptibleOnce - private[this] val TypeInterruptibleMany = Sync.Type.InterruptibleMany - - /** - * Intended for thread blocking operations. `blocking` will shift the execution of the - * blocking operation to a separate threadpool to avoid blocking on the main execution - * context. See the thread-model documentation for more information on why this is necessary. - * Note that the created effect will be uncancelable; if you need cancelation then you should - * use [[interruptible[A](thunk:=>A):*]] or [[interruptibleMany]]. - * - * {{{ - * IO.blocking(scala.io.Source.fromFile("path").mkString) - * }}} - * - * @param thunk - * The side effect which is to be suspended in `IO` and evaluated on a blocking execution - * context - * - * Implements [[cats.effect.kernel.Sync.blocking]]. - */ - def blocking[A](thunk: => A): IO[A] = { - val fn = () => thunk - Blocking(TypeBlocking, fn, Tracing.calculateTracingEvent(fn.getClass)) - } - - // this cannot be marked private[effect] because of static forwarders in Java - @deprecated("use interruptible / interruptibleMany instead", "3.3.0") - def interruptible[A](many: Boolean, thunk: => A): IO[A] = { - val fn = () => thunk - Blocking( - if (many) TypeInterruptibleMany else TypeInterruptibleOnce, - fn, - Tracing.calculateTracingEvent(fn.getClass)) - } - - /** - * Like [[blocking]] but will attempt to abort the blocking operation using thread interrupts - * in the event of cancelation. The interrupt will be attempted only once. - * - * Note the following tradeoffs: - * - this has slightly more overhead than [[blocking]] due to the machinery necessary for - * the interrupt coordination, - * - thread interrupts are very often poorly considered by Java (and Scala!) library - * authors, and it is possible for interrupts to result in resource leaks or invalid - * states. It is important to be certain that this will not be the case before using this - * mechanism. - * - * @param thunk - * The side effect which is to be suspended in `IO` and evaluated on a blocking execution - * context - * - * Implements [[cats.effect.kernel.Sync.interruptible[A](thunk:=>A):*]] - */ - def interruptible[A](thunk: => A): IO[A] = { - val fn = () => thunk - Blocking(TypeInterruptibleOnce, fn, Tracing.calculateTracingEvent(fn.getClass)) - } - - /** - * Like [[blocking]] but will attempt to abort the blocking operation using thread interrupts - * in the event of cancelation. The interrupt will be attempted repeatedly until the blocking - * operation completes or exits. - * - * Note the following tradeoffs: - * - this has slightly more overhead than [[blocking]] due to the machinery necessary for - * the interrupt coordination, - * - thread interrupts are very often poorly considered by Java (and Scala!) library - * authors, and it is possible for interrupts to result in resource leaks or invalid - * states. It is important to be certain that this will not be the case before using this - * mechanism. - * - * @param thunk - * The side effect which is to be suspended in `IO` and evaluated on a blocking execution - * context - * - * Implements [[cats.effect.kernel.Sync!.interruptibleMany]] - */ - def interruptibleMany[A](thunk: => A): IO[A] = { - val fn = () => thunk - Blocking(TypeInterruptibleMany, fn, Tracing.calculateTracingEvent(fn.getClass)) - } - - def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] = - if (hint eq TypeDelay) - apply(thunk) - else { - val fn = () => thunk - Blocking(hint, fn, Tracing.calculateTracingEvent(fn.getClass)) - } +private[effect] abstract class IOCompanionPlatform extends IOCompanionMultithreadedPlatform { + this: IO.type => def fromCompletableFuture[A](fut: IO[CompletableFuture[A]]): IO[A] = asyncForIO.fromCompletableFuture(fut) def fromCompletionStage[A](completionStage: IO[CompletionStage[A]]): IO[A] = asyncForIO.fromCompletionStage(completionStage) - - def realTimeInstant: IO[Instant] = asyncForIO.realTimeInstant - - def realTimeZonedDateTime: IO[ZonedDateTime] = asyncForIO.realTimeZonedDateTime - - /** - * Reads a line as a string from the standard input using the platform's default charset, as - * per `java.nio.charset.Charset.defaultCharset()`. - * - * The effect can raise a `java.io.EOFException` if no input has been consumed before the EOF - * is observed. This should never happen with the standard input, unless it has been replaced - * with a finite `java.io.InputStream` through `java.lang.System#setIn` or similar. - * - * @see - * `cats.effect.std.Console#readLineWithCharset` for reading using a custom - * `java.nio.charset.Charset` - * - * @return - * an IO effect that describes reading the user's input from the standard input as a string - */ - def readLine: IO[String] = - Console[IO].readLine - } diff --git a/core/jvm/src/main/scala/cats/effect/InterruptThrowable.scala b/core/jvm/src/main/scala/cats/effect/InterruptThrowable.scala new file mode 100644 index 0000000000..2aa8771ac9 --- /dev/null +++ b/core/jvm/src/main/scala/cats/effect/InterruptThrowable.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import java.nio.channels.ClosedByInterruptException + +private[effect] object InterruptThrowable { + def apply(t: Throwable): Boolean = t match { + case _: InterruptedException => true + case _: ClosedByInterruptException => true + case _ => false + } + + def ClosedByInterrupt(t: Throwable): Boolean = t match { + case _: ClosedByInterruptException => true + case _ => false + } +} diff --git a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala index 6557d43ee7..4b0c188865 100644 --- a/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala +++ b/core/native/src/main/scala/cats/effect/IOCompanionPlatform.scala @@ -16,51 +16,6 @@ package cats.effect -import cats.effect.std.Console - -import java.time.Instant - -private[effect] abstract class IOCompanionPlatform { this: IO.type => - - private[this] val TypeDelay = Sync.Type.Delay - - def blocking[A](thunk: => A): IO[A] = - // do our best to mitigate blocking - IO.cede *> apply(thunk).guarantee(IO.cede) - - private[effect] def interruptible[A](many: Boolean, thunk: => A): IO[A] = { - val _ = many - blocking(thunk) - } - - def interruptible[A](thunk: => A): IO[A] = interruptible(false, thunk) - - def interruptibleMany[A](thunk: => A): IO[A] = interruptible(true, thunk) - - def suspend[A](hint: Sync.Type)(thunk: => A): IO[A] = - if (hint eq TypeDelay) - apply(thunk) - else - blocking(thunk) - - def realTimeInstant: IO[Instant] = asyncForIO.realTimeInstant - - /** - * Reads a line as a string from the standard input using the platform's default charset, as - * per `java.nio.charset.Charset.defaultCharset()`. - * - * The effect can raise a `java.io.EOFException` if no input has been consumed before the EOF - * is observed. This should never happen with the standard input, unless it has been replaced - * with a finite `java.io.InputStream` through `java.lang.System#setIn` or similar. - * - * @see - * `cats.effect.std.Console#readLineWithCharset` for reading using a custom - * `java.nio.charset.Charset` - * - * @return - * an IO effect that describes reading the user's input from the standard input as a string - */ - def readLine: IO[String] = - Console[IO].readLine - +private[effect] abstract class IOCompanionPlatform extends IOCompanionMultithreadedPlatform { + this: IO.type => } diff --git a/core/native/src/main/scala/cats/effect/InterruptThrowable.scala b/core/native/src/main/scala/cats/effect/InterruptThrowable.scala new file mode 100644 index 0000000000..b792cd6aaa --- /dev/null +++ b/core/native/src/main/scala/cats/effect/InterruptThrowable.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2020-2025 Typelevel + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +private[effect] object InterruptThrowable { + def apply(t: Throwable): Boolean = t match { + case _: InterruptedException => true + case _ => false + } + + def ClosedByInterrupt(t: Throwable): Boolean = t match { + case _ => false + } +} diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index ea9b46b37b..b0e495c10c 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -1002,7 +1002,7 @@ private final class IOFiber[A]( case 21 => val cur = cur0.asInstanceOf[Blocking[Any]] - /* we know we're on the JVM here */ + /* we know we're on JVM or Native here */ if (isStackTracing) { pushTracingEvent(cur.event) diff --git a/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala index 776d6ef5d0..6db23f6d80 100644 --- a/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/js/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Typelevel + * Copyright 2020-2025 Typelevel * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/tests/jvm-native/src/test/scala/cats/effect/IOConcurrencySuite.scala b/tests/jvm-native/src/test/scala/cats/effect/IOConcurrencySuite.scala index f6b9ff51e0..eab17f196e 100644 --- a/tests/jvm-native/src/test/scala/cats/effect/IOConcurrencySuite.scala +++ b/tests/jvm-native/src/test/scala/cats/effect/IOConcurrencySuite.scala @@ -116,6 +116,68 @@ trait IOConcurrencySuite extends DetectPlatform { this: BaseSuite => task.replicateA_(100) } + real("interrupt well-behaved blocking synchronous effect") { + var interrupted = true + val latch = new CountDownLatch(1) + + val await = IO.interruptible { + latch.countDown() + Thread.sleep(15000) + interrupted = false + } + + for { + f <- await.start + _ <- IO.blocking(latch.await()) + _ <- f.cancel + _ <- IO(assert(interrupted)) + } yield () + } + + real("interrupt ill-behaved blocking synchronous effect") { + var interrupted = true + val latch = new CountDownLatch(1) + + val await = IO.interruptibleMany { + latch.countDown() + + try { + Thread.sleep(15000) + } catch { + case _: InterruptedException => () + } + + // psych! + try { + Thread.sleep(15000) + } catch { + case _: InterruptedException => () + } + + // I AM INVINCIBLE + Thread.sleep(15000) + + interrupted = false + } + + for { + f <- await.start + _ <- IO.blocking(latch.await()) + _ <- f.cancel + _ <- IO(assert(interrupted)) + } yield () + } + + ticked("realTimeInstant should return an Instant constructed from realTime") { + implicit ticker => + val op = for { + now <- IO.realTimeInstant + realTime <- IO.realTime + } yield now.toEpochMilli == realTime.toMillis + + assertCompleteAs(op, true) + } + real("auto-cede") { val forever = IO.unit.foreverM diff --git a/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala index 682c69611f..6b5952cc59 100644 --- a/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/jvm/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -20,7 +20,7 @@ import org.scalacheck.Prop.forAll import scala.concurrent.ExecutionContext -import java.util.concurrent.{CompletableFuture, CountDownLatch, ExecutorService, Executors} +import java.util.concurrent.{CompletableFuture, ExecutorService, Executors} trait IOPlatformSuite extends IOConcurrencySuite { this: BaseScalaCheckSuite => @@ -58,68 +58,6 @@ trait IOPlatformSuite extends IOConcurrencySuite { this: BaseScalaCheckSuite => assertCompleteAs(test, Left(e)) } - real("interrupt well-behaved blocking synchronous effect") { - var interrupted = true - val latch = new CountDownLatch(1) - - val await = IO.interruptible { - latch.countDown() - Thread.sleep(15000) - interrupted = false - } - - for { - f <- await.start - _ <- IO.blocking(latch.await()) - _ <- f.cancel - _ <- IO(assert(interrupted)) - } yield () - } - - real("interrupt ill-behaved blocking synchronous effect") { - var interrupted = true - val latch = new CountDownLatch(1) - - val await = IO.interruptibleMany { - latch.countDown() - - try { - Thread.sleep(15000) - } catch { - case _: InterruptedException => () - } - - // psych! - try { - Thread.sleep(15000) - } catch { - case _: InterruptedException => () - } - - // I AM INVINCIBLE - Thread.sleep(15000) - - interrupted = false - } - - for { - f <- await.start - _ <- IO.blocking(latch.await()) - _ <- f.cancel - _ <- IO(assert(interrupted)) - } yield () - } - - ticked("realTimeInstant should return an Instant constructed from realTime") { - implicit ticker => - val op = for { - now <- IO.realTimeInstant - realTime <- IO.realTime - } yield now.toEpochMilli == realTime.toMillis - - assertCompleteAs(op, true) - } - if (javaMajorVersion >= 21) real("block in-place on virtual threads") { val loomExec = classOf[Executors] diff --git a/tests/native/src/test/scala/cats/effect/IOPlatformSuite.scala b/tests/native/src/test/scala/cats/effect/IOPlatformSuite.scala index afbb4c93a4..a29b7f8085 100644 --- a/tests/native/src/test/scala/cats/effect/IOPlatformSuite.scala +++ b/tests/native/src/test/scala/cats/effect/IOPlatformSuite.scala @@ -20,15 +20,5 @@ trait IOPlatformSuite extends IOConcurrencySuite { this: BaseSuite => def platformTests() = { concurrencyTests() - - ticked("realTimeInstant should return an Instant constructed from realTime") { - implicit ticker => - val op = for { - now <- IO.realTimeInstant - realTime <- IO.realTime - } yield now.toEpochMilli == realTime.toMillis - - assertCompleteAs(op, true) - } } }