Integration Flow does not receive new message on each test method

407 Views Asked by At

I am testing my integration flow with 2 different scenarios: reading an XML file and try to validate it, so in case it's a valid XML then it should be moved to processed Dir, otherwise to Error Dir.

this is the main Integration flow:

package com.stackoverflow.questions.config;


import static java.util.Arrays.asList;

import com.stackoverflow.questions.dto.WriteResult;
import com.stackoverflow.questions.handler.FileReaderHandler;
import com.stackoverflow.questions.handler.StudentErrorHandler;
import com.stackoverflow.questions.handler.StudentWriterHandler;
import com.stackoverflow.questions.service.DirectoryManagerService;
import com.stackoverflow.questions.transformer.FileToStudentTransformer;
import java.io.File;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.DirectoryScanner;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.RecursiveDirectoryScanner;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
@RequiredArgsConstructor
public class MainIntegrationFlow {

  @Value("${regex.filename.pattern}")
  private String regexFileNamePattern;

  @Value("${root.file.dir}")
  private String rootFileDir;

  @Value("${default.polling.rate}")
  private Long defaultPollingRate;

  private final DirectoryManagerService directoryManagerService;
  private final StudentErrorHandler studentErrorHandler;
  private final FileReaderHandler fileReaderHandler;
  private final StudentWriterHandler studentWriterHandler;
  private final FileToStudentTransformer fileToStudentTransformer;

  @Bean("mainStudentIntegrationFlow")
  public IntegrationFlow mainStudentIntegrationFlow(
      @Qualifier("mainFileReadingSourceMessage") MessageSource<File> mainFileReadingSourceMessage,
      @Qualifier("fileReaderChannel") MessageChannel fileReaderChannel) {
    return IntegrationFlows.from(mainFileReadingSourceMessage)
        .handle(fileReaderHandler)
        .transform(fileToStudentTransformer)
        .handle(studentWriterHandler)
        .<WriteResult, Boolean>route(WriteResult::isWriten,
            mapping -> mapping
                .subFlowMapping(true, moveToProcessedDirFlow())
                .subFlowMapping(false, moveToErrorDirFlow()))
        .get();
  }


  public IntegrationFlow moveToProcessedDirFlow() {
    return flow -> flow.handle(message ->
        directoryManagerService
            .moveToProcessedDir(((WriteResult) message.getPayload()).getFilename()));
  }

  public IntegrationFlow moveToErrorDirFlow() {
    return flow -> flow.channel("studentErrorChannel")
        .handle(message ->
            directoryManagerService
                .moveToErrorDir(((WriteResult) message.getPayload()).getFilename()));
  }

  @Bean(name = "errorHandlerMainFlow")
  public IntegrationFlow errorHandlerMainFlow() {
    return IntegrationFlows.from("errorChannel")
        .handle(studentErrorHandler)
        .get();
  }

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata mainPollerMetadata() {
    return Pollers.fixedRate(defaultPollingRate, TimeUnit.SECONDS)
        .maxMessagesPerPoll(0)
        .get();
  }

  @Bean(name = "fileReaderChannel")
  public MessageChannel fileReaderChannel() {
    return MessageChannels.direct().get();
  }

  @Bean("mainDirectoryScanner")
  public DirectoryScanner mainDirectoryScanner() {
    DirectoryScanner recursiveDirectoryScanner = new RecursiveDirectoryScanner();

    CompositeFileListFilter<File> compositeFileListFilter = new CompositeFileListFilter<>(
        asList(new AcceptOnceFileListFilter<>(),
            new RegexPatternFileListFilter(regexFileNamePattern)));

    recursiveDirectoryScanner.setFilter(compositeFileListFilter);
    return recursiveDirectoryScanner;
  }

  @Bean("mainFileReadingSourceMessage")
  public MessageSource<File> mainFileReadingSourceMessage(
      @Qualifier("mainDirectoryScanner") DirectoryScanner mainDirectoryScanner) {
    FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource();
    fileReadingMessageSource.setDirectory(new File(rootFileDir));
    fileReadingMessageSource.setScanner(mainDirectoryScanner);

    return fileReadingMessageSource;
  }
}

but when running the tests, the first one succeeds but the second fails - and I can see from the logs that the message does not go through the integration flow for the second test:

package com.stackoverflow.questions;

import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

import com.stackoverflow.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;

@SpringBootTest
@DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD)
public class MainFlowIntegrationTests {


  private static final String MOCK_FILE_DIR = "intFiles/";
  private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
  private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";

  @Autowired
  private MessageChannel fileReaderChannel;

  @Autowired
  private DirectoryManagerService directoryManagerService;

  private File queueDir;
  private File processed;
  private File error;

  @BeforeEach
  public void setup() throws IOException {
    createRequiredDirectories();
    moveFilesToQueueDir();
    injectProperties();
  }

