Adding a flag based on the occurrence of a repetitive pattern in a categorical column using PySpark


I have a pyspark dataframe like this:

port#|       log_date    |code
1111 |2022-05-16 08:07:23|AAA
1111 |2022-05-16 08:08:23|XXX
1111 |2022-05-16 08:09:23|BBB
1111 |2022-05-16 08:10:23|CCC
1111 |2022-05-16 08:11:23|YYY
1111 |2022-05-16 08:12:23|DDD
1111 |2022-05-16 08:13:23|EEE
2222 |2022-05-17 09:07:23|AAA
2222 |2022-05-17 09:08:23|XXX
2222 |2022-05-17 09:09:23|BBB
2222 |2022-05-17 09:10:23|CCC
2222 |2022-05-17 09:11:23|YYY
2222 |2022-05-17 09:12:23|DDD
2222 |2022-05-17 09:13:23|EEE

I want to flag the rows that occur between codes XXX and YYY (inclusive), grouped by port# and sorted by log_date.

I tried window functions that partition by port# and order by log_date, but could not get the desired result. The expected result would be something like this:

port#|       log_date    |code|flag
1111 |2022-05-16 08:07:23|AAA | 0
1111 |2022-05-16 08:08:23|XXX | 1
1111 |2022-05-16 08:09:23|BBB | 1
1111 |2022-05-16 08:10:23|CCC | 1
1111 |2022-05-16 08:11:23|YYY | 1
1111 |2022-05-16 08:12:23|DDD | 0
1111 |2022-05-16 08:13:23|EEE | 0
2222 |2022-05-17 09:07:23|AAA | 0
2222 |2022-05-17 09:08:23|XXX | 1
2222 |2022-05-17 09:09:23|BBB | 1
2222 |2022-05-17 09:10:23|CCC | 1
2222 |2022-05-17 09:11:23|YYY | 1
2222 |2022-05-17 09:12:23|DDD | 0
2222 |2022-05-17 09:13:23|EEE | 0

Can anyone help with how to write this logic in PySpark?

Best answer:

Assuming there is only a single XXX and a single YYY per port#:

We can find the XXX and YYY timestamps for each port and flag the rows whose log_date falls within that time range.

Below are the data preparation and the code for this step:

import pandas as pd
from io import StringIO
from pyspark.sql import functions as F
from pyspark.sql import Window

s="""
port#|log_date|code
1111 |2022-05-16 08:07:23|AAA
1111 |2022-05-16 08:08:23|XXX
1111 |2022-05-16 08:09:23|BBB
1111 |2022-05-16 08:10:23|CCC
1111 |2022-05-16 08:11:23|YYY
1111 |2022-05-16 08:12:23|DDD
1111 |2022-05-16 08:13:23|EEE
2222 |2022-05-17 09:07:23|AAA
2222 |2022-05-17 09:08:23|XXX
2222 |2022-05-17 09:09:23|BBB
2222 |2022-05-17 09:10:23|CCC
2222 |2022-05-17 09:11:23|YYY
2222 |2022-05-17 09:12:23|DDD
2222 |2022-05-17 09:13:23|EEE"""

pdf = pd.read_csv(StringIO(s), sep="|")
spdf = spark.createDataFrame(pdf)  # assumes an active SparkSession named spark

# Window spanning the entire partition for each port#, ordered by log_date
w = (Window.partitionBy("port#").orderBy("log_date")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# F.last with ignorenulls=True returns the only non-null value in the
# partition, i.e. the XXX and YYY timestamps for each port#
spdf.withColumn(
    "xxx_ts", F.last(F.when(F.col("code") == "XXX", F.col("log_date")), ignorenulls=True).over(w)
).withColumn(
    "yyy_ts", F.last(F.when(F.col("code") == "YYY", F.col("log_date")), ignorenulls=True).over(w)
).withColumn(
    "flag",
    F.when(
        (F.col("log_date") >= F.col("xxx_ts")) & (F.col("log_date") <= F.col("yyy_ts")), 1
    ).otherwise(0),
).drop("xxx_ts", "yyy_ts").show()


# output
+-----+-------------------+----+----+
|port#|           log_date|code|flag|
+-----+-------------------+----+----+
| 1111|2022-05-16 08:07:23| AAA|   0|
| 1111|2022-05-16 08:08:23| XXX|   1|
| 1111|2022-05-16 08:09:23| BBB|   1|
| 1111|2022-05-16 08:10:23| CCC|   1|
| 1111|2022-05-16 08:11:23| YYY|   1|
| 1111|2022-05-16 08:12:23| DDD|   0|
| 1111|2022-05-16 08:13:23| EEE|   0|
| 2222|2022-05-17 09:07:23| AAA|   0|
| 2222|2022-05-17 09:08:23| XXX|   1|
| 2222|2022-05-17 09:09:23| BBB|   1|
| 2222|2022-05-17 09:10:23| CCC|   1|
| 2222|2022-05-17 09:11:23| YYY|   1|
| 2222|2022-05-17 09:12:23| DDD|   0|
| 2222|2022-05-17 09:13:23| EEE|   0|
+-----+-------------------+----+----+
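
Note that this approach depends on the single-occurrence assumption: F.last with ignorenulls=True only works because there is exactly one XXX and one YYY timestamp per port#. If a port# can contain several XXX...YYY ranges, a running-count variant handles the general case. The sketch below is my own generalization, not part of the accepted answer; it flags a row whenever more ranges have started (counting the current row) than have ended strictly before it:

from pyspark.sql import functions as F
from pyspark.sql import Window

# Running counts per port#, ordered by log_date:
#  - starts: XXX markers seen up to and including the current row
#  - ends:   YYY markers seen strictly before the current row
w_incl = (Window.partitionBy("port#").orderBy("log_date")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow))
w_excl = (Window.partitionBy("port#").orderBy("log_date")
          .rowsBetween(Window.unboundedPreceding, -1))

starts = F.sum(F.when(F.col("code") == "XXX", 1).otherwise(0)).over(w_incl)
ends = F.sum(F.when(F.col("code") == "YYY", 1).otherwise(0)).over(w_excl)

# A row is inside an XXX..YYY range (inclusive) when more ranges have
# started than have ended before it; coalesce covers the empty frame on
# the first row of each partition, where the windowed sum is null.
spdf.withColumn(
    "flag", F.when(starts > F.coalesce(ends, F.lit(0)), 1).otherwise(0)
).show()

On the sample data this produces the same output as above, but it also works when a port# logs multiple XXX/YYY pairs, since each pair opens and closes its own range.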