I am using flink 1.13. I am trying to convert table results to the datastream in a following way but keep getting error.
public class HybridTrial {
public static class Address {
public String street;
public String houseNumber;
public Address() {}
public Address(String street, String houseNumber) {
this.street = street;
this.houseNumber = houseNumber;
}
}
public static class User {
public String name;
public Integer score;
public LocalDateTime event_time;
public Address address;
// default constructor for DataStream API
public User() {}
// fully assigning constructor for Table API
public User(String name, Integer score, LocalDateTime event_time, Address address) {
this.name = name;
this.score = score;
this.event_time = event_time;
this.address = address;
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<User> dataStream =
env.fromElements(
new User("Alice", 4, LocalDateTime.now(), new Address()),
new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")),
new User("Alice", 10, LocalDateTime.now(), new Address("ABC", "1033")))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(60)));
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table =
tableEnv.fromDataStream(
dataStream, Schema.newBuilder().build());
table.printSchema();
Table t = table.select($("*"));
DataStream<User> dsRow = tableEnv.toDataStream(t,User.class);
dsRow.print();
env.execute();
}
}
Error I got is :
Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'event_time' at position 2.
Query schema: [name: STRING, score: INT, event_time: RAW('java.time.LocalDateTime', '...'), address: *flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
Sink schema: [name: STRING, score: INT, event_time: TIMESTAMP(9), address: *flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:437)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:256)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:198)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:143)
I tried with custom conversion from DataStream to table as well but while converting from table to DataStream still run into error. I am stuck so any help is appreciated.
The automatic, reflection-based type extraction in DataStream is not as powerful as the one of the Table API. This is also due to state backwards compatibility issues in the DataStream API.
The
event_time
field is aGenericType
in DataStream API which results inRAW
in Table API. You have the following possibilities:TypeInformation
infromElements
TypeInformation
usingDataType
infromDataStream