I am using Flink 1.18 with Java and I am trying to use a user-defined table aggregate function (UDTAGG). The documentation only includes a Table API example for UDTAGGs. However, when I try to achieve the same result with a query in Flink SQL, I get planning errors.
My records are "Event" logs, where each event has a timestamp, an eventId, and some payload:
{
"ts": BIGINT,
"eventId": BIGINT,
"payload": VARCHAR
}
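For reference, a minimal POJO matching this schema might look like the sketch below. The field names are assumptions taken from the JSON above; Flink's POJO rules require public fields (or getters/setters) and a public no-argument constructor so that fromDataStream can derive the schema.

```java
// Hypothetical Event POJO (field names assumed from the schema above).
public class Event {
    public long ts;        // event timestamp
    public long eventId;   // grouping key
    public String payload; // arbitrary payload

    // No-arg constructor required by Flink's POJO type extraction.
    public Event() {}

    public Event(long ts, long eventId, String payload) {
        this.ts = ts;
        this.eventId = eventId;
        this.payload = payload;
    }
}
```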
I am trying to group events by their eventId values and apply my user-defined table aggregate function to each group. The following program, which uses the Table API, works fine:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

final DataStream<Event> events = env.fromCollection(
    Arrays.asList(
        new Event(1710104562L, 1001L, "ABC"),
        new Event(1710105162L, 1002L, "DEF")
        ...));

final Table table = tableEnv.fromDataStream(events);
tableEnv.createTemporaryView("tab", table);

// Register the UDTAGG from a jar
tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION aggrFunc AS '...' USING JAR '...'");

final Table results = tableEnv.from("tab")
    .groupBy($("eventId"))
    .flatAggregate(
        Expressions.call("aggrFunc", Row.of($("ts"), $("eventId"), $("payload")))
            .as("ts", "eid", "payload"))
    .select($("ts"), $("eid").as("eventId"), $("payload"));

results.execute().print();
Now I am trying to do the same with a Flink SQL query, and I get a type-mismatch error from Calcite:
String sql = "SELECT eventId, aggrFunc(CAST((ts, eventId, payload) AS ROW<ts BIGINT, eventId BIGINT, payload STRING>)) "
           + "FROM tab "
           + "GROUP BY eventId";
tableEnv.executeSql(sql).print();
// error
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(BIGINT EXPR$0) NOT NULL
equiv rowtype: RecordType(RecordType:peek_no_expand(BIGINT ts, BIGINT id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" payload) EXPR$0) NOT NULL
Difference:
EXPR$0: BIGINT -> RecordType:peek_no_expand(BIGINT ts, BIGINT id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" payload)
at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
...
Any suggestions on how a UDTAGG should be used via the SQL API in Flink?
As of Flink 1.18, table aggregate functions (UDTAGGs) are only supported in the Table API; they cannot be invoked from SQL, which is why the planner fails on your query. The Flink documentation on user-defined functions states this limitation.
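A common workaround is to keep as much as possible in SQL and switch to the Table API only for the flatAggregate step, since sqlQuery and createTemporaryView share the same catalog. The sketch below assumes "tab" and "aggrFunc" are registered as in your question; passing the columns directly to call(...) is the form the Flink docs use for flatAggregate, rather than wrapping them in a Row.

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class UdtaggWorkaround {
    // Sketch: SQL for sourcing/filtering, Table API only for the UDTAGG step.
    // Assumes "tab" and "aggrFunc" have already been registered.
    static void run(StreamTableEnvironment tableEnv) {
        // Any upstream filtering/projection can stay in SQL.
        Table filtered = tableEnv.sqlQuery("SELECT ts, eventId, payload FROM tab");

        // The flatAggregate step must go through the Table API.
        Table results = filtered
            .groupBy($("eventId"))
            .flatAggregate(call("aggrFunc", $("ts"), $("eventId"), $("payload"))
                .as("ts", "eid", "payload"))
            .select($("ts"), $("eid").as("eventId"), $("payload"));

        // Register the result so downstream steps can go back to SQL.
        tableEnv.createTemporaryView("aggregated", results);
        tableEnv.executeSql("SELECT * FROM aggregated").print();
    }
}
```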