Usage of elapsed() function work on Mono?

4.3k Views Asked by At

I am trying to get execution time for reading from redis in reactive programming, on looking up docs I am able to see that elapsed() method will does the same and implemented code as below.

Flux.fromIterable(getActions(httpHeaders))
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(actionFact -> methodToReadFromCache(actionFact))
                .sequential();

public Mono<ActionFact> methodToReadFromCache(actionFact) {
    return Mono.fromCallable(() -> getKey(actionFact))
                .flatMap(cacheKey ->
                  redisOperations.hasKey(key)
                                .flatMap(aBoolean -> {
                                    if (aBoolean) {
                                        return redisOperations.opsForValue().get(cacheKey);
                                    }
                                    return authzService.getRolePermissions(actionFact)
                                            .flatMap(policySetResponse ->
                                                    //save in cache
                                            );
                                })
                                .elapsed()
                                .flatMap(lambda -> {
                                    LOG.info("cache/service processing key:{}, time:{}", key, lambda.getT1());
                                    return Mono.just(lambda.getT2());
                                });

Output:

cache/service processing key:KEY1, time:3 
cache/service processing key:KEY2, time:4 
cache/service processing key:KEY3, time:18 
cache/service processing key:KEY4, time:34 
cache/service processing key:KEY5, time:46 
cache/service processing key:KEY6, time:57 
cache/service processing key:KEY7, time:70 
cache/service processing key:KEY8, time:81 
cache/service processing key:KEY9, time:91 
cache/service processing key:KEY10, time:103
cache/service processing key:KEY11, time:112
cache/service processing key:KEY12, time:121
cache/service processing key:KEY13, time:134
cache/service processing key:KEY14, time:146
cache/service processing key:KEY15, time:159

I am expecting that time taken for each of the cache request will be <5 milliseconds like first and second request but not the case. Does elapsed() add current fetching time to the cummulative? As per my understanding each item emmitted from flux is independent?

2

There are 2 best solutions below

1
On

as per the documentation

I want to associate emissions with a timing (Tuple2<Long, T>) measured…​

  • since subscription: elapsed

  • since the dawn of time (well, computer time): timestamp

elapsed is measured time since subscription. So you subscribe and it starts emitting, the time will increase the longer since you subscribed to your service.

official docs

7
On

Mono#elapsed measures the time between when the Mono is subscribed to and the moment the Mono emits an item (onNext).

What causes the subscription and the start of the timer, in your case, is the outer parallelized flatMap that calls methodToReadFromCache.

What causes the onNext and thus what is timed is the combination of hasKey and the if/else part (redisOperations.opsForValue().get(cacheKey) vs authzService).

The outer flatMap should at least as many timers as there are CPUs, since we're in parallel mode.

But the fact that the timings are skewed could hint at the fact that something is either blocking or has limited capacity. For example, could it be that the redisTemplate can only process a few keys at a time?