RxJava 2. Retry with delay while proceeding other items that are distinct

4.2k Views Asked by At

I have an observable that gets items from a lot of sources:

Source { List<Item> data }

Relationship between sources and items is many-to-many and in different sources items could duplicate themselves. Item is an entity that should be uploaded to server and server does not accept duplicates. To achieve this I merge Sources and distinct their Items by their ids and then upload unique items to server. Like below:

Observable.merge(source1(), source2(), source3())
            .flatMapIterable(sources -> sources)
            .flatMapIterable(source::getItems)
            .distinct(item -> item.getId())
            .flatMapCompletabale(item -> uploadItem(item))

Item uploading could emit several errors and on some of them I should retry to upload item once again later and proceed another items while 'failed' one is waiting for its retrying.

How can I postpone retrying uploading 'failed' item and proceed other items while this one is wating for its try?

Thanks in advance!

3

There are 3 best solutions below

0
On BEST ANSWER

I put this function into retryWhen method and get it working.

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {

private final int maxRetryCount;
private final int retryDelay;
private int retryCount;
private TimeUnit timeUnit;

public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
    this.maxRetryCount = maxRetryCount;
    this.retryDelay = retryDelay;
    this.timeUnit = timeUnit;
    this.retryCount = 0;
}

@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
    return attempts.flatMap((Function<Throwable, Observable<?>>) throwable -> {

        if (++retryCount < maxRetryCount) {
            return Observable.timer(retryDelay, timeUnit);
        }

        return Observable.error(throwable);
    });
}
}
2
On

To just handle the failure of one upload, you can add an operator in the final step:

  .flatMapCompletable(item->uploadItem(item))

should become

  .flatMapCompletable(item->uploadItem(item)
                              .retryWhen(throwable -> 
                                  throwable.delay(5, TimeUnit.SECONDS)))

Edit: I learned a lot about retryWhen() operator from this Dan Lew blog entry. You will find several examples, including using the zip() operator with Observable.range(3) to limit the number of retries.

0
On

I had to modify the above example to create a Flowable to retryWhen a Single in my RxJava2 project:

import io.reactivex.Flowable; import io.reactivex.functions.Function;

import java.util.concurrent.TimeUnit;

public class RetryWithDelay implements Function<Flowable<? extends Throwable>, Flowable<?>> {

    private final int maxRetryCount;
    private final int retryDelay;
    private int retryCount;
    private TimeUnit timeUnit;

    public RetryWithDelay(final int maxRetryCount, final int retryDelay, final TimeUnit timeUnit) {
        this.maxRetryCount = maxRetryCount;
        this.retryDelay = retryDelay;
        this.timeUnit = timeUnit;
        this.retryCount = 0;
    }

    @Override
    public Flowable<?> apply(final Flowable<? extends Throwable> attempts) {

        return attempts.flatMap((Function<Throwable, Flowable<?>>) throwable -> {

            if (++retryCount < maxRetryCount) {
                return Flowable.timer(retryDelay, timeUnit);
            }

            return Flowable.error(throwable);
        });
    } }

and to apply it to my single:

.retryWhen(new RetryWithDelay(5, 2, TimeUnit.SECONDS))