I'm experimenting with the concurrency features of fs2.Stream and am confused about how they work. I would like to send stream content through a sink in parallel. Here is what I tried:
import java.util.concurrent.{Executors, TimeUnit}
import cats.effect.{ContextShift, IO}
import scala.concurrent.ExecutionContext

object TestParallelStream extends App {
  val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
  val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
  // Prints the elapsed seconds and the element, then blocks the thread for 5 seconds
  val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO {
    println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
    Thread.sleep(5000)
  })
  val executor = Executors.newFixedThreadPool(4)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))
  stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
  executor.shutdown()
}
The line marked //1 prints the following output:
[1 second]: 1
[6 second]: 2
[11 second]: 3
[16 second]: 4
[21 second]: 5
[26 second]: 6
[31 second]: 7
[36 second]: 8
[41 second]: 9
As can be seen from the output, each element is sent through the sink sequentially.
But if I modify the sink as follows:
// parEvalMap with a concurrency limit of 5
val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO {
  println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
  Thread.sleep(5000)
})
The output is:
[1 second]: 3
[1 second]: 2
[1 second]: 4
[1 second]: 1
[6 second]: 5
[6 second]: 6
[6 second]: 7
[6 second]: 8
[11 second]: 9
Now 4 elements are sent through the sink in parallel at a time (in spite of setting 3 as the limit of observeAsync).
Even if I replace observeAsync with plain observe, I get the same parallelization effect.
Can you please clarify how sinks actually work?
observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself. You'd use it like this: