I am trying to shut down a stream that transforms some numbers and then create another stream from the same graph blueprint. However, the second stream instance does not run, or at least it does not print anything to the console.
What am I missing?
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, KillSwitches, Outlet, SourceShape}
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

object KillSwitchSample extends App {
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  // One shared kill switch used by every stream created below
  val killSwitch = KillSwitches.shared("switch")

  val stream1 = createStream("stream 1")
  stream1.run()
  Thread.sleep(200)
  killSwitch.shutdown()

  val stream2 = createStream("stream 2")
  stream2.run()
  Thread.sleep(200)
  killSwitch.shutdown()

  def createStream(streamName: String): RunnableGraph[NotUsed] = {
    Source.fromGraph(new NumbersSource)
      .via(killSwitch.flow)
      .map(el => s"$streamName: $el")
      .to(Sink.foreach(println))
  }
}

// Emits 1, 2, 3, ... for as long as downstream keeps pulling
class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 1

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}
You are using a shared KillSwitch. A SharedKillSwitch can only be triggered once; after that it remembers that it has already been triggered and will immediately terminate any stream that is later run with its flow. That is what happens in your code: you triggered the kill switch before running the graph the second time.
You can use KillSwitches.single instead to get a new KillSwitch each time the graph is materialized.
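A minimal sketch of that approach, assuming the same Akka 2.5-style API as in the question. To keep the example self-contained, Source.fromIterator stands in for the question's NumbersSource, and the object name SingleKillSwitchSample is made up for illustration. The materialized value of each run is a pair of the stream's own UniqueKillSwitch and the sink's completion future:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SingleKillSwitchSample extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Each materialization of this blueprint yields its own UniqueKillSwitch.
  def createStream(streamName: String): RunnableGraph[(UniqueKillSwitch, Future[Done])] =
    Source.fromIterator(() => Iterator.from(1)) // stand-in for NumbersSource
      .viaMat(KillSwitches.single)(Keep.right)  // keep the kill switch
      .map(el => s"$streamName: $el")
      .toMat(Sink.foreach[String](println))(Keep.both) // keep switch and completion future

  val (switch1, done1) = createStream("stream 1").run()
  Thread.sleep(200)
  switch1.shutdown()             // stops only stream 1
  Await.result(done1, 5.seconds) // wait until stream 1 has completed

  val (switch2, done2) = createStream("stream 2").run()
  Thread.sleep(200)
  switch2.shutdown()             // stops only stream 2
  Await.result(done2, 5.seconds)

  actorSystem.terminate()
}
```

If several streams really do need to be stopped together, a shared kill switch is still the right tool; in that case, one option is to create a fresh KillSwitches.shared instance for each group of streams rather than reusing one that has already been triggered.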