I have a class, DatabaseContextHolder, that manages the data source based on the user's API key using ThreadLocal.
public final class DatabaseContextHolder {
    private static final ThreadLocal<Stack<DataSourceType>> ctx = new ThreadLocal<>();

    public static void setCtx(DataSourceType dataSourceType) {
        getCtx().push(dataSourceType);
    }

    public static void restoreCtx() {
        final Stack<DataSourceType> ctx = getCtx();
        if (!ctx.isEmpty()) {
            ctx.pop();
        }
    }

    public static Optional<DataSourceType> peekDataSource() {
        final Stack<DataSourceType> ctx = getCtx();
        if (ctx.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(ctx.peek());
    }

    private static Stack<DataSourceType> getCtx() {
        if (ctx.get() == null) {
            ctx.set(new Stack<>());
        }
        return ctx.get();
    }
}
This works fine without multi-threading. However, when I use CompletableFuture, the ctx field is always empty inside the asynchronous tasks. Example of methods with CompletableFuture:
private final ExecutorService executorService = Executors.newCachedThreadPool();

public ResponseEntity<Map<String, List<ReportGroupScheduleExecutionBean>>> getScheduleExecutionByGroup(
        @RequestParam(value = "accountId") int accountId,
        @RequestParam("dateFrom") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime dateFrom,
        @RequestParam("dateTo") @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime dateTo,
        @RequestParam(name = "groupIds") Set<Integer> groupIds,
        @AuthenticationPrincipal UserBean userBean) throws NoDataException {
    ZoneId userTimeZone = ZoneId.of(userBean.getUserTimeZone());
    DateTimeFormatter zoneTimeFormatter = DateTimeFormatter.ofPattern(ModelsConstants.PATTERN_TIME_ONLY_MINUTES).withZone(userTimeZone);
    AccountEntity account = accountService.findById(accountId).orElseThrow(() -> new NoDataException("Account not exist"));
    List<CompletableFuture<List<ReportGroupScheduleExecutionBean>>> futures = new ArrayList<>();
    groupIds.forEach(id -> {
        CompletableFuture<List<ReportGroupScheduleExecutionBean>> future = CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return reportService.getRouteScheduleExecutionReportByGroup(new ReportInfoBean(
                                dateFrom, dateTo, account, userBean.getUserTimeZone(), zoneTimeFormatter), id);
                    } catch (NoDataException e) {
                        throw new RuntimeException(e);
                    }
                }, executorService);
        futures.add(future);
    });
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    CompletableFuture<List<ReportGroupScheduleExecutionBean>> futuresAll = allFutures.thenApply(
            v -> futures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(List::stream)
                    .collect(Collectors.toList())
    );
    try {
        return new ResponseEntity<>(futuresAll.get().stream().collect(Collectors.groupingBy(ReportGroupScheduleExecutionBean::getAccountName)), HttpStatus.OK);
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
}
What I've tried:
- marking all methods as synchronized
- using different executors for CompletableFuture
How can I ensure that the ThreadLocal context is propagated correctly to threads spawned by CompletableFuture?
You seem to have altogether the wrong idea about ThreadLocal. The whole point is that a given ThreadLocal object holds a different, independent value for each thread. Whatever value one thread set()s on it, that thread alone can get() back from it. The objective is to avoid sharing data between threads. Thus ...
"the ctx field is always empty inside the asynchronous tasks"
Yes! The asynchronous tasks run in different threads from the one in which you set a value in the ThreadLocal, so they are not supposed to see that value in the ThreadLocal. Nor a copy of it or any such thing, either. They get either its initial value or whatever value they last set themselves.
You can partially break that by causing your ThreadLocal instance to provide the same object as its initial value to all threads, but that is inappropriate under most circumstances. If you want a shared object then dispense with the ThreadLocal and set up such an object directly. ThreadLocal does not do anything useful for you in that case. In particular, it does not provide any kind of synchronization for a shared object exposed as its initial value to multiple threads.
It seems unlikely that you want thread-specific Stacks that are not also task-specific, but if you did want that then you could avoid each thread having to set() its own by configuring a ThreadLocal with an initial value supplier, via withInitial(). Again, I don't think that's what you actually want, but I include it for completeness.
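For illustration, here is a minimal sketch of what the withInitial() variant might look like, and of why an asynchronous task still sees an empty stack. The class name ThreadLocalDemo, the method callerAndWorkerSizes(), and the two-value DataSourceType enum are placeholders I made up for the example; they are not taken from your code.

```java
import java.util.Stack;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalDemo {
    // Hypothetical stand-in for the question's DataSourceType.
    enum DataSourceType { PRIMARY, REPLICA }

    // The supplier runs once per thread, on that thread's first get(),
    // so every thread ends up with its own, initially empty Stack.
    private static final ThreadLocal<Stack<DataSourceType>> CTX =
            ThreadLocal.withInitial(Stack::new);

    // Pushes a value on the calling thread, then returns
    // { stack size seen by the caller, stack size seen by a pool thread }.
    public static int[] callerAndWorkerSizes() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CTX.get().push(DataSourceType.REPLICA);
            int callerSize = CTX.get().size();
            // The task runs on the pool's thread, which has its own Stack.
            int workerSize = CompletableFuture
                    .supplyAsync(() -> CTX.get().size(), pool)
                    .join();
            return new int[] { callerSize, workerSize };
        } finally {
            CTX.remove();    // drop the calling thread's value again
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = callerAndWorkerSizes();
        System.out.println("caller sees " + sizes[0] + ", worker sees " + sizes[1]);
    }
}
```

The caller sees the one element it pushed, while the pool thread gets a fresh Stack from the supplier. That is the same behavior you observe with your getCtx() helper, just without the explicit null check.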