Need help to fix mongodb transaction issue while inserting 150k records
We have car dataset which contain 150k records. While inserting whole data I am dividing data into 2k chunk to reduce the server load. I am using mongodb atlas free trial to store data and server config 4 core CPU and 8 GB RAM.
While inserting chunks I am using mongodb transaction and writable stream to reduce server load. Using socket.io to pass data in chunk to backend
Frontend Code for for uploading data
import { useState, useEffect } from "react";
import { useNavigate } from "react-router-dom";
const batchSize = 1000;
const UploadCarForm = ({ socket }) => {
const [validData] = useState(data);
const [progress, setProgress] = useState(0);
const [isLoading, setLoading] = useState(false);
const [totalRowsProcessed, setTotalRowsProcessed] = useState(0);
const navigate = useNavigate();
useEffect(() => {
// Listen for 'progress' events from the server
const handleProgressUpdate = async (update) => {
// Handle error or completion messages received from the backend
if (update.error) {
// Display error message to the user
showSnackbar({
type: "error",
message: update.error,
});
setLoading(false);
// setProgress(0);
setTotalRowsProcessed(0);
}
if (update.complete) {
const processedRows = totalRowsProcessed + update.totalRows;
setTotalRowsProcessed(processedRows);
const progress = Math.floor((processedRows / validData.length) * 100);
setProgress(progress);
if (processedRows === validData.length) {
showSnackbar({
message: update.message,
});
setLoading(false);
setTimeout(() => {
navigate("/manage-data");
}, 2000);
} else {
// if loading is continue
if (isLoading) {
await new Promise((resolve) => setTimeout(resolve, 100));
const batchRows = validData.slice(
processedRows,
processedRows + batchSize
);
socket.emit("upload", {
rows: batchRows,
rowIndex: processedRows + batchRows.length,
});
}
}
}
};
socket.on("progress", handleProgressUpdate);
// Clean up the event listener when the component unmounts
return () => {
socket.off("progress", handleProgressUpdate);
};
}, [
totalRowsProcessed,
showSnackbar,
socket,
validData,
navigate,
isLoading,
]);
const onSubmit = async () => {
try {
setLoading(true);
setProgress(0);
setTotalRowsProcessed(0);
const batchRows = validData.slice(totalRowsProcessed, batchSize);
socket.emit("upload", {
rows: batchRows,
rowIndex: batchRows.length,
});
} catch (err) {
helpers.setStatus({ success: false });
helpers.setErrors({ submit: err.message });
helpers.setSubmitting(false);
}
}
return (
<button
onClick={onSubmit}
type="submit"
>
Upload File
</button>
);
};
export default UploadCarForm;
Backend code for uploading chunk data
export async function insertDataWithStreamSocket(rows, socket) {
let totalRows = rows.length;
const writableStream = new Writable({
objectMode: true,
write: async (chunk, encoding, callback) => {
const session = await mongoose.startSession({
sessionOptions: {
transactionLifetimeLimitSeconds: 3600 // 1 hour
}
});
session.startTransaction();
try {
const bulkOps = chunk.map(row => {
const { make, model, grade, state, fromDate, toDate, engine, filter, price } = row;
return {
updateOne: {
filter: { make, model, grade, state, fromDate, toDate },
update: { engine, filter, price },
upsert: true
}
};
});
await Car.bulkWrite(bulkOps, { session });
await session.commitTransaction();
callback();
} catch (error) {
console.error('Error during transaction:', error);
await session.abortTransaction();
callback(error);
} finally {
session.endSession();
}
}
});
writableStream.on('error', error => {
socket.emit('progress', {
error: 'Database operation error occurred. Please try again later.'
});
});
writableStream.write(rows);
writableStream.end(() => {
console.log('Data insertion completed.');
socket.emit('progress', {
progress: 100,
message: 'Data set uploaded successfully.',
complete: true,
totalRows: totalRows
});
});
}
Mongodb Error
Error writing data: MongoBulkWriteError: operation was interrupted because the transaction exceeded the configured 'transactionLifetimeLimitSeconds'
I had already set transactionLifetimeLimitSeconds: 3600 // 1 hour
Sample record :
{
"fromDate": "2022-12-31T18:30:00.000Z"
"grade": "Ascent Sport",
"make": "Toyota",
"model": "Yaris",
"state": "VIC",
"toDate": "2022-12-31T18:30:00.000Z"
"engine": "1.5 Petrol FWD Auto",
"filter": "Y",
"price": 28607,
}
I am expecting to insert all the records without failed.