NestJS API gateway sending message to Kafka


I was working on a proof of concept and hit a wall quickly. I created Docker containers for Kafka and ZooKeeper using the Bitnami images, and they are up and running. Then I created an API gateway that sends messages to Kafka so that a microservice can later receive and consume them.

This is the service on the API gateway; it's very basic.

@Injectable()
export class BankService implements IBankService, OnModuleInit {

    constructor(
        @Inject('BANK KAFKA SERVICE') private readonly _bankKafka: ClientKafka
    ) {}

    onModuleInit() {
        this._bankKafka.subscribeToResponseOf('bank_create');
    }

    async createBank(dto: BankRequestDto) {
        console.log(dto);
        let _res = await this._bankKafka.send('bank_create', dto);
        return _res;
    }
}

And when I send the message here I get this error:

[Nest] 46679  - 11/14/2022, 8:57:31 AM     LOG [NestApplication] Nest application successfully started +1ms
{ name: 'INTERBANK' }
[Nest] 46679  - 11/14/2022, 9:31:49 AM   ERROR [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp":"2022-11-14T14:31:49.701Z","logger":"kafkajs","broker":"localhost:9092","clientId":"bank-client","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":91}
[Nest] 46679  - 11/14/2022, 9:31:50 AM     LOG [ClientKafka] INFO [Consumer] Starting {"timestamp":"2022-11-14T14:31:50.055Z","logger":"kafkajs","groupId":"bank-consumer-client"}
[Nest] 46679  - 11/14/2022, 9:31:50 AM     LOG [ClientKafka] INFO [ConsumerGroup] Consumer has joined the group {"timestamp":"2022-11-14T14:31:50.155Z","logger":"kafkajs","groupId":"bank-consumer-client","memberId":"bank-client-7cf98aae-7e0b-4473-a103-098bad295d2f","leaderId":"bank-client-7cf98aae-7e0b-4473-a103-098bad295d2f","isLeader":true,"memberAssignment":{"bank_create.reply":[0]},"groupProtocol":"NestReplyPartitionAssigner","duration":97}
[Nest] 46679  - 11/14/2022, 9:31:50 AM    WARN [ClientKafka] WARN [undefined] KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1" {"timestamp":"2022-11-14T14:31:50.156Z","logger":"kafkajs"}
[Nest] 46679  - 11/14/2022, 9:31:50 AM   ERROR [ClientKafka] ERROR [Connection] Response Metadata(key: 3, version: 6) {"timestamp":"2022-11-14T14:31:50.237Z","logger":"kafkajs","broker":"localhost:9092","clientId":"bank-client","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":85}
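
The "no leader for this topic-partition" message in the log describes an election that is still in progress, so it may be a transient condition (for example, right after a topic is first created) rather than a permanent failure. Under that assumption, one option is to retry the failing call a few times with a growing delay. The sketch below is plain TypeScript; `retryWhile` and `isNoLeaderError` are hypothetical helper names, not part of NestJS or kafkajs, and kafkajs has its own `retry` client option that may already cover this case.

```typescript
// Hypothetical helper: retries `operation` while `isTransient(err)` is true,
// waiting baseDelayMs, 2*baseDelayMs, 4*baseDelayMs, ... between attempts.
async function retryWhile<T>(
  operation: () => Promise<T>,
  isTransient: (err: unknown) => boolean,
  maxAttempts: number = 5,
  baseDelayMs: number = 200,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err;
      // Give up on non-transient errors or when attempts are exhausted.
      if (!isTransient(err) || attempt === maxAttempts) {
        break;
      }
      const delayMs = baseDelayMs * Math.pow(2, attempt - 1);
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    }
  }
  throw lastError;
}

// Hypothetical predicate: matches the error text shown in the log above.
function isNoLeaderError(err: unknown): boolean {
  const message = err instanceof Error ? err.message : String(err);
  return message.indexOf('There is no leader for this topic-partition') !== -1;
}
```

A call site could wrap whatever promise-returning operation fails with this error, e.g. `retryWhile(() => doSend(), isNoLeaderError)`, where `doSend` is a placeholder for the actual call.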

So I am wondering whether I am missing something on the Kafka side, because none of the videos I watched on this topic covered the Docker and Kafka setup.
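
One Kafka-side step worth checking is whether the topics exist before the gateway starts. The log shows the client consuming from `bank_create.reply`, so a request-reply pattern appears to involve two topics: the pattern itself and a `.reply` companion. The sketch below only builds the list of topic definitions; `TopicSpec` and `requestReplyTopics` are hypothetical names, and the partition and replication values of 1 are assumptions suited to a single-broker local setup.

```typescript
// Mirrors the per-topic fields accepted by kafkajs `admin.createTopics`.
interface TopicSpec {
  topic: string;
  numPartitions: number;
  replicationFactor: number;
}

// Hypothetical helper: for each message pattern, list the request topic
// and the "<pattern>.reply" topic seen in the consumer assignment log.
function requestReplyTopics(
  patterns: string[],
  numPartitions: number = 1,
  replicationFactor: number = 1,
): TopicSpec[] {
  const topics: TopicSpec[] = [];
  for (const pattern of patterns) {
    topics.push({ topic: pattern, numPartitions, replicationFactor });
    topics.push({ topic: pattern + '.reply', numPartitions, replicationFactor });
  }
  return topics;
}

const topics = requestReplyTopics(['bank_create']);
console.log(topics.map((t) => t.topic).join(', '));
```

With kafkajs, a list like this could be passed to `admin.createTopics({ waitForLeaders: true, topics })` on a connected admin client, so that the topics and their partition leaders exist before the first message is sent.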