  @AfterEach
  public void tearDown() throws IOException {
    deleteRequiredDirectories();
  }

  @Test
  public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a valid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());

    // Then: the valid XML file should be sent to the processedDir
    await().until(() -> processed.list().length == 1);
  }

  @Test
  public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a invalid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());

    // Then: the invalid XML file should be sent to the errorDir
    await().until(() -> error.list().length == 1);
  }

  private void injectProperties() {
    ReflectionTestUtils
        .setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
    ReflectionTestUtils
        .setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
  }

  private void moveFilesToQueueDir() throws IOException {
    File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();

    for (String filename : intFiles.list()) {
      FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
    }
  }

  private void createRequiredDirectories() throws IOException {
    queueDir = Files.createTempDirectory("queueDir").toFile();
    processed = Files.createTempDirectory("processedDir").toFile();
    error = Files.createTempDirectory("errorDir").toFile();
  }

  private void deleteRequiredDirectories() throws IOException {
    forceDelete(queueDir);
    forceDelete(processed);
    forceDelete(error);
  }

}

StackTrace:



org.awaitility.core.ConditionTimeoutException: Condition with lambda expression in com.stackoverflow.questions.MainFlowIntegrationTests was not fulfilled within 10 seconds.

    at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:165)
    at org.awaitility.core.CallableCondition.await(CallableCondition.java:78)
    at org.awaitility.core.CallableCondition.await(CallableCondition.java:26)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:895)
    at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:864)
    at com.stackoverflow.questions.MainFlowIntegrationTests.readingFileAndMoveItToProcessedDir(MainFlowIntegrationTests.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)


The second test fails, unless I reload the application context after each test - using @DirtiesContext(classMode = BEFORE_EACH_TEST_METHOD) or autowiring my integration as a StandardIntegrationFlow and start/stop it manually as below:

package com.stackoverflow.questions;

import static org.apache.commons.io.FileUtils.forceDelete;
import static org.awaitility.Awaitility.await;
import static org.springframework.test.annotation.DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD;

import com.stackoverflow.questions.service.DirectoryManagerService;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.io.ClassPathResource;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.util.ReflectionTestUtils;

@SpringBootTest
public class MainFlowIntegrationTests {


  private static final String MOCK_FILE_DIR = "intFiles/";
  private static final String VALID_XML_MOCK_FILE = "valid01-student-01.xml";
  private static final String INVALID_XML_MOCK_FILE = "invalid02-student-02.xml";

  @Autowired
  private MessageChannel fileReaderChannel;

  @Autowired
  private StandardIntegrationFlow mainStudentIntegrationFlow;

  @Autowired
  private DirectoryManagerService directoryManagerService;

  private File queueDir;
  private File processed;
  private File error;

  @BeforeEach
  public void setup() throws IOException {
    createRequiredDirectories();
    moveFilesToQueueDir();
    injectProperties();
    mainStudentIntegrationFlow.start();
  }

  @AfterEach
  public void tearDown() throws IOException {
    mainStudentIntegrationFlow.stop();
    deleteRequiredDirectories();
  }

  @Test
  public void readingFileAndMoveItToProcessedDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a valid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, VALID_XML_MOCK_FILE)).build());

    // Then: the valid XML file should be sent to the processedDir
    await().until(() -> processed.list().length == 1);
  }

  @Test
  public void readingInvalidFileAndMoveItToErrorDir() throws IOException, InterruptedException {
    // When: the fileReaderChannel receives a invalid XML file
    fileReaderChannel
        .send(MessageBuilder.withPayload(new File(queueDir, INVALID_XML_MOCK_FILE)).build());

    // Then: the invalid XML file should be sent to the errorDir
    await().until(() -> error.list().length == 1);
  }

  private void injectProperties() {
    ReflectionTestUtils
        .setField(directoryManagerService, "errorDir", error.getAbsolutePath().concat("/"));
    ReflectionTestUtils
        .setField(directoryManagerService, "processedDir", processed.getAbsolutePath().concat("/"));
  }

  private void copyFilesToQueueDir() throws IOException {
    File intFiles = new ClassPathResource(MOCK_FILE_DIR).getFile();

    for (String filename : intFiles.list()) {
      FileUtils.copyFile(new File(intFiles, filename), new File(queueDir, filename));
    }
  }

  private void createRequiredDirectories() throws IOException {
    queueDir = Files.createTempDirectory("queueDir").toFile();
    processed = Files.createTempDirectory("processedDir").toFile();
    error = Files.createTempDirectory("errorDir").toFile();
  }

  private void deleteRequiredDirectories() throws IOException {
    forceDelete(queueDir);
    forceDelete(processed);
    forceDelete(error);
  }

}

0

There are 0 best solutions below