I'm looking for a way to aggregate multiple files into a zip archive without loading all of them completely into memory or onto the filesystem. I get these files as a Flux from multiple WebClient calls like this:

  public Mono<ResponseEntity<Flux<DataBuffer>>> getDocumentContentAsDataBuffer(
      String objectId) {
    return momentumApiClient
        .get()
        .uri("/objects/" + objectId + "/contents/file")
        .retrieve()
        .toEntityFlux(DataBuffer.class);
  }

Then I call this endpoint for multiple objects and save each result in a BufferedFile object, which contains only the filename and the Flux for the stream. Some objects have no contents; in that case the Flux is empty and a folder is created instead.

    List<BufferedFile> zipDataBuffers =
        objects.stream()
            .map(
                o ->
                    new BufferedFile(
                        (objectMap.get(o).getContentStreams() != null
                                && !objectMap.get(o).getContentStreams().isEmpty())
                            ? client
                                .getDocumentContentAsDataBuffer(
                                    objectMap.get(o).getObjectId())
                                .flatMapMany(
                                    response ->
                                        response.getBody() != null
                                            ? response.getBody()
                                            : Flux.empty())
                            : Flux.empty(),
                        generatePath(objectMap, o)))
            .toList();

    // create zip
    return ZipDataBuffer.toFlux(zipDataBuffers);
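
For completeness, BufferedFile is just a small holder for the content stream and the target path inside the zip. A minimal sketch (written here with Lombok's @Value; the real class may differ slightly):

    import lombok.Value;
    import org.springframework.core.io.buffer.DataBuffer;
    import reactor.core.publisher.Flux;

    @Value
    public class BufferedFile {
      Flux<DataBuffer> file; // content stream; an empty Flux marks a folder entry
      String filename;       // path of the entry inside the zip
    }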

ZipDataBuffer.toFlux should then aggregate all files and folders into a zip archive and stream it to the client.

The only working solution I have found so far downloads each file completely and writes the whole thing to the ZipOutputStream. So thousands of small files are not a problem for memory, but big files are loaded into memory in their entirety.

ZipDataBuffer looks like this:

import java.io.InputStream;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.SneakyThrows;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipDataBuffer {
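  // single shared output buffer: the ZipOutputStream deflates into it,
  // and read() drains whatever has been written so far via split()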
  final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
  final DefaultDataBuffer buffer = factory.allocateBuffer(1024);
  final ZipOutputStream zos = new ZipOutputStream(buffer.asOutputStream());

  public static Flux<DataBuffer> toFlux(List<BufferedFile> resources) {
    ZipDataBuffer zipDataBuffer = new ZipDataBuffer();
    return Flux.fromIterable(resources)
        // concatMap keeps the zip entries in order and processes one file at a time
        .concatMap(zipDataBuffer::write)
        .concatWith(Mono.fromCallable(zipDataBuffer::finish));
  }

  public Mono<DataBuffer> write(BufferedFile bufferedFile) {
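    // join() buffers the complete file into a single DataBuffer first,
    // which is exactly where large files exhaust the heap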
    return DataBufferUtils.join(bufferedFile.getFile())
        .defaultIfEmpty(factory.allocateBuffer(0))
        .map(
            dataBuffer -> {
              write(bufferedFile.getFilename(), dataBuffer);
              DataBufferUtils.release(dataBuffer);
              return read();
            });
  }

  @SneakyThrows
  private synchronized void write(String filename, DataBuffer dataBuffer) {
    ZipEntry zipEntry = new ZipEntry(filename);
    zos.putNextEntry(zipEntry);
    try (InputStream is = dataBuffer.asInputStream()) {
      is.transferTo(zos);
    }
    zos.closeEntry();
  }

  private DataBuffer read() {
    return buffer.split(buffer.writePosition());
  }

  @SneakyThrows
  public DataBuffer finish() {
    zos.finish();
    zos.close();
    return read();
  }
}

So ZipDataBuffer can't fulfill my needs: if I try to download a 50 GB file, the application crashes with an OutOfMemoryError. I have tried multiple strategies, but the main problem is that the client's download is slower than the download done by my application's WebClient, so my application's buffer fills up very quickly and it crashes.

Is there a way to request data from the WebClient's Flux only when there is space in my buffer? This is the first problem I haven't managed to solve, and I'm starting to doubt that a solution exists at all.
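
For illustration, this is the kind of demand-driven flow I am after. The sketch below is only an idea I have been toying with, not something I have verified for my case (the class and method names are made up): it uses a PipedInputStream/PipedOutputStream pair whose fixed pipe size bounds the in-memory window, Flux.toStream() for blocking, backpressured consumption on a boundedElastic thread, and DataBufferUtils.readInputStream to re-expose the pipe as a Flux:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.util.List;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferFactory;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.core.io.buffer.DefaultDataBufferFactory;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;

    public class PipedZipSketch {

      public static Flux<DataBuffer> zip(List<BufferedFile> files) {
        DataBufferFactory factory = new DefaultDataBufferFactory();
        PipedInputStream in;
        PipedOutputStream out;
        try {
          in = new PipedInputStream(64 * 1024); // pipe size bounds the in-memory window
          out = new PipedOutputStream(in);
        } catch (IOException e) {
          return Flux.error(e);
        }

        // Writer side: PipedOutputStream.write blocks when the pipe is full,
        // which stops toStream() from requesting more buffers, which in turn
        // stops the WebClient from reading the socket.
        Mono<Void> writer =
            Mono.<Void>fromRunnable(
                    () -> {
                      try (ZipOutputStream zos = new ZipOutputStream(out)) {
                        for (BufferedFile file : files) {
                          zos.putNextEntry(new ZipEntry(file.getFilename()));
                          // toStream() iterates the Flux with backpressure and
                          // blocks the current (boundedElastic) thread
                          file.getFile()
                              .toStream()
                              .forEach(
                                  dataBuffer -> {
                                    try (InputStream is = dataBuffer.asInputStream()) {
                                      is.transferTo(zos);
                                    } catch (IOException e) {
                                      throw new RuntimeException(e);
                                    } finally {
                                      DataBufferUtils.release(dataBuffer);
                                    }
                                  });
                          zos.closeEntry();
                        }
                      } catch (IOException e) {
                        // error handling and cancellation are elided in this sketch
                        throw new RuntimeException(e);
                      }
                    })
                .subscribeOn(Schedulers.boundedElastic());

        // Reader side: the HTTP response's demand drives how fast the pipe drains.
        return DataBufferUtils.readInputStream(() -> in, factory, 4096)
            .subscribeOn(Schedulers.boundedElastic())
            .doOnSubscribe(subscription -> writer.subscribe());
      }
    }

I am not sure this interacts correctly with cancellation (if the browser aborts the download, the writer thread would only notice once the pipe write fails), which is part of why I am asking.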

Minimal reproducible example

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@RestController
public class MinimalWorkingExample {
  final WebClient webclient;

  public MinimalWorkingExample(WebClient.Builder webclientBuilder) {
    this.webclient = webclientBuilder.build();
  }

  // example one where the complete file is written to the zip file buffer and then read
  @GetMapping(value = "/exampleOutOfMemory", produces = "application/zip")
  public Flux<DataBuffer> exampleOutOfMemory() {
    final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
    final DefaultDataBuffer outputBuffer = factory.allocateBuffer(1024);
    final ZipOutputStream zos = new ZipOutputStream(outputBuffer.asOutputStream());
    // aggregate all DataBuffers into one
    return DataBufferUtils.join(
            webclient
                .get()
                .uri("https://files.icyflamestudio.com/50MB.zip")
                //.uri("http://speedtest.tele2.net/50GB.zip")
                .retrieve()
                .bodyToFlux(DataBuffer.class))
        .map(
            // write the aggregated DataBuffer to the zip output stream
            databuffer -> {
              write("testfile", zos, databuffer);
              DataBufferUtils.release(databuffer);
              // Problem: the output buffer is only read once the complete file has been written into it
              // I want a method that returns this buffer e.g. every 1024 bytes
              return read(outputBuffer);
            })
        .concatWith(Mono.fromCallable(() -> finish(zos, outputBuffer)));
  }

  // example two where every DataBuffer is written to the zip file buffer and then read
  // This will never finish for big files
  @GetMapping(value = "/exampleCorruptFile", produces = "application/zip")
  public Flux<DataBuffer> exampleCorruptFile() {
    final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
    final DefaultDataBuffer outputBuffer = factory.allocateBuffer(1024);
    final ZipOutputStream zos = new ZipOutputStream(outputBuffer.asOutputStream());
    // call a function where every DataBuffer is written to the ZipOutputStream's buffer and
    // read back immediately afterwards
    return write(
            "testfile",
            zos,
            outputBuffer,
            webclient
                .get()
                //.uri("http://speedtest.tele2.net/50GB.zip")
                .uri("https://files.icyflamestudio.com/50MB.zip")
                .retrieve()
                .bodyToFlux(DataBuffer.class))
        .concatWith(Mono.fromCallable(() -> finish(zos, outputBuffer)));
  }

  @SneakyThrows
  private synchronized void write(String filename, ZipOutputStream zos, DataBuffer dataBuffer) {
    ZipEntry zipEntry = new ZipEntry(filename);
    zos.putNextEntry(zipEntry);
    try (InputStream is = dataBuffer.asInputStream()) {
      is.transferTo(zos);
    }
    zos.closeEntry();
  }

  private Flux<DataBuffer> write(
      String filename,
      ZipOutputStream zos,
      DataBuffer outputDataBuffer,
      Flux<DataBuffer> inputDataBuffer) {
    return inputDataBuffer
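        // hop to a worker thread so the blocking zip writes below stay off the event loop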
        .publishOn(Schedulers.boundedElastic())
        .map(
            inputBuffer -> {
              try (InputStream is = inputBuffer.asInputStream()) {
                is.transferTo(zos);
              } catch (IOException e) {
                throw new RuntimeException(e);
              } finally {
                // release the incoming buffer to avoid leaking reference-counted memory
                DataBufferUtils.release(inputBuffer);
              }
              return read(outputDataBuffer);
            })
        .doOnSubscribe(
            subscription -> {
              ZipEntry zipEntry = new ZipEntry(filename);
              try {
                zos.putNextEntry(zipEntry);
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            })
        .doOnTerminate(
            () -> {
              try {
                zos.closeEntry();
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            });
  }

  private DataBuffer read(DataBuffer buffer) {
    return buffer.split(buffer.writePosition());
  }

  @SneakyThrows
  public DataBuffer finish(ZipOutputStream zos, DataBuffer buffer) {
    zos.finish();
    zos.close();
    return read(buffer);
  }
}