I am using spring-kafka in my consumer application to read records from Kafka. The producer application sets the B3 trace ID header as follows:
ProducerRecord producerRecord = new ProducerRecord(kafkaTopic, key, timeOffRequestKafkaModel);
producerRecord.headers()
    .add("X-B3-TraceId", tracer.currentSpan().context().traceId().getBytes(StandardCharsets.UTF_8));
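For comparison, the B3 multi-header format (as described by the openzipkin b3-propagation project) carries the span ID in X-B3-SpanId next to X-B3-TraceId, with an optional X-B3-Sampled flag. Below is a minimal, stdlib-only sketch of building that header set as byte arrays. The class and method names are hypothetical and not part of spring-kafka or Sleuth, and whether the missing span ID header is the reason a new trace is started here is an assumption, not something confirmed.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a Sleuth or spring-kafka API.
final class B3HeaderSketch {

    // Builds the B3 multi-header set as header name -> UTF-8 bytes.
    // Each entry could then be copied onto a ProducerRecord via headers().add(name, value).
    static Map<String, byte[]> b3Headers(String traceId, String spanId, boolean sampled) {
        requireLowerHex(traceId, 16, 32); // trace IDs are 16 or 32 lower-hex characters
        requireLowerHex(spanId, 16);      // span IDs are 16 lower-hex characters
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("X-B3-TraceId", traceId.getBytes(StandardCharsets.UTF_8));
        headers.put("X-B3-SpanId", spanId.getBytes(StandardCharsets.UTF_8));
        headers.put("X-B3-Sampled", (sampled ? "1" : "0").getBytes(StandardCharsets.UTF_8));
        return headers;
    }

    // Rejects values that are not lower-hex strings of one of the allowed lengths.
    private static void requireLowerHex(String value, int... allowedLengths) {
        boolean lengthOk = false;
        for (int len : allowedLengths) {
            if (value.length() == len) {
                lengthOk = true;
            }
        }
        if (!lengthOk || !value.matches("[0-9a-f]+")) {
            throw new IllegalArgumentException("not a valid B3 id: " + value);
        }
    }
}
```

In the producer above, the trace ID and span ID would presumably both come from tracer.currentSpan().context(); that wiring is left out because it depends on the tracer version in use.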
The consumer application consumes the records as follows, using the spring-kafka library:
@KafkaListener(
    id = "foo",
    topics = "#{@kafkaConfig.getConsumerConfig('tor-consumers').getTopicName()}",
    groupId = "#{@kafkaConfig.getConsumerConfig('tor-consumers').getGroupId()}",
    concurrency = "#{@kafkaConfig.getConsumerConfig('tor-consumers').getConcurrency()}",
    containerFactory = "primaryKafkaListenerContainerFactory",
    properties = "#{@kafkaConfig.getConsumerConfig('tor-consumers').getProperties()}"
)
public void onEvent(@Payload TimeOffRequestKafkaModel timeOffRequestKafkaPayload,
                    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                    @Header(KafkaHeaders.OFFSET) long offset, // Kafka offsets are 64-bit
                    @Headers MessageHeaders headers,
                    Acknowledgment acknowledgment) {
    processTorEvent(timeOffRequestKafkaPayload, acknowledgment);
}
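As a manual check that the header actually reaches the listener, the trace ID can be read out of the injected headers. The sketch below is stdlib-only and uses a plain Map<String, Object>, which MessageHeaders implements; the class name TraceIdHeaderSketch is hypothetical. It assumes the custom header arrives as a raw byte[] (which is what I would expect for a header written with getBytes), and falls back to String in case a header mapper has already converted it. Note that this only reads the value; it does not make Sleuth join the trace.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not a Sleuth or spring-kafka API.
final class TraceIdHeaderSketch {

    // Returns the X-B3-TraceId header value as text, if the header is present.
    static Optional<String> traceId(Map<String, Object> headers) {
        Object raw = headers.get("X-B3-TraceId");
        if (raw instanceof byte[]) {
            // Assumed default case: the header value is the UTF-8 bytes written by the producer.
            return Optional.of(new String((byte[]) raw, StandardCharsets.UTF_8));
        }
        if (raw instanceof String) {
            // Fallback in case the value was already converted to a String.
            return Optional.of((String) raw);
        }
        return Optional.empty();
    }
}
```

Inside onEvent this could be called as TraceIdHeaderSketch.traceId(headers) and the result logged, to confirm that the producer's trace ID is present on each record.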
I want the consumer application to read the trace ID from the B3 headers of each Kafka record and continue that trace.
The spring-cloud-starter-sleuth dependency is already added.
I configured the following properties in my application.yml, but Sleuth still generates its own trace ID for each record consumed. What should I change so that it picks up the B3 header automatically and uses it as the trace ID for each record?
spring:
sleuth:
propagation:
type: B3
messaging:
kafka:
enabled: true
propagation:
enabled: true