I have a column family in Cassandra in the following map format which I want to process using a Spark Dataset. I want to bucket the models values into two categories, premium (City and Duster) vs non-premium (Alto K10, Aspire, Nano and i10), and I want the final count of values for premium vs non-premium, which for the data below would be 2 (the City and Duster counts) vs 12 (the Alto K10, Aspire, Nano and i10 counts).
Code:
case class UserProfile(userdata: Map[String, Map[String, Int]])

import spark.implicits._  // needed for the .as[UserProfile] encoder

val userprofileDataSet = spark.read.format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "userprofilesagg", "keyspace" -> "KEYSPACENAME"))
  .load()
  .as[UserProfile]

How do I do the processing on userprofileDataSet?
Data Format:
{'bodystyle': {'Compact Sedan': 1, 'Hatchback': 8, 'SUV': 1, 'Sedan': 4},
'models': {'Alto K10': 3, 'Aspire': 4, 'City': 1, 'Duster': 1, 'Nano': 3, 'i10': 2}}
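To make the expected numbers concrete, this is how I would compute the two buckets on the plain models map in Scala (premiumModels is just a hard-coded set I made up for illustration); what I am looking for is the equivalent on the Dataset:

// the "models" map from the sample data above, split into two buckets
val models = Map("Alto K10" -> 3, "Aspire" -> 4, "City" -> 1,
                 "Duster" -> 1, "Nano" -> 3, "i10" -> 2)

val premiumModels = Set("City", "Duster")  // illustrative premium bucket

val (premium, nonPremium) =
  models.partition { case (name, _) => premiumModels(name) }

println(premium.values.sum)     // 2  (City + Duster)
println(nonPremium.values.sum)  // 12 (Alto K10 + Aspire + Nano + i10)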
Edited question:
With respect to squid's answer: I now want to aggregate the results for each user, like this:
DOICvncGKUH9xBLnW3e9jXcd2 | non-premium | [Nano, Alto K10, Aspire, i10] | 12 | premium | [City, Duster] | 2
BkkpgeAdCkYJEXsdZjiVz3bSb | non-premium | [Nano, Alto K10, Aspire, i10] | 17 | premium | [City, Duster] | 5
The case class would now look like this:

case class UserProfile(userid: String, userdata: Map[String, Map[String, Int]])
Data:
DOICvncGKUH9xBLnW3e9jXcd2 | {'bodystyle': {'Compact Sedan': 1, 'Hatchback': 8, 'SUV': 1, 'Sedan': 4},
'models': {'Alto K10': 3, 'Aspire': 4, 'City': 1, 'Duster': 1, 'Nano': 3, 'i10': 2}}
BkkpgeAdCkYJEXsdZjiVz3bSb | {'bodystyle': {'Compact Sedan': 7, 'Hatchback': 5, 'SUV': 3, 'Sedan': 7},
'models': {'Alto K10': 1, 'Aspire': 7, 'City': 4, 'Duster': 1, 'Nano': 8, 'i10': 1}}
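Roughly the shape of the per-user result I am after, sketched against the new case class (premiumModels and the output column names are only placeholders of mine, not a working solution):

import spark.implicits._

val premiumModels = Set("City", "Duster")  // illustrative premium bucket

// one wide row per user: non-premium models + count, then premium models + count
val perUser = userprofileDataSet.map { up =>
  val models = up.userdata.getOrElse("models", Map.empty[String, Int])
  val (prem, nonPrem) = models.partition { case (name, _) => premiumModels(name) }
  (up.userid,
   "non-premium", nonPrem.keys.toSeq, nonPrem.values.sum,
   "premium",     prem.keys.toSeq,    prem.values.sum)
}.toDF("userid",
       "cat1", "cat1_models", "cat1_count",
       "cat2", "cat2_models", "cat2_count")

perUser.show(false)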
Moreover, you asked why I mentioned bodystyle: it is so that I can apply a similar aggregation to it, with SUV and Sedan as premium and the rest as non-premium.
I am not sure what exactly the role of bodystyle is. If I understood the problem correctly, you want the category and the count; you may try something like below and remove types if not useful.
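A rough sketch of that approach, assuming the edited case class with userid (premiumModels and the column names are only placeholders; the type column comes from the outer map key and can be dropped if not needed):

import org.apache.spark.sql.functions._
import spark.implicits._

val premiumModels = Seq("City", "Duster")  // placeholder premium list

val result = userprofileDataSet.toDF()
  // explode the outer map: one row per (type, inner map), type = 'bodystyle' or 'models'
  .select(col("userid"), explode(col("userdata")).as(Seq("type", "values")))
  .filter(col("type") === "models")
  // explode the inner map: one row per (model, count)
  .select(col("userid"), explode(col("values")).as(Seq("model", "cnt")))
  .withColumn("category",
    when(col("model").isin(premiumModels: _*), "premium").otherwise("non-premium"))
  .groupBy("userid", "category")
  .agg(collect_list("model").as("models"), sum("cnt").as("count"))

result.show(false)

For the sample data in the edited question this should give one premium and one non-premium row per user, with counts matching the expected output there (12 and 2 for the first user, 17 and 5 for the second).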