Hopefully I can explain what I'm trying to do as best as possible, because I'm aware it's a lot of code to read.
I have a Dynamo DB table with a "park" (carpark) record, and that record contains a value (the occupancy of the carpark currently).
Updates to this Dynamo table should result in an update to a platform we have called DX.
I am trying to create 2 E2E test events, I have called these "someFunction" and "someOtherFunction".
someFunction starts off by manually overwriting the park table's "value" entry to 50. This should update the DX API, so I confirm that the value stored there is also 50.
It then publishes an MQ event to DX which in turn cause an update to our Dynamo DB (decrementing the value by 1), and this should in-turn be reflected via a GET to the DX API.
I have set up a few retries for reaching out to the DX API, as I know it can take time.
someOtherFunction starts off by also manually overwriting the park table's "value" entry to 50. This should update the DX API, so I confirm that the value stored there is also 50.
It then publishes 10 MQ events to DX which in turn cause 10 updates to our Dynamo DB (decrementing the value by 1 each time), and this should in-turn be reflected via a GET to the DX API - the check here should confirm that the value stored on DX has gone from 50 to 40.
I have set up a few retries for reaching out to the DX API, as I know it can take time.
import * as mqtt from 'mqtt';
import * as dateFnsTz from 'date-fns-tz';
import axios from 'axios';
const parsedCredentials = JSON.parse(process.env.E2E_CREDENTIALS || '{}');
const DXCountUrl = parsedCredentials.DX_count_url;
const headers = {
accept: 'application/json',
'x-api-key': parsedCredentials.DX_api_key
};
interface Message {
UtcTime: Date;
Source: {
VideoSource: string;
Rule: string;
};
Data: {
Count: string;
};
}
async function resetOccupancyValue(): Promise<void> {
const url = 'https://lambda-url/refreshOccupancyValue';
const queryParams = {
parkId: 'b81ed160-555a-8049-1168-7b460f06f72e',
newOccupancyValue: 50
};
const headers = {
'x-api-key': '123'
};
console.log('Resetting Dynamo DB Park Occupancy table value');
try {
const response = await axios.post(url, null, {
params: queryParams,
headers: headers
});
// Handle the response here (e.g., logging or further processing)
console.log('Response:', response.data);
} catch (error) {
// Handle any errors here
console.error('Error:', error);
}
}
async function getFromDX(expectedValue: number): Promise<boolean> {
try {
const response = await axios.get(DXCountUrl, { headers });
console.log('GET Response from DX API: ', response.data);
if (response && response.data.data[0].value == expectedValue) {
return true;
}
return false;
} catch (error) {
console.error('Error during GET from DX API after MQTT publishes:', error);
return false;
}
}
// 2000ms delay
async function delay() {
const numMilliSecs = 2000;
return new Promise((resolve) => setTimeout(resolve, numMilliSecs));
}
async function getFromDX_nTimes(maxRetries: number, expectedValue: number) {
let check = false;
let retries = 0;
while (!check && retries < maxRetries) {
await delay();
check = await getFromDX(expectedValue);
retries = ++retries;
console.log('retries: ', retries);
}
if (check) {
console.log('TODO - Success');
} else {
console.log('TODO: Failure - value did not change in given time on DX, or was unreachable');
}
console.log('Retries count was: ', retries);
}
function publishMessage(client: mqtt.MqttClient, message: Message, topic: string) {
console.log('publishMessage - publishing message to MQTT: ', message);
client.publish(topic, JSON.stringify(message), (err) => {
if (err) {
console.error('Error publishing:', err);
} else {
console.log('Message published successfully');
}
});
}
async function someFunction(): Promise<void> {
await resetOccupancyValue();
console.log('someFunction - initial polling of DX');
await getFromDX_nTimes(3, 50);
// Create a client instance with credentials
const client = mqtt.connect(`mqtt://${parsedCredentials.verne_mq_broker_ip}:${parsedCredentials.verne_mq_broker_port}`, {
username: parsedCredentials.verne_mq_broker_username,
password: parsedCredentials.verne_mq_broker_password
});
// Event handler for when the client connects
client.on('connect', () => {
console.log('somefunction Connected to MQTT broker');
// Function to publish the message
const msg: Message = {
UtcTime: dateFnsTz.zonedTimeToUtc(new Date(), 'GMT'),
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '1' }
};
publishMessage(client, msg, parsedCredentials.verne_mq_vehicle_exit_topic);
client.end(); // Close the client connection
});
// Event handler for when the client disconnects
client.on('close', async () => {
console.log('Disconnected from MQTT broker');
console.log('someFunction - post-publish polling of DX');
await getFromDX_nTimes(3, 49);
});
// Event handler for errors
client.on('error', (error) => {
console.error('MQTT Error:', error);
});
}
// ----
async function someOtherFunction(): Promise<void> {
await resetOccupancyValue();
console.log('someOtherFunction - initial polling of DX');
await getFromDX_nTimes(3, 50);
// Create a client instance with credentials
const client = mqtt.connect(`mqtt://${parsedCredentials.verne_mq_broker_ip}:${parsedCredentials.verne_mq_broker_port}`, {
username: parsedCredentials.verne_mq_broker_username,
password: parsedCredentials.verne_mq_broker_password
});
// Event handler for when the client connects
client.on('connect', async () => {
console.log('someotherfunction Connected to MQTT broker');
const numPublishes = 10; // Number of times to publish the message
let publishesDone = 0; // Counter to track publishes
while (publishesDone < numPublishes) {
const msg: Message = {
UtcTime: dateFnsTz.zonedTimeToUtc(new Date(), 'GMT'),
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: (publishesDone + 1).toString() }
};
publishMessage(client, msg, parsedCredentials.verne_mq_vehicle_exit_topic);
await delay();
publishesDone = ++publishesDone;
console.log('publishesDone = ', publishesDone);
}
client.end(); // Close the client connection
});
// Event handler for when the client disconnects
client.on('close', async () => {
console.log('Disconnected from MQTT broker');
console.log('someOtherFunction - post-publish polling of DX');
await getFromDX_nTimes(3, 40);
});
// Event handler for errors
client.on('error', (error) => {
console.error('MQTT Error:', error);
});
}
// ----
async function main(): Promise<void> {
try {
console.log('Running someFunction');
console.log('-----------------------------------------------');
await someFunction();
// console.log('Running someOtherFunction');
// console.log('-----------------------------------------------');
// await someOtherFunction();
} catch (error) {
console.error('An error occurred:', error);
}
}
main()
.then(() => {
console.log('Exiting normally');
})
.catch((err) => {
console.error('Exiting with error', err);
process.exitCode = 1;
});
Individually, these functions are working just fine.
For instance, if I run just someFunction():
Running someFunction
-----------------------------------------------
Resetting Dynamo DB Park Occupancy table value
Response:
someFunction - initial polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:04:58.517 UTC', value: 50 } ]
}
retries: 1
TODO: Success
Retries count was: 1
Exiting normally
somefunction Connected to MQTT broker
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:05:00.472Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '1' }
}
Message published successfully
MQTT Error: Error: read ECONNRESET
at TCP.onStreamRead (node:internal/stream_base_commons:217:20) {
errno: -4077,
code: 'ECONNRESET',
syscall: 'read'
}
Disconnected from MQTT broker
someFunction - post-publish polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:04:58.517 UTC', value: 50 } ]
}
retries: 1
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:05:03.257 UTC', value: 49 } ]
}
retries: 2
TODO: Success
Retries count was: 2
Or if I just run someOtherFunction():
Running someOtherFunction
-----------------------------------------------
Resetting Dynamo DB Park Occupancy table value
Response:
someOtherFunction - initial polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:06:38.173 UTC', value: 50 } ]
}
retries: 1
TODO: Success
Retries count was: 1
Exiting normally
someotherfunction Connected to MQTT broker
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:39.931Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '1' }
}
Message published successfully
publishesDone = 1
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:41.949Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '2' }
}
Message published successfully
publishesDone = 2
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:43.963Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '3' }
}
Message published successfully
publishesDone = 3
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:45.975Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '4' }
}
Message published successfully
publishesDone = 4
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:47.980Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '5' }
}
Message published successfully
publishesDone = 5
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:49.990Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '6' }
}
Message published successfully
publishesDone = 6
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:51.999Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '7' }
}
Message published successfully
publishesDone = 7
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:54.012Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '8' }
}
Message published successfully
publishesDone = 8
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:56.022Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '9' }
}
Message published successfully
publishesDone = 9
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:06:58.024Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '10' }
}
Message published successfully
publishesDone = 10
MQTT Error: Error: read ECONNRESET
at TCP.onStreamRead (node:internal/stream_base_commons:217:20) {
errno: -4077,
code: 'ECONNRESET',
syscall: 'read'
}
Disconnected from MQTT broker
someOtherFunction - post-publish polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:06:59.400 UTC', value: 40 } ]
}
retries: 1
TODO: Success
Retries count was: 1
But if I run both, the way I've done it in the first code sample, the output is a bit interweaved.. I end up getting results for someOtherFunction before someFunction finishes:
Running someFunction
-----------------------------------------------
Resetting Dynamo DB Park Occupancy table value
Response:
someFunction - initial polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:12.902 UTC', value: 50 } ]
}
retries: 1
TODO: Success
Retries count was: 1
Running someOtherFunction
-----------------------------------------------
Resetting Dynamo DB Park Occupancy table value
somefunction Connected to MQTT broker
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:14.836Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '1' }
}
Message published successfully
MQTT Error: Error: read ECONNRESET
at TCP.onStreamRead (node:internal/stream_base_commons:217:20) {
errno: -4077,
code: 'ECONNRESET',
syscall: 'read'
}
Disconnected from MQTT broker
someFunction - post-publish polling of DX
Response:
someOtherFunction - initial polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:15.577 UTC', value: 50 } ]
}
retries: 1
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:15.577 UTC', value: 50 } ]
}
retries: 1
TODO: Success
Retries count was: 1
Exiting normally
someotherfunction Connected to MQTT broker
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:17.485Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '1' }
}
Message published successfully
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:16.573 UTC', value: 49 } ]
}
retries: 2
TODO: Success
Retries count was: 2
publishesDone = 1
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:19.489Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '2' }
}
Message published successfully
publishesDone = 2
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:21.497Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '3' }
}
Message published successfully
publishesDone = 3
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:23.516Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '4' }
}
Message published successfully
publishesDone = 4
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:25.530Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '5' }
}
Message published successfully
publishesDone = 5
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:27.543Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '6' }
}
Message published successfully
publishesDone = 6
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:29.552Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '7' }
}
Message published successfully
publishesDone = 7
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:31.555Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '8' }
}
Message published successfully
publishesDone = 8
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:33.568Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '9' }
}
Message published successfully
publishesDone = 9
publishMessage - publishing message to MQTT: {
UtcTime: 2023-09-06T18:09:35.583Z,
Source: { VideoSource: '1', Rule: 'Vehicle Exit Count' },
Data: { Count: '10' }
}
Message published successfully
publishesDone = 10
MQTT Error: Error: read ECONNRESET
at TCP.onStreamRead (node:internal/stream_base_commons:217:20) {
errno: -4077,
code: 'ECONNRESET',
syscall: 'read'
}
Disconnected from MQTT broker
someOtherFunction - post-publish polling of DX
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:37.354 UTC', value: 39 } ]
}
retries: 1
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:37.354 UTC', value: 39 } ]
}
retries: 2
GET Response from DX API: {
data_quality: 'good',
data: [ { eventTime: 'Wed, 06 Sep 2023 17:09:37.354 UTC', value: 39 } ]
}
retries: 3
TODO: Failure - value did not change in given time on DX, or was unreachable
Retries count was: 3
I don't really understand why this would be the case, I thought by saying await someFunction() and then await someOtherFunction(), there would be a clear separation between the two functions, where the first has to finish before the second can even begin.