Spring Batch exception handling sent as ResponseEntity


I'm new to Spring Boot, and I'm practicing on a small project with Spring Batch to gain experience.

Here is my context: I have two .csv files; one holds the employees, the other contains all the managers of the company. I have to read the files, then add each record to the database. To keep it simple, I just call an endpoint on my controller and upload my .csv file (as a MultipartFile), and the job starts. I was able to do that; my problem is the following.

I have to handle several kinds of validation (I'm using JSR 380 validation for my entities, and I also have to check for business exceptions). An example of a business rule: an employee must be supervised by a manager from the same department (if the manager belongs to a different department, an exception should be thrown). So records with invalid or illogical input have to be skipped (not saved to the database) but stored in a Map or List that is sent to the client as a ResponseEntity.

That way the client would know which rows need to be fixed. I suppose I have to look at listeners, but I can't work out how to store the exceptions in a map or list and then send it as a ResponseEntity.

This is an example of what I want to achieve:

My csv files screenshots

EmployeeBatchConfig.java:

@Configuration
@EnableBatchProcessing
@AllArgsConstructor
public class EmployeeBatchConfig {

  private JobBuilderFactory jobBuilderFactory;
  private StepBuilderFactory stepBuilderFactory;
  private EmployeeRepository employeeRepository;
  private EmployeeItemWriter employeeItemWriter;


 @Bean
 @StepScope
 public FlatFileItemReader<EmployeeDto> itemReader(
         @Value("#{jobParameters[fullPathFileName]}") final String pathFile) {
     FlatFileItemReader<EmployeeDto> flatFileItemReader = new FlatFileItemReader<>();
     flatFileItemReader.setResource(new FileSystemResource(new File(pathFile)));
     flatFileItemReader.setName("CSV-Reader");
     flatFileItemReader.setLinesToSkip(1);
     flatFileItemReader.setLineMapper(lineMapper());
     return flatFileItemReader;
 }

 private LineMapper<EmployeeDto> lineMapper() {
    DefaultLineMapper<EmployeeDto> lineMapper = new DefaultLineMapper<>();
    DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
    lineTokenizer.setDelimiter(",");
    lineTokenizer.setStrict(false);
    lineTokenizer.setNames("Username", "lastName", "firstName", "departement", "supervisor");
    BeanWrapperFieldSetMapper<EmployeeDto> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(EmployeeDto.class);
    lineMapper.setLineTokenizer(lineTokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);

  return lineMapper;
}

@Bean
public EmployeeProcessor processor() {
  return new EmployeeProcessor(); // Processor bean that skips invalid rows
}

@Bean
public RepositoryItemWriter<Employee> writer() {
  RepositoryItemWriter<Employee> writer = new RepositoryItemWriter<>();
  writer.setRepository(employeeRepository);
  writer.setMethodName("save");
  return writer;
}


@Bean
public Step step1(FlatFileItemReader<EmployeeDto> itemReader) {
    return stepBuilderFactory.get("slaveStep").<EmployeeDto, Employee>chunk(5)
            .reader(itemReader)
            .processor(processor())
            .writer(employeeItemWriter)
            .faultTolerant()
            .listener(skipListener())
            .skip(SkipException.class)
            .skipLimit(10)
            .skipPolicy(skipPolicy())
            .build();
 }


 @Bean
 @Qualifier("executeJobEmployee")
 public Job runJob(FlatFileItemReader<EmployeeDto> itemReader) {
    return jobBuilderFactory
            .get("importEmployee")
            .flow(step1(itemReader))
            .end()
            .build();
 }


@Bean
public SkipPolicy skipPolicy(){
    return new ExceptionSkipPolicy();
}

@Bean
public SkipListener<EmployeeDto, Employee> skipListener(){
    return new StepSkipListener();
}

/*@Bean
public ExecutionContext executionContext(){
   return new ExecutionContext();
}*/
}

EmployeeProcessor.java:

public class EmployeeProcessor implements ItemProcessor<EmployeeDto, Employee> {

   @Autowired
   private SupervisorService managerService;

   @Override
   public Employee process(@Valid EmployeeDto item) throws Exception {
      // Retrieve the employee's manager through the injected service, then compare departments
      ManagerDto manager = managerService.findSupervisorById(item.getSupervisor());
      if(!(manager.getDepartement().equals(item.getDepartement()))) {
        throw new SkipException("Manager Invalid", item);
        //return null;
      }
      return ObjectMapperUtils.map(item, Employee.class);
   }
}

MySkipPolicy.java:

public class MySkipPolicy implements SkipPolicy {

  @Override
  public boolean shouldSkip(Throwable throwable, int i) throws SkipLimitExceededException {
    return true;
  }
}

StepSkipListener.java:

public class StepSkipListener implements SkipListener<EmployeeDto, Employee> {

  @Override // item reader
  public void onSkipInRead(Throwable throwable) {
    System.out.println("In OnSkipReader");
  }

  @Override // item writer
  public void onSkipInWrite(Employee item, Throwable throwable) {
    System.out.println("Nooooooooo ");
  }

  //@SneakyThrows
  @Override // item processor
  public void onSkipInProcess(@Valid EmployeeDto employee, Throwable throwable) {
    System.out.println("Process... ");
    /* I guess this is where I should work, but how do I deal with the
       exception that occurred? How do I know which exception I would get? */
  }
}

SkipException.java:

public class SkipException extends Exception {

  private Map<String, EmployeeDto> errors = new HashMap<>();

  public SkipException(String errorMessage, EmployeeDto employee) {
    super(errorMessage); // keep the reason available through getMessage()
    this.errors.put(errorMessage, employee);
  } 

  public Map<String, EmployeeDto> getErrors() {
    return this.errors;
  }
}

JobController.java:

@RestController
@RequestMapping("/upload")
public class JobController {

  @Autowired    
  private JobLauncher jobLauncher;

  @Autowired
  @Qualifier("executeJobEmployee")
  private Job job;  

  private final String EMPLOYEE_FOLDER = "C:/Users/Project/Employee/";

  @PostMapping("/employee")
  public ResponseEntity<Object> importEmployee(@RequestParam("file") MultipartFile multipartFile)
      throws JobInterruptedException, SkipException, IllegalStateException, IOException,
             FlatFileParseException {
        
   try {
      String fileName = multipartFile.getOriginalFilename();
      File fileToImport= new File(EMPLOYEE_FOLDER + fileName);
      multipartFile.transferTo(fileToImport);
    
      JobParameters jobParameters = new JobParametersBuilder()
        .addString("fullPathFileName", EMPLOYEE_FOLDER + fileName)
        .addLong("startAt", System.currentTimeMillis())
        .toJobParameters();

      JobExecution jobExecution = this.jobLauncher.run(job, jobParameters);

      ExecutionContext executionContext = jobExecution.getExecutionContext();

      System.out.println("My skipped items: " + executionContext.toString());

   } catch (ConstraintViolationException | FlatFileParseException |
            JobRestartException | JobInstanceAlreadyCompleteException |
            JobParametersInvalidException |
            JobExecutionAlreadyRunningException e) {
      e.printStackTrace();
      return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
   }
   return new ResponseEntity<>("Employee inserted successfully", HttpStatus.OK);
  }
}

There is 1 best solution below


That requirement forces your implementation to wait for the job to finish before returning the web response, which is not the typical way of launching batch jobs from web requests. Since batch jobs can run for several minutes or hours, they are typically launched in the background, and a job ID is returned to the client for a later status check.
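To illustrate the "launch in the background, return an ID, check later" pattern, here is a minimal sketch in plain `java.util.concurrent`. It is not Spring Batch code: `BackgroundJobRegistry`, `submit` and `status` are hypothetical names used only for this example. In Spring Batch itself, the usual route is to configure the job launcher with an asynchronous task executor.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a background launcher: submit now, poll later. */
class BackgroundJobRegistry {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<Long, Future<?>> jobs = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong(1);

    /** Starts the work in the background and returns an ID immediately. */
    long submit(Runnable work) {
        long id = nextId.getAndIncrement();
        jobs.put(id, executor.submit(work));
        return id;
    }

    /** Returns "UNKNOWN", "RUNNING", or "FINISHED" (finished successfully or not). */
    String status(long id) {
        Future<?> future = jobs.get(id);
        if (future == null) {
            return "UNKNOWN";
        }
        return future.isDone() ? "FINISHED" : "RUNNING";
    }

    /** Stops accepting new work; already submitted work still completes. */
    void shutdown() {
        executor.shutdown();
    }
}
```

With this shape, the upload endpoint would return the ID right away and a second endpoint would expose `status(id)`, which is the opposite of what the requirement in the question asks for.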

In Spring Batch, the SkipListener is the extension point that allows you to add custom code when a skippable exception happens when reading, processing or writing an item. I would add the business validation in an item processor and throw an exception with the skipped item and the reason for that skip (both encapsulated in the exception class that should be declared as skippable).
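As a rough sketch of that idea, the exception below carries both the rejected item and the reason, and a small collector records them. Both `InvalidItemException` and `SkippedItemCollector` are hypothetical names; the sketch is plain Java with no Spring types, on the assumption that your `SkipListener` implementation would simply call `record(item, throwable)` from `onSkipInProcess`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical skippable exception carrying the rejected item and the reason. */
class InvalidItemException extends Exception {
    private final Object item;

    InvalidItemException(String reason, Object item) {
        super(reason); // the reason becomes getMessage()
        this.item = item;
    }

    Object getItem() {
        return item;
    }
}

/** Hypothetical collector that a SkipListener implementation could delegate to. */
class SkippedItemCollector {
    // Thread-safe list, in case the step is ever run with several threads.
    private final List<String> entries = new CopyOnWriteArrayList<>();

    /** Meant to be called from onSkipInProcess(item, throwable). */
    void record(Object item, Throwable cause) {
        // The exception type tells business-rule failures apart from anything else.
        String reason = (cause instanceof InvalidItemException)
                ? cause.getMessage()
                : "Unexpected " + cause.getClass().getSimpleName();
        entries.add(item + ": " + reason);
    }

    /** Returns an immutable copy of everything recorded so far. */
    List<String> snapshot() {
        return List.copyOf(entries);
    }
}
```

The `instanceof` check is one way to find out which exception caused the skip: the listener receives the `Throwable` itself, so it can branch on its type and read whatever the exception carries.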

Skipped items are usually stored somewhere for later analysis (a table, a file, or the job execution context, for example). In your case, you need to send them back in the web response, so you can read them from the store of your choice and attach them to the response before returning it. In pseudocode, your controller would do something like the following:

- run the job and wait for its termination (the skip listener would write skipped items in the storage of your choice)
- get skipped items from storage
- return web response
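The three steps above can be sketched in plain Java as follows. `ImportReportFlow`, `runAndReport` and the `skippedCount`/`skippedItems` keys are hypothetical; the job is modeled as a blocking `Runnable` and the store as a `Supplier`, so the sketch makes no claim about which storage you pick.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical helper mirroring the three steps; not a Spring Batch API. */
class ImportReportFlow {

    static Map<String, Object> runAndReport(Runnable blockingJob,
                                            Supplier<List<String>> skippedItemStore) {
        // 1. Run the job and wait for it to finish
        //    (the skip listener fills the store while the job runs).
        blockingJob.run();

        // 2. Read the skipped items only after the job has terminated.
        List<String> skipped = List.copyOf(skippedItemStore.get());

        // 3. Build the body of the web response.
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("skippedCount", skipped.size());
        body.put("skippedItems", skipped);
        return body;
    }
}
```

In the controller, `blockingJob` would wrap the `jobLauncher.run(job, jobParameters)` call (with its checked exceptions handled), and the returned map could be passed as the body of the `ResponseEntity`.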

For example, if you choose to store skipped items in the job execution context, you can do something like this in your controller:

JobExecution jobExecution = jobLauncher.run(job, jobParameters);
ExecutionContext executionContext = jobExecution.getExecutionContext();
// get skipped items from the execution context
// return the web response