How to filter an array of objects stored as a .json.gz on S3 using stream chunks in Node.js


Let's say the JSON array stored in S3 contains person details such as: [{"name":"A", "lastName":"A", "age":18}, {"name":"B", "lastName":"B", "age":20}, ...]. This file could be extremely large, so I would like to optimize memory usage and use streams to filter the data instead of loading the whole file into memory and filtering it there. I am not sure I entirely understand how "objectMode" works here.

I have tried the following, which fails: the console log prints chunks as equally sized strings of bytes rather than groups of objects, despite using objectMode: true:

import { Readable, Transform } from "stream";
import * as zlib from "zlib";
import { GetObjectCommand, GetObjectCommandOutput } from "@aws-sdk/client-s3";

const filteredData: string[] = [];
const filterTransform = new Transform({
  objectMode: true,
  transform(chunk, _, callback) {
    console.log("chunk : \n" + chunk);
    try {
      const picked = chunk.map((item: any) => ({
        name: item.name,
        lastName: item.lastName,
      }));
      filteredData.push(JSON.stringify(picked));
    } catch (err) {
      callback(err);
      return;
    }
    callback();
  },
});
const client = getS3Client();
const command = new GetObjectCommand({
    Bucket: bucket,
    Key: key,
});
const data: GetObjectCommandOutput = await client.send(command);
const readStream = (data.Body as Readable)
  .pipe(zlib.createGunzip())
  .pipe(filterTransform);

Sample output:

chunk 1: "{name:'A', lastName:'"
chunk 2: "A'}, {name:'B', lastN"
chunk 3: ...and so on

But I expect:

chunk 1: [{"name":"A", "lastName":"A"}, {"name":"B", "lastName":"B"}]
chunk 2: [{"name":"C", "lastName":"C"}, {"name":"D", "lastName":"D"}]

How do I get the chunks to be treated as lists of objects instead of raw bytes?


There is 1 solution below.


To filter an array of objects stored as a .json.gz on S3 using stream chunks in Node.js, you need to make sure that your stream is in object mode and that the chunks emitted by the stream are complete objects.

In your code, it looks like you are trying to transform the data using a Transform stream, but you are not handling the chunks correctly. When working with streams in object mode, each chunk should represent a complete object. If the objects are split across multiple chunks, you need to buffer the chunks until you have a complete object before processing it.
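As a side sketch of that buffering idea (illustrative only: it accumulates the whole decompressed body in memory and parses it once the stream ends, and the name bufferAndParse is made up here):

const chunks: Buffer[] = [];
const bufferAndParse = new Transform({
  // Input is raw bytes from gunzip; output is parsed, filtered objects.
  readableObjectMode: true,
  transform(chunk, _encoding, callback) {
    chunks.push(chunk); // a single chunk is not parseable JSON on its own
    callback();
  },
  flush(callback) {
    try {
      const people = JSON.parse(Buffer.concat(chunks).toString("utf8"));
      for (const person of people) {
        this.push({ name: person.name, lastName: person.lastName });
      }
      callback();
    } catch (err) {
      callback(err as Error);
    }
  },
});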

Here's an example of how you could modify your code to filter the data correctly:

const filterTransform = new Transform({
  objectMode: true,
  transform(chunk, _, callback) {
    try {
      const data = JSON.parse(chunk);
      const filteredData = data.map((item) => ({
        name: item.name,
        lastName: item.lastName,
      }));
      callback(null, JSON.stringify(filteredData));
    } catch (err) {
      callback(err);
    }
  },
});

const client = getS3Client();
const command = new GetObjectCommand({
  Bucket: bucket,
  Key: key,
});

const data = await client.send(command);
const readStream = (data.Body as Readable).pipe(zlib.createGunzip()).pipe(filterTransform);

// consume the filtered data
readStream.on("data", (chunk) => {
  console.log(chunk);
});

readStream.on("error", (err) => {
  console.error(err);
});

readStream.on("end", () => {
  console.log("Done");
});

In this example we use a Transform stream to parse and filter the data. We set objectMode: true to indicate that the stream should work with complete objects. In the transform method, we parse the incoming JSON string and filter the data, then call the callback with the filtered data as a JSON string.

When consuming the stream, we listen for the data event, which emits the filtered data. We could also write the filtered data to a file or another stream if needed, according to the requirements.
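For instance, a minimal sketch of writing the filtered output to a file instead of logging it (the path filtered.json is just a placeholder):

import { createWriteStream } from "fs";
import { pipeline } from "stream/promises";

// pipeline() wires the filtered stream into a file and rejects
// the awaited promise if any stage in the chain errors.
await pipeline(readStream, createWriteStream("filtered.json"));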

Note

In the example above, we assume that each chunk emitted by the stream is a complete JSON object. If the chunks are not complete objects, you'll need to buffer the chunks until you have a complete object before processing it.
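One way to avoid that manual buffering is a streaming JSON parser. The sketch below assumes the third-party stream-json package is installed (npm install stream-json); it emits one complete array element at a time, so each object can be filtered as it arrives:

import { Readable, Transform } from "stream";
import * as zlib from "zlib";
import { parser } from "stream-json";
import { streamArray } from "stream-json/streamers/StreamArray";

// streamArray() emits { key, value } pairs, one per element of the
// top-level JSON array, so each chunk here is a complete person object.
const pickFields = new Transform({
  objectMode: true,
  transform({ value }, _encoding, callback) {
    callback(null, { name: value.name, lastName: value.lastName });
  },
});

// The S3 response body can only be consumed once, so this pipeline
// replaces the one shown earlier.
const objectStream = (data.Body as Readable)
  .pipe(zlib.createGunzip())
  .pipe(parser())
  .pipe(streamArray())
  .pipe(pickFields);

objectStream.on("data", (person) => console.log(person));
objectStream.on("error", (err) => console.error(err));
objectStream.on("end", () => console.log("Done"));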