I'm processing thousands of records with Node.js streams, connecting several streams (a Readable and multiple Transforms) into one pipeline.

The problem occurs in the stage that updates the data in the DB. That stage keeps adding the processed records to a variable. When 5000 have been gathered, it pauses the flow with stream.pause(), updates the DB, and resets the variable. It then resumes the flow with stream.resume().

When 7741 records are processed, the log output is:

5000 basket length before update
5046 basket length after update
2695 basket length before update
2695 basket length after update

I expect the basket length to be the same before and after the update, but it differs.
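The counts in the first log still add up to the total (5046 + 2695 = 7741). This suggests that the 46 extra records were pushed into the first basket while await dbUpdate(basket) was pending, rather than disappearing from the count. dbUpdate receives a reference to the same array, not a copy, so those pushes are visible both to dbUpdate and to the "after update" log. The sketch below isolates only that aliasing effect. fakeDbUpdate is a hypothetical stand-in that waits one event-loop turn and then reports the length it sees.

```javascript
// Hypothetical stand-in for dbUpdate(): waits one event-loop turn,
// then reports how many items the array it was given contains.
async function fakeDbUpdate(items) {
  await new Promise((resolve) => setImmediate(resolve))
  return items.length
}

let basket = [1, 2, 3]
// The "update" receives a reference to the same array object, not a copy
const pending = fakeDbUpdate(basket)
// Anything pushed while the update is awaiting lands in that same array
basket.push(4)

const seenByUpdate = pending.then((n) => {
  console.log(n, 'items seen by the update') // 4
  return n
})
```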

If I don't call dbUpdate(), the numbers are exactly as expected:

5000 basket length before update
5000 basket length after update
2741 basket length before update
2741 basket length after update

Why does this difference occur even though stream.pause() has been called?
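One thing worth checking, assuming standard Node.js stream semantics: a Transform does not receive its next chunk until the callback of the current transform() call has been invoked. Awaiting dbUpdate() before calling done therefore already holds back further input to this stage, while this.pause() is a Readable method that only affects the stream's readable (output) side. The sketch below uses a hypothetical 1 ms timer in place of the DB call and counts how many transform() calls overlap on a single Transform instance; it should report 1. If basket still grows during the await in the real code, the extra pushes probably come from elsewhere, for example another dbUpdatePipe instance or a second pipeline sharing the module-level basket.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')

// Track how many transform() calls are in progress at the same time
let inFlight = 0
let maxInFlight = 0

const slowTransform = new Transform({
  objectMode: true,
  async transform(chunk, encoding, done) {
    inFlight++
    maxInFlight = Math.max(maxInFlight, inFlight)
    // Hypothetical async work standing in for the DB call
    await new Promise((resolve) => setTimeout(resolve, 1))
    inFlight--
    // Only now may the stream hand this Transform its next chunk
    done(null, chunk)
  }
})

const finished = new Promise((resolve, reject) => {
  pipeline(
    Readable.from(Array.from({ length: 50 }, (_, i) => i)),
    slowTransform,
    new Writable({ objectMode: true, write(chunk, encoding, cb) { cb() } }),
    (err) => (err ? reject(err) : resolve(maxInFlight))
  )
})

finished.then((max) => console.log('max concurrent transform calls:', max)) // 1
```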

Example code:

const { promisify } = require('util')
const { Transform, pipeline } = require('stream')

...

let basket = []

const processPipe_1 = () => {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, done) {
      const processed_chunk = {}
      // Processing ...
      done(null, processed_chunk)
    }
  })
}
const processPipe_2 = () => { ... }
const processPipe_3 = () => { ... }

const dbUpdatePipe = () => {
  return new Transform({
    objectMode: true,
    async transform(chunk, encoding, done) {
      basket.push(chunk)

      if (basket.length === 5000) {
        this.pause()
        console.log(basket.length, 'basket length before update')
        await dbUpdate(basket)
        console.log(basket.length, 'basket length after update')
        basket = []
        this.resume()
      }

      done(null, chunk)
    }
  })
}

const pipelineAsync = promisify(pipeline)

await pipelineAsync(
  ReadableStream, // object mode, 7741 records
  processPipe_1(),
  processPipe_2(),
  processPipe_3(),
  dbUpdatePipe(), // db update
  writeableStream
)
await dbUpdate(basket) // update the remaining data in the basket
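Here is one way to make the batch boundaries independent of timing, sketched under the assumption that dbUpdate accepts an array and returns a promise. The basket lives inside the factory rather than at module level, and a fresh array is swapped in before the update is awaited. done is called only after the update settles, and the remainder is written in the Transform's flush hook instead of after the pipeline. fakeDbUpdate and the (batchSize, update) parameters are hypothetical, so treat this as a sketch rather than a drop-in replacement.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')
const { promisify } = require('util')

const pipelineAsync = promisify(pipeline)

// Hypothetical stand-in for dbUpdate(): records the size of each batch
const batchSizes = []
async function fakeDbUpdate(items) {
  await new Promise((resolve) => setTimeout(resolve, 1))
  batchSizes.push(items.length)
}

// Hypothetical batching stage: each instance owns its own basket,
// so no other stream or pipeline can push into it
const dbUpdatePipe = (batchSize, update) => {
  let basket = []
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, done) {
      basket.push(chunk)
      if (basket.length < batchSize) return done(null, chunk)
      // Hand over the full batch and start a new array before awaiting
      const batch = basket
      basket = []
      // done() runs only after the update settles, which holds back the next chunk
      update(batch).then(() => done(null, chunk), done)
    },
    flush(done) {
      // Write whatever is left once the input has ended
      if (basket.length === 0) return done()
      update(basket).then(() => done(), done)
    }
  })
}

const run = pipelineAsync(
  Readable.from(Array.from({ length: 7741 }, (_, i) => ({ id: i }))),
  dbUpdatePipe(5000, fakeDbUpdate),
  new Writable({ objectMode: true, write(chunk, encoding, cb) { cb() } })
).then(() => batchSizes)

run.then((sizes) => console.log(sizes)) // [ 5000, 2741 ]
```

With 7741 input records this reports batches of 5000 and 2741. No pause()/resume() is involved, because withholding the callback is what applies backpressure here.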