I would like to create a Source and later push elements to it, as in:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
What is the recommended way to do this?
Thanks!
Since Akka 2.5, Source has a preMaterialize method.
According to the documentation, this looks like the recommended way to do what you ask:
There are situations in which you require a Source materialized value before the Source gets hooked up to the rest of the graph. This is particularly useful in the case of “materialized value powered” Sources, like Source.queue, Source.actorRef or Source.maybe.
Below is an example of how this works with a SourceQueue. Elements are pushed to the queue before and after materialization, as well as from within the Flow:
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ActorMaterializer, OverflowStrategy}
implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()
val sourceDecl = Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure)
val (sourceMat, source) = sourceDecl.preMaterialize()
// Adding element before actual materialization
sourceMat.offer("pre materialization element")
val flow = Flow[String].map { e =>
  if (!e.contains("new")) {
    // Adding elements from within the flow
    sourceMat.offer("new element generated inside the flow")
  }
  s"Processing $e"
}
// Actually materializing with `run`
source.via(flow).to(Sink.foreach(println)).run()
// Adding element after materialization
sourceMat.offer("post materialization element")
Output:
Processing pre materialization element
Processing post materialization element
Processing new element generated inside the flow
Processing new element generated inside the flow
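One detail the example above leaves implicit is that offer returns a Future[QueueOfferResult] rather than enqueueing synchronously. The following is a minimal sketch of inspecting that result, assuming the same Akka 2.5-style setup (ActorMaterializer) as above; the system name, element values, and timeouts are arbitrary choices for illustration, not part of the original answer. The offers are chained so that each one starts only after the previous one has been resolved, which is the cautious way to use OverflowStrategy.backpressure.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

implicit val system: ActorSystem = ActorSystem("OfferResultSketch") // arbitrary name
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for the Future combinators below

val (queue, source) =
  Source.queue[String](bufferSize = 2, OverflowStrategy.backpressure).preMaterialize()

// Start consuming first so that there is downstream demand.
val streamDone = source.runWith(Sink.foreach(println))

// Chain the offers: each one is issued only after the previous Future has completed.
val offers: Future[List[QueueOfferResult]] = for {
  r1 <- queue.offer("first")
  r2 <- queue.offer("second")
  r3 <- queue.offer("third")
} yield List(r1, r2, r3)

// Blocking is only for the sake of a short, linear sketch.
val results = Await.result(offers, 5.seconds)

results.foreach {
  case QueueOfferResult.Enqueued    => () // element was accepted
  case QueueOfferResult.Dropped     => println("element dropped")
  case QueueOfferResult.QueueClosed => println("queue already closed")
  case QueueOfferResult.Failure(ex) => println(s"offer failed: $ex")
}

// Complete the queue, wait for the stream to drain, then shut down.
queue.complete()
Await.result(streamDone, 5.seconds)
system.terminate()
```

In non-blocking code you would typically react to the result with map or onComplete instead of Await.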
After playing around and looking for a good solution, I came across this one, which is clean, simple, and works both before and after materialization: https://stackoverflow.com/a/32553913/6791842
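import akka.actor.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.reactivestreams.Publisher
// Assumes an implicit ActorSystem and materializer are in scope
val (ref: ActorRef, publisher: Publisher[Int]) =
  Source.actorRef[Int](bufferSize = 1000, OverflowStrategy.fail)
    .toMat(Sink.asPublisher(true))(Keep.both).run()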
ref ! 1 //before
val source = Source.fromPublisher(publisher)
ref ! 2 //before
Thread.sleep(1000)
ref ! 3 //before
source.runForeach(println)
ref ! 4 //after
Thread.sleep(1000)
ref ! 5 //after
Output:
1
2
3
4
5
There are three ways this can be achieved:
1. Post Materialization with SourceQueue
You can use Source.queue, which materializes the Flow into a SourceQueue.
2. Post Materialization with Actor
There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref:
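The two post-materialization approaches above (1 and 2) can be sketched roughly as follows. This is a minimal illustration, not the code from the linked answers: it assumes an Akka 2.5-style setup with ActorMaterializer, and the system name, buffer sizes, element values, and the dropHead strategy are arbitrary choices. The point in both cases is that to(...) keeps the Source's materialized value, so run() hands back the queue or the ActorRef.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("PostMaterialization") // arbitrary name
implicit val materializer: ActorMaterializer = ActorMaterializer()

// 1. SourceQueue: `to` keeps the left-hand (Source) materialized value,
//    so `run()` returns the queue rather than the Sink's Future.
val queue =
  Source.queue[Int](bufferSize = 16, OverflowStrategy.dropHead)
    .to(Sink.foreach[Int](x => println(s"queue: $x")))
    .run()

// offer returns a Future; blocking on it here only keeps the sketch linear.
val offerResult = Await.result(queue.offer(1), 5.seconds)
queue.offer(2)

// 2. ActorRef: same idea, but elements are sent as plain actor messages.
//    Source.actorRef does not support OverflowStrategy.backpressure.
val ref: ActorRef =
  Source.actorRef[Int](bufferSize = 16, OverflowStrategy.dropHead)
    .to(Sink.foreach[Int](x => println(s"actor: $x")))
    .run()

ref ! 1
ref ! 2

// Shut down. In a real application, do this only once the streams are no
// longer needed; elements still in flight at this point may not be printed.
system.terminate()
```

The queue variant gives feedback per element through the returned Future, while the actor variant is fire-and-forget, so the choice mostly depends on whether the producer needs to know that an element was accepted.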
3. Pre Materialization with Actor
Similarly, you could explicitly create an Actor that contains a message buffer, use that Actor to create a Source, and then send that Actor messages as described in the answer here: