Zipkin Sender Type and Kafka Topic Not Working After Updating to Spring Boot 3


Hi, I just updated Spring Boot to version 3. In my project we configure Zipkin to send spans to Kafka with a specific topic, and it is not working anymore:

spring:
  zipkin:
    sender.type: kafka
    kafka.topic: topic-example

Is there any way for Micrometer Tracing to configure Zipkin the same way in application.yaml, or is there an alternative configuration?

==== UPDATE ====

I tried another approach:

pom.xml

        <!--Observability dependencies-->
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-otel</artifactId>
        </dependency>
        <dependency>
            <groupId>io.opentelemetry</groupId>
            <artifactId>opentelemetry-exporter-zipkin</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-sender-kafka</artifactId>
        </dependency>
        <!--Observability dependencies-->

KafkaConfig.java

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import zipkin2.reporter.Sender;
import zipkin2.reporter.kafka.KafkaSender;

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    static String join(List<?> parts) {
        StringBuilder to = new StringBuilder();
        for (int i = 0, length = parts.size(); i < length; i++) {
            to.append(parts.get(i));
            if (i + 1 < length) {
                to.append(',');
            }
        }
        return to.toString();
    }

    @Bean("zipkinSender")
    Sender kafkaSender(KafkaProperties config, Environment environment) {
        // Need to get property value from Environment
        // because when using @VaultPropertySource in reactive web app
        // this bean is initiated before @Value is resolved
        // See gh-1990
        String topic = environment.getProperty("spring.zipkin.kafka.topic", "zipkin");
        Map<String, Object> properties = config.buildProducerProperties();
        properties.put("key.serializer", ByteArraySerializer.class.getName());
        properties.put("value.serializer", ByteArraySerializer.class.getName());
        // Kafka expects the input to be a String, but KafkaProperties returns a list
        Object bootstrapServers = properties.get("bootstrap.servers");
        if (bootstrapServers instanceof List) {
            properties.put("bootstrap.servers", join((List<?>) bootstrapServers));
        }
        return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
    }
}

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group-id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer


  zipkin:
    kafka.topic: user

I also tried to check the messages by accessing the running Docker container:

docker exec -it kafka-container /bin/sh

bin/kafka-console-consumer.sh --topic topic-name --bootstrap-server localhost:9092 --property print.headers=true
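
For completeness, listing the broker's topics first shows whether the expected topic exists at all (assuming the standard Kafka CLI scripts are available inside the container):

bin/kafka-topics.sh --list --bootstrap-server localhost:9092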

It still does not work. Please let me know if I made a mistake.


3 Answers

BEST ANSWER

We currently don't support any sending mechanism other than HTTP. You can create a Sender bean yourself that uses Kafka. Please file an issue in Spring Boot if you're interested in having different sender mechanisms added.
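
A minimal sketch of such a Sender bean, assuming io.zipkin.reporter2:zipkin-sender-kafka is on the classpath (the class name, broker address and topic below are placeholders), could look like this; Spring Boot should then use it instead of its default HTTP sender:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.reporter.Sender;
import zipkin2.reporter.kafka.KafkaSender;

@Configuration(proxyBeanMethods = false)
class ZipkinKafkaSenderConfig {

    @Bean
    Sender kafkaSender() {
        // Placeholder broker address and topic; adjust to your environment
        return KafkaSender.newBuilder()
                .bootstrapServers("localhost:9092")
                .topic("topic-example")
                .build();
    }
}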

ANSWER

While there is no official support, I found a way to make it work again (Spring Boot 3.0.1):

  1. Add the dependencies io.micrometer:micrometer-tracing-bridge-otel, io.opentelemetry:opentelemetry-exporter-zipkin, io.zipkin.reporter2:zipkin-sender-kafka and org.springframework.kafka:spring-kafka (a Maven sketch of these is shown after this list).

  2. Add a configuration class like the one below, code copied from Sleuth's ZipkinKafkaSenderConfiguration:

     @Configuration
     @EnableConfigurationProperties(KafkaProperties.class)
     public class KafkaConfig {
    
         static String join(List<?> parts) {
             StringBuilder to = new StringBuilder();
             for (int i = 0, length = parts.size(); i < length; i++) {
                 to.append(parts.get(i));
                 if (i + 1 < length) {
                     to.append(',');
                 }
             }
             return to.toString();
         }
    
         @Bean("zipkinSender")
         Sender kafkaSender(KafkaProperties config, Environment environment) {
             // Need to get property value from Environment
             // because when using @VaultPropertySource in reactive web app
             // this bean is initiated before @Value is resolved
             // See gh-1990
             String topic = environment.getProperty("spring.zipkin.kafka.topic", "zipkin");
             Map<String, Object> properties = config.buildProducerProperties();
             properties.put("key.serializer", ByteArraySerializer.class.getName());
             properties.put("value.serializer", ByteArraySerializer.class.getName());
             // Kafka expects the input to be a String, but KafkaProperties returns a list
             Object bootstrapServers = properties.get("bootstrap.servers");
             if (bootstrapServers instanceof List) {
                 properties.put("bootstrap.servers", join((List) bootstrapServers));
             }
             return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
         }
     }
    
  3. Configure Kafka in your application.yaml file:

     spring:
       kafka:
         bootstrap-servers: one-host:9092,another-host:9092
         properties:
           # Set a value for batch.size or an infinite loop will happen when trying to send data to Kafka
           batch.size: 16384
           # Configure your security, sasl or whatever else you need
     # Notice that sampling properties and others moved from 'spring.sleuth' to 'management.tracing' (double-check the property names used)
     management:
       tracing:
         sampling:
           probability: 1.0
         baggage:
           remote-fields: Some-Header
           correlation-fields: Some-Header
    

This should make it work like before with Spring Boot 2.x and Spring Cloud Sleuth.
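
As a reference, the dependencies from step 1 could be declared in Maven roughly like this (assuming their versions are managed by the Spring Boot parent and the related BOMs; add explicit versions if your build does not manage them):

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-zipkin</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-sender-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>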

ANSWER

I managed to make it work with Spring Boot 3.

According to the Spring Cloud Sleuth 3.1 Migration Guide, the API code from Sleuth has been migrated to Micrometer Tracing.

The Brave and OpenTelemetry bridges have their respective modules in Micrometer Tracing.

Choose your Tracer instrumentation and add the corresponding dependencies:

  1. For OpenTelemetry:
     org.springframework.kafka:spring-kafka
     io.micrometer:micrometer-tracing:VERSION
     io.zipkin.reporter2:zipkin-sender-kafka:VERSION
     io.micrometer:micrometer-tracing-bridge-otel:VERSION
     io.opentelemetry:opentelemetry-api:VERSION
     org.apache.httpcomponents.client5:httpclient5:VERSION

     I am not sure that org.apache.httpcomponents.client5:httpclient5:VERSION is required, but I had to add it to make it work.

  2. For Brave:
     org.springframework.kafka:spring-kafka
     io.micrometer:micrometer-tracing:VERSION
     io.zipkin.reporter2:zipkin-sender-kafka:VERSION
     io.zipkin.brave:brave:VERSION
     io.micrometer:micrometer-tracing-bridge-brave:VERSION
  3. Configure a bean that sends spans to Kafka with the Zipkin sender:
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import zipkin2.reporter.Sender;
import zipkin2.reporter.kafka.KafkaSender;

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(ByteArraySerializer.class)
@ConditionalOnProperty(value = "management.tracing.enabled", havingValue = "true")
public class TracingKafkaSenderConfiguration {

    private static final String SENDER_BEAN_NAME = "zipkinSender";

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties(KafkaProperties.class)
    static class TracingKafkaSenderBeanConfiguration {

        static String join(List<?> parts) {
            StringBuilder to = new StringBuilder();
            for (int i = 0, length = parts.size(); i < length; i++) {
                to.append(parts.get(i));
                if (i + 1 < length) {
                    to.append(',');
                }
            }
            return to.toString();
        }

        @Bean(SENDER_BEAN_NAME)
        Sender kafkaSender(KafkaProperties config, Environment environment) {
            String topic = environment.getProperty("management.tracing.kafka.topic", "topic");
            String serviceName = environment.getProperty("management.tracing.service.name", "kafka-sender");
            Map<String, Object> properties = config.buildProducerProperties();
            properties.put("key.serializer", ByteArraySerializer.class.getName());
            properties.put("value.serializer", ByteArraySerializer.class.getName());
            properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, serviceName);
            // Kafka expects a String, but KafkaProperties returns a list for bootstrap.servers
            Object bootstrapServers = properties.get("bootstrap.servers");
            if (bootstrapServers instanceof List) {
                properties.put("bootstrap.servers", join((List<?>) bootstrapServers));
            }
            return KafkaSender.newBuilder().topic(topic).overrides(properties).build();
        }
    }
}

  4. Change properties from 'spring.sleuth' to 'management.tracing':
management:
  tracing:
    enabled: true
    kafka:
      topic: topic
    service:
      name: kafka-sender
    sampling:
      probability: 0.1
    baggage:
      remote-fields:
        - field-one

Notice that management.tracing.kafka.topic and management.tracing.service.name are custom properties read by the kafkaSender bean above; they set the Kafka topic and the Spring application's service name (used as the Kafka client ID).
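
With this in place, spans should appear on the configured topic, which can be verified the same way as in the question (assuming a local broker as configured there):

bin/kafka-console-consumer.sh --topic topic --bootstrap-server localhost:9092 --property print.headers=true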