Spark Dataset API: find the distribution of device usage for each user, along with other aggregations


Below is my dataset.

user,device,time_spent,video_start
userA,mob,5,1
userA,desk,5,2
userA,desk,5,3
userA,mob,5,2
userA,mob,5,2
userB,desk,5,2
userB,mob,5,2
userB,mob,5,2
userB,desk,5,2

I want to compute the aggregation below for each user.

   user     total_time_spent        device_distribution
   userA           25                {mob:60%,desk:40%}
   userB           20                {mob:50%,desk:50%}

Can someone help me achieve this using the Spark 2.0 API, preferably in Java? I have tried using UserDefinedAggregateFunction, but it doesn't support a group within a group, as I have to group each user's rows by device to find the aggregated time spent on each device.
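For reference, the numbers implied by the sample rows can be checked with a few lines of plain Java (no Spark involved; the `Row` record and method names here are just for this sketch):

```java
import java.util.*;
import java.util.stream.*;

public class SampleCheck {
    // Hypothetical row type mirroring the CSV above.
    public record Row(String user, String device, long timeSpent) {}

    public static final List<Row> ROWS = List.of(
        new Row("userA", "mob", 5), new Row("userA", "desk", 5),
        new Row("userA", "desk", 5), new Row("userA", "mob", 5),
        new Row("userA", "mob", 5), new Row("userB", "desk", 5),
        new Row("userB", "mob", 5), new Row("userB", "mob", 5),
        new Row("userB", "desk", 5));

    // total_time_spent per user
    public static Map<String, Long> totals(List<Row> rows) {
        return rows.stream().collect(
            Collectors.groupingBy(Row::user, Collectors.summingLong(Row::timeSpent)));
    }

    // time one user spent on one device
    public static long deviceTime(List<Row> rows, String user, String device) {
        return rows.stream()
            .filter(r -> r.user().equals(user) && r.device().equals(device))
            .mapToLong(Row::timeSpent).sum();
    }

    public static void main(String[] args) {
        Map<String, Long> totals = totals(ROWS);
        // userA: mob 15 of 25, desk 10 of 25
        System.out.println(100.0 * deviceTime(ROWS, "userA", "mob") / totals.get("userA")); // 60.0
    }
}
```

Note that userA's rows sum to 25 (15 on mob, 10 on desk), which is what gives the 60/40 split.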

2 Answers

The pivot function is pretty useful here; there is an article from Databricks on the subject. For the code (sorry, it's Scala, but it shouldn't be hard to translate to Java):

import org.apache.spark.sql.functions.{col, first, sum, udf}

case class DeviceDistribution(mob: String, desk: String)

val makeDistribution = udf((mob: Long, desk: Long) => {
  val mobPct = 100.0 * mob / (mob + desk)
  val deskPct = 100.0 * desk / (mob + desk)

  DeviceDistribution(s"$mobPct%", s"$deskPct%")
})

// load your dataset

data
  .groupBy("user", "device")
  .agg(sum("time_spent").as("total_time_spent_by_device"))
  .groupBy("user")
  .pivot("device", Seq("mob", "desk"))
  .agg(first(col("total_time_spent_by_device")))
  .withColumn("total_time_spent", col("mob") + col("desk"))
  .withColumn("device_distribution", makeDistribution(col("mob"), col("desk")))
  .select("user", "total_time_spent", "device_distribution")
  .show

// Result
+-----+----------------+-------------------+
| user|total_time_spent|device_distribution|
+-----+----------------+-------------------+
|userA|              25|      [60.0%,40.0%]|
|userB|              20|      [50.0%,50.0%]|
+-----+----------------+-------------------+

NB: the pivot function requires an aggregation function. Here, since there is only one value per (user, device) pair after the first groupBy, you can simply use first.

The device_distribution column format isn't exactly what you're looking for but:

  • after the pivot line, you can do whatever you want with your values (including the formatting you want)
  • with this case class, when saving your output data in JSON format for instance, it will have exactly the format you want.
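As an example of that post-pivot formatting, a small helper (hypothetical, plain Java rather than the answer's Scala) can turn the pivoted mob/desk totals into the exact `{mob:60%,desk:40%}` string from the question:

```java
public class DistributionFormat {
    // Format mob/desk totals as the "{mob:X%,desk:Y%}" string from the question.
    // Percentages are rounded to the nearest whole number for display.
    public static String format(long mob, long desk) {
        long total = mob + desk;
        return String.format("{mob:%d%%,desk:%d%%}",
            Math.round(100.0 * mob / total), Math.round(100.0 * desk / total));
    }

    public static void main(String[] args) {
        System.out.println(format(15, 10)); // userA -> {mob:60%,desk:40%}
        System.out.println(format(10, 10)); // userB -> {mob:50%,desk:50%}
    }
}
```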

Florent Moiny,

Thanks for answering my question.

However, I found that this solution has some issues if I want to push it into production.

For example, I need to know in advance how many types of devices are possible in my terabyte-scale data source. Even pivot is a little difficult to understand in this situation.

I have provided a complete solution to this problem in Java; you can see it at the link below.

I used UserDefinedAggregateFunction for this purpose, which is a UDF designed specifically for aggregation.

Basically, I first grouped on user and device, and then called this custom aggregate function to find the device distribution while doing the other aggregations at the user level.
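That group-within-group idea can be sketched with plain Java collections. This is only an illustration of the approach, not the code from the linked repository, and it works for any number of device types without listing them in advance:

```java
import java.util.*;
import java.util.stream.*;

public class GroupWithinGroup {
    // Hypothetical input row type mirroring the CSV in the question.
    public record Row(String user, String device, long timeSpent) {}

    // user -> "total {dev1:p1%,dev2:p2%,...}" for any set of devices.
    public static Map<String, String> summarize(List<Row> rows) {
        // Outer group by user, inner group by device (TreeMap for stable order).
        Map<String, Map<String, Long>> nested = rows.stream().collect(
            Collectors.groupingBy(Row::user, TreeMap::new,
                Collectors.groupingBy(Row::device, TreeMap::new,
                    Collectors.summingLong(Row::timeSpent))));
        Map<String, String> out = new TreeMap<>();
        nested.forEach((user, byDevice) -> {
            long total = byDevice.values().stream().mapToLong(Long::longValue).sum();
            String dist = byDevice.entrySet().stream()
                .map(e -> e.getKey() + ":" + Math.round(100.0 * e.getValue() / total) + "%")
                .collect(Collectors.joining(",", "{", "}"));
            out.put(user, total + " " + dist);
        });
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("userB", "desk", 5), new Row("userB", "mob", 5),
            new Row("userB", "mob", 5), new Row("userB", "desk", 5));
        System.out.println(summarize(rows)); // {userB=20 {desk:50%,mob:50%}}
    }
}
```

In Spark itself, the equivalent of the inner grouping runs on the (user, device) aggregates, so no device list needs to be hardcoded.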

https://github.com/himanshu-parmar-bigdata/spark-java-udf-demo

Thanks, Himanshu