Create topic in KafkaContainer for Integration Test


I'm trying to create a topic in my KafkaContainer using @ServiceConnection. When I run the application and try to produce to the topic, I get this error: Topic myTopic not present in metadata after 1000 ms.

I'm also seeing: Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected, which makes me think my @ServiceConnection isn't set up correctly.

I am also using @Testcontainers to spin up an SFTP instance and grab the mounted file. That file then goes through integration flows until it is produced onto the topic.

Code:

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
@ActiveProfiles("test")
class FullIntegrationTest extends Specification {

    @Bean
    @ServiceConnection
    KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:latest'))
                .withEnv("TOPIC_AUTO_CREATE", "true")
    }

I can provide more information as needed. If this is not the correct way of configuring the topics, could someone provide an example? I have also been following this article, but still couldn't get it to configure the new topic.

EDIT: application-test.properties

spring.kafka.properties.schema.registry.url                  = mock://notused
spring.kafka.bootstrap-servers                               = localhost:9092
spring.kafka.properties.sasl.mechanism                       = ""
spring.kafka.properties.security.protocol                    = PLAINTEXT
spring.kafka.properties.basic.auth.credentials.source        = ""

spring.kafka.properties.schema.registry.basic.auth.user.info = ""
spring.kafka.properties.sasl.jaas.config                     = ""
Best answer:

I had to remove @ServiceConnection, enable the embedded Zookeeper, and set the env variables below. The bootstrap servers are then registered through @DynamicPropertySource instead.

    @Shared
    static final KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:latest'))
            .withEmbeddedZookeeper()
            .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
            .withEnv("KAFKA_CREATE_TOPICS", Constants.KAFKA_TOPIC + ":1:1")

    @DynamicPropertySource
    static void dynamicProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", () -> kafkaContainer.getBootstrapServers())
        registry.add("engage.server", () -> sftpDockerContainer.getServiceHost("sftp_server", 22))
        registry.add("engage.port", () -> sftpDockerContainer.getServicePort("sftp_server", 22))
    }
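
If you rely on auto-creation like this, the first send can still fail with "not present in metadata" when the producer's wait is short (1000 ms in the question). One option is to poll until the topic is visible before starting the flow. Below is a minimal sketch of such a polling helper in plain Java. TopicWait and awaitCondition are hypothetical names, not part of Testcontainers or Spring Kafka, and the check in main is only a stand-in for a real broker query.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of Testcontainers or Spring Kafka.
final class TopicWait {

    private TopicWait() {
    }

    /**
     * Polls the check until it returns true or the timeout elapses.
     * Returns true if the check succeeded in time, false on timeout or interruption.
     */
    static boolean awaitCondition(BooleanSupplier check, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a real "does the topic exist yet?" check:
        // it starts returning true roughly 200 ms after start.
        long readyAt = System.nanoTime() + Duration.ofMillis(200).toNanos();
        boolean ready = awaitCondition(
                () -> System.nanoTime() - readyAt >= 0,
                Duration.ofSeconds(5),
                Duration.ofMillis(50));
        System.out.println("topic visible: " + ready);
    }
}
```

In the test itself, the check could wrap a Kafka AdminClient created against kafkaContainer.getBootstrapServers(), for example admin.listTopics().names().get().contains(Constants.KAFKA_TOPIC) with the checked exceptions handled inside the lambda. That wiring is left out here because it needs a running broker.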