I'm developing a solution that has two applications: a single publisher and a single consumer. The consumer needs to process messages like this:
2020-09-30T19:22:41 MSG1) {id:'user1.123', msg: '{ANYJSON}'}
2020-09-30T19:22:42 MSG2) {id:'user2.345', msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG3) {id:'user3.877', msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG4) {id:'user1.123', msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG5) {id:'user1.123', msg: '{ANYJSON}'}
Each message takes 5-10 seconds to be processed (db queries, 4 http requests, db insert, etc.), and I'm creating a pool (https://threads.js.org/usage-pool) for each message because it needs to repeat those commands for many users. So each POOL/MSG can run concurrently.
These ids are dynamically generated by the publisher and change every time. Basically, I need to "group" messages by id and only process a message when no other message is being processed concurrently for that id. For example:
MSG 1 (2020-09-30T19:22:41) should be processed immediately
MSG 2 (2020-09-30T19:22:42) should be processed immediately
MSG 3 (2020-09-30T19:22:43) should be processed immediately
MSG 4 (2020-09-30T19:22:43) should wait for MSG 1 to be processed
MSG 5 (2020-09-30T19:22:43) should wait for MSG 4 to be processed
Suppose that one minute later the consumer receives:
2020-09-30T19:23:41 MSG6) {id:'user1.123', msg: '{ANYJSON}'}
2020-09-30T19:23:42 MSG7) {id:'user2.345', msg: '{ANYJSON}'}
MSG 6 (2020-09-30T19:23:41) should be processed immediately because MSG 5 has already been processed.
MSG 7 (2020-09-30T19:23:42) should be processed immediately because MSG 2 has already been processed.
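The ordering described above amounts to one FIFO chain per id, with different ids free to run concurrently. A minimal sketch of that idea follows, assuming each message's processing can be wrapped in a function that returns a promise which resolves when its pool has finished. `KeyedQueue` and `enqueue` are hypothetical names introduced here for illustration; they are not part of threads.js or of the existing code.

```javascript
// Hypothetical helper: serializes async tasks per id while letting
// tasks for different ids run concurrently.
class KeyedQueue {
  constructor() {
    // id -> promise that settles when the last task queued for that id is done
    this.tails = new Map();
  }

  // Queue `task` (a function returning a promise) behind any pending work
  // for `id`. Returns a promise for the task's own result.
  enqueue(id, task) {
    const previous = this.tails.get(id) || Promise.resolve();

    // Start this task only after the previous one for the same id has settled.
    const run = previous.then(() => task());

    // The stored tail never rejects, so one failed message
    // does not block later messages for the same id.
    const tail = run
      .catch(() => {})
      .finally(() => {
        // Forget the id once nothing else has been queued behind this task,
        // so dynamically generated ids do not accumulate in memory.
        if (this.tails.get(id) === tail) {
          this.tails.delete(id);
        }
      });

    this.tails.set(id, tail);
    return run;
  }
}
```

On the consumer side this might be called as `queue.enqueue(msg.id, () => processMessage(msg))`, where `processMessage` is a placeholder for whatever function creates the pool and resolves once the pool has finished. With the messages above, MSG 1, MSG 2 and MSG 3 would start right away, MSG 4 would start when MSG 1 settles, and MSG 5 when MSG 4 settles.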
At line 366 I know when a pool has finished, so it's time to ask for the next MSG with the same id.
So, how could I implement this queue "grouped" by id?