I am working on a Node application that reads a huge amount of data per second. I release memory as soon as it is no longer needed, but after some time the application runs out of heap memory. How can I overcome this issue?
I have tried to release the memory right after using it. Here's the code. I am using a WorkerPool with 8 threads.
/**
 * Violation categories written by writeSBSData, in the order they are persisted.
 *
 * - `matches` decides whether a transformed row belongs to the category.
 * - `label` and `arg` are forwarded to extractViolations as its 2nd and 3rd
 *   arguments (their meaning is defined by extractViolations, not visible here).
 * - `logSql` keeps the console output of the generated statement for panic events.
 *
 * Loose equality is intentional: the type of the flag fields produced by
 * transformSbsData is not visible here, so the comparison semantics are preserved.
 */
const SBS_VIOLATION_RULES = Object.freeze([
  { label: 'panic', arg: 4, logSql: true, matches: (row) => row.panic == 1 },
  { label: 'Night Driving Violation', arg: 0, matches: (row) => row.night_trip == 1 },
  { label: 'IP Box Tempering Violation', arg: 0, matches: (row) => row.ip_box == 1 },
  { label: 'Driving Distance Violation', arg: 0, matches: (row) => row.driving_distance_violation == 1 },
  { label: 'External Power loss Violation', arg: 0, matches: (row) => row.main_power_event == 1 },
  { label: 'Rest Violation', arg: 60, matches: (row) => row.speed > 5 && row.rest_violation == 1 },
]);

/** Column of a report row that is read as the trip driver code. */
const SBS_DRIVER_CODE_COLUMN = 16;

/**
 * Returns the value of the `vehicle_class` entry in `unitData.pflds`, or null
 * when there is no such entry or its value is falsy. The last matching entry wins.
 */
function findVehicleClass(unitData) {
  let vehicleClass = null;
  if (unitData.pflds) {
    for (const field of Object.values(unitData.pflds)) {
      if (field.n === 'vehicle_class') {
        vehicleClass = field.v || null;
      }
    }
  }
  return vehicleClass;
}

/**
 * Reads report tables 0 and 1, inserts the trips, writes one SQL statement per
 * violation category that has rows, and logs how many events exist for the unit.
 *
 * Kept as a separate function so every per-report buffer goes out of scope as
 * soon as it returns, instead of being nulled by hand.
 */
async function writeSbsReportTables(report, { unit, nm, fromdate, todate, vehicleClass }, loginData) {
  // Header text -> column index; also handed to transformSbsData.
  const headerIndexMap = Object.fromEntries(
    report.reportResult.tables[0].header.map((header, index) => [header, index]),
  );
  const speedColumn = headerIndexMap['SPEED*'];

  let tableData = await getTableData(0, loginData);
  let tripRows = await getTableData(1, loginData);

  // One bucket per rule, index-aligned with SBS_VIOLATION_RULES.
  const buckets = SBS_VIOLATION_RULES.map(() => []);
  // First truthy, non-zero driver code found in the rows; stays 0 when there is none.
  let tripDriverCode = 0;

  for (const [index, row] of tableData.entries()) {
    if (tripDriverCode == 0) {
      const cell = row.c[SBS_DRIVER_CODE_COLUMN];
      tripDriverCode = cell ? getValue(cell) : 0;
    }

    // Speed change relative to the previous row; the first row has no predecessor.
    const deltaSpeed = index === 0
      ? 0
      : getValue(row.c[speedColumn], 'FLOAT') - getValue(tableData[index - 1].c[speedColumn], 'FLOAT');

    const sbsData = transformSbsData(unit, row, nm, deltaSpeed, vehicleClass, headerIndexMap);
    if (sbsData.panic != 0) {
      console.log(sbsData.panic);
      console.log(sbsData.time_epoch);
    }

    // Only rows that match a rule are retained; everything else is garbage
    // as soon as this iteration ends.
    SBS_VIOLATION_RULES.forEach((rule, ruleIndex) => {
      if (rule.matches(sbsData)) {
        buckets[ruleIndex].push(sbsData);
      }
    });
  }
  // Drop the raw rows before the awaits below so they are not kept reachable
  // by this suspended function while the database work is in flight.
  tableData = null;

  await Promise.all(tripRows.map(async (trip) => insertTrips(trip, tripDriverCode)));
  tripRows = null;

  for (const [ruleIndex, rule] of SBS_VIOLATION_RULES.entries()) {
    const rows = buckets[ruleIndex];
    if (rows.length === 0) {
      continue;
    }
    const violations = await extractViolations(rows, rule.label, rule.arg);
    if (violations.length > 0) {
      const sql = generateSql(violations);
      if (rule.logSql) {
        console.log(sql);
      }
      await queryDb(sql);
    }
  }

  // NOTE(review): values are interpolated straight into the SQL text. If queryDb
  // supports bound parameters and these fields can be caller-controlled, bind them.
  const countSql = `select count(*) as count from new_opal_unit_events where unit=${unit} and eventStartTime >='${fromdate}' and eventEndTime <='${todate}' `;
  const events = await queryDb(countSql);
  if (events[0].count > 0) {
    logger.log(`${events[0].count} events are written for unit ${unit}`);
  }
}

/**
 * Runs the SBS report for one unit and time interval, stores the resulting
 * trips and violations, and marks the batch row as successful.
 *
 * When the report contains no tables, nothing is read or written except the
 * final success update.
 *
 * @param {object} data - Batch row: `id` (row to mark successful), `fromdate`,
 *   `todate`, `unit` (report object id) and `nm` (forwarded to transformSbsData).
 * @param {*} loginData - Passed through unchanged to getItem, executeReport and getTableData.
 * @returns {Promise<void>}
 */
export const writeSBSData = async (data, loginData) => {
  const { id: rowId, fromdate, todate, unit, nm } = data;
  const params = {
    "reportResourceId": 7320,
    "reportTemplateId": 0,
    "reportObjectId": unit,
    "reportObjectSecId": 0,
    "reportObjectIdList": [],
    "interval": {
      "from": fromdate,
      "to": todate,
      "flags": 117440512,
    },
    reportTemplate: reportTemplateSBS,
    remoteExec: 0,
  };

  const unitData = await getItem(-1, unit, loginData);
  const vehicleClass = findVehicleClass(unitData);

  const report = await executeReport(params, loginData);
  // Guard BEFORE touching tables[0]: an empty report must not throw.
  if (report.reportResult.tables?.length > 0) {
    await writeSbsReportTables(report, { unit, nm, fromdate, todate, vehicleClass }, loginData);
  }

  // NOTE(review): rowId is interpolated into the SQL text; see the note in writeSbsReportTables.
  const sql = `UPDATE recache_querybatcher SET success = 1 WHERE id = ${rowId}`;
  await queryDb(sql);
};