I'm using Scala's FS2 stream library.
I have a Stream[F, Stream[F, A]] where both the inner streams and the outer stream are infinite (with appropriate Async instances for F). I want to end up with a Stream[F, A]
that concurrently pulls from the outer stream as well as an inner stream, where every new element from the outer stream replaces the inner stream I'm currently pulling from. In particular, I want to "eventually" at least attempt to pull from every inner stream, even though the outer stream may interrupt me before any given pull succeeds.
My attempts to do this with an external Async reference that holds the current inner stream seem to end in Interrupt exceptions getting thrown.
I do not want a simple flatMap or even concurrent.join. Because my inner streams are infinite, these will never get to more than a finite number of inner streams.
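To make the problem with sequential flattening concrete, here is a minimal sketch that uses the standard library's LazyList as a stand-in for an FS2 stream. It is only an analogy, not FS2 code, and the names FlattenDemo, outer, flattened and firstFive are made up for illustration. The first infinite inner sequence is never exhausted, so the second one is never reached:

```scala
// Analogy only: LazyList (Scala 2.13+) stands in for fs2.Stream here.
// All names in this object are hypothetical and exist only for illustration.
object FlattenDemo {
  // Infinite outer sequence; the i-th inner sequence repeats i forever.
  val outer: LazyList[LazyList[Int]] =
    LazyList.from(0).map(i => LazyList.continually(i))

  // Sequential flattening, analogous to calling flatMap on a stream of streams.
  val flattened: LazyList[Int] = outer.flatMap(inner => inner)

  // Every element comes from the first inner sequence: List(0, 0, 0, 0, 0).
  val firstFive: List[Int] = flattened.take(5).toList

  def main(args: Array[String]): Unit =
    println(firstFive)
}
```

However many elements are taken from flattened, they are all 0, which is the behaviour I am trying to avoid.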
Is there a way of achieving this with FS2?