I have my kafka running on my docker. When I create a topic from CLI and start producer and consumer on CLI then kafka is able to transfer messages. but when I do same thing with python, then I don't receive any message on consumer side.
My docker-compose file looks like
version: '3'
services:
zookeeper:
image: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- kafka-net
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "chatgpt:1:1"
networks:
- kafka-net
expose:
- 9092
networks:
kafka-net:
driver: bridge
producer -
import json
import time
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:
json.dumps(x).encode('utf-8'))
producer.send("chatgpt", f"Hello 1", f"key1".encode("utf-8"))
producer.flush()
consumer -
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer("chatgpt", bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
consumer.subscribe(['chatgpt'])
consumer.subscription()
for msz in consumer:
print(msz)
Can anyone suggest what I'm doing wrong?