Copy header tags in an XML Spring Batch application

917 Views Asked by At

I am using Spring Batch in a Spring Boot application. The Spring Boot version is 2.3.3.RELEASE.

What I intend to achieve

I have to read an XML file containing thousands of transactions and a header tag (fileInformation), apply some business logic to each transaction, and then write the file back with the updated transaction values. I am using StaxEventItemReader to read the file and StaxEventItemWriter to write it, plus a couple of ItemProcessors for the business logic. The XML file looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<reportFile>
   <fileInformation>
      <sender>200GH7XZ60</sender>
      <timestamp>2020-12-23T09:05:34Z</timestamp>
      <environment>PRO</environment>
      <version>001.60</version>
   </fileInformation>
   <record>
      <transaction>
         <buyer></buyer>
      </transaction>
      <transaction>
         <buyer></buyer>
      </transaction>
      <transaction>
         <buyer></buyer>
      </transaction>
   </record>
</reportFile>

The problem I am facing is with the values of the header tags.

I have configured an OmegaXmlHeaderCallBack which generates the desired header tags, but the values in those tags should be copied from the input file. As far as I know, the StaxWriterCallback runs before the reader, processor and writer do any work (the header is written when the writer is opened at the start of the step, before any item has been read). So I am not able to inject the values using late binding. This looks like a basic requirement, but I couldn't find any solution on Stack Overflow.

Here are the code snippets that configure the Spring Batch job:

/**
 * Spring Batch configuration for the job that extracts and replaces personal data in an XML
 * report file.
 *
 * <p>The job runs two steps:
 * <ol>
 *   <li>{@code extractHeaderStep} reads only the {@code fileInformation} element of the input
 *       file. It copies each child element's text into the <em>job</em> execution context under
 *       the key {@code header.<elementName>} (for example {@code header.sender}).</li>
 *   <li>{@code extractAndReplacePersonalDataStep} processes the transactions. Its step-scoped
 *       beans, including the header callback, are created when that step starts. By then the
 *       {@code jobExecutionContext['header.*']} late bindings resolve to the copied values.</li>
 * </ol>
 *
 * <p>The processing step cannot supply the header values itself. The {@link StaxEventItemWriter}
 * writes its header when it is opened, which is before the reader has returned any item.
 */
