The goal of this function is to create a stream that emits values periodically until it encounters one that matches a predicate.
Here is some skeleton code that I've come up with:
class Watcher<T : Any>(
/**
* Emits the data associated with the provided id
*/
private val callable: (id: String) -> T,
/**
* Checks if the provided value marks the observable as complete
*/
private val predicate: (id: String, value: T) -> Boolean
) {
private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()
fun watch(id: String): Observable<T> {
// reuse obesrvable if exists
val existing = watchPool[id]
if (existing != null)
return existing
val value = callable(id)
if (predicate(id, value)) return Observable.just(value)
// create new observable to fetch until complete,
// then remove from the map once complete
val observable = Observable.fromCallable<T> {
callable(id)
}.repeatWhen { /* What to put here? */ }.doOnComplete {
watchPool.remove(id)
}.distinctUntilChanged()
watchPool[id] = observable
return observable
}
}
As an example, if I have the following enums:
enum class Stage {
CREATED, PROCESSING, DELIVERING, FINISHED
}
And some callable that will retrieve the right stage, I should be able to pass the callable and a predicate checking if stage == FINISHED
, and poll until I get the FINISHED
event.
The issue I have is in generating an observable when the event received is not a final event. In that case, the observable should continue to poll for events until either it receives an event matching the predicate or until it has no more subscribers.
This observable should:
- Not poll until it receives at least one subscriber
- Poll every x seconds
- Mark itself as complete if
predicate
returns true - Complete itself if it ever goes from >0 subscribers to 0 subscribers
The use of watch pools is simply to ensure that two threads watching the same id will not poll twice the number of times. Removal of observables from the map is also just so it doesn't pile up. For the same reason, observables that emit just one variable are not stored for reference.
How do I go about adding the functionality for the points added above? I will link to one existing RxJava Github issue that I found useful, but from what I'm aware, it doesn't allow for predicates dealing with the value emitted by the callable.
I ended up with just using
takeUntil
, and using the observal's interval method to poll.