Group a dataset and run a custom function on each group in Spark Scala


I am using Spark with Scala and have a dataset that I want to group by a column and then pass each group of rows to a custom function. In the custom function, I would process the rows and update an empty dataframe.

I have the below dataframe DF1:

+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
| ACC_SECURITY|ACCOUNT_NO|COSTCENTER|    BU|   MPU|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|POS_NEG_QUANTITY|PROCESSED|ALLOC_QUANTITY|NET_QUANTITY|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2|   18063|               P|         |             0|           0|
|3FA34782290X2|  3FA34782|    0800TS|BOXXBU|BOXXMP|    0102|     5322|      290X2|    -863|               N|         |             0|           0|
|3FA34789290X2|  3FA34789|    0800TS|BOXXBU|BOXXMP|    0101|     5279|      290X2| -108926|               N|         |             0|           0|
|9211530135G71|  92115301|    08036C|BOXXBU|BOXXMP|    0154|     8380|      35G71|    8003|               P|         |             0|           0|
|9211530235G71|  92115302|    08036C|BOXXBU|BOXXMP|    0144|     8382|      35G71|   -2883|               N|         |             0|           0|
+-------------+----------+----------+------+------+--------+---------+-----------+--------+----------------+---------+--------------+------------+

After grouping on SECURITY_ID, I get 2 groups based on the SECURITY_ID values (290X2 and 35G71). Each of these groups has to be sent to a custom function.
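One possible way to do this (a hedged sketch, not from the original post) is the typed Dataset API: `groupByKey` followed by `flatMapGroups` hands each group's rows to an arbitrary function without requiring an aggregate expression. The `Position` case class and the netting logic in `f` below are illustrative assumptions standing in for the real schema and custom function:

```scala
import org.apache.spark.sql.SparkSession

// Simplified schema for illustration; the real dataframe has more columns.
case class Position(SECURITY_ID: String, QUANTITY: Long)

object GroupedProcessing {
  // Custom per-group function F: receives all rows of one SECURITY_ID group
  // and can emit any number of output rows (here it nets the quantities).
  def f(securityId: String, rows: Iterator[Position]): Iterator[Position] =
    Iterator(Position(securityId, rows.map(_.QUANTITY).sum))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("grouped-processing")
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(
      Position("290X2", 18063L),
      Position("290X2", -863L),
      Position("290X2", -108926L),
      Position("35G71", 8003L),
      Position("35G71", -2883L)
    ).toDS()

    // groupByKey + flatMapGroups passes each group to the custom function.
    val result = ds.groupByKey(_.SECURITY_ID).flatMapGroups(f _)

    result.show()
    spark.stop()
  }
}
```

Because `flatMapGroups` returns a `Dataset` of whatever the function emits, this avoids both the dummy aggregation and the need to build up a separate empty dataframe.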

I tried:

  1. groupBy on SECURITY_ID, but it requires an aggregation, which I don't need:

    DF1.groupBy("SECURITY_ID").agg(max("SECURITY_ID")).apply(F) 
    

    I don't want aggregation, but I could still drop the aggregated column as long as I can pass a function F in the apply block on the grouped dataset. However, apply doesn't take a custom function.

  2. A window function on SECURITY_ID, but I don't know how to execute a custom function for each window:

    val window = Window.partitionBy("security_id") 
    val option2DF = DF1.withColumn("Quantity_Row", F over(window))
    

    I want to see how I can call a function F over each window, rather than by adding a column.
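If the custom function really needs a whole `DataFrame` per group (rather than an iterator of rows), another hedged workaround is to collect the distinct keys to the driver and filter once per key. This is only reasonable when the number of distinct SECURITY_ID values is small; `processGroup` below is an illustrative placeholder for F:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PerKeyProcessing {
  // Placeholder for the custom function F; here it just counts the rows.
  def processGroup(group: DataFrame): Long = group.count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("per-key-processing")
      .getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("3FA34789290X2", "290X2", 18063L),
      ("3FA34782290X2", "290X2", -863L),
      ("9211530135G71", "35G71", 8003L)
    ).toDF("ACC_SECURITY", "SECURITY_ID", "QUANTITY")

    // Collect the distinct keys (safe only when their count is small),
    // then hand each per-key slice of the dataframe to the custom function.
    val keys = df1.select("SECURITY_ID").distinct().as[String].collect()
    keys.foreach { k =>
      val group = df1.filter($"SECURITY_ID" === k)
      println(s"$k -> ${processGroup(group)} rows")
    }
    spark.stop()
  }
}
```

Each filter triggers a separate job over the data, so this trades performance for the convenience of getting a genuine `DataFrame` per group.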
