I use readable and transform streams which I later consume using for await.
I cannot find a way to process callee's stream errors so they can be caught in the caller function.
For example, if a transform throws, it results in an uncaught error.
If I add an 'error' listener to the transform, naturally it doesn't propagate the error to main's catch.
function handler(err) {
  // Log the error here
  // If I throw from here, it won't be caught in main
  // How do I propagate it to main?
}

function getStream() {
  // transform1 and transform2 are custom transform streams
  readable.pipe(csvParser).pipe(transform1).pipe(transform2);
  readable.on('error', handler);
  csvParser.on('error', handler);
  transform1.on('error', handler);
  transform2.on('error', handler);
  return transform2;
}

async function main() {
  try {
    const stream = getStream();
    for await (const chunk of stream) {
      // process chunk
    }
  } catch (ex) {
    // how to catch transform errors here?
  }
}
Is there any way to do it?
As I understand it, async/await is largely syntactic sugar on top of promises. Specifically, I think that since ReadableStream.pipe follows an event-based (on("error", errorHandler)) rather than promise-based pattern, the try { ... await ... } catch (ex) { ... } construction isn't going to handle asynchronous errors "thrown" within ReadableStream.pipe as seamlessly as you might hope.
Assuming my understanding is correct (and to be honest I don't use the async/await syntax often, so it's possible I'm missing something too), one straightforward workaround is to fall back to adding a callback-based event handler such as readable.once("error", handler), but you may already be aware of that.
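A minimal sketch of that callback-based fallback, in case it helps. The names makeFailingTransform and runWithCallback are made up for illustration, and the failing transform is only a stand-in for the custom transforms in the question:

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for one of the custom transforms: fails on the chunk "bad".
function makeFailingTransform() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      if (chunk === 'bad') {
        callback(new Error('cannot transform "bad"'));
      } else {
        callback(null, chunk.toUpperCase());
      }
    },
  });
}

// Pipes the values through the transform and reports the outcome through a
// Node-style callback: onDone(err) on failure, onDone(null, chunks) on success.
function runWithCallback(values, onDone) {
  const chunks = [];
  const transform = makeFailingTransform();
  transform.once('error', (err) => onDone(err));
  transform.on('data', (chunk) => chunks.push(chunk));
  transform.once('end', () => onDone(null, chunks));
  Readable.from(values).pipe(transform);
}
```

Here the error is handled in the listener itself, so whatever would have gone in the catch block goes in the callback instead.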
Failing that, if you really want to use the async/await/try/catch-style construction in this case, you could use something like util.promisify to convert the on("error", handler) event-based API that ReadableStream is using to promises. This StackOverflow post - How to use Async await using util promisify? - covers that topic in more depth, but (IMO) that seems like a lot of hoops to jump through just to avoid adding readable.once("error" /* ...whatever you'd otherwise have in your catch block ... */).
In short, I think because ReadableStream.pipe isn't designed around promises, the async/await syntax isn't enough (in and of itself) to ensure that the asynchronous errors emitted as 'error' events are trapped by the try/catch block. You need to handle those errors in the old-school way, either directly (by registering a handler for the error events emitted by your readable, in which case the await and try/catch stuff isn't directly applicable) or "indirectly" (by creating an adapter that makes those emitted events bubble up like a rejected promise, in which case you can make it look like a synchronous-style try/catch using async/await).
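One possible shape for the "indirect" adapter, as a sketch rather than a definitive implementation: forward every upstream 'error' event to the last stream by calling destroy(err) on it, so the for await loop that consumes the last stream rejects and the surrounding catch runs. The helper names pipeWithErrors and upperCaseExcept, and the array-taking signature of main, are assumptions made up for this example; the transforms are stand-ins for csvParser, transform1 and transform2.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: pipes the streams in order and forwards an error from
// any upstream stream to the last one, which is what the caller iterates.
function pipeWithErrors(...streams) {
  const last = streams[streams.length - 1];
  streams.forEach((stream, i) => {
    if (stream !== last) {
      stream.once('error', (err) => last.destroy(err));
      stream.pipe(streams[i + 1]);
    }
  });
  return last;
}

// Hypothetical stand-in for a custom transform that fails on one value.
function upperCaseExcept(badValue) {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      if (chunk === badValue) {
        callback(new Error(`cannot transform "${chunk}"`));
      } else {
        callback(null, chunk.toUpperCase());
      }
    },
  });
}

async function main(values) {
  const results = [];
  try {
    const stream = pipeWithErrors(
      Readable.from(values),
      upperCaseExcept('bad'),
      new Transform({
        objectMode: true,
        transform(chunk, _encoding, callback) {
          callback(null, `${chunk}!`);
        },
      }),
    );
    for await (const chunk of stream) {
      results.push(chunk);
    }
    return { results, error: null };
  } catch (err) {
    // Errors from any stage of the chain end up here.
    return { results, error: err };
  }
}
```

Calling main(['a', 'bad', 'c']) should resolve with error set to the transform's error instead of crashing with an uncaught exception. How many chunks arrive before the failure depends on timing, so the sketch makes no promise about results in that case.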