Apache Flink issue with JOINs on Kinesis Streams: "Rowtime attributes must not be in the input rows of a regular join"


I am attempting a simple exercise. I have two Kinesis data streams:

  • order-stream
  • shipment-stream

SQL 1: Orders


%flink.ssql

CREATE TABLE orders (
    orderid VARCHAR(6),
    orders VARCHAR,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'order-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

SQL 2: Shipment

CREATE TABLE shipment (
    orderid VARCHAR(6),
    shipments VARCHAR(6),
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'shipment-stream',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'TRIM_HORIZON',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);

Generating Fake Data into Kinesis Via Python

import datetime
import json
import os
import random
import time
import uuid

import boto3
from dotenv import load_dotenv

# Load AWS credentials (DEV_ACCESS_KEY / DEV_SECRET_KEY) from a local .env file
load_dotenv(".env")

STREAM_NAME_Order = "order-stream"
STREAM_NAME_Shipments = "shipment-stream"


def send_data(kinesis_client):
    # Use one shared orderid so the order and its shipment can be joined later
    order_items_number = random.randrange(1, 10000)

    order_items = {
        # the tables declare these columns as VARCHAR, so send strings
        "orderid": str(order_items_number),
        "orders": "1",
        "ts": datetime.datetime.now().isoformat(),
    }
    shipping_data = {
        "orderid": str(order_items_number),
        "shipments": str(random.randrange(1, 10000)),
        "ts": datetime.datetime.now().isoformat(),
    }

    partition_key = str(uuid.uuid4())
    res = kinesis_client.put_record(
        StreamName=STREAM_NAME_Order,
        Data=json.dumps(order_items),
        PartitionKey=partition_key)
    print(res)

    # small delay so the shipment event lands after its order event
    time.sleep(2)

    res = kinesis_client.put_record(
        StreamName=STREAM_NAME_Shipments,
        Data=json.dumps(shipping_data),
        PartitionKey=partition_key)
    print(res)


if __name__ == '__main__':
    kinesis_client = boto3.client(
        'kinesis',
        aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
        region_name="us-east-1",
    )
    # send nine order/shipment pairs
    for i in range(1, 10):
        send_data(kinesis_client)

%flink.ssql(type=update)

SELECT DISTINCT oo.orderid, TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) AS event_time
FROM orders AS oo
GROUP BY oo.orderid, TUMBLE(oo.ts, INTERVAL '10' MINUTE);


Issue with joining

%flink.ssql(type=update)

SELECT DISTINCT oo.orderid, TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) AS event_time, ss.shipments
FROM orders AS oo
JOIN shipment AS ss ON oo.orderid = ss.orderid
GROUP BY oo.orderid, TUMBLE(oo.ts, INTERVAL '10' MINUTE), ss.shipments;

Error Messages

TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

java.io.IOException: Fail to run stream sql job
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:172)
    at org.apache.zeppelin.flink.sql.AbstractStreamSqlJob.run(AbstractStreamSqlJob.java:105)
    at org.apache.zeppelin.flink.FlinkStreamSqlInterpreter.callInnerSelect(FlinkStreamSqlInterpreter.java:89)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callSelect(FlinkSqlInterrpeter.java:503)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:266)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:160)
    at org.apache.zeppelin.flink.FlinkSqlInterrpeter.internalInterpret(FlinkSqlInterrpeter.java:112)
    at org.apache.zeppelin.interpreter.AbstractInterpreter.interpret(AbstractInterpreter.java:47)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:852)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:744)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
    at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:132)
    at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:46)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 


Also tried

%flink.ssql(type=update)

SELECT DISTINCT oo.orderid,
                TUMBLE_START(oo.ts, INTERVAL '1' MINUTE),
                ss.shipments
FROM orders AS oo
JOIN shipment AS ss ON oo.orderid = ss.orderid
GROUP BY oo.orderid,
         TUMBLE(CAST(oo.ts AS TIME), INTERVAL '1' MINUTE),
         ss.shipments;

Error message: SQL validation failed. From line 2, column 17 to line 2, column 57: Call to auxiliary group function 'TUMBLE_START' must have matching call to group function '$TUMBLE' in GROUP BY clause

I am not sure what exactly needs to be done here; any help would be great. Looking forward to hearing back from an expert.


Best answer
TableException: Rowtime attributes must not be in the input rows of a regular join.
As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

It is not possible for a regular join to have time attributes in its results, because the time attribute could not be well-defined. This is because the rows in a dynamic table must be at least roughly ordered by the time attribute, and there's no way to guarantee this for the result of a regular join (as opposed to an interval join, temporal join, or lookup join).

In versions of Flink before 1.14, the implementation dealt with this by not allowing regular joins to have time attributes in the input tables. While this avoided the problem, it was overly restrictive.
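
For illustration, the workaround named in the error message would look roughly like this: cast the rowtime attributes away in subqueries before the regular join. This is only a sketch using your tables; note that the result then carries no time attribute, so you could no longer apply event-time windows to it.

%flink.ssql(type=update)

-- Sketch of the error message's workaround: cast ts to a plain TIMESTAMP
-- in each input before the regular join. The join then validates, but the
-- result has no rowtime attribute, so TUMBLE cannot be applied to it.
SELECT o.orderid, o.order_ts, s.shipments
FROM (
    SELECT orderid, CAST(ts AS TIMESTAMP(3)) AS order_ts FROM orders
) AS o
JOIN (
    SELECT orderid, shipments FROM shipment
) AS s ON o.orderid = s.orderid;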

In your case I suggest you rewrite the join as an interval join, so that the output of the join will have a time attribute, making it possible to apply windowing to it.
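
A sketch of that rewrite, under the assumption that a shipment arrives within one hour of its order (the one-hour bound is a guess; tune it to your data). Because the two rowtimes are bounded relative to each other, this is an interval join, the output keeps oo.ts as a time attribute, and the tumbling window becomes legal again:

%flink.ssql(type=update)

-- The BETWEEN bound on the two rowtimes makes this an interval join
-- rather than a regular join, so oo.ts survives as a time attribute
-- and can still be used in TUMBLE.
SELECT oo.orderid,
       TUMBLE_START(oo.ts, INTERVAL '10' MINUTE) AS event_time,
       ss.shipments
FROM orders AS oo
JOIN shipment AS ss
  ON oo.orderid = ss.orderid
 AND ss.ts BETWEEN oo.ts - INTERVAL '1' HOUR AND oo.ts + INTERVAL '1' HOUR
GROUP BY oo.orderid,
         TUMBLE(oo.ts, INTERVAL '10' MINUTE),
         ss.shipments;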


In the second query, I'm not 100% sure what the problem is, but I suspect it's that you call TUMBLE_START on oo.ts while the GROUP BY calls TUMBLE on CAST(oo.ts AS TIME). The two expressions need to be identical; I don't think Flink's SQL planner is smart enough to recognize them as the same window.
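
For example, this shape validates (a sketch on the orders table alone, with the group function and its auxiliary called on exactly the same expression; the join restriction above still applies once shipment is added back in):

%flink.ssql(type=update)

-- TUMBLE_START and TUMBLE must reference exactly the same expression
SELECT orderid,
       TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start
FROM orders
GROUP BY orderid,
         TUMBLE(ts, INTERVAL '1' MINUTE);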