'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block. Error in Kotlin flow

856 Views Asked by At

I have an app where user can upload their wallpapers and download it and can see it so when user get login and in order to get user's details from firestore I have this function where I will fetch user's details :-

override suspend fun getUserDetails(userId: String): Flow<Response<User>> = callbackFlow {
    val listener = fireStore.collection(Consts.USERS_COLLECTION_NAME)
        .document(userId)
        .addSnapshotListener { snapShot, error ->
            val result = if (snapShot != null) {
                val userInfo = snapShot.toObject(User::class.java)
                Response.Success(userInfo!!)
            } else {
                Response.Error(error?.message ?: error.toString())
            }
            trySend(result)
        }
//        awaitClose()

    awaitClose {
        this.cancel()
        listener.remove()
        close()
    }
}

and I am getting this error :-

java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.

I have seen many questions about it I have tried chat gpt's code but getting same error again and again.

My ViewModel:-

viewModelScope.launch {
        if (!firebaseUser?.isAnonymous!!){
            userRepo.getUserDetails(firebaseUser.uid).collect{
                when(it){
                    is Response.Error -> sendUIEvents(UIEvents.ShowSnackBar(it.message))
                    is Response.Loading -> Unit
                    is Response.Success -> {
                        // Some code
                               
                    }
                }
            }
        }
    }

My user flow is something like this:-

Splash Screen > Home Screen > Profile Screen > Upload Wallpaper Screen

Note :- Above code is in Home screen and Profile Screen and I am getting this error in Home screen.

If you have any possible answer then you can post it and one thing also I have very exact code in different file not even variables name changed and I am not getting any error from that I am getting error from this code only. Thank you.

My full stack :-

FATAL EXCEPTION: main Process: com.example.wallz, PID: 709 java.lang.IllegalStateException: 'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block. Otherwise, a callback/listener may leak in case of external cancellation. See callbackFlow API documentation for the details. at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(Builders.kt:343) at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:60) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69) at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:245) at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161) at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397) at kotlinx.coroutines.CancellableContinuationImpl.completeResume(CancellableContinuationImpl.kt:513) at kotlinx.coroutines.channels.AbstractChannel$ReceiveElement.completeResumeReceive(AbstractChannel.kt:908) at kotlinx.coroutines.channels.ArrayChannel.offerInternal(ArrayChannel.kt:83) at kotlinx.coroutines.channels.AbstractSendChannel.trySend-JP2dKIU(AbstractChannel.kt:155) at kotlinx.coroutines.channels.ChannelCoroutine.trySend-JP2dKIU(Unknown Source:2) at com.example.wallz.data.repositories.UserRepoImpl$getUserDetails$2.invokeSuspend$lambda-0(UserRepoImpl.kt:109) at com.example.wallz.data.repositories.UserRepoImpl$getUserDetails$2.$r8$lambda$BnGWgnacAYeEFR-hHCZ0JzY-fc0(Unknown Source:0) at com.example.wallz.data.repositories.UserRepoImpl$getUserDetails$2$$ExternalSyntheticLambda0.onEvent(Unknown Source:4) at com.google.firebase.firestore.DocumentReference.lambda$addSnapshotListenerInternal$2$com-google-firebase-firestore-DocumentReference(DocumentReference.java:504) at com.google.firebase.firestore.DocumentReference$$ExternalSyntheticLambda2.onEvent(Unknown Source:6) at com.google.firebase.firestore.core.AsyncEventListener.lambda$onEvent$0$com-google-firebase-firestore-core-AsyncEventListener(AsyncEventListener.java:42) at com.google.firebase.firestore.core.AsyncEventListener$$ExternalSyntheticLambda0.run(Unknown Source:6) at android.os.Handler.handleCallback(Handler.java:938) at android.os.Handler.dispatchMessage(Handler.java:99) at android.os.Looper.loopOnce(Looper.java:226) at android.os.Looper.loop(Looper.java:313) at android.app.ActivityThread.main(ActivityThread.java:8751) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:571) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:1135) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@b3d76f0, Dispatchers.Main.immediate]

3

There are 3 best solutions below

4
heet kanabar On BEST ANSWER

Here is my solution and what I found after long research:-

override suspend fun getUserDetails(userId: String): Flow<Response<User>> = callbackFlow {
    try {
        val snapshot = fireStore.collection(Consts.USERS_COLLECTION_NAME)
            .document(userId)
            .get()
            .await()

        val user = snapshot.toObject(User::class.java)

        val result = if (user != null) {
            Response.Success(user)
        } else {
            Response.Error("Could not get user's data.")
        }
        trySend(result)
        close()
        awaitCancellation()
    } catch (e: Exception) {
        trySend(Response.Error(e.message ?: e.toString()))
    }
}.flowOn(Dispatchers.IO)

This is right method to fetch data from firebase and this will not give any error and any exception.

Here I have used get() function and await() function in code and lastly I have used awaitCancellation() function.

6
Tenfour04 On

You must not call cancel() or close() from within awaitClose. When your lambda in awaitClose is called, the coroutine is already canceled or the channel closed, so it's redundant.

callbackFlow is designed to throw this exception when you fail to call awaitClose, but the way in which they detect it not being called is also triggered by unexpectedly calling the redundant cancel() or close(). So, unfortunately, it doesn't give you a helpful error message in the exception.

1
Atick Faisal On

First, couple of things:

  1. Since you are returning a flow, You do not need to make the function a suspend function.
  2. You do not need to use cancel() and close() in the awaitClose {} block.

To fix the error, please make sure you are using the correct imports:

import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow