Explode Cassandra UDT with flatMap in Spark 2.x (Scala)


I have the following data in Cassandra (3.11.2), which I also load as my DataFrame df:

Data in Cassandra:

id | some_data  
-- | ---------  
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]

df details:

    //|  |-- id: integer (nullable = true)
    //|  |-- some_data: array (nullable = true)
    //|  |    |-- element: struct (containsNull = true)
    //|  |    |    |-- s1: string (nullable = true)
    //|  |    |    |-- s2: string (nullable = true)

Here the Cassandra schema is defined as:

id : String
some_data : list<frozen<test_udt>>, where test_udt was created with: CREATE TYPE test.test_udt ( s1 text, s2 text );

I'm using spark-cassandra-connector 2.0 to pull data from Cassandra for processing on Spark 2.2.1.

Required Output

The output is the exploded form of df, with one row per element of some_data:

id | some_data                                          | s1    | s2  
-- | ---------------------------------------------------| ----- | ---- 
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str11 | str12
1  | [{s1:"str11", s2:"str12"},{s1:"str13", s2:"str14"}]| str13 | str14 
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str21 | str22
2  | [{s1:"str21", s2:"str22"},{s1:"str23", s2:"str24"}]| str23 | str24
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str31 | str32
3  | [{s1:"str31", s2:"str32"},{s1:"str33", s2:"str44"}]| str33 | str44

My approach in the past

Previously, with spark-cassandra-connector 1.6 on Spark 1.6, I had a neat solution to the above problem:

import org.apache.spark.sql.functions._    
case class my_data(s1 : String, s2 : String)

val flatData = df.explode(df("some_data")){
            case Row(x : Seq[Row]) =>
                x.map(x =>

After we upgraded to Spark 2.x, I get an error when using this explode call. The Spark documentation says explode is deprecated and suggests flatMap as an alternative.


  1. How do I explode the DataFrame in Scala to get the same results as before?
  2. How do I translate my old code to use flatMap?

There is 1 best solution below


You can use the explode function from org.apache.spark.sql.functions, which the deprecation note also suggests as a replacement for the explode method on the DataFrame. getItem is used to get a field from a struct by its name.

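import org.apache.spark.sql.functions.explode
import spark.implicits._  // enables $"..."; assumes a SparkSession named spark
df.withColumn("exploded", explode($"some_data"))  // one row per array element
  .withColumn("s1", $"exploded".getItem("s1"))
  .withColumn("s2", $"exploded".getItem("s2"))
  .drop("exploded")  // helper column is not part of the output shown below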

//|id |some_data                     |s1   |s2   |
//|1  |[[str11,str12], [str13,str14]]|str11|str12|
//|1  |[[str11,str12], [str13,str14]]|str13|str14|
//|2  |[[str21,str22], [str23,str24]]|str21|str22|
//|2  |[[str21,str22], [str23,str24]]|str23|str24|
//|3  |[[str31,str32], [str33,str44]]|str31|str32|
//|3  |[[str31,str32], [str33,str44]]|str33|str44|
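
Since the question also asks how to do this with flatMap, here is a minimal sketch of that route using a typed Dataset. The case classes TestUdt, Record and Flat, the object FlatMapExplode, and the locally built sample data are all hypothetical stand-ins for the Cassandra-backed df; the sketch assumes the columns are named id, some_data, s1 and s2 as in the schema above, and that id is never null.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case classes mirroring the schema in the question.
case class TestUdt(s1: String, s2: String)
case class Record(id: Int, some_data: Seq[TestUdt])
case class Flat(id: Int, some_data: Seq[TestUdt], s1: String, s2: String)

object FlatMapExplode {

  // Emits one Flat row per element of some_data.
  // A null or empty list yields no rows, matching what functions.explode does.
  def explodeRecords(ds: Dataset[Record]): Dataset[Flat] = {
    import ds.sparkSession.implicits._ // encoder for Flat
    ds.flatMap { r =>
      Option(r.some_data).getOrElse(Seq.empty)
        .map(u => Flat(r.id, r.some_data, u.s1, u.s2))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // assumption: local run, no Cassandra involved
      .appName("flatMap-explode-sketch")
      .getOrCreate()
    import spark.implicits._

    // Local stand-in for the DataFrame read through the connector.
    val df = Seq(
      Record(1, Seq(TestUdt("str11", "str12"), TestUdt("str13", "str14"))),
      Record(2, Seq(TestUdt("str21", "str22"), TestUdt("str23", "str24"))),
      Record(3, Seq(TestUdt("str31", "str32"), TestUdt("str33", "str44")))
    ).toDF()

    // as[Record] matches columns to case class fields by name.
    val flat = explodeRecords(df.as[Record])
    flat.show(false)

    spark.stop()
  }
}
```

With the three sample rows this should give six rows with the columns id, some_data, s1 and s2. Compared with the explode function, flatMap deserializes every row into JVM objects, so the column-based version above is usually the cheaper choice; flatMap is mainly useful when the per-element logic is hard to express with column functions.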