@Slf4j
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    /** Prefix of the job execution context keys holding header values, e.g. {@code header.sender}. */
    private static final String HEADER_KEY_PREFIX = "header.";

    /** Local name of the XML element that holds the file header. */
    private static final String HEADER_ELEMENT = "fileInformation";

    @Autowired
    PIExtractorItemProcessor pIExtractorItemProcessor;

    @Autowired
    JobBuilderFactory jobBuilderFactory;

    @Autowired
    StepBuilderFactory stepBuilderFactory;

    @Value( "${eugateway.batch.chunk.size}" )
    private int chunkSize;

    /** Chunk-oriented step that processes transactions and writes them to the output file. */
    @Bean
    public Step jobStep(ItemStreamReader<CustomHeaderTransactionXmlElement> reader,
            CompositeItemProcessor<CustomHeaderTransactionXmlElement,
            ProcessorWriterDto> processor,
            CompositeItemWriter<ProcessorWriterDto> writer,
            EdsClientItemWriteListener<ProcessorWriterDto> writeListener,
            StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("extractAndReplacePersonalDataStep")
                .<CustomHeaderTransactionXmlElement, ProcessorWriterDto>chunk(chunkSize)
                .reader(reader)
                .processor(processor)
                .listener(writeListener)
                .writer(writer)
                .build();
    }

    /**
     * The job: first copy the file header into the job execution context, then run
     * {@code jobStep}. The header step must come first so the step-scoped writer beans of
     * {@code jobStep} can late-bind the header values.
     */
    @Bean
    public Job extractPersonalDataJob(Step jobStep, JobResultListener jobListener,
            JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("extractAndReplacePersonalDataJob")
                .incrementer(new RunIdIncrementer())
                .start(extractHeaderStep())
                .next(jobStep)
                .listener(jobListener)
                .build();
    }

    /**
     * Builds the tasklet step that copies the {@code fileInformation} children of the file named
     * by the {@code file.path} job parameter into the job execution context.
     *
     * <p>This is deliberately not a {@code @Bean}. A second {@code Step} bean would make the
     * by-type injection of {@code Step jobStep} into the job ambiguous.
     */
    private Step extractHeaderStep() {
        return stepBuilderFactory.get("extractHeaderStep")
                .tasklet((contribution, chunkContext) -> {
                    Object path = chunkContext.getStepContext().getJobParameters().get("file.path");
                    if (path == null) {
                        throw new IllegalStateException("Missing job parameter 'file.path'");
                    }
                    java.util.Map<String, String> header = readFileInformation(path.toString());
                    org.springframework.batch.item.ExecutionContext jobContext = chunkContext.getStepContext()
                            .getStepExecution().getJobExecution().getExecutionContext();
                    header.forEach((name, value) -> jobContext.putString(HEADER_KEY_PREFIX + name, value));
                    log.info("Extracted file header {}", header);
                    return org.springframework.batch.repeat.RepeatStatus.FINISHED;
                })
                .build();
    }

    /**
     * Reads the child elements of the first {@code fileInformation} element of an XML file.
     * Parsing stops as soon as that element ends, so the transactions that follow are not parsed.
     *
     * @param path file-system path of the input report file
     * @return header child element name to its text, in document order. The map is empty if the
     *     file has no {@code fileInformation} element; in that case the whole file is scanned.
     * @throws java.io.IOException if the file cannot be opened or read
     * @throws javax.xml.stream.XMLStreamException if the XML is malformed, or a header child
     *     contains nested elements
     */
    private static java.util.Map<String, String> readFileInformation(String path)
            throws java.io.IOException, javax.xml.stream.XMLStreamException {
        javax.xml.stream.XMLInputFactory factory = javax.xml.stream.XMLInputFactory.newInstance();
        // The input file is external data: refuse DTDs and external entities (XXE).
        factory.setProperty(javax.xml.stream.XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
        factory.setProperty(javax.xml.stream.XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);

        java.util.Map<String, String> header = new java.util.LinkedHashMap<>();
        try (java.io.InputStream in = java.nio.file.Files.newInputStream(java.nio.file.Paths.get(path))) {
            javax.xml.stream.XMLStreamReader xml = factory.createXMLStreamReader(in);
            try {
                boolean inHeader = false;
                while (xml.hasNext()) {
                    int event = xml.next();
                    if (event == javax.xml.stream.XMLStreamConstants.START_ELEMENT) {
                        String name = xml.getLocalName();
                        if (HEADER_ELEMENT.equals(name)) {
                            inHeader = true;
                        } else if (inHeader) {
                            // getElementText() leaves the cursor on this child's END_ELEMENT.
                            header.put(name, xml.getElementText());
                        }
                    } else if (event == javax.xml.stream.XMLStreamConstants.END_ELEMENT
                            && inHeader && HEADER_ELEMENT.equals(xml.getLocalName())) {
                        break;
                    }
                }
            } finally {
                // XMLStreamReader.close() does not close the underlying stream; try-with-resources does.
                xml.close();
            }
        }
        return header;
    }

    /**
     * Step-scoped StAX reader over the input file given by the {@code file.path} job parameter.
     * It emits both {@code fileInformation} and {@code transaction} fragments.
     */
    @Bean
    @StepScope
    public ItemStreamReader<CustomHeaderTransactionXmlElement> itemReader(@Value("#{jobParameters['file.path']}") String path) {
        Jaxb2Marshaller transactionMarshaller = new Jaxb2Marshaller();
        transactionMarshaller.setClassesToBeBound (FileInformation.class, TransactionPositionReport.class);
        log.info("Generating StaxEventItemReader");

        return new StaxEventItemReaderBuilder<CustomHeaderTransactionXmlElement>()
                .name("headerTransaction")
                .resource(new FileSystemResource(path))
                .addFragmentRootElements("fileInformation", "transaction")
                .unmarshaller(transactionMarshaller)
                .build();
    }

    /** Header callback bound to the header values that {@code extractHeaderStep} stored. */
    @Bean
    @StepScope
    OmegaXmlHeaderCallBack getOmegaXmlHeaderCallBack(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version){
        return new OmegaXmlHeaderCallBack(sender, timestamp, environment, version);
    }

    @Bean
    @StepScope
    OmegaXmlFooterCallBack getOmegaXmlFooterCallBack(){
        return new OmegaXmlFooterCallBack();
    }

    /** StAX writer for the output file, rooted at {@code reportFile}, with OMEGA header/footer. */
    @StepScope
    @Bean(name = "staxTransactionWriter")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        // NOTE(review): machine-specific, hard-coded output path; consider a job parameter.
        String exportFilePath = "C:\\Users\\sasharma\\Documents\\TO_BE_DELETED\\eugateway\\outputfile.xml";
        Resource exportFileResource = new FileSystemResource(exportFilePath);

        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setSupportDtd(true);
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setClassesToBeBound(TransactionPositionReport.class);

        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .version("1.0")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("reportFile")
                .headerCallback(getOmegaXmlHeaderCallBack(sender, timestamp, environment, version))
                .footerCallback(getOmegaXmlFooterCallBack())
                .shouldDeleteIfEmpty(true)
                .build();
    }

    @Bean
    @StepScope
    public PIExtractorItemProcessor extractItemProcessor() {
        log.info("Generating PIExtractorItemProcessor");
        return new PIExtractorItemProcessor();
    }

    @Bean
    public PIRemoverItemProcessor removeItemProcessor() {
        log.info("Generating PIRemoverItemProcessor");
        return new PIRemoverItemProcessor();
    }

    /** Chains the extractor and the remover processors, in that order. */
    @Bean
    @StepScope
    CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> extractAndRemoveItemProcessor() {
        log.info("Generating CompositeItemProcessor");
        CompositeItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> itemProcessor = new CompositeItemProcessor<>();
        itemProcessor.setDelegates((List<? extends ItemProcessor<?, ?>>) Arrays.asList(extractItemProcessor(), removeItemProcessor()));
        return itemProcessor;
    }

    @Bean
    @StepScope
    public EdsClientItemWriter<ProcessorWriterDto> edsClientItemWriter() {
        log.info("Generating EdsClientItemWriter");
        return new EdsClientItemWriter<>();
    }

    @Bean
    @StepScope
    public OmegaXmlFileWriter<ProcessorWriterDto> omegaXmlFileWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating OmegaXmlFileWriter");
        return new OmegaXmlFileWriter(staxTransactionItemWriter(sender, timestamp, environment, version));
    }

    /** Writes each chunk to the EDS client writer first, then to the XML file writer. */
    @Bean
    @StepScope
    public CompositeItemWriter<ProcessorWriterDto> compositeItemWriter(@Value("#{jobExecutionContext['header.sender']}") String sender,
            @Value("#{jobExecutionContext['header.timestamp']}") String timestamp,
            @Value("#{jobExecutionContext['header.environment']}") String environment,
            @Value("#{jobExecutionContext['header.version']}") String version) {
        log.info("Generating CompositeItemWriter");
        CompositeItemWriter<ProcessorWriterDto> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(edsClientItemWriter(), omegaXmlFileWriter(sender, timestamp, environment, version)));
        return compositeItemWriter;
    }
}

