Confluent Kafka consumer consumes messages only after changing groupId


I have a .NET Core console application that uses Confluent.Kafka. I built a consumer that reads messages from a specific topic. The app is intended to run a few times a day, consume the messages on the specified topic, and process them.

It took me a while to understand the consumer's behavior: it consumes messages only if its groupId is one that has never been used before. Every time I change the consumer's groupId, the consumer fetches the messages in the subscribed topic. But on subsequent runs with the same groupId, consumer.Consume returns null.
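A possible explanation for this pattern, not confirmed for this setup, is that the broker remembers a committed offset per groupId: a group that has never committed starts from the position chosen by AutoOffsetReset, while a group that has already committed resumes where it stopped. The sketch below is a toy in-memory model of that idea only. `ToyBroker` and `ConsumeUntilEof` are hypothetical names invented for illustration and are not part of Confluent.Kafka.

```csharp
using System;
using System.Collections.Generic;

// Toy model of per-group committed offsets. NOT the Confluent.Kafka API.
public class ToyBroker
{
    // A single-partition topic: the list index plays the role of the offset.
    private readonly List<string> _log = new List<string>();

    // groupId -> next offset to read (what a commit would store).
    private readonly Dictionary<string, long> _committed = new Dictionary<string, long>();

    public void Produce(string value) => _log.Add(value);

    // Returns every message the group has not consumed yet, then records the new position.
    public List<string> ConsumeUntilEof(string groupId)
    {
        // No committed offset for this group: fall back to "earliest" (offset 0 in this model).
        long start = _committed.TryGetValue(groupId, out var offset) ? offset : 0;

        var batch = _log.GetRange((int)start, _log.Count - (int)start);

        // Assumption: the commit always succeeds before the process exits.
        _committed[groupId] = _log.Count;
        return batch;
    }
}

public static class Demo
{
    public static void Main()
    {
        var broker = new ToyBroker();
        broker.Produce("a");
        broker.Produce("b");

        Console.WriteLine(broker.ConsumeUntilEof("smtp-consumer").Count);   // first run: 2
        Console.WriteLine(broker.ConsumeUntilEof("smtp-consumer").Count);   // same group again: 0
        Console.WriteLine(broker.ConsumeUntilEof("smtp-consumer-2").Count); // new group: 2
    }
}
```

In this model a reused groupId sees nothing on the second run unless new messages were produced in between, which would also be consistent with the other app appearing to work if its topic receives new messages between runs.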

This behavior seems related to rebalancing between consumers in the same group, but I don't understand why, since the consumer should exist only for the application's lifetime. Before exiting the app, I call consumer.Close() and consumer.Dispose(). These should destroy the consumer, so that on the next run the newly created consumer is again the first and only consumer in the specified group. But as I said, that is not what actually happens.

I know there are messages on the topic; I checked via the command line. I also made sure the topic has only one partition.
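Beyond confirming that messages exist, it may help to compare the group's committed offset with the partition's watermarks. The following is a minimal sketch of such a check, assuming the topic name is passed as `args[0]`, the group is the non-working app's "smtp-consumer", and partition 0 is the only partition. The class name `OffsetCheck` and the 10-second timeout are illustrative choices, and the program needs a reachable broker to run.

```csharp
using System;
using Confluent.Kafka;

public static class OffsetCheck
{
    public static void Main(string[] args)
    {
        // Assumptions: same environment variable and group ID as the non-working app.
        var config = new ConsumerConfig
        {
            BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers"),
            GroupId = "smtp-consumer",
            EnableAutoCommit = false // read-only check: never commit from this tool
        };

        // Ignore/Ignore: no message payloads are deserialized here.
        using (var consumer = new ConsumerBuilder<Ignore, Ignore>(config).Build())
        {
            var partition = new TopicPartition(args[0], new Partition(0));
            var timeout = TimeSpan.FromSeconds(10);

            // Offset the group last committed (Offset.Unset if it has never committed).
            var committed = consumer.Committed(new[] { partition }, timeout)[0].Offset;

            // Low = oldest offset still in the partition, High = offset of the next message to be written.
            var watermarks = consumer.QueryWatermarkOffsets(partition, timeout);

            Console.WriteLine($"committed={committed} low={watermarks.Low} high={watermarks.High}");
        }
    }
}
```

If the committed offset equals the high watermark, the group has already read everything currently in the partition, so a consumer with EnablePartitionEof set would be expected to hit the end of the partition immediately on its next run.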

The weirdest thing is that I have another .NET Core console app that does the same process with no issue at all.

The code for both apps is below.

Working app - always consuming:

class Program
    {
        ...
        
        static void Main(string[] args)
        {
            if (args.Length != 2)
            {
                Console.WriteLine("Please provide topic name to read and SMTP topic name");
            }
            else
            {
                var services = new ServiceCollection();
                services.AddSingleton<ConsumerConfig, ConsumerConfig>();
                services.AddSingleton<ProducerConfig, ProducerConfig>();

                var serviceProvider = services.BuildServiceProvider();

                var cConfig = serviceProvider.GetService<ConsumerConfig>();
                var pConfig = serviceProvider.GetService<ProducerConfig>();

                cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
                cConfig.GroupId = "confluence-consumer";
                cConfig.EnableAutoCommit = true;
                cConfig.StatisticsIntervalMs = 5000;
                cConfig.SessionTimeoutMs = 6000;
                cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                cConfig.EnablePartitionEof = true;

                pConfig.BootstrapServers = Environment.GetEnvironmentVariable("producer_bootstrap_servers");

                var consumer = new ConsumerHelper(cConfig, args[0]);

                messages = new Dictionary<string, Dictionary<string, UserMsg>>();

                var result = consumer.ReadMessage();
                while (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"Current consumed msg-json: {result.Message.Value}");

                    ...
                    
                    result = consumer.ReadMessage();
                }

                consumer.Close();
                Console.WriteLine($"Done consuming messages from topic {args[0]}");


            }

        }
    }

ConsumerHelper.cs:

namespace AggregateMailing
{
    using System;
    using Confluent.Kafka;
    public class ConsumerHelper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private IConsumer<string, string> _consumer;

        public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
        {
            try
            {
                _topicName = topicName;
                _consumerConfig = consumerConfig;

                var builder = new ConsumerBuilder<string, string>(_consumerConfig);
                _consumer = builder.Build();

                _consumer.Subscribe(_topicName);
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
            }
        }

        public ConsumeResult<string, string> ReadMessage()
        {
            Console.WriteLine("ReadMessage: start");
            try
            {
                return _consumer.Consume();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ReadMessage: {exc.ToString()}");
                return null;
            }
        }

        public void Close()
        {
            Console.WriteLine("Close: start");
            try
            {
                _consumer.Close();
                _consumer.Dispose();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Close: {exc.ToString()}");
            }
        }
    }
}

Non-working app - consumes only on the first run after changing the consumer groupId to one never used before:

Program.cs:

class Program
    {
        private static SmtpClient smtpClient;
        private static Random random = new Random();
        static void Main(string[] args)
        {
            try
            {
                var services = new ServiceCollection();
                services.AddSingleton<ConsumerConfig, ConsumerConfig>();
                services.AddSingleton<SmtpClient>(new SmtpClient("smtp.gmail.com"));

                var serviceProvider = services.BuildServiceProvider();

                var cConfig = serviceProvider.GetService<ConsumerConfig>();
                cConfig.BootstrapServers = Environment.GetEnvironmentVariable("consumer_bootstrap_servers");
                cConfig.GroupId = "smtp-consumer";
                cConfig.EnableAutoCommit = true;
                cConfig.StatisticsIntervalMs = 5000;
                cConfig.SessionTimeoutMs = 6000;
                cConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                cConfig.EnablePartitionEof = true;


                var consumer = new ConsumerHelper(cConfig, args[0]);

                ...
                
                var result = consumer.ReadMessage();
                while (result != null && !result.IsPartitionEOF)
                {
                    Console.WriteLine($"current consumed message: {result.Message.Value}");
                    var msg = JsonConvert.DeserializeObject<EmailMsg>(result.Message.Value);

                    result = consumer.ReadMessage();
                }

                Console.WriteLine("Done sending emails consumed from SMTP topic");
                consumer.Close();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Main: {exc.ToString()}");
            }

        }
    }

ConsumerHelper.cs:

using Confluent.Kafka;
using System;
using System.Collections.Generic;

namespace Mailer
{
    public class ConsumerHelper
    {
        private string _topicName;
        private ConsumerConfig _consumerConfig;
        private IConsumer<string, string> _consumer;
        public ConsumerHelper(ConsumerConfig consumerConfig, string topicName)
        {
            try
            {
               _topicName = topicName;
               _consumerConfig = consumerConfig;

               var builder = new ConsumerBuilder<string, string> (_consumerConfig);
               _consumer = builder.Build();

               _consumer.Subscribe(_topicName);
               //_consumer.Assign(new TopicPartition(_topicName, 0));
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumerHelper: {exc.ToString()}");
            }
        }
        public ConsumeResult<string, string> ReadMessage()
        {
            Console.WriteLine("ConsumeResult: start");
            try
            {
                
                return _consumer.Consume();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on ConsumeResult: {exc.ToString()}");
                return null;
            }
        }
        public void Close()
        {
            Console.WriteLine("Close: start");
            try
            {
                _consumer.Close();
                _consumer.Dispose();
            }
            catch (System.Exception exc)
            {
                Console.WriteLine($"Error on Close: {exc.ToString()}");
            }
            Console.WriteLine("Close: end");
        }

    }
}