KSQL-Kafka Streams


I have Kafka streams as follows:

topic1: event example in Avro:

{
  "ID": {"string":"S1"},
  "text": {"string":"department"}
}
{
  "ID": {"string":"S2"},
  "text":{"string":"cellphone"}
}

{
  "ID": {"string":"S3"},
  "text":{"string":"fax"}
}
{
  "ID": {"string":"S4"},
  "text":{"string":"telephone"}
}

stream1, which is a reference stream built on topic1, has details on any abbreviations:
stream1 definition:
full_name string

example values:
___+___________
|ID|text      |
___+___________
|S1|department|
|S2|cellphone |
|S3|fax       |
|S4|telephone |
__________________________________________

topic2:
event example in Avro:

{
  "ID": {"string":"S1"},
  "value":{"string":"department"}
}
{
  "ID": {"string":"S2"},
  "value":{"string":"cellphone"}
}

{
  "ID": {"string":"S3"},
  "value":{"string":"maximum"}
}
{
  "ID": {"string":"S4"},
  "value":{"string":"telephone"}
}

stream2, which is a reference stream built on topic2, has details on any abbreviations:
stream2 definition:
full_name string

example values:
___+_______
|ID|value |
___+_______
|S1|dt    |
|S2|cp    |
|S3|fax   |
|S4|th    |
_________________________________________
topic3:
event example in Avro:
{
  "ID_k": {"string":"K1"},
  "received_abbrevation":{"string":"dt"}
}
{
  "ID_k": {"string":"K2"},
  "received_abbrevation":{"string":"cp"}
}
         
stream3, which receives events from topic3:

stream3 definition:

ID_k string
received_abbrevation string

example values:
_____+_____________________
|ID_k|received_abbrevation|
_____+_____________________
|K1  |dt                  |
|K2  |cp                  |

What I want as a result: whenever a new event arrives in stream3, it should always query stream1 and stream2 across all rows (row 1 through n), match the rows on stream1.ID = stream2.ID, keep those where stream2.value equals the received_abbrevation from stream3, and display the corresponding stream1."text" value as stream4.

Note: if there is a new event in topic1/stream1, there is no need to query stream2.

The resulting stream should look like:
-------------
|actual_name|
-------------
|department |
|cellphone  |
-------------

 

For example, in plain SQL the query could be written as: select stream1.text as actual_name from stream1, stream2 where stream1.ID = stream2.ID and stream2.value in (select received_abbrevation from stream3).
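
To make the intended lookup concrete, here is a minimal Python sketch of the semantics I am after. It is not KSQL and not a proposed solution, only an illustration of the expected result. The dictionaries are filled from the example tables for stream1 and stream2, and the helper name actual_names is made up for this illustration.

```python
# Reference data copied from the example tables in this question.
stream1 = {"S1": "department", "S2": "cellphone", "S3": "fax", "S4": "telephone"}  # ID -> text
stream2 = {"S1": "dt", "S2": "cp", "S3": "fax", "S4": "th"}  # ID -> value (abbreviation)


def actual_names(received_abbrevation):
    """Hypothetical helper: return stream1.text for every ID whose
    stream2.value equals the abbreviation received on stream3."""
    return [
        stream1[id_]
        for id_, value in stream2.items()
        if value == received_abbrevation and id_ in stream1
    ]


# Events arriving on stream3 as (ID_k, received_abbrevation) pairs.
stream3_events = [("K1", "dt"), ("K2", "cp")]

# Each new stream3 event triggers a full scan of the reference data.
stream4 = [name for _, abbr in stream3_events for name in actual_names(abbr)]
print(stream4)
```

With the two example events K1 and K2, stream4 would hold the rows "department" and "cellphone", matching the table above.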

Kindly let me know how I can achieve this in KSQL.
