Can't send messages to a Kafka topic using Confluent.Kafka (C#)

We have 3 different environments: test, cert and prod. The topics in these environments are configured using Offset Explorer. The problem is that I can send messages to cert and test, but I can't send to prod until the topic in prod is marked for deletion. As soon as I do that, the messages immediately start going through. I also tried creating new topics in test and cert, and the same problem appears there: until I mark those topics for deletion, I cannot send a message.

The problem occurs when I call the ProduceAsync method. The call runs for 5 minutes and then fails with the error:

Local: Message timed out.

If I use the Produce method instead, the program continues to the next step, but the message never appears in the topic.

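    using System;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;

    public class SmsService
    {
        private readonly KafkaDependentProducer<Null, string> _producer;
        private readonly string topic;
        private readonly ILogger<SmsService> _logger;

        // `config` was not declared in the original snippet; injecting IConfiguration here is an assumption.
        public SmsService(KafkaDependentProducer<Null, string> producer, IConfiguration config, ILogger<SmsService> logger)
        {
            _producer = producer;
            topic = config.GetSection("Kafka:Topic").Value;
            _logger = logger;
        }

        public async Task<Guid?> SendMessage(InputMessageModel sms)
        {
            var message = new SmsModel(sms.text, sms.type);

            // Keyless message whose value is the JSON-serialized SMS model.
            var kafkaMessage = new Message<Null, string>();
            kafkaMessage.Value = JsonConvert.SerializeObject(message);

            try
            {
                // This is the call that hangs for 5 minutes and then times out.
                await _producer.ProduceAsync(topic, kafkaMessage);
            }
            catch (Exception e)
            {
                Console.WriteLine($"Oops, something went wrong: {e}");
                return null;
            }
            return message.messageId;
        }
    }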

I took the KafkaDependentProducer class from the example in the official repo: https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Web
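
A possible way to see why Produce seems to succeed while nothing arrives: Produce only enqueues the message, and the outcome is reported later through an optional delivery handler. The sketch below is not the asker's code. It uses IProducer directly rather than KafkaDependentProducer, and the broker address, topic name, class name and timeout values are all placeholders chosen for illustration.

```csharp
using System;
using Confluent.Kafka;

public static class DeliveryReportExample
{
    // Turns a delivery report into a readable line (hypothetical helper).
    public static string Describe(DeliveryReport<Null, string> report) =>
        report.Error.IsError
            ? $"Delivery failed: {report.Error.Code}: {report.Error.Reason}"
            : $"Delivered to {report.TopicPartitionOffset}";

    public static void Main()
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:9092", // placeholder address
            // Assumption: shorten the default 5-minute message timeout
            // so a failed delivery is reported within the Flush window.
            MessageTimeoutMs = 10000
        };

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        // Produce returns immediately; the handler runs once the broker
        // acknowledges the message or the delivery finally fails.
        producer.Produce(
            "my-topic", // placeholder topic
            new Message<Null, string> { Value = "hello" },
            report => Console.WriteLine(Describe(report)));

        // Wait up to 15 seconds for outstanding delivery reports.
        producer.Flush(TimeSpan.FromSeconds(15));
    }
}
```

Without a delivery handler and a Flush call, a failed delivery from Produce goes unnoticed by the calling code, which matches the behavior described above.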

There is 1 best solution below

I found the solution. In my case I needed to add the "Acks" parameter to ProducerConfig (Acks = Acks.Leader, which equals 1). Unfortunately, the latest version of Confluent.Kafka did not report the exception, so I had to downgrade to an earlier version. There, ProduceAsync threw the exception "Broker: Not enough in-sync replicas", and with that message I was able to find the answer online. The min.insync.replicas parameter on the problem topic is set to 2.
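
A minimal sketch of the fix described above, assuming the producer is built directly from a ProducerConfig. The broker address, topic name and class name are placeholders, and MessageTimeoutMs is an optional addition (not part of the answer) that shortens the default 5-minute wait so failures surface sooner. As far as I know, the broker enforces min.insync.replicas only for acks=all, which would explain why lowering Acks to Leader lets the write through. Note that this trades durability for availability.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class AcksLeaderExample
{
    // Builds a producer config that waits only for the partition leader.
    public static ProducerConfig BuildConfig(string bootstrapServers) => new ProducerConfig
    {
        BootstrapServers = bootstrapServers,
        Acks = Acks.Leader,        // acks=1, as in the answer
        MessageTimeoutMs = 30000   // assumption: fail after 30 s instead of 5 min
    };

    public static async Task Main()
    {
        var config = BuildConfig("localhost:9092"); // placeholder address
        using var producer = new ProducerBuilder<Null, string>(config).Build();

        try
        {
            var result = await producer.ProduceAsync(
                "my-topic", // placeholder topic
                new Message<Null, string> { Value = "hello" });
            Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
        }
        catch (ProduceException<Null, string> e)
        {
            // e.Error.Code distinguishes e.g. Local_MsgTimedOut
            // from NotEnoughReplicas.
            Console.WriteLine($"Delivery failed: {e.Error.Code}: {e.Error.Reason}");
        }
    }
}
```

An alternative that keeps acks=all is to restore enough in-sync replicas for the topic or lower its min.insync.replicas. Which option fits depends on how much durability the messages need.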