Reuse part of RxJava stream

134 Views Asked by At

Probably an rx newbee question.

If I have two rx streams, which have some common parts, is there a possibility to extract and to reuse them?

Here is pseudo code example:

someManager.getInfo(id) returns a Single<SometInfo>

This fun takes an id and should return just a single with status

fun getStatus(id: String): Single<Status> =
    someManager.getInfo(id)
        .flattenAsObservable { it.items }
        .map { getSomeProp(it) } // from here
        .toList()
        .map { getStatus(it) }
        .doOnSuccess { getStatusRelay(id).accept(it) } // until here it's common

and this one doesn't take an id but already an object and should check if a status is ok

fun isStatusOk(info: SomeInfo): Single<Boolean> =
    Observable.fromIterable(info.items)
        .map { getSomeProp(it) } // from here
        .toList()
        .map { getStatus(it) }
        .doOnSuccess { getStatusRelay(id).accept(it) } // until here it's common
        .map { it == Status.OK }

Thank you in advance

2

There are 2 best solutions below

0
On

Phoenix Wang suggested Transformers. I tried them and after some experiments it seems I got it working.

I created a SingleTransformer:

private fun getStatus() =
    SingleTransformer<SomeInfo, Status> {
        it.flatMap {
            Observable.fromIterable(it.items)
                .map { getSomeProp(it) }
                .toList()
                .map { getStatus(it) }
        }
    }

And then I can refactor

fun getStatus(id: String): Single<Status> =
    someManager.getInfo(id)
        .compose(getStatus())
        .doOnSuccess { getStatusRelay(id).accept(it) }

and this too

fun isStatusOk(info: SomeInfo): Single<Boolean> =
    Single.just(info)
        .compose(getStatus())
        .doOnSuccess { getStatusRelay(id).accept(it) }
        .map { it == Status.OK }

Hopefully it helps someone else.

1
On

You can also use extend function

fun Observable<Item>.convertToSingeStatus(): Single<Status> =
        this.map { getSomeProp(it) }
            .toList()
            .map { getStatus(it) }
            .doOnSuccess { getStatusRelay(id).accept(it) }

And then

fun getStatus(id: String): Single<Status> =
someManager.getInfo(id)
    .convertToSingeStatus()
    .doOnSuccess { getStatusRelay(id).accept(it) }