I have users.json (assume it will be a large file; I want to stream-read this file, but limit the chunk size).
[
{
"name": "John Doe",
"occupation": "gardener",
"born": "1992-03-02"
},
{
"name": "Brian Flemming",
"occupation": "teacher",
"born": "1967-11-22"
},
{
"name": "Lucy Black",
"occupation": "accountant",
"born": "1995-04-07"
},
{
"name": "William Bean",
"occupation": "pilot",
"born": "1977-10-31"
}
]
My sample code.
const fs = require('fs');
const stream = require('stream');

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log('---------start');
    console.log(chunk.toString());
    console.log('---------end');
  }
}

const readStream = fs.createReadStream('users.json', { highWaterMark: 120 });
logChunks(readStream);
The output looks like
---------start
[
{
"name": "John Doe",
"occupation": "gardener",
"born": "1992-03-02"
}
,
{
"name": "Brian Flem
---------end
---------start
ming",
"occupation": "teacher",
"born": "1967-11-22"
}
,
{
"name": "Lucy Black",
"occupation": "ac
---------end
---------start
countant",
"born": "1995-04-07"
}
,
{
"name": "William Bean",
"occupation": "pilot",
"born": "1977
---------end
---------start
-10-31"
}
]
---------end
My goal is to extract the JSON objects from the multiple chunks, so that they can be passed to JSON.parse().
I couldn't find any JSONStreamParse for Node.js, so I hope I can get some expert ideas here. Thanks
Update:
One option I found is to use a third-party solution, stream-json.
const util = require('util');
const StreamArray = require('stream-json/streamers/StreamArray');

await util.promisify(stream.pipeline)(
  readStream,
  StreamArray.withParser(),
  async function (parsedArrayEntriesIterable) {
    for await (const { key: arrIndex, value: arrElem } of parsedArrayEntriesIterable) {
      console.log('Parsed array element:', arrElem);
    }
  }
);
I read the update to your question and realized that the comment I left was totally off the point. Since you are using streams, you don't want to wait for all the data to arrive, in order to avoid memory exhaustion. I should have noticed that at the beginning.
Let me give you some examples by way of apology. I hope they help you understand how to use streams.
To make the samples more realistic, let's simulate fetching the JSON from a remote server, as node-fetch does. node-fetch returns an instance of ReadableStream that is also asyncIterable. We can create one easily by passing an asynchronous generator function to stream.Readable.from(), as below.

Definition of fetch()
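The original definition is not reproduced above, so here is a minimal sketch of what such a fetch() could look like. The sleep() helper is my own illustrative name; asyncGenerator() is the name used in the description below.

const stream = require('stream');

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Yields one chunk of the JSON text per second, simulating a slow network.
async function* asyncGenerator(chunks) {
  for (const chunk of chunks) {
    await sleep(1000);
    yield chunk;
  }
}

// Takes 0.5 sec to resolve; the body of the "response" is a ReadableStream
// built from the asynchronous generator above.
async function fetch(chunks) {
  await sleep(500);
  return { body: stream.Readable.from(asyncGenerator(chunks)) };
}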
fetch() takes 0.5 sec to fetch the response object. It returns a Promise which resolves to an object whose body provides the ReadableStream. This readable stream keeps sending a chunk of the JSON data downstream every second, as defined in asyncGenerator().

Our fetch() function takes an array of JSON chunks as a parameter instead of a URL. Let us use the data you provided, but split it at a slightly different point so that after receiving the second chunk we get two complete objects.
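The exact split points are not given, so the chunked test data below is a sketch; the only requirement it tries to satisfy is that two complete objects are available once the second chunk has arrived.

const chunkedData = [
  '[ {"name": "John Doe", "occupation": "gardener", "born": "1992-03-02"},',
  ' {"name": "Brian Flemming", "occupation": "teacher", "born": "1967-11-22"},',
  ' {"name": "Lucy Black", "occupation": "accountant", "born": "19',
  '95-04-07"}, {"name": "William Bean", "occupation": "pilot", "bo',
  'rn": "1977-10-31"} ]'
];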
Now, with this data, you can confirm how fetch() works as follows.

Example 1: Testing fetch()
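The original code for this example is not included above; a minimal sketch, reusing the fetch() and chunkedData sketches from earlier, could be:

(async () => {
  const response = await fetch(chunkedData);
  // The body is asyncIterable, so we can consume it chunk by chunk.
  for await (const chunk of response.body) {
    console.log('---------start');
    console.log(chunk);
    console.log('---------end');
  }
  console.log('Done.');
})();

// Each chunk should be logged roughly one second apart, showing the data trickling in.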
The Output of Example 1.
Now, let's handle each element of this json data without waiting for the whole data to arrive.
StreamArray is a subclass of stream.Transform, so it has the interface of both a ReadableStream and a WritableStream. If stream instances are connected with pipe(), you don't have to worry about backpressure, so we pipe the two streams, i.e. the ReadableStream obtained from fetch() and the instance of StreamArray, together as response.body.pipe(StreamArray.withParser()) in Example 2 below.

The pipe(StreamArray.withParser()) call returns the instance of StreamArray itself for method chaining, so the pipeline variable now holds a reference to the transform stream, which is also a readable stream. We can attach an event listener to it in order to consume the transformed data. StreamArray emits a data event whenever a single object has been parsed from the readable source, so pipeline.on('data', callback) handles the data chunk by chunk without waiting for the whole JSON.

When the event listener is registered to the data event with pipeline.on('data', callback), the stream starts to flow. Since we simulate data fetching asynchronously, you can see !!!! MAIN THREAD !!!! in the console in the middle of the data transmission. You can confirm that the main thread does not get blocked while waiting for the parsed data.

Example 2: Testing stream-json, processing each array element one by one as it arrives
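The code is not reproduced above; the following sketch simply follows the description: pipe the response body into StreamArray.withParser() and listen for data events. The timing of the MAIN THREAD log is an arbitrary choice to show the event loop staying responsive.

const StreamArray = require('stream-json/streamers/StreamArray');

(async () => {
  const response = await fetch(chunkedData);
  // pipe() returns its destination, so `pipeline` refers to the StreamArray instance.
  const pipeline = response.body.pipe(StreamArray.withParser());
  // StreamArray emits a 'data' event for every array element it has parsed.
  pipeline.on('data', ({ key, value }) => {
    console.log('Parsed element', key, ':', value);
  });
  pipeline.on('end', () => console.log('All data handled.'));
})();

// Fires while chunks are still arriving, showing that the main thread is not blocked.
setTimeout(() => console.log('!!!! MAIN THREAD !!!!'), 2000);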
The Output of Example 2.
Since all streams are instances of EventEmitter, you can simply attach a callback to the data event to consume the final data, as in Example 2. However, it is preferable to use pipe() even for the final data consumption, since pipe() handles backpressure.

The backpressure problem occurs when the data consumption downstream is slower than the upstream's data feed. For example, when your data handling takes time, you might want to handle each chunk asynchronously. If handling the next chunk finishes before the previous one, the next chunk gets pushed downstream before the first. If the downstream depends on the first chunk before handling the next one, this causes trouble.
When you use an event listener, you have to manually control pause and resume to avoid backpressure (see this as an example). However, if you connect the streams with pipe(), the backpressure problem is taken care of internally. That means that when the downstream is slower than the upstream, pipe() automatically pauses the feed to the downstream.

So let's create our own WritableStream in order to connect it to the StreamArray with pipe(). In our case we receive parsed objects from the upstream (i.e. StreamArray) rather than strings or buffers, so we have to set objectMode to true. We override the _write() function, which is called internally from write(). You put all the data handling logic there and call callback() upon finishing. The upstream does not feed the next data until the callback is called when the streams are connected with pipe().

In order to simulate backpressure, below we process the 1st and 3rd objects for 1.5 seconds and the 2nd and 4th objects for zero seconds.
Example 3: Piping Our Own Stream Instance
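The original code is not shown above, so here is a sketch of how the custom writable and the piping could look, assuming the fetch(), chunkedData and StreamArray definitions from the earlier sketches are in scope. MyObjectConsumerStream is the class name referenced later in this answer; the processing times follow the description above but are otherwise illustrative.

class MyObjectConsumerStream extends stream.Writable {
  constructor(options) {
    // objectMode: true because the chunks we receive are parsed objects.
    super({ ...options, objectMode: true });
  }
  _write(chunk, encoding, callback) {
    const { key, value } = chunk;
    // The 1st and 3rd objects (keys 0 and 2) are processed slowly to simulate backpressure.
    const processingTime = key % 2 === 0 ? 1500 : 0;
    setTimeout(() => {
      console.log('Received object', key, ':', value);
      callback(); // signal that we are ready for the next object
    }, processingTime);
  }
}

(async () => {
  const response = await fetch(chunkedData);
  response.body
    .pipe(StreamArray.withParser())
    .pipe(new MyObjectConsumerStream())
    .on('finish', () => console.log('All data handled.'));
})();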
The Output of Example 3.
You can confirm that the received data is in order. You can also see that the transmission of the 2nd chunk starts while the first object is being processed, since we set it to take 1.5 seconds. Now, let's do the same thing using an event listener, as follows.
Example 4: Backpressure Problem with Simple Callback
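A sketch of the event-listener version, again assuming the earlier sketches are in scope (sleep() comes from the fetch() sketch). The processing times are the same illustrative ones as before; because nothing waits for the previous handler to finish, fast objects can overtake slow ones.

(async () => {
  const response = await fetch(chunkedData);
  const pipeline = response.body.pipe(StreamArray.withParser());
  pipeline.on('data', async ({ key, value }) => {
    // Same illustrative timings as before, but no backpressure is applied here.
    await sleep(key % 2 === 0 ? 1500 : 0);
    console.log('Received object', key, ':', value);
  });
  pipeline.on('end', () => console.log('All data handled.'));
})();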
The Output of Example 4.
Now we see that the second element, "Brian", arrives before "John". If the processing time is increased to 3 seconds for the 1st and 3rd objects, the last element, "William", also arrives before the third one, "Lucy".
So it is good practice to use pipe() rather than event listeners to consume data when the order of data arrival matters.

You might be wondering why the example code in the API doc uses its own chain() function to build the pipeline. It is the recommended design pattern for error handling in stream programming in Node. If an error is thrown in the downstream of the pipeline, it does not propagate to the upstream. So you have to attach a callback to every stream in the pipeline as follows (here we assume we have three streams a, b and c).
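A sketch of what that looks like; a, b and c stand for any three connected streams, and the handlers are placeholders.

a.on('error', (err) => console.error('Error in a:', err))
  .pipe(b)
  .on('error', (err) => console.error('Error in b:', err))
  .pipe(c)
  .on('error', (err) => console.error('Error in c:', err));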
It looks cumbersome compared to a Promise chain, which can simply add .catch() at the end. And even though we set all the error handlers as above, it is still not enough.

When an error is thrown downstream, the erroring stream is detached from the pipeline with unpipe(); however, the upstream does not get destroyed automatically. This is because multiple streams may be connected to the upstream to branch out the stream line. So when you use pipe(), you have to close all the upper streams from each error handler yourself.

To solve these problems the community provides pipeline-constructing libraries; I think chain() from stream-chain is one of them. Since Node ver. 10, stream.pipeline has been added for this purpose. We can use this official pipeline constructor because all the streams in stream-json are subclasses of regular stream instances.
Before showing the usage of stream.pipeline, let's modify the MyObjectConsumerStream class to throw an error when the second object is being processed.

Custom Stream that Throws Error
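The modified class is not reproduced above; a sketch could look like the following, where the error is reported through the _write() callback. The class name and error message are illustrative choices.

class ErrorThrowingObjectConsumerStream extends stream.Writable {
  constructor(options) {
    super({ ...options, objectMode: true });
  }
  _write(chunk, encoding, callback) {
    const { key, value } = chunk;
    if (key === 1) {
      // Fail while the second object is being processed.
      callback(new Error('Failed to handle the second object'));
      return;
    }
    console.log('Received object', key, ':', value);
    callback();
  }
}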
stream.pipeline takes multiple streams in order, together with an error handler at the end. The error handler receives an instance of Error when an error is thrown, and null when the pipeline finishes successfully.

Example 5: The Use of stream.pipeline
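A sketch of the stream.pipeline() usage, connecting the simulated response body, the parser and the error-throwing consumer from the sketches above.

(async () => {
  const response = await fetch(chunkedData);
  stream.pipeline(
    response.body,
    StreamArray.withParser(),
    new ErrorThrowingObjectConsumerStream(),
    (err) => {
      if (err) {
        console.error('Pipeline failed:', err.message);
      } else {
        console.log('Pipeline succeeded.');
      }
    }
  );
})();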
The Output of Example 5.
When an error is thrown, stream.pipeline() calls stream.destroy(error) on all streams that have not closed or finished properly, so we don't have to worry about memory leaks.