Unable to process failed jobs in spring batch when using remote chunking


I am trying to implement remote chunking in Spring Batch, using ActiveMQ Artemis as the JMS broker. I have been able to configure it to run and process jobs successfully.

Next, I tried to see how it handles failed jobs, and found that the manager is not able to connect to the queues on the ActiveMQ Artemis server.

Connection Factory

@Configuration
public class ConnectionFactory {

    @Value("${broker.url}")
    private String brokerUrl;

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(this.brokerUrl);
        return connectionFactory;
    }
}

Manager class

@Configuration
@EnableBatchIntegration
@EnableIntegration
@Profile("manager")
@Slf4j
public class ManagerConfiguration {
    @Autowired
    private DataSource dataSource;
    @Autowired
    private JobRepository jobRepository;
    @Autowired
    private JobListener jobListener;
    @Autowired
    private StepListener stepListener;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private PlatformTransactionManager transactionManager;
    @Autowired
    private ManagerItems managerItems;
    @Autowired
    private ManagerChannel channel;
    final SimpleJobOperator jobOp = new SimpleJobOperator();
    private List<Long> failedJobs; 

    @Bean
    public TaskletStep managerStep() {
        return new RemoteChunkingManagerStepBuilder<CustomersInfoDTO,CustomersInfoDTO>("managerStep",jobRepository)
                .chunk(50)
                .outputChannel(channel.requests()) 
                .inputChannel(channel.replies())
                .listener(stepListener)
                .transactionManager(transactionManager)
                .reader(managerItems.infoReader()) // consider some generic reader
                .allowStartIfComplete(Boolean.TRUE)
                .build();
    }

    @Bean
    public Job remoteChunkingJob() {
        log.info("Outside -> JOB");
        return new JobBuilder("remoteChunkingJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(jobListener)
                .start(managerStep())
                .build();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobOperator jobOp(final JobRegistry jobRegistry) throws Exception {
        jobOp.setJobLauncher(jobLauncher());
        jobOp.setJobRepository(jobRepository);
        jobOp.setJobRegistry(jobRegistry);
        jobOp.setJobExplorer(jobExp());
        return jobOp;
    }

    @Bean
    public JobExplorer jobExp() throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTransactionManager(transactionManager);
        bean.setTablePrefix("batch_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }

    // NOTE: @BeforeStep has no effect on a @Bean method; this runs once,
    // while the application context is still being initialized.
    @Bean
    @BeforeStep
    public Long getFailedInstance() throws JobInstanceAlreadyCompleteException, NoSuchJobException, NoSuchJobExecutionException, JobParametersInvalidException, JobRestartException {
        String sql = "SELECT job_execution_id FROM batch_job_execution WHERE status = 'FAILED'";
        failedJobs = new JdbcTemplate(dataSource).query(sql,
                (rs, rowNum) -> rs.getLong("job_execution_id"));
        if (!failedJobs.isEmpty()) {
            jobOp.restart(failedJobs.get(0)); // restart the first failed execution (for testing)
        }
        return 1L;
    }
}
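As an aside, the status='FAILED' query above returns every execution that has ever failed, including executions whose job instance later completed after a restart; passing one of those to jobOp.restart throws JobInstanceAlreadyCompleteException. A sketch that narrows the result, assuming the standard Spring Batch meta-data schema with the lower-case batch_ prefix used above:

```sql
-- failed executions whose job instance never completed, newest first
-- (assumes the default Spring Batch schema columns)
SELECT e.job_execution_id
FROM batch_job_execution e
WHERE e.status = 'FAILED'
  AND NOT EXISTS (
      SELECT 1
      FROM batch_job_execution c
      WHERE c.job_instance_id = e.job_instance_id
        AND c.status = 'COMPLETED')
ORDER BY e.job_execution_id DESC;
```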

ManagerChannel class

@Configuration
@Slf4j
@Profile("manager")
public class ManagerChannel {

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }
    @Bean
    public QueueChannel replies() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow managerOutboundFlow(@Autowired ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(requests())
                .handle(Jms.outboundAdapter(connectionFactory).destination("requests"))
                .get();
    }

    @Bean
    public IntegrationFlow managerInboundFlow(@Autowired ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
                .channel(replies())
                .get();
    }

}

Worker class

@Configuration
@Slf4j
@EnableBatchIntegration
@EnableIntegration
@Profile("worker")
public class WorkerConfiguration {

    @Autowired
    private RemoteChunkingWorkerBuilder<CustomersInfoDTO, CustomersInfoDTO> remoteChunkingWorkerBuilder;
    @Autowired
    private WorkerChannel channel;
    @Autowired
    private WorkerItems workerItems;


    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        // use the builder provided by @EnableBatchIntegration rather than creating a new one
        return remoteChunkingWorkerBuilder
                .inputChannel(channel.requests())
                .outputChannel(channel.replies())
                .itemProcessor(workerItems.itemProcessor()) // consider a generic processor
                .itemWriter(workerItems.infoWriter()) // consider a generic writer
                .build();
    }
}

WorkerChannel class

@Configuration
@Profile("worker")
public class WorkerChannel {

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow inboundFlow(@Autowired ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests"))
                .channel(requests())
                .get();
    }

    @Bean
    public IntegrationFlow outboundFlow(@Autowired ActiveMQConnectionFactory connectionFactory) {
        return IntegrationFlow.from(replies())
                .handle(Jms.outboundAdapter(connectionFactory).destination("replies"))
                .get();
    }

}

application.properties

spring.profiles.active=manager
broker.url=tcp://localhost:61616
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=root
spring.activemq.password=root

spring.datasource.url=jdbc:postgresql://localhost:5432/remote-chunking-artemis?createDatabaseIfNotExist=true
spring.datasource.username=postgres
spring.datasource.password=12345
spring.batch.jdbc.initialize-schema=always
spring.jpa.hibernate.ddl-auto=create
spring.datasource.driver-class-name=org.postgresql.Driver
spring.batch.jdbc.schema=classpath:/org/springframework/batch/core/schema-postgresql.sql
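A side note on the broker properties: the spring.activemq.* keys configure Spring Boot's classic ActiveMQ support, while the auto-configuration for ActiveMQ Artemis reads the spring.artemis.* keys. This doesn't matter here because the custom ConnectionFactory bean reads broker.url directly, but if you ever rely on Boot's auto-configured Artemis connection factory instead, the equivalent settings would be:

```properties
# Artemis-specific Spring Boot properties (only used by Boot's
# auto-configured connection factory, not by the custom bean above)
spring.artemis.mode=native
spring.artemis.broker-url=tcp://localhost:61616
spring.artemis.user=root
spring.artemis.password=root
```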

file.input = data/customers-100.csv

CSV file

I run the worker and the manager in separate terminals, using the command mvn spring-boot:run -Dspring-boot.run.profiles={profile}, where {profile} is worker or manager.

For normal (non-restarted) jobs it runs fine and produces the expected results, and the manager logs the job's completed status. For failed jobs, however, when I start the batch application the worker is able to connect to the server, but the manager is not, and the application comes to a standstill.

EDIT :

When I run jobs in sequence, the application logs that the job with a given id completed successfully or failed. But when I run the application against those failed jobs, it picks up the first failed job (configured this way for testing purposes) and then logs the following, which is also logged when running normal jobs:

2024-03-08T00:13:47.622+05:30  INFO 19656 --- [           main] o.s.b.c.l.support.SimpleJobOperator      : Checking status of job execution with id=689
2024-03-08T00:13:47.678+05:30  INFO 19656 --- [           main] o.s.b.c.l.support.SimpleJobOperator      : Attempting to resume job with name=remoteChunkingJob and parameters={'run.id':'{value=5, type=class java.lang.Long, identifying=true}'}
2024-03-08T00:13:47.814+05:30  INFO 19656 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=remoteChunkingJob]] launched with the following parameters: [{'run.id':'{value=5, type=class java.lang.Long, identifying=true}'}]
2024-03-08T00:13:47.847+05:30  INFO 19656 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [managerStep]
2024-03-08T00:13:47.875+05:30  INFO 19656 --- [           main] o.s.b.i.c.ChunkMessageChannelItemWriter  : Waiting for 2 results

After logging this, the manager application is stuck. When I check the ActiveMQ console, I see that the manager is not subscribed to the replies queue; the only connection is the one from the worker.
