I have a proxy server made with Armeria which should start a download from a source and simultaneously stream it outwards to a destination. The file should be streaming as this is a proxy server and I don't want to make it store and forward. This app should handle a lot of traffic.
Any tips where should I be looking for doing it purely in Armeria?
I tried using AsyncHttpClient but i figure it's locking the main thread or something because the requests start accumulating.
All the code below is inside CompletableFuture.supplyAsync()
.
Is there a way I can fix this, or better yet, doing it purely in Armeria?
public HttpResponse doLogic(URL getUrl, String requestId, String key, Lock lock, String token) {
// ----
PipedOutputStream pout = new PipedOutputStream();
BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pout);
Future<Response> respFuture = asyncHttpClient.prepareGet(getUrl.toString()).execute(bodyDeferringAsyncHandler);
CompletableFuture<HttpResponse> r = CompletableFuture.supplyAsync(() -> {
try (PipedInputStream pin = new PipedInputStream(pout)) {
Response resp = bodyDeferringAsyncHandler.getResponse();
if (resp.getStatusCode() == 200) {
try (InputStream is = new BodyDeferringAsyncHandler.BodyDeferringInputStream(respFuture, bodyDeferringAsyncHandler, pin)) {
HttpHeaders headers = translateHeaders(resp.getHeaders());
if (headers.get("etag").contains("-")) {
// We remove the etag, since it comes from a multipart upload, it isn't an md5 hash of the file.
headers.remove("etag");
}
PipedOutputStream pout2 = new PipedOutputStream();
BodyDeferringAsyncHandler bodyDeferringAsyncHandler2 = new BodyDeferringAsyncHandler(pout2);
Future<Response> putResp = asyncHttpClient.preparePut(Config.getString("swift.proxy.base.url") + "v1/" + key)
.setHeaders(headers)
.addHeader("x-auth-token", token)
.setBody(is).execute(bodyDeferringAsyncHandler2);
Response putR = null;
try {
putR = bodyDeferringAsyncHandler2.getResponse();
if (putR.getStatusCode() != 201 && putR.getStatusCode() != 202) {
log.error("[requestId: " + requestId + "] swift returned " + putR.getStatusCode());
return unlockingResponse(putR.getStatusCode(), lock, key);
}
log.info("[requestId: " + requestId + "] File: " + key + " rollbacked!");
return unlockingResponse(200, lock, key);
} catch (InterruptedException e) {
e.printStackTrace();
}
return unlockingResponse(422, lock, key);
}
} else {
log.warn("[requestId: " + requestId + "] s3 came back with status code: " + resp.getStatusCode());
return unlockingResponse(200, lock, key);
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
return unlockingResponse(500, lock, key);
}
});
return HttpResponse.from(r);
}
I expect it to stream files from the source to the destination and be able to handle a lot of traffic.