Archiver ZIP stream of multiple CSV files fails on AWS Lambda but works locally

We call this CSV-streaming API from our frontend on a button click. Because the dataset can exceed 5 million records, and Excel-like tools have limits on how large a CSV file they can open correctly, we split the data into multiple CSV files, zip them together, and send the archive as a chunked stream to stay within Lambda's memory and timeout limits. I can confirm that all of this works fine locally, but on Lambda the file downloads and then fails to open with an "unexpected end of zip" error.
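
For context, the Express app reaches Lambda through an adapter, roughly along these lines (a minimal sketch, not our exact setup — the adapter package, the route, and the getMetricById helper are placeholders for illustration):

import serverlessExpress from "@codegenie/serverless-express";
import express from "express";

import { getMetricById } from "./metrics"; // hypothetical helper that loads the Metric
import streamCsv from "./streamCsv";

const app = express();

// Hypothetical route that the frontend button calls
app.get("/metrics/:id/csv", async (req, res) => {
  const metric = await getMetricById(req.params.id);
  await streamCsv(res, metric);
});

export const handler = serverlessExpress({ app });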

Please find the streamCsv code below for reference:

import { PassThrough } from "node:stream";

import archiver from "archiver";
import { Response } from "express";
import { getSqlQueryData } from "queries";
import { Metric } from "types";

const MAX_RECORDS_PER_CSV = 1_000_000; // 1 million
const LIMIT = 10_000;
const LIMIT_EXCEED = "LIMIT_EXCEED";

const streamCsv = async (res: Response, metric: Metric) => {
  try {
    console.log("start");
    res.setHeader(
      "Content-Disposition",
      `attachment; filename="${metric.name}_${Date.now()}.zip"`
    );
    res.setHeader("Content-Type", "application/zip");

    console.log("metric", metric);

    const archive = archiver("zip", {
      zlib: { level: 9 }, // Set compression level (optional)
    });

    // Pipe the archive to the response stream
    archive.pipe(res);

    let currentRecords = 0;
    let fileCount = 1;
    let totalCount = 0;
    let currentCsv: null | PassThrough = null;
    let modifiedQuery = metric.query;
    let rowCount = 0;
    const customLimit = `LIMIT ${LIMIT}`;

    // Add a limit to the query if it doesn't already have one.
    if (metric.query.toLowerCase().includes("limit")) {
      modifiedQuery = metric.query.replace(/limit\s+\d+/i, customLimit);
    } else {
      modifiedQuery += ` ${customLimit}`;
    }

    // Fetch rows in batches until there are no more rows left.
    do {
      // eslint-disable-next-line no-await-in-loop
      const { data: result } = await getSqlQueryData({
        integrationName: metric.integrationName,
        integrationId: metric.companyIntegrationId,
        query: `${modifiedQuery} OFFSET ${totalCount}`,
        disableStringify: true,
      });
      console.log(`${modifiedQuery} OFFSET ${totalCount}`);

      console.log(typeof result);

      // If the result carries an error code or message, handle the failure.
      if (result.code || result.message) {
        if (result.code === LIMIT_EXCEED) {
          // If the query exceeds the row limit, reduce the limit by half and execute the query again.
          modifiedQuery = modifiedQuery.replace(
            /limit\s+\d+/i,
            `LIMIT ${Math.floor(LIMIT / 2)}`
          );
          // eslint-disable-next-line no-continue
          continue;
        }
        currentCsv?.end();
        // Abort instead of finalize on error so we don't emit a zip that looks complete
        archive.abort();
        throw new Error(
          "Something went wrong while querying data, please retry later"
        );
      }

      if (!Array.isArray(result)) {
        currentCsv?.end();
        archive.abort(); // see above — avoid emitting a half-built archive
        throw new Error(
          "Something went wrong while querying data, please retry later"
        );
      }

      if (currentRecords === 0 || currentRecords >= MAX_RECORDS_PER_CSV) {
        currentCsv?.end();
        console.log("new csv appended");
        // Create a new CSV file when starting or when the record limit is reached
        const headers = ["s.no.", ...Object.keys(result[0] || {})].join(",");
        currentCsv = new PassThrough();
        archive.append(currentCsv, { name: `${metric.name}_${fileCount}.csv` });
        currentCsv.write(`${headers}\n`); // Add CSV header row (no stray space before the newline)
        currentRecords = 0;
        // eslint-disable-next-line no-plusplus
        fileCount++;
      }

      // Append rows to the current CSV file
      // eslint-disable-next-line @typescript-eslint/no-loop-func
      result.forEach((row: Record<string, unknown>, rowIndex: number) => {
        const csvRow = [totalCount + rowIndex + 1, ...Object.values(row)]
          .map((value) => {
            // Quote fields containing commas, quotes, or newlines and escape
            // embedded quotes (RFC 4180)
            if (typeof value === "string" && /[",\n]/.test(value)) {
              return `"${value.replace(/"/g, '""')}"`;
            }
            return value;
          })
          .join(",");
        currentCsv?.write(`${csvRow}\n`); // Add CSV row and newline
        // eslint-disable-next-line no-plusplus
        currentRecords++;
        // eslint-disable-next-line no-plusplus
        totalCount++;
      });

      rowCount = result.length;
    } while (rowCount > 0);

    currentCsv?.end();

    // Finalize the archive and send it to the response stream
    console.log("finalize", totalCount);

    await archive.finalize();
  } catch (error) {
    console.error("Error generating and serving ZIP archive:", error);
    // Headers may already have been sent once streaming started
    if (!res.headersSent) res.status(500);
    res.end();
  }
};

export default streamCsv;
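
This is roughly how I confirmed the stream locally — piping it into a file instead of a live response (a sketch only; the stubbed Response and the metric fixture are illustrative):

import fs from "node:fs";

import { Response } from "express";
import { Metric } from "types";

import streamCsv from "./streamCsv";

const out = fs.createWriteStream("/tmp/export.zip");

// Stub only the Response methods streamCsv touches; archiver just needs a
// writable stream to pipe into.
const fakeRes = Object.assign(out, {
  setHeader: () => {},
  status: () => out,
}) as unknown as Response;

const metric = {
  name: "test_metric",
  query: "SELECT * FROM some_table",
  integrationName: "postgres",
  companyIntegrationId: "abc-123",
} as Metric; // fixture values are made up

out.on("close", () => console.log("zip written to /tmp/export.zip"));
void streamCsv(fakeRes, metric);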

I appreciate your time looking into this and would value any suggestions. Thank you!
