Using countDownLatch.await() to make sure the result is delivered

The full source code can be found here: https://github.com/alirezaeiii/SavingGoals-Cache

This is the LocalDataSource class:

@Singleton
class QapitalLocalDataSource @Inject constructor(
    private val goalsDao: GoalsDao
) : LocalDataSource {

    override fun getSavingsGoals(): Single<List<SavingsGoal>> =
        Single.create { singleSubscriber ->
            goalsDao.getGoals()
                .subscribe {
                    if (it.isEmpty()) {
                        singleSubscriber.onError(NoDataException())
                    } else {
                        singleSubscriber.onSuccess(it)
                    }
                }
        }
}

The method above is used in the Repository class:

@Singleton
class GoalsRepository @Inject constructor(
    private val remoteDataSource: QapitalService,
    private val localDataSource: LocalDataSource,
    private val schedulerProvider: BaseSchedulerProvider
) {

    private var cacheIsDirty = false

    fun getSavingsGoals(): Observable<List<SavingsGoal>> {
        lateinit var goals: Observable<List<SavingsGoal>>
        if (cacheIsDirty) {
            goals = getGoalsFromRemoteDataSource()
        } else {
            val latch = CountDownLatch(1)
            var disposable: Disposable? = null
            disposable = localDataSource.getSavingsGoals()
                .observeOn(schedulerProvider.io())
                .doFinally {
                    latch.countDown()
                    disposable?.dispose()
                }.subscribe({
                    goals = Observable.create { emitter -> emitter.onNext(it) }
                }, { goals = getGoalsFromRemoteDataSource() })
            latch.await()
        }
        return goals
    }
}

As you can see, I am using countDownLatch.await() to make sure the result has been emitted in either the subscribe or the error block before returning. Is there a better solution than CountDownLatch when using RxJava?

Best answer:

latch.await() blocks the calling thread, which defeats the whole point of using an async API like RxJava.

RxJava has APIs like onErrorResumeNext to handle exceptions and toObservable to convert a Single result to an Observable result.

Also, RxJava types like this are typically intended to be cold (they don't run or do any work until you subscribe), so I'd recommend not checking cacheIsDirty until the subscription happens.

I'd go with something like:

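    fun getSavingsGoals(): Observable<List<SavingsGoal>> {
        return Observable
            // Read the flag at subscription time, not when the Observable is built
            .fromCallable { cacheIsDirty }
            .flatMap {
                if (it) {
                    getGoalsFromRemoteDataSource()
                } else {
                    localDataSource.getSavingsGoals()
                        .toObservable()
                        // Fall back to the remote source if the local one errors
                        // (RxJava 3 renamed this overload to onErrorResumeWith)
                        .onErrorResumeNext(getGoalsFromRemoteDataSource())
                }
            }
    }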
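
To illustrate the point about cold types, here is a minimal sketch of the difference between reading the flag eagerly and reading it at subscription time. It assumes RxJava 2 is on the classpath (package io.reactivex; RxJava 3 uses io.reactivex.rxjava3.core instead), and the Cache class is a hypothetical stand-in for the repository's cacheIsDirty field:

```kotlin
import io.reactivex.Observable

// Hypothetical holder standing in for the repository's cacheIsDirty field.
class Cache { var isDirty = false }

fun main() {
    val cache = Cache()

    // Eager: the flag is read once, at the moment the Observable is built.
    val eager: Observable<Boolean> = Observable.just(cache.isDirty)

    // Deferred: the lambda runs again on every subscription.
    val deferred: Observable<Boolean> = Observable.fromCallable { cache.isDirty }

    // The state changes after both Observables were created.
    cache.isDirty = true

    eager.subscribe { println("eager saw isDirty = $it") }       // prints false
    deferred.subscribe { println("deferred saw isDirty = $it") } // prints true
}
```

This is why the suggested version wraps the check in fromCallable: a caller that holds on to the returned Observable and subscribes later still gets a decision based on the current cache state.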

Btw, if you are already using Kotlin, I highly recommend coroutines. Then your async code ends up reading just like regular sequential code.
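
For comparison, a rough sketch of how the same cache-then-remote logic might look with suspend functions. Everything here is hypothetical: the LocalSource and RemoteSource interfaces, the fakes, and the SuspendingGoalsRepository name are stand-ins rather than the project's real classes, and it assumes both data sources can be exposed as suspend functions. It only uses the Kotlin standard library (suspend fun main), so it does not need kotlinx.coroutines to run:

```kotlin
// Hypothetical stand-ins for the types used in the question.
data class SavingsGoal(val name: String)
class NoDataException : Exception()

interface LocalSource { suspend fun getSavingsGoals(): List<SavingsGoal> }
interface RemoteSource { suspend fun getSavingsGoals(): List<SavingsGoal> }

class SuspendingGoalsRepository(
    private val remote: RemoteSource,
    private val local: LocalSource
) {
    private var cacheIsDirty = false

    suspend fun getSavingsGoals(): List<SavingsGoal> {
        // The flag is checked when the function is called, not earlier.
        if (cacheIsDirty) return remote.getSavingsGoals()
        return try {
            local.getSavingsGoals()
        } catch (e: NoDataException) {
            // Only the "no local data" case falls back to the remote source here;
            // the RxJava version above falls back on any error.
            remote.getSavingsGoals()
        }
    }
}

// Fake local source that behaves like an empty cache.
class EmptyLocal : LocalSource {
    override suspend fun getSavingsGoals(): List<SavingsGoal> = throw NoDataException()
}

// Fake remote source that returns one goal.
class FakeRemote : RemoteSource {
    override suspend fun getSavingsGoals(): List<SavingsGoal> = listOf(SavingsGoal("Holiday"))
}

suspend fun main() {
    val repository = SuspendingGoalsRepository(FakeRemote(), EmptyLocal())
    println(repository.getSavingsGoals()) // prints [SavingsGoal(name=Holiday)]
}
```

The fallback becomes a plain try/catch, and there is no latch, disposable, or callback to manage. In a real app the call would be launched from a coroutine scope rather than from suspend fun main.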