FS2 join Cannot prove that Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]

708 Views Asked by At

I'm trying to use fs2 streams 0.10.0-M9 and doobie version 0.5.0-M9 for getting a sequence of objects from an http call which I want to then insert into a postgres database but I'm having issues structuring this code, getting the following error:

Error:(49, 12) Cannot prove that Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]. .join(100)

What i want to do is run the insert statements concurrently once the call to the web service returns. Here's the code:

fetchProducts(date).map{items  =>
        items.map( i =>
          Stream.eval(upsertProductIO(i).transact(xa))
        )
      }
      .join(100)
      .run
      .unsafeRunSync()

//rest call
def fetchProducts(changeDate: String): Stream[IO, Seq[Product]] = {
//rest call here
}

//DAO code
def upsertProductIO(p: Product): ConnectionIO[Int] = {
  upsertProduct(p).run
}
1

There are 1 best solutions below

9
On

The problem is that you have a Seq[Stream[IO, Seq[Product]], What you want, rather, is a Stream[IO, Seq[Product]], which you can do with a Foldable type class instance:

import cats.implicits._
import scala.concurrent.ExecutionContext.Implicits.global

fetchProducts(date).map { items =>
   items.map(i => Stream.eval(upsertProductIO(i).transact(xa)))
}.map(seqOfStream.toList.sequence)
 .join(100)
 .run
 .unsafeRunSync()