By default, an uncaught exception shuts down the Kafka Streams client. This corresponds to the default response, StreamThreadExceptionResponse.SHUTDOWN_CLIENT. The problem is that message processing then stops and messages pile up in the topic.
A suggested way to deal with that is to use StreamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler and supply a handler that returns REPLACE_THREAD:
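    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

    @Configuration
    @Slf4j
    public class KafkaStreamsConfig {

        @Bean
        public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
            // Ask the factory bean to replace a failed stream thread instead of shutting down.
            return factoryBean -> factoryBean.setStreamsUncaughtExceptionHandler(exception -> {
                log.error(exception.getMessage());
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
        }
    }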
But this had no effect. The response was still SHUTDOWN_CLIENT and processing stopped.
I found out that this cannot work with the current version 4.0.4 of spring-cloud-stream-binder-kafka-streams, because StreamsBuilderFactoryManager overwrites this setting with a handler that returns SHUTDOWN_CLIENT in line 63:
    streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler((exception) -> {
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    });
So what is the use of StreamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler, and how can the client shutdown be prevented in case of an uncaught exception?
I found a way to set the exception handler on the KafkaStreams instance so that it actually takes effect.
The factory bean offers a customizer that can do this:
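A minimal sketch of such a customizer, assuming the factory bean's setKafkaStreamsCustomizer method and the KafkaStreams.setUncaughtExceptionHandler overload that takes a StreamsUncaughtExceptionHandler (Kafka Streams 2.8 or later). The class and bean names are reused from the question, and the log message is only illustrative:

```java
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

@Configuration
@Slf4j
public class KafkaStreamsConfig {

    @Bean
    public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        // Register a KafkaStreamsCustomizer instead of calling
        // setStreamsUncaughtExceptionHandler on the factory bean.
        return factoryBean -> factoryBean.setKafkaStreamsCustomizer(kafkaStreams ->
                // Set the handler directly on the KafkaStreams instance.
                kafkaStreams.setUncaughtExceptionHandler(exception -> {
                    log.error("Uncaught exception in stream thread", exception);
                    // Replace the failed thread rather than shutting down the client.
                    return StreamThreadExceptionResponse.REPLACE_THREAD;
                }));
    }
}
```

Note that setKafkaStreamsCustomizer presumably replaces any customizer that was set on the factory bean earlier, so this sketch assumes no other customizer is needed.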
This works because StreamsBuilderFactoryBean calls "customize" after it has passed its own exception handler to its KafkaStreams instance, so the handler set in the customizer is the one that stays in effect.