I want to combine kafka-node and node-tail to watch some log files and send every new entry to a Kafka cluster. I have the following code, which works.
// Settings for the log shippers: the connection string handed to each
// producer, and for every Kafka topic the list of log files to tail.
var info = {
    zookeepers: "127.0.0.1:9500",
    topics: {
        server: ["/tmp/server-1.log", "/tmp/server-2.log"],
        audit: ["/tmp/audit-1.log", "/tmp/audit-2.log"]
    }
};
function dldmProducer (logFiletoTail, fileTopic, zookeepers){
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var client = new kafka.Client(zookeepers);
var producer = new Producer(client);
var partitionNo = 0
producer.on('ready', function () {
var Tail = require('tail').Tail;
var tail = new Tail(logFiletoTail);
tail.on("line", function(data) {
dataArray=data.split("\n")
var payloads = [{ topic: fileTopic, messages: dataArray, partition: partitionNo },];
partitionNo = (partitionNo +1)%3;
producer.send(payloads, function(err, data){
//console.log(payloads);
//console.log(data);
});
});
tail.on("error", function(error) {
//console.log('TAIL ERROR: ', error);
});
});
producer.on('error', function(error){
//console.log('PRODUCER ERROR: ', error)
});
}
// Start one producer per (topic, log file) pair listed in info.topics.
Object.keys(info['topics']).forEach(function (topicName) {
    var logFiles = info['topics'][topicName];
    logFiles.forEach(function (logFile) {
        dldmProducer(logFile, topicName, info['zookeepers']);
    });
});
But the problem is that error handling is non-existent, so I'm worried that if things go wrong during producer.send(payloads, function(err, data){}), the payloads would just be lost forever. I rewrote the function dldmProducer as follows, trying to catch any unsuccessfully sent payloads and resend them. (If the payloads are sent successfully, the response will be in data; otherwise data is undefined, or at least that's my understanding.)
function dldmProducer (logFiletoTail, fileTopic, zookeepers){
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var client = new kafka.Client(zookeepers);
var producer = new Producer(client);
var partitionNo = 0
var buffer = []
var Tail = require('tail').Tail;
var tail = new Tail(logFiletoTail);
tail.on("line", function(data) {
var dataArray=data.split("\n")
console.log(dataArray)
buffer = buffer.concat(dataArray)
});
tail.on("error", function(error) {
console.log('TAIL ERROR: ', error);
});
producer.on('ready', function () {
while (true){
var logItem = buffer.shift()
if (logItem){
console.log(logItem)
var payloads = [{ topic: fileTopic, messages: [logItem], partition: partitionNo },];
partitionNo = (partitionNo +1)%3;
producer.send(payloads, function(error, data){
if (!data){
console.log(data)
buffer.push(logItem)
}
if (error){
console.log('KAFKA RESPONSE ERROR: ', error)
}
});
}
}
});
producer.on('error', function(error){
console.log('PRODUCER ERROR: ', error)
});
}
The function doesn't seem to work. It doesn't even tail the files correctly, and hardly any information from the files is logged on screen even though I'm constantly writing dummy content into them. I'm still not used to the JavaScript way of thinking, so any pointers would be great.