How can I sniff the ACK signal from a Kafka broker using Toxiproxy?


I am writing a JUnit test in which a Spring KafkaTemplate sends a message to a single Kafka broker. I want to simulate a network cut at the moment the broker sends its ACK back to the producer, so that the producer retries the delivery. I am trying Toxiproxy, but it only seems able to simulate network problems (latency, connection cuts) for the connection as a whole, regardless of the kind of message passing through. My aim is to affect only the ACK and let all other traffic through. Does Toxiproxy offer a way to achieve this? If not, what other traffic-interception tools could help?

See code excerpt attached:

    @Container
    static ToxiproxyContainer toxiproxy =
            new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.7.0").withNetwork(network);

    @Container
    static GenericContainer<?> zookeeper = new GenericContainer<>(DockerImageName.parse("zookeeper:3.8.0")) // "confluentinc/cp-zookeeper:latest"
            .withNetwork(network)
            .withNetworkAliases("zookeeper")
            .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(KafkaContainer.ZOOKEEPER_PORT));

    @Container
    public static KafkaContainer kafka = new ToxicKafkaWithExternalZookeeperContainer(
            "confluentinc/cp-kafka:7.4.0", "broker-1", 9092, 29092)
            .withAdditionalListener(() -> String.format("%s:%s", toxiproxy.getHost(), toxiproxy.getMappedPort(8666)))
            .withNetwork(network)
            .withNetworkAliases("kafka")
            .dependsOn(toxiproxy, zookeeper)
            .withExternalZookeeper("zookeeper:" + KafkaContainer.ZOOKEEPER_PORT)
            .withEnv("KAFKA_BROKER_ID", "1")
            .withEnv("KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT_MS", "1000")
            .withEnv("KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS", "5000");

    @DynamicPropertySource
    static void registerDynamicProperties(DynamicPropertyRegistry registry) throws Exception {

        toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());

        proxyKafka1 = toxiproxyClient.createProxy(
                "kafka",
                "0.0.0.0:8666",
                "kafka:19092");

        registry.add("spring.kafka.bootstrap-servers", () -> "PLAINTEXT://%s:%d".formatted(toxiproxy.getHost(), toxiproxy.getMappedPort(8666)));
        registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
    }

    @Test
    void whenSendingEventAndNotReceivingACK_RetryAndDuplicateMessageEntails() throws Exception {
        // Given
        var request = PublishEventRequestMother.createEventRequest();
        Consumer<String, Envelope<?>> consumer = consumerFactory.createConsumer();
        consumer.subscribe(Collections.singleton(topicName));

        // When
        proxyKafka1.toxics().bandwidth("CUT_CONNECTION_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
//        proxyKafka1.toxics().latency("CUT_CONNECTION_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);

        kafkaTemplate.send(topicName, envelopeMapper.map(request, (Map<String, Object>) request.getResources())).get(Long.parseLong(timeOutInSeconds), TimeUnit.SECONDS);
//        kafkaTemplate.send(topicName, envelopeMapper.map(request, (Map<String, Object>) request.getResources()));

        // Then
        ConsumerRecords<String, Envelope<?>> records = KafkaTestUtils.getRecords(consumer);
        assertEquals(2, records.count());
        records.forEach(singleRecord -> then(singleRecord.value()).extracting("payload").isEqualTo(request.getPayload()));

        proxyKafka1.toxics().get("CUT_CONNECTION_DOWNSTREAM").remove();

    }
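
To make "working only at ACK level" concrete: Toxiproxy's built-in toxics act on the TCP byte stream, so singling out the ACK would mean looking inside the Kafka wire protocol. Every Kafka request and response is prefixed with a 4-byte big-endian size. A request header then carries `api_key` (int16), `api_version` (int16) and `correlation_id` (int32), while a response header starts with the `correlation_id`. Produce requests use `api_key` 0. The following is only a minimal sketch of the matching logic such a filter would need. The class `ProduceAckFilter` and its methods are hypothetical names, not part of Toxiproxy, Kafka or Testcontainers. It assumes a PLAINTEXT listener and that the caller has already reassembled complete frames from the stream.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper: decides which broker responses are Produce ACKs. */
public class ProduceAckFilter {

    // api_key of the Produce request in the Kafka protocol.
    private static final short PRODUCE_API_KEY = 0;

    // Correlation ids of Produce requests that have not been answered yet.
    private final Set<Integer> pendingProduceIds = new HashSet<>();

    /** Call with each complete client-to-broker frame, size prefix included. */
    public void onRequestFrame(ByteBuffer frame) {
        ByteBuffer b = frame.duplicate().order(ByteOrder.BIG_ENDIAN);
        b.getInt();                      // frame size (unused here)
        short apiKey = b.getShort();     // which API is being called
        b.getShort();                    // api_version (unused here)
        int correlationId = b.getInt();  // echoed back in the response
        if (apiKey == PRODUCE_API_KEY) {
            pendingProduceIds.add(correlationId);
        }
    }

    /**
     * Call with each complete broker-to-client frame, size prefix included.
     * Returns true when the frame answers a tracked Produce request,
     * i.e. it is the ACK a test would want to suppress.
     */
    public boolean shouldDropResponse(ByteBuffer frame) {
        ByteBuffer b = frame.duplicate().order(ByteOrder.BIG_ENDIAN);
        b.getInt();                      // frame size (unused here)
        int correlationId = b.getInt();
        return pendingProduceIds.remove(correlationId);
    }
}
```

A custom proxy placed between the producer and the broker could feed upstream frames to `onRequestFrame` and close the connection instead of forwarding whenever `shouldDropResponse` returns true. Closing the socket, rather than silently discarding the frame, should let the producer notice the failure without waiting for its request timeout. How that proxy reads and reassembles frames is left out of this sketch.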