How to use RxJava repeatWhen

I am trying to understand how to use repeatWhen in RxJava. The Javadoc is confusing, and when I searched online, someone suggested using it like this:

MyRepeatFunction myRepeatFunction = new MyRepeatFunction(3);
observable1.repeatWhen(myRepeatFunction).subscribe((t) -> System.out.print(t));

class MyRepeatFunction implements Function<Observable<Object>, ObservableSource<Object>> {
    private int repeatCount;
    
    public MyRepeatFunction(int repeatCount) {
        this.repeatCount = repeatCount;
    }
    
    @Override
    public @NonNull ObservableSource<Object> apply(@NonNull Observable<Object> t) throws Throwable {
        return t.delay(1, TimeUnit.SECONDS);
    }
}

The line return t.delay(1, TimeUnit.SECONDS); makes the observable repeat forever. It does not stop until the main thread exits. I want to repeat the observable only repeatCount times, or until a particular condition is no longer true.

I am confused. Help is appreciated.

What about this, from the retryWhen Javadoc:

handler – receives an Observable of notifications with which a user can complete or error, aborting the retry

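The retryWhen operator passes an Observable to your handler, and that Observable emits the Throwable each time the source signals onError. The handler decides what happens next: emitting a value, any value, from the Observable it returns causes a re-subscription from the bottom (retryWhen) up to the source. repeatWhen works the same way, except that its handler is notified on each onComplete of the source instead of each onError.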
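To make the re-subscription behaviour concrete, here is a minimal sketch, assuming RxJava 3 on the classpath. The class RetryWhenResubscribe and the helpers flakySource and firstSuccess are hypothetical names made up for illustration. The source fails on its first two subscriptions, and the handler simply passes every error through, so each failure triggers another subscription.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.atomic.AtomicInteger;

class RetryWhenResubscribe {
    // Hypothetical helper: a source that fails until the given attempt, then emits "ok".
    static Observable<String> flakySource(AtomicInteger attempts, int succeedOnAttempt) {
        return Observable.fromCallable(() -> {
            if (attempts.incrementAndGet() < succeedOnAttempt) {
                throw new IllegalStateException("not yet");
            }
            return "ok";
        });
    }

    // Hypothetical helper: retries flakySource until it emits a value.
    static String firstSuccess(AtomicInteger attempts) {
        return flakySource(attempts, 3)
            // Passing each error straight through emits one value per failure,
            // so every failure leads to a re-subscription to the source.
            .retryWhen(errors -> errors)
            .blockingFirst();
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        System.out.println(firstSuccess(attempts) + " after " + attempts.get() + " attempts");
    }
}
```

Because the handler never completes or errors here, this variant would retry forever against a source that always fails, which is why a limit such as take or takeUntil is needed in practice.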

The example below shows that you can apply the take operator to the throwable Observable inside retryWhen to limit the number of re-subscriptions. To complete the source on an external signal, you can use takeUntil, which takes another Observable. When that Observable emits, takeUntil sends onComplete downstream and the subscription completes. RxJava takes care of the rest: open subscriptions (the pending retry) are cancelled.

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import org.junit.jupiter.api.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

class So65154637 {
  @Test
  void main() {
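    // Emits once to signal that the stream should complete.
    PublishSubject<String> shouldFinish$ = PublishSubject.create();
    // Virtual-time scheduler, so the test does not actually wait.
    TestScheduler testScheduler = new TestScheduler();

    // Counts how many times the source is subscribed to.
    AtomicInteger count = new AtomicInteger(0);

    Observable<Object> obs =
        Observable.fromAction(
            () -> {
              System.out.println("inc");
              count.incrementAndGet();
              // Always fails, so every subscription ends up in retryWhen.
              throw new RuntimeException("woat");
            })
        .retryWhen(
            // Let at most 10 errors through, each delayed by one second of
            // virtual time, before re-subscribing.
            throwableObservable ->
                throwableObservable.take(10).delay(1L, TimeUnit.SECONDS, testScheduler))
        .takeUntil(shouldFinish$);

    TestObserver<Object> test = obs.test();

    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);

    shouldFinish$.onNext("");

    // Initial attempt plus the retries at t=1s and t=2s: 3 subscriptions.
    assertThat(count.get()).isEqualTo(3);
    test.assertComplete();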
  }
}
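
The same idea can be carried over to repeatWhen, which is what the question asks about. The following is a minimal sketch, assuming RxJava 3. The class RepeatWhenLimit and the helper repeatLimited are hypothetical names, and takeWhile with a counter is just one possible way to stop, not the only one. The handler receives one item per onComplete of the source. Letting an item through re-subscribes, and completing the returned Observable ends the whole sequence.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class RepeatWhenLimit {
    // Hypothetical helper: runs source once, then repeats it at most repeatCount times.
    static <T> List<T> repeatLimited(Observable<T> source, int repeatCount) {
        return source
            .repeatWhen(completions -> {
                // The handler runs once per subscriber, so the counter is not shared.
                AtomicInteger repeats = new AtomicInteger();
                // Each completion of the source arrives here as one item.
                // While the predicate holds, the item passes and the source is
                // re-subscribed; once it fails, this Observable completes and
                // so does the downstream.
                return completions.takeWhile(
                    ignored -> repeats.incrementAndGet() <= repeatCount);
            })
            .toList()
            .blockingGet();
    }

    public static void main(String[] args) {
        // One initial run plus three repeats of the two-item source.
        System.out.println(repeatLimited(Observable.just(1, 2), 3));
    }
}
```

The predicate can test any condition instead of a counter, which covers the "until a condition is no longer true" case. Appending .delay(1, TimeUnit.SECONDS) after takeWhile should restore the one-second pause between repeats from the question.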