Optimizing the computation of rows using PySpark


I'm running into the problem that looping in pandas is not efficient, and I'm new to PySpark, so I would like to ask for ideas or solutions for the 2 tasks illustrated below.

I have a dataset consisting of 700K rows like this:

Date format: MM-DD-YYYY

ID DATE REF_DATE A B C
1 01-31-2020 10-31-2019 3000 0.05 320
2 02-28-2020 10-31-2019 2750 0.04 300
3 03-31-2020 10-31-2019 2600 0.03 270

Tasks/steps:

1. Generate new rows for each id if the condition is satisfied

Condition: if DATE > REF_DATE, add one row per month, with the DATE value decreased by 1 month each time until the newly added DATE equals REF_DATE, keeping the other variables' values constant. This is the expected output, which in reality will have about 30 million rows.

ID DATE REF_DATE A B C
1 01-31-2020 10-31-2019 3000 0.05 320
1 12-31-2019 10-31-2019 3000 0.05 320
1 11-30-2019 10-31-2019 3000 0.05 320
1 10-31-2019 10-31-2019 3000 0.05 320
2 02-28-2020 10-31-2019 2750 0.04 300
2 01-31-2020 10-31-2019 2750 0.04 300
2 12-31-2019 10-31-2019 2750 0.04 300
2 11-30-2019 10-31-2019 2750 0.04 300
2 10-31-2019 10-31-2019 2750 0.04 300
3 03-31-2020 10-31-2019 2600 0.03 270
3 02-28-2020 10-31-2019 2600 0.03 270
3 01-31-2020 10-31-2019 2600 0.03 270
3 12-31-2019 10-31-2019 2600 0.03 270
3 11-30-2019 10-31-2019 2600 0.03 270
3 10-31-2019 10-31-2019 2600 0.03 270

2. Create & compute 3 new columns named D, E, F as follows

IF it's the first observation within the group (by ID):
    E = A * B
    F = C - E
    D = A - F
IF it's the second or a later row within that group:
    E = lag(D) * B
    F = C - E
    D = lag(D) - F

lag(D) here indicates the value of D from the previous row: to process row 2 we use D from row 1, to process row 3 we use D from row 2, and so on.

The expected output will look like this. I show only the first ID as an example; the other IDs are processed in the same manner.

ID DATE REF_DATE A B C D E F
1 01-31-2020 10-31-2019 3000 0.05 320 2830 150 170
1 12-31-2019 10-31-2019 3000 0.05 320 2651.5 141.5 178.5
1 11-30-2019 10-31-2019 3000 0.05 320 2464.1 132.6 187.4
1 10-31-2019 10-31-2019 3000 0.05 320 2267.3 123.2 196.8

P.S.: I need to process and store this whole dataset for other calculations later. Is there any way to do this without loops? Thank you!

I tried using pandas with multithreading, but now I need to do it in PySpark.
