I am using the Confluent Kafka library to consume data from a Kafka topic with the code below. I consume the messages one by one in a while loop, starting with the latest message. (Messages written before the first connection are not needed.)

The problem is that approximately 40 producers write to the same topic every second (about 40 messages per second), while one iteration of my consuming loop takes anywhere from 20 ms to 200 ms, which I think varies with network conditions. As a result, I cannot consume messages as fast as the producers write them.

The messages are extremely short (well under 1 KB).

Is there a way to speed up this process? Should I change some of the default configuration parameters? Or could I perhaps read multiple messages at a time?

public async Task Read_data_from_Kafka()
{            
    config = new ConsumerConfig()
    {
        BootstrapServers = "servers",
        GroupId = "foo3",
        AutoOffsetReset = AutoOffsetReset.Latest, 
        EnableAutoCommit = false,
    };
    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        c.Subscribe("my_topic"); 
        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };

        try
        {
            while (true)
            {
                try
                {
                    var cr = c.Consume(cts.Token);
                    using (StreamWriter sw = File.AppendText("D:\\test\\kafka_messages.txt"))
                    {
                        sw.WriteLine("Kafka message: " + cr.Message.Value + " "
                                    + DateTime.UtcNow.AddHours(3).ToString("yyyy-MM-dd HH:mm:ss.fff",
                                    CultureInfo.InvariantCulture));
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ensure the consumer leaves the group cleanly (auto-commit is disabled,
            // so no final offsets are committed here).
            c.Close();
        }
    }

}

1 Answer

Answer from JDChris100:

You can read messages in parallel by increasing the number of partitions on your Kafka topic. Within a consumer group, each partition is read by at most one consumer, which means that if you only have one partition, you can only have one (active) consumer.

If you increase the number of partitions, you can then run multiple consumers at once, each reading from a different partition.
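As a rough sketch of what that could look like, the code below runs several consumers in the same consumer group so the broker shares the partitions out among them. This is an assumption-laden illustration, not a drop-in fix: it reuses the topic, group id, and broker string from the question, assumes the topic has been repartitioned to at least `consumerCount` partitions, and replaces the file write with a console write for brevity.

```csharp
// Sketch: several consumers in one consumer group, each getting a share of
// the topic's partitions. Assumes "my_topic" now has >= consumerCount partitions.
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

class ParallelConsumers
{
    static async Task Main()
    {
        const int consumerCount = 4; // at most one active consumer per partition
        var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        var tasks = new Task[consumerCount];
        for (int i = 0; i < consumerCount; i++)
        {
            tasks[i] = Task.Run(() => ConsumeLoop(cts.Token));
        }
        await Task.WhenAll(tasks);
    }

    static void ConsumeLoop(CancellationToken token)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "servers",
            GroupId = "foo3", // same group => the broker divides partitions among members
            AutoOffsetReset = AutoOffsetReset.Latest,
            EnableAutoCommit = false,
        };
        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("my_topic");
        try
        {
            while (!token.IsCancellationRequested)
            {
                var cr = consumer.Consume(token);
                Console.WriteLine($"Partition {cr.Partition.Value}: {cr.Message.Value}");
            }
        }
        catch (OperationCanceledException) { }
        finally
        {
            consumer.Close(); // leave the group cleanly
        }
    }
}
```

Note that writing every message to a single shared file would reintroduce a serialization point, so each consumer should write to its own sink (or the results should be funneled through a queue).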

Depending on your use case, you might also want to look into Kafka message keys, which ensure that messages with the same key go to the same partition, and are therefore processed one at a time. This can be useful for preventing race conditions in things like database operations.
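On the producer side, keying is just a matter of setting `Message.Key`; Kafka hashes the key to choose the partition. A minimal sketch, assuming the same broker string and topic name as in the question (the key value "sensor-17" is purely illustrative):

```csharp
// Sketch: producing a keyed message. All messages with the same key are
// hashed to the same partition, preserving their relative order.
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class KeyedProducer
{
    static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "servers" };
        using var producer = new ProducerBuilder<string, string>(config).Build();

        // Every message keyed "sensor-17" lands on the same partition.
        var result = await producer.ProduceAsync("my_topic",
            new Message<string, string> { Key = "sensor-17", Value = "payload" });

        Console.WriteLine($"Delivered to partition {result.Partition.Value}");
    }
}
```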
