avro4s cannot deserialize AnyRef

I have a simple case class:

case class KafkaContainer(key: String, payload: AnyRef)

I want to send it to a Kafka topic via a producer, so I do this:

val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))

This works well.

Then I try to consume it:

Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
    .map { msg =>
      val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
      val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
      val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
      input.close()
      ...
    }.runWith(Sink.ignore)

This works well with any concrete class as the payload.

But if the payload is AnyRef, the consumer code fails to compile with:

Error:(38, 96) could not find implicit value for evidence parameter of type com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer]
      val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)

Error:(38, 96) not enough arguments for method binary: (implicit evidence$21: com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer], implicit evidence$22: com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer].
Unspecified value parameter evidence$22.
      val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)

If I declare the implicits explicitly:

implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

it fails to compile with:

Error:(58, 71) could not find Lazy implicit value of type com.sksamuel.avro4s.FromValue[Object]
      implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

Error:(58, 71) not enough arguments for method lazyConverter: (implicit fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]].
Unspecified value parameter fromValue.
      implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

If I add every implicit that the compiler asks for:

lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]

compilation fails with:

Error:(58, 69) exception during macro expansion:
java.lang.IllegalArgumentException: requirement failed: Require a case class but Object is not
    at scala.Predef$.require(Predef.scala:277)
    at com.sksamuel.avro4s.FromRecord$.applyImpl(FromRecord.scala:283)
      implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]

But if I replace AnyRef with a concrete class, no implicits are required and everything works again.

There is 1 best solution below

I had a similar problem with the Any data type. You have to specify which types are valid for this field, because Any or AnyRef could be anything and avro4s cannot derive a schema or decoder for it. Then use Either or a shapeless coproduct (see also the avro4s documentation on GitHub). In my case the value can be a String, Long, Double or null, so with shapeless you can write:

import shapeless.{:+:, CNil}
case class DataContainer(name: String, value: Option[String :+: Long :+: Double :+: CNil])

This is converted into a union type in Avro:

{
    "name" : "value",
    "type" : [ "null", "string", "long", "double" ]
}
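
For the KafkaContainer from the question, the Either route could look roughly like the sketch below. UserCreated and UserDeleted are hypothetical payload types invented for illustration; the point is only that every field now has a concrete type, so avro4s should be able to derive the implicits that were missing for AnyRef. The sketch itself uses only the standard library and shows how the payload is recovered by pattern matching:

```scala
// Hypothetical payload types, invented for this sketch; substitute your own case classes.
final case class UserCreated(id: Long, name: String)
final case class UserDeleted(id: Long)

// The payload is restricted to two known types instead of AnyRef,
// so it can be described as a union of two records.
final case class KafkaContainer(key: String, payload: Either[UserCreated, UserDeleted])

object PayloadExample {
  // Pattern matching recovers the concrete payload type on the consumer side.
  def describe(msg: KafkaContainer): String = msg.payload match {
    case Left(created)  => s"created ${created.name}"
    case Right(deleted) => s"deleted ${deleted.id}"
  }
}
```

Either only covers two alternatives; for more than two, the shapeless coproduct shown above is the closer fit.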