Flag elements that are close to each other in a stream


I have built a table in PyFlink with the following schema:

(
  `namespace` STRING,
  `pod_name` STRING,
  `ts` TIMESTAMP(3) *ROWTIME*,
  `delta` FLOAT,
  `tag` STRING
)

Here is a data sample:

+----+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+
| op |                      namespace |                       pod_name |                      ts |                          delta |                            tag |
+----+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+
| +I |                      sock-shop |     catalogue-5655947b6c-k5rsc | 2024-01-31 09:30:28.301 |                            0.0 |     2024-01-31 09:30:28.301000 |
| +I |                      sock-shop |        payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:28.435 |                          134.0 |     2024-01-31 09:30:28.435000 |
| +I |                      sock-shop |        payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:28.786 |                          351.0 |     2024-01-31 09:30:28.786000 |
| +I |                      sock-shop |     catalogue-5655947b6c-k5rsc | 2024-01-31 09:30:30.253 |                         1467.0 |     2024-01-31 09:30:30.253000 |
| +I |                      sock-shop |          user-7ff97ccc4d-258m5 | 2024-01-31 09:30:29.415 |                          629.0 |     2024-01-31 09:30:29.415000 |
| +I |                      sock-shop |          user-7ff97ccc4d-258m5 | 2024-01-31 09:30:29.415 |                            0.0 |     2024-01-31 09:30:29.415000 |
| +I |                      sock-shop |        payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:31.434 |                         1181.0 |     2024-01-31 09:30:31.434000 |
| +I |                      sock-shop |     catalogue-5655947b6c-k5rsc | 2024-01-31 09:30:31.301 |                         1048.0 |     2024-01-31 09:30:31.301000 |
| +I |                      sock-shop |        payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:31.786 |                          352.0 |     2024-01-31 09:30:31.786000 |

I want to add a new column so that nearby elements get the same flag (the last column, to be specific; a better implementation would update the tag column itself, but so far I don't know how to do that either). Below is what I did with the original table:

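from pyflink.table import DataTypes
from pyflink.table.expressions import col, row_interval, CURRENT_ROW
from pyflink.table.udf import udaf
from pyflink.table.window import Over

# Pandas UDAF: ps1 holds the tag values and ps2 the delta values of the rows in the window.
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def update_tag(ps1, ps2):
    # If the current row is close to the previous one, reuse the previous row's tag
    # (returned directly instead of mutating the input Series).
    if ps2.shape[0] > 1 and ps2.iloc[-1] < 3:
        return str(ps1.iloc[-2])
    return str(ps1.iloc[-1])

table = table \
    .over_window(Over.partition_by(col('namespace'))
                     .order_by(col('ts'))
                     .preceding(row_interval(2))
                     .following(CURRENT_ROW)
                     .alias('qq')) \
    .select(col('namespace'),
            col('pod_name'),
            col('ts'),
            col('delta'),
            col('tag'),
            update_tag(col('tag'), col('delta')).over(col('qq')))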

The actual output is wrong; I marked the corrections with arrows on a screenshot of the output.

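One reason this approach cannot work: in an over window, the aggregate for each row is computed from the original tag and delta values of the rows in its window; the value update_tag returned for the previous row is never fed back in. So a tag can be copied at most one row back and never chains along a run of close rows. The pure-Python sketch below mimics ROWS BETWEEN 2 PRECEDING AND CURRENT ROW on toy data where all three rows should share the tag "a". simulate_update_tag is a hypothetical helper written for illustration, not PyFlink code.

```python
from typing import List, Sequence


def simulate_update_tag(tags: Sequence[str], deltas: Sequence[float],
                        preceding: int = 2, threshold: float = 3.0) -> List[str]:
    """Mimic update_tag over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    out = []
    for k in range(len(tags)):
        lo = max(0, k - preceding)
        # The window only ever sees the ORIGINAL tag values, never earlier outputs.
        ps1, ps2 = tags[lo:k + 1], deltas[lo:k + 1]
        if len(ps2) > 1 and ps2[-1] < threshold:
            out.append(ps1[-2])  # copy the previous row's original tag
        else:
            out.append(ps1[-1])
    return out


# Toy data: three rows that are all "close" (delta < 3), so all should get "a".
print(simulate_update_tag(["a", "b", "c"], [0.0, 1.0, 1.0]))
# ['a', 'a', 'b']
```

The third row gets "b" rather than "a" because its window still contains the original tag of the second row, not the "a" that was produced for it.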


I came up with another idea. It is flawed: it gives the right result most of the time, but it is wrong whenever track_beginning falls back to returning ps1.iloc[0].

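from pyflink.table import DataTypes
from pyflink.table.expressions import col, row_interval, CURRENT_ROW
from pyflink.table.udf import udaf
from pyflink.table.window import Over

# ps1: tag values, ps2: delta values of the rows in the window (ordered by ts).
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def track_beginning(ps1, ps2):
    # Walk backwards from the current row; the most recent row with a gap
    # above 600 is taken as the start of the current group.
    for i, x in enumerate(ps2.values[::-1], 1):
        if x > 600:
            return ps1.iloc[-i]
    # No large gap inside the window: fall back to the oldest row in the window.
    return ps1.iloc[0]


table = table \
    .over_window(Over.partition_by(col('namespace'))
                     .order_by(col('ts'))
                     .preceding(row_interval(100))
                     .following(CURRENT_ROW)
                     .alias('qq')) \
    .select(col('namespace'),
            col('pod_name'),
            col('ts'),
            col('delta'),
            col('tag'),
            # get_window_num is defined elsewhere (not shown here)
            get_window_num(col('ts')).over(col('qq')).alias('oiw'),
            track_beginning(col('tag'), col('delta')).over(col('qq')).alias('flag')) \
    .order_by(col('ts'))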
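The failure can be reproduced outside Flink. The pure-Python sketch below mimics track_beginning over a bounded ROWS window and compares it with a carry-forward rule that keeps the first tag of the current group per key. It assumes, as track_beginning does, that a delta above 600 starts a new group, and that rows arrive in ts order per key. simulate_track_beginning, carry_forward and THRESHOLD are hypothetical names for illustration, not PyFlink APIs. A window of 2 preceding rows keeps the toy data short; with row_interval(100) the same thing happens once a group spans more than 101 rows, because the row that started the group has slid out of the window and ps1.iloc[0] is just the oldest row still in it.

```python
from typing import Dict, List, Sequence, Tuple

THRESHOLD = 600.0  # assumed: a delta above this starts a new group


def simulate_track_beginning(tags: Sequence[str], deltas: Sequence[float],
                             preceding: int) -> List[str]:
    """Mimic track_beginning over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    out = []
    for k in range(len(tags)):
        lo = max(0, k - preceding)
        ps1, ps2 = tags[lo:k + 1], deltas[lo:k + 1]
        # Scan backwards from the current row for the most recent large gap.
        for i, x in enumerate(reversed(ps2), 1):
            if x > THRESHOLD:
                out.append(ps1[-i])
                break
        else:
            # No large gap inside the window: fall back to the oldest row in it.
            out.append(ps1[0])
    return out


def carry_forward(rows: Sequence[Tuple[str, str, float]]) -> List[str]:
    """Keep the group's first tag per key; rows are (key, tag, delta) in ts order."""
    current: Dict[str, str] = {}  # key -> tag of the first row of its current group
    out = []
    for key, tag, delta in rows:
        if key not in current or delta > THRESHOLD:
            current[key] = tag  # first row for this key, or a large gap: new group
        out.append(current[key])
    return out


tags = ["t0", "t1", "t2", "t3", "t4"]
deltas = [0.0, 700.0, 10.0, 10.0, 10.0]  # t1 starts a group that lasts to t4
print(simulate_track_beginning(tags, deltas, preceding=2))
# ['t0', 't1', 't1', 't1', 't2']  (the last row should be 't1')
print(carry_forward([("sock-shop", t, d) for t, d in zip(tags, deltas)]))
# ['t0', 't1', 't1', 't1', 't1']
```

The carry-forward version depends on the flag it produced for the previous row, which an over-window aggregate never sees, so it needs per-key state that survives across rows. In a Flink job, one possible place for that state is a keyed stateful operator, for example a KeyedProcessFunction keyed by namespace that stores the current flag in a ValueState.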
