I have a Process[Task, A], and I need to apply a function A => B to each A in the stream to yield a Process[Task, B]. The function's run time ranges from instantaneous to really long.
The catch is that I'd like to process each A as soon as possible in an ExecutionContext and pass the result as soon as I have it, regardless of the order in which the As are received.
A concrete example is the following code, where I would expect all the odd numbers to be printed immediately and the even ones about 500ms later. What happens instead is that the numbers are printed in order, two at a time, with a 500ms pause between pairs:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  Process.range(0, 100).flatMap { i =>
    Process.eval(Task.apply {
      if (i % 2 == 0) Thread.sleep(500)
      i
    }(executor))
  }.to(io.printStreamSink(System.out)(_ println _))
    .run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
It turns out the answer is to use channels. Here's the updated code that seems to do exactly what I want:
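For comparison, and not as the channel-based code mentioned above, here is a minimal sketch of another way to get results in completion order: build one single-element Process per input and merge them nondeterministically with merge.mergeN. It assumes a scalaz-stream version where merge.mergeN is available and Task still has .run, as in the question. UnorderedSketch and runUnordered are hypothetical names introduced only for illustration.

```scala
import java.util.concurrent.ExecutorService
import scalaz.concurrent.Task
import scalaz.stream._

object UnorderedSketch {
  // Hypothetical helper: applies `f` to every Int in [0, n), running each
  // call as its own Task on `pool`, and returns the results in the order
  // in which the tasks complete (not the input order).
  def runUnordered(n: Int, pool: ExecutorService)(f: Int => Int): IndexedSeq[Int] = {
    // A stream of one-element streams; each inner stream evaluates one Task.
    val jobs: Process[Task, Process[Task, Int]] =
      Process.range(0, n).map(i => Process.eval(Task(f(i))(pool)))

    // mergeN runs the inner streams concurrently and emits each value as
    // soon as it is available, so slow inputs do not hold back fast ones.
    merge.mergeN(jobs).runLog.run
  }
}
```

Calling UnorderedSketch.runUnordered(100, executor)(i => { if (i % 2 == 0) Thread.sleep(500); i }) should collect every number from 0 to 99, but not in input order. To print as results arrive rather than collect them, the merged process can be piped to io.printStreamSink as in the original code.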