I want to concat two columns and use it as a timestamp field, and I have to write it in a CREATE STREAM statement because I want to use that timestamp for calculating WINDOW TUMBLING.
The SQL code:
create STREAM testStream (Price INT,concat(cast(RlcDate as varchar),LPAD(CAST(RlcTime AS VARCHAR),8,'0')) as timestamp, Id VARCHAR) WITH (kafka_topic='testTopic', partitions=1,value_format='JSON', timestamp = 'timestamp', timestamp_format = 'yyyyMMddHHmmssSS');
when I run this code, it returns the following error:
Caused by: line 1:42: mismatched input '(' expecting {'STRING', 'EMIT',
'CHANGES', 'FINAL', 'ESCAPE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP',
'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE',
'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'TYPES', 'SHOW', 'TABLES',
'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP',
'SET', 'RESET', 'SESSION', 'DECIMAL', 'KEY', 'SINK', 'SOURCE', 'PRIMARY',
'REPLACE', 'ASSERT', 'ADD', 'ALTER', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER, VARIABLE}
Caused by: org.antlr.v4.runtime.InputMismatchException
I want to know, is it correct to write CONCAT and CAST when creating a stream, or we just can use these kinds of queries in the select statement?
If it's not possible, is there any solution for solving this issue?
CREATE STREAMstatements define how a stream looks in terms of fields, data types, and constraints. It is not the right place to define calculation logic, i.e. how the fields should be filled. The right place to do this is in aSELECTstatement, as you correctly assumed.Try to first define your stream based on what is in your Kafka topic:
This stream is an alternative, queriable representation of your existing Kafka topic. Now you can select data according to your criteria and continuously expose the results of that query into a new stream (which in turn needs a backing Kafka topic):
If the provided logic is correct based on incoming data,
testStreamResultshould allow you to use it in aJOINwith the timestamp semantics you are looking for.