I'm familiar with Node streams, but I'm struggling on best practices for abstracting code that I reuse a lot into a single pipe step.
Here's a stripped down version of what I'm writing today:
inputStream
.pipe(csv.parse({columns:true})
.pipe(csv.transform(function(row) {return transform(row); }))
.pipe(csv.stringify({header: true})
.pipe(outputStream);
The actual work happens in transform()
. The only things that really change are inputStream
, transform()
, and outputStream
. Like I said, this is a stripped down version of what I actually use. I have a lot of error handling and logging on each pipe step, which is ultimately why I'm try to abstract the code.
What I'm looking to write is a single pipe step, like so:
inputStream
.pipe(csvFunction(transform(row)))
.pipe(outputStream);
What I'm struggling to understand is how to turn those pipe steps into a single function that accepts a stream and returns a stream. I've looked at libraries like through2 but I'm but not sure how that get's me to where I'm trying to go.
Here's what I ended up going with. I used the through2 library and the streaming API of the csv library to create the pipe function I was looking for.
It's worth noting the part where I remove the event listeners towards the end, this was due to running into memory errors where I had created too many event listeners. I initially tried solving this problem by listening to events with
once
, but that prevented subsequent chunks from being read and passed on to the next pipe step.Let me know if anyone has feedback or additional ideas.