In PySpark, how do I relate data in one row to another row, and then filter based on the result?
Simplified example:
I have a PySpark DataFrame with 9 rows of data per day, across many days; the example below shows 2 days (18 rows).
I have a pressurised vessel that refills three times a day automatically. First, I want the final end pressure for the first instance where the data is valid; second, I want to filter to the rows where this value is above a threshold of 3000 psi. In some cases the data is valid twice in a day, but I only want the end pressure corresponding to the first valid instance.
Dataframe:
Sep_3_2022, p_01_start, 2600
Sep_3_2022, p_01_end, 3100
Sep_3_2022, p_02_start, 2700
Sep_3_2022, p_02_end, 2900
Sep_3_2022, p_03_start, 2700
Sep_3_2022, p_03_end, 3050
Sep_3_2022, p_01_validity, False
Sep_3_2022, p_02_validity, True
Sep_3_2022, p_03_validity, True
Sep_4_2022, p_01_start, 2600
Sep_4_2022, p_01_end, 3100
Sep_4_2022, p_02_start, 2700
Sep_4_2022, p_02_end, 3050
Sep_4_2022, p_03_start, 2700
Sep_4_2022, p_03_end, 3050
Sep_4_2022, p_01_validity, True
Sep_4_2022, p_02_validity, True
Sep_4_2022, p_03_validity, False
Desired outcome 1 (first valid data of the day):
Sep_3_2022, p_02_start, 2700
Sep_3_2022, p_02_end, 2900
Sep_3_2022, p_02_validity, True
Sep_4_2022, p_01_start, 2600
Sep_4_2022, p_01_end, 3100
Sep_4_2022, p_01_validity, True
Desired outcome 2 (first valid data of the day, above threshold, end pressure only):
Sep_4_2022, p_01_end, 3100
I have considered using substring() to extract the 01, 02 portions of the strings to relate the rows to each other, but I don't know how to go further than that.
Thanks for any suggestions.
A good way to tackle your problem is to use the Fugue library. I converted your data to a Pandas DataFrame first, since prototyping the solution is easier and faster in Pandas than in a PySpark DataFrame.
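For reference, here is one way to load your sample data into Pandas (the column names `date`, `measurement`, and `value` are my own assumption; I keep the values as strings because the column mixes numbers and booleans):

```python
import pandas as pd

# Column names ("date", "measurement", "value") are assumed here.
# Values are kept as strings because the column mixes pressures ("2600")
# and validity flags ("True"/"False").
data = [
    ("Sep_3_2022", "p_01_start", "2600"), ("Sep_3_2022", "p_01_end", "3100"),
    ("Sep_3_2022", "p_02_start", "2700"), ("Sep_3_2022", "p_02_end", "2900"),
    ("Sep_3_2022", "p_03_start", "2700"), ("Sep_3_2022", "p_03_end", "3050"),
    ("Sep_3_2022", "p_01_validity", "False"), ("Sep_3_2022", "p_02_validity", "True"),
    ("Sep_3_2022", "p_03_validity", "True"),
    ("Sep_4_2022", "p_01_start", "2600"), ("Sep_4_2022", "p_01_end", "3100"),
    ("Sep_4_2022", "p_02_start", "2700"), ("Sep_4_2022", "p_02_end", "3050"),
    ("Sep_4_2022", "p_03_start", "2700"), ("Sep_4_2022", "p_03_end", "3050"),
    ("Sep_4_2022", "p_01_validity", "True"), ("Sep_4_2022", "p_02_validity", "True"),
    ("Sep_4_2022", "p_03_validity", "False"),
]
df = pd.DataFrame(data, columns=["date", "measurement", "value"])
```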
https://fugue-tutorials.readthedocs.io/
The first function filters the DataFrame to include only the rows belonging to the first valid sample.
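A sketch of that first function (the regex-based sample-number extraction and the name `first_valid` are my own choices). It takes one day's rows and returns all rows for the lowest-numbered sample flagged valid:

```python
import pandas as pd

def first_valid(pdf: pd.DataFrame) -> pd.DataFrame:
    # Pull the sample number ("01", "02", ...) out of names like "p_02_end".
    num = pdf["measurement"].str.extract(r"p_(\d+)_")[0]
    # Rows that flag a sample as valid.
    valid = pdf[pdf["measurement"].str.endswith("_validity") & (pdf["value"] == "True")]
    if valid.empty:
        return pdf.iloc[0:0]  # no valid sample this day -> return no rows
    # Keep every row belonging to the lowest-numbered valid sample.
    first = valid["measurement"].str.extract(r"p_(\d+)_")[0].min()
    return pdf[num == first]
```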
The second function does the same thing as the first, but adds a condition that drops any end-pressure rows at or below 3000 psi.
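A sketch of the second function (again, the function names are my own; `first_valid` is repeated from above so this snippet runs on its own):

```python
import pandas as pd

def first_valid(pdf: pd.DataFrame) -> pd.DataFrame:
    # Same helper as above, repeated so this snippet is self-contained.
    num = pdf["measurement"].str.extract(r"p_(\d+)_")[0]
    valid = pdf[pdf["measurement"].str.endswith("_validity") & (pdf["value"] == "True")]
    if valid.empty:
        return pdf.iloc[0:0]
    return pdf[num == valid["measurement"].str.extract(r"p_(\d+)_")[0].min()]

def first_valid_above_threshold(pdf: pd.DataFrame) -> pd.DataFrame:
    out = first_valid(pdf)
    # Keep only the end-pressure row, and only if it exceeds 3000 psi.
    end = out[out["measurement"].str.endswith("_end")]
    return end[end["value"].astype(int) > 3000]
```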
After the functions above are set up, we'll use Fugue's transform() to get the data we need.
The above runs on Pandas only. Below is an example that runs on PySpark: it yields a PySpark DataFrame, partitioned according to transform()'s partition parameter.