I am using a library which implements its own REST requests. It takes a 'permalink' and issues a REST request to fetch Content (I provide mocks in the source code for testing). I have a list of permalinks, and would like to schedule a request for each and generate a stream of Content results. All of it should be asynchronous, and once all results are done, I want to emit a list of them.
I am trying to achieve this using RxJava. Here is what I have now (sorry for not using lambdas, I just want to get used to the RxJava class names):
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;

public class Main {
    public static void main(String[] args) {
        int count = 10;
        List<String> permalinks = new ArrayList<>(count);
        for (int i = 1; i <= count; ++i) {
            permalinks.add("permalink_" + i);
        }
        ContentManager cm = new ContentManager();
        Observable
                .create(new Observable.OnSubscribe<Content>() {
                    int gotCount = 0;

                    @Override
                    public void call(Subscriber<? super Content> subscriber) {
                        for (String permalink : permalinks) { // 1. is iterating here the correct way?
                            if (!subscriber.isUnsubscribed()) { // 2. how often and where should I check isUnsubscribed?
                                cm.getBasicContentByPermalink(permalink, new RestCallback() {
                                    @Override
                                    public void onSuccess(Content content) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(content);
                                            completeIfFinished(); // 3. if guarded by isUnsubscribed, onCompleted might never be called
                                        }
                                    }

                                    @Override
                                    public void onFailure(int code, String message) {
                                        if (!subscriber.isUnsubscribed()) {
                                            subscriber.onNext(null); // 4. is this OK or is there some other way to mark a failure?
                                            completeIfFinished();
                                        }
                                    }

                                    private void completeIfFinished() {
                                        ++gotCount;
                                        if (gotCount == permalinks.size()) { // 5. how to know that the last request is done? am I supposed to implement such custom logic?
                                            subscriber.onCompleted();
                                        }
                                    }
                                });
                            }
                        }
                    }
                })
                .toList()
                .subscribe(new Action1<List<Content>>() {
                    @Override
                    public void call(List<Content> contents) {
                        System.out.println("list count: " + contents.size());
                        System.out.println("results: ");
                        contents.stream().map(content -> content != null ? content.basic : null).forEach(System.out::println);
                    }
                });
        System.out.println("finishing main");
    }
}
// library mocks
class Content {
    String basic;
    String extended;

    public Content(String basic, String extended) {
        this.basic = basic;
        this.extended = extended;
    }
}

interface RestCallback {
    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {
    private final Random random = new Random();

    public void getBasicContentByPermalink(String permalink, RestCallback callback) {
        // just to simulate network latency and unordered results
        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(random.nextInt(1000) + 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (random.nextInt(100) < 95) {
                    // 95% of the time we succeed
                    callback.onSuccess(new Content(permalink + "_basic", null));
                } else {
                    callback.onFailure(-1, permalink + "_basic_failure");
                }
            }
        }.start();
    }
}
This more or less works, but I am not sure I am doing things the right way. Please take a look at the comments numbered 1-5 in the code:
- When creating an Observable from a list, am I supposed to iterate over the list myself, or is there some other, better way? For instance, there is Observable.from(Iterable), but I don't think I can use it?
- I am checking isUnsubscribed before I send the REST request, and also in both result handlers (success/failure). Is this the way it is supposed to be used?
- I implement some logic to call onCompleted when all requests have returned, no matter if successful or failed (see question 5). But, as these calls are guarded by the isUnsubscribed check, it may happen that onCompleted is never called. How does the stream know then that it should complete? Does it complete prematurely when the subscriber is unsubscribed and I don't have to think about it? Are there any caveats? For example, what would happen if the subscriber unsubscribed in the middle of emitting the Content results? In my tests, the list of all results is never emitted, but I would expect a list of all the results up to that point.
- In the onFailure method, I emit null with onNext(null) to mark a failure. I do it because in the end I will have a zip of 2 streams, and will emit an instance of a custom class only when both zipped values are non null. Is this the correct way to do it?
- As I mentioned, I have some custom logic to check if the stream is finished. What I do here is I count the REST results and when as many were processed as there are permalinks in the list, it is done. Is this necessary? Is this the right way?
1. Instead of iterating inside the create method, you can build one observable per permalink and then combine them with the flatMap operator.
2. Checking isUnsubscribed before you send the request is the right thing to do. I am not sure about the checks in the success/failure callbacks (I think they are useless there, but someone may tell me I'm wrong).
3. If you unsubscribe from the stream, you stop observing it, so you may not be notified of its completion. You will still be able to consume whatever was emitted before you unsubscribed.
4. No. Instead of null, emit an error (by calling onError on the subscriber). That way you won't have to check for null in your zip lambda. Just zip!
5. If you want the number of permalinks once the stream has completed, you can use the count operator. If you want to emit the running number of links each time a link succeeds, you can try the scan operator.
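To make the flatMap and onError suggestions more concrete, here is a rough sketch, assuming RxJava 1.x is on the classpath. The class name FlatMapSketch and the helpers fetch and fetchAll are made up for illustration, and the mock ContentManager is simplified so that it always fails for "permalink_3". Each permalink gets its own small observable that completes after its single result, so no manual counting is needed. Because onError terminates a stream, the sketch recovers per link with onErrorResumeNext so that one failed request does not cancel the others; whether dropping failed links is what you want depends on your zip logic.

```java
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Subscriber;

public class FlatMapSketch {
    // Simplified stand-ins for the library types from the question.
    static class Content {
        final String basic;
        Content(String basic) { this.basic = basic; }
    }

    interface RestCallback {
        void onSuccess(Content content);
        void onFailure(int code, String message);
    }

    static class ContentManager {
        void getBasicContentByPermalink(String permalink, RestCallback callback) {
            // Hypothetical mock: answers on another thread, fails for "permalink_3".
            new Thread(() -> {
                if (permalink.endsWith("_3")) {
                    callback.onFailure(-1, permalink + "_basic_failure");
                } else {
                    callback.onSuccess(new Content(permalink + "_basic"));
                }
            }).start();
        }
    }

    // Wraps ONE callback-based request in an observable: one item then onCompleted, or onError.
    static Observable<Content> fetch(ContentManager cm, String permalink) {
        return Observable.create(new Observable.OnSubscribe<Content>() {
            @Override
            public void call(Subscriber<? super Content> subscriber) {
                if (subscriber.isUnsubscribed()) {
                    return; // nobody is listening, so do not send the request
                }
                cm.getBasicContentByPermalink(permalink, new RestCallback() {
                    @Override
                    public void onSuccess(Content content) {
                        subscriber.onNext(content);
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onFailure(int code, String message) {
                        subscriber.onError(new RuntimeException(code + ": " + message));
                    }
                });
            }
        });
    }

    // flatMap merges the per-link observables; toList emits once all of them are done.
    static List<Content> fetchAll(ContentManager cm, List<String> permalinks) {
        return Observable.from(permalinks)
                .flatMap(p -> fetch(cm, p)
                        // a failed link contributes nothing instead of ending the whole stream
                        .onErrorResumeNext(Observable.<Content>empty()))
                .toList()
                .toBlocking() // blocking only to keep the demo simple
                .single();
    }

    public static void main(String[] args) {
        List<Content> contents = fetchAll(new ContentManager(),
                Arrays.asList("permalink_1", "permalink_2", "permalink_3"));
        System.out.println("list count: " + contents.size());
        for (Content c : contents) {
            System.out.println(c.basic);
        }
    }
}
```

In real code you would probably call subscribe instead of toBlocking. The merged stream completes on its own when every inner observable has completed, which replaces the gotCount bookkeeping from the question.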