By default uncaught Exceptions result in shutting down the KStream client. This is according to the default SHUTDOWN_CLIENT of the StreamThreadExceptionResponse. The problem is that message processing is stopped and messages pile up in the topic.

A suggested way to deal with that is to use StreamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler and supply a handler that returns REPLACE_THREAD.

@Configuration
@Slf4j
public class KafkaStreamsConfig {
    @Bean
    public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        return factoryBean -> factoryBean.setStreamsUncaughtExceptionHandler(exception -> {
            log.error(exception.getMessage());
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}

But this had no effect. It was still SHUTDOWN_CLIENT and the processing was stopped.

I found out that this cannot work with the current version 4.0.4 of spring-clound-stream-binder-kafka-streams because StreamsBuilderFactoryManager overwrites this setting with a handler that returns SHUTDOWN_CLIENT in Line 63:

streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler((exception) -> {
  return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
});

So what is the use of StreamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler and how can the client shutdown be prevented in case of an uncaught exception ?

1

There are 1 best solutions below

0
On

I found a solution to effectively set the exception handler of the kafkaStreams.

The factory bean offers a customizer that can do this:

@Configuration
@Slf4j
public class KafkaStreamsConfig {
    @Bean
    public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
        return factoryBean -> factoryBean.setKafkaStreamsCustomizer(ks -> ks.setUncaughtExceptionHandler(exception -> {
            log.error(exception.getMessage());
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        }));
    }
}

This works because StreamsBuilderFactoryBean calls "customize" after it passes its own exception handler to its kafkaStreams.