Below is the OmegaXmlHeaderCallBack class. Because the header values are not yet available for late binding when it runs, I always end up with empty values in the header tags.

@Slf4j
public class OmegaXmlHeaderCallBack implements StaxWriterCallback {
    private String sender;
    private String timestamp;
    private String environment;
    private String version;
    
    public OmegaXmlHeaderCallBack(String sender, String timestamp, String environment, String version) {
        super();
        this.sender = sender;
        this.timestamp = timestamp;
        this.environment = environment;
        this.version = version;
    }

    @Override
    public void write(XMLEventWriter writer) {
        XMLEventFactory factory = XMLEventFactory.newInstance();
        try {
            writer.add(factory.createStartElement("", "", "fileInformation"));

            writer.add(factory.createStartElement("", "", "sender"));
            writer.add(factory.createCharacters(sender));
            writer.add(factory.createEndElement("", "", "sender"));


            writer.add(factory.createStartElement("", "", "timestamp"));
            writer.add(factory.createCharacters(timestamp));
            writer.add(factory.createEndElement("", "", "timestamp"));

            writer.add(factory.createStartElement("", "", "environment"));
            writer.add(factory.createCharacters(environment));
            writer.add(factory.createEndElement("", "", "environment"));

            writer.add(factory.createStartElement("", "", "version"));
            writer.add(factory.createCharacters(version));
            writer.add(factory.createEndElement("", "", "version"));
            
            writer.add(factory.createEndElement("", "", "fileInformation"));
            
            writer.add(factory.createStartElement("", "", "record"));
            
        } catch (XMLStreamException e) {
            log.error("Error writing OMEGA XML Header: {}", e.getMessage());
            throw new OmegaXmlHeaderWriterException(e.getMessage());
        }
    }
}

The code for the ItemProcessor is below. I put the header data into the step ExecutionContext, intending it to be read by the header callback (which, sadly, does not happen).

