I am trying to connect to my local Kafka broker. I created a topic "test_producer" and started a console producer for it using:
bin/kafka-console-producer.sh --topic test_producer --bootstrap-server localhost:9092
Now I am trying to connect to it using kafka-node in my Node application. A plain consumer works and receives messages, but if I create a consumer group, it throws a request timeout exception.
Consumer group code:
import { ConsumerGroup } from "kafka-node";

const options = {
  kafkaHost: "127.0.0.1:9092",
  batch: undefined,
  ssl: true,
  groupId: "ExampleGroup",
  sessionTimeout: 15000,
  protocol: ["roundrobin"],
  encoding: "utf8",
  fromOffset: "latest", // default
  commitOffsetsOnFirstJoin: true,
  outOfRangeOffset: "earliest",
  onRebalance: (isAlreadyMember, callback) => {
    callback();
  },
};

const consumerGroup = new ConsumerGroup(options, ["test_producer"]);

consumerGroup.on("message", (message) => {
  // message is an object; log its value rather than "[object Object]"
  console.log(`group:${message.value}`);
});
But if I create a plain consumer, it is able to connect and receive messages.
Edit:
config/server.properties
############################# Server Basics #############################
broker.id=0
############################# Socket Server Settings #############################
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/home/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enable = true
advertised.listeners=PLAINTEXT://127.0.0.1:9092
listeners = PLAINTEXT://127.0.0.1:9092
offsets.topic.replication.factor defaults to 3, and you won't be able to use consumer group features without the offsets topic. Edit your broker config to set it to 1, and restart Kafka.
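For a single-broker setup like the one in the question, the relevant line in config/server.properties would look roughly like this (a config fragment only, the rest of the file stays as it is):

```properties
# Single broker: the internal offsets topic can only have one replica
offsets.topic.replication.factor=1
```

The broker reads this file at startup, so the change takes effect only after Kafka is restarted.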
Also, you don't need port and advertised.host.name; both are deprecated properties.
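One more thing that may be worth checking, offered as an assumption rather than a confirmed cause: the consumer group options in the question set ssl: true, while the broker's listener is PLAINTEXT, which does not expect a TLS handshake. A minimal sketch of keeping the two in step is below; connectionOptionsFromListener is a hypothetical helper written for this example, not part of kafka-node.

```javascript
// Hypothetical helper (not part of kafka-node): derive the kafkaHost and ssl
// options from a broker listener string such as "PLAINTEXT://127.0.0.1:9092".
function connectionOptionsFromListener(listener) {
  const match = /^([A-Z_]+):\/\/(.+)$/.exec(listener.trim());
  if (!match) {
    throw new Error(`Unrecognised listener: ${listener}`);
  }
  const [, protocol, hostAndPort] = match;
  // Only SSL and SASL_SSL listeners expect the client to speak TLS.
  const ssl = protocol === "SSL" || protocol === "SASL_SSL";
  return { kafkaHost: hostAndPort, ssl };
}

// Listener value copied from the server.properties in the question.
const connection = connectionOptionsFromListener("PLAINTEXT://127.0.0.1:9092");
console.log(connection);
```

For the PLAINTEXT listener in the question this gives ssl: false, and the result could be spread into the ConsumerGroup options in place of the hard-coded kafkaHost and ssl values.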