I created a Flink table that contains array-type fields, and I get an error saying the types do not match. I want to know how to create a temporary table containing an array type in a Flink table.
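import static org.apache.flink.table.api.Expressions.$;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkConnectorClickhouse {
    public static void main(String[] args) throws Exception {
        // create environments of both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // create a DataStream
        DataStream<Order> dataStream = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 1, Arrays.asList("name01", "name02", "name03"), Arrays.asList(1, 2, 3))));
        // convert the DataStream to a Table, renaming user to user_a
        Table inputTable = tableEnv.fromDataStream(dataStream, $("user").as("user_a"), $("product"), $("amount"), $("name_list"), $("id_list"));
        // register the Table object as a view and query it
        tableEnv.createTemporaryView("InputTable", inputTable);
        tableEnv.executeSql("CREATE TABLE sink_table (\n" +
                " `user_a` BIGINT,\n" +
                " `product` VARCHAR,\n" +
                " `amount` BIGINT,\n" +
                " `name_list` ARRAY<STRING>,\n" +
                " `id_list` ARRAY<INT>,\n" +
                " PRIMARY KEY (user_a) NOT ENFORCED /* if a pk is specified, upsert mode is used */\n" +
                ") WITH (\n" +
                ")");
        TableResult resultTable = tableEnv.executeSql("INSERT INTO sink_table SELECT user_a, product, amount, name_list, id_list FROM InputTable");
        env.execute();
    }

    public static class Order {
        private Long user;
        private String product;
        private Integer amount;
        private List<String> name_list;
        private List<Integer> id_list;

        public Order() {
        }

        // constructor matching the call in main()
        public Order(Long user, String product, Integer amount, List<String> name_list, List<Integer> id_list) {
            this.user = user;
            this.product = product;
            this.amount = amount;
            this.name_list = name_list;
            this.id_list = id_list;
        }
        // getters and setters are not shown
    }
}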
The empty WITH ( ) clause in your CREATE TABLE statement is probably where it's going wrong. Where is your sink connector? Is it Kafka? Is it Hive? Is it a regular database with JDBC?
You'll find a list under the "Table API connectors" here: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/. Be sure that you follow the right JAR packaging/deployment/management policy for the connector you choose.
If you're a beginner, you might be interested in one of the simpler connectors from that list, at least one of which is bundled with Flink.
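As one illustration of a non-empty WITH clause, here is a minimal sketch that assumes the print connector (which ships with Flink and writes each row to standard output) is acceptable as a sink while you experiment. The class name PrintSinkDdl and the method buildSinkDdl are hypothetical helpers made up for this example. The columns follow the question's sink_table, with STRING used for the text column and the primary key left out.

```java
public class PrintSinkDdl {

    // Builds a CREATE TABLE statement whose WITH clause names a connector.
    // Assumption: the 'print' connector is good enough for a first test.
    public static String buildSinkDdl() {
        return "CREATE TABLE sink_table (\n"
                + "  `user_a` BIGINT,\n"
                + "  `product` STRING,\n"
                + "  `amount` BIGINT,\n"
                + "  `name_list` ARRAY<STRING>,\n"
                + "  `id_list` ARRAY<INT>\n"
                + ") WITH (\n"
                + "  'connector' = 'print'\n"
                + ")";
    }

    public static void main(String[] args) {
        // In the job itself, this string would be passed to tableEnv.executeSql(...).
        System.out.println(buildSinkDdl());
    }
}
```

Once the pipeline runs end to end with a test sink like this, the same DDL shape should carry over to a real sink by swapping the options in the WITH clause for those of the connector you actually need.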