fs2.Stream observeAsync does not execute a given sink asynchronously


I'm experimenting with the concurrency features of fs2.Stream and am confused about how they work. I would like to send the stream's elements through a sink in parallel. Here is what I tried:

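import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}

object TestParallelStream extends App {
  val secondsOnStart = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())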
  val stream = fs2.Stream.emits(List(1, 2, 3, 4, 5, 6, 7, 8, 9)).covary[IO]
  val sink: fs2.Sink[IO, Int] = _.evalMap(i => IO {
    println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
    Thread.sleep(5000)
  })
  val executor = Executors.newFixedThreadPool(4)
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.fromExecutor(executor))


  stream.observeAsync(3)(sink).compile.drain.unsafeRunSync() //1
  executor.shutdown()
}

The line marked //1 prints:

[1 second]: 1
[6 second]: 2
[11 second]: 3
[16 second]: 4
[21 second]: 5
[26 second]: 6
[31 second]: 7
[36 second]: 8
[41 second]: 9

As can be seen from the output, each element is sent through the sink sequentially.

But if I modify the sink as follows:

// parEvalMap with a concurrency limit of 5
val sink: fs2.Sink[IO, Int] = _.parEvalMap(5)(i => IO {
  println(s"[${TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()) - secondsOnStart} second]: $i")
  Thread.sleep(5000)
})

The output is:

[1 second]: 3
[1 second]: 2
[1 second]: 4
[1 second]: 1
[6 second]: 5
[6 second]: 6
[6 second]: 7
[6 second]: 8
[11 second]: 9

Now 4 elements at a time are sent through the sink in parallel (even though I set 3 as the limit for observeAsync).

Even if I replace observeAsync with plain observe, I get the same parallelization effect.

Can you please clarify how sinks actually work?


Answer:


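observe (like observeAsync) is used when you want to pass stream elements through multiple sinks: each element is sent to the sink and also kept in the resulting stream, so further sinks can consume it. It doesn't change the concurrency behavior of the sink itself. The argument to observeAsync(n) only bounds how many chunks may be queued ahead of the sink; how many elements are processed at once is decided by the sink, e.g. evalMap (one at a time) versus parEvalMap (up to its limit).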
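The difference between buffering and parallelism can be illustrated without fs2. The sketch below is a simplified, stdlib-only model (an assumption for illustration, not how fs2 implements observeAsync): a producer pushes elements into a bounded ArrayBlockingQueue whose capacity of 3 stands in for maxQueued, while a single consumer thread, standing in for an evalMap-based sink, takes and processes them one at a time. The names BufferIsNotParallelism, maxInFlight and Done are hypothetical. However large the queue is, at most one element is ever being processed.

```scala
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

// Hypothetical model: the queue capacity plays the role of observeAsync's maxQueued.
object BufferIsNotParallelism {
  val maxQueued = 3
  val queue = new ArrayBlockingQueue[Int](maxQueued)
  val inFlight = new AtomicInteger(0)    // elements currently being "processed"
  val maxInFlight = new AtomicInteger(0) // peak value reached by inFlight
  val processed = ListBuffer.empty[Int]  // written only by the consumer thread

  val Done = -1 // sentinel marking the end of the stream

  def run(): Unit = {
    // Single consumer, like a sink built with evalMap: one element at a time.
    val consumer = new Thread(() => {
      var i = queue.take()
      while (i != Done) {
        val now = inFlight.incrementAndGet()
        maxInFlight.accumulateAndGet(now, (a, b) => math.max(a, b))
        Thread.sleep(20) // simulated slow sink
        processed += i
        inFlight.decrementAndGet()
        i = queue.take()
      }
    })
    consumer.start()
    // Producer: put() blocks once maxQueued elements are waiting in the queue.
    (1 to 9).foreach(queue.put)
    queue.put(Done)
    consumer.join() // join() makes the consumer's writes visible to this thread
  }
}
```

After BufferIsNotParallelism.run(), maxInFlight stays at 1 and the elements come out in order: a bigger buffer only lets the producer run further ahead of the sink.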

You'd use it like this:

stream.observeAsync(n)(sink1).to(sink2)
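In the question's second example the parallelism comes from parEvalMap(5) inside the sink, not from observeAsync. The cap of 4 rather than 5 is most likely the thread pool: each IO blocks its thread with Thread.sleep, and the ContextShift is backed by Executors.newFixedThreadPool(4), so at most 4 effects can be sleeping at once. The stdlib-only sketch below reproduces that arithmetic with plain Futures (a model, not fs2 code; PoolCapsParallelism and peakConcurrency are hypothetical names): a Semaphore with 5 permits stands in for parEvalMap(5), yet the peak number of concurrently running tasks is still 4.

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical model of the question's setup: 4 pool threads, requested concurrency of 5.
object PoolCapsParallelism {
  val poolSize = 4
  val requestedConcurrency = 5 // stands in for parEvalMap(5)

  def peakConcurrency(): Int = {
    val executor = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    val permits = new Semaphore(requestedConcurrency)
    val running = new AtomicInteger(0) // tasks currently sleeping on a pool thread
    val peak = new AtomicInteger(0)    // highest value `running` ever reached

    val tasks = (1 to 9).map { _ =>
      permits.acquire() // at most 5 tasks submitted but not yet finished
      Future {
        val now = running.incrementAndGet()
        peak.accumulateAndGet(now, (a, b) => math.max(a, b))
        Thread.sleep(200) // blocks the pool thread, like Thread.sleep inside IO
        running.decrementAndGet()
        permits.release()
      }
    }
    Await.result(Future.sequence(tasks), 10.seconds)
    executor.shutdown()
    executor.awaitTermination(1, TimeUnit.SECONDS)
    peak.get
  }
}
```

The fifth permitted task simply waits in the executor's queue until a thread frees up. In fs2 itself, replacing Thread.sleep with the non-blocking IO.sleep (which needs an implicit Timer[IO]) should let the parEvalMap limit, rather than the pool size, decide how many elements run at once.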