Combine two data sources in RxJava with the ability to fetch new data from one of the observables


I'm currently working on an Android app where I'm using RxJava in a ViewModel to handle data from both a local Room database and a remote API (Retrofit). I have implemented a pull-to-refresh feature, and in the onRefresh method, I'm currently creating a new observable to fetch data from the remote API.

The problem is that I want the combined observable to emit a new list when the user pulls to refresh, but it does not emit.

What may be the issue?

@HiltViewModel
class MyViewModel @Inject constructor(
    private val remoteDataSource: RemoteDataSource,
    private val localDataSource: LocalDataSource,
) {

    private val remoteDataObservable = BehaviorSubject.createDefault<List<Data>>(emptyList())

    init {
        disposables += Observable.combineLatest(
            localDataSource.getData(),
            remoteDataObservable,
            { local, remote ->
                local.plus(remote)
            }
        )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                displayList(it)
            }
        fetchLatestData()
    }

    fun onPullToRefresh() {
        fetchLatestData()
    }
    
    private fun fetchLatestData() {
        remoteDataSource.fetchLatestData()
            .subscribeOn(Schedulers.io())
            .subscribe(remoteDataObservable)
    }

    private fun displayList(list: List<Data>) {
        // Display list
    }

}

Mikhail Guliaev On BEST ANSWER

I've managed to fix my problem by changing the way I subscribe to the Observable that fetches data from the API. I now create a new subscriber and emit the data to the BehaviorSubject manually. I'm not satisfied with the solution, but nothing better came to mind:

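private fun fetchLatestData() {
    disposables += remoteDataSource.fetchLatestData()
        .subscribeOn(Schedulers.io())
        .subscribe({
            // Forward only the items; onComplete is not passed on to the subject
            remoteDataObservable.onNext(it)
        }, {
            // Errors are swallowed here; log or surface them in real code
        })
}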
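A plausible explanation for why this change helps: in the question, the BehaviorSubject itself is passed to subscribe, so it receives onComplete as well as onNext. Once a subject has received a terminal event, it ignores later onNext calls, so the data from the second fetch never reaches combineLatest. The sketch below isolates that behavior. It assumes RxJava 3 package names (the question does not say which version is in use) and that the remote observable completes after emitting; forwardEverything and forwardItemsOnly are hypothetical helper names, and Observable.just stands in for the remote call.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subjects.BehaviorSubject

// Subscribes the subject itself as the observer, as in the question:
// onNext AND onComplete are forwarded, so the subject terminates.
fun forwardEverything(values: List<Int>): BehaviorSubject<List<Int>> {
    val subject = BehaviorSubject.createDefault<List<Int>>(emptyList())
    Observable.just(values).subscribe(subject)
    return subject
}

// Forwards only the items, as in the fix above:
// the subject never sees the terminal event and stays usable.
fun forwardItemsOnly(values: List<Int>): BehaviorSubject<List<Int>> {
    val subject = BehaviorSubject.createDefault<List<Int>>(emptyList())
    Observable.just(values).subscribe(
        { subject.onNext(it) },
        { error -> println("fetch failed: $error") }
    )
    return subject
}
```

With these helpers, forwardEverything(listOf(1)).hasComplete() is true and any later onNext on that subject is dropped, whereas the subject returned by forwardItemsOnly stays open for the next refresh.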
rarespalasan On

The problem I see is that your fetchLatestData method returns an Observable. I would suggest having something like this:

@HiltViewModel
class MyViewModel @Inject constructor(
    private val remoteDataSource: RemoteDataSource,
    private val localDataSource: LocalDataSource,
) {

    init {
        disposables += Observable.combineLatest(
            localDataSource.getData(),
            remoteDataSource.data,
            { local, remote ->
                local.plus(remote)
            }
        )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                displayList(it)
            }
        fetchLatestData()
    }

    fun onPullToRefresh() {
        fetchLatestData()
    }
    
    private fun fetchLatestData() {
        remoteDataSource.fetchLatestData()
    }
}

Your RemoteDataSource class would look something like this:

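class RemoteDataSource {

    // Seeded with an empty list so combineLatest can emit before the first fetch finishes
    val data: BehaviorSubject<List<Data>> = BehaviorSubject.createDefault(emptyList())

    fun fetchLatestData() {
        // Request the data from the remote API, then publish the result;
        // newFetchedData stands in for the response of that request
        data.onNext(newFetchedData)
    }
}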
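A slightly fuller sketch of this idea, under stated assumptions: RxJava 3 package names, a hypothetical Api interface whose latest() call returns a Single (the real Retrofit service is not shown in the question), a placeholder Data class, and an injectable scheduler added here only to make the class easy to test. The subject is kept private and exposed through hide() so that callers cannot push items or terminate it by accident.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.BehaviorSubject

// Placeholder for the app's real model class.
data class Data(val id: Int)

// Hypothetical remote API; stands in for the Retrofit service.
fun interface Api {
    fun latest(): Single<List<Data>>
}

class RemoteDataSource(
    private val api: Api,
    private val scheduler: Scheduler = Schedulers.io(),
) {
    // Long-lived subject: it only ever receives onNext, so it never terminates.
    private val subject = BehaviorSubject.createDefault<List<Data>>(emptyList())

    // Read-only view for combineLatest in the ViewModel.
    val data: Observable<List<Data>> = subject.hide()

    // Starts one fetch and publishes its result; the caller can dispose it.
    fun fetchLatestData(): Disposable =
        api.latest()
            .subscribeOn(scheduler)
            .subscribe(
                { items -> subject.onNext(items) },
                { error -> println("fetch failed: $error") } // subject stays alive on failure
            )
}
```

Because each fetch is a separate short-lived subscription, its completion or failure never reaches the subject, and every pull-to-refresh can publish a new list to the same combined stream.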