Splitting a scalaz-stream process into two child streams


Using scalaz-stream, is it possible to split (fork) a stream and then rejoin it?

As an example, let's say I have the following code:

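import scalaz.concurrent.Task
import scalaz.stream._

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

val sumOfEvenNumbers = streamOfNumbers.filter(_ % 2 == 0).fold(0)(_ + _)
val sumOfOddNumbers  = streamOfNumbers.filter(_ % 2 != 0).fold(0)(_ + _)

val someEffectfulFunction: Sink[Task, (Int, Int)] = ??? // any effectful sink of your own
sumOfEvenNumbers.zip(sumOfOddNumbers).to(someEffectfulFunction)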

With scalaz-stream, this example produces the result you would expect: a single tuple containing the sum of the even numbers and the sum of the odd numbers from 1 to 10, i.e. (30, 25), passed to the sink.

However, if we replace streamOfNumbers with something that requires IO, the IO action is actually executed twice, once for each of the two streams derived from it.
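To illustrate why the IO runs twice, here is a rough plain-Scala analogy (not scalaz-stream code): a Process[Task, Int] is a description that every consumer runs afresh, so it behaves more like a function returning a fresh iterator than like an already materialized list. The names `DoubleEvaluation` and `reads` are hypothetical, and the counter merely stands in for the IO action.

```scala
// Plain-Scala analogy, not scalaz-stream: the source is a recipe re-run by each consumer.
object DoubleEvaluation {
  // Counts how many times the hypothetical "IO action" runs.
  var reads = 0

  // Hypothetical effectful source: every call performs the effect again.
  def streamOfNumbers(): Iterator[Int] = {
    reads += 1
    Iterator.range(1, 11)
  }

  // Each pipeline runs the source independently, so the effect happens twice.
  val sumOfEvenNumbers: Int = streamOfNumbers().filter(_ % 2 == 0).sum
  val sumOfOddNumbers: Int  = streamOfNumbers().filter(_ % 2 != 0).sum
}
```

The sums come out as 30 and 25, as expected, but `reads` ends up at 2: one run of the source per pipeline.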

Using a Topic, I'm able to create a pub/sub process that duplicates the elements of the stream correctly. However, its buffering is not bounded: it simply consumes the entire source as fast as possible, regardless of the pace at which the sinks consume it.

I can wrap this in a bounded Queue, however the end result feels a lot more complex than it needs to be.

Is there a simpler way of splitting a stream in scalaz-stream without duplicate IO actions from the source?


Accepted answer

Also, to clarify: the earlier answer deals with the literal "splitting" requirement. Your specific problem, however, may be solvable without splitting the stream at all:

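import scalaz.{\/, -\/, \/-}
import scalaz.concurrent.Task
import scalaz.stream._

val streamOfNumbers: Process[Task, Int] = Process.emitAll(1 to 10)

// Tag each element once: odd numbers on the left (W) side, even numbers on the right (O) side
val oddOrEven: Process[Task, Int \/ Int] = streamOfNumbers.map {
  case even if even % 2 == 0 => \/-(even)
  case odd                   => -\/(odd)
}
// Sum each side independently, in a single pass over the source
val summed = oddOrEven.pipeW(process1.sum[Int]).pipeO(process1.sum[Int])

val evenSink: Sink[Task, Int] = ???
val oddSink: Sink[Task, Int] = ???

// drainW sends the left (odd) side to its sink; to() handles the right (even) side
summed
  .drainW(oddSink)
  .to(evenSink)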
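The single-pass idea in this answer can be sketched with plain Scala collections (an analogy, not the scalaz-stream API): tag each element once as Left (odd) or Right (even), mirroring `-\/` and `\/-`, then accumulate both sums in one fold. The names `SinglePass`, `oddOrEven`, and `reads` are hypothetical.

```scala
// Plain-Scala sketch of the single-pass idea: tag each element once, then fold both sides together.
object SinglePass {
  // Counts how many times the hypothetical source is run.
  var reads = 0

  def streamOfNumbers(): Iterator[Int] = {
    reads += 1
    Iterator.range(1, 11)
  }

  // Odd numbers go Left, even numbers go Right (like the answer's Int \/ Int tagging).
  def oddOrEven(n: Int): Either[Int, Int] =
    if (n % 2 == 0) Right(n) else Left(n)

  // One traversal accumulates (sumOfOdd, sumOfEven).
  def sums(): (Int, Int) =
    streamOfNumbers().map(oddOrEven).foldLeft((0, 0)) {
      case ((odd, even), Left(n))  => (odd + n, even)
      case ((odd, even), Right(n)) => (odd, even + n)
    }
}
```

Here `SinglePass.sums()` returns (25, 30), odd sum first, and `reads` is 1 because the source is traversed only once.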

Another answer

You can still use a topic; just make sure that the child processes subscribe before you publish to the topic.

Note, however, that this solution is unbounded: if the source pushes elements faster than the subscribers consume them, you may run into an OOM error.

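import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.Process.emit

def split[A](source: Process[Task, A]): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val topic = async.topic[A]()

  // Each subscriber receives every element published to the topic
  val sub1 = topic.subscribe
  val sub2 = topic.subscribe

  // Emit the pair of subscribers while concurrently publishing the source into the topic
  merge.mergeN(Process(emit(sub1 -> sub2), (source to topic.publish).drain))
}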
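The memory caveat above comes down to unbounded buffering. As a plain-Scala illustration using java.util.concurrent queues (not scalaz-stream's topic, whose internals may differ), compare how many elements each kind of buffer ends up holding when nothing consumes from it. `BufferBounds` and its methods are hypothetical names.

```scala
import java.util.concurrent.{ArrayBlockingQueue, LinkedBlockingQueue}

object BufferBounds {
  // Unbounded buffer: every element is accepted, so with a slow consumer
  // the backlog (and memory use) grows without limit.
  def unboundedBacklog(produced: Int): Int = {
    val q = new LinkedBlockingQueue[Int]()
    (1 to produced).foreach(i => q.offer(i))
    q.size
  }

  // Bounded buffer: once `limit` elements are waiting, further offers are refused
  // (a blocking `put` would instead make the producer wait).
  def boundedBacklog(produced: Int, limit: Int): Int = {
    val q = new ArrayBlockingQueue[Int](limit)
    (1 to produced).foreach(i => q.offer(i))
    q.size
  }
}
```

With 1000 elements produced, `unboundedBacklog(1000)` is 1000 while `boundedBacklog(1000, 10)` is 10.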
Another answer

I needed this functionality as well. My situation was quite a bit trickier, which prevented me from working around it in this manner.

Thanks to Daniel Spiewak's response in this thread, I was able to get the following to work. I improved on his solution by adding an onHalt handler that closes both queues, so that my application exits once the Process completes.

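import scalaz.concurrent.Task
import scalaz.stream._

def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
  val left = async.boundedQueue[A](limit)
  val right = async.boundedQueue[A](limit)

  // Copy every element into both queues; enqueueing waits while a queue is full.
  // When the source halts, close both queues so their dequeue streams terminate too.
  val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
    Process.await(Task.gatherUnordered(Seq(left.close, right.close))) { _ => Process.Halt(cause) }
  }
  // Hand the two consumer streams to the caller as a single emitted pair
  val dequeue = Process((left.dequeue, right.dequeue))

  enqueue merge dequeue
}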
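The same bounded-queue pattern can be sketched without scalaz-stream, as an analogy using java.util.concurrent.ArrayBlockingQueue and plain threads. A producer pulls each source element once and puts it into both queues, then puts an end-of-stream marker (None) into each, playing the role of the onHalt handler that closes the queues. `BoundedSplit`, `sumQueue`, `pulls`, and the None-marker convention are hypothetical, not part of any library.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue}
import java.util.concurrent.atomic.AtomicInteger

object BoundedSplit {
  // Counts pulls from the hypothetical effectful source.
  val pulls = new AtomicInteger(0)

  def source: Iterator[Int] =
    Iterator.range(1, 11).map { n => pulls.incrementAndGet(); n }

  // Drains one queue until the end-of-stream marker (None) arrives.
  def sumQueue(q: BlockingQueue[Option[Int]], keep: Int => Boolean): Int = {
    var acc = 0
    var done = false
    while (!done) {
      q.take() match {
        case Some(n) => if (keep(n)) acc += n
        case None    => done = true
      }
    }
    acc
  }

  def run(limit: Int = 2): (Int, Int) = {
    val left  = new ArrayBlockingQueue[Option[Int]](limit)
    val right = new ArrayBlockingQueue[Option[Int]](limit)

    // Producer: pulls each element once; put blocks while a queue is full (back-pressure).
    val producer = new Thread(() => {
      source.foreach { n => left.put(Some(n)); right.put(Some(n)) }
      // Like the onHalt handler: tell both consumers the source is finished.
      left.put(None)
      right.put(None)
    })

    var evenSum = 0
    val evenConsumer = new Thread(() => { evenSum = sumQueue(left, _ % 2 == 0) })

    producer.start()
    evenConsumer.start()
    val oddSum = sumQueue(right, _ % 2 != 0) // odd consumer runs on the calling thread
    evenConsumer.join()
    producer.join()
    (evenSum, oddSum)
  }
}
```

`BoundedSplit.run()` returns (30, 25) and `pulls` ends at 10, so each element is read from the source exactly once. Because the producer blocks on whichever queue is full, the slower consumer sets the pace for both; the scalaz-stream version above, which enqueues into both queues in turn via observe, would presumably behave the same way.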