/**
 * First processor in the composite chain. It turns each element read from the report file into a
 * {@link ProcessorWriterDto}, or filters it out.
 *
 * <ul>
 *   <li>A {@link FileInformation} header is copied into the <em>step</em> execution context under
 *       {@code header.sender}, {@code header.timestamp}, {@code header.environment} and
 *       {@code header.version}. It is then dropped from the chunk by returning {@code null}.</li>
 *   <li>Every other element is cast to {@link TransactionPositionReport}. Its personal data is
 *       extracted for the configured submission account.</li>
 * </ul>
 *
 * <p>NOTE(review): values stored in the step execution context are not visible through
 * {@code jobExecutionContext[...]} late binding. They are also stored only after the step's writer
 * has already written its header.
 */
@Slf4j
public class PIExtractorItemProcessor implements ItemProcessor<CustomHeaderTransactionXmlElement, ProcessorWriterDto> {

    @Autowired
    PersonalDataExtractor personalDataExtractor;

    @Value("#{jobParameters['submission.account']}")
    private String subAccntId;

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    @Override
    public ProcessorWriterDto process(CustomHeaderTransactionXmlElement headerTransactionElement) throws Exception {
        if (!(headerTransactionElement instanceof FileInformation)) {
            return buildWriterDto((TransactionPositionReport) headerTransactionElement);
        }
        rememberHeader((FileInformation) headerTransactionElement);
        // A null result filters the header element out of the chunk.
        return null;
    }

    /** Copies the header fields into the current step's execution context. */
    private void rememberHeader(FileInformation fileInfo) {
        stepExecution.getExecutionContext().put("header.sender", fileInfo.getSender());
        stepExecution.getExecutionContext().put("header.timestamp", fileInfo.getTimestamp());
        stepExecution.getExecutionContext().put("header.environment", fileInfo.getEnvironment());
        stepExecution.getExecutionContext().put("header.version", fileInfo.getVersion());
        log.debug("Header {} found.", fileInfo.toString());
    }

    /** Extracts personal data from one transaction and pairs it with the transaction itself. */
    private ProcessorWriterDto buildWriterDto(TransactionPositionReport report) throws Exception {
        log.debug("NO header info found for transaction {}", report.getProcessingDetails().getCustomerTransactionId());
        log.info("Extracting personal data for transaction customer id {} and create EDS requestDto.", report.getProcessingDetails().getCustomerTransactionId());
        ProcessorWriterDto dto = new ProcessorWriterDto();
        dto.setEdsRequestDtoList(personalDataExtractor.extract(report, subAccntId));
        dto.setTransaction(report);
        return dto;
    }
}

Links I have referred to:

1

There is 1 best solution below

0
On BEST ANSWER

Your step is doing too much. I would break things down into two steps:

  • Step 1: extracts the file information header and puts it in the job execution context
  • Step 2: reads the file information header from the execution context and uses it in whatever step-scoped bean needed for that step (for example the stax callback in your case)

Here is a quick example:

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

