Embedded kafka in tests with JUnit failing when running all tests

286 Views Asked by At

I'm using embedded kafka, spring and junit to run the integration with my listener, today if I run just this class the tests pass, but if I run all the application tests with or without jacoco coverage it falls into timeout, follow the current listener code, embedded kafka config and test.

@Component
public class CategoryEventListener {

    public static final TypeReference<MessageValue<OutboxEventEntity>> CATEGORY_MESSAGE = new TypeReference<>() {
    };
    private static final Logger LOG = LoggerFactory.getLogger(CategoryEventListener.class);

    private final DefaultSaveCategoryUseCase saveCategoryUseCase;
    private final DefaultRemoveCategoryUseCase removeCategoryUseCase;

    public CategoryEventListener(
            final DefaultSaveCategoryUseCase saveCategoryUseCase,
            final DefaultRemoveCategoryUseCase removeCategoryUseCase
    ) {
        this.saveCategoryUseCase = Objects.requireNonNull(saveCategoryUseCase);
        this.removeCategoryUseCase = Objects.requireNonNull(removeCategoryUseCase);
    }

    @KafkaListener(
            concurrency = "${kafka.consumers.categories.concurrency}",
            containerFactory = "kafkaListenerFactory",
            topics = "${kafka.consumers.categories.topics}",
            groupId = "${kafka.consumers.categories.group-id}",
            id = "${kafka.consumers.categories.id}",
            properties = {
                    "auto.offset.reset=${kafka.consumers.categories.auto-offset-reset}"
            }
    )
    public void onMessage(@Payload final String payload, final Acknowledgment ack) {
        LOG.debug("Message received from Kafka: {}", payload);
        final var aOutBoxEvent = Json.readValue(payload, CATEGORY_MESSAGE).payload().after();

        switch (aOutBoxEvent.getEventType()) {
            case EventsTypes.CATEGORY_CREATED -> {
                final var aCategoryCreated = Json.readValue(aOutBoxEvent.getData(), CategoryCreatedEvent.class);
                final var aCategory = aCategoryCreated.toDomain();
                this.saveCategoryUseCase.execute(aCategory);
                ack.acknowledge();
                LOG.debug("Category created received from Kafka: {}", aCategory.getName());
            }
            case EventsTypes.CATEGORY_UPDATED -> {
                final var aCategoryUpdated = Json.readValue(aOutBoxEvent.getData(), CategoryUpdatedEvent.class);
                final var aCategory = aCategoryUpdated.toDomain();
                this.saveCategoryUseCase.execute(aCategory);
                ack.acknowledge();
                LOG.debug("Category updated received from Kafka: {}", aCategory.getName());
            }
            case EventsTypes.CATEGORY_DELETED -> {
                final var aCategoryDeleted = Json.readValue(aOutBoxEvent.getData(), CategoryDeletedEvent.class);
                this.removeCategoryUseCase.execute(RemoveCategoryCommand
                        .with(aCategoryDeleted.rootCategoryId(), aCategoryDeleted.subCategoryId()
                                .orElse(null)));
                ack.acknowledge();
                LOG.debug("Category deleted received from Kafka: {}", aOutBoxEvent.getData());
            }
            default -> LOG.warn("Event type not supported: {}", aOutBoxEvent.getEventType());
        }
    }
}

embedded kafka config:

@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
@ActiveProfiles("test-integration-kafka")
@EnableAutoConfiguration(exclude = {ElasticsearchRepositoriesAutoConfiguration.class})
@SpringBootTest(
        classes = {Main.class, AmqpTestConfiguration.class, IntegrationTestConfiguration.class},
        properties = {"kafka.bootstrap-servers=${spring.embedded.kafka.brokers}"})
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@ExtendWith(JpaCleanUpExtension.class)
@Tag("heavyIntegrationTest")
public abstract class AbstractEmbeddedKafkaTest {

    @Autowired
    protected EmbeddedKafkaBroker kafkaBroker;

    private Producer<String, String> producer;

    @BeforeAll
    void init() {
        producer =
                new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaBroker), new StringSerializer(), new StringSerializer())
                        .createProducer();
    }

    @AfterAll
    void shutdown() {
        producer.close();
    }

    protected Producer<String, String> producer() {
        return producer;
    }

    protected Source aSource() {
        return new Source("ecommerce-mysql", "ecommerce", "outbox");
    }
}

application.yml

kafka:
  auto-create-topics: true
  bootstrap-servers: localhost:9092
  pool-timeout: 1_000
  auto-commit: false
  consumers:
    categories:
      auto-offset-reset: earliest
      concurrency: 1
      id: kafka-listener-categories
      topics: category-topic
      group-id: categories-group

consumer config:

@Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerFactory() {
        final var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(kafkaProperties.getPoolTimeout());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    private ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        final var props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, kafkaProperties.isAutoCreateTopics());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.isAutoCommit());
        return props;
    }

tests:

@Execution(ExecutionMode.CONCURRENT)
@Order(1)
public class CategoryEventListenerTest extends AbstractEmbeddedKafkaTest {

    @MockBean
    private DefaultSaveCategoryUseCase saveCategoryUseCase;

    @MockBean
    private DefaultRemoveCategoryUseCase removeCategoryUseCase;

    @Value("${kafka.consumers.categories.topics}")
    private String categoryTopic;

