I have a fairly complex chain of Rx code which is used to perform a number of operations. Essentially when started, an Observable starts emitting a list of items. For each item, a connection is made (these items are BLE devices) and a characteristic is written.
This characteristic is re-written every 10 seconds until an error occurs (such as pulling the battery out). After that, the next item in the original list is connected to, and so on and so forth.
Here is where most of the action occurs:
public Observable<String> connectForPolice(String name, RemovalListener listener) {
StringBuilder builder = new StringBuilder(mAddress);
builder.setCharAt(mAddress.length() - 1, '4');
RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());
return device.establishConnection(mContext, false)
.timeout(12, TimeUnit.SECONDS)
.doOnError(throwable -> {
Log.d(TAG, "Error thrown after timeout.");
throwable.printStackTrace();
})
.flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(RxBleConnection rxBleConnection) {
byte[] value = new byte[1];
value[0] = (byte) (3 & 0xFF);
return Observable // ... we return an observable ...
.defer(() -> {
return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
})
.repeatWhen(observable -> {
return observable.delay(10, TimeUnit.SECONDS);
});
}
})
.observeOn(AndroidSchedulers.mainThread())
.onErrorResumeNext(throwable -> {
throwable.printStackTrace();
listener.onFinished(name);
return Observable.empty();
});
}
There are a few problems here. As you can see, a traditional listener is passed into this method, and this listener is called in onErrorResumeNext
. This listener is being used to remove the item from a RecyclerView back in the Activity, but that is not important. The issue is, using a listener in this fashion is a bit of an anti-pattern of Rx. Below is the rest of the relevant code, including the code that calls the above example:
@Override
public void onFinished(String deviceName) {
mAdapter.removeItem(deviceName);
}
private void startConnecting() {
mIsBeepingAll = true;
mConnectingSubscription = Observable.from(mAdapter.getItems())
.flatMap((Func1<Device, Observable<?>>) device -> {
Log.d(TAG, "connecting for policing");
return device.connectForPolice(device.getName(), PoliceActivity.this);
}, 1)
.doOnError(throwable -> throwable.printStackTrace())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted...");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
Log.d(TAG, "onError...");
}
@Override
public void onNext(Object o) {
Log.d(TAG, "onNext... ");
}
});
}
The implementation of the listener is included. The question is, how can I perform the equivalent of what the listener does using Rx instead?
The simplest would be to change this:
into this:
And the caller would use: