Custom Observable stream of REST results from a list of links


I am using a library which implements its own REST requests. It takes a 'permalink' and issues a REST request to fetch Content (I provide mocks in the source code for testing). I have a list of permalinks, and would like to schedule a request for each and generate a stream of Content results. All of it should be asynchronous, and once all results are done, I want to emit a list of them.

I am trying to achieve this using RxJava. Here is what I have now (sorry for not using lambdas, I just want to get used to the RxJava class names):

public class Main {

    public static void main(String[] args) {
        int count = 10;
        List<String> permalinks = new ArrayList<>(count);
        for (int i = 1; i <= count; ++i) {
            permalinks.add("permalink_" + i);
        }

        ContentManager cm = new ContentManager();

        Observable
                .create(new Observable.OnSubscribe<Content>() {

                    int gotCount = 0;

                    @Override
                    public void call(Subscriber<? super Content> subscriber) {
                        for (String permalink : permalinks) { // 1. is iterating here the correct way?
                            if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
                                cm.getBasicContentByPermalink(permalink, new RestCallback() {

                                    @Override
                                    public void onSuccess(Content content) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(content);
                                            completeIfFinished(); // 3. if guarded by isUnsubscribed, onComplete might never be called
                                        }
                                    }

                                    @Override
                                    public void onFailure(int code, String message) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
                                            completeIfFinished();
                                        }
                                    }

                                    private void completeIfFinished() {
                                        ++gotCount;
                                        if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
                                            subscriber.onCompleted();
                                        }
                                    }
                                });
                            }
                        }
                    }
                })
                .toList()
                .subscribe(new Action1<List<Content>>() {

                    @Override
                    public void call(List<Content> contents) {
                        System.out.println("list count: " + contents.size());
                        System.out.println("results: ");
                        contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
                    }
                });

        System.out.println("finishing main");
    }
}

// library mocks

class Content {

    String basic;

    String extended;

    public Content(String basic, String extended) {
        this.basic = basic;
        this.extended = extended;
    }
}

interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public void getBasicContentByPermalink(String permalink, RestCallback callback) {
        // just to simulate network latency and unordered results
        new Thread() {

            @Override
            public void run() {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (random.nextInt(100) < 95) {
                    // 95% of the time we succeed
                    callback.onSuccess(new Content(permalink + "_basic", null));
                } else {
                    callback.onFailure(-1, permalink + "_basic_failure");
                }
            }
        }.start();
    }
}

This kind of works, but I am not sure I am doing things the right way. Please take a look at the comments numbered 1-5 in the code:

  1. When creating an Observable from a list, am I supposed to iterate over the list myself, or is there some other, better way? For instance, there is Observable.from(Iterable), but I don't think I can use it?
  2. I am checking isUnsubscribed before I send the REST request, and also in both result handlers (success/failure). Is this the way it is supposed to be used?
  3. I implement some logic to call onCompleted when all requests have returned, no matter if successful or failed (see question 5). But, as the calls are guarded by the isUnsubscribed check, it may happen that onCompleted is never called. How does the stream know then that it should complete? Does it complete prematurely when the subscriber is unsubscribed and I don't have to think about it? Are there any caveats? For example, what would happen if the subscriber unsubscribed in the middle of emitting the Content results? In my tests, the list of all results is never emitted, but I would expect a list of all the results up to that point.
  4. In the onFailure method, I emit null with onNext(null) to mark a failure. I do it because in the end I will have a zip of 2 streams, and will emit an instance of a custom class only when both zipped values are non null. Is this the correct way to do it?
  5. As I mentioned, I have some custom logic to check if the stream is finished. What I do here is I count the REST results and when as many were processed as there are permalinks in the list, it is done. Is this necessary? Is this the right way?
Best answer:

When creating an Observable from a list, am I supposed to iterate over the list myself, or is there some other, better way? For instance, there is Observable.from(Iterable), but I don't think I can use it?

Instead of iterating inside the create method, you can build one Observable per link:

public Observable<Content> getContent(String permalink) {

    return Observable.create(subscriber -> {
        ContentManager cm = new ContentManager();
        // skip the request entirely if nobody is listening any more
        if (!subscriber.isUnsubscribed()) {
            cm.getBasicContentByPermalink(permalink, new RestCallback() {

                @Override
                public void onSuccess(Content content) {
                    // exactly one item per link, then complete
                    subscriber.onNext(content);
                    subscriber.onCompleted();
                }

                @Override
                public void onFailure(int code, String message) {
                    // attach the permalink to the error so the failing link can be identified
                    subscriber.onError(OnErrorThrowable.addValueAsLastCause(new RuntimeException(message), permalink));
                }
            });
        }
    });
}

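Then combine them with the flatMap operator. flatMap subscribes to each inner Observable and merges their emissions as they arrive, so the results are not guaranteed to come out in list order: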

 Observable.from(permalinks)
           .flatMap(link -> getContent(link))
           .subscribe();
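For readers who want to try the per-link idea without RxJava on the classpath, here is a JDK-only sketch of the same shape. It is not RxJava: CompletableFuture stands in for the per-link Observable, and waiting on allOf stands in for flatMap followed by toList. The RestCallback interface, the fetch method and the "_bad" failure rule are made-up stand-ins for the library in the question, and Content is reduced to a String. Unlike flatMap, this sketch returns results in list order rather than arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class PerLinkSketch {

    // Hypothetical stand-in for the library callback in the question.
    interface RestCallback {
        void onSuccess(String content);
        void onFailure(int code, String message);
    }

    // Hypothetical stand-in for ContentManager: replies on another thread
    // and fails for every permalink that ends with "_bad".
    static void fetch(String permalink, RestCallback callback) {
        new Thread(() -> {
            if (permalink.endsWith("_bad")) {
                callback.onFailure(-1, permalink + "_failure");
            } else {
                callback.onSuccess(permalink + "_basic");
            }
        }).start();
    }

    // One asynchronous unit per link: the callback completes or fails the future.
    static CompletableFuture<String> getContent(String permalink) {
        CompletableFuture<String> result = new CompletableFuture<>();
        fetch(permalink, new RestCallback() {
            @Override
            public void onSuccess(String content) {
                result.complete(content);
            }

            @Override
            public void onFailure(int code, String message) {
                result.completeExceptionally(new RuntimeException(message));
            }
        });
        return result;
    }

    // Turns a failure into an empty Optional so one bad link does not fail the batch.
    static CompletableFuture<Optional<String>> getContentOrEmpty(String permalink) {
        return getContent(permalink)
                .thenApply(Optional::of)
                .exceptionally(error -> Optional.empty());
    }

    // Starts all requests first, then waits until every one has finished.
    static List<String> fetchAll(List<String> permalinks) {
        List<CompletableFuture<Optional<String>>> pending = new ArrayList<>();
        for (String permalink : permalinks) {
            pending.add(getContentOrEmpty(permalink));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();

        List<String> results = new ArrayList<>();
        for (CompletableFuture<Optional<String>> done : pending) {
            done.join().ifPresent(results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [permalink_1_basic, permalink_3_basic]
        System.out.println(fetchAll(Arrays.asList("permalink_1", "permalink_2_bad", "permalink_3")));
    }
}
```

The point carried over from the answer is that completion is derived from the individual units finishing, so no hand-written request counter is involved.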

I am checking isUnsubscribed before I send the REST request, and also in both result handlers (success/failure). Is this the way it is supposed to be used?

Checking isUnsubscribed before you send the request is the right thing to do. I am not sure about the checks in the success/failure callbacks (I think they are unnecessary, but I may be wrong).

I implement some logic to call onComplete when all requests have returned, no matter if successful or failed (see question 5). But, as they are guarded by the isUnsubscribed call, it may happen that they onComplete is never called.

If you unsubscribe from the stream, you stop observing it, so you may never be notified of its completion.

For example, what would happen if the subscriber unsubscribed in the middle of emitting the Content results?

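You will be able to consume whatever was emitted before unsubscribing. Note, however, that toList only emits its list once the source completes, so if you unsubscribe before that, no list is emitted at all.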

In the onFailure method, I emit null with onNext(null) to mark a failure. I do it because in the end I will have a zip of 2 streams, and will emit an instance of a custom class only when both zipped values are non null. Is this the correct way to do it?

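No. Instead of null, signal an error by calling onError on the subscriber. That way you do not have to check for null in your zip function. Just zip! Keep in mind that onError terminates the stream, so with flatMap the first failed link ends the whole sequence. If the remaining links should still be processed, recover per link, for example with getContent(link).onErrorResumeNext(Observable.empty()).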
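The "no null check in the combiner" idea can be tried with the JDK alone. The sketch below is an analogy, not RxJava: CompletableFuture.thenCombine plays the role of zip, and a future that completed exceptionally plays the role of a stream that called onError. The Combined class and the sample values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ZipSketch {

    // Hypothetical result type that pairs the two values.
    static final class Combined {
        final String basic;
        final String extended;

        Combined(String basic, String extended) {
            this.basic = basic;
            this.extended = extended;
        }
    }

    // The combiner runs only when both sides succeeded, so it never sees a
    // missing value; a failure on either side fails the combined future.
    static CompletableFuture<Combined> combine(CompletableFuture<String> basic,
                                               CompletableFuture<String> extended) {
        return basic.thenCombine(extended, Combined::new);
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("basic");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("request failed"));

        Combined both = combine(ok, CompletableFuture.completedFuture("extended")).join();
        System.out.println(both.basic + " + " + both.extended); // prints basic + extended

        // prints true: the failure propagated and the combiner was never called
        System.out.println(combine(ok, failed).isCompletedExceptionally());
    }
}
```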

As I mentioned, I have some custom logic to check if the stream is finished. What I do here is I count the REST results and when as many were processed as there are permalinks in the list, it is done. Is this necessary? Is this the right way?

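No custom counting is needed: flatMap completes on its own once the source and every inner Observable have completed. If you want the number of results once the stream has completed, you can use the count operator.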

Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .count()
          .subscribe(System.out::println);

If you want to emit a running count after each successful link, you can use the scan operator:

 Observable.from(permalinks)
          .flatMap(link -> getContent(link))
          .map(content -> 1) // map content to 1 in order to count it.
          .scan((count, one) -> count + one) // first argument is the running total, second is the new item
          .subscribe(System.out::println); // prints 1, 2, 3, ...
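The argument order of the accumulator is easy to get wrong, so here is a small JDK-only imitation of a seedless scan over a list. It is a sketch written for illustration, not the RxJava operator: the scan helper below is hypothetical and only mirrors the rule that the first item passes through unchanged and every later output is the accumulator applied to the previous output and the next item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ScanSketch {

    // Hypothetical list-based imitation of a seedless scan.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T running = null;
        boolean first = true;
        for (T item : items) {
            // first item is emitted as is, later ones are folded into the running value
            running = first ? item : accumulator.apply(running, item);
            first = false;
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ones = Arrays.asList(1, 1, 1, 1);

        // Uses the running total: a real running count.
        System.out.println(scan(ones, (total, one) -> total + one)); // prints [1, 2, 3, 4]

        // Ignores the running total: the output gets stuck at 2.
        System.out.println(scan(ones, (total, one) -> one + 1)); // prints [1, 2, 2, 2]
    }
}
```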