Apache Camel File Component: Exception after aggregation should move file to .error folder

297 Views Asked by At

My goal is to move a pdf and a xml file from an input to an output folder when both files are available in the input folder. If one file is not available it should retry a couple of times and then move the file to an .error folder.

I noticed that the File component's error handling seems to be affected by the aggregator and I don't know how exactly:

When throwing an exception after the aggregation, the File component moves the file to the .done folder instead of the .error folder, see (2) in code. However, when throwing the exception before the aggregation it works as expected, see (1).

My questions are:

  1. What is missing to make (2) work?
  2. Is it possible to achieve the same without throwing an exception? There's the discardOnCompletionTimeout option but this doesn't work for me as well.
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.commons.io.FilenameUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

public class SimpleMockTest extends CamelTestSupport {
    public static final int COMPLETION_TIMEOUT = 1000;

    @TempDir
    Path tempDir;

    @Test
    public void exceptionAfterAggregation() throws Exception {
        final MockEndpoint mockEndpoint = getMockEndpoint("mock:result");
        mockEndpoint.setResultWaitTime(COMPLETION_TIMEOUT * 3);
        mockEndpoint.expectedFileExists(tempDir.resolve(".done/willMoveToDone.xml"));
        mockEndpoint.expectedFileExists(tempDir.resolve(".done/willMoveToDone.pdf"));
        mockEndpoint.expectedFileExists(tempDir.resolve(".error/shouldMoveToError.xml")); // this fails

        createFile("willMoveToDone.xml", "<xml>1</xml>");
        createFile("willMoveToDone.pdf", "pdfFileContent");
        createFile("shouldMoveToError.xml", "<xml>2</xml>");

        mockEndpoint.assertIsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() {
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("file://" + tempDir + "?include=.*\\.(xml|pdf)&moveFailed=.error&move=.done")
//                        .process(exchange -> {
//                            throw new RuntimeException("Files would be moved to .error as expected");     // (1)
//                        })
                        .aggregate(CORRELATION_EXPRESSION, AggregationStrategies.flexible().accumulateInCollection(ArrayList.class))
                        .completionTimeout(COMPLETION_TIMEOUT)
                        .completionSize(2)

                        .choice()
                            .when(simple("${body.size()} != 2"))
                                .throwException(RuntimeException.class, "Why doesn't the File component move this file to .error?")     // (2)
                        .otherwise()
                            .to("mock:result")
                        .routeId("fileAggregationRoute");
            }

        };
    }

    private final Expression CORRELATION_EXPRESSION = new Expression() {
        @Override
        public <T> T evaluate(Exchange exchange, Class<T> type) {
            final String fileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
            final String correlationExpression = FilenameUtils.getBaseName(fileName);
            return exchange.getContext().getTypeConverter().convertTo(
                    type,
                    correlationExpression
            );
        }
    };

    private void createFile(String filename, String content) throws IOException {
        final File xmlFile = tempDir.resolve(filename).toFile();
        Files.write(xmlFile.toPath(), content.getBytes(StandardCharsets.UTF_8));
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.mycompany.app</groupId>
  <artifactId>camel-playground</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>my-app</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-bom</artifactId>
        <version>3.11.3</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test-junit5</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.11.0</version>
    </dependency>
  </dependencies>
</project>
0

There are 0 best solutions below