Spring Cloud Stream Multiple Kafka Clusters Configuration


In my project, I need to connect to two different Kafka brokers.

My application.yaml looks something like this:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
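For context, the names in spring.cloud.function.definition map onto function beans. The sketch below is hypothetical, using plain java.util.function types; in the real application these would be @Bean methods in a @Configuration class, and the String payloads and status handling are assumptions, not the project's actual code:

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the beans named in spring.cloud.function.definition.
// The payload format and the status rewrite are assumptions for illustration.
public class OrderFunctions {

    // Backs orderCreatedListener-in-0 / orderCreatedListener-out-0: consumes
    // from order-created (kafka-one) and returns a message that Spring Cloud
    // Stream publishes to order-processed (kafka-two).
    static Function<String, String> orderCreatedListener() {
        return order -> order.replace("\"status\":\"CREATED\"", "\"status\":\"APPROVED\"");
    }

    // Backs orderProcessedListener-in-0: a terminal consumer on kafka-two.
    static Consumer<String> orderProcessedListener() {
        return order -> System.out.println("processed: " + order);
    }

    public static void main(String[] args) {
        String processed = orderCreatedListener()
                .apply("{\"order_id\":\"1\",\"status\":\"CREATED\"}");
        orderProcessedListener().accept(processed);
    }
}
```

Each input/output binding name in the yaml (orderCreatedListener-in-0, orderCreatedListener-out-0, and so on) is derived from the bean name plus the argument position, which is why the bindings and the function definition must stay in sync.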

But when I ran the application it didn't work, and the log showed the following error:

2024-03-05T23:35:48.473-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 31 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:49.595-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 32 due to node 1001 being disconnected (elapsed time since creation: 5ms, elapsed time since send: 5ms, request timeout: 3600000ms)
2024-03-05T23:35:50.727-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Node 1001 disconnected.
2024-03-05T23:35:50.728-03:00  INFO 25569 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-1] Cancelled in-flight API_VERSIONS request with correlation id 33 due to node 1001 being disconnected (elapsed time since creation: 4ms, elapsed time since send: 4ms, request timeout: 3600000ms)
2024-03-05T23:35:51.086-03:00  INFO 25569 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager         : [AdminClient clientId=adminclient-1] Metadata update failed

org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fetchMetadata

I want to separate the Kafka topics across two clusters:

  • kafka-one containing the order-created and order-created-dlq
  • kafka-two containing the order-processed and order-processed-dlq

I am using:

  • Spring Boot 3.2.3
  • Spring Cloud 2023.0.0

I have the two Kafka clusters running fine in my development environment, using Docker containers: one exposed on port 9092 and the other on port 9093.

How can I adjust this?

2 Answers

Jonathan Henrique Medeiros (accepted answer)

The root cause was an incorrect configuration of the Docker containers.

Inside the Docker network, both Kafka brokers were configured on port 9092 and, crucially, both advertised localhost:9092 to external clients, which caused unexpected behavior.

Old docker-compose.yaml:

version: "3.9"

services:

  spot.zookeeper.one:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.one
    restart: "no"
    hostname: spot.zookeeper.one
    ports:
      - "2282:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-one
    volumes:
      - zookeeper_data_one:/bitnami

  spot.broker.one:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.one
    hostname: spot.broker.one
    restart: "no"
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.one
    networks:
      - spot-network-one
    volumes:
      - kafka_data_one:/bitnami

  spot.zookeeper.two:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.two
    restart: "no"
    hostname: spot.zookeeper.two
    ports:
      - "2283:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-two
    volumes:
      - zookeeper_data_two:/bitnami

  spot.broker.two:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.two
    hostname: spot.broker.two
    restart: "no"
    ports:
      - "9093:9092"
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.two
    networks:
      - spot-network-two
    volumes:
      - kafka_data_two:/bitnami

  spot.kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: spot.kafka-ui
    restart: "no"
    environment:
      KAFKA_CLUSTERS_0_NAME: spot-one
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
      KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
      KAFKA_CLUSTERS_1_NAME: spot-two
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19092
      KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
    ports:
      - "8580:8080"
    depends_on:
      - spot.zookeeper.one
      - spot.broker.one
      - spot.zookeeper.two
      - spot.broker.two
    networks:
      - spot-network-one
      - spot-network-two

networks:
  spot-network-one:
    driver: bridge
  spot-network-two:
    driver: bridge

volumes:
  zookeeper_data_one:
    driver: local
  kafka_data_one:
    driver: local
  zookeeper_data_two:
    driver: local
  kafka_data_two:
    driver: local

The adjusted docker-compose.yaml:

version: "3.9"

services:

  spot.zookeeper.one:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.one
    restart: "no"
    hostname: spot.zookeeper.one
    ports:
      - "2282:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-one
    volumes:
      - zookeeper_data_one:/bitnami

  spot.broker.one:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.one
    hostname: spot.broker.one
    restart: "no"
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19092,LISTENER_DOCKER_EXTERNAL://:9092"
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.one:19092,LISTENER_DOCKER_EXTERNAL://localhost:9092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.one:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.one
    networks:
      - spot-network-one
    volumes:
      - kafka_data_one:/bitnami

  spot.zookeeper.two:
    image: docker.io/bitnami/zookeeper:3.7
    container_name: spot.zookeeper.two
    restart: "no"
    hostname: spot.zookeeper.two
    ports:
      - "2283:2181"
    environment:
      ALLOW_ANONYMOUS_LOGIN: "yes"
    networks:
      - spot-network-two
    volumes:
      - zookeeper_data_two:/bitnami

  spot.broker.two:
    image: docker.io/bitnami/kafka:3
    container_name: spot.broker.two
    hostname: spot.broker.two
    restart: "no"
    ports:
      - "9093:9093" # modified here
    environment:
      KAFKA_CFG_LISTENERS: "LISTENER_DOCKER_INTERNAL://:19093,LISTENER_DOCKER_EXTERNAL://:9093" # modified here
      KAFKA_CFG_ADVERTISED_LISTENERS: "LISTENER_DOCKER_INTERNAL://spot.broker.two:19093,LISTENER_DOCKER_EXTERNAL://localhost:9093" # modified here
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT"
      KAFKA_CFG_ZOOKEEPER_CONNECT: "spot.zookeeper.two:2181"
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_CFG_PORT: "9093" # modified here
      KAFKA_INTER_BROKER_LISTENER_NAME: "LISTENER_DOCKER_INTERNAL"
      ALLOW_PLAINTEXT_LISTENER: "yes"
    depends_on:
      - spot.zookeeper.two
    networks:
      - spot-network-two
    volumes:
      - kafka_data_two:/bitnami

  spot.kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: spot.kafka-ui
    restart: "no"
    environment:
      KAFKA_CLUSTERS_0_NAME: spot-one
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: spot.broker.one:19092
      KAFKA_CLUSTERS_0_ZOOKEEPER: spot.zookeeper.one:2181
      KAFKA_CLUSTERS_1_NAME: spot-two
      KAFKA_CLUSTERS_1_BOOTSTRAPSERVERS: spot.broker.two:19093 # modified here
      KAFKA_CLUSTERS_1_ZOOKEEPER: spot.zookeeper.two:2181
    ports:
      - "8580:8080"
    depends_on:
      - spot.zookeeper.one
      - spot.broker.one
      - spot.zookeeper.two
      - spot.broker.two
    networks:
      - spot-network-one
      - spot-network-two

networks:
  spot-network-one:
    driver: bridge
  spot-network-two:
    driver: bridge

volumes:
  zookeeper_data_one:
    driver: local
  kafka_data_one:
    driver: local
  zookeeper_data_two:
    driver: local
  kafka_data_two:
    driver: local

All adjusted points have been marked with a # modified here comment on the line of the file.
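The failure mode behind the old compose file can be sketched with a toy lookup table. This is not the real Kafka protocol, just an illustration of the principle: a client uses the bootstrap address only for the initial metadata request, and afterwards connects to whatever address the broker advertises. Since both brokers advertised localhost:9092, a client bootstrapping against kafka-two on port 9093 was sent back to port 9092, hence the metadata timeout:

```java
import java.util.Map;

// Toy illustration (NOT the real Kafka protocol) of why identical advertised
// listeners broke the second cluster: after bootstrapping, a client talks to
// the ADVERTISED address returned in broker metadata, not the bootstrap one.
public class AdvertisedListenerDemo {

    // Simulates a metadata response: bootstrap address -> advertised address.
    static String advertisedFor(String bootstrap, Map<String, String> metadata) {
        return metadata.get(bootstrap);
    }

    public static void main(String[] args) {
        // Old compose file: both brokers advertise localhost:9092 externally.
        Map<String, String> broken = Map.of(
                "localhost:9092", "localhost:9092",
                "localhost:9093", "localhost:9092");

        // Adjusted compose file: kafka-two advertises its own port.
        Map<String, String> fixed = Map.of(
                "localhost:9092", "localhost:9092",
                "localhost:9093", "localhost:9093");

        // A client bootstrapping against kafka-two is redirected to kafka-one.
        System.out.println("broken: " + advertisedFor("localhost:9093", broken)); // localhost:9092
        System.out.println("fixed:  " + advertisedFor("localhost:9093", fixed));  // localhost:9093
    }
}
```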

sobychacko

I think there are some issues with the Kafka setup you defined here. Instead of it, I used the one that Spring Cloud Stream provides for testing purposes, which gives a 3-node cluster on localhost:9091, localhost:9092, and localhost:9093. Your app, in the feature/multiple-kafka-brokers branch, uses the nodes running on 9092 and 9093. With those nodes as the Kafka brokers, I don't see any errors when running the app. When I run the curl command from your README, I see the following output in my console:

2024-03-07T15:20:55.577-05:00  INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener     : {"order_id":"89c6ea7a-6fe5-4fa9-91c6-733a5f603b10","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":1000.00,"status":"REJECTED"}
2024-03-07T15:22:25.885-05:00  INFO 64186 --- [container-0-C-1] c.c.s.s.i.o.l.OrderProcessedListener     : {"order_id":"cf22490f-6d4c-477a-99ba-2088cee07804","customer_id":"36a8ea26-4eb0-4b9d-b609-d095175a2f7b","value":400.00,"status":"APPROVED"}

Therefore, this tells me there is nothing wrong with the code or the configuration in application.yaml; rather, it points to configuration or connectivity issues on the Kafka brokers themselves, possibly in how you set them up in your docker-compose script. I would suggest starting there and checking for issues.