Scala, ZIO, Kafka - how to add Producer layer to scope


I have a class which produces messages to Kafka (using zio-kafka):

case class MyObject(name: String, otherObject: OtherObject)

class MyProducer(config: KafkaConfiguration) {
  val layer: ZLayer[Any, Throwable, Producer] = config.layer() // creates the Producer layer

  def produceEvent(e: MyObject): RIO[Producer, Unit] = {
    val message = new ProducerRecord(topic, e)
    Producer.produce(message, Serde.int, Serde.Json).unit
  }
}


object MyProducer {
  val layer: ZLayer[SomeConfig, Throwable, MyProducer] = ZLayer(for { ... } yield new MyProducer(config))
}

Now I would like to use this produceEvent method elsewhere:

class A(producer: MyProducer) {
  def doSmth(): ZIO[Producer, Throwable, Unit] =
    for {
      // some logic
      _ <- producer.produceEvent(event)
    } yield ()
}

The problem is that at runtime I get this error:

Defect in zio.Environment: Could not find Producer inside Environment

I have no idea how to provide Producer to the environment. I provided MyProducer correctly, but doSmth still has to return ZIO[Producer, Throwable, Unit]. Is there another way to "cut" Producer out of the environment and just return Task[Unit]?

There is 1 best solution below

Indeed, it makes sense to keep the Producer local to MyProducer.

You can "provide" it right away:

def produceEvent(e: MyObject): Task[Unit] = {
  val message = new ProducerRecord(topic, e)
  Producer.produce(message, Serde.int, Serde.Json)
    .unit
    .provide(layer)
}
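
One thing to keep in mind: as far as I know, providing the layer inside produceEvent builds it again every time the returned effect runs, which would mean a new Kafka producer per call. If that matters, a possible alternative is to inject the Producer instance into MyProducer once and call produce on it directly. The following is only a sketch under assumptions: the two-argument constructor, the topic parameter, the (key, payload) signature and the use of Serde.string instead of Serde.Json are all hypothetical and not from the question.

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

// Hypothetical variant: takes an already-built Producer instead of a layer.
class MyProducer(producer: Producer, topic: String) {
  // Calls produce on the injected instance, so no Producer remains
  // in the environment and callers see a plain Task.
  def produceEvent(key: Int, payload: String): Task[Unit] =
    producer
      .produce(new ProducerRecord[Int, String](topic, key, payload), Serde.int, Serde.string)
      .unit
}

object MyProducer {
  // The Producer is taken from the environment once, when this layer is built.
  // The topic parameter is an assumption made for this sketch.
  def layer(topic: String): ZLayer[Producer, Nothing, MyProducer] =
    ZLayer.fromZIO(ZIO.service[Producer].map(p => new MyProducer(p, topic)))
}

class A(producer: MyProducer) {
  // Returns Task[Unit]; the Producer requirement is gone from the signature.
  def doSmth(): Task[Unit] =
    producer.produceEvent(1, "hello")
}
```

With this shape the Producer layer (for example the one returned by config.layer() in the question) is provided once where the application is wired together, next to MyProducer.layer, and nothing downstream of MyProducer needs Producer in its environment.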