NodeJS — Stream pipeline silently crashes

102 Views Asked by At

I noticed that Node.js's `pipeline` (from the built-in `stream` module) consistently crashes silently, and I don't understand why: the process simply exits without printing either "SUCCESS" or "FAILURE". Here is a minimal reproduction:

const { pipeline, Readable, Transform } = require('stream');
const util = require('util');

// Promise-returning wrapper around the callback-style `pipeline`, so that it can be awaited.
const pipelineAsync = util.promisify(pipeline);

/**
 * Readable test source that emits a fixed list of 100 text rows
 * ("row n°1/100" … "row n°100/100"), one row per `_read()` call, and then
 * signals end-of-stream.
 *
 * In object mode each row is pushed as a plain string; otherwise the
 * JSON-stringified row is pushed as a UTF-8 Buffer.
 */
class Stream extends Readable {

  /**
   * @param {object} [options] - Standard Readable options, forwarded to the base class.
   */
  constructor(options) {
    super(options);
    const nbRows = 100;
    this._rowsToSend = Array.from({ length: nbRows }, (_, i) => `row n°${i + 1}/${nbRows}`);
    this.on("error", (error) => {
      console.log("STREAM ERROR", error);
    });
  }

  /**
   * Pushes the next pending row, or `null` once every row has been sent.
   *
   * @param {number} size - Advisory amount of data requested (unused).
   */
  _read(size) {
    const row = this._rowsToSend.shift();
    // Compare against undefined explicitly so that an empty-string row would
    // not be mistaken for the end of the data.
    if (row === undefined) {
      console.log(`END OF STREAM`);
      this.push(null);
      return;
    }
    console.log(`SENDING: ${row}`);
    // The JSON text is always serialized as UTF-8. `readableEncoding` only
    // describes how pushed bytes are decoded for consumers, so it must not be
    // used as the source encoding here: with e.g. "hex" the resulting Buffer
    // would be empty and the stream would stall.
    this.push(this.readableObjectMode ? row : Buffer.from(JSON.stringify(row), 'utf8'));
  }

  /**
   * Drops any rows that were not sent yet and forwards the error, if any.
   *
   * @param {Error|null} error - Error that caused the destruction, if any.
   * @param {function(Error|null): void} callback - Must be called to complete the destruction.
   */
  _destroy(error, callback) {
    this._rowsToSend = [];
    console.log("STREAM DESTROYED");
    callback(error);
  }

}

/**
 * Pass-through Transform that logs every chunk it receives and forwards it
 * unchanged; it also logs when it is flushed and when it emits an error.
 */
class LoggerTransform extends Transform {

  /**
   * @param {object} [options] - Standard Transform options, forwarded to the base class.
   */
  constructor(options) {
    super(options);
    const reportError = (err) => {
      console.log("TRANSFORM ERROR: ", err);
    };
    this.on('error', reportError);
  }

  /**
   * Logs the incoming chunk, then emits it as-is.
   */
  _transform(chunk, encoding, done) {
    console.log(`RECEIVED: ${chunk}`);
    this.push(chunk);
    done();
  }

  /**
   * Logs that the writable side has finished; emits no extra data.
   */
  _flush(done) {
    console.log("TRANSFORM FLUSHED");
    done();
  }

}

// Entry point of the reproduction: chains the row source into the logging
// transform (only these two streams are passed to the pipeline) and reports
// whether the awaited pipeline resolved or rejected.
(async function run() {
  try {
    const source = new Stream({ objectMode: true });
    const logger = new LoggerTransform({ objectMode: true });
    await pipelineAsync(source, logger);
    console.log("SUCCESS");
  } catch (failure) {
    console.log("FAILURE", failure);
  }
}());

I consistently reproduced the issue in Node 10+ (tried on 10, 14 and 18) using Windows (10/11) or Linux (Debian 11/Ubuntu 22.04).

1

There is 1 best solution below

0
theofarus On

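A Writable object is required and missing at the end of the pipeline. Without one, nothing consumes the Transform's output: its internal buffers fill up, backpressure pauses the source, and once no work is left pending Node.js exits while the pipeline's promise is still unsettled, which is why neither "SUCCESS" nor "FAILURE" is ever printed.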

A simple solution could be using fs:

const fs = require('fs');
/* ... */
// "/dev/null" only exists on POSIX systems; on Windows the null device is "\\.\nul".
const nullDevice = process.platform === 'win32' ? '\\\\.\\nul' : '/dev/null';
await pipelineAsync(
  new Stream({ objectMode: true }),
  new LoggerTransform({ objectMode: true }),
  // Terminal Writable: it consumes (and discards) every chunk, so the
  // pipeline can run to completion.
  fs.createWriteStream(nullDevice)
);