I noticed the usage of NodeJS's pipeline (through the "stream" package) silently crashing consistently without understanding why. Here is a minimal reproduction:
const { pipeline, Readable, Transform } = require('stream');
const util = require('util');
const pipelineAsync = util.promisify(pipeline);
class Stream extends Readable {
constructor(options) {
super(options);
this._rowsToSend = [];
const nbRows = 100;
for (let i = 0; i < nbRows; ++i) {
this._rowsToSend.push(`row n°${i + 1}/${nbRows}`);
}
this.on("error", (error) => {
console.log("STREAM ERROR", error);
});
}
_read(size) {
const row = this._rowsToSend.shift();
if (row) {
console.log(`SENDING: ${row}`);
this.push(this.readableObjectMode ? row : Buffer.from(JSON.stringify(row), this.readableEncoding || undefined));
}
else {
console.log(`END OF STREAM`);
this.push(null);
}
}
_destroy(error, callback) {
this._rowsToSend = [];
console.log("STREAM DESTROYED");
callback(error);
}
}
class LoggerTransform extends Transform {
constructor(options) {
super(options);
this.on('error', (error) => {
console.log("TRANSFORM ERROR: ", error);
});
}
_transform(chunk, encoding, callback) {
console.log(`RECEIVED: ${chunk}`);
callback(null, chunk);
}
_flush(callback) {
console.log("TRANSFORM FLUSHED");
callback();
}
}
(async () => {
try {
await pipelineAsync(
new Stream({ objectMode: true }),
new LoggerTransform({ objectMode: true })
);
console.log("SUCCESS");
}
catch (error) {
console.log("FAILURE", error);
}
})();
I consistently reproduced the issue in Node 10+ (tried on 10, 14 and 18) using Windows (10/11) or Linux (Debian 11/Ubuntu 22.04).
A
Writableobject is required and missing at the end of the pipeline.A simple solution could be using
fs: