How do I fix the KafkaJS connection error "write after end"?


Connection error "write after end" leaves the producer unaware of the clientId

My Node.js code can connect and send data to Kafka using KafkaJS, but it logs the error "Connection error: write after end". As a result, Kafka appears to be unaware of the clientId. I do not see this error anywhere else; it only occurs in my producer.

{
"level":"WARN",
"timestamp":"2023-11-29T05:02:58.087Z",
"logger":"kafkajs",
"message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""
}
{
"level":"ERROR",
"timestamp":"2023-11-29T05:02:58.259Z",
"logger":"kafkajs",
"message":"[Connection] Connection error: write after end",
"broker":"localhost:9094",
"clientId":"my-app",
"stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end\n    at new NodeError (node:internal/errors:405:5)\n    at _write (node:internal/streams/writable:322:11)\n    at Writable.write (node:internal/streams/writable:337:10)\n    at Object.sendRequest (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\connection.js:409:27)\n    at SocketRequest.send [as sendRequest] (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\index.js:134:23)\n    at SocketRequest.send (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\socketRequest.js:88:10)\n    at RequestQueue.sendSocketRequest (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\index.js:182:19)\n    at RequestQueue.push (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\index.js:162:12)\n    at C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\connection.js:404:29\n    at new Promise (<anonymous>)"
}

This is my sample code:


const { Kafka, logLevel } = require("kafkajs");

const kafka = new Kafka({
  clientId: "my-app",
  brokers: ["localhost:9094", "localhost:9095", "localhost:9096"],
  logLevel: logLevel.ERROR,
  retry: {
    initialRetryTime: 100,
    retries: 8
  },
  connectionTimeout: 3000,
});

kafka.logger().setLogLevel(logLevel.WARN)

const topic = "my-topic"; // Replace with your Kafka topic name

// Create a Kafka producer
const producer = kafka.producer();
producer.logger().setLogLevel(logLevel.INFO)

// Produce a message to the topic
const produceMessage = async () => {
  await producer.connect();
  await producer.send({
    topic,
    messages: [{ value: "Hello, KafkaJS!" }],
  });
  await producer.disconnect();
};

// Create a Kafka consumer

// Run the producer and consumer
produceMessage();
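
One thing that may be worth ruling out is a shutdown ordering problem: `produceMessage()` above is called without a `.catch`, so a rejected `send` would skip `disconnect` and surface only as an unhandled rejection. The following is a minimal sketch, not a confirmed fix for the "write after end" error. `produceOnce` is a hypothetical helper name (it is not part of KafkaJS), and it assumes only that the object passed in exposes `connect()`, `send()` and `disconnect()`, as the object returned by `kafka.producer()` does.

```javascript
// Hypothetical helper (not part of KafkaJS): connect, send once, then always disconnect.
// `producer` is assumed to expose connect(), send() and disconnect(),
// as the object returned by kafka.producer() does.
async function produceOnce(producer, topic, messages) {
  await producer.connect();
  try {
    // Wait for send() to settle before the connection is closed.
    return await producer.send({ topic, messages });
  } finally {
    // Runs whether send() resolved or rejected, and only after it settled.
    await producer.disconnect();
  }
}

// Usage with the producer from the question (commented out so the sketch
// runs without a broker):
// produceOnce(producer, topic, [{ value: "Hello, KafkaJS!" }])
//   .then((metadata) => console.log("sent", metadata))
//   .catch((err) => console.error("produce failed", err));
```

Taking the producer as a parameter keeps the ordering logic separate from the client configuration, so it can be exercised with a stub before pointing it at the real brokers.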