I am trying to store batch of messages that failed to publish to kafka in DB and have added a callback deliveryReport for the same. However, when broker is up and running and there is no other issue, all the messages should successfully get published. However the first message in redordList always gets "error : message timed out" error and gets added to failedDataList. This is a pattern. Can anyone help me understand the issue here ?
using (IProducer<Null, GenericData> producer = new ProducerBuilder<Null, GenericData>(producerConfig).SetValueSerializer(serializer).Build())
{
try
{
int recordsCount = 0;
List<GenericData> failedDataList = new List<GenericData>();
foreach (GenericData genericData in recordList)
{
producer.Produce(KafkaTopic, new Message<Null, GenericData>
{
Value = genericData,
Headers = new Headers { new Header(SchemaIdHeader, BitConverter.GetBytes(schemaId)) }
}, deliveryReport =>
{
recordsCount++;
if (deliveryReport.Status == PersistenceStatus.NotPersisted)
{
failedDataList.Add(genericData);
}
if (recordsCount == recordList.Count && failedDataList.Count > 0)
{
Logs.Export.Err("Failed message delivery: error reason: {0} ", deliveryReport.Error.Reason);
byte[] bytes = ToBytes(failedDataList);
StoreInDB.StoreData(bytes);
}
}
);
producer.Flush();
}
return true;