PySpark - Relating Rows to other Rows for filtering


In PySpark, how do I relate data in one row, to another row, and then filter based on this?

Simplified example,

I have a PySpark dataframe with 9 rows of data per day, and I have many days of data; in the example below I give 2 days, 18 rows of data.

I have a pressurised vessel that refills automatically 3 times a day. Firstly, I want the end pressure for the first refill of the day whose data is valid, and secondly I want to keep only the rows where that end pressure is above a threshold, 3000 psi. In some cases the data is valid twice in a day, but I only want the end pressure that corresponds to the first valid instance.

Dataframe:

Sep_3_2022, p_01_start, 2600
Sep_3_2022, p_01_end, 3100
Sep_3_2022, p_02_start, 2700
Sep_3_2022, p_02_end, 2900
Sep_3_2022, p_03_start, 2700
Sep_3_2022, p_03_end, 3050
Sep_3_2022, p_01_validity, False
Sep_3_2022, p_02_validity, True
Sep_3_2022, p_03_validity, True
Sep_4_2022, p_01_start, 2600
Sep_4_2022, p_01_end, 3100
Sep_4_2022, p_02_start, 2700
Sep_4_2022, p_02_end, 3050
Sep_4_2022, p_03_start, 2700
Sep_4_2022, p_03_end, 3050
Sep_4_2022, p_01_validity, True
Sep_4_2022, p_02_validity, True
Sep_4_2022, p_03_validity, False


Desired outcome 1 (first valid data of the day):

Sep_3_2022, p_02_start, 2700
Sep_3_2022, p_02_end, 2900
Sep_3_2022, p_02_validity, True
Sep_4_2022, p_01_start, 2600
Sep_4_2022, p_01_end, 3100
Sep_4_2022, p_01_validity, True


Desired outcome 2 (first valid data of the day, above threshold, end pressure only):

Sep_4_2022, p_01_end, 3100


I have considered using substring() to pull out the 01, 02 portions of the strings and relate the rows to each other, but I don't know how to go further than that.

Thanks for any suggestions


1 Answer


A good way to tackle your problem is to use the Fugue library. I converted your data to a Pandas DataFrame and built the solution there first, since iterating on a Pandas DataFrame is easier and faster than iterating on a PySpark DataFrame; Fugue then lets you run the same logic on Spark.

https://fugue-tutorials.readthedocs.io/

import pandas as pd
from fugue import transform

# Sample data from the question (both days) as a Pandas DataFrame.
df = pd.DataFrame(
    [
        ("Sep_3_2022", "p_01_start", "2600"),
        ("Sep_3_2022", "p_01_end", "3100"),
        ("Sep_3_2022", "p_02_start", "2700"),
        ("Sep_3_2022", "p_02_end", "2900"),
        ("Sep_3_2022", "p_03_start", "2700"),
        ("Sep_3_2022", "p_03_end", "3050"),
        ("Sep_3_2022", "p_01_validity", "False"),
        ("Sep_3_2022", "p_02_validity", "True"),
        ("Sep_3_2022", "p_03_validity", "True"),
        ("Sep_4_2022", "p_01_start", "2600"),
        ("Sep_4_2022", "p_01_end", "3100"),
        ("Sep_4_2022", "p_02_start", "2700"),
        ("Sep_4_2022", "p_02_end", "3050"),
        ("Sep_4_2022", "p_03_start", "2700"),
        ("Sep_4_2022", "p_03_end", "3050"),
        ("Sep_4_2022", "p_01_validity", "True"),
        ("Sep_4_2022", "p_02_validity", "True"),
        ("Sep_4_2022", "p_03_validity", "False"),
    ], columns=["index", "txt", "result"])

def logic(df: pd.DataFrame) -> pd.DataFrame:
    # Rows whose result is "True" are the validity rows; keep only the first one.
    x = df.loc[df['result'] == "True"].drop_duplicates(subset=['result'], keep='first')
    # Extract the "p_0X" tag of that first valid instance, e.g. "p_02".
    search = x['txt'].iloc[0].strip()[:4]
    # Return every row of that instance (start, end, validity).
    return df[df['txt'].str.contains(search)]

The first function filters a day's rows down to the first valid instance, keeping its _start, _end, and _validity rows.
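
As a quick sanity check (a sketch against the two-day sample data above, outside of Fugue), you can call logic() on a single day's slice:

# Quick check on one day's slice of the sample data.
logic(df[df["index"] == "Sep_3_2022"])
# Expected: the three p_02 rows for Sep_3_2022 (start 2700, end 2900, validity True),
# since p_01 is invalid on that day and p_02 is the first valid instance.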

def logicII(df: pd.DataFrame) -> pd.DataFrame:
    # Same as logic(): find the "p_0X" tag of the first valid instance.
    x = df.loc[df['result'] == "True"].drop_duplicates(subset=['result'], keep='first')
    search = x['txt'].iloc[0].strip()[:4]
    # Keep only the _end row of that instance and make its pressure numeric.
    valid = df[df['txt'].str.contains(search + "_end")].copy()
    valid['result'] = pd.to_numeric(valid['result'])
    # Drop the row unless the end pressure exceeds the 3000 psi threshold.
    return valid[valid['result'] > 3000]

The second function finds the same first valid instance, but keeps only its _end row and then drops it unless the end pressure is above 3000 psi.
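
Again as a sketch, checking it per day against the sample data:

logicII(df[df["index"] == "Sep_4_2022"])
# Expected: a single row, Sep_4_2022 / p_01_end / 3100.
# For Sep_3_2022 the first valid end pressure is only 2900, so the same call returns no rows.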

After the functions above are set up, we'll use Fugue's transform() to get the data we need. Its arguments are:

  1. df is the DataFrame that contains our data
  2. logic (or logicII) is the function we created
  3. schema is the schema of the output DataFrame
  4. partition groups the rows (here by the date column, index) before applying the function, and also drives how the work is distributed across cores
transform(df,
          logic,
          schema="index:str, txt:str, result:str",
          partition={"by": "index"})

transform(df,
          logicII,
          schema="index:str, txt:str, result:int",
          partition={"by": "index"})
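
With the two-day sample data above, the two calls should line up with the desired outcomes (row order within the result may differ, since each day is processed as its own partition):

# First call (logic) -> desired outcome 1:
#   Sep_3_2022, p_02_start, 2700
#   Sep_3_2022, p_02_end, 2900
#   Sep_3_2022, p_02_validity, True
#   Sep_4_2022, p_01_start, 2600
#   Sep_4_2022, p_01_end, 3100
#   Sep_4_2022, p_01_validity, True
# Second call (logicII) -> desired outcome 2:
#   Sep_4_2022, p_01_end, 3100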

The above runs on Pandas only. Below is the same thing running on PySpark: transform() yields a PySpark DataFrame, partitioned according to its partition parameter.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sdf = transform(df,
                logic,
                schema="index:str, txt:str, result:str",
                partition={"by": "index"},
                engine=spark)

sdf_II = transform(df,
                   logicII,
                   schema="index:str, txt:str, result:int",
                   partition={"by": "index"},
                   engine=spark)
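
Since your real data is already a PySpark DataFrame, you can skip the Pandas detour in production: transform() also accepts a Spark DataFrame as input when a Spark engine is passed, and each partition is handed to your function as a Pandas DataFrame because of its type annotations. A minimal sketch, where spark_df stands in for your existing Spark DataFrame (here just built from the sample df):

# spark_df is a stand-in for your existing PySpark DataFrame with columns index, txt, result.
spark_df = spark.createDataFrame(df)

result_sdf = transform(spark_df,
                       logicII,
                       schema="index:str, txt:str, result:int",
                       partition={"by": "index"},
                       engine=spark)
result_sdf.show()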