I am facing a very strange situation with Spring WebFlux, CompletableFuture and Docker. Please help; if you need more code, comment below.
I am connecting to an external API to fetch data from it. The code works fine locally but fails with the following error when run in a Docker container.
2024-02-19T14:23:00.328Z ERROR 1 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException: completableFuture completionException occured] with root cause
2024-02-19 23:23:00
2024-02-19 23:23:00 java.net.ConnectException: finishConnect(..) failed: Connection refused
2024-02-19 23:23:00 at io.netty.channel.unix.Errors.newConnectException0(Errors.java:166) ~[netty-transport-native-unix-common-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.unix.Errors.handleConnectErrno(Errors.java:131) ~[netty-transport-native-unix-common-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.unix.Socket.finishConnect(Socket.java:359) ~[netty-transport-native-unix-common-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:711) ~[netty-transport-classes-epoll-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:688) ~[netty-transport-classes-epoll-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:567) ~[netty-transport-classes-epoll-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) ~[netty-transport-classes-epoll-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[netty-transport-classes-epoll-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar!/:4.1.104.Final]
2024-02-19 23:23:00 at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
The above is the error I get. The following is the code that triggers it:
public List<List<BaseResponseItem>> concurrentPreGetFromApi(Address address) {
    // Create CompletableFutures for each API call
    CompletableFuture<List<BaseResponseItem>> baseFuture = CompletableFuture.supplyAsync(() ->
            baseApiService.fetchAllBaseData(address));
    // Execute the other three tasks in the background after baseFuture is complete
    CompletableFuture<Void> sideEffectTasks = baseFuture.thenRunAsync(() -> {
        log.info("running start sideEffectTasks");
        log.info("running start sideEffectTasks exposInfo");
        CompletableFuture.supplyAsync(() -> {
            try {
                exposedInfoApiService.fetchAllExposedInfoData(address);
            } catch (Exception e) {
                log.error("Error in fetchAllExposedInfoData", e);
            }
            return null;
        });
        log.info("running start sideEffectTasks recaptitle");
        CompletableFuture.supplyAsync(() -> {
            recapTitleService.fetchAllRecapTitleData(address);
            return null; // dummy result
        });
        CompletableFuture.supplyAsync(() -> {
            titleApiService.fetchAllTitleData(address);
            return null; // dummy result
        });
    });
    try {
        // Retrieve the results from the completed CompletableFutures
        List<BaseResponseItem> baseResponseItemList = baseFuture.join();
        // Your business logic with the retrieved lists
        List<BaseResponseItem> baseResponseTitleItemList = baseApiService.fetchAllTitleBaseData(baseResponseItemList);
        List<BaseResponseItem> baseResponseExposItemList = baseApiService.fetchAllExposInfoBaseData(baseResponseItemList);
        if (baseResponseTitleItemList.isEmpty() && baseResponseExposItemList.isEmpty()) {
            return new LinkedList<>(List.of(baseResponseItemList));
        }
        return new LinkedList<>(List.of(baseResponseTitleItemList, baseResponseExposItemList));
    } catch (CompletionException e) {
        throw new RuntimeException("completableFuture completionException occured", e);
    }
}
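Because the method above rethrows the CompletionException inside a RuntimeException, the failure arrives as a chain of nested causes. A minimal sketch, using only the JDK, of walking that chain to the innermost throwable so it can be logged on its own. `RootCause` and `rootCauseOf` are hypothetical names that are not part of my project, and the failing supplier only simulates a refused connection; it is not the real API call.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RootCause {
    // Hypothetical helper: follow getCause() until the innermost throwable is reached.
    static Throwable rootCauseOf(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // Simulated failure standing in for the real API call (assumption).
            throw new IllegalStateException("fetch failed",
                    new ConnectException("Connection refused"));
        });
        try {
            future.join();
        } catch (CompletionException e) {
            // join() wraps the supplier's exception in a CompletionException.
            Throwable root = rootCauseOf(e);
            System.out.println(root.getClass().getName() + ": " + root.getMessage());
        }
    }
}
```

Logging the root cause next to the name of the service call that produced it would at least show which of the futures hit the refused connection.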
Each of the CompletableFutures runs a block of code similar to the following:
@Cacheable(value = "fetchAllBaseData")
public List<BaseResponseItem> fetchAllBaseData(Address address) {
    // Check if the data is already in the cache
    Cache.ValueWrapper valueWrapper = Objects.requireNonNull(cacheManager.getCache("fetchAllBaseData")).get(address);
    // Use the address as a key for the lock
    ReentrantLock fetchLock = addressLocks.computeIfAbsent(address, k -> new ReentrantLock());
    // Acquire the lock
    fetchLock.lock();
    try {
        // Double-check if the value is inside the cache
        if (valueWrapper != null) {
            // Data is updated in cache
            return (List<BaseResponseItem>) valueWrapper.get();
        }
        // Fetch data from the API
        return fetchDataFromApi(address);
    } finally {
        // Release the lock
        fetchLock.unlock();
        addressLocks.remove(address, fetchLock);
    }
}

public List<BaseResponseItem> fetchDataFromApi(Address address) {
    int total = totalPages(address);
    log.warn("totalPages : {}", total);
    return Flux.range(1, total)
            .flatMap(page -> {
                log.info("currpage {}", page);
                return fetchPageData(address, page);
            }, CONCURRENCYLIMIT) // Fetch data for each page
            .collectList()
            .block(); // Block and wait for the result
}

private Flux<BaseResponseItem> fetchPageData(Address address, int pageNo) {
    WebClient.RequestHeadersSpec<?> request = apiService.getRequestHeadersSpec(address, BaseApiServiceAsync.URI, pageNo);
    Mono<BaseApiResponse> apiResponseMono = request.retrieve().bodyToMono(BaseApiResponse.class);
    return apiResponseMono
            // .doOnNext(apiResponse -> log.info("Received API response: {}", apiResponse))
            .flatMapMany(apiResponse -> Flux.fromIterable(apiResponse.getResponse().getBody().getItems().getItem()))
            .doOnError(error -> {
                log.error("Error fetching BASE data for page {}", pageNo, error);
                if (error instanceof WebClientResponseException) {
                    String responseBody = ((WebClientResponseException) error).getResponseBodyAsString();
                    String errorCode = ((WebClientResponseException) error).getStatusCode().toString();
                    log.error("Response body: {} . errorcode: {}", responseBody, errorCode);
                }
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(5))
                    .filter(throwable -> throwable instanceof WebClientException));
}

private Integer totalPages(Address address) {
    WebClient.RequestHeadersSpec<?> request = apiService.getRequestHeadersSpec(address, URI, 1);
    String responseString = request.retrieve().bodyToMono(String.class).block();
    log.info("totalPages at {}, ", responseString);
    BaseApiResponse apiResponse = request.retrieve().bodyToMono(BaseApiResponse.class).block();
    assert apiResponse != null;
    int totalCount = apiResponse.getResponse().getBody().getTotalCount();
    int repeats = (int) Math.ceil((double) totalCount / WebClientApiService.SAFE_QUERY);
    log.warn("totalcount {}, repeats : {}", totalCount, repeats);
    return repeats;
}
This is the Dockerfile used to build the image:
# Stage 1: Build JRE
FROM eclipse-temurin:21 as jre-build
#RUN $JAVA_HOME/bin/jlink \
# --add-modules ALL-MODULE-PATH \
# --strip-debug \
# --no-man-pages \
# --no-header-files \
# --compress=2 \
# --output /javaruntime
# Stage 2: Final Image
FROM ubuntu:latest
# Install required packages in a single RUN command
RUN apt-get update \
    && apt-get upgrade -y \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y curl \
    && apt-get clean && apt-get autoclean && apt-get autoremove -y \
    && rm -rf /var/lib/apt/lists/*
# Set Java environment variables
ENV JAVA_HOME=/opt/java/openjdk
ENV PATH "${JAVA_HOME}/bin:${PATH}"
# Copy JRE from the build stage
# COPY --from=jre-build /javaruntime $JAVA_HOME
COPY --from=jre-build /opt/java/openjdk $JAVA_HOME
# Create app directory
RUN mkdir /opt/app
# Copy the JAR file to the app directory
ARG JAR_FILE=*.jar
COPY ${JAR_FILE} /opt/app/firefacilauto.jar
# Copy entrypoint script
COPY entrypoint.sh /entrypoint.sh
# Give execute permission to the entrypoint script
RUN chmod +x /entrypoint.sh
# Set entry point for the application
ENTRYPOINT ["/entrypoint.sh"]
# Expose ports
EXPOSE 8080 80
# Add metadata to the image
LABEL authors="user"
entrypoint.sh contains the following:
#!/bin/bash
echo "nameserver 8.8.8.8" >> /etc/resolv.conf
exec java -jar /opt/app/firefacilauto.jar
The container is attached to the following network. The Redis cache (used via @Cacheable) is on the same network:
"Networks": {
    "RedisForSpringBoot": {
        "IPAMConfig": null,
        "Links": null,
        "Aliases": [
            "78c9aa26a371"
        ],
        "MacAddress": "02:42:ac:12:00:03",
        "NetworkID": "17bc7512075a0643987824388acb2da06e0b81fe18f5614df3f5f4c24fe22167",
        "EndpointID": "a9d6d250881aa521379a9c66a5bee1596daf5b27480313a4bd1f2eb30e661803",
        "Gateway": "172.18.0.1",
        "IPAddress": "172.18.0.3",
        "IPPrefixLen": 16,
        "IPv6Gateway": "",
        "GlobalIPv6Address": "",
        "GlobalIPv6PrefixLen": 0,
        "DriverOpts": null,
        "DNSNames": [
            "firefacilauto",
            "78c9aa26a371"
        ]
    }
}
This is very strange: when I ran curl against the external service from inside the container (via docker exec), the request reached the host, but WebClient cannot connect. I did not configure any proxies, so if there is one it is implicit, but I am not sure about that.
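Since curl succeeds from inside the container while WebClient does not, one way to narrow this down is to check from the JVM itself which host and port refuse the connection. A minimal sketch using only the JDK: `ConnectCheck`, `canConnect` and the default host and port in `main` are hypothetical placeholders, not values from my setup. It does not reproduce what WebClient does (no HTTP, no TLS, no Netty native transport); it only shows how the JVM resolves a host name and whether a plain TCP connection can be opened.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

class ConnectCheck {
    // Returns true if a plain TCP connection to host:port succeeds within timeoutMillis.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Print the failure so "refused", "timed out" and "unknown host" can be told apart.
            System.out.println(host + ":" + port + " -> " + e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder defaults (assumption); pass the real host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8080;

        // Show every address the JVM resolves for the host name.
        for (InetAddress address : InetAddress.getAllByName(host)) {
            System.out.println(host + " resolves to " + address.getHostAddress());
        }
        System.out.println("connect ok: " + canConnect(host, port, 3000));
    }
}
```

Running this inside the container once against the external API host and once against the Redis host used by @Cacheable should show which of the two connections is actually being refused.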
I have been struggling with this for the past 36 hours. Send help.
What I tried is explained above.
TL;DR:
- Rebuilt the Docker image with various portions of the Dockerfile commented out.
- Ran curl against the actual service to test connectivity; it passed, so this should rule out Docker network issues.
- Enabled Netty logging at DEBUG level. The output is too verbose to include here, but I can provide it on request.