This code snippet in akka-stream cookbook documentation illustrates how to trigger the flow of elements programmatically :
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(Zip[Message, Trigger]())
elements ~> zip.in0
triggerSource ~> zip.in1
zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
ClosedShape
})
In this example, how Trigger
and triggerSource
would look like ?
1.
Trigger
can be anything, as you can see from the graph logic, it's getting discarded all the time. Most likely it will be2.
triggerSource
is any validSource[Trigger, _]
. See the docs for a list of possibilities.Note that this specific example involves a
ClosedShape
materializing toNotUsed
, so it will need tweaking if you need to accesstriggerSource
materialized value, or connecting this to a more complex graph producingTrigger
s.