How do I nondeterministically flatten infinite FS2 streams

869 Views Asked by At

I'm using Scala's FS2 stream library.

I have a Stream[F, [Stream[F, A]] where both the inner streams and the outer stream are infinite (with appropriate Async instances for F). I want to end up with a Stream[F, A] that concurrently pulls from the outer stream as well as an inner stream, where every new element from the outer stream replaces the current inner stream that I'm pulling from. In particular I want to "eventually" get to at least attempt to pull (although I may be interrupted by the outer stream before I can do so) from all of the inner streams.

My attempts to do this with an external Async reference that holds the current inner stream seem to end in Interrupt exceptions getting thrown.

I do not want a simple flatMap or even concurrent.join. Because my inner streams are infinite, these will never get to more than a finite number of inner streams.

Is there a way of achieving this with FS2?

0

There are 0 best solutions below