Using Spark to convert nested JSON with optional fields to Scala case classes is not working


I have a use case where I need to read a JSON file or JSON string using Spark as a Dataset[T] in Scala. The JSON file has nested elements, and some of those elements are optional. I am able to read the JSON file and map it to case classes if I ignore the optional fields, since then the schema matches the case classes.

According to this link and its answer, this works for first-level JSON when the case class has an Option field, but it does not work when the Option field sits inside a nested element.
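
For example, here is a minimal sketch of what I mean (TopLevel and topLevelJson are just illustrative names, run in a notebook/spark-shell session where the spark implicits are in scope): with the Option field at the top level and an encoder-derived schema, the missing key simply comes back as null/None.

import org.apache.spark.sql.Encoders

case class TopLevel(field1: String, optionalKey: Option[String])

// "optionalKey" is deliberately absent from the JSON
val topLevelJson = """{"field1" : "Test1"}"""

val topLevelSchema = Encoders.product[TopLevel].schema
val topLevelDs = spark.read.schema(topLevelSchema).json(Seq(topLevelJson).toDS).as[TopLevel]
topLevelDs.show()   // optionalKey shows up as null (None in the case class)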

The JSON string that I am using is as below:

val jsonString = """{
  "Input" :
    {
      "field1" : "Test1",
      "field2" : "Test2",
      "field3Array" : [
      {
      "key1" : "Key123",
      "key2" : ["keyxyz","keyAbc"]
      }
                ]
    },
  "Output": 
    {
      "field1" : "Test2",
      "field2" : "Test3",
      "requiredKey" : "RequiredKeyValue",
      "field3Array" : [
      {
      "key1" : "Key123",
      "key2" : ["keyxyz","keyAbc"]
      }
                ]
    }
}"""

The case classes that I have created are as below:

case class InternalFields (key1: String, key2 : Array[String])
case class Input(field1:String, field2: String,field3Array : Array[InternalFields])
case class Output(field1:String, field2: String,requiredKey : String,field3Array : Array[InternalFields])
case class ExternalObject(input : Input, output : Output)

The code through which I am reading the jsonString is as below:

val df = spark.read.option("multiline","true").json(Seq(jsonString).toDS).as[ExternalObject]

The above code works perfectly fine. Now, when I add an optional field to the Output case class (the JSON string could contain it, to support some use cases), it throws an error saying that the optional field I have specified in the case class is missing.

So, to get around this, I went ahead and tried providing the schema using encoders to see if that works.

After adding the optional field, my case classes changed to the below:

case class InternalFields (key1: String, key2 : Array[String])
case class Input(field1:String, field2: String,field3Array : Array[InternalFields])
case class Output(field1:String, field2: String,requiredKey : String, optionalKey : Option[String],field3Array : Array[InternalFields])  //changed
case class ExternalObject(input : Input, output : Output)

There is one additional optional field added in the Output case class.

Now I am trying to read the jsonString as below:

import org.apache.spark.sql.Encoders
val schema = Encoders.product[ExternalObject].schema
val df = spark.read
           .schema(schema)
           .json(Seq(jsonString).toDS)
           .as[ExternalObject]

When I do df.show or display(df), it gives me an output table in which both the Input column and the Output column are null.


If I remove that optional field from the case class then this code also works fine and shows me the expected output.

Is there any way I can make this optional field in the inner JSON / inner case class work and cast it directly to the respective case class inside a Dataset[T]?

Any ideas, guidance, suggestions that can make it work would be of great help.


There are 2 answers below.

Answer 1:

The problem is that Spark uses struct types to map a class to a Row. Take this as an example:

case class MyRow(a: String, b: String, c: Option[String])
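
For reference, here is a quick sketch (assuming a spark-shell session with MyRow defined) of the struct type Spark derives for that class; note that c ends up as just another nullable string column:

import org.apache.spark.sql.Encoders

// The Option[String] field is represented as a plain nullable column in the schema.
Encoders.product[MyRow].schema.printTreeString()
// root
//  |-- a: string (nullable = true)
//  |-- b: string (nullable = true)
//  |-- c: string (nullable = true)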

Can Spark create a DataFrame which sometimes has column c and sometimes not? Like this:

+-----+-----+-----+
|  a  |  b  |  c  |
+-----+-----+-----+
| a1  | b1  |  c1 |
+-----+-----+-----+
| a2  | b2  |        <-- note the non-existence here :)
+-----+-----+-----+
| a3  | b3  | c3  |
+-----+-----+-----+

Well, it cannot. Being nullable means that the key has to exist, but the value can be null:

    ... other key values
    "optionalKey": null,
    ...

This is considered valid and is convertible to your structs. I suggest you use a dedicated JSON library (as you know, there are many of them out there) and use UDFs or something similar to extract what you need from the JSON.
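
For example, here is a rough sketch of that suggestion using json4s (which ships with Spark) inside a UDF; the column and function names (value, extractOptionalKey) are just illustrative:

import org.apache.spark.sql.functions.udf
import org.json4s._
import org.json4s.jackson.JsonMethods.parse
import spark.implicits._   // for toDF and $

// Pull Output.optionalKey out of the raw JSON string; yields None (null in the
// resulting column) when the key is absent.
val extractOptionalKey = udf { (json: String) =>
  implicit val formats: Formats = DefaultFormats
  (parse(json) \ "Output" \ "optionalKey").extractOpt[String]
}

val raw = Seq(jsonString).toDF("value")
val withOptionalKey = raw.withColumn("optionalKey", extractOptionalKey($"value"))

The rest of the (required) structure can still be read through your case classes; only the optional pieces need to be handled by the JSON library.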

Answer 2:

I tested the above code with the following case class structures:

case class Field3Array(
  key1: String,
  key2: List[String]
)
case class Input(
  field1: String,
  field2: String,
  field3Array: List[Field3Array]
)
case class Output(
  field1: String,
  field2: String,
  requiredKey: String,
  field3Array: List[Field3Array]
)
case class Root(
  Input: Input,
  Output: Output
)

Instead of passing the JSON string in as a Dataset[String] as you have tried, I put the JSON string in a file and passed the file path to the DataFrameReader (the json method also accepts a path); the results were as follows:

import org.apache.spark.sql.{Encoder,Encoders}
import org.apache.spark.sql.Dataset

case class Field3Array(
  key1: String,
  key2: List[String]
)
case class Input(
  field1: String,
  field2: String,
  field3Array: List[Field3Array]
)
case class Output(
  field1: String,
  field2: String,
  requiredKey: String,
  field3Array: List[Field3Array]
)
case class Root(
  Input: Input,
  Output: Output
)


val pathToJson: String = "file:////path/to/json/file/on/local/filesystem"

// Marked implicit so that .as[Root] below can resolve the Encoder[Root]
implicit val jsEncoder: Encoder[Root] = Encoders.product[Root]

val df: Dataset[Root] = spark.read.option("multiline","true").json(pathToJson).as[Root]

The results for show are as follows:

df.show(false)

+--------------------------------------------+--------------------------------------------------------------+
|Input                                       |Output                                                        |
+--------------------------------------------+--------------------------------------------------------------+
|[Test1, Test2, [[Key123, [keyxyz, keyAbc]]]]|[Test2, Test3, [[Key123, [keyxyz, keyAbc]]], RequiredKeyValue]|
+--------------------------------------------+--------------------------------------------------------------+

df.select("Input.field1").show()

+------+
|field1|
+------+
| Test1|
+------+
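
Nested fields on the Output side can be selected the same way (not part of the original output above; just the same pattern):

df.select("Output.requiredKey").show()   // prints a single row containing RequiredKeyValue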