How to use results from different isolates in the main isolate?


I'm really new to Dart and also to programming. I'm trying to develop a command-line program in Dart using isolates. My intention is to compare its performance against the same program written in Java with threads.

The Dart program looks like this so far:

  1. main.dart

    import "dart:async";
    import "dart:isolate";
    
    main() {    
        var rPort1 = new ReceivePort();
        var rPort2 = new ReceivePort();
    
        var p1 = 0;
        rPort1.listen((partial) {
            print("p1 ${partial}");
            p1 = partial;
            rPort1.close();
        });
        var p2 = 0;
        rPort2.listen((partial) {
           print("p2 ${partial}");
           p2 = partial;
           rPort2.close();
        });
    
        Isolate.spawnUri(new Uri.file("MyIsolate.dart"), [arg0, ...], rPort1.sendPort);
        Isolate.spawnUri(new Uri.file("MyIsolate.dart"), [arg0, ...], rPort2.sendPort);
    
        var p3 = p1 + p2;
        print("p3 ${p3}");
    }
    
  2. myIsolate.dart

    import "dart:async";
    import "dart:isolate";
    
    main(args, SendPort port) {
        var partial = 0;
        // ... do stuff ...
        // args are used and partial is updated
        port.send(partial);
    }
    

The output looks like this:

p3 0
p1 -0.1168096561671553
p2 0.023709338284264223

As you can see, the return value of each isolate arrives after the main isolate has finished its execution. What I want is to use the results of the isolates for further calculation in main.

I don't know what I'm missing. I'm sure it's something very stupid, but I cannot move forward on this problem. In Java it is simple to get the result value of each thread, but in Dart I can't figure out how to do this with isolates.

Any ideas?


There are 2 best solutions below

3
On

You have to wait until all streams (from your ports) are complete. One way to do that is:

import "dart:async";
import "dart:isolate";

main() {
  var rPort1 = new ReceivePort();
  var rPort2 = new ReceivePort();

  // Completers that complete when the corresponding stream is done
  Completer c1 = new Completer();
  Completer c2 = new Completer();

  var p1 = 0;
  rPort1.listen((partial) {
    print("p1 ${partial}");
    p1 = partial;
    rPort1.close();
  }, onDone: ()=>c1.complete()); // Notice onDone callback here
  var p2 = 0;
  rPort2.listen((partial) {
    print("p2 ${partial}");
    p2 = partial;
    rPort2.close();

  }, onDone: ()=>c2.complete()); // And here

  Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], rPort1.sendPort);
  Isolate.spawnUri(new Uri.file("my_isolate.dart"), [0], rPort2.sendPort);

  // Waiting for both streams to complete before summing our results
  Future.wait([c1.future,c2.future]).then((_){
    var p3 = p1 + p2;
    print("p3 ${p3}");
  });
}

If you only need the values themselves, you can complete each future with its value as soon as it arrives, instead of waiting for the isolates (and their streams) to finish.

To do this, move the c*.complete(<value>) call into the corresponding listen() callback, like this (not tested):

rPort1.listen((partial) {
  print("p1 ${partial}");
  c1.complete(partial);
  rPort1.close();
});
rPort2.listen((partial) {
  print("p2 ${partial}");
  c2.complete(partial);
  rPort2.close();
});

...

Future.wait([c1.future,c2.future]).then((result){
  var p3 = result[0] + result[1];
  print("p3 ${p3}");
});
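
The same idea can be tried in a single file. The following is only a sketch under a few assumptions: it uses Isolate.spawn instead of spawnUri so that no second file is needed, it uses current null-safe Dart syntax, and `worker` and `runWorker` are hypothetical names whose doubling logic merely stands in for the "do stuff" part of the question.

```dart
import 'dart:async';
import 'dart:isolate';

// Hypothetical worker: replaces the real computation from the question.
// It receives [SendPort, int] and sends back the input doubled.
void worker(List<Object> message) {
  final port = message[0] as SendPort;
  final input = message[1] as int;
  port.send(input * 2);
}

// Spawns one isolate and returns a future for the first value it sends.
Future<int> runWorker(int input) async {
  final rPort = ReceivePort();
  try {
    await Isolate.spawn(worker, <Object>[rPort.sendPort, input]);
  } catch (_) {
    rPort.close(); // do not leave the port open if spawning failed
    rethrow;
  }
  // `first` completes with the first message and cancels the
  // subscription, which closes the port.
  return await rPort.first as int;
}

Future<void> main() async {
  // Both isolates run concurrently; main continues once both have answered.
  final results = await Future.wait([runWorker(20), runWorker(1)]);
  final p3 = results[0] + results[1];
  print('p3 $p3'); // prints "p3 42"
}
```

Wrapping each isolate in a function that returns a Future keeps the port handling in one place, so main only deals with the values.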
2
On

If you want to wait for something in Dart, that something should be a future. There are many ways to convert a stream or port event to a future. If in doubt, you can always use a Completer to create a future from any other event. In this case there is an easier way: you want just one event from each stream, so you can use Stream.first (or Stream.last or Stream.single).

import "dart:async";
import "dart:isolate";

main() {
  var rPort1 = new ReceivePort();
  var rPort2 = new ReceivePort();

  Future.wait([
      Isolate.spawnUri(new Uri.file("my_isolate.dart"), ["0"], rPort1.sendPort)
             .then((_) => rPort1.first,
                   onError: (_) => rPort1.close()),
      Isolate.spawnUri(new Uri.file("my_isolate.dart"), ["0"], rPort2.sendPort)
             .then((_) => rPort2.first,
                   onError: (_) => rPort2.close()),
  ]).then((ps) {
     // Both results have arrived at this point, so we can sum them
     var p3 = ps[0] + ps[1];
     print("p3 ${p3}");
  });
}

Here I also wait for the Future returned by spawnUri, because it may complete with an error if your isolate didn't spawn correctly.

You can also use some of the helper functions in the isolate package.

import "dart:async";
import "dart:isolate";
import "package:isolate/isolate.dart";

main() async {
  // A SingleResponseChannel has a send-port and a result future,
  // and completes the future with the first port event.
  // Warning: Only closed when event is sent on port!
  // Consider setting a time-out on the channel.
  var c1 = new SingleResponseChannel();
  var c2 = new SingleResponseChannel();
  Isolate.spawnUri(new Uri.file("my_isolate.dart"), ["0"], c1.port);
  Isolate.spawnUri(new Uri.file("my_isolate.dart"), ["0"], c2.port);
  var p3 = await c1.result + await c2.result;
  print("p3 ${p3}");
}
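
The time-out mentioned in the comments above can also be sketched without the isolate package, using Future.timeout from dart:async on a plain ReceivePort. This is an illustration under assumptions, not the package's own mechanism: `firstOrTimeout`, `quickWorker` and `silentWorker` are hypothetical names, and returning null on time-out is just one possible convention.

```dart
import 'dart:async';
import 'dart:isolate';

// Hypothetical helper: waits for the first message on [port],
// or gives up after [limit] and returns null.
Future<Object?> firstOrTimeout(ReceivePort port, Duration limit) {
  return port.first.timeout(limit, onTimeout: () {
    port.close(); // an open port would keep the program alive
    return null;
  });
}

// Hypothetical workers: one answers immediately, one never answers.
void quickWorker(SendPort port) => port.send(7);

void silentWorker(SendPort port) {
  // Deliberately sends nothing.
}

Future<void> main() async {
  final p1 = ReceivePort();
  final p2 = ReceivePort();
  await Isolate.spawn(quickWorker, p1.sendPort);
  await Isolate.spawn(silentWorker, p2.sendPort);

  final a = await firstOrTimeout(p1, const Duration(seconds: 2));
  final b = await firstOrTimeout(p2, const Duration(milliseconds: 200));
  print('a=$a b=$b'); // prints "a=7 b=null"
}
```

Closing the port in onTimeout matters: without it, a worker that never replies would leave the port open and the program would not exit.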