    @Test
    void givenAValidCategoryCreatedEvent_whenReceive_shouldPersistCategory() throws Exception {
        // given
        final var aCategory = Fixture.Categories.tech();
        final var aCategoryEvent = CategoryCreatedEvent.from(aCategory);
        final var aOutboxEvent = OutboxEventEntity.from(aCategoryEvent);

        final var aMessage = Json.writeValueAsString(new MessageValue<>(new ValuePayload<>(aOutboxEvent, aOutboxEvent, aSource(), Operation.CREATE)));

        final var latch = new CountDownLatch(1);

        Mockito.doAnswer(t -> {
            latch.countDown();
            return aCategory;
        }).when(saveCategoryUseCase).execute(Mockito.any());

        // when
        producer().send(new ProducerRecord<>(categoryTopic, aMessage));
        producer().flush();

        Assertions.assertTrue(latch.await(3, TimeUnit.MINUTES));

        // then
        Mockito.verify(saveCategoryUseCase, Mockito.times(1)).execute(eq(aCategory));
    }

    @Test
    void givenAValidCategoryUpdatedEvent_whenReceive_shouldPersistCategory() throws Exception {
        // given
        final var aCategory = Fixture.Categories.home();
        final var aCategoryEvent = CategoryUpdatedEvent.from(aCategory);
        final var aOutboxEvent = OutboxEventEntity.from(aCategoryEvent);

        final var aMessage = Json.writeValueAsString(new MessageValue<>(new ValuePayload<>(aOutboxEvent, aOutboxEvent, aSource(), Operation.CREATE)));

        final var latch = new CountDownLatch(1);

        Mockito.doAnswer(t -> {
            latch.countDown();
            return aCategory;
        }).when(saveCategoryUseCase).execute(Mockito.any());

        // when
        producer().send(new ProducerRecord<>(categoryTopic, aMessage));
        producer().flush();

        Assertions.assertTrue(latch.await(3, TimeUnit.MINUTES));

        // then
        Mockito.verify(saveCategoryUseCase, Mockito.times(1)).execute(eq(aCategory));
    }
}

error on test fail:

2023-11-30T13:48:32.426-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-10-3469919c-1c4e-4946-91fd-97a56f1ae761', protocol='range'}
2023-11-30T13:48:32.427-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-8-381000c5-ae86-40d6-8f21-8e6b2638279e', protocol='range'}
2023-11-30T13:48:32.427-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-9-9444b7c4-693c-4945-952a-998898c91860', protocol='range'}
2023-11-30T13:48:32.427-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Successfully joined group with generation Generation{generationId=2, memberId='consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3', protocol='range'}
2023-11-30T13:48:32.427-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Finished assignment for group at generation 2: {consumer-categories-group-10-3469919c-1c4e-4946-91fd-97a56f1ae761=Assignment(partitions=[]), consumer-categories-group-8-381000c5-ae86-40d6-8f21-8e6b2638279e=Assignment(partitions=[]), consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3=Assignment(partitions=[category-topic-0]), consumer-categories-group-9-9444b7c4-693c-4945-952a-998898c91860=Assignment(partitions=[])}
2023-11-30T13:48:32.428-03:00  INFO 287799 --- [quest-handler-3] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Assignment received from leader consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3 for group categories-group for generation 2. The group has 4 members, 0 of which are static.
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-10-3469919c-1c4e-4946-91fd-97a56f1ae761', protocol='range'}
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-8-381000c5-ae86-40d6-8f21-8e6b2638279e', protocol='range'}
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[])
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[])
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-9-9444b7c4-693c-4945-952a-998898c91860', protocol='range'}
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[])
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Successfully synced group in generation Generation{generationId=2, memberId='consumer-categories-group-1-b832fc02-3070-45de-a5fa-ffb1c5ddb5b3', protocol='range'}
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Notifying assignor about the new Assignment(partitions=[category-topic-0])
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-9, groupId=categories-group] Adding newly assigned partitions: 
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-8, groupId=categories-group] Adding newly assigned partitions: 
2023-11-30T13:48:32.440-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-10, groupId=categories-group] Adding newly assigned partitions: 
2023-11-30T13:48:32.441-03:00  INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : categories-group: partitions assigned: []
2023-11-30T13:48:32.441-03:00  INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : categories-group: partitions assigned: []
2023-11-30T13:48:32.441-03:00  INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : categories-group: partitions assigned: []
2023-11-30T13:48:32.442-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Adding newly assigned partitions: category-topic-0
2023-11-30T13:48:32.451-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Found no committed offset for partition category-topic-0
2023-11-30T13:48:32.459-03:00  INFO 287799 --- [ategories-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-categories-group-1, groupId=categories-group] Resetting offset for partition category-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}.
2023-11-30T13:48:32.459-03:00  INFO 287799 --- [ategories-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : categories-group: partitions assigned: [category-topic-0]
2023-11-30T13:48:36.241-03:00  INFO 287799 --- [er-event-thread] kafka.controller.KafkaController         : [Controller id=0] Processing automatic preferred replica leader election

Expected :true
Actual   :false
<Click to see difference>

org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
    at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
    at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
    at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
    at com.kaua.ecommerce.infrastructure.listeners.CategoryEventListenerTest.givenAValidCategoryCreatedEvent_whenReceive_shouldPersistCategory(CategoryEventListenerTest.java:65)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:118)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:93)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:88)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:578)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
    at jdk.proxy2/jdk.proxy2.$Proxy5.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
    at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
    at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
    at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
    at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)

I expect the tests to pass both when I run all the application tests and using jacoco coverage. I continued testing here, replacing latch with completablefuture and in both cases, if the test that uses Kafka runs before the other tests in the application, they pass.

1

There are 1 best solutions below

0
On BEST ANSWER

I solved this by adding @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS). I added it to all my tests, I have annotation for each type and I added the DirtiesContext to them, it solved it, apparently some mock or other test was interfering