RxAndroidBle: using a persistent connection


I have googled this subject but have not found a good example of how to do it. I have tried to implement the idea discussed here: https://github.com/Polidea/RxAndroidBle/issues/138

So my code to connect:

    RxBleDevice device = rxBleClient.getBleDevice("84:CC:A8:2E:24:6A");
    disposable = device.establishConnection(false)
            .flatMap(rxBleConnection ->
                    Completable.concat(doSomething(rxBleConnection), 1) // Completable created here may be arbitrarily long
                            .andThen(Observable.just(rxBleConnection))
            )
            .take(1)
            .subscribe(rxBleConnection -> {
                        Log.d("rxble", "subscribe completable");
                    },
                    throwable -> {
                        Log.d("rxble", "completable error:" + throwable.getMessage());
                    }
            );

And the doSomething is defined like this:

    private Publisher<? extends CompletableSource> doSomething(RxBleConnection connection) {
        Log.d("rxble", "do Something");
        while (!terminateDoSomething) { // doSomething is terminated with setting terminateDoSomething=true
            switch (executecommand) {   // execute BLE command R or W
                case 'R':
                    connection.readCharacteristic(My_UUID("2103"))
                            .subscribe(readresult -> { Log.d("rxble", "read result:" + readresult); },
                                    throwable -> { Log.d("rxble", "read error:" + throwable.getMessage()); });
                    executecommand = 'X';
                    break;
                case 'W':
                    connection.writeCharacteristic(My_UUID("2103"), WriteInt(1500))
                            .subscribe(writeresult -> { Log.d("rxble", "write result:" + writeresult); },
                                    throwable -> { Log.d("rxble", "write error:" + throwable.getMessage()); });
                    executecommand = 'X';
                    break;
            }
            try {
                Thread.sleep(500); // wait a while
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Log.d("rxble", "do Something complete");
        return null; // what should I return here ?
    }

To disconnect, I just set terminateDoSomething = true and call disposable.dispose().

First question: what should I return from doSomething? Returning null gives a Completable error.

Second issue: when my app logic sets executecommand to R or W, the command gets executed, but after 30 seconds I get a READ or WRITE error:

2021-06-27 13:26:54.069 30085-31486/heikki.fi.bttest D/rxble: read error:GATT exception from MAC address 84:CC:A8:2E:24:6A, with type BleGattOperation{description='CHARACTERISTIC_READ'}

The BLE connection is active. The write operation actually succeeds (I can see that on the peripheral side). How do I properly subscribe to the read/write results?

Answer by Dariusz Seweryn:

First question: what should I return from doSomething? Returning null gives a Completable error.

You should return a non-null Publisher that emits CompletableSource objects, as the method signature declares. null is not acceptable, since all of RxJava 2 is built around the idea that no null values are passed around.

Most probably the problem you face is caused by the null you return, combined with blocking a thread that is not supposed to be blocked (Thread.sleep()).

How do I properly subscribe to the read/write results?

You can use .subscribe(), but in the context of your code you could return the read/write operations instead of null:

    // The connection is passed in as a parameter, matching the doSomething(rxBleConnection) call in the question.
    private Publisher<? extends CompletableSource> doSomething(RxBleConnection connection) {
        return Flowable.interval(500L, TimeUnit.MILLISECONDS) // Flowable is a Publisher; check every 500 ms whether there is something to do
                .takeUntil(ignored -> terminateDoSomething) // take until termination is requested
                .map(ignored -> { // map each tick to a CompletableSource
                    try {
                        switch (executeCommand) {
                            case 'R': return connection.readCharacteristic(My_UUID("2103"))
                                    .doOnSuccess(readresult -> Log.d("rxble", "read result:" + readresult))
                                    .doOnError(throwable -> Log.d("rxble", "read error:" + throwable.getMessage()))
                                    .ignoreElement();
                            case 'W': return connection.writeCharacteristic(My_UUID("2103"), WriteInt(1500))
                                    .doOnSuccess(writeresult -> Log.d("rxble", "write result:" + writeresult))
                                    .doOnError(throwable -> Log.d("rxble", "write error:" + throwable.getMessage()))
                                    .ignoreElement();
                            default: return Completable.complete(); // nothing to do on this tick
                        }
                    } finally {
                        executeCommand = 'X'; // clear the command each time
                    }
                });
    }

This is not an ideal solution in the reactive programming world, because you keep external state (executeCommand and terminateDoSomething), which RxJava tries to eliminate. That state should instead be expressed through proper use of operators or new observables (an Observable<CommandToExecute>, perhaps), or by simply unsubscribing from the flow, which terminates it.
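
To illustrate the shape of that last suggestion, here is a minimal sketch in plain Java, using only the standard library so that it runs without RxJava or RxAndroidBle. It is an analogue, not RxJava code: a BlockingQueue stands in for the stream of commands, and every name in it (Command, CommandQueueSketch, runUntilStop) is hypothetical. The point is only that each request becomes an explicit value, and that termination is one more value in the stream rather than a shared boolean polled every 500 ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout: nothing here is part of RxJava or RxAndroidBle.
final class CommandQueueSketch {

    // Each request is an explicit value instead of a shared char flag.
    enum Command { READ, WRITE, STOP }

    // Consumes commands in arrival order until STOP; returns a log of what ran.
    static List<String> runUntilStop(BlockingQueue<Command> commands) {
        List<String> log = new ArrayList<>();
        while (true) {
            Command next;
            try {
                next = commands.take(); // blocks until a command arrives, no polling
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return log;
            }
            switch (next) {
                case READ:
                    log.add("read");  // placeholder for the actual BLE read
                    break;
                case WRITE:
                    log.add("write"); // placeholder for the actual BLE write
                    break;
                case STOP:
                    return log;       // termination is a value in the stream
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Command> commands = new LinkedBlockingQueue<>();
        commands.add(Command.READ);
        commands.add(Command.WRITE);
        commands.add(Command.STOP);
        System.out.println(runUntilStop(commands));
    }
}
```

In an RxJava version the queue would presumably become an observable of commands, with each command mapped to the corresponding read or write, and disposing the subscription would take the place of STOP.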