I am trying to implement an error handling. To test it out I throw an exception from mu Function method like this:
@Configuration
@EnableIntegration
class MessageProcessorConfig {
@Bean
Function<PersRequest, ResolvedEvent> aud() {
return request -> {
throw new IllegalStateException();
};
}
}
Based on the documentation I also created an ServiceActivator object:
@Slf4j
@Component
public class KinesisErrorHandling {
@Bean(name = "aud-in-0.errors")
public MessageChannel customErrorChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "aud-in-0.errors")
public void receiveProduceError(ErrorMessage receiveMsg) {
System.out.println("receive error msg: " + receiveMsg);
}
my application.yml looks like this:
spring:
cloud:
stream:
bindings:
aud-in-0:
error-handler-definition:
destination: co-p
aud-out-0:
destination: co-d
kinesis:
binder:
auto-create-stream: false
enable-observation: false
locks:
table: co-l
checkpoint:
table: co-c
When running an application I see that the error message goes always to the stream named like this: kinesis-65022344-aud-in-0.errors but my subscriber is attached to aud-in-0.errors.
I did some debugging and I see that kinesis-65022344 is binderIndentity and the number is just a hashCode.
Is it working as expected? If yes then what is the recommended way to write error handling?