kafka-python not receiving message on consumer side

I have Kafka running in Docker. When I create a topic from the CLI and start a producer and a consumer from the CLI, Kafka transfers messages as expected. But when I do the same thing with Python (kafka-python), I don't receive any messages on the consumer side.

My docker-compose file looks like this:

version: '3'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-net
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "chatgpt:1:1"
    networks:
      - kafka-net
    expose:
      - 9092
networks:
  kafka-net:
    driver: bridge
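
One thing that may be worth checking in this file is whether the listener a host client actually reaches matches the address the broker advertises back. As far as I understand it, a Kafka broker answers metadata requests with the advertised address of the listener the connection arrived on. The sketch below is only an illustration under that assumption: the helper names `parse_listeners` and `advertised_address` are hypothetical, and the values are copied from the compose file above.

```python
# Values copied from the compose file above.
ADVERTISED = "INSIDE://kafka:9092,OUTSIDE://localhost:9093"
LISTENERS = "INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093"
PORTS = ["9092:9092"]  # "host_port:container_port"


def parse_listeners(spec):
    """Parse 'NAME://host:port,...' into {name: (host, port)}."""
    result = {}
    for entry in spec.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def advertised_address(host_port, ports, listeners, advertised):
    """Return (listener, host, port) a client bootstrapping through
    host_port would be told to use, or None if that host port is not
    published or no listener is bound to the mapped container port."""
    published = {}
    for mapping in ports:
        host_side, container_side = mapping.split(":")
        published[int(host_side)] = int(container_side)

    container_port = published.get(host_port)
    if container_port is None:
        return None

    bound = parse_listeners(listeners)
    announced = parse_listeners(advertised)
    for name, (_, port) in bound.items():
        if port == container_port:
            return (name, *announced[name])
    return None


print(advertised_address(9092, PORTS, LISTENERS, ADVERTISED))
print(advertised_address(9093, PORTS, LISTENERS, ADVERTISED))
```

With these values, host port 9092 maps to the INSIDE listener, which advertises `kafka:9092`, while port 9093 (the OUTSIDE listener advertising `localhost:9093`) is not published to the host at all. Whether that explains the behavior depends on whether the hostname `kafka` resolves from where the Python scripts run.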

Producer:

import json
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x:
                         json.dumps(x).encode('utf-8'))

producer.send("chatgpt", f"Hello 1", f"key1".encode("utf-8"))
producer.flush()

Consumer:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer("chatgpt", bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='my-group',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

consumer.subscribe(['chatgpt'])
consumer.subscription()

for msz in consumer:
    print(msz)

Can anyone suggest what I'm doing wrong?
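
For anyone trying to reproduce this, a standard-library connectivity check run on the same machine as the Python scripts might help narrow things down. This is a minimal sketch, not a fix: the helper names `can_resolve` and `can_connect` are hypothetical, and the host/port pairs are simply the ones that appear in the compose file and scripts above.

```python
import socket


def can_resolve(hostname):
    """Return True if the hostname resolves from this machine."""
    try:
        socket.getaddrinfo(hostname, None)
        return True
    except socket.gaierror:
        return False


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Addresses taken from the compose file and the scripts above.
    targets = [("localhost", 9092), ("localhost", 9093), ("kafka", 9092)]
    for host, port in targets:
        print(host, port,
              "resolves:", can_resolve(host),
              "connects:", can_connect(host, port))
```

If `localhost:9092` connects but `kafka` does not resolve, the bootstrap connection can succeed while every follow-up connection to the advertised broker address fails, which would be consistent with a consumer that starts but never receives anything.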
