How to know when asynchronous data receiving has finished in Java


I have two microservices in Spring. The first one, let's call it DataDeliverer, collects a bunch of data from the internet and sends it as JSON to the second microservice, called, for example, Receiver. The data is sent in parts, and the number of parts is random: neither Receiver nor DataDeliverer knows in advance how many parts there will be. Receiver receives the parts and saves each one asynchronously using Spring's @Async annotation. This creates a problem for me, because I need to know when sending is complete and the receiver has finished saving everything.

So far I have come up with something like this: add a java.util.Map with the object as the key and an isFinished flag as the value. Then I would know whether saving a single object (part) has finished. When DataDeliverer finishes sending data, it sends a message saying so. When Receiver gets this message, it just waits until all flags in the Map show that all objects are saved. Then data collection is done. What do you think of this solution? I would appreciate any tips. Thanks.
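To make the idea above concrete, here is a minimal sketch of the bookkeeping in plain Java. It swaps the Map of flags for a counter of parts still being saved, which carries the same information with less state. The class name CompletionTracker and all of its methods are hypothetical, and it assumes the "done" message arrives only after every part has been registered with partReceived().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts parts that are still being saved and completes
// a future once the sender has signalled the end and no part is pending.
class CompletionTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private volatile boolean senderFinished = false;
    private final CompletableFuture<Void> allSaved = new CompletableFuture<>();

    // Call when a part arrives, before handing it to the async save.
    void partReceived() {
        pending.incrementAndGet();
    }

    // Call at the end of the async save of one part.
    void partSaved() {
        if (pending.decrementAndGet() == 0 && senderFinished) {
            allSaved.complete(null);
        }
    }

    // Call when the sender's "I am done" message arrives.
    void senderDone() {
        senderFinished = true;
        if (pending.get() == 0) {
            allSaved.complete(null);
        }
    }

    // Completes once the sender is done and every received part is saved.
    CompletableFuture<Void> whenAllSaved() {
        return allSaved;
    }
}
```

The receiver could then block on whenAllSaved().join() or attach a callback to it. A failed save would also need to call partSaved() (or fail the future), otherwise the tracker would wait forever.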

Below is a simple outline of the current situation. Don't mind the Java correctness; it's pseudo-Java to make the problem easier to show.

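public class DataDeliver {

  public void sendData(List<Data> listWithRandomSize) {

    List<Data> listToSend = new ArrayList<>();
    for (Data data : listWithRandomSize) {
        listToSend.add(data);
        // send a part as soon as 10 items have been collected
        if (listToSend.size() == 10) {
            restTemplate.postForObject("http://receiver:8080/save/", listToSend, Void.class);
            listToSend = new ArrayList<>();
        }
    }
    // send the remaining items that did not fill a whole part
    if (!listToSend.isEmpty()) {
        restTemplate.postForObject("http://receiver:8080/save/", listToSend, Void.class);
    }
  }
}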

public class SomeReceiverService {

    @Async
    public void save(List<Data> partOfData) {
        database.save(partOfData);  
    }
}

Answer


In your asynchronous method you can return an object of type org.springframework.util.concurrent.ListenableFuture<T> or, with Java 8, java.util.concurrent.CompletableFuture<T>. The async method returns this value immediately, and you can use it to register callbacks/handlers for failure and/or success.

Basically, your component should look like this:

@Component
public class SomeService {

  @Async
  public CompletableFuture<String> someServiceMethod() {
      // do something async, then wrap the result in an already-completed future
      String someResult = "some result";
      return CompletableFuture.completedFuture(someResult);
  }
}

And your client like this:

public class SomeClient {
  @Autowired
  private SomeService mSomeService;

  public void someClientMethod() {
      // Function to process the result of the async call (example handler)
      final Function<String, String> resultHandler = result -> "processed: " + result;
      mSomeService.someServiceMethod().thenApply(resultHandler);
  }
}

See the async example on spring.io for a complete reference.
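As a rough illustration of registering handlers for both success and failure, here is a sketch in plain Java without Spring. The class CallbackSketch and its methods are made up for this example, and supplyAsync merely stands in for an @Async method.

```java
import java.util.concurrent.CompletableFuture;

class CallbackSketch {
    // Stand-in for an @Async method: runs on another thread and
    // returns a future immediately.
    static CompletableFuture<String> saveAsync(String part) {
        return CompletableFuture.supplyAsync(() -> {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty part");
            }
            return "saved:" + part;
        });
    }

    // Maps both outcomes to a status string: the result on success,
    // an error marker on failure.
    static CompletableFuture<String> saveWithStatus(String part) {
        return saveAsync(part)
                .handle((result, error) -> error == null ? result : "failed:" + part);
    }
}
```

handle receives either the result or the exception, so one callback covers both paths; thenApply and exceptionally can be used instead when the two paths should be handled separately.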


Given your example code, each call to SomeReceiverService.save can return a new CompletableFuture. On the sending side, as of now AsyncRestTemplate seems to support only ListenableFuture, so you might need to convert those to CompletableFuture to benefit from its join functionality. You can then collect these futures and wait for all of them to finish:

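List<CompletableFuture<ResponseEntity<Void>>> allFutures = new ArrayList<>();
List<Data> listToSend = new ArrayList<>();
for (Data data : listWithRandomSize) {
    listToSend.add(data);
    if (listToSend.size() == 10) {
        // asyncRestTemplate is an AsyncRestTemplate; convertToCompletableFuture is a helper you have to provide
        ListenableFuture<ResponseEntity<Void>> futureResult =
                asyncRestTemplate.postForEntity("http://receiver:8080/save/", new HttpEntity<>(listToSend), Void.class);
        allFutures.add(convertToCompletableFuture(futureResult));
        listToSend = new ArrayList<>();
    }
}
// (a final partial chunk, if any, would be sent the same way)

// sending is finished when reaching this point
CompletableFuture<Void> joinedFuture = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[0]));
joinedFuture.join();

// at this point all requests have completed; save will have finished for all chunks,
// provided the receiver only responds once its asynchronous save is done
for (CompletableFuture<ResponseEntity<Void>> future : allFutures) {
    future.join();
}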
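If you want to try the collect-and-join pattern without Spring, here is a self-contained sketch using only the JDK. BatchJoinSketch, chunk and saveAll are hypothetical names, and runAsync stands in for the real asynchronous save or HTTP call. It also handles the final partial chunk, which a plain "every 10 items" check would skip.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class BatchJoinSketch {
    // Splits the items into chunks of at most chunkSize,
    // including the final partial chunk.
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    // Starts one asynchronous "save" per chunk, waits for all of them,
    // and returns the number of items that were saved.
    static int saveAll(List<Integer> items, int chunkSize) {
        AtomicInteger saved = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<Integer> part : chunk(items, chunkSize)) {
            // Stand-in for the real asynchronous save call.
            futures.add(CompletableFuture.runAsync(() -> saved.addAndGet(part.size())));
        }
        // Blocks until every chunk has been processed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return saved.get();
    }
}
```

Because allOf(...).join() returns only after every future has completed, the count read afterwards already includes all chunks.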