Joining Kafka Streams Containing Java HashMap Objects


Currently I'm working on building a data pipeline. I read two tables from a SQL database, join them in-stream using Kafka Streams, and then store them in denormalized form in an OLAP data warehouse.

Instead of having an individual topic for each table, I have both tables inserting data into a single topic.

I convert each row to a HashMap, serialize it to a byte array, and push it to the topic, so all the information in a row is stored in a single object. The code for that is:

ByteArrayOutputStream bos = new ByteArrayOutputStream();
byte[] yourBytes = null;
// try-with-resources closes the stream; record is the row HashMap
try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
     out.writeObject(record);
     out.flush();
     yourBytes = bos.toByteArray();
}
catch (IOException ex) {
    // don't silently swallow serialization failures
    throw new RuntimeException("Failed to serialize record", ex);
}

In the stream-processing application I deserialize the byte array back to a HashMap and filter the records into two separate streams, one per table.
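The serialize/deserialize round trip can be sketched as a pair of helpers (a minimal sketch; the class and method names `RecordBytes`, `toBytes`, and `fromBytes` are my own, not from the original code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Map;

public class RecordBytes {

    // Serialize a row HashMap to a byte array (mirrors the producer-side code above)
    public static byte[] toBytes(Map<String, Object> record) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(record);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bos.toByteArray();
    }

    // Deserialize the byte array back into the row HashMap
    @SuppressWarnings("unchecked")
    public static Map<String, Object> fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, Object>) in.readObject();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        } catch (ClassNotFoundException ex) {
            throw new RuntimeException(ex);
        }
    }
}
```

These two functions are also exactly what a custom Kafka `Serializer`/`Deserializer` pair for the HashMap values would delegate to.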

After deserializing the byte arrays back into HashMap objects, the records at the processing phase look like the following, with one record shown for each stream (one stream per table):

(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57})

(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"})

Now I have to join the data of the two streams, where each value is a HashMap, on the key PRODUCTID, which is the common field of both tables, and finally produce a single HashMap for each joined row and push that stream to a topic.

So the joined records will look like the following:

(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"})
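The per-record merge described above can be expressed as a plain function (a sketch; `RowMerger`, `mergeRows`, and the exact `meta` handling are my own assumptions), which could later be plugged into a Kafka Streams `ValueJoiner`:

```java
import java.util.HashMap;
import java.util.Map;

public class RowMerger {

    // Merge a product row and a brand row into one denormalized HashMap.
    // Brand fields are copied on top of the product fields; the shared
    // PRODUCTID entry is identical in both rows, so overwriting it is harmless.
    public static Map<String, Object> mergeRows(Map<String, Object> product,
                                                Map<String, Object> brand) {
        Map<String, Object> joined = new HashMap<>(product);
        joined.putAll(brand);
        joined.put("meta", "JOINEDTABLE");
        return joined;
    }
}
```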

Is it possible to do this using Kafka Streams, and if yes, how?

BEST ANSWER

If you want to join in Kafka Streams, you need to extract the join attribute and set it as the key of the message:

KStream streamOfTable1 = ...
streamOfTable1.selectKey(/*extract productId and set as key*/).to("newTopic1");

KStream streamOfTable2 = ...
streamOfTable2.selectKey(/*extract productId and set as key*/).to("newTopic2");

KTable table1 = builder.table("newTopic1");
KTable table2 = builder.table("newTopic2");

table1.join(table2, ...).to("resultTopic");

For more details see the docs: http://docs.confluent.io/current/streams/developer-guide.html#joining
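The key extraction inside `selectKey` can be a plain helper over the row HashMap (a sketch; `KeyExtractor` and `productKey` are my own names, and rendering the id as a `String` key is an assumption):

```java
import java.util.Map;

public class KeyExtractor {

    // Pull PRODUCTID out of the row HashMap and render it as a String key,
    // so both streams are re-keyed consistently before the join.
    public static String productKey(Map<String, Object> row) {
        Object id = row.get("PRODUCTID");
        if (id == null) {
            throw new IllegalArgumentException("row has no PRODUCTID: " + row);
        }
        return String.valueOf(id);
    }
}
```

With such a helper, `selectKey((k, v) -> KeyExtractor.productKey(v))` would re-key both streams before writing to the new topics.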

I assumed that a KTable-KTable join is what you need. Note that you need to create "newTopic1" and "newTopic2" manually, and that both need to have the same number of partitions (cf. http://docs.confluent.io/current/streams/developer-guide.html#user-topics).

Also check out the other available join types, in case a KTable-KTable join is not what you want.