I have one topic (test1, consumed as a KStream) with this schema:

{
    "type": "record",
    "name": "Value",
    "namespace": "test1",
    "fields": [
          {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
          },
          {
            "name": "createdAt",
            "type": [
                "null",
                {
                    "type": "string",
                    "connect.version": 1,
                    "connect.name": "io.debezium.time.ZonedTimestamp"
                }
            ],
            "default": null
          }
    ],
    "connect.name": "test1.Value"
}

Schema of the other topic (test2, the KTable side):

{
  "type": "record",
  "name": "Envelope",
  "namespace": "test2",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "test2.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
   
  ],
  "connect.name": "test2.Envelope"
}

I want to join these two topics, with test1 as a KStream and test2 as a KTable.

The join should match the test1 id with the test2 id, which sits inside the after object. How can I extract the id from the after field (using the Envelope schema) so that I can use it as the join key?

Answer by ChristDist

Left Join (KStream, KTable) → KStream

  • It performs a LEFT JOIN of a stream with the table, effectively doing a table lookup.
  • The input data for both sides must be co-partitioned.
  • It causes data re-partitioning of the stream if and only if the stream was marked for re-partitioning.

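import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

KStream<String, Long> left = ...;    // stream side
KTable<String, Double> right = ...;  // table side, must be co-partitioned with left

KStream<String, String> joined = left.leftJoin(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    Joined.<String, Long, Double>keySerde(Serdes.String()) /* key */
          .withValueSerde(Serdes.Long())                   /* left value */
);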
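Because the join is key-based, the core of the question is getting both sides keyed by the same id. The test2 records carry the id inside the after field, so one possible approach (a suggestion, not something spelled out above) is to read test2 as a KStream, re-key it with selectKey using after.id, and materialize it with toTable() (available since Kafka 2.5) before joining; the test1 stream would likewise need to be keyed by its own id. Changing the key marks the stream for re-partitioning, which is how the co-partitioning requirement can be met after re-keying. The sketch below models only the extract-and-rekey logic in plain Java, not the Kafka Streams API. It assumes each record value is a nested Map<String, Object> (a hypothetical stand-in for an Avro GenericRecord) and that deletes arrive Debezium-style, with after set to null and before holding the old row. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EnvelopeRekey {

    // Returns after.id, or null when "after" is missing (assumed: a Debezium-style delete).
    // Records are modeled as Map<String, Object>, a hypothetical stand-in for GenericRecord.
    @SuppressWarnings("unchecked")
    public static Long extractAfterId(Map<String, Object> envelope) {
        Map<String, Object> after = (Map<String, Object>) envelope.get("after");
        return after == null ? null : (Long) after.get("id");
    }

    // Builds the table side keyed by id: the latest "after" image per id wins.
    // An event with after == null removes the id found in "before" (tombstone semantics).
    @SuppressWarnings("unchecked")
    public static Map<Long, Map<String, Object>> buildTable(List<Map<String, Object>> changelog) {
        Map<Long, Map<String, Object>> table = new HashMap<>();
        for (Map<String, Object> envelope : changelog) {
            Long id = extractAfterId(envelope);
            if (id != null) {
                table.put(id, (Map<String, Object>) envelope.get("after"));
            } else {
                Map<String, Object> before = (Map<String, Object>) envelope.get("before");
                if (before != null) {
                    table.remove((Long) before.get("id"));
                }
            }
        }
        return table;
    }

    // Hypothetical helper: one row with the two fields from the Value schema.
    public static Map<String, Object> row(long id, String createdAt) {
        Map<String, Object> r = new HashMap<>();
        r.put("id", id);
        r.put("createdAt", createdAt);
        return r;
    }

    // Hypothetical helper: an Envelope with only the before/after fields shown in the question.
    public static Map<String, Object> envelope(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> e = new HashMap<>();
        e.put("before", before);
        e.put("after", after);
        return e;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> changelog = new ArrayList<>();
        changelog.add(envelope(null, row(1L, "2024-01-01T00:00:00Z"))); // insert id 1
        changelog.add(envelope(null, row(2L, "2024-01-02T00:00:00Z"))); // insert id 2
        changelog.add(envelope(row(2L, "2024-01-02T00:00:00Z"), null)); // delete id 2
        System.out.println(buildTable(changelog).keySet()); // prints [1]
    }
}
```

In a real topology the same extraction would go inside the key-selecting lambda, and delete events would need to become a null value under the before.id key so that the table treats them as tombstones rather than dropping them.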

Detailed behaviour

  • The join is key-based, i.e. with the join predicate leftRecord.key == rightRecord.key.
  • The join will be triggered under the conditions listed below whenever new input is received. When it is triggered, the user-supplied ValueJoiner will be called to produce join output records.
  • Only input records for the left side (stream) trigger the join. Input records for the right side (table) update only the internal right-side join state.
  • Input records for the stream with a null key or a null value are ignored and do not trigger the join.
  • Input records for the table with a null value are interpreted as tombstones for the corresponding key, which indicate the deletion of the key from the table. Tombstones do not trigger the join.
  • For each input record on the left side that does not have any match on the right side, the ValueJoiner will be called with ValueJoiner#apply(leftRecord.value, null).
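The rules listed above can be modeled with a small in-memory simulation. This is a plain-Java sketch, not the Kafka Streams API: it replays both inputs in a single arrival order (a simplification, since Kafka Streams processes records per partition), keeps the table side in a HashMap, and emits output only for stream records. The names Event and leftJoin are hypothetical, and dropping table records with a null key is an assumption not stated in the list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class StreamTableLeftJoin {

    // One input event in arrival order, from either the table side or the stream side.
    public static final class Event {
        final boolean fromTable;
        final String key;
        final String value;

        Event(boolean fromTable, String key, String value) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }

        public static Event table(String key, String value) { return new Event(true, key, value); }
        public static Event stream(String key, String value) { return new Event(false, key, value); }
    }

    // Replays the events and returns the join output records.
    public static List<String> leftJoin(List<Event> events,
                                        BiFunction<String, String, String> joiner) {
        Map<String, String> tableState = new HashMap<>(); // right-side join state
        List<String> output = new ArrayList<>();
        for (Event e : events) {
            if (e.fromTable) {
                if (e.key == null) {
                    continue;                       // assumption: keyless table records are dropped
                }
                if (e.value == null) {
                    tableState.remove(e.key);       // tombstone: delete the key, no output
                } else {
                    tableState.put(e.key, e.value); // update state only, no output
                }
            } else {
                if (e.key == null || e.value == null) {
                    continue;                       // ignored: does not trigger the join
                }
                // Key-based lookup; get() returns null when there is no match,
                // so unmatched stream records reach the joiner as (leftValue, null).
                output.add(joiner.apply(e.value, tableState.get(e.key)));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(Event.stream("1", "A"));  // no table entry yet -> right=null
        events.add(Event.table("1", "X"));   // updates state only
        events.add(Event.stream("1", "B"));  // match -> right=X
        events.add(Event.table("1", null));  // tombstone removes key 1
        events.add(Event.stream("1", "C"));  // no match again -> right=null
        events.add(Event.stream(null, "D")); // null key -> ignored
        // prints: left=A, right=null / left=B, right=X / left=C, right=null (one per line)
        leftJoin(events, (l, r) -> "left=" + l + ", right=" + r).forEach(System.out::println);
    }
}
```

Note that the record "A" is not re-emitted once "X" arrives: table updates never trigger output, which is why the ordering of the two inputs matters for a stream-table join.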

A more detailed, lower-level explanation is available here: https://developer.confluent.io/learn-kafka/kafka-streams/joins/

See also section 2.7 of https://mydeveloperplanet.com/2019/10/30/kafka-streams-joins-explored/