Avro4s: how to serialise a map with a custom key type?


I am using Avro4s. It's easy to serialise a Map[String, T], but I have a situation like this:

sealed trait Base
case object First extends Base
case object Second extends Base

and I need to serialise something like Map[Base, T].

Any advice on the best way to achieve this? Thanks.

Best answer:

The thing is that, according to the Avro specification, "Map keys are assumed to be strings."

So the only map type Avro supports is Map[String, T]. This means you need some custom code that converts your Map[Base, T] into a Map[String, T] when writing, and back again when reading. Something like this will probably work for you:

import scala.collection.breakOut
import scala.collection.immutable.Map
import scala.collection.JavaConverters._
import com.sksamuel.avro4s._
import org.apache.avro.Schema
import org.apache.avro.Schema.Field

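// Written against the avro4s 1.x type classes (ToSchema / ToValue / FromValue) and
// Scala 2.11/2.12 (scala.collection.breakOut was removed in Scala 2.13).
// Assumes Base, First and Second from the question are in scope.
object BaseMapAvroHelpers {
  // Avro map keys must be strings, so give every Base value a fixed string name...
  private val nameMap: Map[Base, String] = Map(First -> "first", Second -> "second")
  // ...and keep the reverse lookup for reading (names must be distinct).
  private val revNameMap: Map[String, Base] = nameMap.map(_.swap)

  // Schema: a plain Avro map whose value schema is the schema of T.
  implicit def toSchema[T: SchemaFor]: ToSchema[Map[Base, T]] = new ToSchema[Map[Base, T]] {
    override val schema: Schema = Schema.createMap(implicitly[SchemaFor[T]].apply())
  }

  // Writing: replace each Base key with its name and convert each value with ToValue[T],
  // so that non-primitive values (e.g. case classes) are also encoded correctly.
  implicit def toValue[T: SchemaFor : ToValue]: ToValue[Map[Base, T]] = new ToValue[Map[Base, T]] {
    override def apply(value: Map[Base, T]): java.util.Map[String, Any] = {
      val toValueT = implicitly[ToValue[T]]
      value.map(kv => (nameMap(kv._1), toValueT(kv._2))).asJava
    }
  }

  // Reading: decode each key as a String, look up its Base, and decode each value with FromValue[T].
  // revNameMap(...) throws NoSuchElementException if the data contains an unknown key.
  implicit def fromValue[T: SchemaFor : FromValue]: FromValue[Map[Base, T]] = new FromValue[Map[Base, T]] {
    override def apply(value: Any, field: Field): Map[Base, T] = {
      val fromValueS = implicitly[FromValue[String]] // keys may arrive as Avro Utf8, not String
      val fromValueT = implicitly[FromValue[T]]
      value.asInstanceOf[java.util.Map[Any, Any]].asScala
        .map(kv => (revNameMap(fromValueS(kv._1)), fromValueT(kv._2)))(breakOut)
    }
  }
}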
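Stripped of the avro4s type classes, the helpers above do nothing more than translate keys in both directions. Below is a library-independent sketch of just that translation, assuming the Base hierarchy from the question; the object name BaseKeys and its methods are hypothetical and not part of avro4s. Unlike revNameMap(...), which throws NoSuchElementException on an unrecognised key, decode here returns an Either, and the code avoids breakOut, so it should also compile on Scala 2.13 (the Either for-comprehension needs Scala 2.12 or later).

```scala
// Assumption: same Base hierarchy as in the question.
sealed trait Base
case object First extends Base
case object Second extends Base

// Hypothetical, library-independent version of the key translation done by
// BaseMapAvroHelpers; no avro4s types are involved.
object BaseKeys {
  // Same naming scheme as nameMap: one distinct string per case object.
  private val toName: Map[Base, String] = Map(First -> "first", Second -> "second")
  // Reverse table; swapping is lossless only because every name above is distinct.
  private val fromName: Map[String, Base] = toName.map(_.swap)

  // Writing side: Map[Base, T] -> Map[String, T], the only map shape Avro allows.
  def encode[T](m: Map[Base, T]): Map[String, T] =
    m.map { case (k, v) => toName(k) -> v }

  // Reading side: Map[String, T] -> Map[Base, T], returning a Left for an unknown key
  // instead of throwing NoSuchElementException.
  def decode[T](m: Map[String, T]): Either[String, Map[Base, T]] =
    m.foldLeft[Either[String, Map[Base, T]]](Right(Map.empty)) {
      case (acc, (name, v)) =>
        for {
          done <- acc
          key  <- fromName.get(name).toRight(s"unknown key: $name")
        } yield done + (key -> v)
    }
}
```

Encoding Map(First -> "abc", Second -> "xyz") gives Map("first" -> "abc", "second" -> "xyz"), decoding that result gives back the original map wrapped in Right, and decoding Map("third" -> 1) gives Left("unknown key: third").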

Usage example:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

case class Wrapper[T](value: T)
def test(): Unit = {
  import BaseMapAvroHelpers._
  val map: Map[Base, String] = Map(First -> "abc", Second -> "xyz")
  val wrapper = Wrapper(map)
  val schema = AvroSchema[Wrapper[Map[Base, String]]]
  println(s"Schema: $schema")

  val bufOut = new ByteArrayOutputStream()
  val out = AvroJsonOutputStream[Wrapper[Map[Base, String]]](bufOut)
  out.write(wrapper)
  out.flush()
  println(s"Avro Out: ${bufOut.size}")
  println(bufOut.toString("UTF-8"))

  val in = AvroJsonInputStream[Wrapper[Map[Base, String]]](new ByteArrayInputStream(bufOut.toByteArray))
  val read = in.singleEntity
  println(s"read: $read")
}

The output is something like:

Schema: {"type":"record","name":"Wrapper","namespace":"so","fields":[{"name":"value","type":{"type":"map","values":"string"}}]}
Avro Out: 40
{"value":{"first":"abc","second":"xyz"}}
read: Success(Wrapper(Map(First -> abc, Second -> xyz)))
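With more case objects, maintaining nameMap by hand gets error-prone, and a duplicated name would silently make revNameMap decode one object as another. A minimal sketch, assuming each key's name can be derived from the object itself (here the lowercased toString of a case object, which yields the same "first"/"second" names as above): the hypothetical KeyCodec.fromValues builds both tables and refuses to do so if two values share a name. The list of values still has to be written out, since plain Scala 2 has no built-in way to enumerate a sealed trait's case objects without a macro or a library.

```scala
// Assumption: same Base hierarchy as in the question.
sealed trait Base
case object First extends Base
case object Second extends Base

// Hypothetical helper (not part of avro4s): forward and reverse key tables.
final case class KeyCodec[K](toName: Map[K, String], fromName: Map[String, K])

object KeyCodec {
  // Builds both tables from every value of K and a naming function.
  // Fails if two values would share a name, since decoding could not tell them apart.
  def fromValues[K](values: Seq[K])(name: K => String): Either[String, KeyCodec[K]] = {
    val names = values.map(name)
    // diff removes one occurrence of each distinct name, leaving only the repeats
    val duplicates = names.diff(names.distinct).distinct
    if (duplicates.nonEmpty) Left(s"duplicate key names: ${duplicates.mkString(", ")}")
    else {
      val forward = values.zip(names).toMap
      Right(KeyCodec(forward, forward.map(_.swap)))
    }
  }
}

object BaseNames {
  // A case object's toString is its name ("First"), so this yields "first" / "second".
  val codec: KeyCodec[Base] =
    KeyCodec.fromValues(Seq[Base](First, Second))(_.toString.toLowerCase) match {
      case Right(c)  => c
      case Left(err) => sys.error(err)
    }
}
```

In BaseMapAvroHelpers, nameMap and revNameMap could then be replaced by BaseNames.codec.toName and BaseNames.codec.fromName, so that adding a case object only means adding it to the Seq.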