KTable causes unsubscribe from topics

I'm writing a basic Kafka Streams app in Java that reads Wikipedia events provided by a producer and attempts to count the number of created and recently changed events per user type (bot or human).

I created a custom Serde for the Wikipedia events and am able to successfully print both the created and the changed events to the screen from my KStreams.

My next step was to create a KTable in which I count the created events per user type. It seems that once the KTable is defined, the rest of the code does not execute. I don't get an error message and my app appears to be running, but nothing is printed, and possibly nothing is processed at all.

My code is as follows:

StreamsBuilder builder = new StreamsBuilder();

KStream<String, WikiEvent> allEvents =
    builder.stream(topicList, Consumed.with(Serdes.String(), WikiEventSerdes.WikiEvent()));

KStream<String, WikiEvent> createEvents = allEvents.filter((key, value) -> value.getStream().equals("create"));

KStream<String, WikiEvent> changeEvents = allEvents.filter((key, value) -> value.getStream().equals("change"));

createEvents.foreach((k,v)->System.out.println("p2 Key= " + k + " Value=" + v.getStream()));
             
KTable<String, Long> createdPagesUserTypeTable = createEvents.groupBy((key, value) -> value.getUserType()).count();

KStream<String, Long> tableStream = createdPagesUserTypeTable.toStream();
tableStream.foreach((k,v)->System.out.println("Key= " + k + " Value=" + v));

I suspect that nothing executes past the KTable because the print from the createEvents stream never happens when the KTable definition is present. Once I remove every line from the KTable definition down, the prints appear.

What has gone wrong here? Also, is there a log of some sort where I can see the execution of my code?

Update: after looking at the broker logs, I see this when the KTable is defined:

[2022-05-27 19:58:38,983] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group streams-wiki in Empty state. Created a new member id streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:38,995] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki in state PreparingRebalance with old generation 2 (__consumer_offsets-22) (reason: Adding new member streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:38,999] INFO [GroupCoordinator 0]: Stabilized group streams-wiki generation 3 (__consumer_offsets-22) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,274] INFO [GroupCoordinator 0]: Assignment received from leader streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 for group streams-wiki for generation 3. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,934] INFO [GroupCoordinator 0]: Preparing to rebalance group streams-wiki in state PreparingRebalance with old generation 3 (__consumer_offsets-22) (reason: Removing member streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231 on LeaveGroup; client reason: the consumer unsubscribed from all topics) (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,934] INFO [GroupCoordinator 0]: Group streams-wiki with generation 4 is now empty (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
[2022-05-27 19:58:39,938] INFO [GroupCoordinator 0]: Member MemberMetadata(memberId=streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer-aa4e311e-2712-4054-ac59-9b56f13d2231, groupInstanceId=None, clientId=streams-wiki-8ea96db7-0052-421a-b7c0-a56cedf9f43e-StreamThread-1-consumer, clientHost=/127.0.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=300000, supportedProtocols=List(stream)) has left group streams-wiki through explicit `LeaveGroup`; client reason: the consumer unsubscribed from all topics (kafka.coordinator.group.GroupCoordinator)

So it appears that my KTable has somehow caused the consumer to unsubscribe from all topics. Any idea why this is happening?

1 Answer

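In the end, it turned out that my Java Streams application (the consumer side) was failing because the StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG setting was missing. The groupBy(...).count() step re-keys the stream and, since no serdes are passed to it explicitly, falls back to the configured defaults. That is presumably why the stream thread died and the consumer left the group as soon as the KTable was added.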
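For illustration, here is a minimal sketch of the properties with the missing default key serde filled in. It uses only java.util.Properties and the literal config keys so that it runs without Kafka on the classpath; in a real app you would normally use the StreamsConfig constants named in the comments and pass Serdes.String().getClass(). The application id is taken from the group name in the logs above; the broker address and the class name StreamsProps are assumptions, not part of my original code.

```java
import java.util.Properties;

public class StreamsProps {
    // Builds the Properties that are later passed to new KafkaStreams(topology, props).
    static Properties build() {
        Properties props = new Properties();
        // StreamsConfig.APPLICATION_ID_CONFIG (matches the group name in the broker logs)
        props.put("application.id", "streams-wiki");
        // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG (assumed local broker address)
        props.put("bootstrap.servers", "localhost:9092");
        // StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG: the setting that was missing.
        // Class-typed configs also accept the fully qualified class name as a string.
        props.put("default.key.serde",
                "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for a quick visual check.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

A String key serde fits here because groupBy re-keys the stream by user type, which is a String in the code above.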

I found this out by adding the following lines right after KafkaStreams streams = new KafkaStreams(topology, props); (and before calling streams.start()):

streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
    System.out.println(e);
});

This prints the exception that killed the stream thread to the command window, so the consumer-side error becomes visible instead of the app appearing to run while doing nothing.
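The handler helps because an exception thrown inside a background thread never reaches the main thread. The same mechanism can be tried with a plain Java thread, no Kafka required. This is only an analogy for what the Streams handler does: the class name HandlerDemo, the thread name, and the simulated exception message are made up for the example.

```java
import java.util.concurrent.atomic.AtomicReference;

public class HandlerDemo {
    // Starts a worker thread that fails, and returns what the handler captured.
    static Throwable runFailingWorker() {
        AtomicReference<Throwable> seen = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            // Stand-in for a stream thread hitting a configuration error.
            throw new IllegalStateException("missing key serde (simulated)");
        }, "StreamThread-1-demo");

        // The handler runs on the dying thread just before it terminates.
        worker.setUncaughtExceptionHandler((t, e) -> {
            seen.set(e);
            System.out.println(t.getName() + " died: " + e);
        });

        worker.start();
        try {
            worker.join(); // returns only after the handler has finished
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        runFailingWorker();
    }
}
```

The main thread keeps running normally after the worker dies, which matches what I saw: the app looked alive while the stream thread was already gone.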