How can I read just the messages from the last x seconds of a Kafka Topic in C#?

17 Views Asked by At

Currently I can read the last message in a Kafka topic with following code. Now I need something similar. I want to read just the messages from the last x seconds of a topic. How can I do that?

using 
Confluent.Kafka;
using System;

class Consumer
{
static void Main(string[] args)
{
ConsumerConfig config = new()
{
    BootstrapServers = "localhost:9092",
    GroupId = Guid.NewGuid().ToString(),
};
using IConsumer<string, string> consumer 
    = new ConsumerBuilder<string, string>(config).Build();
try
{
    TopicPartition topicPartition = new("purchases", new Partition(0));
    WatermarkOffsets watermarkOffsets 
        = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(3));
    TopicPartitionOffset topicPartitionOffset 
        = new(topicPartition, new Offset(watermarkOffsets.High.Value - 1));
    consumer.Assign(topicPartitionOffset);
    ConsumeResult<string, string> consumeResult 
        = consumer.Consume(TimeSpan.FromSeconds(3));
    Console.Write($"Last message value = {consumeResult.Message.Value}, " +
        $"position = {consumer.Position(topicPartition)}");
}
finally 
{
  consumer.Close(); 
}
}
}
0

There are 0 best solutions below