Aggregate Map of map values using Spark DataSet


I have a column family in Cassandra in the map format shown below, which I want to process using a Spark Dataset. I want to bucket the models values into two categories, premium (City and Duster) and non-premium (Alto K10, Aspire, Nano and i10), and get the final sum of values for each: 2 for premium (City and Duster) vs 12 for non-premium (Alto K10, Aspire, Nano and i10).

Code :

  case class UserProfile(userdata:Map[String,Map[String,Int]])

 val userprofileDataSet = spark.read.format("org.apache.spark.sql.cassandra").options(Map("table"->"userprofilesagg","keyspace" -> "KEYSPACENAME")).load().as[UserProfile]

How do I do this processing on userprofileDataSet?

Data Format:

 {'bodystyle': {'Compact Sedan': 1, 'Hatchback': 8, 'SUV': 1, 'Sedan': 4},   
    'models': {'Alto K10': 3, 'Aspire': 4, 'City': 1, 'Duster': 1, 'Nano': 3, 'i10': 2}}

Edited question:

Following squid's answer, I now want to aggregate the results per user, like this:

    DOICvncGKUH9xBLnW3e9jXcd2 | non-premium | [Nano, Alto K10, Aspire, i10] | 12 | premium | [City, Duster] | 2
    BkkpgeAdCkYJEXsdZjiVz3bSb | non-premium | [Nano, Alto K10, Aspire, i10] | 17 | premium | [City, Duster] | 5 

The case class would now look like this:

 case class UserProfile(userid:String, userdata:Map[String,Map[String,Int]])

Data:

DOICvncGKUH9xBLnW3e9jXcd2 | {'bodystyle': {'Compact Sedan': 1, 'Hatchback': 8, 'SUV': 1, 'Sedan': 4},   
    'models': {'Alto K10': 3, 'Aspire': 4, 'City': 1, 'Duster': 1, 'Nano': 3, 'i10': 2}}

BkkpgeAdCkYJEXsdZjiVz3bSb | {'bodystyle': {'Compact Sedan': 7, 'Hatchback': 5, 'SUV': 3, 'Sedan': 7},   
    'models': {'Alto K10': 1, 'Aspire': 7, 'City': 4, 'Duster': 1, 'Nano': 8, 'i10': 1}}  

You also asked why I mentioned bodystyle: I want to apply a similar aggregation to it, with SUV and Sedan as premium and the rest as non-premium.

There is 1 solution below


I am not sure what role bodystyle plays here. If I understood the problem correctly, you want the category and the count. You could try something like the query below, and drop the types column if you don't need it:

-- userprofile table
CREATE TABLE `userprofile` (
  `properties` map<string,map<string,int>>
);

-- Aggregate by category
select category,
       collect_set(type) as types,
       sum(value) as count
from (select case when lower(type) in ('city', 'duster') then 'premium'
                  when lower(type) in ('alto k10', 'aspire', 'nano', 'i10') then 'non-premium'
             end as category,
             type,
             value
      from (select properties['models'] as models from userprofile) t
      -- the exploded view gets its own alias (m) so it does not clash with subquery alias t
      lateral view explode(models) m as type, value) l
group by category;

Output

category    |   types                            |         count
non-premium | ["Aspire","i10","Nano","Alto K10"] |         12
premium     | ["City","Duster"]                  |         2
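
For the per-user aggregation asked about in the edited question, the same bucketing can be sketched in plain Scala over the nested map. This is only an illustration, not something from the original answer: `BucketSummary`, `PremiumBuckets` and `summarize` are hypothetical names, and unlike the SQL above (which yields a null category for unlisted types) it treats every label not listed as premium as non-premium, following the "rest non-premium" rule stated in the question.

```scala
// Case class from the edited question.
case class UserProfile(userid: String, userdata: Map[String, Map[String, Int]])

// Hypothetical result row: one per user and category.
case class BucketSummary(userid: String, category: String, types: Seq[String], count: Int)

object PremiumBuckets {
  // Premium labels per attribute, as listed in the question (compared in lower case).
  // Assumption: any label not listed here counts as non-premium.
  val premium: Map[String, Set[String]] = Map(
    "models"    -> Set("city", "duster"),
    "bodystyle" -> Set("suv", "sedan")
  )

  // Buckets the values of one attribute ("models" or "bodystyle") for one user
  // and sums the counts per bucket.
  def summarize(profile: UserProfile, attribute: String): Seq[BucketSummary] = {
    val premiumLabels = premium.getOrElse(attribute, Set.empty[String])
    val values = profile.userdata.getOrElse(attribute, Map.empty[String, Int])
    values.toSeq
      .groupBy { case (label, _) =>
        if (premiumLabels.contains(label.toLowerCase)) "premium" else "non-premium"
      }
      .map { case (category, entries) =>
        BucketSummary(profile.userid, category, entries.map(_._1).sorted, entries.map(_._2).sum)
      }
      .toSeq
      .sortBy(_.category) // "non-premium" sorts before "premium"
  }
}
```

For the first user in the sample data, `PremiumBuckets.summarize(profile, "models")` gives a non-premium count of 12 and a premium count of 2, and `"bodystyle"` gives 9 and 5. On the Dataset itself, something like `userprofileDataSet.flatMap(p => PremiumBuckets.summarize(p, "models"))` should produce one row per user and category, assuming `spark.implicits._` is in scope for the encoders; that part is untested against a Cassandra-backed table.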