RxObservable that repeats itself until an expected value is found

166 Views Asked by At

The goal of this function is to create a stream that emits values periodically until it encounters one that matches a predicate.

Here is some skeleton code that I've come up with:

class Watcher<T : Any>(
        /**
         * Emits the data associated with the provided id
         */
        private val callable: (id: String) -> T,
        /**
         * Checks if the provided value marks the observable as complete
         */
        private val predicate: (id: String, value: T) -> Boolean
) {

    private val watchPool: MutableMap<String, Observable<T>> = ConcurrentHashMap()

    fun watch(id: String): Observable<T> {
        // reuse obesrvable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = callable(id)
        if (predicate(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.fromCallable<T> {
            callable(id)
        }.repeatWhen { /* What to put here? */ }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged()
        watchPool[id] = observable
        return observable
    }

}

As an example, if I have the following enums:

enum class Stage {
    CREATED, PROCESSING, DELIVERING, FINISHED
}

And some callable that will retrieve the right stage, I should be able to pass the callable and a predicate checking if stage == FINISHED, and poll until I get the FINISHED event.

The issue I have is in generating an observable when the event received is not a final event. In that case, the observable should continue to poll for events until either it receives an event matching the predicate or until it has no more subscribers.

This observable should:

  • Not poll until it receives at least one subscriber
  • Poll every x seconds
  • Mark itself as complete if predicate returns true
  • Complete itself if it ever goes from >0 subscribers to 0 subscribers

The use of watch pools is simply to ensure that two threads watching the same id will not poll twice the number of times. Removal of observables from the map is also just so it doesn't pile up. For the same reason, observables that emit just one variable are not stored for reference.

How do I go about adding the functionality for the points added above? I will link to one existing RxJava Github issue that I found useful, but from what I'm aware, it doesn't allow for predicates dealing with the value emitted by the callable.

1

There are 1 best solutions below

0
On

I ended up with just using takeUntil, and using the observal's interval method to poll.

abstract class RxWatcher<in T : Any, V : Any> {

    /**
     * Emits the data associated with the provided id
     * At a reasonable point, emissions should return a value that returns true with [isCompleted]
     * This method should be thread safe, and the output should not depend on the number of times this method is called
     */
    abstract fun emit(id: T): V

    /**
     * Checks if the provided value marks the observable as complete
     * Must be thread safe
     */
    abstract fun isCompleted(id: T, value: V): Boolean

    /**
     * Polling interval in ms
     */
    open val pollingInterval: Long = 1000

    /**
     * Duration between events in ms for which the observable should time out
     * If this is less than or equal to [pollingInterval], it will be ignored
     */
    open val timeoutDuration: Long = 5 * 60 * 1000

    private val watchPool: MutableMap<T, Observable<V>> = ConcurrentHashMap()

    /**
     * Returns an observable that will emit items every [pollingInterval] ms until it [isCompleted]
     *
     * The observable will be reused if there is polling, so the frequency remains constant regardless of the number of
     * subscribers
     */
    fun watch(id: T): Observable<V> {
        // reuse observable if exists
        val existing = watchPool[id]
        if (existing != null)
            return existing
        val value = emit(id)
        if (isCompleted(id, value)) return Observable.just(value)
        // create new observable to fetch until complete,
        // then remove from the map once complete
        val observable = Observable.interval(pollingInterval, TimeUnit.MILLISECONDS, Schedulers.io()).map {
            emit(id)
        }.takeUntil {
            isCompleted(id, it)
        }.doOnComplete {
            watchPool.remove(id)
        }.distinctUntilChanged().run {
            if (timeoutDuration > pollingInterval) timeout(timeoutDuration, TimeUnit.MILLISECONDS)
            else this
        }
        watchPool[id] = observable
        return observable
    }

    /**
     * Clears the observables from the watch pool
     * Note that existing subscribers will not be affected
     */
    fun clear() {
        watchPool.clear()
    }

}