Group RabbitMQ messages by id to process them synchronously


I'm developing a solution that has two applications: a single publisher and a single consumer. The consumer needs to process messages like this:

2020-09-30T19:22:41 MSG1) {id:'user1.123', msg: '{ANYJSON}'}
2020-09-30T19:22:42 MSG2) {id:'user2.345', msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG3) {id:'user3.877', msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG4) {id:'user1.123', msg: '{ANYJSON}'}
2020-09-30T19:22:43 MSG5) {id:'user1.123', msg: '{ANYJSON}'}

Each message takes 5-10 seconds to be processed (db queries, 4 http requests, db insert, etc.), and I'm creating a pool (https://threads.js.org/usage-pool) for each message because it needs to repeat these commands for many users. So each POOL/MSG can run concurrently.

These ids are dynamically generated by the publisher and change every time. Basically I need to "group" messages by id and only process a message when no other message with the same id is currently being processed. For example:

  • MSG 1 (2020-09-30T19:22:41) should be processed immediately
  • MSG 2 (2020-09-30T19:22:42) should be processed immediately
  • MSG 3 (2020-09-30T19:22:43) should be processed immediately
  • MSG 4 (2020-09-30T19:22:43) should wait for MSG 1 to be processed
  • MSG 5 (2020-09-30T19:22:43) should wait for MSG 4 to be processed

Suppose that after 1 minute the consumer receives:

2020-09-30T19:23:41 MSG6) {id:'user1.123', msg: '{ANYJSON}'}
2020-09-30T19:23:42 MSG7) {id:'user2.345', msg: '{ANYJSON}'}
  • MSG 6 (2020-09-30T19:23:41) should be processed immediately because MSG 5 has already been processed.
  • MSG 7 (2020-09-30T19:23:42) should be processed immediately because MSG 2 has already been processed.
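
The ordering rules above can be sketched as a per-id promise chain kept in memory by the single consumer. This is only a minimal sketch, not a RabbitMQ feature: `KeyedSerializer`, `run`, and `demo` are hypothetical names, and the timers stand in for the real 5-10 second pool work.

```javascript
// Per-id serializer: tasks sharing an id run one after another,
// while tasks with different ids run concurrently.
// (Hypothetical helper, assumed to live inside the single consumer process.)
class KeyedSerializer {
  constructor() {
    // id -> promise that settles when the last queued task for that id ends
    this.tails = new Map();
  }

  // Queue `task` (a function returning a promise) behind earlier tasks for `id`.
  run(id, task) {
    const previous = this.tails.get(id) || Promise.resolve();
    const result = previous.then(() => task());
    // The stored tail never rejects, so a failed task does not block the next one.
    const tail = result.catch(() => {});
    this.tails.set(id, tail);
    // Drop the entry once this id is idle, so dynamic ids do not accumulate.
    tail.then(() => {
      if (this.tails.get(id) === tail) this.tails.delete(id);
    });
    return result;
  }
}

// Usage: MSG1 and MSG2 start right away, MSG4 waits for MSG1 (same id).
async function demo() {
  const serializer = new KeyedSerializer();
  const log = [];
  const handle = (name, ms) => async () => {
    log.push(`start ${name}`);
    await new Promise((resolve) => setTimeout(resolve, ms)); // simulated work
    log.push(`end ${name}`);
  };
  await Promise.all([
    serializer.run('user1.123', handle('MSG1', 30)),
    serializer.run('user2.345', handle('MSG2', 10)),
    serializer.run('user1.123', handle('MSG4', 10)),
  ]);
  return log;
}
```

In a real consumer, each message would presumably be acknowledged only after the promise returned by `run` settles, with the channel's prefetch count limiting how many unacknowledged messages are held in memory at once.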

[screenshot of the consumer code; image not available]

At line 366 of my code I know when a pool has finished, so that is the time to request the next MSG for the same id.

So, how could I implement this queue "grouped" by id?
