Error connecting: write after finish leaving producer unaware of ClientId
My JS can connect and transfer data to Kafka when I use KafkaJS, but it errors. "Connection error: write after end" It makes my Kafka unknow of the clientId,this is not an error of this kind, it only occurred in my producer.
{
"level":"WARN",
"timestamp":"2023-11-29T05:02:58.087Z",
"logger":"kafkajs",
"message":"KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option \"createPartitioner: Partitioners.LegacyPartitioner\". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable \"KAFKAJS_NO_PARTITIONER_WARNING=1\""
}
{
"level":"ERROR",
"timestamp":"2023-11-29T05:02:58.259Z",
"logger":"kafkajs",
"message":"[Connection] Connection error: write after end",
"broker":"localhost:9094",
"clientId":"my-app",
"stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end\n at new NodeError (node:internal/errors:405:5)\n at _write (node:internal/streams/writable:322:11)\n at Writable.write (node:internal/streams/writable:337:10)\n at Object.sendRequest (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\connection.js:409:27)\n at SocketRequest.send [as sendRequest] (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\index.js:134:23)\n at SocketRequest.send (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\socketRequest.js:88:10)\n at RequestQueue.sendSocketRequest (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\index.js:182:19)\n at RequestQueue.push (C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\requestQueue\\index.js:162:12)\n at C:\\Users\\imed-\\OneDrive\\Kafka-mike\\kafkajs-example\\node_modules\\kafkajs\\src\\network\\connection.js:404:29\n at new Promise (<anonymous>)"
}
this is my sample code
const kafka = new Kafka({
clientId: "my-app",
brokers: ["localhost:9094", "localhost:9095", "localhost:9096"],
logLevel: logLevel.ERROR,
retry: {
initialRetryTime: 100,
retries: 8
},
connectionTimeout: 3000,
});
kafka.logger().setLogLevel(logLevel.WARN)
const topic = "my-topic"; // Replace with your Kafka topic name
// Create a Kafka producer
const producer = kafka.producer();
producer.logger().setLogLevel(logLevel.INFO)
// Produce a message to the topic
const produceMessage = async () => {
await producer.connect();
await producer.send({
topic,
messages: [{ value: "Hello, KafkaJS!" }],
});
await producer.disconnect();
};
// Create a Kafka consumer
// Run the producer and consumer
produceMessage();