Node.js through2 stream stops writing after 32 chunks due to back pressure


first.js

let data = {}
const ingestionSelf = this
const prom = await new Promise((resolve, reject) => {
  // Pipe the input stream through an object-mode through2 transform.
  data = inputs.input[0].pipe(through2.obj(function (chunk, enc, next) {
    const throughSelf = this
    ingestionSelf.myFunction(node, { input: [chunk] }, inputData, request, internalCall).then((resp) => {
      if (R.type(resp) === "String") {
        resp = JSON.parse(resp)
      }
      throughSelf.push(resp[0])
      resolve(resp) // only the first call actually resolves the promise
      next()
    })
  }))
})

if (prom) {
  return data
}

second.js

data.on("data", (chunk) => {
      if (R.includes(R.type(chunk), ["Object", "Array"])){
        pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        pushToKafkaQueue(topicName, chunk, request)
      }
    })

I only get data for the first 32 records, and after that the stream stops. The actual number of records is 5000. If I write the code as below, all 5000 records come through.

 data.on("data", (chunk) => {
      data.pause();
      if (R.includes(R.type(chunk), ["Object", "Array"])){
        pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
      } else {
        pushToKafkaQueue(topicName, chunk, request)
      }
      setTimeout(() => {
        data.resume();
      }, 0);
    })

But this does not feel like a proper solution: for every record/chunk I pause the stream and then immediately resume it. Is there a better way to resolve this issue?
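
For reference, one direction I was considering (I am not sure whether it is the right approach) is to drop the "data" event handler and consume the stream with for await...of, awaiting each Kafka push so the loop only pulls the next chunk after the previous one has been handled and back pressure is respected automatically. This is only a rough sketch and assumes pushToKafkaQueue returns a Promise; if it does not, it would have to be wrapped in one.

const R = require("ramda")

// Sketch: async iteration over the object-mode stream instead of the "data" event.
// Awaiting each push means the next chunk is only requested once the previous
// one has been handled, so pause()/resume() is no longer needed.
// Assumes pushToKafkaQueue returns a Promise.
async function consumeStream (data, topicName, request) {
  for await (const chunk of data) {
    if (R.includes(R.type(chunk), ["Object", "Array"])) {
      await pushToKafkaQueue(topicName, JSON.stringify(chunk), request)
    } else {
      await pushToKafkaQueue(topicName, chunk, request)
    }
  }
}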
