How do I check in an operator that the current element is the last element?

Context: To process a Flowable<Item>, I need to first process the first item and then, depending on that, either accumulate all items into a single item (reduce) or apply a simple map to each item without any accumulation (map).

One way I can think of requires the operator to know that the current element is the last element. Is there any operator that knows whether the current element is the last one? I can't use buffer because it would always fetch 2 elements, even when no accumulation should be done.

AtomicReference<Item> itemRef = new AtomicReference<>();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source
    .flatMap(item -> {
        if(item.shouldBeAccumulated()) {
            //Accumulate data into reference
            itemRef.set(itemRef.get().addData(item.getData()));
            //Return empty to throw away consumed item;
            return Flowable.empty();
        } else {
            item.updateProperty();
            return Flowable.just(item);
        }
    })
    // Pseudocode: applyIfLastElement stands for the operator being asked about
    .applyIfLastElement(item -> {
        if (item.shouldBeAccumulated()) {
            return Flowable.just(itemRef.get());
        }
    });

Best answer:

Below is how you can do it (in RxJava 2.x, which is very close to RxJava 3.x). The trick is to use defer (the best way to encapsulate state for a Flowable so that it can be subscribed to many times) and concatWith. Maybe.defer also makes last lazy, so the accumulated state is read only after the source completes. As a performance improvement that you may not care about, I used one-element arrays instead of AtomicReference objects (to avoid unnecessary volatile reads, sets, etc.).

Flowable<Integer> result = Flowable.defer(() -> {
    // Per-subscription state, created fresh on every subscribe
    boolean[] isFirst = new boolean[] { true };
    Integer[] state = new Integer[1];
    // Evaluated only after source completes; emits the total if anything was accumulated
    Maybe<Integer> last = Maybe.defer(() -> {
        if (state[0] == null) {
            return Maybe.empty();
        } else {
            return Maybe.just(state[0]);
        }
    });
    return source
            .flatMap(x -> {
                if (state[0] != null || (isFirst[0] && shouldBeAccumulated(x))) {
                    // accumulate; parentheses ensure the first accumulated item is counted too
                    state[0] = (state[0] == null ? 0 : state[0]) + x;
                    isFirst[0] = false;
                    return Flowable.empty();
                } else {
                    isFirst[0] = false;
                    return Flowable.just(x);
                }
            })
            .concatWith(last);
});
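
To sanity-check what the pipeline should emit in each mode, here is a rough plain-Java model of the same logic over a List, with no RxJava involved. It is only a sketch: the class name FirstItemDecides, the process method, and the isEven predicate (standing in for shouldBeAccumulated) are hypothetical names made up for illustration, and summing integers is assumed as the accumulation, as in the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Plain-Java model of the answer's logic; all names here are hypothetical.
class FirstItemDecides {

    // The first item decides the mode: if it should be accumulated, every
    // item is summed and only the total is emitted at the end; otherwise
    // every item is passed through unchanged.
    static List<Integer> process(List<Integer> source, IntPredicate shouldBeAccumulated) {
        List<Integer> out = new ArrayList<>();
        boolean isFirst = true;
        Integer state = null; // null means "not accumulating"
        for (int x : source) {
            if (state != null || (isFirst && shouldBeAccumulated.test(x))) {
                state = (state == null ? 0 : state) + x;
            } else {
                out.add(x);
            }
            isFirst = false;
        }
        if (state != null) {
            out.add(state); // plays the role of concatWith(last)
        }
        return out;
    }

    public static void main(String[] args) {
        IntPredicate isEven = x -> x % 2 == 0; // stand-in for shouldBeAccumulated
        System.out.println(process(List.of(2, 3, 4), isEven)); // [9]
        System.out.println(process(List.of(1, 2, 3), isEven)); // [1, 2, 3]
        System.out.println(process(List.of(), isEven));        // []
    }
}
```

Note that no step needs to know which element is last: the accumulated value is simply appended once the input is exhausted, which is what concatWith does in the reactive version.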