Embedded Kafka Failed to Start After Spring Starter Parent Version 3.1.10


I'm trying to run integration tests for a Kafka consumer using

org.springframework.kafka.test.context.EmbeddedKafka

I currently let spring-boot-starter-parent manage the dependency versions. Here is the relevant part of the pom.xml file:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

Here is the Kafka consumer code:

@Configuration
public class KafkaEventListener {

    @RetryableTopic(
            attempts = "#{'${kafka.max.retry.attempts}'}",
            autoCreateTopics = "#{'${kafka.auto.create.retry.topics}'}",
            backoff = @Backoff(
                    delayExpression = "#{'${kafka.retry.init-interval}'}",
                    multiplierExpression = "#{'${kafka.retry.backoff.multiplier}'}"),
            include = { Exception.class },
            timeout = "#{'${kafka.max.retry.duration}'}",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = DltStrategy.FAIL_ON_ERROR)
    @KafkaListener(topics = "${kafka.topic.test}", groupId = "${kafka.group.id.test}",
            containerFactory = "testKafkaListenerContainerFactory")
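    // The container factory uses AckMode.MANUAL_IMMEDIATE and the test passes an Acknowledgment,
    // so the listener accepts one as its third argument.
    public void listen(@Payload MetadataMessage input, @Header(KafkaHeaders.OFFSET) String offset,
                       Acknowledgment acknowledgment) {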
        System.out.println(input.getValue());
    }

    @DltHandler
    public void deadLetterHandler(@Payload(required = false) MetadataMessage data,
                                  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        // data may be null because the payload is declared required = false; %s formats null safely.
        System.out.println(String.format("Event from topic %s has been dead-lettered. Event data : %s", topic, data));
    }
}

Here is the Kafka configuration class:

@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${kafka.group.id.test}")
    private String groupId;


    private Map<String, Object> consumerFactoryConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Disabled kafka auto acknowledgement to gain more flexibility
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return props;
    }

    @Bean
    public ConsumerFactory<String, MetadataMessage> metadataConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerFactoryConfigs(), new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(MetadataMessage.class)));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MetadataMessage> testKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MetadataMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(metadataConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

Here is the test class:

@SpringBootTest(classes = EmbeddedKafkaApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestPropertySource(locations = { "classpath:application.properties" })
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:29092", "port=29092" })
public class EmbeddedKafkaTest {

    @Autowired
    private KafkaEventListener kafkaEventListener;

    @Test
    public void testKafkaEvent() {
        kafkaEventListener.listen(new MetadataMessage("kafka message from test"), "1", mock(Acknowledgment.class));
    }
}

With spring-boot-starter-parent version 3.1.10 the test passes. However, when I switch to a newer version of spring-boot-starter-parent, the test fails.

With 3.1.10, I see these logs at startup when I run the test:

 .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::               (v3.1.10)

2024-03-31T16:37:37.354+05:30  INFO 26280 --- [           main] k.utils.Log4jControllerRegistration$     : Registered kafka:type=kafka.Log4jController MBean
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   ______                  _                                          
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  |___  /                 | |                                         
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :     / /    ___     ___   | | __   ___    ___   _ __     ___   _ __   
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :    / /    / _ \   / _ \  | |/ /  / _ \  / _ \ | '_ \   / _ \ | '__|
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :   / /__  | (_) | | (_) | |   <  |  __/ |  __/ | |_) | |  __/ | |    
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :  /_____|  \___/   \___/  |_|\_\  \___|  \___| | .__/   \___| |_|
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               | |                     
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     :                                               |_|                     
2024-03-31T16:37:37.448+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : 
2024-03-31T16:37:37.460+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:zookeeper.version=3.6.4--d65253dcf68e9097c6e95a126463fd5fdeb4521c, built on 12/18/2022 18:10 GMT
2024-03-31T16:37:37.460+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:host.name=SADEEP-M.Zone24x7.lk
2024-03-31T16:37:37.460+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.version=17.0.8.1
2024-03-31T16:37:37.460+05:30  INFO 26280 --- [           main] o.a.zookeeper.server.ZooKeeperServer     : Server environment:java.vendor=Amazon.com Inc.

These are the final log lines of a successful run with version 3.1.10:

2024-03-31T16:37:41.551+05:30  INFO 26280 --- [ner#0-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : testGroupId-dlt: partitions assigned: [testTopic-dlt-0]
2024-03-31T16:37:41.551+05:30  INFO 26280 --- [0-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : testGroupId-retry-1: partitions assigned: [testTopic-retry-1-0]
2024-03-31T16:37:41.551+05:30  INFO 26280 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : testGroupId: partitions assigned: [testTopic-0]
2024-03-31T16:37:41.551+05:30  INFO 26280 --- [0-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : testGroupId-retry-0: partitions assigned: [testTopic-retry-0-0]
kafka message from test

Here I can clearly see that the ZooKeeperServer is running.

But after changing to a newer version, such as 3.2.0 or the latest (3.2.4), I no longer see the ZooKeeperServer logs, and the test fails to run. Here are some logs from that run:

2024-03-31T16:46:29.072+05:30  INFO 25024 --- [embedded-kafka] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.6.1
2024-03-31T16:46:29.072+05:30  INFO 25024 --- [embedded-kafka] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 5e3c2b738d253ff5
2024-03-31T16:46:29.072+05:30  INFO 25024 --- [embedded-kafka] [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1711883789072
2024-03-31T16:46:29.075+05:30  INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2024-03-31T16:46:29.075+05:30  WARN 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2024-03-31T16:46:29.191+05:30  INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2024-03-31T16:46:29.191+05:30  WARN 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2024-03-31T16:46:29.300+05:30  INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Node -1 disconnected.
2024-03-31T16:46:29.300+05:30  WARN 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2024-03-31T16:46:29.518+05:30  INFO 25024 --- [embedded-kafka] [| adminclient-2] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-2] Node -1 disconnected.
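
To check whether anything is actually listening on the port the AdminClient keeps retrying, a plain TCP probe can be run while the test is hanging. This is only a minimal sketch: the class name BrokerPortProbe and the method isListening are hypothetical helpers, not part of my project or of spring-kafka, and the 500 ms timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper: reports whether a TCP port accepts connections.
class BrokerPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 29092 is the port configured in brokerProperties of the test class.
        System.out.println("localhost:29092 listening: " + isListening("localhost", 29092, 500));
    }
}
```

If the probe reports false while the broker logs show it starting, the embedded broker is presumably listening on a different port than the one the consumer and AdminClient are configured with.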

The ZooKeeper logs are also missing from the startup output:

/\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.2.4)

2024-03-31T16:46:26.369+05:30  INFO 25024 --- [embedded-kafka] [           main] k.utils.Log4jControllerRegistration$     : Registered kafka:type=kafka.Log4jController MBean
2024-03-31T16:46:26.387+05:30  INFO 25024 --- [embedded-kafka] [           main] org.apache.zookeeper.common.X509Util     : Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2024-03-31T16:46:26.501+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-1] kafka.server.ControllerServer            : Formatting C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0 with metadata.version 3.3-IV0.
2024-03-31T16:46:26.503+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.server.BrokerServer                : [BrokerServer id=0] Transition from SHUTDOWN to STARTING
2024-03-31T16:46:26.503+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-2] kafka.server.ControllerServer            : [ControllerServer id=0] Starting controller
2024-03-31T16:46:26.504+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.server.SharedServer                : [SharedServer id=0] Starting SharedServer
2024-03-31T16:46:26.524+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-2] o.a.k.s.network.EndpointReadyFutures     : authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY.
2024-03-31T16:46:26.596+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.log.UnifiedLog$                    : [LogLoader partition=__cluster_metadata-0, dir=C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0] Loading producer state till offset 0 with message format version 2
2024-03-31T16:46:26.597+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.log.UnifiedLog$                    : [LogLoader partition=__cluster_metadata-0, dir=C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0] Reloading from producer snapshot and rebuilding producer state from offset 0
2024-03-31T16:46:26.597+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.log.UnifiedLog$                    : [LogLoader partition=__cluster_metadata-0, dir=C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0] Producer state recovery took 0ms for snapshot load and 0ms for segment recovery from offset 0
2024-03-31T16:46:26.634+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] kafka.raft.KafkaMetadataLog$             : Initialized snapshots with IDs SortedSet() from C:\Users\sandeepm\AppData\Local\Temp\kafka-16755125128047995461\controller_0\__cluster_metadata-0
2024-03-31T16:46:26.671+05:30  INFO 25024 --- [embedded-kafka] [piration-reaper] ExpirationService$ExpiredOperationReaper : [raft-expiration-reaper]: Starting
2024-03-31T16:46:26.716+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] org.apache.kafka.raft.QuorumState        : [RaftManager id=0] Completed transition to Unattached(epoch=0, voters=[0], electionTimeoutMs=1955) from null
2024-03-31T16:46:26.722+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] org.apache.kafka.raft.QuorumState        : [RaftManager id=0] Completed transition to CandidateState(localId=0, epoch=1, retries=1, voteStates={0=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1252) from Unattached(epoch=0, voters=[0], electionTimeoutMs=1955)
2024-03-31T16:46:26.727+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-3] org.apache.kafka.raft.QuorumState        : [RaftManager id=0] Completed transition to Leader(localId=0, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, voterStates={0=ReplicaState(nodeId=0, endOffset=Optional.empty, lastFetchTimestamp=-1, lastCaughtUpTimestamp=-1, hasAcknowledgedLeader=true)}) from CandidateState(localId=0, epoch=1, retries=1, voteStates={0=GRANTED}, highWatermark=Optional.empty, electionTimeoutMs=1252)
2024-03-31T16:46:26.800+05:30  INFO 25024 --- [embedded-kafka] [-kit-executor-2] kafka.network.ConnectionQuotas           : Updated connection-accept-rate max connection creation rate to 2147483647

I'm having trouble with the dependency version and need some assistance. I want to run all my test cases using an embedded Kafka broker without using an external Kafka cluster. Could you please help me with this? Thank you!
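
One thing that may be worth trying, going by the ControllerServer and RaftManager lines in the 3.2.4 log: the embedded broker appears to start in KRaft mode instead of ZooKeeper mode. The sketch below assumes the spring-kafka-test version managed by Spring Boot 3.2.x exposes a kraft attribute on @EmbeddedKafka, and that setting it to false brings back the ZooKeeper-based broker that honours the fixed listener port. The test class name is hypothetical and I have not verified this against my project.

```java
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Sketch only: EmbeddedKafkaApplication is the application class used by the test above.
@SpringBootTest(classes = EmbeddedKafkaApplication.class)
// Assumption: kraft = false selects the ZooKeeper-based embedded broker,
// so the fixed listener on port 29092 is applied as it was with 3.1.10.
@EmbeddedKafka(kraft = false, partitions = 1,
        brokerProperties = { "listeners=PLAINTEXT://localhost:29092", "port=29092" })
class EmbeddedKafkaZookeeperModeTest {

    @Test
    void contextLoads() {
        // Passes only if the application context, including the Kafka listeners, starts.
    }
}
```

An alternative that avoids a fixed port would be to drop brokerProperties and set bootstrapServersProperty = "spring.kafka.bootstrap-servers" on @EmbeddedKafka, so that the consumer configuration picks up whatever address the embedded broker actually uses.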
