RxDart shareReplay not 'hot'?


I'm having trouble successfully using shareReplay from RxDart (0.24.1) in my application. I have a Stream and I would like to 'cache' the latest emitted value, so that any 'late' listeners get it immediately (the stream can be idle for longer periods). I'm failing to do this, so I started experimenting outside of my app and must admit I don't fully understand what is happening.

Here is some code:

import 'package:rxdart/rxdart.dart';

void main() async {
  final s = Stream.fromIterable([1, 2, 3]).shareReplay(maxSize: 10);
  print('First:');
  await for (final i in s) {
    print(i);
  }
  print('Second:');
  await for (final i in s) {
    print(i);
  }
}

I would expect the code to print

First:
1
2
3
Second:
1
2
3

but it never gets to the second await for, it doesn't even print the string Second:; however, the program finishes successfully, so the first await for does finish (as the original Stream is finite). What is going on?

In reality, the Stream that I have is potentially endless (with long periods of time between events), and I'm only interested in the last event, so here is some code simulating this:

import 'package:rxdart/rxdart.dart';

void main() async {
  final s = Stream.periodic(Duration(seconds: 1), (e) => e).shareReplay(maxSize: 1);
  // print(s.isBroadcast);
  await Future.delayed(Duration(seconds: 3));
  print('First');
  await for (final i in s.take(1)) {
    print(i);
  }
  print('Second');
  await for (final i in s.take(1)) {
    print(i);
  }
}

To make sure the Stream can finish (and so await for can finish) I use take(1). I expected the output to be:

First
2
Second
2

(Maybe the twos would be threes?) What I expected to happen is this:

  1. The periodic stream starts emitting every second (it is broadcast/hot).
  2. The application waits for 3 seconds due to Future.delayed.
  3. The first late listener comes and gets the latest value, 2 (or maybe 3?) and finishes due to take(1).
  4. The second late listener comes and gets the latest value as well because the stream didn't emit anything else in the meantime (even if it does, the value would be greater by 1, which is Ok) and finishes due to take(1).
  5. The app finishes.

However, the output is:

First
0
Second
Unhandled exception:
type '_TakeStream<int>' is not a subtype of type 'Future<bool>'
#0      _StreamIterator._onData (dart:async/stream_impl.dart:1067:19)
#1      _RootZone.runUnaryGuarded (dart:async/zone.dart:1384:10)
#2      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:357:11)
#3      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:285:7)
#4      _ForwardingStreamSubscription._add (dart:async/stream_pipe.dart:127:11)
#5      _TakeStream._handleData (dart:async/stream_pipe.dart:318:12)
#6      _ForwardingStreamSubscription._handleData (dart:async/stream_pipe.dart:157:13)
#7      _RootZone.runUnaryGuarded (dart:async/zone.dart:1384:10)
#8      _BufferingStreamSubscription._sendData (dart:async/stream_impl.dart:357:11)
#9      _BufferingStreamSubscription._add (dart:async/stream_impl.dart:285:7)
#10     _SyncBroadcastStreamController._sendData (dart:async/broadcast_stream_controller.dart:385:25)
#11     _BroadcastStreamController.add (dart:async/broadcast_stream_controller.dart:250:5)
#12     _StartWithStreamSink._safeAddFirstEvent (package:rxdart/src/transformers/start_with.dart:56:12)
#13     _StartWithStreamSink.onListen (package:rxdart/src/transformers/start_with.dart:37:11)
#14     forwardStream.<anonymous closure>.<anonymous closure> (package:rxdart/src/utils/forwarding_stream.dart:31:37)
#15     forwardStream.runCatching (package:rxdart/src/utils/forwarding_stream.dart:24:12)
#16     forwardStream.<anonymous closure> (package:rxdart/src/utils/forwarding_stream.dart:31:16)
#17     _runGuarded (dart:async/stream_controller.dart:847:24)
#18     _BroadcastStreamController._subscribe (dart:async/broadcast_stream_controller.dart:213:7)
#19     _ControllerStream._createSubscription (dart:async/stream_controller.dart:860:19)
#20     _StreamImpl.listen (dart:async/stream_impl.dart:493:9)
#21     DeferStream.listen (package:rxdart/src/streams/defer.dart:37:18)
#22     StreamView.listen (dart:async/stream.dart:1871:20)
#23     new _ForwardingStreamSubscription (dart:async/stream_pipe.dart:118:10)
#24     new _StateStreamSubscription (dart:async/stream_pipe.dart:341:9)
#25     _TakeStream._createSubscription (dart:async/stream_pipe.dart:310:16)
#26     _ForwardingStream.listen (dart:async/stream_pipe.dart:83:12)
#27     _StreamIterator._initializeOrDone (dart:async/stream_impl.dart:1041:30)
#28     _StreamIterator.moveNext (dart:async/stream_impl.dart:1028:12)
#29     main (package:shopping_list/main.dart)
<asynchronous suspension>
#30     _startIsolate.<anonymous closure> (dart:isolate-patch/isolate_patch.dart:301:19)
#31     _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:168:12)

The first await for (with take(1) to actually make it finish) works as I would expect, except that it gets the initial value, which means the stream didn't start emitting anything before the listener came (it is not 'hot'). The second fails with an exception. What am I missing?

EDIT: My understanding of shareReplay may have been wrong all along. I thought it was hot because https://pub.dev/documentation/rxdart/latest/rx/ReplaySubject-class.html says it is hot.

I changed the code a bit:

import 'package:rxdart/rxdart.dart';

void main() async {
  final s = BehaviorSubject();
  s.addStream(Stream.periodic(Duration(seconds: 1), (e) => e));
  // print(s.isBroadcast);
  await Future.delayed(Duration(seconds: 3));
  print('First');
  await for (final i in s.take(1)) {
    print(i);
  }
  print('Second');
  await for (final i in s.take(1)) {
    print(i);
  }
}

and now the output is:

First
2
Second
2

BUT the program never finishes...

Without take(1) the output never gets to the second listener, so it does have some effect. I don't get it.

EDIT 2: I think I might know why the first example with shareReplay doesn't work. Here is an excerpt from the docs:

"It will automatically begin emitting items when first listened to, and shut down when no listeners remain."

In my case, the first listener/await for subscribes and the stream starts; when the loop is done, the last (and only) listener is finished, so the stream returned from shareReplay shuts down. That still doesn't explain why the line print('Second'); doesn't execute, though.

There are 1 best solutions below

The reason your final program doesn't terminate is that your broadcast stream is still emitting events every second. Nobody's listening, but it's a broadcast stream so it doesn't care. It has a timer ticking every second, and that timer keeps the program alive, even if nobody is ever going to listen again. You can keep your program alive by doing Timer.periodic(Duration(seconds: 1), (_) {}); as well. As long as the timer is live, something might happen.
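
To illustrate the point about live timers, here is a minimal sketch using only dart:async. The helper name runTimer and the 100 ms interval are made up for this example. The program can exit only once the periodic timer has been cancelled; remove the cancel() call and it would keep running, just like the program in the question.

```dart
import 'dart:async';

/// Hypothetical helper: runs a periodic timer for [ticks] ticks, then
/// cancels it and completes with the number of ticks observed.
Future<int> runTimer(int ticks) {
  final done = Completer<int>();
  var count = 0;
  Timer.periodic(const Duration(milliseconds: 100), (timer) {
    count++;
    if (count == ticks) {
      // Without this cancel() the timer stays live and the isolate
      // never shuts down, even though nobody is listening any more.
      timer.cancel();
      done.complete(count);
    }
  });
  return done.future;
}

Future<void> main() async {
  final n = await runTimer(3);
  print('Timer cancelled after $n ticks; the program can now exit.');
}
```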

The crash is interesting, though. It suggests a problem with the internal logic of the StreamIterator class. I'll look into that.

I can't say anything guaranteed to be useful about shareReplay, as I'm not familiar with RxDart. My guess is that the first program runs into trouble because the shareReplay stream doesn't terminate properly, that is, it doesn't send a done event when it's done. That would explain the program terminating early: the first await for is waiting for the next event, so it doesn't end and go on to the next loop, but there are no live timers or scheduled events of any kind left, so the isolate just shuts down. A quick read of BehaviorSubject suggests that it works like a stream controller, so you probably need to close it after the addStream has completed. Maybe:

s.addStream(...).whenComplete(s.close);
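
The same pattern can be tried without RxDart. The sketch below uses a plain StreamController from dart:async as a stand-in for the subject, which is an assumption: it is not a BehaviorSubject and does not replay values. The helper name collectUntilDone is invented for the example. It only shows that closing the controller after addStream completes delivers the done event that lets await for finish.

```dart
import 'dart:async';

/// Hypothetical helper: forwards [source] into a controller, closes the
/// controller once the source is done, and collects everything received.
Future<List<int>> collectUntilDone(Stream<int> source) async {
  final controller = StreamController<int>();
  // Closing after addStream completes sends the done event. Without the
  // whenComplete call, the loop below would wait forever for more events.
  controller.addStream(source).whenComplete(controller.close);

  final seen = <int>[];
  await for (final i in controller.stream) {
    seen.add(i);
  }
  return seen;
}

Future<void> main() async {
  print(await collectUntilDone(Stream.fromIterable([1, 2, 3]))); // [1, 2, 3]
}
```

If the whenComplete call is dropped, main should stall inside the await for; with no timers or pending events left, the isolate would then exit without printing anything, which matches the early termination described above.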