I have built a table in PyFlink with the following schema:
(
`namespace` STRING,
`pod_name` STRING,
`ts` TIMESTAMP(3) *ROWTIME*,
`delta` FLOAT,
`tag` STRING
)
Here is a data sample:
+----+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+
| op | namespace | pod_name | ts | delta | tag |
+----+--------------------------------+--------------------------------+-------------------------+--------------------------------+--------------------------------+
| +I | sock-shop | catalogue-5655947b6c-k5rsc | 2024-01-31 09:30:28.301 | 0.0 | 2024-01-31 09:30:28.301000 |
| +I | sock-shop | payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:28.435 | 134.0 | 2024-01-31 09:30:28.435000 |
| +I | sock-shop | payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:28.786 | 351.0 | 2024-01-31 09:30:28.786000 |
| +I | sock-shop | catalogue-5655947b6c-k5rsc | 2024-01-31 09:30:30.253 | 1467.0 | 2024-01-31 09:30:30.253000 |
| +I | sock-shop | user-7ff97ccc4d-258m5 | 2024-01-31 09:30:29.415 | 629.0 | 2024-01-31 09:30:29.415000 |
| +I | sock-shop | user-7ff97ccc4d-258m5 | 2024-01-31 09:30:29.415 | 0.0 | 2024-01-31 09:30:29.415000 |
| +I | sock-shop | payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:31.434 | 1181.0 | 2024-01-31 09:30:31.434000 |
| +I | sock-shop | catalogue-5655947b6c-k5rsc | 2024-01-31 09:30:31.301 | 1048.0 | 2024-01-31 09:30:31.301000 |
| +I | sock-shop | payment-7c6b5c47d-wfhtq | 2024-01-31 09:30:31.786 | 352.0 | 2024-01-31 09:30:31.786000 |
I want to add a new column (the last column) so that nearby elements get the same flag. A better solution would be to update the tag column in place, but so far I don't know how to do that either. Below is what I tried on the original table:
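To pin down the intended result independently of Flink, here is a minimal plain-Python sketch of the labeling rule. It assumes, as the sample data suggests (for example 134 = 28.435 - 28.301 in milliseconds), that delta is the gap in milliseconds to the previous event in the namespace, and that a gap above a threshold starts a new group whose first tag every later row inherits. The 600 ms threshold is borrowed from the second attempt further down; label_groups and SAMPLE are hypothetical names.

```python
from typing import List, Optional, Sequence, Tuple

Row = Tuple[str, float, str]  # hypothetical row shape: (ts, delta, tag)


def label_groups(rows: Sequence[Row], gap_ms: float = 600.0) -> List[str]:
    """Give every row the tag of the first row of its group.

    Assumes all rows belong to one namespace and are sorted by ts.
    A row whose delta exceeds gap_ms opens a new group.
    """
    labels: List[str] = []
    current: Optional[str] = None
    for _ts, delta, tag in rows:
        if current is None or delta > gap_ms:
            current = tag  # this row starts a new group
        labels.append(current)
    return labels


# Rows from the question's sample (namespace sock-shop), sorted by ts.
SAMPLE: List[Row] = [
    (f"2024-01-31 09:30:{s}", d, f"2024-01-31 09:30:{s}000")
    for s, d in [("28.301", 0.0), ("28.435", 134.0), ("28.786", 351.0),
                 ("29.415", 629.0), ("29.415", 0.0), ("30.253", 1467.0),
                 ("31.301", 1048.0), ("31.434", 1181.0), ("31.786", 352.0)]
]

if __name__ == "__main__":
    for (ts, delta, _tag), label in zip(SAMPLE, label_groups(SAMPLE)):
        print(ts, delta, label)
```

The loop carries the current group's tag forward explicitly, which is exactly the "remember the previous result" step that a fixed-size window aggregate cannot express on its own.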
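from pyflink.table import DataTypes
from pyflink.table.expressions import col, row_interval, CURRENT_ROW
from pyflink.table.udf import udaf
from pyflink.table.window import Over

@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def update_tag(ps1, ps2):
    # ps1: tags in the window, ps2: deltas in the window (oldest first, current row last).
    # If the current row is close to the previous one, reuse the previous row's tag.
    if ps2.shape[0] > 1 and ps2.iloc[-1] < 3:
        return str(ps1.iloc[-2])
    return ps1.iloc[-1]

table = table \
    .over_window(Over.partition_by(col('namespace'))
                 .order_by(col('ts'))
                 .preceding(row_interval(2))
                 .following(CURRENT_ROW)
                 .alias('qq')) \
    .select(col('namespace'),
            col('pod_name'),
            col('ts'),
            col('delta'),
            col('tag'),
            update_tag(col('tag'), col('delta')).over(col('qq')))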
The actual output is wrong; I marked the corrected values with arrows.
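One likely reason for the wrong output: an over-window aggregate only sees the original input values of the preceding rows, never the results it produced for them, so a tag can move back by at most one row. The plain-Python sketch below (no Flink involved; simulate_update_tag is a hypothetical helper) mimics update_tag evaluated over preceding(row_interval(2)) to make the effect visible.

```python
from typing import List, Sequence


def simulate_update_tag(tags: Sequence[str], deltas: Sequence[float],
                        preceding: int = 2, threshold: float = 3) -> List[str]:
    """Mimic update_tag over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    out: List[str] = []
    for i in range(len(tags)):
        lo = max(0, i - preceding)
        window_tags = tags[lo:i + 1]      # original tags only, never earlier outputs
        window_deltas = deltas[lo:i + 1]
        if len(window_deltas) > 1 and window_deltas[-1] < threshold:
            out.append(str(window_tags[-2]))
        else:
            out.append(window_tags[-1])
    return out


# Three rows that should form one group: the first is far from its predecessor,
# the next two are each close to the row before them.
print(simulate_update_tag(["a", "b", "c"], [10.0, 1.0, 1.0]))
# → ['a', 'a', 'b']  (the third row gets 'b' instead of the group's 'a')
```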
I came up with another idea. It gives the right result most of the time, but it is wrong whenever track_beginning falls through to return ps1.iloc[0], i.e. when none of the rows in the window has a delta above 600.
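The failure case can be reproduced without Flink. This plain-Python sketch (simulate_track_beginning is a hypothetical helper) applies the same logic as track_beginning to a window of `preceding` rows plus the current row. Once a group is longer than the window, no row in the window has delta > 600, and the fallback returns the oldest tag in the window instead of the group's first tag. The window size 2 is chosen only so that five rows show the effect; with row_interval(100) the same thing happens once a group spans more than 101 rows.

```python
from typing import List, Sequence


def simulate_track_beginning(tags: Sequence[str], deltas: Sequence[float],
                             preceding: int = 100, gap: float = 600) -> List[str]:
    """Mimic track_beginning over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW."""
    out: List[str] = []
    for i in range(len(tags)):
        lo = max(0, i - preceding)
        window_tags = tags[lo:i + 1]
        window_deltas = deltas[lo:i + 1]
        # Walk backwards from the current row (k = 1) looking for a group start.
        for k, x in enumerate(reversed(window_deltas), 1):
            if x > gap:
                out.append(window_tags[-k])
                break
        else:
            out.append(window_tags[0])  # fallback: oldest row in the window
    return out


tags = ["a", "b", "c", "d", "e"]
deltas = [700.0, 10.0, 10.0, 10.0, 10.0]  # one group that starts at "a"
print(simulate_track_beginning(tags, deltas, preceding=2))
# → ['a', 'a', 'a', 'b', 'c']  (the last two rows should also be 'a')
```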
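@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def track_beginning(ps1, ps2):
    # ps1: tags in the window, ps2: deltas in the window (oldest first, current row last).
    # Walk backwards from the current row (i = 1) and return the tag of the
    # most recent row whose delta exceeds 600, i.e. the start of the group.
    for i, x in enumerate(ps2.values[::-1], 1):
        if x > 600:
            return ps1.iloc[-i]
    # No such row in the window: fall back to the oldest row in the window.
    return ps1.iloc[0]

table = table \
    .over_window(Over.partition_by(col('namespace'))
                 .order_by(col('ts'))
                 .preceding(row_interval(100))
                 .following(CURRENT_ROW)
                 .alias('qq')) \
    .select(col('namespace'),
            col('pod_name'),
            col('ts'),
            col('delta'),
            col('tag'),
            # get_window_num is another UDAF whose definition is not shown here
            get_window_num(col('ts')).over(col('qq')).alias('oiw'),
            track_beginning(col('tag'), col('delta')).over(col('qq')).alias('flag')) \
    .order_by(col('ts'))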
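One way to avoid a fixed lookback is to split the computation in two: mark each row that starts a group (delta > 600), then number the groups with a running count of those marks. Every row in a group shares the same number no matter how long the group is, so if any shared value is acceptable as the flag, the running count already is one. The plain-Python sketch below (group_ids and group_start_tags are hypothetical helpers, same 600 ms assumption) shows the idea. In Flink SQL the running count would correspond to something like SUM(CASE WHEN delta > 600 THEN 1 ELSE 0 END) OVER (PARTITION BY namespace ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); whether that fits your pipeline, and how best to map a group number back to its starting tag in Flink, are assumptions to verify rather than a tested solution.

```python
from itertools import accumulate
from typing import Dict, List, Sequence


def group_ids(deltas: Sequence[float], gap: float = 600) -> List[int]:
    """Running count of group starts; rows in the same group share an id."""
    starts = [1 if d > gap else 0 for d in deltas]
    return list(accumulate(starts))


def group_start_tags(tags: Sequence[str], ids: Sequence[int]) -> List[str]:
    """Map every row to the tag of the first row carrying the same group id."""
    first: Dict[int, str] = {}
    for tag, gid in zip(tags, ids):
        first.setdefault(gid, tag)
    return [first[gid] for gid in ids]


tags = ["a", "b", "c", "d", "e", "f"]
deltas = [0.0, 10.0, 700.0, 10.0, 10.0, 900.0]
ids = group_ids(deltas)
print(ids)                          # → [0, 0, 1, 1, 1, 2]
print(group_start_tags(tags, ids))  # → ['a', 'a', 'c', 'c', 'c', 'f']
```

Because the running count only needs the previous count plus the current row, it never depends on how many rows a group contains, which is what breaks the bounded-window fallback.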
