first.js
let data = {}
const ingSelf = this
const prom = await new Promise((resolve, reject) => {
data = inputs.input[0].pipe(through2.obj(function (chunk, enc, next) {
const throughSelf = this
ingestionSelf.myFunction(node, { input: [chunk] }, inputData, request, internalCall).then((resp) => {
if (R.type(resp) === "String") {
resp = JSON.parse(resp)
}
throughSelf.push(resp[0])
resolve(resp)
next()
})
}))
})
if (prom) {
return data
}
Second.js
// Forwards every chunk emitted by `data` to the Kafka queue.
// Objects and arrays are serialised to JSON first; anything else is sent as-is.
data.on("data", (chunk) => {
  const needsJson = R.includes(R.type(chunk), ["Object", "Array"])
  const payload = needsJson ? JSON.stringify(chunk) : chunk
  pushToKafkaQueue(topicName, payload, request)
})
Only the first 32 records are received, and then the stream stops; the source actually contains 5000 records. If I write the code as below, all 5000 records are received.
// Forwards every chunk emitted by `data` to the Kafka queue, pausing the
// stream while a chunk is handed over and resuming it on a zero-delay timer.
// Objects and arrays are serialised to JSON first; anything else is sent as-is.
data.on("data", (chunk) => {
  data.pause()
  const needsJson = R.includes(R.type(chunk), ["Object", "Array"])
  const payload = needsJson ? JSON.stringify(chunk) : chunk
  pushToKafkaQueue(topicName, payload, request)
  // Resume from a timer callback rather than synchronously in this handler.
  setTimeout(() => data.resume(), 0)
})
But this is not a proper solution: it pauses the stream for every record/chunk and then resumes it again immediately. Is there a proper way to resolve this issue?