Currently I can read the last message in a Kafka topic with the following code. Now I need something similar: I want to read only the messages produced during the last x seconds of a topic. How can I do that?
using
Confluent.Kafka;
using System;
/// <summary>
/// Console sample that prints the value of the last message stored in
/// partition 0 of the "purchases" topic.
/// </summary>
class Consumer
{
    /// <summary>
    /// Connects to the broker at localhost:9092, seeks to the offset just below
    /// the partition's high watermark, and prints that single message.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        ConsumerConfig config = new()
        {
            BootstrapServers = "localhost:9092",
            // A fresh group id per run, so each run uses a brand-new consumer group.
            GroupId = Guid.NewGuid().ToString(),
        };

        using IConsumer<string, string> consumer
            = new ConsumerBuilder<string, string>(config).Build();
        try
        {
            TopicPartition topicPartition = new("purchases", new Partition(0));
            WatermarkOffsets watermarkOffsets
                = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(3));

            // An empty partition has High == Low. Without this guard, High - 1 would
            // not address a stored record; for High == 0 it evaluates to -1, which is
            // the logical "end" offset and would make the consumer wait for new data.
            if (watermarkOffsets.High.Value <= watermarkOffsets.Low.Value)
            {
                Console.WriteLine($"No messages available in {topicPartition}.");
                return;
            }

            // The high watermark is the offset of the NEXT message to be written,
            // so the last stored message sits one position below it.
            TopicPartitionOffset topicPartitionOffset
                = new(topicPartition, new Offset(watermarkOffsets.High.Value - 1));
            consumer.Assign(topicPartitionOffset);

            ConsumeResult<string, string> consumeResult
                = consumer.Consume(TimeSpan.FromSeconds(3));

            // Consume(TimeSpan) yields null when the timeout elapses without a message;
            // dereferencing it blindly would throw a NullReferenceException.
            if (consumeResult?.Message is null)
            {
                Console.WriteLine($"No message received from {topicPartition} within the timeout.");
                return;
            }

            Console.Write($"Last message value = {consumeResult.Message.Value}, " +
                $"position = {consumer.Position(topicPartition)}");
        }
        finally
        {
            // Close the consumer before the using declaration disposes it.
            consumer.Close();
        }
    }
}