My dataset:

|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|   Event_time_NoUTC|    Event_timestamp|day_of_week|hour|primaryCategory|secondaryCategory|eventVisits|productCount|secondaryCategoryCount|     AvgCatExpense|SessCount|
|2019-10-06 07:04:...|      view|   1004565|2053013555631882655||  huawei| 169.84|231943435|428ebb99-3568-4e1...|2019-10-06 07:04:50|2019-10-06 07:04:50|          1|   7|    electronics|       smartphone|          1|           1|                     1| 380.2349402627628|        1|
|2019-10-25 03:50:...|      view|   5100337|2053013553341792533|  electronics.clocks|   apple| 319.34|266287781|f55edf02-3fd4-48f...|2019-10-25 03:50:28|2019-10-25 03:50:28|          6|   3|    electronics|           clocks|          7|           7|                     7| 369.7054359810376|        4|
|2019-10-25 03:52:...|      view|   1005105|2053013555631882655||   apple|1397.09|266287781|118dbcd6-fe31-4cc...|2019-10-25 03:52:09|2019-10-25 03:52:09|          6|   3|    electronics|       smartphone|          7|           7|                     7| 369.7054359810376|        4|
|2019-10-26 12:15:...|      view|   6000157|2053013560807654091|auto.accessories....|starline|  91.12|266287781|992d03b4-c561-4fb...|2019-10-26 12:15:56|2019-10-26 12:15:56|          7|  12|           auto|      accessories|          7|           7|                     7| 369.7054359810376|        4|

The event_type column has three categories: view, cart and purchase. For each (user_id, product_id) pair, I want a new column is_purchased that is 1 if the pair has at least one purchase event and 0 otherwise. After that, I want to remove the redundant rows as shown below, which should help me classify whether a customer will churn or not.

Pictorial representation of removing redundant data

I am thinking of partitioning the data by user_id and product_id and then flagging the partitions that contain a purchase. What approaches would you suggest?




Step 1: group the data by user and product, and mark whether each group contains a purchase event:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

data = [("A", 123, "view", "other attributes 1"),
        ("A", 123, "cart", "other attributes 2"),
        ("A", 123, "purchase", "other attributes 3"),
        ("B", 123, "cart", "other attributes 4")]
df = spark.createDataFrame(data, schema=["user", "product", "event", "other"])

# True if the set of events of a (user, product) group contains 'purchase'
is_purchased = df.groupBy("user", "product").agg(
    F.array_contains(F.collect_set("event"), "purchase").alias("is_purchased"))

# +----+-------+------------+
# |user|product|is_purchased|
# +----+-------+------------+
# |   A|    123|        true|
# |   B|    123|       false|
# +----+-------+------------+

Step 2: join the result from Step 1 back to the original data and filter out the redundant rows (here, keeping only the cart events):

# Attach the per-pair flag to every event row, then keep only the cart events
result = df.join(is_purchased, on=["user", "product"], how="left") \
    .filter("event = 'cart'")

# +----+-------+-----+------------------+------------+
# |user|product|event|             other|is_purchased|
# +----+-------+-----+------------------+------------+
# |   A|    123| cart|other attributes 2|        true|
# |   B|    123| cart|other attributes 4|       false|
# +----+-------+-----+------------------+------------+

You can also use a window function to collect all events of each user and product, then filter (I'm using the same sample data as @werner):

from pyspark.sql import functions as F
from pyspark.sql import Window as W

(df
    # all distinct events seen for the same (user, product) pair
    .withColumn('events', F.collect_set('event').over(W.partitionBy('user', 'product')))
    .withColumn('is_purchased', F.array_contains(F.col('events'), 'purchase'))
    .where(F.col('event') == 'cart')
    .show(10, False))

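+----+-------+-----+------------------+----------------------+------------+
|user|product|event|other             |events                |is_purchased|
+----+-------+-----+------------------+----------------------+------------+
|A   |123    |cart |other attributes 2|[cart, view, purchase]|true        |
|B   |123    |cart |other attributes 4|[cart]                |false       |
+----+-------+-----+------------------+----------------------+------------+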