The use case is, there are 2 sources of the data:
- Service 1 - fetches from source-1
- Service 2 - fetches from the source-2
The app should return data at least from source-1. If all is fine with source-2 - the data will be "enhanced", say multiplied by 100.
Service 1 calls service 2.
if all successful user gets the data from service-1 and service-2 if there is an error on service 2, users gets data only from service 1 (at least) if there is an error on service 1 - user will get an error.
There is hello-world-bench code, that emulate this scenario:
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
class Response {
public Integer value;
public String warning;
public Response(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Response{" +
"value=" + value +
", warning='" + warning + '\'' +
'}';
}
}
class Service1 {
public Observable<Response> call(int arg) {
return Observable
.just(
new Response(1),
new Response(2),
new Response(3),
new Response(4))
.delay(100, TimeUnit.MILLISECONDS);
}
}
class Service2 {
public Observable<Response> call(int arg) {
if ( arg % 2 == 0) {
System.out.println("service 2: " + arg);
return Observable
.just(new Response(100 * arg)) // service 2 multiplies x 100 on the result it gets from the service 1
.delay(10, TimeUnit.MILLISECONDS);
} else {
System.out.println("service 2: " + arg);
return Observable.error(new RuntimeException("service 2 error"));
}
}
}
public class Step1 {
static Service1 service1 = new Service1();
static Service2 service2 = new Service2();
public static void main(String[] args) throws InterruptedException {
var oo1 = service1.call(1);
var oo3 = oo1.switchMapDelayError(x -> {
final Observable<Response> oo2 = service2.call(x.value);
return oo2
.onErrorReturn((ex) -> {
//System.out.println("Error handling..." + ex.getMessage() + " " + x);
x.warning = ex.getMessage();
return x; // returns at least service1 result
});
});
oo3.subscribe(x -> {
System.out.println(x);
});
Thread.sleep(100000);
}
}
The result of this code is:
service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}
The problem is: there is no expected: value=200
2*100
yet, if I comment a delay at service2.call() //.delay(10, TimeUnit.MILLISECONDS) then its get the expected result:
service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
Response{value=200, warning='null'}
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}
The question is: why with .delay(10, TimeUnit.MILLISECONDS) on service2.call()
it fails to produce value=200 ? what's wrong with that solution, what do I miss?
Thanks.
your problem is the
switchMapDelayError
operator. You should either use concatMap or flatMapI took the liberty to write a test for your use-case. As a note, always use the overload to provide a
Scheduler
in order provide aTestScheduler
for testing.What does switchMap do?
On each upstream emit switchMap subscribes to given inner-stream. When a new value is emitted from upstream, the old inner-stream gets unsubscribed and the lambda of switchMap is called again in order to subscribe to the new inner-stream.
The problem is probably this code:
It emits Response 1 to 4 on the stack almost instantly one after another and each emit is delayed on another thread. Therefore Response 1 to 4 will be emitted almost instantly. They will not be emitted like: Response(1) at 100ms, Response(2) at 200ms, etc.
Lets see what the output is for
Output
Therefore all values are emitted almost instantly and overwrite each other with the switchMap. The previously emitted value is almost instantly cancelled by the new value.
Solution
Use concatMap or flatMap or change your test-setup to emit each value at 100ms intervals.
flatMap just subscribes to each value, at max by default 128 inner streams. ConcatMap will only subscribe to the next value, when the inner-stream completes.
Test
Domain
Note
Do not use mutable objects. Always ensure, that emitted values are immutable or you get into trouble.