Convert Rx Observable to Flow using UseCase

I am trying to convert my code to Compose. The libraries I depend on all expose the Observable type, which can't be used directly with Flow. I need their results as a Flow.

protected abstract Observable<T> buildUseCaseObservable();



public void execute(Subscriber<T> useCaseSubscriber) {
    mObservable=this.buildUseCaseObservable();
    this.subscription = this.buildUseCaseObservable()
            .subscribeOn(Schedulers.io())
            .doOnError(throwable -> {

            }).doOnNext(t -> {
                ObserverManager.getInstance().remove(useCaseSubscriber);
            }).doOnComplete(() -> {
                ObserverManager.getInstance().remove(useCaseSubscriber);
            })
            .observeOn(postExecutionThread.getScheduler())
            .subscribeWith(useCaseSubscriber);
    if(!(this instanceof EventProcessorUseCase)) {
        ObserverManager.getInstance().cached(useCaseSubscriber);
    }

}

I am trying to use the method above like this:

public class ForgotPasswordUC extends UseCase<String> {

    private String email;

    public ForgotPasswordUC(String email, Repository repository, PostExecutionThread postExecutionThread) {
        super(repository, postExecutionThread);
        this.email = email;
    }

    @Override
    protected Observable<String> buildUseCaseObservable() {
        return repository.forgotPassword(email);
    }
}

I use UseCase to call APIs, but I cannot access the result from Compose. How can I rewrite the use case above so that it returns a Flow that can be collected?

There are 3 best solutions below

1
龚诗豪 On

Here is a primitive approach, but it works.

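import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

class FlowConvert : Subscriber<String> {
    // Unlimited buffer so trySend does not drop items while nobody is collecting yet.
    private val _innerFlow = Channel<String>(Channel.UNLIMITED)
    val flow: Flow<String> = _innerFlow.receiveAsFlow()

    override fun onSubscribe(s: Subscription) {
        // A Reactive Streams publisher emits nothing until demand is requested.
        s.request(Long.MAX_VALUE)
    }

    override fun onNext(t: String) {
        _innerFlow.trySend(t)
    }

    override fun onError(t: Throwable) {
        // Closing with a cause rethrows the error in the collector.
        _innerFlow.close(t)
    }

    override fun onComplete() {
        // Closing the channel completes the flow.
        _innerFlow.close()
    }
}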

Personally, I find your code strange. Why not return the Observable from the use case? That makes it easy to convert the Rx stream to a Flow.

For example

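import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow

fun Observable<String>.convert() = callbackFlow<String> {
    val disposable = subscribe(
        { trySend(it) },   // forward each item to the flow
        { close(it) },     // propagate errors to the collector
        { close() }        // complete the flow
    )
    // Dispose the Rx subscription when the collector is cancelled or the flow closes.
    awaitClose { disposable.dispose() }
}
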
0
Aks4125 On

If you want the return type to be Flow instead of Observable, use Flow with coroutines, or Flowable if you stay with RxJava.

public abstract class UseCase<T> {

    protected abstract Flowable<T> buildUseCaseFlowable();

    public Flowable<T> execute() {
        return buildUseCaseFlowable()
                .subscribeOn(Schedulers.io())
                .doOnError(throwable -> {})
                .doOnComplete(() -> {});
    }
}

Implementation:

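@Override
protected Flowable<String> buildUseCaseFlowable() {
    // Assumes the repository now returns Flowable. If it still returns Observable,
    // append .toFlowable(BackpressureStrategy.BUFFER).
    return repository.forgotPassword(email);
}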

Usage:

forgotPasswordUC.execute().subscribe(result -> {}, throwable -> {});

You may need additional changes to fit your classes, but this is the general approach.
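
If the use case is switched to Flowable as above but the caller still wants a Kotlin Flow, the two can be bridged, because Flowable is a Reactive Streams Publisher. Here is a minimal sketch, assuming RxJava 3 and the kotlinx-coroutines-reactive artifact on the classpath. executeForgotPassword is a hypothetical stand-in for forgotPasswordUC.execute() and is not part of the original code:

```kotlin
import io.reactivex.rxjava3.core.Flowable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for forgotPasswordUC.execute(); a real use case
// would delegate to the repository instead of returning a fixed value.
fun executeForgotPassword(email: String): Flowable<String> =
    Flowable.just("Reset link sent to $email")

// Flowable implements org.reactivestreams.Publisher, so Publisher.asFlow() applies.
fun executeForgotPasswordAsFlow(email: String): Flow<String> =
    executeForgotPassword(email).asFlow()

fun main() = runBlocking {
    // Collect every emitted value into a list and print it.
    val results = executeForgotPasswordAsFlow("user@example.com").toList()
    println(results)
}
```

The resulting Flow is cold: the Flowable is only subscribed once something collects it.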

0
ExpensiveBelly On

Two things I would like to mention:

  1. I don't understand why you're using Java instead of Kotlin.
  2. JetBrains already publishes a library for converting back and forth between Observable, Single, Maybe and coroutines / Flow as needed: https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-rx3/README.md

All you need to do is to add this to your build.gradle:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx3:1.7.3"

(or whatever the latest version of coroutines is; there is a matching kotlinx-coroutines-rx2 artifact if you are on RxJava 2)

Then you can use its extension functions, such as myObservable.asFlow().
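
To make the asFlow() suggestion concrete, here is a rough sketch of how the forgot-password call might be exposed as a Flow. It assumes RxJava 3 with kotlinx-coroutines-rx3. forgotPassword below is a hypothetical stand-in for repository.forgotPassword(email), and the email check inside it is invented purely to demonstrate the error path:

```kotlin
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx3.asFlow

// Hypothetical stand-in for repository.forgotPassword(email).
// The "@" check is an assumption made only to produce an error case.
fun forgotPassword(email: String): Observable<String> =
    if (email.contains("@")) Observable.just("Reset link sent to $email")
    else Observable.error<String>(IllegalArgumentException("Invalid email"))

// Cold Flow: the Observable is subscribed on collect and disposed on cancellation.
fun forgotPasswordFlow(email: String): Flow<String> =
    forgotPassword(email).asFlow()

fun main() = runBlocking {
    forgotPasswordFlow("user@example.com").collect { println(it) }

    // An Rx onError surfaces as an exception in the flow; catch turns it into a value.
    val fallback = forgotPasswordFlow("bad")
        .catch { emit("Error: ${it.message}") }
        .toList()
    println(fallback)
}
```

From Compose, a Flow like this would typically be collected in a ViewModel scope or with collectAsState(), so the Rx subscription follows the lifecycle of the collector.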