import javax.xml.stream.XMLEventFactory;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.xml.StaxWriterCallback;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Two-step example job.
 *
 * <ol>
 *   <li>{@code extractHeaderStep} parses only the {@code fileInformation} header of the input file
 *       (job parameter {@code file}). It stores the header in the job execution context under
 *       {@code file.information}.</li>
 *   <li>{@code processFile} runs afterwards. Step-scoped beans created during it, such as
 *       {@link #staxWriterCallback}, can late-bind the header from the job execution context.</li>
 * </ol>
 */
@Configuration
@EnableBatchProcessing
public class SO67909123 {

    /** Local name of the header element whose children populate {@link FileInformation}. */
    private static final String HEADER_ELEMENT = "fileInformation";

    @Bean
    public Step extractHeaderStep(StepBuilderFactory steps) {
        return steps.get("extractHeaderStep")
                .tasklet((contribution, chunkContext) -> {
                    Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();
                    String inputFile = (String) jobParameters.get("file");
                    FileInformation fileInformation = extractFileInformation(inputFile);
                    ExecutionContext jobExecutionContext =  chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
                    jobExecutionContext.put("file.information", fileInformation);
                    return RepeatStatus.FINISHED;
                }).build();
    }

    /**
     * Reads the children of the first {@code fileInformation} element of {@code inputFile}. It
     * stops at that element's end tag, so the transactions that follow are never parsed.
     *
     * @param inputFile path of the XML report file
     * @return the header. Fields whose element is absent stay {@code null}.
     * @throws IOException if the file cannot be opened or read
     * @throws XMLStreamException if the XML is malformed, or a header child has nested elements
     */
    private static FileInformation extractFileInformation(String inputFile) throws IOException, XMLStreamException {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // The input file is external data: refuse DTDs and external entities (XXE).
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, Boolean.FALSE);
        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);

        FileInformation fileInformation = new FileInformation();
        try (InputStream in = Files.newInputStream(Paths.get(inputFile))) {
            XMLStreamReader xml = factory.createXMLStreamReader(in);
            try {
                boolean inHeader = false;
                while (xml.hasNext()) {
                    int event = xml.next();
                    if (event == XMLStreamConstants.START_ELEMENT) {
                        String name = xml.getLocalName();
                        if (HEADER_ELEMENT.equals(name)) {
                            inHeader = true;
                        } else if (inHeader) {
                            fileInformation.set(name, xml.getElementText());
                        }
                    } else if (event == XMLStreamConstants.END_ELEMENT
                            && inHeader && HEADER_ELEMENT.equals(xml.getLocalName())) {
                        break;
                    }
                }
            } finally {
                // XMLStreamReader.close() does not close the underlying stream; try-with-resources does.
                xml.close();
            }
        }
        return fileInformation;
    }

    @Bean
    public Step processFile(StepBuilderFactory steps) {
        return steps.get("processFile")
                .tasklet((contribution, chunkContext) -> { // Change this to a chunk-oriented tasklet
                    Map<String, Object> jobExecutionContext = chunkContext.getStepContext().getJobExecutionContext();
                    FileInformation fileInformation = (FileInformation) jobExecutionContext.get("file.information");
                    System.out.println("Step 2: " + fileInformation);
                    return RepeatStatus.FINISHED;
        }).build();
    }

    /**
     * Step-scoped header callback that writes the {@code fileInformation} element from the header
     * stored by {@code extractHeaderStep}.
     *
     * @throws IllegalStateException if no header is in the job execution context, i.e.
     *     {@code extractHeaderStep} has not run
     */
    @Bean
    @StepScope
    public StaxWriterCallback staxWriterCallback(@Value("#{jobExecutionContext['file.information']}") FileInformation fileInformation) {
        if (fileInformation == null) {
            throw new IllegalStateException("No 'file.information' in the job execution context; "
                    + "extractHeaderStep must run before this step");
        }
        return writer -> {
            XMLEventFactory events = XMLEventFactory.newInstance();
            try {
                writer.add(events.createStartElement("", "", HEADER_ELEMENT));
                writeElement(writer, events, "sender", fileInformation.sender);
                writeElement(writer, events, "timestamp", fileInformation.timestamp);
                writeElement(writer, events, "environment", fileInformation.environment);
                writeElement(writer, events, "version", fileInformation.version);
                writer.add(events.createEndElement("", "", HEADER_ELEMENT));
            } catch (XMLStreamException e) {
                throw new IOException("Could not write the " + HEADER_ELEMENT + " header", e);
            }
        };
    }

    /** Writes {@code <name>value</name>}; a {@code null} value yields an empty element. */
    private static void writeElement(XMLEventWriter writer, XMLEventFactory events, String name, String value)
            throws XMLStreamException {
        writer.add(events.createStartElement("", "", name));
        if (value != null) {
            writer.add(events.createCharacters(value));
        }
        writer.add(events.createEndElement("", "", name));
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(extractHeaderStep(steps))
                .next(processFile(steps))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Close the context on exit so batch infrastructure beans are shut down cleanly.
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SO67909123.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("file", "transactions.xml")
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }

    /**
     * The {@code fileInformation} header. It is {@link Serializable} because it is stored in the
     * job execution context, which is persisted by the job repository.
     */
    static class FileInformation implements Serializable {
        private static final long serialVersionUID = 1L;

        private String sender;
        private String timestamp;
        private String environment;
        private String version;

        /** Assigns the field named by a header child element; unknown elements are ignored. */
        void set(String element, String value) {
            switch (element) {
                case "sender":
                    sender = value;
                    break;
                case "timestamp":
                    timestamp = value;
                    break;
                case "environment":
                    environment = value;
                    break;
                case "version":
                    version = value;
                    break;
                default:
                    // Header elements this example does not model are skipped.
                    break;
            }
        }

        @Override
        public String toString() {
            return "FileInformation{sender='" + sender + '\'' + ", timestamp='" + timestamp + '\''
                    + ", environment='" + environment + '\'' + ", version='" + version + '\'' + '}';
        }
    }

}

This example shows the idea. The only file-specific part is the snippet that extracts the header tag from the XML file (see extractFileInformation). The StaxWriterCallback in that example is a step-scoped bean and can use the header from the execution context. Other step-scoped components of step 2 (processor, listener, etc.) can be configured in the same way.