Dart Stream Not Cancelling

115 Views Asked by At

I have a class that contains a stream that listens for objects to be created.

class Conversation {
  // variables for live query
  late QueryBuilder<ParseObject> message_live_query_;
  final LiveQuery live_query = LiveQuery(debug: true);

  // variables for streams
  final message_added_stream_controller = StreamController<ParseObject>();

  // constructor that gets messages and starts livequery
  Conversation(this.conversation_, this.receiving_user_) {
    // init and active live query
    message_live_query_ = QueryBuilder<ParseObject>(ParseObject('Message'));
    message_live_query_.whereEqualTo('conversation', conversation_);
    initLiveQuery();
  }

  // returns the stream controller to be listened to
  Stream get messageAddedStream => message_added_stream_controller.stream;

  // listens to live query and if a message is detected, the stream notifies the listeners
  void initLiveQuery() async {
    final subscription = await live_query.client.subscribe(message_live_query_);
    subscription.on(LiveQueryEvent.create, (message) {
      print('*** MESSAGE CREATED FOR CONVERSATION OBJECT $getObjectID');
      print('$message');

      // notify the stream and add to local list
      if(message_added_stream_controller.hasListener) { /// check if there is a listener, if not, don't add the value
        message_added_stream_controller.add(message);
      }
      latest_message_ = message;
      messages.add(message);
    });
  }
}

In a view I start listening to the stream with a stream listener, and when the user hits the back button I call another function to cancel the stream listener.

class _DirectMessageState extends State<DirectMessage> {
  // list that holds messages
  late List<types.Message> chatapp_message_list;

  // variables for listening for new messages
  StreamSubscription<dynamic>? messageAddedStreamListener = null;

  // starts listening to stream
  void listenMessageCreationStream() async {
    // calls the classes getter function and starts listening to the stream
    messageAddedStreamListener = widget.local_conversation.messageAddedStream.listen((message) {
      print("MESSAGE HAS BEEN CREATED");
      print(message);

      if (message['sending_user_info']['objectId'] != widget.local_user.getUserInfoObjectID) {
        setState(() {
          chatapp_message_list.insert(0,convertToChatAppMessageObj(
                  message, chatapp_sending_user, chatapp_receiving_user));
        });
      }
    });
  }

  @override
  void initState() {
    super.initState();
    listenMessageCreationStream();
  }

  // cancels the listener and sets it to null 
  Future<void> disposeListeners() async {
    await messageAddedStreamListener?.cancel();
    messageAddedStreamListener = null;
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      backgroundColor: BODY_BACKGROUND_COLOR,
      appBar: AppBar(
        backgroundColor: HEADER_FOOTER_BACKGROUND_COLOR,
        title: const Text(
          "Direct Message",
          style: TextStyle(
            color: TEXT_COLOR,
            fontSize: SECONDARY_PAGE_HEADER_SIZE,
          ),
        ),
        centerTitle: true,
        leading: IconButton(
            icon: const Icon(Icons.arrow_back),
            onPressed: () async {
              await disposeListeners(); // WAIT TO CANCEL SUBSCRIPTION HERE
              globals.GlobalVars().navigatorKey.currentState?.pop(newlyCreatedListing);
            }),
      ),
      body: ...,
    );
  }

}

However when I navigate to the prior page, and re-navigate back to the DirectMessage page I get the following error that the stream is already being listened to

E/flutter ( 9347): [ERROR:flutter/runtime/dart_vm_initializer.cc(41)] Unhandled Exception: Bad state: Stream has already been listened to.
E/flutter ( 9347): #0      _StreamController._subscribe (dart:async/stream_controller.dart:676:7)
E/flutter ( 9347): #1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:827:19)
E/flutter ( 9347): #2      _StreamImpl.listen (dart:async/stream_impl.dart:471:9)
E/flutter ( 9347): #3      _DirectMessageState.listenMessageCreationStream (package:supply_my_degree/social/direct_message.dart:88:79)
E/flutter ( 9347): #4      _DirectMessageState.initState (package:supply_my_degree/social/direct_message.dart:107:5)
E/flutter ( 9347): #5      StatefulElement._firstBuild (package:flutter/src/widgets/framework.dart:5015:57)
E/flutter ( 9347): #6      ComponentElement.mount (package:flutter/src/widgets/framework.dart:4853:5)
E/flutter ( 9347): #7      Element.inflateWidget (package:flutter/src/widgets/framework.dart:3863:16)
E/flutter ( 9347): #8      Element.updateChild (package:flutter/src/widgets/framework.dart:3592:18)
E/flutter ( 9347): #9      SingleChildRenderObjectElement.mount (package:flutter/src/widgets/framework.dart:6300:14)
E/flutter ( 9347): #10     Element.inflateWidget (package:flutter/src/widgets/framework.dart:3863:16)
E/flutter ( 9347): #11     Element.updateChild (package:flutter/src/widgets/framework.dart:3592:18)
E/flutter ( 9347): #12     ComponentElement.performRebuild (package:flutter/src/widgets/framework.dart:4904:16)
E/flutter ( 9347): #13     Element.rebuild (package:flutter/src/widgets/framework.dart:4604:5)
E/flutter ( 9347): #14     ComponentElement._firstBuild (package:flutter/src/widgets/framework.dart:4859:5)
E/flutter ( 9347): #15     ComponentElement.mount (package:flutter/src/widgets/framework.dart:4853:5)

I'm not understanding how the stream isn't being cancelled by calling the disposeListeners() function before I exit the screen.

0

There are 0 best solutions below