I need to organize sequential processing of messages for tens of thousands of partitions (here, "partition" means a partition key).
Messages are published to the topic with the partition key "clientId-service".
Processing a single message can take quite a long time, but messages within one partition must be processed strictly in order, one at a time.
At the same time, different partitions have to be processed in parallel, because one partition may hold tens of thousands of messages while others hold only a few. If receive is called sequentially, messages from the small partitions can get "stuck" behind the large one for a long time.
I would like to call receive from several concurrent workers, but not receive a message from a partition until acknowledge or negativeAcknowledge has been called for the previous message from the same partition.
Creating a consumer per partition is not an option: the number of partitions is not known in advance, it changes over time, and it is too large.
Is there a way to do this with Pulsar, or is there an alternative that fits this use case?
I tried doing it this way:
producer.js:

```javascript
const Pulsar = require('pulsar-client');

// Helper used below; it was missing from the original snippet.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

(async () => {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });
  const producer = await client.createProducer({
    topic: 'my-topic',
    messageRoutingMode: 'RoundRobinDistribution',
    accessMode: 'Shared',
  });

  let i = 0;
  while (true) {
    const msg = `my-message-${i}`;
    // Three keys ("0", "1", "2") stand in for the real "clientId-service" keys.
    producer.send({
      data: Buffer.from(msg),
      partitionKey: (i % 3).toString(),
    });
    console.log(`Sent message: ${msg}`);
    await producer.flush();
    await sleep(500);
    i += 1;
  }
})();
```
Producer output:

```text
Sent message: my-message-0
Sent message: my-message-1
Sent message: my-message-2
Sent message: my-message-3
Sent message: my-message-4
Sent message: my-message-5
Sent message: my-message-6
Sent message: my-message-7
Sent message: my-message-8
Sent message: my-message-9 // consumer.js started here
Sent message: my-message-10
Sent message: my-message-11
Sent message: my-message-12
Sent message: my-message-13
Sent message: my-message-14
Sent message: my-message-15
```
consumer.js:

```javascript
const Pulsar = require('pulsar-client');

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

(async () => {
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });
  const consumer = await client.subscribe({
    topic: 'my-topic',
    subscription: 'my-subscription',
    subscriptionType: 'KeyShared',
    subscriptionInitialPosition: 'Earliest',
  });

  // Ten concurrent receive loops sharing a single consumer.
  const workers = [];
  for (let i = 0; i < 10; i++) {
    workers.push((async () => {
      while (consumer.isConnected()) {
        const msg = await consumer.receive();
        const partitionKey = msg.getPartitionKey();
        // Simulate slow processing for key "0" only.
        if (parseInt(partitionKey, 10) === 0) {
          await sleep(3000);
          console.log(
            'partition', partitionKey,
            'message', msg.getData().toString(),
            // toISOString() produces the timestamp format shown in the output below.
            'date', new Date().toISOString(),
          );
        }
        await consumer.acknowledge(msg);
      }
    })());
  }

  await Promise.all(workers);
  // Close the consumer once, after all workers have stopped.
  await consumer.close();
  await client.close();
})();
```
Result:

```text
partition 0 message my-message-0 date 2024-03-14T04:50:37.453Z // same time
partition 0 message my-message-3 date 2024-03-14T04:50:37.456Z // same time
partition 0 message my-message-6 date 2024-03-14T04:50:37.456Z // same time
partition 0 message my-message-9 date 2024-03-14T04:50:37.456Z // same time
partition 0 message my-message-12 date 2024-03-14T04:50:38.456Z // less than 3 seconds after the previous message
partition 0 message my-message-15 date 2024-03-14T04:50:39.457Z // less than 3 seconds after the previous message
```
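The four identical timestamps are consistent with how a Key_Shared subscription works: the broker sends all messages with the same key to the same consumer, but nothing stops that one consumer from handing several of them to concurrent `receive()` calls. The consumer prefetches messages into its receiver queue, and each of the ten loops simply takes the next buffered message, whatever its key. The Pulsar-free simulation below illustrates this under stated assumptions: `simulateConcurrentReceive` is a hypothetical helper, and a plain array stands in for the consumer's receiver queue.

```javascript
// Pulsar-free simulation: an array stands in for the consumer's prefetched
// receiver queue, and each worker mimics one `await consumer.receive()` loop.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function simulateConcurrentReceive(messages, workerCount, processMs) {
  const queue = [...messages];
  const active = new Map();    // key -> messages currently being processed
  const maxActive = new Map(); // key -> highest concurrency seen for that key

  const worker = async () => {
    while (queue.length > 0) {
      // Like receive(): take the next buffered message, regardless of its key.
      const msg = queue.shift();
      const n = (active.get(msg.key) ?? 0) + 1;
      active.set(msg.key, n);
      maxActive.set(msg.key, Math.max(maxActive.get(msg.key) ?? 0, n));
      await sleep(processMs); // the slow handler
      active.set(msg.key, active.get(msg.key) - 1);
    }
  };

  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return maxActive;
}

// Same shape as producer.js: 16 messages with keys "0", "1", "2".
const messages = Array.from({ length: 16 }, (_, i) => ({
  key: String(i % 3),
  data: `my-message-${i}`,
}));

simulateConcurrentReceive(messages, 10, 50).then((maxActive) => {
  console.log('max concurrent handlers for key "0":', maxActive.get('0'));
});
```

With ten workers, messages 0 to 9 are all taken before any handler finishes, so the four key-"0" messages among them (0, 3, 6, 9) are processed at the same time, matching the first four lines of the result.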
Expected (the next message for a key starts only after the previous one has been processed):

```text
partition 0 message my-message-0 date 2024-03-14T04:50:37.453Z
partition 0 message my-message-3 date 2024-03-14T04:50:40.456Z // 3000 ms later
partition 0 message my-message-6 date 2024-03-14T04:50:43.456Z // 3000 ms later
partition 0 message my-message-9 date 2024-03-14T04:50:46.456Z // 3000 ms later
partition 0 message my-message-12 date 2024-03-14T04:50:49.456Z // 3000 ms later
partition 0 message my-message-15 date 2024-03-14T04:50:52.456Z // 3000 ms later
```
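The expected behaviour (a key's next message is handled only after the previous one was acknowledged or negatively acknowledged, while other keys keep flowing) can be sketched on top of a single consumer with one receive loop and a per-key promise chain. This is a minimal sketch, not something pulsar-client provides: `KeyedSerialExecutor` and `consumeWithPerKeyOrder` are hypothetical names, and the sketch assumes `msg.getPartitionKey()` returns the "clientId-service" key.

```javascript
// Hypothetical helper (not part of pulsar-client): tasks that share a key run
// one at a time in submission order; tasks with different keys run concurrently.
class KeyedSerialExecutor {
  constructor() {
    this.tails = new Map(); // key -> promise that settles when the key's last queued task ends
  }

  run(key, task) {
    const prev = this.tails.get(key) ?? Promise.resolve();
    const result = prev.then(() => task());
    // The tail never rejects, so one failed task does not block the key.
    const tail = result.catch(() => {});
    this.tails.set(key, tail);
    // Forget idle keys so the map does not grow with every key ever seen.
    tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

// One receive loop; per-key ordering comes from the executor. Acknowledging
// inside the task means the next message for a key starts only after the
// previous one was acknowledged or negatively acknowledged.
async function consumeWithPerKeyOrder(consumer, handle) {
  const executor = new KeyedSerialExecutor();
  const inFlight = new Set();
  while (consumer.isConnected()) {
    const msg = await consumer.receive();
    const done = executor.run(msg.getPartitionKey(), async () => {
      try {
        await handle(msg);
        await consumer.acknowledge(msg);
      } catch {
        consumer.negativeAcknowledge(msg);
      }
    });
    inFlight.add(done);
    const forget = () => inFlight.delete(done);
    done.then(forget, forget);
  }
  // Let already-received messages finish before returning.
  await Promise.allSettled([...inFlight]);
}

// Usage (assumes `consumer` comes from client.subscribe(...) as in consumer.js):
// await consumeWithPerKeyOrder(consumer, async (msg) => { /* slow work */ });
```

Two caveats for this sketch: the receive loop applies no back-pressure, so a backlog of tens of thousands of messages for one key is buffered in memory (a real version would cap the number of in-flight messages), and a negatively acknowledged message is redelivered later, so the next message for the same key may be processed before the redelivery.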