I am trying to implement remote chunking in Spring Batch, using ActiveMQ Artemis as the JMS broker. I have been able to configure it to run and process jobs successfully.
Next I tested how it handles failed jobs, and found that on restart the manager is unable to connect to the queues on the ActiveMQ Artemis server.
Connection Factory
@Configuration
public class ConnectionFactory {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.brokerUrl);
        return connectionFactory;
    }
}
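One note for completeness: since I build the ActiveMQConnectionFactory myself, the spring.activemq.user/password entries in application.properties are not applied to this bean. If the broker required authentication, the credentials would have to be set on the factory explicitly; a minimal sketch (broker.user and broker.password are hypothetical properties, not part of my current config):

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(
        @Value("${broker.url}") String brokerUrl,
        @Value("${broker.user}") String user,         // hypothetical property
        @Value("${broker.password}") String password  // hypothetical property
) {
    // Artemis' JMS ConnectionFactory also accepts the URL via the constructor
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
    factory.setUser(user);
    factory.setPassword(password);
    return factory;
}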
Manager class
@Configuration
@EnableBatchIntegration
@EnableIntegration
@Profile("manager")
@Slf4j
public class ManagerConfiguration {

    @Autowired
    private DataSource dataSource;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private JobListener jobListener;
    @Autowired
    private StepListener stepListener;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private ManagerItems managerItems;
    @Autowired
    private ManagerChannel channel;

    private final SimpleJobOperator jobOp = new SimpleJobOperator();
    private List<Long> failedJobs;

    @Bean
    public TaskletStep managerStep() {
        return new RemoteChunkingManagerStepBuilder<CustomersInfoDTO, CustomersInfoDTO>("managerStep", jobRepository)
                .chunk(50)
                .outputChannel(channel.requests())
                .inputChannel(channel.replies())
                .listener(stepListener)
                .transactionManager(transactionManager)
                .reader(managerItems.infoReader()) // consider some generic reader
                .allowStartIfComplete(Boolean.TRUE)
                .build();
    }

    @Bean
    public Job remoteChunkingJob() {
        log.info("Outside -> JOB");
        return new JobBuilder("remoteChunkingJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(jobListener)
                .start(managerStep())
                .build();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobOperator jobOp(final JobRegistry jobRegistry) throws Exception {
        jobOp.setJobLauncher(jobLauncher());
        jobOp.setJobRepository(jobRepository);
        jobOp.setJobRegistry(jobRegistry);
        jobOp.setJobExplorer(jobExp());
        return jobOp;
    }

    @Bean
    public JobExplorer jobExp() throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTransactionManager(transactionManager);
        bean.setTablePrefix("batch_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }

    @Bean
    @BeforeStep
    public Long getFailedInstance() throws JobInstanceAlreadyCompleteException, NoSuchJobException,
            NoSuchJobExecutionException, JobParametersInvalidException, JobRestartException {
        String sql = "SELECT job_execution_id FROM batch_job_execution WHERE status = 'FAILED'";
        failedJobs = new JdbcTemplate(dataSource).query(sql, (rs, rowNum) -> rs.getLong("job_execution_id"));
        if (!failedJobs.isEmpty()) {
            jobOp.restart(failedJobs.get(0));
        }
        return 1L;
    }
}
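For reference, the restart flow I am aiming for could also be expressed as an ApplicationRunner, so that it fires only after the application context has fully started, instead of during bean creation as getFailedInstance() does above. This is only a sketch reusing the jobOp and dataSource beans from my config; failedJobRestarter is a hypothetical bean name:

@Bean
public ApplicationRunner failedJobRestarter(JobOperator jobOperator, DataSource dataSource) {
    return args -> {
        // Look up FAILED executions and ask the operator to resume the first one,
        // mirroring what getFailedInstance() currently does at context startup.
        List<Long> failed = new JdbcTemplate(dataSource).query(
                "SELECT job_execution_id FROM batch_job_execution WHERE status = 'FAILED'",
                (rs, rowNum) -> rs.getLong("job_execution_id"));
        if (!failed.isEmpty()) {
            jobOperator.restart(failed.get(0));
        }
    };
}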
ManagerChannel class
@Configuration
@Slf4j
@Profile("manager")
public class ManagerChannel {

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public QueueChannel replies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow managerOutboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(requests())
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }

    @Bean
    public IntegrationFlow managerInboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }
}
Worker class
@Configuration
@Slf4j
@EnableBatchIntegration
@EnableIntegration
@Profile("worker")
public class WorkerConfiguration {

    @Autowired
    private RemoteChunkingWorkerBuilder<CustomersInfoDTO, CustomersInfoDTO> remoteChunkingWorkerBuilder;
    @Autowired
    private WorkerChannel channel;
    @Autowired
    private WorkerItems workerItems;

    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        // use the builder provided by @EnableBatchIntegration
        return remoteChunkingWorkerBuilder
                .inputChannel(channel.requests())
                .outputChannel(channel.replies())
                .itemProcessor(workerItems.itemProcessor()) // consider generic processor
                .itemWriter(workerItems.infoWriter())       // consider generic writer
                .build();
    }
}
WorkerChannel class
@Configuration
@Profile("worker")
public class WorkerChannel {

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(requests())
                .get();
    }

    @Bean
    public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(replies())
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }
}
application.properties
spring.profiles.active=manager
broker.url=tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=root
spring.activemq.password=root
spring.datasource.url=jdbc:postgresql://localhost:5432/remote-chunking-artemis?createDatabaseIfNotExist=true
spring.datasource.username=postgres
spring.datasource.password=12345
spring.batch.jdbc.initialize-schema=always
spring.jpa.hibernate.ddl-auto=create
spring.datasource.driver-class-name=org.postgresql.Driver
spring.batch.jdbc.schema=classpath:/org/springframework/batch/core/schema-postgresql.sql
file.input=data/customers-100.csv
I run the worker and the manager in separate terminals with mvn spring-boot:run -Dspring-boot.run.profiles=<profile>, where <profile> is worker or manager.
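Concretely, the two commands are:

mvn spring-boot:run -Dspring-boot.run.profiles=worker
mvn spring-boot:run -Dspring-boot.run.profiles=manager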
For normal runs it works fine and produces the required results, and the manager logs the job's completed status. But for failed jobs, when I rerun the batch application, the worker is able to connect to the broker while the manager is not, and the application comes to a standstill.
EDIT:
When I run the jobs in sequence, the application logs that the job with a certain id completed successfully or failed. But when I run the application against those failed jobs, it picks up the first failed job (configured like this for testing purposes) and then logs the following, which is also logged when running normal jobs:
2024-03-08T00:13:47.622+05:30 INFO 19656 --- [ main] o.s.b.c.l.support.SimpleJobOperator : Checking status of job execution with id=689
2024-03-08T00:13:47.678+05:30 INFO 19656 --- [ main] o.s.b.c.l.support.SimpleJobOperator : Attempting to resume job with name=remoteChunkingJob and parameters={'run.id':'{value=5, type=class java.lang.Long, identifying=true}'}
2024-03-08T00:13:47.814+05:30 INFO 19656 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=remoteChunkingJob]] launched with the following parameters: [{'run.id':'{value=5, type=class java.lang.Long, identifying=true}'}]
2024-03-08T00:13:47.847+05:30 INFO 19656 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [managerStep]
2024-03-08T00:13:47.875+05:30 INFO 19656 --- [ main] o.s.b.i.c.ChunkMessageChannelItemWriter : Waiting for 2 results
After logging these lines, the manager application is stuck. When I check the ActiveMQ console, I see that the manager is not subscribed to the replies queue, and there is only one connection, which is from the worker.
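To help narrow this down, here is a small check I could run to see whether the manager's JMS inbound endpoint (from managerInboundFlow) is ever started once the context is up. This is only a sketch: endpointStateLogger is a hypothetical bean name, and it assumes the adapter is registered as a JmsMessageDrivenEndpoint:

@Bean
public ApplicationRunner endpointStateLogger(ApplicationContext context) {
    // Logs the lifecycle state of every JMS message-driven endpoint after startup.
    // If the manager is blocked before the lifecycle start phase, this never prints.
    return args -> context.getBeansOfType(JmsMessageDrivenEndpoint.class)
            .forEach((name, endpoint) ->
                    log.info("JMS endpoint '{}' running: {}", name, endpoint.isRunning()));
}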