Scala: How to convert nested/complicated data to Avro using avro4s?


I have Scala 2.12 and have imported the avro4s library, following the link for my requirement.

Basically, my Avro schema is as follows (sample only):

Schema:

{
  "name": "person",
  "type": "record",
  "fields": [
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "AddressUSRecord",
        "fields": [
          {"name": "streetaddress", "type": "string"},
          {"name": "city", "type": "string"}
        ]
      }
    }
  ]
}

So I have created 3 case classes.

I tested the schema based on these classes, and it looks fine.
So, the schema generation is good.
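For reference, a minimal sketch of case classes that would mirror the sample schema above (the actual question uses three classes; the sample needs only two, with names taken from the schema):

```scala
// Hypothetical case classes mirroring the sample schema above.
// avro4s derives record schemas from case classes, so the field
// names must match the Avro field names exactly.
case class AddressUSRecord(streetaddress: String, city: String)
case class Person(address: AddressUSRecord)

val person = Person(AddressUSRecord("1 Main St", "Springfield"))
```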

Now, I am creating the required objects as per the case classes.

When I try to write the Avro file, I get a NullPointerException.

Error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.avro.util.Utf8$2.toUtf8(Utf8.java:123)
    at org.apache.avro.util.Utf8.getBytesFor(Utf8.java:172)
    at org.apache.avro.util.Utf8.<init>(Utf8.java:39)
    at com.sksamuel.avro4s.Encoder$StringEncoder$.encode(Encoder.scala:73)
    at com.sksamuel.avro4s.Encoder$StringEncoder$.encode(Encoder.scala:68)
    at com.sksamuel.avro4s.Encoder$.encodeField(Encoder.scala:401)
    at com.sksamuel.avro4s.Encoder$.encodeFieldLazy(Encoder.scala:379)
    at MyClass$$anon$4$$anon$5.encode(MyClass.scala:90)
    at MyClass$$anon$4$$anon$5.encode(MyClass.scala:90)
    at com.sksamuel.avro4s.Encoder$.encodeField(Encoder.scala:401)
    at com.sksamuel.avro4s.Encoder$.encodeFieldNotLazy(Encoder.scala:373)
    at MyClass$$anon$4.encode(MyClass.scala:90)
    at MyClass$$anon$4.encode(MyClass.scala:90)
    at com.sksamuel.avro4s.AvroDataOutputStream.$anonfun$x$1$2(AvroDataOutputStream.scala:35)
    at com.sksamuel.avro4s.AvroDataOutputStream.$anonfun$x$1$2$adapted(AvroDataOutputStream.scala:34)
    at com.sksamuel.avro4s.AvroDataOutputStream.write(AvroDataOutputStream.scala:46)
    at MyClass$.delayedEndpoint$MyClass$1(MyClass.scala:91)
    at MyClass$delayedInit$body.apply(MyClass.scala:42)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at MyClass$.main(MyClass.scala:42)
    at MyClass.main(MyClass.scala)

Code:

//import java.io.File
import com.sksamuel.avro4s.{AvroOutputStream, AvroSchema}
import java.io.File

//case class Person(name: String, age: Int)
//case class Book(title: String, year: Int, owner: Person, authors: Seq[Person])
// case class as per schema

object MyClass extends App {
  val outFile = "/path/TestScala.avro"
   // val schema = AvroSchema[Book]
  println("Hello, World!")
   // println(schema)


  val head = header(
    prop1 = "val1",
    prop2 = null
  )

  val pnlBody = pnlData(
    <corresponding property values, some with null>
  )

  val record = MyClass(header = head, body = pnlBody)

  val schema = AvroSchema[MyClass]
  println(schema)
  println(record)

  val os = AvroOutputStream.data[MyClass].to(new File(outFile)).build(schema)
  os.write(record)
  os.flush()
  os.close()

}

Basically, based on the schema I have, I want to understand how my final record object should be constructed.

UPDATE:

Based on the suggestions below by @Antot and @Daniel, I changed my header and body classes to use Option[String] for all the values that are expected to be null. But I still see the same issue.

The changes to the header and data case classes produced the following schema and record. Is the record below created correctly?

Please advise.

UPDATE 2:
I think the issue is with the nulls. The records are expected to have a few attributes as null. Since I changed to Option[String], the value should be None and not null. I am new to Scala, so I am still learning its data types.

So, changing the values from null to None works now.
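To illustrate the difference, here is a plain-Scala sketch (with hypothetical field names): an Option models an optional value, and None, not null, is its "empty" case:

```scala
// Hypothetical header class with a nullable second field
case class Header(prop1: String, prop2: Option[String])

val good = Header("val1", None)            // absent value: use None
val alsoGood = Header("val1", Some("v2"))  // present value: wrap in Some
// Header("val1", null) would compile, but passing null where an
// Option is expected is what leads to the NullPointerException above
```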

However, I still have one question. If my attributes are Option[String], how does that translate to Avro? If my value is None, does it translate to Avro null?

Answer:

Your problem is that your schema does not define the fields as nullable. If you have null values, that must be supported in the schema. To do this in Avro, you create a "union" of two schemas: one being the null schema and the other being the "real" type. For example, look at this schema:

{
  "type": "record",
  "name": "MyClass",
  "namespace": "com.sksamuel.avro4s",
  "fields": [
    {
      "name": "a",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

This is a record type com.sksamuel.avro4s.MyClass that has one field, a. Then the type of a can be either null or string. So when writing out records of this type, you can use either null or a string for field a.

Now, you don't have to create this schema by hand (as you did in your post). You can use the AvroSchema macro to do the magic for you based on a case class.

val schema = AvroSchema[MyClass]

When using this macro, a nullable union will be created if you define the type to be Option[T]. So you could do:

case class MyClass(a: Option[String])

And you would end up with the same schema as above. If you did:

case class MyClass(a: String)

Then the schema would be:

{
  "type": "record",
  "name": "MyClass",
  "namespace": "com.sksamuel.avro4s",
  "fields": [
    {
      "name": "a",
      "type": "string"
    }
  ]
}

tl;dr

Either create the schema from a case class where the nullable field is defined as an Option, or update your hand-rolled schema to use a union of {null, T}.
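Putting it together, a minimal end-to-end sketch (assuming avro4s 2.x, as the stack trace in the question suggests; Header, PnlData and Record are stand-in names for the question's classes):

```scala
import java.io.File
import com.sksamuel.avro4s.{AvroOutputStream, AvroSchema}

// Stand-in case classes; the Option[String] fields become
// ["null", "string"] unions in the derived schema
case class Header(prop1: String, prop2: Option[String])
case class PnlData(value: Option[String])
case class Record(header: Header, body: PnlData)

val record = Record(Header("val1", None), PnlData(Some("x")))
val schema = AvroSchema[Record]

val outFile = File.createTempFile("TestScala", ".avro")
val os = AvroOutputStream.data[Record].to(outFile).build(schema)
os.write(record)   // None is encoded as Avro null
os.flush()
os.close()
```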