Spring Integration Mail: Send email after all database inserts


Hello, I have an integration flow that splits a file line by line, transforms each line into a POJO, and then inserts that POJO into a database via a JDBC outbound gateway.

I want to send a single email once processing of the file has completed. I am currently sending to the smtpFlow channel after my jdbcOutboundGateway, but this sends an email after every database insert.

Here is my current flow DSL:

@Bean
IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .split(splitFile())
            .transform(this::transformToIndividualScore)
            .handle(jdbcOutboundGateway(null))
            // Every split line reaches this channel, hence one email per insert.
            .channel("smtpFlow")
            .get();
}

How do I get this flow to send only one email after all lines of the file have been processed by the jdbcOutboundGateway?

Here is my splitFile() method

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(true, false);
    fs.setFirstLineAsHeader("IndividualScore");
    return fs;
}

Here is my transformToIndividualScore method

@Transformer
private IndividualScore transformToIndividualScore(String payload) {
    String[] values = payload.split(",");
    IndividualScore is = new IndividualScore();
    is.setScorecardDate(values[0]);
    is.setVnSpId(values[1]);
    is.setPrimaryCat(values[2]);
    is.setSecondaryCat(values[3]);
    is.setScore(Integer.parseInt(values[4]));
    is.setActual(values[5]);
    return is;
}

There are 3 best solutions below

Best answer

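With the help of @ArtemBilan, I was able to use publishSubscribeChannel() and chain two subscribe() calls in sequence. The first subscriber splits the file and inserts each line; the second sends the email. As long as no executor is configured on the channel, the subscribers run one after another on the same thread, so the email subflow starts only after the inserts have finished. Below is the new IntegrationFlow: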

@Bean
IntegrationFlow ftpFlow() {
    return IntegrationFlows.from(
            ftpSource(), spec -> spec.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
            .publishSubscribeChannel(channel -> channel
                    // First subscriber: split the file and insert every line.
                    .subscribe(
                        a -> a
                                .split(splitFile())
                                .transform(this::transformToIndividualScore)
                                .handle(jdbcMessageHandler(null)))
                    // Second subscriber: receives the same file message and sends one email.
                    .subscribe(
                        b -> b
                                .transform(this::transformToSuccessEmail)
                                .handle(emailHandler()))
            )
            .get();
}
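
To illustrate why the ordering matters, here is a minimal plain-Java model of two subscribers being called one after another on the calling thread. It is only a sketch: SequentialPubSubSketch and its method names are made up for illustration and are not Spring Integration classes. It assumes the channel has no executor, so each subscriber finishes before the next one starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model only; these are NOT Spring Integration classes.
final class SequentialPubSubSketch {

    // Delivers the same "file" to each subscriber in turn on the calling thread
    // and returns a log of what happened, in order.
    static List<String> run(List<String> lines) {
        List<String> log = new ArrayList<>();

        // Subscriber a: stands in for split -> transform -> JDBC insert.
        Consumer<List<String>> insertSubflow =
                file -> file.forEach(line -> log.add("insert " + line));

        // Subscriber b: stands in for transformToSuccessEmail -> emailHandler.
        Consumer<List<String>> emailSubflow = file -> log.add("email");

        List<Consumer<List<String>>> subscribers =
                Arrays.asList(insertSubflow, emailSubflow);
        for (Consumer<List<String>> subscriber : subscribers) {
            subscriber.accept(lines); // returns only when this subscriber is done
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [insert row1, insert row2, insert row3, email]
        System.out.println(run(Arrays.asList("row1", "row2", "row3")));
    }
}
```

The single "email" entry always comes last because the second subscriber is not called until the first has returned. If the channel were given an executor, that guarantee would no longer hold.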
Another answer

So, I have a solution to my problem (kind of).

Setting iterator to false on my FileSplitter makes it possible to apply the sequence headers.

The updated splitFile() is below:

@Bean
FileSplitter splitFile() {
    FileSplitter fs = new FileSplitter(false, false);
    fs.setFirstLineAsHeader("IndividualScore");
    fs.setApplySequence(true);
    return fs;
}

My intuition is that the default .aggregate() release strategy releases a group once the number of aggregated messages equals the sequenceSize message header.

When the FileSplitter is created with iterator set to true, sequenceSize is set to 0, which never satisfies the default release strategy of .aggregate().
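
The reasoning above can be checked with a small plain-Java model. This is a sketch of the release check as described here (group size equals sequenceSize), not Spring Integration's actual aggregator code; the class and method names are hypothetical.

```java
// Simplified model of a sequence-size release check; NOT Spring Integration's real classes.
final class SequenceSizeReleaseSketch {

    // Feeds totalMessages messages, all carrying the same sequenceSize header,
    // into one group. Returns the group size at the moment of release,
    // or -1 if the group is never released.
    static int releasedAt(int sequenceSize, int totalMessages) {
        int groupSize = 0;
        for (int i = 0; i < totalMessages; i++) {
            groupSize++;                     // a message is added to the group
            if (groupSize == sequenceSize) { // the release check
                return groupSize;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(releasedAt(3, 3)); // 3: released after the last line
        System.out.println(releasedAt(0, 3)); // -1: sequenceSize 0 never matches
    }
}
```

Because the check runs only after a message has been added, the group size is at least 1 and can never equal a sequenceSize of 0.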

However, this makes the FileSplitter read all lines of the file into a List in memory, and the aggregator holds a second ArrayList of those lines in memory as well.

Is there a better solution than writing a custom aggregator that handles the END FileMarker, so that I can keep using the iterator to split the file?
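
For reference, the marker-based idea could look roughly like the plain-Java sketch below. It assumes the splitter is created with markers enabled (the second constructor argument), that the END marker arrives after the last line and carries the line count, and that messages are handled in order on a single thread. EndMarker here is a hypothetical stand-in, not Spring's FileSplitter.FileMarker, and the class only counts lines instead of storing them.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch; EndMarker is a hypothetical stand-in, not a Spring class.
final class EndMarkerCompletionSketch {

    // Stand-in for the END marker emitted after the last line of a file.
    static final class EndMarker {
        final long lineCount;
        EndMarker(long lineCount) { this.lineCount = lineCount; }
    }

    private long processedLines;

    // Counts line payloads; returns true only for an END marker whose
    // lineCount matches the number of lines seen so far.
    boolean onMessage(Object payload) {
        if (payload instanceof EndMarker) {
            return ((EndMarker) payload).lineCount == processedLines;
        }
        processedLines++;
        return false;
    }

    // Returns how many "send email" signals the stream produces.
    static int completions(List<Object> stream) {
        EndMarkerCompletionSketch tracker = new EndMarkerCompletionSketch();
        int signals = 0;
        for (Object payload : stream) {
            if (tracker.onMessage(payload)) {
                signals++;
            }
        }
        return signals;
    }

    public static void main(String[] args) {
        List<Object> stream =
                Arrays.<Object>asList("a,1", "b,2", "c,3", new EndMarker(3));
        System.out.println(completions(stream)); // 1
    }
}
```

Only a counter is kept per file, so memory use does not grow with the number of lines. In a real flow the marker messages would also need to be routed around the transformer and the JDBC handler, which this sketch does not cover.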

Another answer

Add .aggregate() after the .handle() call to assemble the results of the split back into a single message.