I want to skip a flow when it fails without losing the element that was sent to it, but I can't find a way to do that. Here is the example code I have been using to test.
val decider: Supervision.Decider = {
  case exception: Exception =>
    // println(exception.getMessage)
    Supervision.restart
}

Source(1 to 10)
  .via(Flow[Int].map(x => {
    if (x == 5) throw new Exception("boom!")
    x
  }))
  .recover[Int] {
    case e: Exception => 0
  }
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runWith(Sink.foreach(println))
Output obtained: 1 2 3 4 6 7 8 9 10
Output needed: 1 2 3 4 5 6 7 8 9 10
Is there a way to keep the message in case a flow throws an exception? Or another solution that could help me to solve this problem?
This is not possible with supervision strategies. The philosophy is that if the element can be recovered, the operator itself should recover it. The supervision strategy is invoked only in the case of an unrecoverable failure.
To quote the supervision strategy docs (for Akka Actors, not Akka Streams, but Akka Streams inherits the concepts):
After all, if the operator didn't process the element the first time, what would be different the second time? (Thus the warnings on livelocks.)
So if the element is recoverable in some way, catch the failure inside the operator with try/catch. Supervision and stream error handling are more about deciding what to do with the rest of the stream when an element fails.
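As a minimal sketch of the try/catch approach, the failure can be caught inside the map stage so the original element is passed through unchanged. This assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the names process, processOrKeep and RecoverInsideOperator are hypothetical and only for illustration, and scala.util.Try stands in for an explicit try/catch.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object RecoverInsideOperator extends App {
  // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("recover-demo")

  // Hypothetical processing step that fails for one element, as in the question.
  def process(x: Int): Int =
    if (x == 5) throw new Exception("boom!") else x

  // Recover inside the operator: on a non-fatal failure, keep the original element.
  def processOrKeep(x: Int): Int = Try(process(x)).getOrElse(x)

  val done = Source(1 to 10)
    .map(processOrKeep)
    .runWith(Sink.foreach(println))

  // Wait for the stream to finish before shutting the system down.
  Await.ready(done, 5.seconds)
  system.terminate()
}
```

Because the exception never leaves the map stage, no supervision strategy or recover stage is involved, and the element 5 reaches the sink along with the others. If downstream stages need to know which elements failed, emitting the Try (or an Either) itself instead of calling getOrElse would be one option.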