RxJava2: subscriber's onNext stops being called after FlowableEmitter.onNext has been called many times

I use Flowable.create to read data in a loop from a Bluetooth socket's InputStream. The pseudocode below shows what I tried.

// connectBluetoothFlowable is a Flowable<Boolean>
connectBluetoothFlowable.flatMap(bluetoothConnected -> {
    if (!bluetoothConnected) {
        return Flowable.error(new Exception("ERROR"));
    }

    return Flowable.create(emitter -> {
        while (true) {
            // read data from the Bluetooth socket's InputStream
            // String data = ...;

            // emit the data
            Log.e("emitter", data);
            emitter.onNext(data);

            // if(...) {
            //     break;
            // }
        }

        emitter.onComplete();
    }, BackpressureStrategy.BUFFER);
}).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // onNext callback
    Log.e("result", "result: " + data);
}, throwable -> {
    // onError
}, () -> {
    // onComplete
}, subscription -> {
    // onSubscribe
});

However, after Log.e("emitter", data) has logged many times, the Log.e("result", "result: " + data) call in the onNext callback stops logging. I can't figure out why. Could anyone help me?

I'm using RxJava 2.2.19 and RxAndroid 2.1.1.

I also tried appending .onBackpressureBuffer() to the Flowable.create(...) call, as in the code below. The result is the same: after Log.e("emitter", data) has logged many times, Log.e("result", "result: " + data) in the onNext callback stops logging.

// connectBluetoothFlowable is a Flowable<Boolean>
connectBluetoothFlowable.flatMap(bluetoothConnected -> {
    if (!bluetoothConnected) {
        return Flowable.error(new Exception("ERROR"));
    }

    return Flowable.create(emitter -> {
        while (true) {
            // read data from the Bluetooth socket's InputStream
            // String data = ...;

            // emit the data
            Log.e("emitter", data);
            emitter.onNext(data);

            // if(...) {
            //     break;
            // }
        }

        emitter.onComplete();
    }, BackpressureStrategy.BUFFER).onBackpressureBuffer();
}).subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // onNext callback
    Log.e("result", "result: " + data);
}, throwable -> {
    // onError
}, () -> {
    // onComplete
}, subscription -> {
    // onSubscribe
});
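
Both variants above depend on how much the subscriber requests. Flowable follows the Reactive Streams rule that a subscriber receives only as many items as it has asked for through Subscription.request(n). As far as I understand, the four-argument subscribe overload in RxJava 2 hands the Subscription to the onSubscribe callback and leaves the requesting to it, so that callback may be worth checking. The sketch below illustrates the demand rule only. It uses the JDK's java.util.concurrent.Flow types instead of RxJava, so it is an analogy and not RxJava's implementation. DemandSketch, run, and the "packet-" strings are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows the Reactive Streams demand rule with JDK types only.
final class DemandSketch {

    // Publishes `emitted` items to a subscriber that requests `demand` items
    // in onSubscribe, and returns what the subscriber actually received.
    static List<String> run(long demand, int emitted) {
        List<String> received = new CopyOnWriteArrayList<>();
        // Number of deliveries to wait for: never more than was requested.
        CountDownLatch expected = new CountDownLatch((int) Math.min(demand, (long) emitted));

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription subscription) {
                    // An onSubscribe that never calls request(n) leaves demand at zero.
                    if (demand > 0) {
                        subscription.request(demand);
                    }
                }
                @Override public void onNext(String item) {
                    received.add(item);
                    expected.countDown();
                }
                @Override public void onError(Throwable error) { }
                @Override public void onComplete() { }
            });

            // Stand-in for the read loop; keep `emitted` below the publisher's
            // default buffer size (256) so submit() does not block.
            for (int i = 0; i < emitted; i++) {
                publisher.submit("packet-" + i);
            }
            expected.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        System.out.println(run(0, 10).size());              // no request
        System.out.println(run(3, 10));                     // bounded request
        System.out.println(run(Long.MAX_VALUE, 10).size()); // unbounded request
    }
}
```

In this sketch the publisher accepts all ten items each time, but the subscriber sees nothing when it never requests, only the first three when it requests three, and all ten when it requests Long.MAX_VALUE. Whether the same thing explains the behavior in the question depends on what the real onSubscribe callback does, which the pseudocode does not show.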