ZIO: How to return JSON ? [instead of using case class in ZIO-Http use schema to map?]

1k Views Asked by At

I tried directly getting body of JSON in code which I then want to convert to Avro to write to a kafka topic.

Here is my code with case class:


import zhttp.http._
import zio._
import zhttp.http.{Http, Method, Request, Response, Status}
import zhttp.service.Server
import zio.json._
import zio.kafka._
import zio.kafka.serde.Serde
import zio.schema._


case class Experiments(experimentId: String,
                       variantId: String,
                       accountId: String,
                       deviceId: String,
                       date: Int)

//case class RootInterface (events: Seq[Experiments])


object Experiments {
  implicit val encoder: JsonEncoder[Experiments] = DeriveJsonEncoder.gen[Experiments]
  implicit val decoder: JsonDecoder[Experiments] = DeriveJsonDecoder.gen[Experiments]
  implicit val codec: JsonCodec[Experiments] = DeriveJsonCodec.gen[Experiments]
  implicit val schema: Schema[Experiments] = DeriveSchema.gen

}

object HttpService {
  def apply(): Http[ExpEnvironment, Throwable, Request, Response] =
    Http.collectZIO[Request] {

      case req@(Method.POST -> !! / "zioCollector") =>
        val c = req.body.asString.map(_.fromJson[Experiments])
        for {
          u <- req.body.asString.map(_.fromJson[Experiments])
          r <- u match {
            case Left(e) =>
              ZIO.debug(s"Failed to parse the input: $e").as(
                Response.text(e).setStatus(Status.BadRequest)
              )
            case Right(u) =>
              println(s"$u +       =====")
              ExpEnvironment.register(u)
                .map(id => Response.text(id))
          }
        }
        yield r
    }
}

//  val experimentsSerde: Serde[Any, Experiments] = Serde.string.inmapM { string =>
//    //desericalization
//    ZIO.fromEither(string.fromJson[Experiments].left.map(errorMessage => new RuntimeException(errorMessage)))
//  } { theMatch =>
//    ZIO.effect(theMatch.toJson)
//
//  }

object ZioCollectorMain extends ZIOAppDefault {
  def run: ZIO[Environment with ZIOAppArgs with Scope, Any, Any] = {
    Server.start(
      port = 9001,
      http = HttpService()).provide(ZLayerExp.layer)
  }
}

I'm looking into Zio-Json but no success yet, any help is appreciated !

We could also schema something to get the avro generic record

here's my json :

{ "experimentId": "abc", "variantId": "123", "accountId": "123", "deviceId": "123", "date": 1664544365 }

1

There are 1 best solutions below

1
On

This function works for me in Scala 3 (sorry, I didn't include all the code but it should be enough):

import zio.*
import zio.Console.printLine
import zhttp.http.*
import zhttp.service.Server
import zio.json.*
...

  case class Experiments(experimentId: String,
                         variantId: String,
                         accountId: String,
                         deviceId: String,
                         date: Int)

//case class RootInterface (events: Seq[Experiments])


  object Experiments:
    implicit val encoder: JsonEncoder[Experiments] = DeriveJsonEncoder.gen[Experiments]
    implicit val decoder: JsonDecoder[Experiments] = DeriveJsonDecoder.gen[Experiments]
    implicit val codec: JsonCodec[Experiments] = DeriveJsonCodec.gen[Experiments]

  val postingExperiment: Http[Any, Throwable, Request, Response] =

    Http.collectZIO[Request] {
      case req@(Method.POST -> !! / "zioCollector") =>
        //val c = req.body.asString.map(_.fromJson[Experiments])
          val experimentsZIO = req.body.asString.map(_.fromJson[Experiments])
          for {
            experimentsOrError <- experimentsZIO
            response <- experimentsOrError match {
              case Left(e) => ZIO.debug(s"Failed to parse the input: $e").as(
                Response.text(e).setStatus(Status.BadRequest)
              )
              case Right(experiments) => ZIO.succeed(Response.json(experiments.toJson))
            }
          }   yield response
    }

I modified your code slightly (you didn't post your ExpEnvironment class), and it returns back the object posted to the url.

and the test code is:

import sttp.client3.{SimpleHttpClient, UriContext, basicRequest}

object TestExperiments:

  def main(args: Array[String]): Unit =

    val client = SimpleHttpClient()
    //post request
    val request = basicRequest
      .post(uri"http://localhost:9009/zioCollector")
      .body("{ \"experimentId\": \"abc\", \"variantId\": \"123\", \"accountId\": \"123\", \"deviceId\": \"123\", \"date\": 1664544365 }")

    val response = client.send(request)
    println(response.body)

    val invalidJsonRequest = basicRequest
      .post(uri"http://localhost:9009/zioCollector")
      .body("{ \"experimentId\": \"abc\", \"variantId\": \"123\", \"accountId\": \"123\", \"deviceId\": \"123\", \"date\": 1664544365 ") // missing the closing bracket

    val invalidJsonResponse = client.send(invalidJsonRequest)
    println(invalidJsonResponse.body)

You have to add: "com.softwaremill.sttp.client3" %% "core" % "3.8.3" to your sbt file.

build.sbt:

ThisBuild / scalaVersion     := "3.2.0"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "TestSpeed"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "TestZio",
    libraryDependencies ++= Seq(
      "dev.zio" %% "zio" % "2.0.2",
      "dev.zio"       %% "zio-json"       % "0.3.0-RC11",
      "io.d11" %% "zhttp" % "2.0.0-RC11",
      "dev.zio" %% "zio-test" % "2.0.2" % Test,
      "com.softwaremill.sttp.client3" %% "core" % "3.8.3" % Test
    ),
    testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
  )

I didn't include anything related to avro because I am not familiar with it.