Pause/Start Kafka Stream processors in Spring boot

595 Views Asked by At

I'm going to implement a Circuit breaker pattern for messages. Basic requirement is that if microservice cannot publish messages to publishing topic it should stop accepting messages from other Kafka topics. When the publishing topics become available microservice should start accepting messages from other Kafka topics.

Is there a way I can achieve this in Spring boot Kafka Streams?

1

There are 1 best solutions below

1
On

I was able to achieve this by using BindingsEndpoint.

private final BindingsEndpoint binding;

@Override
public void stop() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Stopping Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::stop);
        log.info("Stopped Kafka topics ");
    }
}

@Override
public void start() {
    List<?> objects = binding.queryStates();
    if (!objects.isEmpty()) {
        log.info("Starting Kafka topics ");
        List<Binding> bindings = getBindings(objects);
        bindings.forEach(Binding::start);
        log.info("Started Kafka topics ");
    }
}

protected List<Binding> getBindings(List<?> objects) {
    return objects.stream().filter(object -> object instanceof Binding)
            .map(object -> (Binding) object).collect(Collectors.toList());
}