Compare consecutive rows and extract words (excluding the subsets) in Spark


I am working with a Spark DataFrame. The input looks like Table 1 below. I need to write logic that, for each session_id, extracts the keywords with maximum length; a single session can contribute multiple keywords to the output. The expected output is shown in Table 2.

Input dataframe:

(Table 1)
|-----------+------------+-----------------------------------|
| session_id| value      |  Timestamp                        |
|-----------+------------+-----------------------------------|
|     1     | cat        | 2021-01-11T13:48:54.2514887-05:00 |
|     1     | catc       | 2021-01-11T13:48:54.3514887-05:00 |
|     1     | catch      | 2021-01-11T13:48:54.4514887-05:00 |
|     1     | par        | 2021-01-11T13:48:55.2514887-05:00 |
|     1     | part       | 2021-01-11T13:48:56.5514887-05:00 |
|     1     | party      | 2021-01-11T13:48:57.7514887-05:00 |
|     1     | partyy     | 2021-01-11T13:48:58.7514887-05:00 |
|     2     | fal        | 2021-01-11T13:49:54.2514887-05:00 |
|     2     | fall       | 2021-01-11T13:49:54.3514887-05:00 |
|     2     | falle      | 2021-01-11T13:49:54.4514887-05:00 |
|     2     | fallen     | 2021-01-11T13:49:54.8514887-05:00 |
|     2     | Tem        | 2021-01-11T13:49:56.5514887-05:00 |
|     2     | Temp       | 2021-01-11T13:49:56.7514887-05:00 |
|-----------+------------+-----------------------------------|

Expected Output:

 (Table 2)
|-----------+------------+
| session_id| value      |
|-----------+------------+
|     1     | catch      |
|     1     | partyy     |
|     2     | fallen     |
|     2     | Temp       |
|-----------+------------|

Solution I tried:

I added another column called col_length that captures the length of each word in the value column, then tried to compare each row with its subsequent row to see whether it has the maximum length. But this solution only works partially.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = spark.read.parquet("/project/project_name/abc")

val dfM = df.select($"session_id",$"value",$"Timestamp").withColumn("col_length",length($"value"))

val ts = Window
        .orderBy("session_id")
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = dfM
            .withColumn("running_max", max("col_length") over ts)
            .where($"running_max" === $"col_length")
            .select("session_id", "value", "Timestamp")

Current Output:

|-----------+------------+
| session_id| value      |
|-----------+------------+
|     1     | catch      |
|     2     | fallen     |
|-----------+------------|

Using multiple columns inside the orderBy clause of the window function did not work for me, so I didn't get the desired output: I got only one row per session_id. Any suggestions would be highly appreciated. Thanks in advance.
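The rule the expected output follows can be stated as: keep a row unless the next row in the same session is a longer word. As a minimal plain-Scala sketch (outside Spark, with the sample rows from Table 1 hard-coded in timestamp order, so the column names and data here are just the example's):

```scala
// (session_id, value) pairs in typing order, taken from Table 1
val rows = Seq(
  (1, "cat"), (1, "catc"), (1, "catch"),
  (1, "par"), (1, "part"), (1, "party"), (1, "partyy"),
  (2, "fal"), (2, "fall"), (2, "falle"), (2, "fallen"),
  (2, "Tem"), (2, "Temp")
)

// Pair each row with the following row (None for the last row), then
// keep a row when there is no next row, the next row belongs to a
// different session, or the next word is not longer than this one.
val nexts: Seq[Option[(Int, String)]] = rows.drop(1).map(Some(_)) :+ None
val kept = rows.zip(nexts).collect {
  case ((s, w), next) if next.forall { case (s2, w2) => s2 != s || w2.length <= w.length } =>
    (s, w)
}
println(kept)  // List((1,catch), (1,partyy), (2,fallen), (2,Temp))
```

This reproduces Table 2 on the sample data; the Spark version of the same comparison needs a window function, since each row must see its successor.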

Best answer:

You can solve it by using the lead function: compare each word with the next one and keep a row when the next word is not a longer continuation. Ordering the window by both session_id and Timestamp preserves the typing order, and carrying the next row's session_id along guards against a longer word at the start of the next session masking the last word of the current one:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// order by Timestamp too, so lead() returns the word typed next
val windowSpec = Window.orderBy("session_id", "Timestamp")
dfM
  .withColumn("lead", lead("value", 1).over(windowSpec))
  .withColumn("lead_session", lead("session_id", 1).over(windowSpec))
  // keep the last row, a row before a session change,
  // or a row whose next word is shorter
  .filter(col("lead").isNull ||
    col("lead_session") =!= col("session_id") ||
    length(col("lead")) < length(col("value")))
  .drop("lead", "lead_session")
  .show