I have a topic, let's say "topic_soure". Messages are in JSON format.
The top-level fields are the same for all messages, but the 'data' field may follow different models.
I do not know exactly what the 'data' field model might be, but every 'data' model has a 'priority' field at its top level.
Examples:
{
"id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
"time": "2023-12-14T10:45:36.913Z",
"data": {
"priority": 1,
"sub_data":{
"id":"some id",
"value": "some value"
}
}
}
{
"id": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
"time": "2023-12-14T10:45:36.913Z",
"data": {
"priority": 2,
"related_data":{
"id":"some id2",
"value": "some value2"
}
}
}
As I understand it, AVRO and SchemaRegistry are not available for use in my environment.
I need to split 'topic_soure' into two topics by the field 'priority'.
To solve the issue, I created a stream backed by the topic 'topic_soure':
create stream soure_stream ( \
id varchar, \
time varchar, \
data varchar \
) with (key_format='kafka', kafka_topic = 'topic_soure', value_format='json');
Then I created two streams with sink topics, one for each priority:
create stream soure_stream_priority_1 \
with (kafka_topic = 'topic_soure_priority_1', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = '1';
create stream soure_stream_priority_2 \
with (kafka_topic = 'topic_soure_priority_2', format='JSON')\
as select * from soure_stream \
where extractjsonfield(data, '$.priority') = '2';
It works well.
But the message model changes in the result topics:
{
"ID": "39f8582e-91e1-41b0-8590-f83f0a4b756a",
"TIME": "2023-12-14T10:45:36.913Z",
"DATA": "{\"priority\":1,\"sub_data\":{\"id\":\"some id\",\"value\":\"some value\"}}"
}
The 'data' field becomes an escaped JSON string. I need the message to pass through unchanged, as is.
Is there any way to convert the 'data' field format? Or can I solve this problem another way using ksqlDB or Kafka Streams?
If you want to solve this with the Kafka Streams Java API, the answer could look like this:
Steps :
This way, the records are forwarded exactly as they were received.
Note: checking whether the priority equals 1 or 2 is your own logic, so implement it in whichever way suits you best. Here I used JSONObject to read the data as JSON, but you can use anything else.
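As a rough illustration of the priority check described in the note, here is a minimal sketch in plain Java. The class name PriorityRouter and its methods are hypothetical, and the regular expression is only a dependency-free stand-in for a real JSON parser such as JSONObject. It assumes the first "priority" key in the message is the one directly under 'data' and that it holds a small non-negative integer. The Kafka Streams wiring is left as a comment because it needs the kafka-streams dependency; it reads and writes the value as a plain String, which is why the message would pass through unchanged.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reads the 'priority' value out of a raw JSON message. */
final class PriorityRouter {

    // Assumption: the first "priority" key is the one directly under "data"
    // and its value is a small non-negative integer. A JSON parser is more robust.
    private static final Pattern PRIORITY =
            Pattern.compile("\"priority\"\\s*:\\s*(\\d{1,9})\\b");

    /** Returns the priority, or empty if the value is null or has no such field. */
    static OptionalInt priorityOf(String rawJson) {
        if (rawJson == null) {
            return OptionalInt.empty();
        }
        Matcher m = PRIORITY.matcher(rawJson);
        return m.find()
                ? OptionalInt.of(Integer.parseInt(m.group(1)))
                : OptionalInt.empty();
    }

    /** True only if the message carries exactly the expected priority. */
    static boolean hasPriority(String rawJson, int expected) {
        OptionalInt p = priorityOf(rawJson);
        return p.isPresent() && p.getAsInt() == expected;
    }

    // Wiring sketch for Kafka Streams (kept as a comment, needs kafka-streams):
    //
    //   StreamsBuilder builder = new StreamsBuilder();
    //   KStream<String, String> source = builder.stream(
    //           "topic_soure", Consumed.with(Serdes.String(), Serdes.String()));
    //   source.filter((key, value) -> PriorityRouter.hasPriority(value, 1))
    //         .to("topic_soure_priority_1",
    //             Produced.with(Serdes.String(), Serdes.String()));
    //   source.filter((key, value) -> PriorityRouter.hasPriority(value, 2))
    //         .to("topic_soure_priority_2",
    //             Produced.with(Serdes.String(), Serdes.String()));
}
```

With the first example message from the question, hasPriority(value, 1) is true and hasPriority(value, 2) is false, so the record would only reach 'topic_soure_priority_1'. Messages without a 'priority' field match neither filter and are dropped by this sketch.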