I know that Kafka Streams uses a Murmur3 hash of the serialized message value in foreign-key joins to guard against race conditions. But this implementation has a major drawback: a join can fail if we introduce schema changes. Kafka Streams does an unnecessary re-serialization to calculate the hash before comparing it, so if we change the structure of our message, the hash also changes because of this extra serialization step, and the hash mismatch results in a failed join.
This happens when we add a primitive field or remove/rename fields in the Java class. With JSON, re-ordering/sorting the properties also results in a different hash.
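To illustrate at the byte level, here is a minimal sketch (assuming a plain Jackson ObjectMapper, similar to what a typical JSON serde uses under the hood; PersonV1/PersonV2 are illustrative stand-ins, not our real classes) showing that adding a field changes the serialized bytes, and therefore any hash computed over them, Murmur3 included:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;

public class HashDriftDemo {

    // Old shape of the value: what the stored hash was originally computed from
    static class PersonV1 {
        public String id = "p1";
        public String name = "Alice";
        public String cityId = "c1";
    }

    // New shape: same data plus one extra primitive field
    static class PersonV2 {
        public String id = "p1";
        public String name = "Alice";
        public String cityId = "c1";
        public boolean deleted = false;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        byte[] oldBytes = mapper.writeValueAsBytes(new PersonV1());
        byte[] newBytes = mapper.writeValueAsBytes(new PersonV2());

        // Different bytes => a different Murmur3 hash => the join-side comparison fails
        System.out.println(Arrays.equals(oldBytes, newBytes)); // prints false
    }
}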
For example, let's say we have a Person and a City object we want to join:
Person:
public class Person {
private String id;
private String name;
private String cityId; // foreign-key
}
City:
public class City {
private String id;
private String name;
}
We can now join the two tables like below:
persons.join(cities, person -> person.getCityId(), (person, city) -> // join function...
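For completeness, a fuller sketch of the topology (the output topic name and the join result are simplified; personSerde and citySerde stand for our Jackson-based JSON serdes, and the usual getters are assumed on Person and City):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class PersonCityTopology {

    public static Topology build(Serde<Person> personSerde, Serde<City> citySerde) {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, Person> persons =
                builder.table("persons", Consumed.with(Serdes.String(), personSerde));
        KTable<String, City> cities =
                builder.table("cities", Consumed.with(Serdes.String(), citySerde));

        // Foreign-key join: extract cityId from each Person and look up the matching City
        KTable<String, String> joined = persons.join(
                cities,
                person -> person.getCityId(),                 // foreign-key extractor
                (person, city) -> person.getName() + " lives in " + city.getName());

        joined.toStream()
              .to("persons-with-city", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}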
Next, publish a message with id p1 to the persons topic and a message with id c1 to the cities topic, and the join works. Now stop the streams app and add a new primitive attribute to the Person class:
public class Person {
private String id;
private String name;
private String cityId; // foreign-key
private boolean deleted; // new primitive attribute
}
Run the streams app again and publish a message with id c1 to the cities topic; you will see that the join no longer works. However, if we publish a message with id p1 to the persons topic, the join does work.
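A minimal producer sketch for those two publish steps (the broker address and the literal JSON payloads are assumptions; any producer that writes the same keys and JSON values will do):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReproducePublish {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Right-side update: the stored hash of p1 (old schema) no longer matches the
            // re-serialized new-schema Person, so this update does not produce a join result
            producer.send(new ProducerRecord<>("cities",
                    "c1", "{\"id\":\"c1\",\"name\":\"Berlin\"}"));

            // Left-side update: re-publishing p1 refreshes the stored hash, so the join works again
            producer.send(new ProducerRecord<>("persons",
                    "p1", "{\"id\":\"p1\",\"name\":\"Alice\",\"cityId\":\"c1\",\"deleted\":false}"));
        }
    }
}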
Is there a way to fix this issue? We use a JSON serde for serialization/deserialization.
What you describe sounds like a known issue: https://issues.apache.org/jira/browse/KAFKA-15303
I am not aware of a workaround, unfortunately.