Context:
To process a Flowable<Item>
, I need to first process the first item
and then depending on that either accumulate all items into a single item (reduce
) OR apply a simple map on each item without any accumulation (map
).
One way I can think of requires operator to be aware that current element is last element. Is there any such operator which is aware whether current element is last element ? I can't use buffer because then it'll always fetch 2 elements even when accumulation shouldn't be done.
AtomicReference<Item> itemRef = new AtomicReference();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source.
flatMap(item -> {
if(item.shouldBeAccumulated()) {
//Accumulate data into reference
itemRef.set(itemRef.get().addData(item.getData()));
//Return empty to throw away consumed item;
return Flowable.empty();
} else {
item.updateProperty();
return Flowable.just(item);
}
})
.applyIfLastElement(item -> {
if (item.shouldBeAccumulated()) {
return Flowable.just(itemRef.get());
}
})
Below is how you can do it (in RxJava 2.x which is very close to RxJava 3.x). The trick is to use
defer
(the best way to encapsulate state for a Flowable so that it can be subscribed to many times) andconcatWith
.defer
also enables lazy evaluation in the case oflast
. Notice also as a performance improvement that you may not care about I used one element arrays instead of AtomicReference objects (to avoid unnecessary volatile reads, sets etc).