Communication between microservices through Kafka


Good afternoon. I'm new to programming and want to write a project in FastAPI with a microservice architecture. I have two microservices. If I understand correctly, I can connect them with Kafka, but I don't quite understand how. I have read many articles and still don't understand.

My microservices:

  1. Auth - authorization/authentication with the fastapi-users library
  2. Tracker - the main app, which has a model Expense (columns: id [int], source, amount, category_id, user_id)

In Tracker I wrote a producer with aiokafka:

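import json

from aiokafka import AIOKafkaProducer


async def send_expense_info(expense_info):
    # Serialize each message value as UTF-8 encoded JSON
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8")
    )

    await producer.start()
    try:
        # Publish to the topic and wait for the broker to acknowledge it
        await producer.send_and_wait("tracker_to_auth", value=expense_info)
    finally:
        await producer.stop()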

The consumer in Auth:

import json

from aiokafka import AIOKafkaConsumer


async def process_expense_messages():
    # Decode each message value from UTF-8 encoded JSON
    consumer = AIOKafkaConsumer(
        "tracker_to_auth",
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda v: json.loads(v.decode("utf-8"))
    )

    await consumer.start()
    try:
        async for msg in consumer:
            expense_info = msg.value
            user_id = current_user() # func from fastapi-users (normally used as a request dependency)
            print(expense_info)
            print(f"Received expense info: {expense_info} for user: {user_id}")
    finally:
        await consumer.stop()

Is it possible to do something like this: after Tracker creates the Expense object, it sends a message to the Auth microservice, and Auth sends back the id of the current user so that it can be assigned to the user_id column? Or something along those lines?
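
To make the question concrete: a Kafka consumer loop runs outside of any HTTP request, so there is no "current user" available inside it, and the fastapi-users current_user helper is meant to be resolved per request. One possible alternative, sketched below, is for Tracker to put the user id into the message itself when it publishes the event. This is only a sketch under assumptions: the ExpenseCreated class, its field types, and the serialize/deserialize helpers are hypothetical names, and it assumes Tracker already knows the authenticated user's id when the expense is created. The encoding mirrors the value_serializer and value_deserializer used above, so the bytes could be passed to the producer and read by the consumer.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class ExpenseCreated:
    """Hypothetical event Tracker would publish after saving an expense."""
    expense_id: int
    user_id: str  # assumption: taken from the already-authenticated request
    source: str
    amount: float
    category_id: int


def serialize(event: ExpenseCreated) -> bytes:
    """Encode the event as UTF-8 JSON, like the producer's value_serializer."""
    return json.dumps(asdict(event)).encode("utf-8")


def deserialize(raw: bytes) -> ExpenseCreated:
    """Decode UTF-8 JSON back into an event, like the consumer's value_deserializer."""
    return ExpenseCreated(**json.loads(raw.decode("utf-8")))


if __name__ == "__main__":
    # Example values are made up for illustration.
    event = ExpenseCreated(expense_id=1, user_id="42", source="card",
                           amount=9.5, category_id=3)
    received = deserialize(serialize(event))
    print(received.user_id)
```

With this shape the consumer would read user_id from msg.value instead of calling current_user(), and no reply message from Auth would be needed.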
