OnNext not being called on observable when using combineLatest and take

483 Views Asked by At

I'm trying to get a subscription to automatically unsubscribe when it emits an item. The base observable is created like this.

public static Observable<RxBleConnection> setupConnection(RxBleDevice device, PublishSubject<Void> disconnectTrigger) {
    return device
            .establishConnection(false)
            .takeUntil(disconnectTrigger)
            .retry(3)
            .retryWhen(o -> o.delay(RETRY_DELAY, TimeUnit.MILLISECONDS))
            .compose(new ConnectionSharingAdapter());
}

Then I try to combine three read operations into a ProgramModel.

private void readCharacteristics(Action1<ProgramModel> onReadSuccess) {
    mConnectionObservable
            .flatMap(rxBleConnection ->
                    // combines the following three observables into a single observable that is
                    // emitted in onNext of the subscribe
                    Observable.combineLatest(
                            rxBleConnection.readCharacteristic(UUID_SERIAL_NUMBER),
                            rxBleConnection.readCharacteristic(UUID_MACHINE_TYPE),
                            rxBleConnection.readCharacteristic(UUID_CHARACTERISTIC),
                            ProgramModel::new))
            .observeOn(AndroidSchedulers.mainThread())
            .take(1)
            .subscribe(programModel -> {
                programModel.trimSerial();
                onReadSuccess.call(programModel);
            }, BleUtil::logError);
}

So theoretically once a program model is comes through oNext of the subscribe, the subscription will be unsubscribed from. For some reason the operation gets stuck and onNext and onError are never called. If I remove the take(1) this works fine but I don't want to have to deal with holding onto a reference to the subscription and unsubscribing manually. Does anyone know what I'm doing wrong or why onNext is not being called?

1

There are 1 best solutions below

1
On

I needed to call take(1) before the flatMap as well as after. This post sort of explains it Read multiple characteristics from an Android device using library RxAndroidBle