RxKotlin - Wrong subscribeOn, observeOn thread changing for Subject out of Activity?


I have an object that generates different strings at random moments in time, and I need to subscribe to this generator to receive these strings and pass them to the UI (there may be multiple subscribers in different activities). Suppose I have the following code:

generator:

class Generator {

    private var stringToGenerate = ""

    var subject: BehaviorSubject<String> = BehaviorSubject.create<String>()

    init {
        //seems like these instructions are skipped
        subject
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnNext { t -> Log.i("subject doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
                .observeOn(AndroidSchedulers.mainThread())
                .map { _ -> Log.i("subject map", Thread.currentThread().name + " " + Thread.currentThread().id) }

        //imitation of async creating of strings in separate thread
        timer("timerThread", false, 2000L, 2000L) {
            stringToGenerate = System.currentTimeMillis().toString()
            subject.onNext(stringToGenerate)
        }
    }
}

One of the activities that must consume generated strings:

class TestActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        setContentView(R.layout.activity_test)

        val wrongThreadObserver = object : Observer<String> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                Log.i("wrongThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
            }

            override fun onError(e: Throwable) {
            }
        }

        val generator = Generator()
        generator.subject.subscribe(wrongThreadObserver)

        //for correct work illustration
        val correctThreadObserver = object : Observer<String> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                Log.i("correctThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
            }

            override fun onError(e: Throwable) {
            }
        }

        val mainThreadSubject = BehaviorSubject.create<String>()
        mainThreadSubject
                .doOnNext { obj -> Log.i("correctThread doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(correctThreadObserver)
        mainThreadSubject.onNext("test thread")
        val handler = Handler()
        handler.postDelayed({ mainThreadSubject.onNext("test thread 2") }, 1000)
        handler.postDelayed({ mainThreadSubject.onNext("test thread 3") }, 2000)
    }
}

In this case correctThreadObserver, whose chain is built directly in the activity, works fine, but wrongThreadObserver keeps running on the timer thread. It seems to ignore the subscribeOn, observeOn, and doOnNext instructions in Generator, no matter where these instructions are called (in init, in the timer thread, or in the activity after getting the subject from the generator): wrongThreadObserver still runs on the timer thread. The log is:

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/wrongThreadObserver: timerThread 937

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/wrongThreadObserver: timerThread 937

I/wrongThreadObserver: timerThread 937

I/wrongThreadObserver: timerThread 937

There is no doOnNext output and no main thread for wrongThreadObserver. What am I doing wrong?

Best answer:

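I found the following solution: we must subscribe to the Observable returned by the operator chain, not to the Subject itself. Operators such as observeOn() do not modify the subject; each one returns a new Observable, so the chain built in init has no effect because its result is discarded and nothing subscribes to it. We therefore have to use the result of observeOn(), not the "subject" object itself. If multiple subscriptions are needed, we can store the result of observeOn() in a separate variable: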

//in Generator
.......
var observableInSeparateVar = subject.observeOn(AndroidSchedulers.mainThread())
.......

//in TestActivity
.......
generator.observableInSeparateVar.subscribe(wrongThreadObserver)
.......

Alternatively, we can call observeOn() on the subject in the activity and subscribe to the result:

//in TestActivity
.......
generator.subject
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(wrongThreadObserver)
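
To put the pieces together, here is a minimal sketch of a generator that hides the subject and exposes only the Observable returned by observeOn(). It assumes RxJava 2 (the io.reactivex packages) and runs on a plain JVM, so Schedulers.single() stands in for AndroidSchedulers.mainThread(); the names strings, producer, and stop() are hypothetical and not part of the original code.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.BehaviorSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.timer

class Generator {
    // Private, so callers cannot subscribe to the raw subject and bypass the operators.
    private val subject = BehaviorSubject.create<String>()

    // Each operator returns a NEW Observable; subscribers must use this one.
    // Assumption: Schedulers.single() replaces AndroidSchedulers.mainThread()
    // so that the sketch runs without Android.
    val strings: Observable<String> = subject
        .doOnNext { println("doOnNext on ${Thread.currentThread().name}") }
        .observeOn(Schedulers.single())

    // Imitates asynchronous string creation on a separate thread.
    private val producer = timer("timerThread", daemon = true, initialDelay = 100L, period = 100L) {
        subject.onNext(System.currentTimeMillis().toString())
    }

    fun stop() = producer.cancel()
}

fun main() {
    val generator = Generator()
    val done = CountDownLatch(3)

    // Subscribe to the derived Observable, not to the subject.
    val disposable = generator.strings.subscribe { value ->
        println("onNext($value) on ${Thread.currentThread().name}")
        done.countDown()
    }

    done.await(2, TimeUnit.SECONDS)
    disposable.dispose()
    generator.stop()
}
```

With this layout, doOnNext should still log the timer thread (it sits upstream of observeOn and runs on whichever thread calls onNext), while the subscriber's onNext should log the scheduler's thread. On Android, swapping Schedulers.single() back to AndroidSchedulers.mainThread() would deliver the values on the main thread instead.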