I have a source which uses a flatMapConcat
to consolidate a new stream of elements, produced by parsing Json. To do this, I am using JsonReader provided by Alpakka (https://developer.lightbend.com/docs/alpakka/current/data-transformations/json.html). This works well, but I would like to handle the case where invalid Json is presented to the reader. In this case I would like the exception to bubble up to the supervisor so it can restart the stream.
Here's a quick snippet showcasing the bad json case:
val systemErrorHandler: Supervision.Decider = {
case e: Exception =>
println("System Error")
Supervision.Restart
case _ => Supervision.Stop
}
val routeErrorHandler: Supervision.Decider = {
case e: Exception =>
println("Route Error")
Supervision.Restart
}
val exceptionStrategy = ActorAttributes.supervisionStrategy(routeErrorHandler)
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(systemErrorHandler))
val json = """{"test":"one}"""
val route = Source.single("1").flatMapConcat { _ =>
Source.single(ByteString(json))
.via(JsonReader.select("$")).map(_.utf8String)
}.to(Sink.foreach(println)).withAttributes(exceptionStrategy)
route.run
I would expect JsonReader, upon throwing an exception, to fail the stage and then bubble the exception up to the routeErrorHandler
. However, when run the JsonReader stage is failed and the exception is swallowed. I also attempted to place a recover
in the stream to throw the exception at a 'higher' level (outside the substream) but that also doesn't get handled with either the routeErrorHandler supervisor or the supervisor placed on the ActorMaterializer.
Is this expected behavior? If so, what is the proper way to ensure an exception within a substream reaches the supervisor strategy?