I'm writing a basic Kafka Streams app in Java that reads Wikipedia events from a producer and tries to count the number of created and recently changed events by user type (bot or human).
I created a custom serde for the Wikipedia events, and I can successfully print both the created and the changed events from my KStreams.
My next step was to create a KTable that counts the created events per user type. It seems that once the KTable is defined, the rest of the code does not execute. I get no error message and the app appears to be running, but nothing is printed, and possibly nothing is processed at all.
My code is as follows:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, WikiEvent> allEvents =
        builder.stream(topicList, Consumed.with(Serdes.String(), WikiEventSerdes.WikiEvent()));
KStream<String, WikiEvent> createEvents = allEvents.filter((key, value) -> value.getStream().equals("create"));
KStream<String, WikiEvent> changeEvents = allEvents.filter((key, value) -> value.getStream().equals("change"));
createEvents.foreach((k,v)->System.out.println("p2 Key= " + k + " Value=" + v.getStream()));
KTable<String, Long> createdPagesUserTypeTable = createEvents.groupBy((key, value) -> value.getUserType()).count();
KStream<String, Long> tableStream = createdPagesUserTypeTable.toStream();
tableStream.foreach((k,v)->System.out.println("Key= " + k + " Value=" + v));
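As a sanity check on what this topology is meant to compute, here is a plain-Java sketch (no Kafka involved) of the same pipeline over a finite list: keep the "create" events, group them by user type, and count each group. The `WikiEvent` record is a hypothetical stand-in holding only the two fields the topology reads; the KTable holds the same per-key counts but keeps updating them as new records arrive.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CreatedPerUserType {

    // Hypothetical stand-in for the question's WikiEvent: only the fields the topology uses.
    public record WikiEvent(String stream, String userType) {}

    // Mirrors filter(stream == "create") -> groupBy(userType) -> count() on a finite list.
    public static Map<String, Long> countCreatedByUserType(List<WikiEvent> events) {
        return events.stream()
                .filter(e -> "create".equals(e.stream()))
                .collect(Collectors.groupingBy(WikiEvent::userType, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<WikiEvent> events = List.of(
                new WikiEvent("create", "human"),
                new WikiEvent("change", "bot"),
                new WikiEvent("create", "bot"),
                new WikiEvent("create", "human"));
        // TreeMap keeps keys sorted, so this prints {bot=1, human=2}
        System.out.println(countCreatedByUserType(events));
    }
}
```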
I suspect that nothing executes past the KTable because the createEvents print never happens while the KTable definition is present. Once I remove the KTable line and everything after it, the prints appear.
What's going wrong here? Also, is there a log of some sort where I can see what my code is doing?
Update: after looking at the server logs, I see the following when the KTable is defined:
[2022-05-27 19:58:38,983] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streams-wiki in Empty state. Created a new member id streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:38,995] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki in state PreparingRebalance with old generation 2 (__consumer_offsets-22) (reason: Adding new member streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:38,999] INFO [GroupCoordinator 0]: Stabilized group streams-wiki generation 3 (__consumer_offsets-22) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,274] INFO [GroupCoordinator 0]: Assignment received from leader streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 for group streams-wiki for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,934] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki in state PreparingRebalance with old generation 3 (__consumer_offsets-22) (reason: Removing member streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 on LeaveGroup; client reason: the consumer unsubscribed from all topics) (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,934] INFO [GroupCoordinator 0]: Group streams-wiki with generation 4 is now empty (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,938] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231, groupInstanceId=None, clientId=streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(stream)) has left group streams-wiki through explicit `LeaveGroup`; client reason: the consumer unsubscribed from all topics (kafka.coordinator.group.GroupCoordinator)
So it appears that my KTable somehow causes the consumer to unsubscribe from all topics. Any idea why this is happening?
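One way to see what the KTable line changes is to print the topology description before starting the app. The sketch below is a hedged stand-in for the question's pipeline: the topic name `wiki-events` is hypothetical, and each String value simply plays the role of the user type. In the printed description, look for a second sub-topology and an internal topic whose name ends in `-repartition`; that internal topic, created by the key-changing `groupBy()`, is what needs serdes.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public final class DescribeTopology {

    // Builds a String-valued stand-in for the question's topology and returns its description.
    public static String describe() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical input topic; each value is treated as the user type.
        KStream<String, String> events =
                builder.stream("wiki-events", Consumed.with(Serdes.String(), Serdes.String()));
        // Changing the key in groupBy() makes Kafka Streams route records through a repartition topic.
        events.groupBy((key, value) -> value).count();
        Topology topology = builder.build();
        return topology.describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```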
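In the end, it turned out that my Java Streams application was failing because StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG was not defined. The groupBy() call changes the record key, so Kafka Streams writes the records to an internal repartition topic, and unless serdes are passed explicitly (for example with Grouped.with(...)), it serializes them with the configured default serdes.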
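A minimal sketch of two ways to give the repartition step its serdes. The `streamsProps` and `countByUserType` helpers and the bootstrap address parameter are hypothetical; `streams-wiki` is taken from the group id in the logs above, since the application id becomes the consumer group id. With the question's code, `valueSerde` would be `WikiEventSerdes.WikiEvent()` and `userType` would be `WikiEvent::getUserType`. Option 2 is usually the more robust choice, because the repartitioned values need a serde as well, not just the keys.

```java
import java.util.Properties;
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public final class SerdeFix {

    // Option 1: configure a default key serde so internal topics have one to fall back on.
    // (DEFAULT_VALUE_SERDE_CLASS_CONFIG would also be needed for the values.)
    public static Properties streamsProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wiki");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        return props;
    }

    // Option 2: pass both serdes explicitly so the repartition step does not rely on defaults.
    public static <V> KTable<String, Long> countByUserType(
            KStream<String, V> events, Function<V, String> userType, Serde<V> valueSerde) {
        return events
                .groupBy((key, value) -> userType.apply(value), Grouped.with(Serdes.String(), valueSerde))
                .count();
    }
}
```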
I figured this out by adding the following lines of code after
KafkaStreams streams = new KafkaStreams(topology, props);
This outputs debug logging to the command window and shows additional logs for the consumer.
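Separately from extra logging, a hedged sketch of making this kind of failure visible from the application itself: register a state listener and an uncaught exception handler on the `KafkaStreams` instance before calling `start()`. The `SurfaceErrors.install` helper is hypothetical, and `setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)` assumes Kafka Streams 2.8 or later. A stream thread that dies (for example on a serde problem) then prints its exception instead of the app silently leaving the consumer group.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public final class SurfaceErrors {

    // Call after new KafkaStreams(topology, props) and before streams.start().
    public static void install(KafkaStreams streams) {
        // Print every state transition, e.g. RUNNING -> PENDING_ERROR -> ERROR.
        streams.setStateListener((newState, oldState) ->
                System.err.println("Kafka Streams state: " + oldState + " -> " + newState));

        // Print the exception that killed a stream thread, then shut this client down.
        StreamsUncaughtExceptionHandler handler = exception -> {
            exception.printStackTrace();
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        };
        streams.setUncaughtExceptionHandler(handler);
    }
}
```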