I am working on a Node application where I am reading huge amount of data per second and running out of heap memory

27 Views Asked by At

I am working on a Node application where I am reading huge amount of data per second and I am releasing the memory as soon as it is not used but after some time the application runs out of the heap memory. How can I overcome this issue?

I have tried to release the memory right after using it. here's the code. I am using WorkerPool and 8 threads.

export const writeSBSData = async (data, loginData) => {
    let { id: rowId, fromdate, todate, unit, nm } = data;

    let params = {
      "reportResourceId": 7320,
      "reportTemplateId": 0,
      "reportObjectId": unit,
      "reportObjectSecId": 0,
      "reportObjectIdList": [],
      "interval": {
        "from": fromdate,
        "to": todate,
        "flags": 117440512
      },
      reportTemplate: reportTemplateSBS,
      remoteExec: 0,
    };

    let vehicle_class = null;
    let unit_data = await getItem(-1, unit, loginData);

    if(unit_data.pflds){
      for (let key in unit_data.pflds) {
        if (unit_data.pflds[key].n === 'vehicle_class') {
          vehicle_class = unit_data.pflds[key].v ? unit_data.pflds[key].v : null;
        }
      }
    }
  
    let report = await executeReport(params, loginData);
  
    let headerIndexMap = {};
    report.reportResult.tables[0].header.forEach((header, index) => {
      headerIndexMap[header] = index; 
    });
      
    if (report.reportResult.tables?.length > 0) {
      let tableData = await getTableData(0, loginData);
      let tableData2 = await getTableData(1, loginData);
      let tripDriverCode = tableData[0].c[16] ? getValue(tableData[0].c[16]) : 0;
      let delta_speed = 0;
      let insertValues = [];
      let seat_belt_violations = [];
      let harsh_break_violations = [];
      let harsh_acceleration_violations = [];
      let panic_violations = [];
      let night_trip_violations = [];
      let ip_box_violations = [];
      let external_power_loss_violations = [];
      let driving_distance_violations = [];
      let rest_violations = [];
      let harsh_cornering_violations = [];
  
      for (let i = 0; i < tableData.length; i++) {
        let d = tableData[i];
        if (tripDriverCode == 0) {
          tripDriverCode = d.c[16] ? getValue(d.c[16]) : 0;
        }
        if (i == 0){
          delta_speed = 0;
        } else {
          delta_speed =  getValue(d.c[headerIndexMap["SPEED*"]], 'FLOAT') - getValue(tableData[i-1].c[headerIndexMap["SPEED*"]], 'FLOAT')
        }
      
        insertValues.push(createInsertValue(unit, d, nm, delta_speed, vehicle_class, headerIndexMap));

        let sbsData = transformSbsData(unit, d, nm, delta_speed, vehicle_class, headerIndexMap);

       
        if(sbsData.panic != 0){
          console.log(sbsData.panic);
          console.log(sbsData.time_epoch);
        }

        if(sbsData.panic == 1){
          panic_violations.push(sbsData);
        }

        if(sbsData.night_trip == 1){
          night_trip_violations.push(sbsData);

        }

        if(sbsData.ip_box == 1){
          ip_box_violations.push(sbsData);
        }

        if(sbsData.driving_distance_violation == 1){
          driving_distance_violations.push(sbsData);
        }

        if(sbsData.main_power_event == 1){
          external_power_loss_violations.push(sbsData);
        }

        if(sbsData.speed > 5 && sbsData.rest_violation == 1){
          rest_violations.push(sbsData);
        }
      }

      tableData = null; 
  
      await Promise.all(tableData2.map(async d => {
        await insertTrips(d, tripDriverCode);
      }));

      tableData2 = null; 

      if(panic_violations.length > 0){
        let violations = await extractViolations(panic_violations, 'panic', 4);
        
        if(violations.length > 0){
          let sql = generateSql(violations);
          console.log(sql)
          await queryDb(sql);
        }
      }

      if(night_trip_violations.length > 0){
        let violations = await extractViolations(night_trip_violations, 'Night Driving Violation', 0);
        
        if(violations.length > 0){
          let sql = generateSql(violations);
          await queryDb(sql);
        }
      }

      if(ip_box_violations.length > 0){
        let violations = await extractViolations(ip_box_violations, 'IP Box Tempering Violation', 0);
        
        if(violations.length > 0){
          let sql = generateSql(violations);
          await queryDb(sql);
        }
      }

      if(driving_distance_violations.length > 0){
        let violations = await extractViolations(driving_distance_violations, 'Driving Distance Violation', 0);
        
        if(violations.length > 0){
          let sql = generateSql(violations);
          await queryDb(sql);
        }
      }

      if(external_power_loss_violations.length > 0){
        let violations = await extractViolations(external_power_loss_violations, 'External Power loss Violation', 0);
        
        if(violations.length > 0){
          let sql = generateSql(violations);
          await queryDb(sql);
        }
      }

      if(rest_violations.length > 0){
        let violations = await extractViolations(rest_violations, 'Rest Violation', 60);
        
        if(violations.length > 0){
          let sql = generateSql(violations);
          await queryDb(sql);
        }
      }
  
     let sql3 = `select count(*) as count from new_opal_unit_events where unit=${unit} and eventStartTime >='${fromdate}'  and eventEndTime <='${todate}' `;
      let events = await queryDb(sql3);
        
      if (events[0].count > 0) {
        logger.log(`${events[0].count} events are written for unit ${unit}`);
      }

      tripDriverCode = null;
      delta_speed = null;
      insertValues = null;
      seat_belt_violations = null;
      harsh_break_violations = null;
      harsh_acceleration_violations = null;
      panic_violations = null;
      night_trip_violations = null;
      ip_box_violations = null;
      external_power_loss_violations = null;
      driving_distance_violations = null;
      rest_violations = null;
      harsh_cornering_violations = null;
    }
  
    let sql = `UPDATE recache_querybatcher SET success = 1 WHERE id = ${rowId}`;
    await queryDb(sql);
  };
0

There are 0 best solutions below