How to create a table using the Table API in Flink for modelling

I am trying to create a table with the Table API from a DataStream of Confluent Avro records (read from a Kafka topic). With the code below (Flink version 1.17), the table is created in such a way that resultTable contains the complete JSON of each event in a single column. Can someone please help me create the table so that each field of the event is mapped to its own column?

DataStream<GenericRecord> dataStream = ...;

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table attribution = tableEnv.fromDataStream(dataStream);

tableEnv.createTemporaryView("attribution", attribution);
Table resultTable = tableEnv.sqlQuery("SELECT f0 FROM attribution");
tableEnv.createTemporaryView("resultTable", resultTable);
tableEnv.sqlQuery("SELECT `field_name` from resultTable").execute().collect().forEachRemaining(System.out::println); // Throwing Exception.

David Anderson On

You can use the variant of fromDataStream that takes a Schema as the second argument. Here's an example I copied from the documentation:

Table table =
    tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .column("event_time", "TIMESTAMP_LTZ(3)")
            .column("name", "STRING")
            .column("score", "INT")
            .watermark("event_time", "SOURCE_WATERMARK()")
            .build());

SOURCE_WATERMARK() is for cases where the DataStream being converted to a Table already has suitable watermarks.
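
For a DataStream<GenericRecord> specifically, Flink will likely treat the record as an opaque generic type, which is why everything lands in the single column f0. One way around this (a minimal sketch, not taken from the documentation example above) is to map each record to a Row with named field types before calling fromDataStream. The class name GenericRecordToTable and the fields "name" and "score" are hypothetical; substitute the fields and types of your own Avro schema.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public final class GenericRecordToTable {

    // Hypothetical column names and types; they must match the Avro schema in use.
    static final String[] FIELD_NAMES = {"name", "score"};
    static final TypeInformation<?>[] FIELD_TYPES = {Types.STRING, Types.INT};

    /** Copies the assumed fields of one Avro record into a positional Row. */
    public static Row toRow(GenericRecord record) {
        Object name = record.get("name");
        // Avro usually returns strings as Utf8, so convert to java.lang.String explicitly.
        String nameAsString = name == null ? null : name.toString();
        return Row.of(nameAsString, (Integer) record.get("score"));
    }

    /** Builds a Table whose columns are the named Row fields instead of a single f0. */
    public static Table toTable(StreamTableEnvironment tableEnv,
                                DataStream<GenericRecord> dataStream) {
        DataStream<Row> rows = dataStream
                .map(GenericRecordToTable::toRow)
                // Explicit type information tells the Table API the field names and types.
                .returns(Types.ROW_NAMED(FIELD_NAMES, FIELD_TYPES));
        return tableEnv.fromDataStream(rows);
    }
}
```

With this in place, registering the result via tableEnv.createTemporaryView("attribution", GenericRecordToTable.toTable(tableEnv, dataStream)) should let a query such as SELECT name, score FROM attribution address the fields individually. A Schema can still be passed as the second argument to fromDataStream if you need to declare a watermark or override column types.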