spring-cloud-stream-binder-kinesis - kinesis-hashcode added as a prefix to the error stream [Java]


I am trying to implement error handling. To test it, I throw an exception from my Function method like this:

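import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
class MessageProcessorConfig {
    @Bean
    Function<PersRequest, ResolvedEvent> aud() {
        return request -> {
            // Always fail, to exercise the error-handling path
            throw new IllegalStateException();
        };
    }
}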

Based on the documentation, I also created a ServiceActivator:

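import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class KinesisErrorHandling {

    @Bean(name = "aud-in-0.errors")
    public MessageChannel customErrorChannel() {
        return new DirectChannel();
    }

    @ServiceActivator(inputChannel = "aud-in-0.errors")
    public void receiveProduceError(ErrorMessage receiveMsg) {
        System.out.println("receive error msg: " + receiveMsg);
    }
}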

My application.yml looks like this:

spring:
  cloud:
    stream:
      bindings:
        aud-in-0:
          error-handler-definition:
          destination: co-p
        aud-out-0:
          destination: co-d
      kinesis:
        binder:
          auto-create-stream: false
          enable-observation: false
          locks:
            table: co-l
          checkpoint:
            table: co-c

When I run the application, the error message always goes to a stream named kinesis-65022344-aud-in-0.errors, but my subscriber is attached to aud-in-0.errors.

I did some debugging and found that kinesis-65022344 is the binderIdentity, where the number is just a hashCode.
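To make the mismatch concrete, here is a minimal plain-Java sketch of the naming pattern I am observing. The class and the helper prefixedErrorChannel are hypothetical and only mirror the names seen at runtime; this is not the binder's actual implementation.

```java
class ErrorChannelNameSketch {

    // Hypothetical helper: reproduces the observed pattern
    // <binder name>-<hash>-<binding name>.errors. It only illustrates
    // the naming, it is not code taken from the Kinesis binder.
    public static String prefixedErrorChannel(String binderName, int hash, String bindingName) {
        return binderName + "-" + hash + "-" + bindingName + ".errors";
    }

    public static void main(String[] args) {
        // Channel my @ServiceActivator subscribes to
        String subscribed = "aud-in-0.errors";
        // Channel the error message actually arrives on
        String observed = prefixedErrorChannel("kinesis", 65022344, "aud-in-0");

        System.out.println(observed);                    // kinesis-65022344-aud-in-0.errors
        System.out.println(observed.equals(subscribed)); // false
    }
}
```

Because the two names differ, a subscriber attached to aud-in-0.errors never sees a message published to the prefixed name.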

Is this working as expected? If so, what is the recommended way to write error handling?
