I am on plain JDK 8. I have this simple RxJava example:
Observable
    .from(Arrays.asList("one", "two", "three"))
    .doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
    //.subscribeOn(Schedulers.newThread())
    .subscribe(word -> System.out.println(word));
It prints the words line by line, interleaved with the thread information, which is 'main' for all onNext() calls, as expected.
However, when I uncomment the subscribeOn(Schedulers.newThread()) call, nothing is printed at all. Why isn't it working? I would have expected it to start a new thread for each onNext() call and the doOnNext() to print that thread's name. Right now I see nothing, and the same happens with the other schedulers.
When I add a call to Thread.sleep(10000L) at the end of my main, I can see the output, which suggests that the threads used by RxJava are all daemons. Is this the case? Can this be changed somehow, by using a custom ThreadFactory or a similar concept, without having to implement a custom Scheduler?
With the mentioned change, the thread name is always RxNewThreadScheduler-1, whereas the documentation for newThread says "Scheduler that creates a new Thread for each unit of work". Isn't it supposed to create a new thread for each of the emissions?
As Vladimir mentioned, RxJava's standard schedulers run work on daemon threads, which terminate in your example because the main thread exits. I'd like to emphasise that they don't schedule each value on a new thread; instead, they schedule the stream of values for each individual subscriber on a newly created thread. Subscribing a second time would give you "RxNewThreadScheduler-2".
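To make the "one thread per subscriber, not per value" point concrete, here is a plain-JDK analogy. It is not RxJava's implementation: the class PerSubscriberThreadDemo, the method subscribeOnNewThread and the "DemoNewThread-" prefix are hypothetical names invented for this sketch. Each call starts one daemon thread that delivers every word, and a CountDownLatch replaces Thread.sleep() as the way to wait for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class PerSubscriberThreadDemo {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Hypothetical stand-in for "subscribe on a new thread": one daemon thread
    // is started per call, and that single thread handles every word.
    static List<String> subscribeOnNewThread(List<String> words) throws InterruptedException {
        List<String> threadNames = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            for (String word : words) {
                String name = Thread.currentThread().getName();
                System.out.printf("%s uses thread %s%n", word, name);
                threadNames.add(name);
            }
            done.countDown();
        }, "DemoNewThread-" + COUNTER.incrementAndGet());
        worker.setDaemon(true); // daemon threads do not keep the JVM alive
        worker.start();
        done.await();           // wait for completion instead of sleeping
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> words = Arrays.asList("one", "two", "three");
        subscribeOnNewThread(words); // first "subscriber"
        subscribeOnNewThread(words); // second "subscriber" gets a different thread
    }
}
```

Within one call every word reports the same thread name, and a second call reports a different one, which mirrors the RxNewThreadScheduler-1 / RxNewThreadScheduler-2 behaviour described above.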
You don't really need to change the default schedulers; just wrap your own Executor with Schedulers.from() and supply the resulting Scheduler as a parameter where needed:
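A minimal sketch of the Executor side of that idea, using only the JDK so it runs without RxJava on the classpath. The class NonDaemonExecutorSketch, the helper userThreadFactory and the "MyRxThread-" prefix are hypothetical names chosen for illustration; the RxJava 1.x wiring via Schedulers.from(executor) appears only as a comment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NonDaemonExecutorSketch {

    // Hypothetical helper: creates named, non-daemon threads.
    static ThreadFactory userThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + counter.incrementAndGet());
            t.setDaemon(false); // user thread: the JVM waits for it
            return t;
        };
    }

    static ExecutorService newExecutor() {
        return Executors.newCachedThreadPool(userThreadFactory("MyRxThread-"));
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = newExecutor();

        // With RxJava 1.x available, the executor would be wrapped like this:
        //   Scheduler scheduler = Schedulers.from(executor);
        //   Observable.from(words).subscribeOn(scheduler).subscribe(...);

        // Stand-in task showing which thread the work lands on.
        executor.execute(() -> System.out.printf("running on %s (daemon=%b)%n",
                Thread.currentThread().getName(),
                Thread.currentThread().isDaemon()));

        // Shut down once the work is submitted so the JVM can exit promptly.
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because these threads are not daemons, the executor should be shut down explicitly once the stream has finished; otherwise idle pool threads can keep the JVM alive after main returns.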
I've got a series of blog posts about RxJava schedulers which should help you implement a "more permanent" variant.