Propagating Node.js stream error events to async await style code


I use readable and transform streams which I later consume using for await.

I cannot find a way to process callee's stream errors so they can be caught in the caller function.

For example, if a transform throws, it results in an uncaught error.

If I add an 'error' listener to the transform, it naturally doesn't propagate the error to main's catch.

function handler(err) {
  // Log the error here
  // If I throw from here, it won't be caught in main
  // How do I propagate it to main?
}

function getStream() {
  // transform1 and transform2 are custom transform streams
  readable.pipe(csvParser).pipe(transform1).pipe(transform2);
  readable.on('error', handler);
  csvParser.on('error', handler);
  transform1.on('error', handler);
  transform2.on('error', handler);
  return transform2;
}

async function main() {
  try {
    const stream = getStream();
    for await(const chunk of stream) {
      // process chunk
    }
  } catch (ex) {
    // how to catch transform errors here?
  }
}

Is there any way to do it?


There are 3 best solutions below

3
Rod On

As I understand it, async/await is largely syntactic sugar on top of promises. Because Readable.pipe follows an event-based pattern (on("error", errorHandler)) rather than a promise-based one, the try { ... await ... } catch (ex) { ... } construction isn't going to handle asynchronous errors "thrown" within Readable.pipe as seamlessly as you might hope.

Assuming my understanding is correct (and to be honest, I don't use the async/await syntax often, so it's possible I'm missing something too), one straightforward workaround is to fall back to adding a callback-based event handler like this:

function handleError(err) { ... }

readable.once("error", handleError);

// and if you want, re-use that handler within your catch block, like:
// `} catch (ex) { handleError(ex); }`
// but I'm not sure how often that will come up if you follow this pattern

but you may already be aware of that.

Failing that, if you really want to use the async/await/try/catch-style construction in this case, you could use something like util.promisify to convert the on("error", handler) event-based API that Readable is using to promises. This StackOverflow post - How to use Async await using util promisify? - covers that topic in more depth, but (IMO) that seems like a lot of hoops to jump through just to avoid adding readable.once("error" /* ...whatever you'd otherwise have in your catch block ... */).

In short, I think that because Readable.pipe isn't designed around promises, the async/await syntax isn't enough (in and of itself) to ensure that asynchronous errors emitted as "error" events are trapped by the try/catch block. You need to handle those errors in the old-school way, either directly (by registering a handler for the error events emitted by your readable, in which case the await and try/catch stuff isn't directly applicable) or "indirectly" (by creating an adapter that makes those emitted events bubble up like a rejected promise, in which case you can make it look like a synchronous-style try/catch using async/await).
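The "indirect" adapter described above could look roughly like the sketch below. It assumes Node's built-in stream module; the helper name pipeWithErrors and the sample streams are hypothetical and only there for illustration. The idea is that destroying the last stream with an upstream error makes a for await loop over that stream reject with the same error.

```javascript
const { Readable, Transform, PassThrough } = require("node:stream");

// Hypothetical helper (not part of Node): pipes the streams in order and
// destroys the last one with any error emitted by an earlier stream, so the
// error surfaces in a `for await` loop over the returned stream.
function pipeWithErrors(...streams) {
  const last = streams[streams.length - 1];
  streams.reduce((src, dest) => {
    src.on("error", (err) => last.destroy(err));
    return src.pipe(dest);
  });
  return last;
}

async function main() {
  // Sample transform that fails on the chunk "bad" via its callback.
  const upper = new Transform({
    transform(chunk, _encoding, callback) {
      const text = chunk.toString();
      if (text === "bad") return callback(new Error("transform failed"));
      callback(null, text.toUpperCase());
    },
  });

  const seen = [];
  try {
    const stream = pipeWithErrors(
      Readable.from(["a", "bad", "c"]),
      upper,
      new PassThrough()
    );
    for await (const chunk of stream) seen.push(chunk.toString());
  } catch (err) {
    // The error raised in `upper` arrives here.
    return { seen, error: err.message };
  }
  return { seen, error: null };
}

main().then((result) => console.log(result));
```

This sketch only forwards errors downstream; it does not destroy the upstream streams, so any cleanup of those is left to the caller.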

8
h-sifat On

I think a transform stream function should not throw an exception. Instead, it should emit an error event or pass the error to the callback function.

Here is an example.

Edit

Wrap everything in the transform method in a try/catch block and pass the caught error to the callback. The error then propagates to the main function.

const { Transform } = require("stream");

//Added on 1st edit
function handler(ex) {
  console.error("Logged By Handler: ", ex);
}

function getStream() {
  // A simple transform stream to test
  const transform = new Transform({
    transform(chunk, encoding, callback) {
      try {
        chunk = chunk.toString();

        // If the chunk is "err", report a failure through the callback
        // to simulate a real-life error
        if (chunk === "err\n") {
          return callback(new Error("Oops! Sth failed in the transform stream."));
        }
        // or this.emit("error", new Error("Oops! Sth failed in the transform stream."));

        this.push(chunk.toUpperCase());

        // Simulating an exception: every other chunk throws here, so the
        // callback() below is never reached and the catch block runs instead
        throw new Error(`Fatal error.`);
        callback();
      } catch (ex) {
        handler(ex);
        callback(ex, null);
      }
    },
  });

  process.stdin.pipe(transform);
  // readable    ->   transform
  return transform;
}

async function main(transform) {
  try {
    for await (const chunk of transform) process.stdout.write(chunk);
  } catch (ex) {
    console.error("Handled by main:", ex);
  }
}

main(getStream());
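
The example above has a single transform. With a longer chain, as in the question, pipe() does not forward errors from one stream to the next. One possible variant, sketched here with stream.pipeline from Node's stream module, is to let pipeline destroy every stream in the chain with the error. The input data and the failingUpperCase helper are made up for the sketch; stdin is replaced by Readable.from so the snippet runs on its own.

```javascript
const { Readable, Transform, PassThrough, pipeline } = require("node:stream");

// Illustrative transform: fails on the chunk "err" by passing an Error
// to the callback instead of throwing.
function failingUpperCase() {
  return new Transform({
    transform(chunk, _encoding, callback) {
      const text = chunk.toString();
      if (text === "err") {
        return callback(new Error("Oops! Something failed in the transform."));
      }
      callback(null, text.toUpperCase());
    },
  });
}

function getStream(source) {
  // pipeline() returns the last stream. On failure it destroys all streams
  // with the error and also reports it to the callback.
  return pipeline(source, failingUpperCase(), new PassThrough(), (err) => {
    if (err) console.error("Logged by pipeline callback:", err.message);
  });
}

async function main(source) {
  try {
    for await (const chunk of getStream(source)) process.stdout.write(chunk);
  } catch (ex) {
    console.error("Handled by main:", ex.message);
    return ex;
  }
  return null;
}

main(Readable.from(["ok\n", "err"]));
```

Here the failure happens in the middle stream, yet the for await loop over the last stream still rejects, because pipeline destroys the last stream with the same error.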
0
Maximilian Antoni On

I ended up solving this with Promise.race:

function readStream(stream) {
  return Promise.race([
    new Promise((_, reject) => stream.once('error', reject)),
    iterate(stream)
  ]);
}

async function iterate(stream) {
  for await (const object of stream) {
    // ...
  }
}

This will reject with any error from either the stream or the iterate function, or resolve with the result of iterate, whichever comes first.
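
For a piped chain like the one in the question, the same idea can be extended to watch every stream rather than only the one being iterated. The following is a sketch under that assumption; the streams array parameter, the onChunk callback, and the demo streams are additions for illustration and not part of the answer above.

```javascript
const { Readable, PassThrough } = require("node:stream");

// Variant of readStream: `streams` lists every stream in the chain, and the
// last one is the stream that gets iterated.
function readStream(streams, onChunk) {
  const last = streams[streams.length - 1];
  const failures = streams.map(
    (stream) => new Promise((_, reject) => stream.once("error", reject))
  );
  return Promise.race([...failures, iterate(last, onChunk)]);
}

async function iterate(stream, onChunk) {
  for await (const chunk of stream) onChunk(chunk);
}

async function demo() {
  // Demo source that fails as soon as it is asked for data.
  const source = new Readable({
    read() {
      this.destroy(new Error("source failed"));
    },
  });
  const tail = new PassThrough();
  source.pipe(tail);

  try {
    await readStream([source, tail], () => {});
    return null;
  } catch (err) {
    // The source error is caught here even though only `tail` is iterated.
    return err.message;
  }
}

demo().then((message) => console.log("caught:", message));
```

Note that the race only settles the returned promise. It does not stop the loop or destroy the remaining streams, so cleanup is still up to the caller.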