How to use LinkedBlockingQueue to wait for data and consume them all once the data has arrived

118 Views Asked by At

I have a producer consumer kind of scenario. Here is the producer block that will continuously feed elements into a LinkedBlockingQueue

suspend fun produceElements() {
  //long running action to fetch the element
  val item = fetchItem()

  //adds element to the queue
  linkedBlockingQ.add(item) 
}

I have another suspend function which acts as a consumer.

suspend fun receiveItems() {
   while(true) {
      val item = linkedBlockingQ.take() //waits until the first element arrives

      //run some long running tasks

      if(linkedBlockingQ.isEmpty()) break
   }
}

Finally on the caller method,

suspend fun doMain() {
   val receiveTask = async { receiveItems() }

   val produceTask = async { produceElements() }
   produceTask.await() //wait until this is complete
   receiveTask.await() //wait until receive task is complete
}

My concern is that when there is delay in producing elements, the LinkedBlockingQueue can be empty resulting in the false completion of receive task. Is this the correct way in this scenario?

1

There are 1 best solutions below

1
João Esperancinha On

I though about creating an example with Channels to exemplify why this could be better done with Channels. The code is divided into 2 implementations. The first with the LinkedBlockingQueue example similar to yours and the second with a Channel. First the code:

private suspend fun fetchItem() = withContext(Dispatchers.Default) { "ITEM-${UUID.randomUUID()}" }
private const val PRODUCTION_COST_MS = 200L

class ElementsSendReceiveBlockingChannel {

    private val linkedBlockingQueue by lazy { LinkedBlockingQueue<String>(100) }
    private suspend fun produceItems() {
        val item = fetchItem()
        delay(PRODUCTION_COST_MS)
        linkedBlockingQueue.add(item)
        delay(PRODUCTION_COST_MS)
        linkedBlockingQueue.add(item)
    }

    private suspend fun receiveItems() {
        do {
            val item = withContext(Dispatchers.IO) {
                linkedBlockingQueue.take()
            }
            println("LinkedBlockingQueue received item $item!")
        } while (!linkedBlockingQueue.isEmpty())
    }

    suspend fun doMain() = coroutineScope {
        val receiveTask = async { receiveItems() }
        val produceTask = async { produceItems() }
        produceTask.await()
        receiveTask.await()
    }
}

class ElementsSendReceiveChannel {

    private val channel by lazy { Channel<String>(2) }

    private suspend fun produceItems() {
        val item = fetchItem()
        delay(PRODUCTION_COST_MS)
        channel.send(item)
        delay(PRODUCTION_COST_MS)
        channel.send(item)
    }


    private suspend fun receiveItems() {
        repeat(2) {
            val item = channel.receive()
            println("Channel received item $item!")
        }
    }

    suspend fun doMain() = coroutineScope {
        launch { produceItems() }
        launch { receiveItems() }
    }
}

class ElementsSendReceiveRunner {
    companion object {
        @JvmStatic
        fun main(args: Array<String>): Unit = runBlocking {
            ElementsSendReceiveBlockingChannel().doMain()
            ElementsSendReceiveChannel().doMain()
        }
    }
}

And the result is this:

LinkedBlockingQueue received item ITEM-4f84dedf-5df1-4b3e-94a1-b701ed061ab0!
Channel received item ITEM-d6089ebe-1008-4639-bec2-d21f8080fa09!
Channel received item ITEM-d6089ebe-1008-4639-bec2-d21f8080fa09

So with the LinkedBlockingQueue we are still missing some sort of management to get the last result and also notice that I had to make the code use coroutine contexts explicitly by specifying dispatchers. Even if we know that we need to wait for 2 elements we still need to make multiple checks on the queue, which is not ideal since these are blocking operations that with other words would hinder production times. A LinkedBlockingQueue isn't therefore ideal for coroutines. However, a Channel is! In this case I'm specifying a 2 element buffer, but this can vary. We could have also just created a rendez-vous channel and this would still work. The point is that, with channels we don't need to figure out a way to wait for the last element and functions only get suspended. They don't block each other and so receiving on a channel doesn't hinder sending on a channel. To anyone playing along with this example, feel free to change the PRODUCTION_COST_MS value and observe if the results differ.

I just felt important to share this extended version of your example with the two implementations to make it more clear to who wants to learn with more with this great question of yours.