Spring batch restart job failing with transaction exception

I'm testing the restart feature of my Spring Batch job. The job processes nine records with a chunk size of 5. After the first chunk is processed, I intentionally fail the second chunk to test the failure scenario. As expected, the job fails after the first chunk completes successfully, and the batch_job_execution table contains a record with the execution id and status FAILED. I then run a restart job, passing it that execution id, to verify whether the failed records get processed. But I get the exception below when I run it:

2022-05-03 18:58:44,829 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [RestartJobTasklet] 
Exception java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).while restart the failed job executionId8

Could you please help me figure out what I'm missing? My code is below.

Thanks in advance for your help!

TestJobConfig.java

@Configuration
@Profile("myJob-config")
public class TestJobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private MyItemWriter myItemWriter;

    @Autowired
    private RestartJobTasklet restartJobTasklet;


    @Bean("myJob-config")
    public Job job(@Qualifier("validateStep") Step validateStep,
                   @Qualifier("processRecords") Step processRecords) {
        Job job = jobBuilderFactory.get("myJob-config")
                .incrementer(new RunIdIncrementer())
                .start(validateStep)
                .on("FAILED")
                .end()
                .from(validateStep).on("*").to(processRecords)
                .end()
                .build();
        return job;
    }

    @Bean("restart-myjob")
    public Job restartJob(@Qualifier("restartMyJobStep") Step restartMyJobStep) {
        return jobBuilderFactory.get("restart-myjob")
                .incrementer(new RunIdIncrementer())
                .start(restartMyJobStep)
                .build();
    }

    @Bean(name = "restartMyJobStep")
    public Step restartMyJobStep() {
        return this.stepBuilderFactory.get("restart-failed-job")
                .tasklet(restartJobTasklet)
                .build();
    }


    @Bean(name = "processRecords")
    public Step processRecords() {
        return this.stepBuilderFactory.get("process-csv-records").<Employee, Employee>chunk(5)
                .reader(reader())
                .writer(itemWriter())
                .build();
    }


    @Bean(name = "validateStep")
    public Step validateStep(@Qualifier("validateTasklet") Tasklet validateTasklet) {
        return stepBuilderFactory.get("validateStep")
                .tasklet(validateTasklet)
                .allowStartIfComplete(true)
                .build();
    }

    @Bean(name = "validateTasklet")
    public Tasklet validateTasklet() {
        return new Tasklet() {
            @Override
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                return RepeatStatus.FINISHED;
            }
        };
    }

    @Bean
    public FlatFileItemReader<Employee> reader() {
        FlatFileItemReader<Employee> flatFileItemReader = new FlatFileItemReader<>();

        flatFileItemReader.setLinesToSkip(1);
        flatFileItemReader.setResource(new ClassPathResource("/csv/emps.csv"));

        DefaultLineMapper<Employee> empDefaultLineMapper = new DefaultLineMapper<>();
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setNames(new String[]{"id", "firstName", "lastName"});

        empDefaultLineMapper.setLineTokenizer(lineTokenizer);
        empDefaultLineMapper.setFieldSetMapper(new EmployeeFieldSetMapper());
        empDefaultLineMapper.afterPropertiesSet();

        flatFileItemReader.setLineMapper(empDefaultLineMapper);

        return flatFileItemReader;
    }

    @Bean
    public MyItemWriter<Employee> itemWriter() {
        return myItemWriter;
    }
}

RestartJobTasklet.java

@Component
public class RestartJobTasklet implements Tasklet, StepExecutionListener {

    @Autowired
    JobExplorer jobExplorer;
    @Autowired
    JobOperator jobOperator;
    private StepExecution stepExecution;
    private JobExecution jobExecution;
    @Autowired
    private OpsJobProperties props;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.jobExecution = stepExecution.getJobExecution();
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution,
                                ChunkContext chunkContext) throws Exception {

        // Id of the FAILED execution taken from batch_job_execution (hard-coded for this test)
        Long executionId = 8L;
        try {
            Long restartId = jobOperator.restart(executionId);
            JobExecution restartExecution = jobExplorer.getJobExecution(restartId);
        } catch (JobRestartException e) {
            throw e;
        } catch (Exception exception) {
            throw exception;
        }
        return RepeatStatus.FINISHED;
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return ExitStatus.COMPLETED;
    }
}

DBConfig.java

@Configuration
public class DBConfig extends DefaultBatchConfigurer {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Bean
    public JobRepository jobRepository(@Autowired DataSource dataSource,
                                       @Autowired PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactory = new JobRepositoryFactoryBean();
        jobRepositoryFactory.setDatabaseType(DatabaseType.POSTGRES.name());
        jobRepositoryFactory.setDataSource(dataSource);
        jobRepositoryFactory.setTransactionManager(transactionManager);
        jobRepositoryFactory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        jobRepositoryFactory.setTablePrefix("BATCH_");
        jobRepositoryFactory.setMaxVarCharLength(1000);
        jobRepositoryFactory.setValidateTransactionState(Boolean.FALSE);
        return jobRepositoryFactory.getObject();
    }

    @Bean()
    public DataSource dataSource() {
        PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
        pgSimpleDataSource.setServerName("my-db-server");
        pgSimpleDataSource.setDatabaseName("test-db");
        pgSimpleDataSource.setUser("test");
        pgSimpleDataSource.setPassword("test");
        return pgSimpleDataSource;
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(final JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    @Bean
    public JobOperator jobOperator(final JobLauncher jobLauncher, final JobRepository jobRepository,
                                   final JobRegistry jobRegistry, final JobExplorer jobExplorer) {
        final SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobLauncher(jobLauncher);
        jobOperator.setJobRepository(jobRepository);
        jobOperator.setJobRegistry(jobRegistry);
        jobOperator.setJobExplorer(jobExplorer);
        return jobOperator;
    }

    @Bean
    public JobExplorer jobExplorer(@Autowired DataSource dataSource) throws Exception {
        final JobExplorerFactoryBean bean = new JobExplorerFactoryBean();
        bean.setDataSource(dataSource);
        bean.setTablePrefix("BATCH_");
        bean.setJdbcOperations(new JdbcTemplate(dataSource));
        bean.afterPropertiesSet();
        return bean.getObject();
    }

    @Bean
    public PlatformTransactionManager transactionManager(@Autowired DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

}

Error Log

2022-05-03 18:58:44,865 [ JOB=scheduler.id_IS_UNDEFINED ] [THREAD=main] ERROR [org.springframework.batch.core.step.AbstractStep] 
Encountered an error executing step restart-failed-job in job restart-myjob
java.lang.IllegalStateException: Existing transaction detected in JobRepository. Please fix this and try again (e.g. remove @Transactional annotations from client).
        at org.springframework.batch.core.repository.support.AbstractJobRepositoryFactoryBean$1.invoke(AbstractJobRepositoryFactoryBean.java:177)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
        at com.sun.proxy.$Proxy68.createJobExecution(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:137)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:128)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
        at com.sun.proxy.$Proxy73.run(Unknown Source)
        at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283)
        at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
        at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$e5e87de1.restart(<generated>)

There is 1 best solution below

You are trying to restart a job from within a tasklet that belongs to another job. That tasklet (RestartJobTasklet) already runs inside a transaction, and the restart call ends up in JobRepository#createJobExecution, which needs to create its own transaction and rejects the call when one is already active. Hence the error. In other words, you should not call the JobRepository from a transactional context.

I am not sure it's a good idea to create a job whose only purpose is to restart another job. Restarting a failed job instance is typically a technical task that does not require a job of its own. I would recommend designing jobs to implement business logic, not technical tasks.

That said, if you extract the restart code into a main method, where no transaction is active, your sample should work as expected.
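To make the reasoning above concrete, here is a minimal, self-contained model in plain Java. It does not use Spring Batch at all: TX_ACTIVE, ModelJobRepository, inTransaction, restartFromTasklet and restartFromMain are hypothetical names invented for this sketch, and the starting id 9 is arbitrary. It only imitates the check that produces the exception, assuming the repository refuses to create an execution while the calling thread already has a transaction open.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class RestartTransactionModel {

    // Hypothetical stand-in for "is a transaction active on this thread?"
    static final ThreadLocal<Boolean> TX_ACTIVE = ThreadLocal.withInitial(() -> false);

    // Hypothetical stand-in for a JobRepository that validates transaction state
    static class ModelJobRepository {
        private final AtomicLong nextId = new AtomicLong(9); // arbitrary first id

        long createJobExecution() {
            if (TX_ACTIVE.get()) {
                // Mirrors the message from the question's stack trace
                throw new IllegalStateException("Existing transaction detected in JobRepository.");
            }
            return nextId.getAndIncrement();
        }
    }

    // Models a step that wraps each tasklet call in a transaction
    static long inTransaction(LongSupplier body) {
        TX_ACTIVE.set(true);
        try {
            return body.getAsLong();
        } finally {
            TX_ACTIVE.set(false);
        }
    }

    // Restart attempted from inside a tasklet: a transaction is already open
    static String restartFromTasklet(ModelJobRepository repo) {
        try {
            return "restarted as " + inTransaction(repo::createJobExecution);
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    // Restart attempted from a plain main method: no surrounding transaction
    static String restartFromMain(ModelJobRepository repo) {
        return "restarted as " + repo.createJobExecution();
    }

    public static void main(String[] args) {
        ModelJobRepository repo = new ModelJobRepository();
        System.out.println(restartFromTasklet(repo)); // failed: Existing transaction detected in JobRepository.
        System.out.println(restartFromMain(repo));    // restarted as 9
    }
}
```

In the real application, the equivalent of restartFromMain would be to obtain the JobOperator bean from the application context in a main method and call jobOperator.restart(executionId) there, outside any step.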