I'm trying to use fs2 streams 0.10.0-M9 and doobie 0.5.0-M9 to fetch a sequence of objects from an HTTP call and then insert them into a Postgres database, but I'm having trouble structuring the code. I get the following error:
Error:(49, 12) Cannot prove that Seq[fs2.Stream[cats.effect.IO,Int]] <:< fs2.Stream[cats.effect.IO,O2]. .join(100)
What I want to do is run the insert statements concurrently once the call to the web service returns. Here's the code:
fetchProducts(date).map { items =>
  items.map(i =>
    Stream.eval(upsertProductIO(i).transact(xa))
  )
}
.join(100)
.run
.unsafeRunSync()

// REST call
def fetchProducts(changeDate: String): Stream[IO, Seq[Product]] = {
  // REST call here
}

// DAO code
def upsertProductIO(p: Product): ConnectionIO[Int] = {
  upsertProduct(p).run
}
The problem is that each element of your outer stream is a Seq[Stream[IO, Int]], whereas join expects each element to be a Stream[IO, Int]. What you want, rather, is a Stream[IO, Stream[IO, Int]], which you can build with a Foldable type class instance:
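For comparison, here is a minimal sketch of the same reshaping done with `flatMap` and `Stream.emits` rather than `Foldable`, so it is a substitute technique and not the approach named above. It assumes the fs2 0.10 milestone API used in the question (`join`, `runLog`). The `Product` case class, the stubbed `fetchProducts`, and the `upsert` function are hypothetical stand-ins for the REST call and the doobie insert, so that the example does not need a database.

```scala
import cats.effect.IO
import fs2.Stream
// join needs an implicit ExecutionContext to run the inner streams concurrently
import scala.concurrent.ExecutionContext.Implicits.global

object JoinSketch {
  // Hypothetical stand-in for the question's Product type
  final case class Product(id: Int)

  // Hypothetical stub for the REST call: emits one batch of products
  def fetchProducts(changeDate: String): Stream[IO, Seq[Product]] =
    Stream.emit(Seq(Product(1), Product(2), Product(3))).covary[IO]

  // Hypothetical stub for upsertProductIO(p).transact(xa): one row affected
  def upsert(p: Product): IO[Int] = IO(1)

  val inserted: Vector[Int] =
    fetchProducts("2018-01-01")
      // Stream[IO, Seq[Product]] => Stream[IO, Product]
      .flatMap(items => Stream.emits(items).covary[IO])
      // Stream[IO, Product] => Stream[IO, Stream[IO, Int]]
      .map(p => Stream.eval(upsert(p)))
      // run up to 100 inner streams at a time
      .join(100)
      .runLog
      .unsafeRunSync()
}
```

In the original code, `upsert(p)` would be replaced by `upsertProductIO(p).transact(xa)`, and `.runLog` by `.run` if the row counts are not needed. Because `join` runs the inner streams concurrently, the order of the collected results is not guaranteed.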