Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 2c1b54e

Browse files
authoredNov 26, 2024
Merge pull request #1640 from mdipirro/scala-672-stream-errors
SCALA-672 Add code for stream error handling
2 parents a4b06f1 + 9faa82d commit 2c1b54e

File tree

4 files changed

+150
-0
lines changed

4 files changed

+150
-0
lines changed
 

‎build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,8 @@ lazy val scala_akka_3 = (project in file("scala-akka-3"))
338338
libraryDependencies ++= Seq(
339339
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
340340
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
341+
"org.slf4j" % "slf4j-api" % "2.0.16",
342+
"org.slf4j" % "slf4j-simple" % "2.0.16",
341343
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
342344
akkaActorTyped,
343345
akkaStreamDep,
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package com.baeldung.scala.akka.stream.errors
2+
3+
import akka.actor.ActorSystem
4+
import akka.stream.RestartSettings
5+
import akka.stream.scaladsl.RestartSource
6+
7+
import scala.concurrent.duration.DurationInt
8+
import scala.language.postfixOps
9+
import scala.util.{Failure, Success}
10+
11+
object Main extends App {
12+
implicit val system: ActorSystem = ActorSystem("baeldung")
13+
14+
private val backoffSettings = RestartSettings(
15+
minBackoff = 3 seconds,
16+
maxBackoff = 30 seconds,
17+
randomFactor = 0.2
18+
).withMaxRestarts(3, 5.minutes)
19+
20+
RestartSource
21+
.withBackoff(backoffSettings) { () => source }
22+
.via(parse)
23+
.via(compare)
24+
.runWith(sink)
25+
.andThen {
26+
case Failure(exception) => println(exception)
27+
case Success((correct, total)) =>
28+
println(s"$correct/$total correct answers")
29+
}(system.dispatcher)
30+
.onComplete(_ => system.terminate())(system.dispatcher)
31+
32+
/*source
33+
.via(parseWithLogging)
34+
.via(compare)
35+
.runWith(sink)
36+
.andThen {
37+
case Failure(exception) => println(exception)
38+
case Success((correct, total)) =>
39+
println(s"$correct/$total correct answers")
40+
}(system.dispatcher)
41+
.onComplete(_ => system.terminate())(system.dispatcher)*/
42+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.baeldung.scala.akka.stream
2+
3+
import akka.NotUsed
4+
import akka.stream.Attributes
5+
import akka.stream.scaladsl.{Flow, Sink, Source}
6+
7+
import scala.concurrent.Future
8+
9+
package object errors {
10+
val source: Source[String, NotUsed] = Source(
11+
Seq("5,10", "15,15", "78,79", "12,12", "0,0", "456,456")
12+
)
13+
14+
val parse: Flow[String, (Int, Int), NotUsed] =
15+
Flow[String]
16+
.map { pair =>
17+
val parts = pair.split(",")
18+
(parts(0).toInt, parts(1).toInt)
19+
}
20+
21+
val parseWithRecover: Flow[String, Either[String, (Int, Int)], NotUsed] =
22+
Flow[String]
23+
.map { pair =>
24+
val parts = pair.split(",")
25+
Right((parts(0).toInt, parts(1).toInt))
26+
}
27+
.recover({ case e: ArrayIndexOutOfBoundsException =>
28+
Left(e.getMessage)
29+
})
30+
31+
val parseWithLogging: Flow[String, (Int, Int), NotUsed] =
32+
Flow[String]
33+
.map { pair =>
34+
val parts = pair.split(",")
35+
(parts(0).toInt, parts(1).toInt)
36+
}
37+
.log(name = "Baeldung stream")
38+
.addAttributes(
39+
Attributes.logLevels(
40+
onElement = Attributes.LogLevels.Info
41+
)
42+
)
43+
44+
val compare: Flow[(Int, Int), Boolean, NotUsed] =
45+
Flow[(Int, Int)]
46+
.map { case (userAnswer, correctAnswer) => userAnswer == correctAnswer }
47+
48+
val sink: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
49+
case ((correctCount, total), wasCorrect) =>
50+
if (wasCorrect) (correctCount + 1, total + 1)
51+
else (correctCount, total + 1)
52+
}
53+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.baeldung.scala.akka.stream.errors
2+
3+
import akka.actor.ActorSystem
4+
import akka.stream.{ActorAttributes, Supervision}
5+
import akka.stream.scaladsl.Keep
6+
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
7+
import org.scalatest.concurrent.ScalaFutures.convertScalaFuture
8+
import org.scalatest.flatspec.AnyFlatSpec
9+
import org.scalatest.matchers.should.Matchers
10+
11+
class ErrorRecoveryUnitTest extends AnyFlatSpec with Matchers {
12+
implicit val system: ActorSystem = ActorSystem("baeldung")
13+
14+
"The \"parseWithRecover\" flow" should "parse recover from a parsing error" in {
15+
val (pub, sub) = TestSource[String]()
16+
.via(parseWithRecover)
17+
.toMat(TestSink[Either[String, (Int, Int)]]())(Keep.both)
18+
.run()
19+
20+
pub.sendNext("1,1")
21+
pub.sendNext("145146")
22+
pub.sendComplete()
23+
24+
sub.requestNext(Right(1, 1))
25+
sub.requestNext(Left("Index 1 out of bounds for length 1"))
26+
sub.expectComplete()
27+
}
28+
29+
"The \"Resume\" supervision strategy" should "ignore parsing errors" in {
30+
val runnableGraph = TestSource[String]()
31+
.via(parseWithRecover)
32+
.toMat(TestSink[Either[String, (Int, Int)]]())(Keep.both)
33+
34+
val decider: Supervision.Decider = {
35+
case _: ArrayIndexOutOfBoundsException => Supervision.Resume
36+
case _ => Supervision.Stop
37+
}
38+
39+
val graphWithResumeSupervision =
40+
runnableGraph.withAttributes(ActorAttributes.supervisionStrategy(decider))
41+
42+
val (pub, sub) = graphWithResumeSupervision.run()
43+
44+
pub.sendNext("1,1")
45+
pub.sendNext("145146")
46+
pub.sendNext("1,2")
47+
pub.sendComplete()
48+
49+
sub.requestNext(Right(1, 1))
50+
sub.requestNext(Right(1, 2))
51+
sub.expectComplete()
52+
}
53+
}

0 commit comments

Comments
 (0)
Please sign in to comment.