Akka Streams: Creating another RunnableGraph after shutdown with KillSwitch

I am trying to shut down a stream that transforms some numbers and then create another stream from the same graph blueprint. However, the second stream instance does not run, or at least it does not print anything to the console.

What am I missing?

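import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, KillSwitches, Outlet, SourceShape}
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

object KillSwitchSample extends App {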
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val killSwitch = KillSwitches.shared("switch")

  val stream1 = createStream("stream 1")
  stream1.run()
  Thread.sleep(200)
  killSwitch.shutdown()

  val stream2 = createStream("stream 2")
  stream2.run()
  Thread.sleep(200)
  killSwitch.shutdown()

  def createStream(streamName: String): RunnableGraph[NotUsed] = {
    Source.fromGraph(new NumbersSource)
      .via(killSwitch.flow)
      .map(el => s"$streamName: $el")
      .to(Sink.foreach(println))
  }
}

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 1

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}

Best answer

You use a shared KillSwitch. A shared KillSwitch can only be triggered once. After that it remembers that it has already been triggered and will immediately terminate any stream that is materialized later as well.

That's what happens with your code. You triggered the kill switch before running the graph the second time.
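The behaviour can be reproduced in isolation. The following is a minimal sketch, not part of the original code: the names SharedSwitchDemo and elementsAfterShutdown are made up for illustration, and a finite Source(1 to 100) stands in for NumbersSource so the result can be collected. It assumes the same ActorMaterializer setup as the question (deprecated in Akka 2.6, where the implicit ActorSystem is enough).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not taken from the question.
object SharedSwitchDemo {

  // Runs a stream through a shared kill switch that was shut down
  // before materialization and returns whatever reached the sink.
  def elementsAfterShutdown(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("shared-switch-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val switch = KillSwitches.shared("demo-switch")
      switch.shutdown() // triggered before any stream exists

      // The switch remembers the shutdown, so this flow is expected
      // to complete as soon as the stream starts.
      val result = Source(1 to 100)
        .via(switch.flow)
        .runWith(Sink.seq)

      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(elementsAfterShutdown())
}
```

The collected sequence should come back empty, which matches the silent second stream in the question.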

You can use KillSwitches.single instead, which gives you a new KillSwitch for every materialization:

def createStream(streamName: String): RunnableGraph[UniqueKillSwitch] =
  Source.fromGraph(new NumbersSource)
    .map(el => s"$streamName: $el")
    .viaMat(KillSwitches.single)(Keep.right)
    .to(Sink.foreach(println))

val switch1 = createStream("a").run()
// ...
switch1.shutdown()

val switch2 = createStream("b").run()
// ...
switch2.shutdown()
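For a complete program, the snippet above can be filled in roughly as follows. This is only a sketch under a few assumptions: UniqueSwitchDemo is a made-up name, an endless iterator replaces NumbersSource to keep the example short, and the sink's Future[Done] is kept with Keep.both so each stream can be awaited after its switch is triggered. The 200 ms sleep is carried over from the question.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not taken from the question.
object UniqueSwitchDemo {

  // Each run() of this blueprint materializes its own UniqueKillSwitch
  // together with a Future that completes when the sink is done.
  def createStream(name: String): RunnableGraph[(UniqueKillSwitch, Future[Done])] =
    Source.fromIterator(() => Iterator.from(1)) // stand-in for NumbersSource
      .map(el => s"$name: $el")
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(Sink.foreach[String](println))(Keep.both)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("unique-switch-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      for (name <- Seq("stream 1", "stream 2")) {
        val (switch, done) = createStream(name).run()
        Thread.sleep(200)             // let the stream print for a while
        switch.shutdown()             // stops only this materialization
        Await.result(done, 3.seconds) // wait until the sink has completed
      }
    } finally {
      system.terminate()
    }
  }
}
```

Because every materialization gets its own switch, shutting down "stream 1" has no effect on "stream 2".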