I'm experimenting with the concurrency features of fs2.Stream and am confused about how they work. I would like to send the stream's elements through a sink in parallel. Here is what I tried:
import cats.effect.{ContextShift, IO}
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext

object TestParallelStream extends App {
  val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())
  val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
  // evalMap evaluates one effect at a time
  val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO {
    println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
    Thread.sleep(5000) // blocks the current thread for 5 seconds
  })
  val executor = Executors.newFixedThreadPool(4)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))
  stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
  executor.shutdown()
}
The line marked //1 prints:
[1 second]: 1
[6 second]: 2
[11 second]: 3
[16 second]: 4
[21 second]: 5
[26 second]: 6
[31 second]: 7
[36 second]: 8
[41 second]: 9
As can be seen from the output, each element is sent through the sink sequentially.
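A likely reading of this result, assuming fs2 1.x: the Int passed to observeAsync is maxQueued, the number of chunks the sink may fall behind the main stream, not a degree of parallelism. The sink itself (evalMap here) still handles one element at a time. Below is a hypothetical, stdlib-only Scala model of that arrangement, not fs2 code: the name BoundedBufferModel is illustrative, and it queues individual elements rather than fs2 chunks. A queue of capacity 3 feeds a single consumer, and the peak number of elements inside the "sink" stays at 1.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stdlib model (not fs2 code) of observeAsync(3)(sink) with a sequential sink:
// the producer may run up to 3 elements ahead, but one consumer handles them one by one.
object BoundedBufferModel {
  def run(): Int = {
    val queue   = new ArrayBlockingQueue[Option[Int]](3) // capacity stands in for maxQueued
    val active  = new AtomicInteger(0)                   // elements currently inside the "sink"
    val maxSeen = new AtomicInteger(0)                   // peak value of `active`

    val consumer = new Thread(() => {
      var done = false
      while (!done) {
        queue.take() match {
          case Some(_) =>
            val now = active.incrementAndGet()
            maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
            Thread.sleep(20) // stand-in for the 5-second Thread.sleep in the sink
            active.decrementAndGet()
          case None =>
            done = true // end-of-stream marker
        }
      }
    })
    consumer.start()
    (1 to 9).foreach(i => queue.put(Some(i))) // put blocks while 3 elements are pending
    queue.put(None)
    consumer.join()
    maxSeen.get()
  }

  def main(args: Array[String]): Unit =
    println(s"peak elements in the sink at once: ${run()}") // always 1: a single consumer
}
```

Enlarging the queue only lets the producer run further ahead. Per-element concurrency has to come from inside the sink.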
But if I modify the sink as follows:
// parEvalMap with a concurrency limit of 5
val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO {
  println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
  Thread.sleep(5000)
})
The output is:
[1 second]: 3
[1 second]: 2
[1 second]: 4
[1 second]: 1
[6 second]: 5
[6 second]: 6
[6 second]: 7
[6 second]: 8
[11 second]: 9
Now 4 elements at a time are sent through the sink in parallel (even though I passed 3 as the limit to observeAsync).
Even if I replace observeAsync with plain observe, I get the same parallelization.
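One plausible explanation for batches of 4 rather than 5: parEvalMap(5) allows up to 5 effects in flight, but each effect calls Thread.sleep, which blocks a thread, and the ContextShift above is backed by Executors.newFixedThreadPool(4). Only 4 threads exist to block on. The following is a hypothetical stdlib-only Scala model, not fs2 code. The name BlockingPoolModel and the use of a Semaphore to stand in for the parEvalMap limit are illustrative assumptions.

```scala
import java.util.concurrent.{CountDownLatch, Executors, Semaphore}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stdlib model (not fs2 code): 9 blocking tasks, at most 5 "in flight"
// (standing in for parEvalMap(5)), all executed on a fixed pool of 4 threads.
object BlockingPoolModel {
  def run(): Int = {
    val pool     = Executors.newFixedThreadPool(4) // like the executor behind the ContextShift
    val inFlight = new Semaphore(5)                // like the parEvalMap(5) limit
    val active   = new AtomicInteger(0)            // tasks currently sleeping on a pool thread
    val maxSeen  = new AtomicInteger(0)            // peak value of `active`
    val finished = new CountDownLatch(9)

    (1 to 9).foreach { _ =>
      inFlight.acquire() // the "stream" waits once 5 tasks are in flight
      pool.execute { () =>
        try {
          val now = active.incrementAndGet()
          maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
          Thread.sleep(200) // blocks this pool thread, like Thread.sleep(5000) inside IO { ... }
          active.decrementAndGet()
        } finally {
          inFlight.release()
          finished.countDown()
        }
      }
    }
    finished.await()
    pool.shutdown()
    maxSeen.get()
  }

  def main(args: Array[String]): Unit =
    println(s"peak concurrent tasks: ${run()}") // expected 4 (the pool size), not 5
}
```

If this is the cause, enlarging the pool, or replacing Thread.sleep with a non-blocking delay such as IO.sleep (which needs an implicit Timer[IO] in cats-effect 1.x), should let all 5 run at once.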
Can you please clarify how sinks actually work?
observe is used when you want to pass stream elements through multiple sinks. It doesn't change the concurrency behavior of the sink itself. You'd use it like this: