Interrupt ZStream mapMPar processing


I have the following code which, because of Excel's maximum row limit, is restricted to ~1 million rows:

ZStream.unwrap(generateStreamData).mapMPar(32) {m =>
  streamDataToCsvExcel
}

All fairly straightforward, and it works perfectly. I keep track of the number of rows streamed and then stop writing data. However, I want to interrupt all the child fibers spawned in mapMPar, with something like this:

ZStream.unwrap(generateStreamData).interruptWhen(effect.true).mapMPar(32) {m =>
  streamDataToCsvExcel
}

Unfortunately, the process is interrupted immediately here. I'm probably missing something obvious...
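For reference, the immediate interruption matches how interruptWhen is documented in ZIO 1.x: the stream is interrupted as soon as the effect passed to it completes, and the value that effect produces (true or false) is discarded. An effect that succeeds straight away therefore ends the stream straight away. A minimal sketch, assuming ZIO 1.x, defers the interrupt by awaiting a Promise that is only completed once the row limit is reached. The names maxRows, fetchRow, limitReached and written are hypothetical, and ZStream.iterate stands in for the real data source.

```scala
import zio._
import zio.stream._

object InterruptWhenSketch {
  val maxRows = 10

  // Hypothetical stand-in for fetching one row from the remote server
  def fetchRow(i: Int): UIO[Int] = UIO.succeed(i)

  // Returns how many rows were written before the stream was interrupted
  val program: UIO[Int] =
    for {
      limitReached <- Promise.make[Nothing, Unit] // completed only once the limit is hit
      written      <- Ref.make(0)
      _ <- ZStream
             .iterate(0)(_ + 1)
             .mapMPar(32)(fetchRow)
             .mapM { _ =>
               // Count the row, and complete the promise when the limit is reached
               written.updateAndGet(_ + 1).flatMap { n =>
                 limitReached.succeed(()).when(n >= maxRows)
               }
             }
             .interruptWhen(limitReached.await) // fires when the promise completes, not before
             .runDrain
      n <- written.get
    } yield n
}
```

Because the interrupt is asynchronous, rows already in flight can still be counted after the promise completes, so the total may overshoot maxRows. That is one reason to stop the stream from inside the pipeline instead, as the answer below does.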

Edit: adding some clarification.

My stream of data is generated by an expensive process: data is pulled from a remote server by n fibers (and that data is itself calculated by an expensive process). I then process the streams and stream the results out to the client. Once the processed row count reaches ~1 million, I need to stop pulling data from the remote server (i.e. interrupt all the fibers) and end the process.


Here's what I came up with after your clarification. The ZIO 1.x version is a bit uglier because ZStream has no .dropRight there.

Basically, we can use takeUntilM to keep a running count of the rows we have received and stop once we reach the maximum. Because takeUntilM also emits the element that made its condition true, we then use .dropRight (ZIO 2) or an additional filter (ZIO 1.x) to discard that last element, which would take us over the limit.

This ensures that:

  • You only run streamDataToCsvExcel for the messages that fit within the size limit
  • Because streams are lazy, expensiveQuery only runs for as many messages as fit within the limit (or N+1 times, since the last value is pulled and then discarded for going over the limit)

import zio._
import zio.stream._

object Main extends zio.App {

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {

    val expensiveQuery     = ZIO.succeed(Chunk(1, 2))
    val generateStreamData = ZIO.succeed(ZStream.repeatEffect(expensiveQuery))

    def streamDataToCsvExcel = ZIO.unit

    def count(ref: Ref[Int], size: Int): UIO[Boolean] =
      ref.updateAndGet(_ + size).map(_ > 10)

    for {
      counter <- Ref.make(0)
      _ <- ZStream
            .unwrap(generateStreamData)
            .takeUntilM(next => count(counter, next.size)) // Count rows; the message that crosses the limit is still emitted
            // Drop that last message. Checking the counter here is exact because repeatEffect emits one message per chunk.
            // Ideally this would be .dropRight(1) with ZIO 2.
            .filterM(_ => counter.get.map(_ <= 10))
            .mapMPar(32)(_ => streamDataToCsvExcel)
            .runDrain
    } yield ExitCode.success
  }
}
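For comparison, here is a sketch of the same pipeline in ZIO 2.x, where .dropRight(1) replaces the extra filterM. It assumes the ZIO 2 names ZStream.repeatZIO, takeUntilZIO, dropRight, mapZIOPar and ZIOAppDefault; maxRows, the written counter and program are illustrative additions so the result can be checked, while expensiveQuery and streamDataToCsvExcel are the same placeholders as above.

```scala
import zio._
import zio.stream._

object MainZio2 extends ZIOAppDefault {
  val maxRows = 10 // illustrative limit, standing in for ~1 million

  val expensiveQuery: UIO[Chunk[Int]] = ZIO.succeed(Chunk(1, 2))
  val streamDataToCsvExcel: UIO[Unit] = ZIO.unit

  def count(ref: Ref[Int], size: Int): UIO[Boolean] =
    ref.updateAndGet(_ + size).map(_ > maxRows)

  // Returns the number of rows that were actually written
  val program: UIO[Int] =
    for {
      counter <- Ref.make(0)
      written <- Ref.make(0)
      _ <- ZStream
             .repeatZIO(expensiveQuery)
             .takeUntilZIO(batch => count(counter, batch.size)) // also emits the batch that crossed the limit
             .dropRight(1)                                      // so drop that final batch
             .mapZIOPar(32)(batch => streamDataToCsvExcel *> written.update(_ + batch.size))
             .runDrain
      rows <- written.get
    } yield rows

  override def run = program
}
```

Note that dropRight(1) always discards the final element, so this version relies on the source being long enough to cross the limit; if the remote data could run out first, the filter-based ZIO 1.x approach is the safer of the two.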

If relying on the laziness of streams doesn't work for your use case, you can trigger an interrupt of some sort from the takeUntilM condition. For example, you could update the count function so that it also interrupts the fiber doing the pulling:

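// `fiber` is whichever fiber pulls from the remote server (passed in for illustration)
def count(ref: Ref[Int], size: Int, fiber: Fiber[Any, Any]): UIO[Boolean] =
  ref
    .updateAndGet(_ + size)
    .map(_ > 10)
    .tap(limitReached => fiber.interrupt.when(limitReached))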
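A fuller sketch of that idea, assuming ZIO 1.x. To keep it self-contained, the remote pulls are modelled by a single hypothetical producer fiber (remotePull) feeding a bounded Queue, rather than the n fibers from the question; maxRows, remotePull and program are illustrative names. The stream reads with ZStream.repeatEffect(queue.take) instead of ZStream.fromQueue, because fromQueue may batch several queued messages into one chunk, which would make the counter check in filterM inexact.

```scala
import zio._
import zio.stream._

object InterruptProducerSketch {
  val maxRows = 10

  // Hypothetical stand-in for one expensive pull from the remote server
  val remotePull: UIO[Chunk[Int]] = UIO.succeed(Chunk(1, 2))

  // Counts rows and interrupts the producer as soon as the limit is exceeded
  def count(ref: Ref[Int], size: Int, fiber: Fiber[Any, Any]): UIO[Boolean] =
    ref
      .updateAndGet(_ + size)
      .map(_ > maxRows)
      .tap(limitReached => fiber.interrupt.when(limitReached))

  // Returns (rows kept, whether the producer fiber ended by interruption)
  val program: UIO[(Int, Boolean)] =
    for {
      queue    <- Queue.bounded[Chunk[Int]](4)
      producer <- remotePull.flatMap(queue.offer).forever.fork // keeps pulling until interrupted
      counter  <- Ref.make(0)
      batches <- ZStream
                   .repeatEffect(queue.take) // one batch per chunk
                   .takeUntilM(batch => count(counter, batch.size, producer))
                   .filterM(_ => counter.get.map(_ <= maxRows)) // drop the batch that crossed the limit
                   .runCollect
      exit <- producer.await
    } yield (batches.map(_.size).sum, exit.interrupted)
}
```

Here the producer is stopped from inside the takeUntilM condition itself, so no further remote pulls start once the limit is crossed, and the rows kept stay within maxRows.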