Keep trace/span across CompletableFuture in Micrometer (micrometer-tracing-bridge-brave)


I am using the Micrometer Tracing library with Spring Boot for distributed tracing. It works well when a request is handled on a single thread, but when work is handed off to another thread, for example through CompletableFuture, the trace context is not propagated to that thread and its log lines have an empty traceId/spanId.

Gradle dependencies

implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'io.micrometer:micrometer-tracing-bridge-brave'

MainApplication class

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
@Slf4j
public class MainApplication {

  @GetMapping("async")
  String helloAsync() {
    log.info("hello from rest endpoint[GET]");
    CompletableFutureUtil.performAsyncOperation();
    return "hi";
  }


  public static void main(String[] args) {
    SpringApplication.run(MainApplication.class, args);
  }
}

CompletableFutureUtil class

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CompletableFutureUtil {

  public static void performAsyncOperation() {
    // Create a CompletableFuture and simulate an async process
    CompletableFuture<String> future =
        CompletableFuture.supplyAsync(
            () -> {
              // Simulate the asynchronous process (replace with your actual logic)
              log.info("inside CompletableFuture.supplyAsync..");
              simulateAsyncOperation();
              log.info("returning from CompletableFuture.supplyAsync..");
              // Simulate a successful result
              return "Simulated result";
            });

    // Attach a callback for successful completion
    future.thenAccept(
        result -> {
          logInfo("Async process completed successfully. Result: " + result);
        });

    // Attach a callback for exceptional completion
    future.exceptionally(
        exception -> {
          logError("Async process failed. Reason: " + exception.getMessage());
          return null;
        });

    // Simulate some other work while the async process is ongoing
    logInfo("Some other work is being done...");

    logInfo("Main thread ended.");
  }

  private static void simulateAsyncOperation() {
    // Simulate some asynchronous operation (replace with your actual logic)
    logInfo("Simulating async operation...");
    try {
      TimeUnit.SECONDS.sleep(2); // Simulate some delay
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void logInfo(String message) {
    log.info("[INFO] " + message);
  }

  private static void logError(String message) {
    log.error("[ERROR] " + message);
  }
}

Logs generated when calling http://localhost:8080/async:

2023-12-09T00:29:29.117+05:30 " INFO [content-publisher-service,6573679104ea6b2868558e3949953a1e,68558e3949953a1e]" 42181 --- [nio-8080-exec-5] c.c.a.c.p.service.BootsApplication       : hello from rest endpoint[GET]
2023-12-09T00:29:29.121+05:30 " INFO [content-publisher-service,6573679104ea6b2868558e3949953a1e,68558e3949953a1e]" 42181 --- [nio-8080-exec-5] c.c.a.c.p.s.util.CompletableFutureUtil   : [INFO] Some other work is being done...
2023-12-09T00:29:29.121+05:30 " INFO [content-publisher-service,,]" 42181 --- [onPool-worker-1] c.c.a.c.p.s.util.CompletableFutureUtil   : inside CompletableFuture.supplyAsync..
2023-12-09T00:29:29.122+05:30 " INFO [content-publisher-service,6573679104ea6b2868558e3949953a1e,68558e3949953a1e]" 42181 --- [nio-8080-exec-5] c.c.a.c.p.s.util.CompletableFutureUtil   : [INFO] Main thread ended.
2023-12-09T00:29:29.122+05:30 " INFO [content-publisher-service,,]" 42181 --- [onPool-worker-1] c.c.a.c.p.s.util.CompletableFutureUtil   : [INFO] Simulating async operation...
2023-12-09T00:29:31.122+05:30 " INFO [content-publisher-service,,]" 42181 --- [onPool-worker-1] c.c.a.c.p.s.util.CompletableFutureUtil   : returning from CompletableFuture.supplyAsync..
2023-12-09T00:29:31.122+05:30 " INFO [content-publisher-service,,]" 42181 --- [onPool-worker-1] c.c.a.c.p.s.util.CompletableFutureUtil   : [INFO] Async process completed successfully. Result: Simulated result

There is 1 best solution below

Use one of the CompletableFuture methods that accepts an Executor (for example the two-argument supplyAsync), and wrap that Executor with Micrometer's Context Propagation library. The wrapper captures the tracing context on the calling thread and restores it on the worker thread, so the traceId/spanId show up in the logs written there. Example: https://github.com/spring-cloud-samples/brewery/blob/1bfca4ed62bd3d3b7a7fb6f1b70179a0154382f7/brewing/src/main/java/io/spring/cloud/samples/brewery/aggregating/IngredientsAggregator.java#L45-L53

// The second argument to supplyAsync is the context-propagating Executor.
// In real code, reuse one shared pool instead of creating a new one per call.
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            ingredientsCollector.collectIngredients(order, processId).stream()
                    .filter(ingredient -> ingredient != null)
                    .forEach((Ingredient ingredient) -> {
                        // One argument per {} placeholder
                        log.info("Adding an ingredient [{}] for order [{}] , processId [{}]", ingredient, order, processId);
                        ingredientWarehouse.addIngredient(ingredient);
                    });
            return null;
        }, ContextSnapshot.captureAll().wrapExecutor(Executors.newFixedThreadPool(5)));
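
To see why wrapping the Executor helps, here is a minimal JDK-only sketch of the capture-and-restore idea. It is not Micrometer's implementation: the TRACE_ID ThreadLocal, the propagating(...) helper and the "trace-123" value are hypothetical stand-ins for the tracer's thread-bound state, made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationSketch {

  // Hypothetical stand-in for the thread-bound tracing state (e.g. the traceId in the MDC).
  static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

  // Hypothetical helper: wraps an Executor so each task runs with the submitter's TRACE_ID.
  static Executor propagating(Executor delegate) {
    return task -> {
      // execute() is called on the submitting thread, so capture the value here.
      String captured = TRACE_ID.get();
      delegate.execute(() -> {
        // This part runs on the worker thread: install the captured value, run, then restore.
        String previous = TRACE_ID.get();
        TRACE_ID.set(captured);
        try {
          task.run();
        } finally {
          if (previous == null) {
            TRACE_ID.remove();
          } else {
            TRACE_ID.set(previous);
          }
        }
      });
    };
  }

  // Returns what the worker thread sees with a plain pool and with the wrapped pool.
  static String[] demo() {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      TRACE_ID.set("trace-123"); // pretend the request thread already has a trace
      String plain = CompletableFuture
          .supplyAsync(() -> String.valueOf(TRACE_ID.get()), pool)
          .join();
      String wrapped = CompletableFuture
          .supplyAsync(() -> String.valueOf(TRACE_ID.get()), propagating(pool))
          .join();
      return new String[] {plain, wrapped};
    } finally {
      TRACE_ID.remove();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] seen = demo();
    System.out.println("plain executor:   traceId=" + seen[0]);
    System.out.println("wrapped executor: traceId=" + seen[1]);
  }
}
```

With the plain pool the worker thread sees no trace id, which matches the empty [content-publisher-service,,] entries in the question's logs. With the wrapped pool it sees the caller's value. In the real application, ContextSnapshot.captureAll().wrapExecutor(...) from the answer plays the role of propagating(...) here.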