I am creating a Lambda function that reads a file from an S3 bucket and streams it to fast-csv for parsing. After that, I need to connect to a DocumentDB database and insert the parsed data.
The problem is that sometimes the database connection function runs before the parse function has finished, so it receives an empty array. At other times the parse function doesn't run at all, or vice versa.
How can I make the parse function (`parserFcn`) always finish before the connect-and-insert function (`connectToDb`) runs, so that `connectToDb` receives the parsed data?
Here is the code:
const AWS = require("aws-sdk");
const fs = require("fs");
const csv = require("@fast-csv/parse");
const MongoClient = require("mongodb").MongoClient;
// S3 client created once at module scope and shared by every invocation of the handler below.
const s3 = new AWS.S3();
exports.handler = async (event, context, callback) => {
const bucketName = event.Records[0].s3.bucket.name;
const keyName = event.Records[0].s3.object.key;
console.log("Bucket Name->", JSON.stringify(bucketName));
console.log("Bucket key->", JSON.stringify(keyName));
var params = {
Bucket: bucketName,
Key: keyName,
};
var parsedData = [];
const s3Contents = s3.getObject(params).createReadStream();
let parserFcn = new Promise((resolve, reject) => {
const parser = csv
.parseStream(s3Contents, { headers: true })
.on("data", function (data) {
parsedData.push(data);
})
.on("end", (rowCount) => {
console.log(`Parsed ${rowCount} rows`);
resolve(parsedData);
})
.on("error", function () {
reject("csv parse process failed");
});
return parser;
});
let connectToDb = new Promise((resolve, reject) => {
var client = MongoClient.connect(
"mongodb://user:pass@host/?ssl=true&retryWrites=false",
{
tlsCAFile: `/opt/rds-combined-ca-bundle.pem`, //Specify the DocDB; cert
},
function (err, client) {
if (err) {
throw err;
} else {
console.log("connected ");
}
console.log("parsedData inside conn ", parsedData);
// Specify the database to be used
db = client.db("database-name");
// Specify the collection to be used
col = db.collection("collection-name");
// Insert Multiple document
col.insertMany(parsedData, function (err, result) {
if (err) {
console.log("error->", err);
}
console.log("Result from db->", result);
//Close the connection
client.close();
});
}
);
return client;
});
const parserdata = await parserFcn;
const conn = await connectToDb;
let promiseFactories = [parserdata, conn];
Promise.all(promiseFactories).then((data) => {
console.log("completed all promises", data);
});
};
Here's an attempt at replacing the promise definitions in the post with functions that return a promise, as suggested by @Gamma032. You may find it useful as a guide when comparing it with the code you are writing and with what the handler is supposed to do.
The replacement functions were not declared as `async` functions because they use callbacks to resolve or reject the new promises they create and return. The code waits for the functions to complete, in the order they are called, inside a standard `try/catch`. This lets it detect `await` re-throwing the rejection reason of a rejected promise it was waiting on. I left the global variables mentioned in the comments as they were. I did move the initial definition of `parsedData` inside the `parseData` function, and I renamed the "connection" function to `updateDB` because it both connects to and updates the database.

Note
An edit from the OP added a `Promise.all(...).then(...)` call, which logs a completion message, to the `try` clause after awaiting the values of `parsedData` and `result`. However, neither of these values is a promise. You can't fulfill a promise with a promise, and the `await` operator never returns a promise as its result. Passing them through a call to `Promise.all` therefore just queues a job in the promise job queue to run the `console.log` from the `then` handler. Logging the message after awaiting both values is enough.