I have an Observable<List<Event>> and I want it to be shared by multiple Subscribers. Each Subscriber will filter the Events and process the ones that concern it.
The Observable<List<Event>> is created this way:
    @Override
    public List<Event> findNewEvents() {
        List<Event> results = new ArrayList<>();
        while (!fetchedEvents.isEmpty()) {
            results.add(fetchedEvents.poll());
        }
        return results;
    }

    @Override
    public Observable<List<Event>> findNewObservableEvents() {
        return Observable.just(findNewEvents());
    }
Here is the code that subscribes to it:
    Observable<List<Event>> newEvents = reader.findNewObservableEvents();

    Disposable riskApproveRiskEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
            .flatMap(Observable::just)
            .filter(risk::isForRiskApproval)
            .subscribe(risk::approveRisk);

    Disposable fundingCheckFundabilityEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
            .flatMap(Observable::just)
            .filter(funding::isForFundingFundabilityCheck)
            .subscribe(funding::checkFundability);

    Disposable fundingFundEventsDisposable = newEvents.flatMapIterable(riskEvents -> riskEvents)
            .flatMap(Observable::just)
            .filter(funding::isForFundingFund)
            .subscribe(funding::fund);
I have tried newEvents.share() and also newEvents.publish().
When trying newEvents.create(), I need to supply an ObservableOnSubscribe object, but I don't understand how to obtain one.
What is the trick?
If you don't want to consume findNewObservableEvents multiple times, then use publish and, once the subscribers have subscribed, call connect on the ConnectableObservable:
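A minimal sketch of that approach, assuming RxJava 3 (the io.reactivex.rxjava3 packages; RxJava 2 uses io.reactivex instead). The Event class, the event type strings, and the logging subscribers are hypothetical stand-ins for the question's Event, risk, and funding objects. The doOnSubscribe counter is only there to show how often the upstream gets subscribed to.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PublishConnectSketch {

    // Hypothetical stand-in for the question's Event type.
    static final class Event {
        final String type;

        Event(String type) {
            this.type = type;
        }
    }

    // Runs the sketch and returns a log of what each subscriber handled.
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger upstreamSubscriptions = new AtomicInteger();

        // Stand-in for reader.findNewObservableEvents(); the counter records
        // every subscription that reaches the upstream Observable.
        Observable<List<Event>> newEvents = Observable
                .just(Arrays.asList(
                        new Event("RISK_APPROVAL"),
                        new Event("FUNDABILITY_CHECK"),
                        new Event("FUND")))
                .doOnSubscribe(d -> upstreamSubscriptions.incrementAndGet());

        // Flatten once, then publish: nothing is emitted until connect().
        ConnectableObservable<Event> shared = newEvents
                .flatMapIterable(events -> events)
                .publish();

        // Attach every subscriber first; each one filters the shared stream.
        shared.filter(e -> e.type.equals("RISK_APPROVAL"))
                .subscribe(e -> log.add("approveRisk:" + e.type));
        shared.filter(e -> e.type.equals("FUNDABILITY_CHECK"))
                .subscribe(e -> log.add("checkFundability:" + e.type));
        shared.filter(e -> e.type.equals("FUND"))
                .subscribe(e -> log.add("fund:" + e.type));

        // Only now subscribe to the upstream and start emitting to all three.
        shared.connect();

        log.add("upstreamSubscriptions:" + upstreamSubscriptions.get());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because connect() is called only after all three subscribers are attached, every event reaches every subscriber and the upstream is subscribed to a single time. connect() returns a Disposable that can be used to stop the shared stream.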