rxjava combines 2 calls with error handling, fails with delay

195 Views Asked by At

The use case is, there are 2 sources of the data:

  1. Service 1 - fetches from source-1
  2. Service 2 - fetches from the source-2

The app should return data at least from source-1. If all is fine with source-2 - the data will be "enhanced", say multiplied by 100.

Service 1 calls service 2.

if all successful user gets the data from service-1 and service-2 if there is an error on service 2, users gets data only from service 1 (at least) if there is an error on service 1 - user will get an error.

There is hello-world-bench code, that emulate this scenario:

import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;

class Response {

    public Integer value;
    public String warning;
    public Response(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Response{" +
                "value=" + value +
                ", warning='" + warning + '\'' +

                '}';
    }
}

class Service1 {

    public Observable<Response> call(int arg) {
        return Observable
                .just(
                        new Response(1),
                        new Response(2),
                        new Response(3),
                        new Response(4))
                .delay(100, TimeUnit.MILLISECONDS);
    }
}

class Service2 {

    public Observable<Response> call(int arg) {

        if ( arg % 2 == 0) {

            System.out.println("service 2: " + arg);

            return Observable
                    .just(new Response(100 * arg)) // service 2 multiplies x 100 on the result it gets from the service 1 
                    .delay(10, TimeUnit.MILLISECONDS);

        } else {

            System.out.println("service 2: " + arg);

            return Observable.error(new RuntimeException("service 2 error"));
        }
    }
}

public class Step1 {

    static Service1 service1 = new Service1();
    static Service2 service2 = new Service2();

    public static void main(String[] args) throws InterruptedException {

        var oo1 = service1.call(1);

        var oo3 = oo1.switchMapDelayError(x -> {

            final Observable<Response> oo2 = service2.call(x.value);

            return oo2
                    .onErrorReturn((ex) -> {
                        //System.out.println("Error handling..." + ex.getMessage() + " " + x);
                        x.warning = ex.getMessage();
                        return x; // returns at least service1 result
                    });
        });

        oo3.subscribe(x -> {
            System.out.println(x);
        });


        Thread.sleep(100000);
    }

}

The result of this code is:

service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}

The problem is: there is no expected: value=200 2*100

yet, if I comment a delay at service2.call() //.delay(10, TimeUnit.MILLISECONDS) then its get the expected result:

service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
Response{value=200, warning='null'}
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}

The question is: why with .delay(10, TimeUnit.MILLISECONDS) on service2.call() it fails to produce value=200 ? what's wrong with that solution, what do I miss?

Thanks.

1

There are 1 best solutions below

1
On BEST ANSWER

your problem is the switchMapDelayError operator. You should either use concatMap or flatMap

I took the liberty to write a test for your use-case. As a note, always use the overload to provide a Scheduler in order provide a TestScheduler for testing.

What does switchMap do?

On each upstream emit switchMap subscribes to given inner-stream. When a new value is emitted from upstream, the old inner-stream gets unsubscribed and the lambda of switchMap is called again in order to subscribe to the new inner-stream.

The problem is probably this code:

return Observable
            .just(
                    new Response(1),
                    new Response(2),
                    new Response(3),
                    new Response(4))
            .delay(100, TimeUnit.MILLISECONDS);

It emits Response 1 to 4 on the stack almost instantly one after another and each emit is delayed on another thread. Therefore Response 1 to 4 will be emitted almost instantly. They will not be emitted like: Response(1) at 100ms, Response(2) at 200ms, etc.

Lets see what the output is for

Observable.just(
    new Response(1), //
    new Response(2),
    new Response(3),
    new Response(4))
    .delay(100, TimeUnit.MILLISECONDS)
    .subscribe(r -> {
      System.out.println("received value at " + Schedulers.io().now(TimeUnit.MILLISECONDS));
    });

Output

received value at 1607432032768
received value at 1607432032769
received value at 1607432032769
received value at 1607432032769

Therefore all values are emitted almost instantly and overwrite each other with the switchMap. The previously emitted value is almost instantly cancelled by the new value.

Solution

Use concatMap or flatMap or change your test-setup to emit each value at 100ms intervals.

flatMap just subscribes to each value, at max by default 128 inner streams. ConcatMap will only subscribe to the next value, when the inner-stream completes.

Test

public class So65193002 {
      @Test
      void so() {
        TestScheduler testScheduler = new TestScheduler();
        Service1 service1 = new Service1(testScheduler);
        Service2 service2 = new Service2(testScheduler);
    
        Observable<Response> service1Call = service1.call(1);
    
        Observable<Response> combined =
            service1Call.concatMapEagerDelayError(
                x -> {
                  return service2
                      .call(x.value)
                      .onErrorReturn(
                          (ex) -> {
                            x.warning = ex.getMessage();
                            return x; // returns at least service1 result
                          });
                },
                true);
    
        TestObserver<Response> test = combined.test();
    
        testScheduler.advanceTimeBy(1, TimeUnit.HOURS);
    
        test.assertValueCount(4)
            .assertValueAt(
                0,
                r -> {
                  assertThat(r.value).isEqualTo(1);
                  assertThat(r.warning).isNotEmpty();
                  return true;
                })
            .assertValueAt(
                1,
                r -> {
                  assertThat(r.value).isEqualTo(200);
                  assertThat(r.warning).isNull();
                  return true;
                })
            .assertValueAt(
                3,
                r -> {
                  assertThat(r.value).isEqualTo(400);
                  assertThat(r.warning).isNull();
                  return true;
                });
      }
    }

Domain

class Response {
  public Integer value;
  public String warning;

  public Response(Integer value) {
    this.value = value;
  }

  @Override
  public String toString() {
    return "Response{" + "value=" + value + ", warning='" + warning + '\'' + '}';
  }
}

class Service1 {
  private final Scheduler scheduler;

  Service1(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  public Observable<Response> call(int arg) {
    return Observable.just(
            new Response(1), //
            new Response(2),
            new Response(3),
            new Response(4))
        .delay(100, TimeUnit.MILLISECONDS, scheduler);
  }
}

class Service2 {
  private final Scheduler scheduler;

  Service2(Scheduler scheduler) {
    this.scheduler = scheduler;
  }

  public Observable<Response> call(int arg) {
    if (arg % 2 == 0) {
      return Observable.just(new Response(100 * arg)).delay(10, TimeUnit.MILLISECONDS, scheduler);

    } else {
      return Observable.error(new RuntimeException("service 2 error"));
    }
  }
}

Note

Do not use mutable objects. Always ensure, that emitted values are immutable or you get into trouble.