Get a Stream of Entity from an Http4s Response with Circe

433 Views Asked by At

I'm trying to retrieve a Stream[IO, Job] from an http4s Response, but the Scala compiler warns me that it cannot find any suitable Decoder:

Cannot decode into a value of type fs2.Stream[IO,Job], because no EntityDecoder[[+A]IO[A], fs2.Stream[IO,Job]] instance could be found.
[error]         retrieved <- response.as[fs2.Stream[IO, Job]]

The code that generates the above error is the following:

import io.circe.generic.auto._
import org.http4s.circe.CirceEntityCodec._
// Many other imports

"should return the stream of all jobs" in {
  for {
    response <- jobsRoutes.orNotFound.run(
      Request(
        method = Method.GET,
        uri = uri"/jobs",
        headers = Headers(Accept(MediaType.`text/event-stream`))
      )
    )
    retrieved <- response.as[fs2.Stream[IO, Job]]
  } yield {
    response.status shouldBe Status.Ok
  }
}

In the build.sbt file, I have the following dependencies:

// Many other omitted dependencies
"org.http4s"            %% "http4s-circe"                  % "0.23.14",
"io.circe"              %% "circe-generic"                 % "0.14.2",
"io.circe"              %% "circe-fs2"                     % "0.14.0",

The definition of the Job entity is:

final case class Job(
    id: UUID,
    date: Long,
    salaryLo: Option[Int],
    salaryHi: Option[Int],
    currency: Option[String],
    location: String,
    tags: List[String],
    description: String,
    localUrl: Option[String],
    externalUrl: Option[String]
    image: Option[String],
    country: Option[String],
    title: String,
    company: String,
    seniority: Option[String],
    other: Option[String]
)

I cannot understand what's going on.

1

There are 1 best solutions below

0
On

This works for me:

//scala 2.23.10
//cats-effect 3.5.3
//http4s-core 0.23.25
//fs2-core 3.9.3
//circe-core 0.14.6
//circe-parser 0.14.6

import cats.effect.IO
import fs2.Stream
import io.circe.{Decoder, Json}
import org.http4s.{HttpRoutes, Request, ServerSentEvent}

val routes: HttpRoutes[IO] = ???

def getEntitiesFromStream(request: Request[IO]): IO[List[Json]] =
  Stream
    .eval(routes.orNotFound.run(request))
    .flatMap(_.body.through(ServerSentEvent.decoder))
    .compile
    .toList
    .map(_.collect { case ServerSentEvent(Some(data), _, _, _, _) =>
      io.circe.parser.parse(data).toOption.get
    })

Where request can be something like

Request(
  method = Method.GET,
  uri = uri"/jobs"
) //no need to set MediaType.`text/event-stream`

(.toOption.get might be not desired, but it's irrelevant to the problem and to the example)

Scastie snippet: https://scastie.scala-lang.org/eBZvMBW5Q1GIGhH4BUgz4w