I'm trying to build this Microservices project But get this error on consumer.py and also vsc says its unable to import confluent_
Here´s the code for consumer.py:
import os
import json
import django
from confluent_kafka import Consumer
from rest_framework.exceptions import ValidationError
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
django.setup()
consumer = Consumer({
'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVER'),
'security.protocol': os.environ.get('KAFKA_SECURITY_PROTOCOL'),
'sasl.mechanism': 'PLAIN',
'sasl.username': os.environ.get('KAFKA_USERNAME'),
'sasl.password': os.environ.get('KAFKA_PASSWORD'),
'group.id': os.environ.get('KAFKA_GROUP'),
'auto.offset.reset': 'earliest'
})
consumer.subscribe([os.environ.get('KAFKA_TOPIC')])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
if msg is not None and not msg.error():
topic = msg.topic()
value = msg.value()
data = json.loads(value)
print(f'Got this message with Topic: {topic} and value: {value}, with Data: {data}')
if topic == os.environ.get('KAFKA_TOPIC'):
if msg.key() == b'create_user':
try:
print(f"Order created successfully for user {data['userID']}")
except ValidationError as e:
print(f"Failed to create order for user {data['userID']}: {str(e)}")
consumer.close()
Here's producer.py:
from confluent_kafka import Producer
import os
producer= Producer({
'bootstrap.servers':os.environ.get('KAFKA_BOOTSTRAP_SERVER'),
'security.protocol':'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username':os.environ.get('KAFKA_USERNAME'),
'sasl.password':os.environ.get('KAFKA_PASSWORD'),
})
I have tried downgrading my Python image from 3.12 to 3.10 but still doesnt work
tried changing this
FROM python:3.10 ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 WORKDIR /app COPY requirements.txt /app/requirements.txt RUN pip install -r requirements.txt COPY . /app/ CMD python manage.py runserver 0.0.0.0:8000