RxAndroidBle read operation for large data size


I am using the RxAndroidBle library to read values from my BLE devices in my Android app. The app performs multiple reads on the BLE peripheral device, as shown in the code below.

public void longRead()
{
   // if (isConnected())
    {
        longReadSubscription =  connectionObservable
                .flatMap(RxBleConnection::discoverServices, (rxBleConnection, services) ->
                        services.getCharacteristic(UUID_READ_CHARACTERISTIC)
                                .flatMap(characteristic -> rxBleConnection.queue(new CustomReadOperation(characteristic)))
                )
                .flatMap(observable -> observable)
                .first() // Bug fix for not disconnecting after read.
                .observeOn(AndroidSchedulers.mainThread())
                .compose(this.bindToLifecycle())
                .subscribe(bytes -> {
                    configureWifiMvpView.showList(bytes);
                }, this::onLongReadFailure);
    }
}

private static class CustomReadOperation implements RxBleRadioOperationCustom<byte[]> {

    private BluetoothGattCharacteristic characteristic;
    CustomReadOperation(BluetoothGattCharacteristic characteristic) {
        this.characteristic = characteristic;
    }

    /**
     * Reads a characteristic until a special character has been found. This is easily achieved without
     * a custom operation. The gain here is that only one operation goes into the RxBleRadio queue,
     * eliminating the overhead of going in and out of the operation queue.
     */
    @NonNull
    @Override
    public Observable<byte[]> asObservable(BluetoothGatt bluetoothGatt,
                                           RxBleGattCallback rxBleGattCallback,
                                           Scheduler scheduler) throws Throwable {
        return readAndObserve(this.characteristic, bluetoothGatt, rxBleGattCallback)
                .subscribeOn(scheduler)
                .takeFirst(readResponseForMatchingCharacteristic())
                .map(byteAssociation -> byteAssociation.second)

                .repeatWhen(completed -> completed.delay(200, TimeUnit.MILLISECONDS))
                .takeUntil(result -> getSpecialCharacter(result));

    }

    // Returns true when reading should stop: the chunk contains the terminating ']' or is empty.
    public boolean getSpecialCharacter(byte[] s) {
        boolean result = false;
        try {
            String str = new String(s, "UTF-8");
            if (str.trim().isEmpty()) {
                // Empty or whitespace-only chunk: stop reading.
                return true;
            }
            System.out.println(str); // log the decoded text rather than the array reference
            result = str.contains("]");
        } catch (Exception e) {
            System.out.println("exception #####");
        }
        return result;
    }

    @NonNull
    private Observable<ByteAssociation<UUID>> readAndObserve(BluetoothGattCharacteristic characteristic,
                                                             BluetoothGatt bluetoothGatt,
                                                             RxBleGattCallback rxBleGattCallback) {
        Observable<ByteAssociation<UUID>> onCharacteristicRead = rxBleGattCallback.getOnCharacteristicRead();

        return Observable.create(emitter -> {
            Subscription subscription = onCharacteristicRead.subscribe(emitter);
            emitter.setCancellation(subscription::unsubscribe);
            try {
                final boolean success = bluetoothGatt.readCharacteristic(characteristic);
                if (!success) {
                    throw new BleGattCannotStartException(bluetoothGatt, BleGattOperationType.CHARACTERISTIC_READ);
                }
            } catch (Throwable throwable) {
                emitter.onError(throwable);
            }
        }, Emitter.BackpressureMode.BUFFER);
    }

    private Func1<ByteAssociation<UUID>, Boolean> readResponseForMatchingCharacteristic() {
        return uuidByteAssociation -> uuidByteAssociation.first.equals(UUID_READ_CHARACTERISTIC);
    }
}

}

Now all this works fine, but once in a while, when the data read from the BLE device is larger than 600 bytes, the subscribe(bytes -> ...) callback fires before the complete data has been read. After that, the takeUntil predicate runs, finds the special character at the end of the string, and terminates, losing all the data in the second chunk.

It looks as though the RxAndroidBle library maintains an internal buffer, keeps writing data into it, and calls the subscriber once the buffer is full. This causes a problem when the data read is larger than the buffer. How can we handle this so that the data does not arrive in chunks? Also, can the buffer size be increased so that all the received data is stored before subscribe is called?
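
For reference, here is a minimal sketch of the behaviour I am after, written in plain Java without any Rx types. In the code above each read is emitted as its own byte[], and first() in longRead() keeps only the first emission, so I assume the chunks would have to be collected before anything is emitted downstream. ChunkAccumulator is a hypothetical helper of my own, not part of RxAndroidBle, and the ']' terminator is taken from getSpecialCharacter(). It scans raw bytes rather than decoded strings, so a multi-byte UTF-8 character split across two reads does not get corrupted.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not part of RxAndroidBle): collects the byte[] chunks
 * returned by successive reads until the terminating ']' byte is seen.
 */
final class ChunkAccumulator {
    private static final byte TERMINATOR = (byte) ']';

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private boolean complete = false;

    /** Appends one chunk; returns true once the terminator has been seen. */
    boolean append(byte[] chunk) {
        if (complete) {
            return true; // ignore anything that arrives after the terminator
        }
        for (byte b : chunk) {
            buffer.write(b);
            if (b == TERMINATOR) {
                complete = true;
                break; // drop bytes that follow the terminator in this chunk
            }
        }
        return complete;
    }

    /** Returns everything collected so far, terminator included. */
    byte[] toByteArray() {
        return buffer.toByteArray();
    }

    /** Convenience for logging: decodes the collected bytes as UTF-8. */
    String asText() {
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

If something like this is the right direction, I assume the accumulation could sit after takeUntil inside asObservable(), for example with RxJava 1's collect operator, so that the custom operation emits a single complete array. Unlike getSpecialCharacter(), this sketch does not treat an empty chunk as the end of the data.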
