I'm trying to get a subscription to automatically unsubscribe when it emits an item. The base observable is created like this.
    public static Observable<RxBleConnection> setupConnection(RxBleDevice device, PublishSubject<Void> disconnectTrigger) {
        return device
                .establishConnection(false)
                .takeUntil(disconnectTrigger)
                .retry(3)
                .retryWhen(o -> o.delay(RETRY_DELAY, TimeUnit.MILLISECONDS))
                .compose(new ConnectionSharingAdapter());
    }
Then I try to combine three read operations into a ProgramModel.
    private void readCharacteristics(Action1<ProgramModel> onReadSuccess) {
        mConnectionObservable
                .flatMap(rxBleConnection ->
                        // combines the following three observables into a single observable that is
                        // emitted in onNext of the subscribe
                        Observable.combineLatest(
                                rxBleConnection.readCharacteristic(UUID_SERIAL_NUMBER),
                                rxBleConnection.readCharacteristic(UUID_MACHINE_TYPE),
                                rxBleConnection.readCharacteristic(UUID_CHARACTERISTIC),
                                ProgramModel::new))
                .observeOn(AndroidSchedulers.mainThread())
                .take(1)
                .subscribe(programModel -> {
                    programModel.trimSerial();
                    onReadSuccess.call(programModel);
                }, BleUtil::logError);
    }
So theoretically, once a program model comes through onNext of the subscribe, the subscription will be unsubscribed from. For some reason the operation gets stuck, and onNext and onError are never called. If I remove the take(1), this works fine, but I don't want to have to deal with holding onto a reference to the subscription and unsubscribing manually. Does anyone know what I'm doing wrong or why onNext is not being called?
I needed to call take(1) before the flatMap as well as after. This post sort of explains it: "Read multiple characteristics from an Android device using library RxAndroidBle".
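Here is a minimal sketch of that change applied to readCharacteristics from the question. It assumes everything else stays as it was. ProgramModel, BleUtil, mConnectionObservable and the UUID_* constants are the question's own names, not library APIs, so this fragment only compiles inside that project.

```java
// Sketch only: the surrounding class, ProgramModel, BleUtil, mConnectionObservable
// and the UUID_* constants are assumed to exist as in the question.
private void readCharacteristics(Action1<ProgramModel> onReadSuccess) {
    mConnectionObservable
            // take(1) BEFORE flatMap: only the first emitted connection is used
            .take(1)
            .flatMap(rxBleConnection ->
                    // combine the three reads into a single ProgramModel
                    Observable.combineLatest(
                            rxBleConnection.readCharacteristic(UUID_SERIAL_NUMBER),
                            rxBleConnection.readCharacteristic(UUID_MACHINE_TYPE),
                            rxBleConnection.readCharacteristic(UUID_CHARACTERISTIC),
                            ProgramModel::new))
            .observeOn(AndroidSchedulers.mainThread())
            // take(1) AFTER flatMap: unsubscribe once the first ProgramModel arrives
            .take(1)
            .subscribe(programModel -> {
                programModel.trimSerial();
                onReadSuccess.call(programModel);
            }, BleUtil::logError);
}
```

With both take(1) calls in place there is no need to hold on to the Subscription and unsubscribe manually.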