I need to process messages sequentially within each partition, across tens of thousands of partitions (partition keys).

Messages are published to the topic with the partition key "clientId-service".

Processing a single message can take quite a long time, but messages within one partition must still be processed strictly one after another.

I need this because one partition may hold tens of thousands of messages while other partitions hold only a few. If the receive method is called sequentially, messages from the small partitions can get "stuck" behind the large one for a long time.
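To put numbers on the "stuck" effect, here is a minimal sketch that computes finish times under two idealized models: one sequential receive loop, and one independent lane per key (unlimited workers, but messages of the same key still in order). The keys `client1-billing` and `client2-billing` and the processing times are hypothetical.

```javascript
// Hypothetical model for illustration: each message has a key and a processing
// time in ms, and messages are listed in the order they arrive.
function finishTimes(messages, { perKey }) {
  // One clock per lane. With perKey=false every message shares one lane
  // (a single sequential receive loop); with perKey=true each key gets its own
  // lane (ideal case: unlimited workers, same-key messages still in order).
  const clocks = new Map();
  return messages.map((m) => {
    const lane = perKey ? m.key : '*';
    const done = (clocks.get(lane) || 0) + m.ms;
    clocks.set(lane, done);
    return { ...m, done };
  });
}

// Hypothetical backlog: five slow messages for one key, then one quick message for another.
const backlog = [
  ...Array.from({ length: 5 }, (_, i) => ({ id: i, key: 'client1-billing', ms: 1000 })),
  { id: 5, key: 'client2-billing', ms: 10 },
];

for (const perKey of [false, true]) {
  const quick = finishTimes(backlog, { perKey }).find((m) => m.key === 'client2-billing');
  console.log(perKey ? 'per key:' : 'sequential:', quick.done, 'ms');
}
// sequential: 5010 ms
// per key: 10 ms
```

The quick message waits five seconds only because of its position in a single shared queue, not because of its own cost.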

I would like to call the receive method from several concurrent workers, but never receive a message from a partition until acknowledge or negativeAcknowledge has been called for the previous message from the same partition.
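One common way to get "concurrent across keys, sequential within a key" in Node.js is to chain promises per key. Below is a minimal sketch of that technique; `KeySerialExecutor` and `submit` are hypothetical names, not part of pulsar-client, and the demo uses timers instead of a broker.

```javascript
// Hypothetical helper, not part of pulsar-client.
class KeySerialExecutor {
  constructor() {
    // key -> promise that settles when the last task queued for that key settles
    this.tails = new Map();
  }

  // Queue `task` (a function returning a promise) behind earlier tasks with the
  // same key. Tasks with different keys are not ordered against each other.
  submit(key, task) {
    const previous = this.tails.get(key) || Promise.resolve();
    const result = previous.then(() => task());
    // The stored tail never rejects, so one failing task does not block the key.
    const tail = result.catch(() => {});
    this.tails.set(key, tail);
    tail.then(() => {
      // Forget the key once nothing newer was queued behind this task,
      // so the map does not grow with every key ever seen.
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Demo with keys "0", "1", "2" as in producer.js; key "0" is the slow one.
async function demo() {
  const executor = new KeySerialExecutor();
  const start = Date.now();
  const log = [];
  const jobs = [];
  for (let id = 0; id < 6; id++) {
    const key = String(id % 3);
    jobs.push(executor.submit(key, async () => {
      await sleep(key === '0' ? 300 : 10);
      log.push({ key, id, at: Date.now() - start });
    }));
  }
  await Promise.all(jobs);
  return log;
}

demo().then((log) => console.log(log));
```

With a real consumer, a single receive loop could pass `msg.getPartitionKey()` as the key and a task that processes the message and then calls `consumer.acknowledge(msg)` (or `consumer.negativeAcknowledge(msg)` on failure). This sketch queues without any bound, so a hot key's backlog accumulates in memory; some cap on un-acked messages would be needed in practice.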

Creating one consumer per partition is not an option: the number of partitions is not known in advance, it changes over time, and it is too large anyway.

Is there a way to do this with Pulsar, or is there an alternative that fits this use case better?

I tried doing it this way:

producer.js:

(async () => {
    const Pulsar = require('pulsar-client');

    // Promise-based delay helper used in the send loop
    const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
    });

    const producer = await client.createProducer({
        topic: 'my-topic',
        messageRoutingMode: "RoundRobinDistribution",
        accessMode: "Shared",
    });

    let i = 0;
    while (true) {
        const msg = `my-message-${i}`;
        // Three partition keys: "0", "1", "2"; key "0" is the slow one in consumer.js
        producer.send({
            data: Buffer.from(msg),
            partitionKey: (i % 3).toString(),
        });
        console.log(`Sent message: ${msg}`);
        await producer.flush();
        await sleep(500);
        i += 1;
    }
})();

Output:

Sent message: my-message-0
Sent message: my-message-1
Sent message: my-message-2
Sent message: my-message-3
Sent message: my-message-4
Sent message: my-message-5
Sent message: my-message-6
Sent message: my-message-7
Sent message: my-message-8
Sent message: my-message-9 // consumer.js started here
Sent message: my-message-10
Sent message: my-message-11
Sent message: my-message-12
Sent message: my-message-13
Sent message: my-message-14
Sent message: my-message-15

consumer.js:

(async () => {
    const Pulsar = require('pulsar-client');

    // Promise-based delay helper used to simulate slow processing
    const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    const client = new Pulsar.Client({
        serviceUrl: 'pulsar://localhost:6650',
    });

    const consumer = await client.subscribe({
        topic: 'my-topic',
        subscription: 'my-subscription',
        subscriptionType: 'KeyShared',
        subscriptionInitialPosition: "Earliest",
    });

    // Ten concurrent receive loops sharing the same consumer
    const promises = [];
    for (let i = 0; i < 10; i++) {
        promises.push((async (consumerId) => {
            while (consumer.isConnected()) {
                const msg = await consumer.receive();
                const partition = msg.getPartitionKey();
                const partitionInt = parseInt(partition, 10);
                if (partitionInt === 0) {
                    // Simulate slow processing for partition key "0"
                    await sleep(3000);
                    console.log(
                        "partition", msg.getPartitionKey(),
                        "message", msg.getData().toString(),
                        "date", new Date().toISOString()
                    );
                }
                await consumer.acknowledge(msg);
            }
        })(i));
    }
    await Promise.all(promises);
    // Close once, after every loop has finished
    await consumer.close();
    await client.close();
})();

Result:

partition 0 message my-message-0 date 2024-03-14T04:50:37.453Z // same time
partition 0 message my-message-3 date 2024-03-14T04:50:37.456Z // same time
partition 0 message my-message-6 date 2024-03-14T04:50:37.456Z // same time
partition 0 message my-message-9 date 2024-03-14T04:50:37.456Z // same time
partition 0 message my-message-12 date 2024-03-14T04:50:38.456Z // less than 3 s after the previous message
partition 0 message my-message-15 date 2024-03-14T04:50:39.457Z // less than 3 s after the previous message

Expected:

partition 0 message my-message-0 date 2024-03-14T04:50:37.453Z
partition 0 message my-message-3 date 2024-03-14T04:50:40.456Z // after 3000 ms
partition 0 message my-message-6 date 2024-03-14T04:50:43.456Z // after 3000 ms
partition 0 message my-message-9 date 2024-03-14T04:50:46.456Z // after 3000 ms
partition 0 message my-message-12 date 2024-03-14T04:50:49.456Z // after 3000 ms
partition 0 message my-message-15 date 2024-03-14T04:50:52.456Z // after 3000 ms
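The gap between Result and Expected is consistent with how the consumer code is structured: a KeyShared subscription routes all messages for key "0" to the same consumer, but the ten loops share that one consumer, and `receive()` does not wait for earlier messages of the same key to be acknowledged. So each loop can pick up its own key-"0" message from the backlog. The sketch below is a simulation only (an in-memory array stands in for the consumer's receive queue, and `runPool`, `take`, and `inFlight` are hypothetical names): a "naive" pool takes whatever message is next, while a "key-aware" pool skips messages whose key still has an un-acked message in flight.

```javascript
// Simulation only: an in-memory array stands in for the consumer's receive queue,
// and removing a key from `inFlight` stands in for acknowledge/negativeAcknowledge.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function runPool({ messages, workers, keyAware }) {
  const pending = messages.slice(); // messages no worker has taken yet
  const inFlight = new Set();       // keys with a message currently being processed
  const start = Date.now();
  const finished = [];              // completion log: { key, id, at }

  // Return the next message this worker may take, or null if none is eligible.
  function take() {
    const idx = keyAware
      ? pending.findIndex((m) => !inFlight.has(m.key)) // skip keys that are busy
      : (pending.length > 0 ? 0 : -1);               // naive: just take the head
    if (idx === -1) return null;
    const [msg] = pending.splice(idx, 1);
    inFlight.add(msg.key);
    return msg;
  }

  async function worker() {
    while (pending.length > 0) {
      const msg = take();
      if (msg === null) {
        // Every remaining message belongs to a busy key: poll again shortly.
        await sleep(5);
        continue;
      }
      await sleep(msg.key === '0' ? 300 : 10); // key "0" is slow, as in consumer.js
      finished.push({ key: msg.key, id: msg.id, at: Date.now() - start });
      inFlight.delete(msg.key); // "ack": the next message for this key may be taken
    }
  }

  await Promise.all(Array.from({ length: workers }, () => worker()));
  return finished;
}

// Same shape as the question: ids 0..15, key = id % 3, ten workers.
const backlog = Array.from({ length: 16 }, (_, id) => ({ id, key: String(id % 3) }));

(async () => {
  for (const keyAware of [false, true]) {
    const log = await runPool({ messages: backlog, workers: 10, keyAware });
    const key0 = log.filter((e) => e.key === '0').map((e) => e.at);
    console.log(keyAware ? 'key-aware:' : 'naive:', key0.join(' '));
  }
})();
```

The naive run finishes all key-"0" messages at nearly the same moment, like the Result above, while the key-aware run spaces them about 300 ms apart, like Expected. With a real Pulsar consumer, `pending` would have to be filled by a single receive loop, so messages for a busy key would sit in memory un-acked until their turn; the polling is also only for brevity.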