KafkaJS Failed to connect to seed broker when disconnecting producer via loop

66 Views Asked by At

Error encountering: "[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: Client network socket disconnected before secure TLS connection was established","retryCount":0,"retryTime":301}"

Kafka is working fine, I'm able to use producer with no error, but when I start executing inside loop for some testing purposes, I encounter the problem stated above.

And here is another thing, when i remove the producer.disconnect and run again (inside loop again), I got no error.

Can someone explain to me why it is happening ?

For more better understading, here is my code:

Producer:

const { Kafka, Partitioners } = require('kafkajs');
const logger = require('../logger');

class Producer {
  #config;

  #producerInstance;

  /**
   * Kafka Config Object (Producer)
   * @type {
   * {
   *          topic: string,
   *          kafka: object,
   *          producer: object
   *          producerSendOptions: object
   * }
   * }
   */
  constructor(kafkaConfig = {}) {
    this.#config = kafkaConfig;
    this.#producerInstance = this.#initProducer();
  }

  #initProducer() {
    const kafka = new Kafka(this.#config.kafka);
    const producer = kafka.producer({
      ...this.#config?.producer,
      createPartitioner: Partitioners.DefaultPartitioner,
    });
    return producer;
  }

  async produceEvent(events = {}) {
    try {
      await this.#producerInstance.connect();

      await this.#producerInstance.send({
        topic: this.#config.topic,
        acks: -1,
        messages: [events],
      });

      await this.#producerInstance.disconnect();

      return events;
    } catch (error) {
      logger.error(error.message);
    }
  }
}

module.exports = {
  Producer,
};

Somewhere in other files, I'm calling above class like this:

Class TestProducer {
#sampleProducer;

constructor(){
this.#sampleClient = new SampleClient();
}

async testFunction() {
      const sampleMessage = // any message i want to produce
      const iterations = Array.from({ length: 5 });
      for (const _ of iterations) {
        await this.SampleClient.produceEvent(sampleMessage);
      }
 }
}

SampleClient file:

const { Producer } = require('../common/kafka/kafka-producer');
const { CONFIG_KAFKA } = require('../common/kafkaconfig');

class SampleClient extends Producer {
  constructor(config = CONFIG_KAFKA) {
    super(config);
  }
}

module.exports = { SampleClient };

Before posting this, I already tried/test multiple times, and it is consistent it is not just intermittent.

  1. With no loop execution (one time only), I got no error
  2. Inside loop, i got error stated above
  3. Inside loop but i comment the producer.disconnect, I got no error
0

There are 0 best solutions below