produce<Type> vs Channel<Type>()

I'm trying to understand channels. I want to wrap the Android BluetoothLeScanner in a channel. Why does this work:

fun startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> {
    val channel = Channel<ScanResult>()
    scanCallback = object : ScanCallback() {
        override fun onScanResult(callbackType: Int, result: ScanResult) {
            channel.offer(result)
        }
    }
    scanner.startScan(filters, settings, scanCallback)

    return channel
}

But not this:

fun startScan(scope: CoroutineScope, filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = scope.produce {
    scanCallback = object : ScanCallback() {
        override fun onScanResult(callbackType: Int, result: ScanResult) {
            offer(result)
        }
    }
    scanner.startScan(filters, settings, scanCallback)
}

It tells me the channel was closed the first time the callback tries to call offer.

EDIT1: According to the docs, "The channel is closed when the coroutine completes", which makes sense. I know we can use suspendCoroutine with resume as a one-shot callback replacement. This, however, is a listener/stream situation. I don't want the coroutine to complete.

Best answer

Using produce, you introduce a scope to your Channel. This means that the code producing the items streamed over the channel can be cancelled.

This also means that the lifetime of your Channel starts when the lambda passed to produce starts and ends when that lambda ends.

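In your example, the lambda of your produce call ends almost immediately, because registering the callback and calling startScan do not suspend. That means your Channel is closed almost immediately, before the first scan result arrives.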
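The early close can be reproduced without Android. Below is a minimal sketch, assuming a hypothetical FakeScanner class (not part of any library) that stores a listener and fires it on demand. It uses trySend, the replacement for the deprecated offer, because trySend returns a result that can be inspected instead of throwing.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for BluetoothLeScanner: keeps a listener and fires it on demand.
class FakeScanner {
    var listener: ((Int) -> Unit)? = null
    fun emit(value: Int) { listener?.invoke(value) }
}

// Returns true if a send attempted after the produce block has returned
// is rejected because the channel is already closed.
@OptIn(ExperimentalCoroutinesApi::class)
fun sendAfterBlockReturned(): Boolean = runBlocking {
    val scanner = FakeScanner()
    var rejectedAsClosed = false
    val channel = produce<Int> {
        scanner.listener = { value ->
            rejectedAsClosed = trySend(value).isClosed
        }
        // Nothing suspends here, so the block returns and produce closes the channel.
    }
    // Suspends until the channel yields an element or is closed; here it is closed.
    channel.receiveCatching()
    // The fake scanner fires only now, after the producer coroutine has completed.
    scanner.emit(1)
    rejectedAsClosed
}

fun main() {
    println(sendAfterBlockReturned())
}
```

Under these assumptions the function should report that the send was rejected as closed, which is the same situation the question describes.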

Change your code to something like this:

fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
    scanCallback = object : ScanCallback() {
        override fun onScanResult(callbackType: Int, result: ScanResult) {
            // trySend replaces offer, which is deprecated since kotlinx.coroutines 1.5
            trySend(result)
        }
    }
    scanner.startScan(filters, settings, scanCallback)

    // now suspend this lambda forever (until its scope is canceled)
    suspendCancellableCoroutine<Nothing> { cont ->
        cont.invokeOnCancellation {
            scanner.stopScan(...)
        }
    }
}

...
val channel = scope.startScan(filter)
...
...
scope.cancel() // cancels the channel and stops the scanner.

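I added the line suspendCancellableCoroutine<Nothing> { ... } to make it suspend 'forever'. The continuation is never resumed, so the lambda stays suspended and the channel stays open until the scope is cancelled, at which point invokeOnCancellation stops the scanner.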
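The same pattern can be tried outside Android. This is a sketch under stated assumptions: BurstScanner is a hypothetical class that delivers two results as soon as it starts, the channel uses an unlimited buffer so the non-suspending callback never has to drop a result, and the receive channel is cancelled directly instead of the whole scope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical stand-in for BluetoothLeScanner: delivers two results as soon as it starts.
class BurstScanner {
    var stopped = false
        private set
    fun start(onResult: (Int) -> Unit) { onResult(1); onResult(2) }
    fun stop() { stopped = true }
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.scan(scanner: BurstScanner): ReceiveChannel<Int> =
    produce<Int>(capacity = Channel.UNLIMITED) {
        // Results are buffered, so the callback can hand them over without suspending.
        scanner.start { value -> trySend(value) }
        // Never resumed: the block stays suspended until the coroutine is cancelled.
        suspendCancellableCoroutine<Nothing> { cont ->
            cont.invokeOnCancellation { scanner.stop() }
        }
    }

fun runScanDemo(): Pair<List<Int>, Boolean> {
    val scanner = BurstScanner()
    val received = runBlocking {
        val channel = scan(scanner)
        val items = listOf(channel.receive(), channel.receive())
        channel.cancel() // cancelling the receive channel cancels the producer coroutine
        items
    } // runBlocking returns only once the producer coroutine has finished
    return received to scanner.stopped
}

fun main() {
    println(runScanDemo())
}
```

Because runBlocking waits for its child coroutines, the stopped flag is read only after the cancellation handler has had the chance to run.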

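Update: using produce and handling errors in a structured way (which allows for structured concurrency). A scan failure resumes the continuation with an exception, so the produce coroutine fails and the channel is closed with that error: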

fun CoroutineScope.startScan(filters: List<ScanFilter>, settings: ScanSettings = defaultSettings): ReceiveChannel<ScanResult?> = produce {
    // Suspend this lambda forever (until its scope is canceled)
    suspendCancellableCoroutine<Nothing> { cont ->
        val scanCallback = object : ScanCallback() {
            override fun onScanResult(callbackType: Int, result: ScanResult) {
                // trySend replaces offer, which is deprecated since kotlinx.coroutines 1.5
                trySend(result)
            }
            override fun onScanFailed(errorCode: Int) {
                cont.resumeWithException(MyScanException(errorCode))
            }
        }
        scanner.startScan(filters, settings, scanCallback)

        cont.invokeOnCancellation {
            scanner.stopScan(...)
        }
    }
}
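
How the failure reaches a consumer can be sketched as follows. The names FailingScanner and ScanFailedException are hypothetical, and supervisorScope is an assumption of this sketch: it keeps the failed producer from cancelling its parent, so the error surfaces only through the channel. In a regular scope the failure would also cancel the parent.

```kotlin
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical exception type carrying the scanner's error code.
class ScanFailedException(val errorCode: Int) : Exception("Scan failed with error code $errorCode")

// Hypothetical scanner that reports a failure as soon as it is started.
class FailingScanner(private val errorCode: Int) {
    fun start(onResult: (Int) -> Unit, onFailed: (Int) -> Unit) { onFailed(errorCode) }
    fun stop() {}
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.scanOrFail(scanner: FailingScanner): ReceiveChannel<Int> = produce<Int> {
    suspendCancellableCoroutine<Nothing> { cont ->
        cont.invokeOnCancellation { scanner.stop() }
        scanner.start(
            onResult = { value -> trySend(value) },
            // Resuming with an exception makes the produce block throw,
            // which closes the channel with that exception as its cause.
            onFailed = { code -> cont.resumeWithException(ScanFailedException(code)) }
        )
    }
}

// Returns the error code seen by the consumer, or null if a value arrived instead.
fun receivedErrorCode(): Int? = runBlocking {
    supervisorScope {
        val channel = scanOrFail(FailingScanner(errorCode = 2))
        try {
            channel.receive()
            null
        } catch (e: ScanFailedException) {
            e.errorCode
        }
    }
}

fun main() {
    println(receivedErrorCode())
}
```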