Notifying of error events down the chain without using a traditional listener

I have a fairly complex chain of Rx code which is used to perform a number of operations. Essentially when started, an Observable starts emitting a list of items. For each item, a connection is made (these items are BLE devices) and a characteristic is written.

This characteristic is re-written every 10 seconds until an error occurs (such as pulling the battery out). After that, the next item in the original list is connected to, and so on and so forth.

Here is where most of the action occurs:

public Observable<String> connectForPolice(String name, RemovalListener listener) {

        StringBuilder builder = new StringBuilder(mAddress);
        builder.setCharAt(mAddress.length() - 1, '4');
        RxBleDevice device = mRxBleClient.getBleDevice(builder.toString());

        return device.establishConnection(mContext, false)
                .timeout(12, TimeUnit.SECONDS)
                .doOnError(throwable -> {
                    Log.d(TAG, "Error thrown after timeout.");
                    throwable.printStackTrace();
                })
                .flatMap(new Func1<RxBleConnection, Observable<byte[]>>() {
                    @Override
                    public Observable<byte[]> call(RxBleConnection rxBleConnection) {
                        byte[] value = new byte[1];
                        value[0] = (byte) (3 & 0xFF);

                        return Observable // ... we return an observable ...
                                .defer(() -> {
                                    return rxBleConnection.writeCharacteristic(Constants.BUZZER_SELECT, value);
                                })
                                .repeatWhen(observable -> {
                                    return observable.delay(10, TimeUnit.SECONDS);
                                });
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .onErrorResumeNext(throwable -> {
                    throwable.printStackTrace();
                    listener.onFinished(name);
                    return Observable.empty();
                });
    }

There are a few problems here. As you can see, a traditional listener is passed into this method and called in onErrorResumeNext. The listener removes the item from a RecyclerView back in the Activity, but that detail is not important. The issue is that using a listener this way is something of an anti-pattern in Rx. Below is the rest of the relevant code, including the code that calls the method above:

    @Override
    public void onFinished(String deviceName) {
        mAdapter.removeItem(deviceName);
    }

    private void startConnecting() {
        mIsBeepingAll = true;
        mConnectingSubscription = Observable.from(mAdapter.getItems())
                .flatMap((Func1<Device, Observable<?>>) device -> {
                    Log.d(TAG, "connecting for policing");
                    return device.connectForPolice(device.getName(), PoliceActivity.this);
                }, 1)
                .doOnError(throwable -> throwable.printStackTrace())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onCompleted() {
                        Log.d(TAG, "onCompleted...");
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                        Log.d(TAG, "onError...");
                    }

                    @Override
                    public void onNext(Object o) {
                        Log.d(TAG, "onNext... ");
                    }
                });
    }

The listener implementation (onFinished) is included above. The question is: how can I achieve what the listener does using Rx instead?

There is 1 best solution below

The simplest approach would be to change this:

            .onErrorResumeNext(throwable -> {
                throwable.printStackTrace();
                listener.onFinished(name);
                return Observable.empty();
            });

into this:

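            // The writes emit byte[]; drop them so only the device name reaches the caller.
            .ignoreElements()
            .cast(String.class)
            .onErrorResumeNext(throwable -> {
                throwable.printStackTrace();
                return Observable.just(name);
            });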

The caller would then subscribe like this:

private void startConnecting() {
    mIsBeepingAll = true;
    mConnectingSubscription = Observable.from(mAdapter.getItems())
            .flatMap((Func1<Device, Observable<String>>) device -> {
                Log.d(TAG, "connecting for policing");
                // The listener argument is no longer needed and can be dropped from the signature.
                return device.connectForPolice(device.getName(), PoliceActivity.this);
            }, 1)
            .doOnError(throwable -> throwable.printStackTrace())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "onCompleted...");
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                    Log.d(TAG, "onError...");
                }

                @Override
                public void onNext(String deviceName) {
                    Log.d(TAG, "onNext... ");
                    mAdapter.removeItem(deviceName);
                }
            });
}
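
To see the idea in isolation, here is a minimal sketch that runs without RxJava or BLE hardware. It uses `CompletableFuture` from the Java standard library as a stand-in for the Rx chain, so it is an analogy rather than the RxJava API. `ErrorAsValueSketch`, its simplified `connectForPolice`, and `finishedDevices` are hypothetical names, and the connection is assumed to fail immediately. The point it illustrates is the same: the failure is turned into an ordinary value (the device name) that the caller consumes, so no listener is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ErrorAsValueSketch {

    // Hypothetical stand-in for connectForPolice: the "connection" here fails
    // right away, mimicking a device whose battery was pulled.
    static CompletableFuture<String> connectForPolice(String name) {
        CompletableFuture<byte[]> connection = new CompletableFuture<>();
        connection.completeExceptionally(new IllegalStateException("disconnected: " + name));
        // Instead of calling listener.onFinished(name), pass the name downstream
        // as the result once the connection has ended.
        return connection.handle((bytes, error) -> name);
    }

    // Caller: works through the devices one at a time and collects the names
    // of those that have finished (the equivalent of mAdapter.removeItem).
    static List<String> finishedDevices(List<String> names) {
        List<String> finished = new ArrayList<>();
        for (String name : names) {
            finished.add(connectForPolice(name).join());
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(finishedDevices(Arrays.asList("tag-1", "tag-2")));
    }
}
```

In the Rx version, `onErrorResumeNext` returning `Observable.just(name)` plays the role of `handle` here, and the subscriber's `onNext` plays the role of the loop body.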