I'm currently trying to process my batch of records using each() method in the module async but I'm unable to do so. The data I'm reading is from aws Kinesis Data Streams.
Here is my code for the same
"use strict";
const {
KinesisClient,
GetRecordsCommand,
GetShardIteratorCommand,
} = require("@aws-sdk/client-kinesis");
const { format } = require("date-fns");
const retry = require("async-retry");
const EventEmitter = require("events");
const async = require("async");
const kinesis = new KinesisClient({ region: "ap-south-1" });
const BUFFER_SIZE = 200;
let buffer = [];
let iterator;
const eventEmitter = new EventEmitter();
const pushToBuffer = (records) => {
if (buffer.length >= BUFFER_SIZE) {
eventEmitter.emit("bufferEvent", "The buffer is full.");
return;
} else {
buffer.push(...records);
}
};
const getShardIterator = async () => {
const command = new GetShardIteratorCommand({
StreamARN: "<my-arn>",
ShardId: "<my-shard-id>",
ShardIteratorType: "TRIM_HORIZON",
StreamName: "<my-data-stream>",
});
const response = await kinesis.send(command);
iterator = response.ShardIterator;
};
const readFromShard = async () => {
try {
const command = new GetRecordsCommand({
ShardIterator: iterator,
Limit: BUFFER_SIZE,
});
const response = await kinesis.send(command);
const records = response.Records;
if (records.length > 0) {
const parsedRecords = records.map((record) =>
JSON.parse(Buffer.from(record.Data).toString("utf-8"))
);
pushToBuffer(parsedRecords);
}
if (response.NextShardIterator) {
iterator = response.NextShardIterator;
}
} catch (error) {
console.error(error);
}
};
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
const run = async () => {
await getShardIterator();
while (true) {
try {
await retry(
async () => {
await readFromShard();
await sleep(5000);
},
{
retries: 3,
minTimeout: 1500,
maxTimeout: 5000,
}
);
} catch (error) {
console.error(error);
}
}
};
run();
const transformDate = async (record) => {
let time = record.time;
let timestamp = format(new Date(time), "dd-MMM-yyyy HH:mm:ss", {
timeZone: "Asia/Kolkata",
});
return JSON.stringify({
...record,
time: timestamp,
});
};
const transformBuffer = async () => {
try {
await async.each(buffer, transformDate);
} catch (err) {
if (err) console.error(err);
}
};
eventEmitter.on("bufferEvent", (message) => {
console.log(message);
transformBuffer();
console.log(buffer);
buffer = [];
});
2 functions transformBuffer() and transformDate() which has the code related to the date transformation.
The time field in the object remains the same.
Any help with this will be greatly appreciated. Thanks!