Spring batch integration remote partitioning - running parallel jobs

496 Views Asked by At

We have a usecase where we need to read data from some paginated API and write to some downstream Kafka topic.

We have been able to implement the solution via spring batch integration remote partitioning where the manager takes care of partitioning the task by creating executionContext which containes the page number and offset to read data for. The manager creates this executionContext and puts them on the messagingChannel(I can use rabbitMQ and Kafka topic whichever provides a solution). The workers(more than 1) pick that executionContext from the messagingChannel and complete the task of reading the data from the API and writing it to the required Kafka topic.

The above implementation works just fine. This also works fine if I run the same job for different clients one after another. The challenge comes when we want to run the same job for multiple clients in parallel. For example, we launch the jobs for 2 clients in parallel. It creates 1 manager and 2 workers for each client. Now the issue comes when both of the managers pushed the executionContext on the same messagingChannel and workers don't know which one to pick and execute. Also, both the jobs share the same database spring batch tables, so I suspect it would create problems at that level as well.

Any input or references on how to implement running multiple spring batch reporter partitioning jobs in parallel.

Update[18 Jan 2022]

I tried adding @StepScoped to MessageChannelPartitionHandler at here and below is the error I am getting:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'partitioningMessageHandler': FactoryBean threw exception on object creation; nested exception is java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:178) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.getObjectFromFactoryBean(FactoryBeanRegistrySupport.java:101) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getObjectForBeanInstance(AbstractBeanFactory.java:1821) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getObjectForBeanInstance(AbstractAutowireCapableBeanFactory.java:1266) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.resolveTargetBeanFromMethodWithBeanAnnotation(AbstractMethodAnnotationPostProcessor.java:536) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:154) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:912) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.6.RELEASE.jar:5.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at spring.batch.integration.Manager.main(Manager.java:11) ~[main/:na]
Caused by: java.lang.IllegalStateException: Target object of type [class com.sun.proxy.$Proxy78] has no eligible methods for handling Messages.
    at org.springframework.util.Assert.state(Assert.java:94) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.validateFallbackMethods(MessagingMethodInvokerHelper.java:751) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:740) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:294) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.<init>(MethodInvokingMessageListProcessor.java:63) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.<init>(MethodInvokingMessageGroupProcessor.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:211) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AggregatorFactoryBean.createHandler(AggregatorFactoryBean.java:53) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.createHandlerInternal(AbstractSimpleMessageHandlerFactoryBean.java:198) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:186) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean.getObject(AbstractSimpleMessageHandlerFactoryBean.java:60) ~[spring-integration-core-5.3.1.RELEASE.jar:5.3.1.RELEASE]
    at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:171) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 20 common frames omitted
1

There are 1 best solutions below

5
On

In such a setup, the MessageChannelPartitionHandler should be step-scoped. There is a note about that in the Javadoc:

Note: The reply channel for this is instance based.
Sharing this component across multiple step instances may result in the
crossing of messages. It's recommended that this component be step or job scoped.

Making this bean step-scoped should fix the issue.